incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomm...@apache.org
Subject svn commit: r1558791 - in /sling/trunk/contrib/extensions/replication/src: main/java/org/apache/sling/replication/agent/ main/java/org/apache/sling/replication/agent/impl/ main/java/org/apache/sling/replication/queue/impl/jobhandling/ main/java/org/apa...
Date Thu, 16 Jan 2014 13:47:56 GMT
Author: tommaso
Date: Thu Jan 16 13:47:55 2014
New Revision: 1558791

URL: http://svn.apache.org/r1558791
Log:
SLING-3309 - Marius Petria's patch for flushing of external systems applied

Added:
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java   (with props)
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-httpcacheflush.json
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-httpcache.json
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http.json
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java   (with props)
Modified:
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProviderTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java Thu Jan 16 13:47:55 2014
@@ -45,11 +45,12 @@ public class ReplicationAgentConfigurati
     public static final String AUTHENTICATION_PROPERTIES = "authentication.properties";
 
     public static final String QUEUE_DISTRIBUTION = "ReplicationQueueDistributionStrategy.target";
-
     public static final String RULES = "rules";
 
     public static final String ENABLED = "enabled";
 
+    public static final String USE_AGGREGATE_PATHS = "useAggregatePaths";
+
     public static final String[] COMPONENTS = { TRANSPORT, PACKAGING };
 
     private final boolean enabled;
@@ -72,6 +73,8 @@ public class ReplicationAgentConfigurati
 
     private final String[] rules;
 
+    private final boolean useAggregatePaths;
+
     private final Dictionary<String, Dictionary> componentConfiguration;
 
     public ReplicationAgentConfiguration(Dictionary<?, ?> dictionary, Dictionary<String, Dictionary> componentConfiguration) {
@@ -88,10 +91,12 @@ public class ReplicationAgentConfigurati
         String[] ap = PropertiesUtil.toStringArray(dictionary.get(AUTHENTICATION_PROPERTIES));
         this.authenticationProperties = ap != null ? ap : new String[0];
         this.rules = PropertiesUtil.toStringArray(dictionary.get(RULES), new String[0]);
+        this.useAggregatePaths = PropertiesUtil.toBoolean(dictionary.get(USE_AGGREGATE_PATHS), true);
 
         this.componentConfiguration = componentConfiguration;
     }
 
+
     public String[] getAuthenticationProperties() {
         return authenticationProperties;
     }
@@ -133,6 +138,7 @@ public class ReplicationAgentConfigurati
                 + QUEUEPROVIDER + "\":\"" + targetReplicationQueueProvider + "\", \""
                 + QUEUE_DISTRIBUTION + "\":\"" + targetReplicationQueueDistributionStrategy+ "\", \""
                 + TRANSPORT_AUTHENTICATION_FACTORY + "\":\"" + targetAuthenticationHandlerFactory + "\", \""
+                + USE_AGGREGATE_PATHS + "\":\"" + useAggregatePaths + "\", \""
                 + AUTHENTICATION_PROPERTIES + "\":\"" + Arrays.toString(authenticationProperties) + "\", \"";
 
         result += toComponentString();

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java Thu Jan 16 13:47:55 2014
@@ -18,10 +18,8 @@
  */
 package org.apache.sling.replication.agent.impl;
 
-import java.util.Dictionary;
-import java.util.Hashtable;
-import java.util.Map;
-import java.util.Random;
+import java.util.*;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -108,6 +106,9 @@ public class ReplicationAgentServiceFact
     @Property
     private static final String RULES = ReplicationAgentConfiguration.RULES;
 
+    @Property(boolValue = true)
+    private static final String USE_AGGREGATE_PATHS = ReplicationAgentConfiguration.USE_AGGREGATE_PATHS;
+
     @Property(name = TRANSPORT, value = DEFAULT_TRANSPORT)
     @Reference(name = "TransportHandler", target = DEFAULT_TRANSPORT, policy = ReferencePolicy.DYNAMIC)
     private TransportHandler transportHandler;
@@ -173,6 +174,10 @@ public class ReplicationAgentServiceFact
             String af = PropertiesUtil.toString(config.get(TRANSPORT_AUTHENTICATION_FACTORY), "");
             props.put(TRANSPORT_AUTHENTICATION_FACTORY, af);
 
+
+            boolean useAggregatePaths = PropertiesUtil.toBoolean(config.get(USE_AGGREGATE_PATHS), true);
+            props.put(USE_AGGREGATE_PATHS, useAggregatePaths);
+
             // check configuration is valid
             if (name == null || packageBuilder == null || queueProvider == null || queueDistributionStrategy == null) {
                 throw new AgentConfigurationException("configuration for this agent is not valid");
@@ -192,7 +197,8 @@ public class ReplicationAgentServiceFact
                         transportHandler, transportAuthenticationProvider, endpoint, packageBuilder, queueProvider, queueDistributionStrategy});
             }
 
-            ReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, rules, transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, queueDistributionStrategy);
+            ReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, rules, useAggregatePaths,
+                    transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, queueDistributionStrategy);
 
             // register agent service
             agentReg = context.registerService(ReplicationAgent.class.getName(), agent, props);

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Thu Jan 16 13:47:55 2014
@@ -38,6 +38,9 @@ import org.apache.sling.replication.tran
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Basic implementation of a {@link ReplicationAgent}
  */
@@ -61,8 +64,12 @@ public class SimpleReplicationAgent impl
 
     private final String[] rules;
 
+    private final boolean useAggregatePaths;
+
     public SimpleReplicationAgent(String name, String endpoint, String[] rules,
-                                  TransportHandler transportHandler, ReplicationPackageBuilder packageBuilder,
+                                  boolean useAggregatePaths,
+                                  TransportHandler transportHandler,
+                                  ReplicationPackageBuilder packageBuilder,
                                   ReplicationQueueProvider queueProvider,
                                   TransportAuthenticationProvider<?, ?> transportAuthenticationProvider,
                                   ReplicationQueueDistributionStrategy queueDistributionHandler) {
@@ -74,11 +81,29 @@ public class SimpleReplicationAgent impl
         this.queueProvider = queueProvider;
         this.transportAuthenticationProvider = transportAuthenticationProvider;
         this.queueDistributionStrategy = queueDistributionHandler;
+        this.useAggregatePaths = useAggregatePaths;
     }
 
     public ReplicationResponse execute(ReplicationRequest replicationRequest)
             throws AgentReplicationException {
 
+        // create packages from request
+        ReplicationPackage[] replicationPackages = buildPackages(replicationRequest);
+
+        ReplicationResponse replicationResponse = schedule(replicationPackages, false);
+
+        return replicationResponse;
+    }
+
+    public void send(ReplicationRequest replicationRequest) throws AgentReplicationException {
+        // create packages from request
+        ReplicationPackage[] replicationPackages = buildPackages(replicationRequest);
+
+        schedule(replicationPackages, true);
+    }
+
+
+    private ReplicationPackage buildPackage(ReplicationRequest replicationRequest) throws AgentReplicationException {
         // create package from request
         ReplicationPackage replicationPackage;
         try {
@@ -87,43 +112,78 @@ public class SimpleReplicationAgent impl
             throw new AgentReplicationException(e);
         }
 
-        ReplicationResponse replicationResponse = new ReplicationResponse();
+        return replicationPackage;
+    }
 
-        // send the replication package to the queue distribution handler
-        try {
-            ReplicationQueueItemState state = queueDistributionStrategy.add(replicationPackage,
-                    this, queueProvider);
-            if (state != null) {
-                replicationResponse.setStatus(state.getItemState().toString());
-                replicationResponse.setSuccessful(state.isSuccessful());
-            } else {
-                replicationResponse.setStatus(ReplicationQueueItemState.ItemState.ERROR.toString());
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("an error happened during queue processing", e);
+    private ReplicationPackage[] buildPackages(ReplicationRequest replicationRequest) throws AgentReplicationException {
+
+        List<ReplicationPackage> packages = new ArrayList<ReplicationPackage>();
+
+        if(useAggregatePaths){
+            ReplicationPackage replicationPackage = buildPackage(replicationRequest);
+            packages.add(replicationPackage);
+        }
+        else {
+            for (String path : replicationRequest.getPaths()){
+                ReplicationPackage replicationPackage = buildPackage(new ReplicationRequest(replicationRequest.getTime(),
+                        replicationRequest.getAction(),
+                        new String[] { path }));
+
+                packages.add(replicationPackage);
             }
-            replicationResponse.setSuccessful(false);
+        }
+
+        return packages.toArray(new ReplicationPackage[0]);
+    }
+
+    // offer option throws an exception at first error
+    private ReplicationResponse schedule(ReplicationPackage[] packages, boolean offer) throws AgentReplicationException {
+        ReplicationResponse replicationResponse = new ReplicationResponse();
+
+        for (ReplicationPackage replicationPackage : packages){
+            ReplicationResponse currentReplicationResponse = schedule(replicationPackage, offer);
+
+            replicationResponse.setSuccessful(currentReplicationResponse.isSuccessful());
+            replicationResponse.setStatus(currentReplicationResponse.getStatus());
         }
 
         return replicationResponse;
     }
 
-    public void send(ReplicationRequest replicationRequest) throws AgentReplicationException {
-        // create package from request
-        ReplicationPackage replicationPackage;
-        try {
-            replicationPackage = packageBuilder.createPackage(replicationRequest);
-        } catch (ReplicationPackageBuildingException e) {
-            throw new AgentReplicationException(e);
+    private ReplicationResponse schedule(ReplicationPackage replicationPackage, boolean offer) throws AgentReplicationException {
+        ReplicationResponse replicationResponse = new ReplicationResponse();
+
+        if(offer){
+            try {
+                queueDistributionStrategy.offer(replicationPackage, this, queueProvider);
+            } catch (ReplicationQueueException e) {
+                replicationResponse.setSuccessful(false);
+                throw new AgentReplicationException(e);
+            }
         }
-        try {
-            queueDistributionStrategy.offer(replicationPackage, this, queueProvider);
-        } catch (ReplicationQueueException e) {
-            throw new AgentReplicationException(e);
+        else {
+            // send the replication package to the queue distribution handler
+            try {
+                ReplicationQueueItemState state = queueDistributionStrategy.add(replicationPackage,
+                        this, queueProvider);
+                if (state != null) {
+                    replicationResponse.setStatus(state.getItemState().toString());
+                    replicationResponse.setSuccessful(state.isSuccessful());
+                } else {
+                    replicationResponse.setStatus(ReplicationQueueItemState.ItemState.ERROR.toString());
+                    replicationResponse.setSuccessful(false);
+                }
+            } catch (Exception e) {
+                if (log.isErrorEnabled()) {
+                    log.error("an error happened during queue processing", e);
+                }
+                replicationResponse.setSuccessful(false);
+            }
         }
+        return replicationResponse;
     }
 
+
     public boolean process(ReplicationPackage item) throws AgentReplicationException {
         try {
             if (transportHandler != null || (endpoint != null && endpoint.length() > 0)) {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java Thu Jan 16 13:47:55 2014
@@ -110,7 +110,7 @@ public class JobHandlingUtils {
                 return (Long) job.getProperty(LENGTH);
             }
 
-            public InputStream getInputStream() throws IOException {
+            public InputStream createInputStream() throws IOException {
                 return IOUtils.toInputStream(String.valueOf(job.getProperty(BIN)));
 
                 // workaround to make void package work while we get SLING-3140 to be released
@@ -139,7 +139,7 @@ public class JobHandlingUtils {
         properties.put(PATHS, replicationPackage.getPaths());
         properties.put(LENGTH, replicationPackage.getLength());
         properties.put(ACTION, replicationPackage.getAction());
-        properties.put(BIN, IOUtils.toString(replicationPackage.getInputStream()));
+        properties.put(BIN, IOUtils.toString(replicationPackage.createInputStream()));
         properties.put(TYPE, replicationPackage.getType());
         return properties;
     }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java Thu Jan 16 13:47:55 2014
@@ -52,11 +52,12 @@ public interface ReplicationPackage exte
     String getType();
 
     /**
-     * get package stream
+     * creates a package stream.
+     * a new stream is created for each call and it is the caller's obligation to close the stream.
      * @return an {@link InputStream}
      * @throws IOException
      */
-    InputStream getInputStream() throws IOException;
+    InputStream createInputStream() throws IOException;
 
     /**
      * get package stream length

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java Thu Jan 16 13:47:55 2014
@@ -39,23 +39,36 @@ public class VoidReplicationPackage impl
 
     private final String id;
 
+    private final String action;
+
     public VoidReplicationPackage(ReplicationRequest request, String type) {
         this.type = type;
         this.paths = request.getPaths();
-        this.id = ReplicationActionType.DELETE.toString() + ':' + Arrays.toString(request.getPaths()) + ':' + request.getTime();
+        this.action = request.getAction().toString();
+        this.id = request.getAction().toString() + ':' + Arrays.toString(request.getPaths()) + ':' + request.getTime();
     }
 
     public static VoidReplicationPackage fromStream(InputStream stream) throws IOException {
         VoidReplicationPackage replicationPackage = null;
         String streamString = IOUtils.toString(stream);
+
         int beginIndex = streamString.indexOf(':');
         int endIndex = streamString.lastIndexOf(':');
-        if (beginIndex >= 0 && endIndex > beginIndex && streamString.startsWith(ReplicationActionType.DELETE.toString())) {
-            String pathsArrayString = Text.unescape(streamString.substring(beginIndex + 1, endIndex - 1));
-            String[] paths = pathsArrayString.replaceAll("\\[", "").replaceAll("\\]", "").split(", ");
-            ReplicationRequest request = new ReplicationRequest(Long.valueOf(streamString.substring(streamString.lastIndexOf(':') + 1)),
-                    ReplicationActionType.DELETE, paths);
-            replicationPackage = new VoidReplicationPackage(request, "VOID");
+        if (beginIndex >= 0 && endIndex > beginIndex){
+            String actionString = streamString.substring(0, beginIndex);
+            String pathsString = streamString.substring(beginIndex+1, endIndex);
+            String timeString =  streamString.substring(endIndex + 1);
+
+            ReplicationActionType replicationActionType = ReplicationActionType.fromName(actionString);
+
+            if(replicationActionType != null){
+                pathsString = Text.unescape(pathsString);
+                String[] paths = pathsString.replaceAll("\\[", "").replaceAll("\\]", "").split(", ");
+
+                ReplicationRequest request = new ReplicationRequest(Long.valueOf(timeString),
+                        replicationActionType, paths);
+                replicationPackage = new VoidReplicationPackage(request, "VOID");
+            }
         }
         return replicationPackage;
     }
@@ -75,7 +88,7 @@ public class VoidReplicationPackage impl
         return id.getBytes().length;
     }
 
-    public InputStream getInputStream() throws IOException {
+    public InputStream createInputStream() throws IOException {
         return new ByteArrayInputStream(id.getBytes());
     }
 
@@ -84,7 +97,6 @@ public class VoidReplicationPackage impl
     }
 
     public String getAction() {
-        return ReplicationActionType.DELETE.toString();
+        return action;
     }
-
 }

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java?rev=1558791&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java Thu Jan 16 13:47:55 2014
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.serialization.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
+import org.apache.sling.replication.serialization.ReplicationPackageReadingException;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+@Component(metatype = false)
+@Service(value = ReplicationPackageBuilder.class)
+@Property(name = "name", value = VoidReplicationPackageBuilder.NAME)
+public class VoidReplicationPackageBuilder implements ReplicationPackageBuilder {
+    public static final String NAME = "void";
+
+    public ReplicationPackage createPackage(ReplicationRequest request) throws ReplicationPackageBuildingException {
+        return new VoidReplicationPackage(request, "VOID");
+    }
+
+    public ReplicationPackage readPackage(InputStream stream, boolean install) throws ReplicationPackageReadingException {
+        try {
+            return VoidReplicationPackage.fromStream(stream);
+        } catch (Exception e) {
+            throw new ReplicationPackageReadingException(e);
+        }
+    }
+
+    public ReplicationPackage getPackage(String id) {
+        try {
+            return VoidReplicationPackage.fromStream(new ByteArrayInputStream(id.getBytes()));
+        }
+        catch (IOException ex){
+            return null;
+        }
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java Thu Jan 16 13:47:55 2014
@@ -64,7 +64,7 @@ public class FileVaultReplicationPackage
         return paths;
     }
 
-    public InputStream getInputStream() throws IOException {
+    public InputStream createInputStream() throws IOException {
         return new FileInputStream(pkg.getFile());
     }
 

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java Thu Jan 16 13:47:55 2014
@@ -19,6 +19,7 @@
 package org.apache.sling.replication.servlet;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Enumeration;
 import java.util.List;
@@ -137,8 +138,16 @@ public class ReplicationAgentServlet ext
                 // get first item
                 ReplicationPackage head = queue.getHead();
                 if (head != null) {
-                    int bytesCopied = IOUtils.copy(head.getInputStream(),
-                            response.getOutputStream());
+                    InputStream inputStream = null;
+                    int bytesCopied = -1;
+                    try {
+                        inputStream = head.createInputStream();
+                        bytesCopied = IOUtils.copy(inputStream, response.getOutputStream());
+                    }
+                    finally {
+                        IOUtils.closeQuietly(inputStream);
+                    }
+
                     response.setHeader(ReplicationHeader.TYPE.toString(), head.getType());
                     if (log.isInfoEnabled()) {
                         log.info("{} bytes written into the response", bytesCopied);

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java Thu Jan 16 13:47:55 2014
@@ -18,11 +18,12 @@
  */
 package org.apache.sling.replication.transport.impl;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.util.Arrays;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Service;
+import java.io.InputStream;
+import java.util.*;
+
+import org.apache.commons.io.IOUtils;
 import org.apache.http.client.fluent.Content;
 import org.apache.http.client.fluent.Executor;
 import org.apache.http.client.fluent.Request;
@@ -41,14 +42,32 @@ import org.slf4j.LoggerFactory;
 /**
  * basic HTTP POST {@link TransportHandler}
  */
-@Component(metatype = false)
-@Service(value = TransportHandler.class)
-@Property(name = "name", value = HttpTransportHandler.NAME)
 public class HttpTransportHandler implements TransportHandler {
 
     public static final String NAME = "http";
 
-    private final Logger log = LoggerFactory.getLogger(getClass());
+    private static final String PATH_VARIABLE_NAME = "{path}";
+
+    private static final Logger log = LoggerFactory.getLogger(HttpTransportHandler.class);
+
+    private final boolean useCustomHeaders;
+
+    private final String[] customHeaders;
+
+    private final boolean useCustomBody;
+
+    private final String customBody;
+
+    public HttpTransportHandler(boolean useCustomHeaders, String[] customHeaders, boolean useCustomBody, String customBody) {
+        this.useCustomHeaders = useCustomHeaders;
+        this.customHeaders = customHeaders;
+        this.useCustomBody = useCustomBody;
+        this.customBody = customBody;
+    }
+
+    public HttpTransportHandler(){
+        this(false, new String[0], false, "");
+    }
 
     @SuppressWarnings("unchecked")
     public void transport(ReplicationPackage replicationPackage,
@@ -67,31 +86,112 @@ public class HttpTransportHandler implem
             executor = ((TransportAuthenticationProvider<Executor, Executor>) transportAuthenticationProvider)
                     .authenticate(executor, context);
 
-            String[] paths = replicationPackage.getPaths();
-            String type = replicationPackage.getType();
-            String pathsString = Arrays.toString(paths);
-            Request req = Request.Post(replicationEndpoint.getUri()).useExpectContinue()
-                    .addHeader(ReplicationHeader.TYPE.toString(), type);
-            if (replicationPackage.getInputStream() != null) {
-                req = req.bodyStream(replicationPackage.getInputStream(),
-                        ContentType.APPLICATION_OCTET_STREAM);
-            }
-            Response response = executor.execute(req);
-            if (response != null) {
-                Content content = response.returnContent();
-                if (log.isInfoEnabled()) {
-                    log.info("Replication content of type {} for {} delivered: {}", new Object[]{
-                            type, pathsString, content});
+            deliverPackage(executor, replicationPackage, replicationEndpoint);
+
+        } catch (Exception e) {
+            throw new ReplicationTransportException(e);
+        }
+    }
+
+    public static String[] getCustomizedHeaders(String[] additionalHeaders, String action, String[] paths){
+        List<String> headers = new ArrayList<String>();
+
+        for(String additionalHeader : additionalHeaders){
+            int idx = additionalHeader.indexOf("->");
+
+            if(idx < 0){
+                headers.add(additionalHeader);
+            } else {
+                String actionSelector = additionalHeader.substring(0, idx).trim();
+                String header = additionalHeader.substring(idx+2).trim();
+
+                if(actionSelector.equalsIgnoreCase(action) || actionSelector.equals("*")){
+                    headers.add(header);
                 }
             }
+        }
+
+
+
+        StringBuilder sb = new StringBuilder();
+
+        if(paths != null && paths.length > 0) {
+            sb.append(paths[0]);
+            for(int i=1; i < paths.length; i++){
+                sb.append(", " + paths[i]);
+            }
+        }
+
+        String path = sb.toString();
+
+        List<String> boundHeaders = new ArrayList<String>();
+
+        for(String header : headers){
+            boundHeaders.add(header.replace(PATH_VARIABLE_NAME, path));
+        }
+
+        return boundHeaders.toArray(new String[0]);
+    }
+
+    private void deliverPackage(Executor executor, ReplicationPackage replicationPackage,
+                                ReplicationEndpoint replicationEndpoint) throws IOException{
+        String type = replicationPackage.getType();
+
+
+        Request req = Request.Post(replicationEndpoint.getUri()).useExpectContinue();
+
+        if(useCustomHeaders){
+            String[] customizedHeaders = getCustomizedHeaders(customHeaders, replicationPackage.getAction(), replicationPackage.getPaths());
+            for(String header : customizedHeaders){
+                addHeader(req, header);
+            }
+        }
+        else {
+            req.addHeader(ReplicationHeader.TYPE.toString(), type);
+        }
+
+        InputStream inputStream = null;
+        Response response = null;
+        try{
+            if (useCustomBody) {
+                String body = customBody == null ? "" : customBody;
+                inputStream = new ByteArrayInputStream(body.getBytes());
+            }
             else {
-                throw new IOException("response is empty");
+                inputStream = replicationPackage.createInputStream();
             }
-        } catch (Exception e) {
-            throw new ReplicationTransportException(e);
+
+            if(inputStream != null) {
+                req = req.bodyStream(inputStream, ContentType.APPLICATION_OCTET_STREAM);
+            }
+
+            response = executor.execute(req);
+        }
+        finally {
+            IOUtils.closeQuietly(inputStream);
+        }
+
+        if (response != null) {
+            Content content = response.returnContent();
+            if (log.isInfoEnabled()) {
+                log.info("Replication content of type {} for {} delivered: {}", new Object[]{
+                        type, Arrays.toString(replicationPackage.getPaths()), content});
+            }
+        }
+        else {
+            throw new IOException("response is empty");
         }
     }
 
+    private static void addHeader(Request req, String header){
+        int idx = header.indexOf(":");
+        if(idx < 0) return;
+        String headerName = header.substring(0, idx).trim();
+        String headerValue = header.substring(idx+1).trim();
+        req.addHeader(headerName, headerValue);
+    }
+
+
     public boolean supportsAuthenticationProvider(TransportAuthenticationProvider<?, ?> transportAuthenticationProvider) {
         return transportAuthenticationProvider.canAuthenticate(Executor.class);
     }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java Thu Jan 16 13:47:55 2014
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.replication.transport.impl;
 
+import java.io.InputStream;
 import java.net.URI;
 import java.util.Dictionary;
 import java.util.Properties;
@@ -25,6 +26,7 @@ import javax.jcr.Node;
 import javax.jcr.Session;
 import javax.jcr.nodetype.NodeType;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
@@ -83,8 +85,15 @@ public class RepositoryTransportHandler 
                             NodeType.NT_FILE);
                     Node contentNode = addedNode.addNode(JcrConstants.JCR_CONTENT, NodeType.NT_RESOURCE);
                     if (contentNode != null) {
-                        contentNode.setProperty(JcrConstants.JCR_DATA, session.getValueFactory().createBinary(replicationPackage.getInputStream()));
-                        session.save();
+                        InputStream inputStream = null;
+                        try {
+                            inputStream = replicationPackage.createInputStream();
+                            contentNode.setProperty(JcrConstants.JCR_DATA, session.getValueFactory().createBinary(inputStream));
+                            session.save();
+                        }
+                        finally {
+                            IOUtils.closeQuietly(inputStream);
+                        }
                     }
                     if (log.isInfoEnabled()) {
                         log.info("package {} delivered to the repository as node {} ",

Added: sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-httpcacheflush.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-httpcacheflush.json?rev=1558791&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-httpcacheflush.json (added)
+++ sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-httpcacheflush.json Thu Jan 16 13:47:55 2014
@@ -0,0 +1,13 @@
+{
+    "jcr:primaryType" : "sling:OsgiConfig",
+    "name" : "httpcacheflush",
+    "endpoint" : "http://localhost:8000/invalidatecache",
+    "TransportHandler.target" : "(name=httpcache)",
+    "useAggregatePaths" : false,
+    "ReplicationPackageBuilder.target" : "(name=void)",
+    "ReplicationQueueProvider.target" : "(name=simple)",
+    "ReplicationQueueDistributionStrategy.target" : "(name=single)",
+    "TransportAuthenticationProviderFactory.target" : "(name=nop)",
+    "rules" : ["trigger on path: /content/usergenerated"],
+    "enabled" : false
+}

Added: sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-httpcache.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-httpcache.json?rev=1558791&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-httpcache.json (added)
+++ sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-httpcache.json Thu Jan 16 13:47:55 2014
@@ -0,0 +1,11 @@
+{
+    "jcr:primaryType" : "sling:OsgiConfig",
+    "name" : "httpcache",
+    "useCustomHeaders" : true,
+    "customHeaders" : [
+            "Path: {path}",
+            "add -> Action : REFRESH",
+            "delete -> Change : REMOVE" ],
+    "useCustomBody": true,
+    "customBody" : ""
+}
\ No newline at end of file

Added: sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http.json?rev=1558791&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http.json (added)
+++ sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http.json Thu Jan 16 13:47:55 2014
@@ -0,0 +1,4 @@
+{
+    "jcr:primaryType" : "sling:OsgiConfig",
+    "name" : "http"
+}
\ No newline at end of file

Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProviderTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProviderTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProviderTest.java (original)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProviderTest.java Thu Jan 16 13:47:55 2014
@@ -107,7 +107,7 @@ public class ReplicationAgentResourcePro
         String filter = "(name=" + path + ")";
         when(context.getServiceReferences(ReplicationAgent.class.getName(), filter)).thenReturn(
                         agentServiceReferences);
-        SimpleReplicationAgent replicationAgent = new SimpleReplicationAgent(path, null, null,
+        SimpleReplicationAgent replicationAgent = new SimpleReplicationAgent(path, null, null, true,
                         null, null, null, null, null);
         when(context.getService(serviceReference)).thenReturn(replicationAgent);
         return replicationAgent;

Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java (original)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java Thu Jan 16 13:47:55 2014
@@ -36,6 +36,7 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -53,7 +54,7 @@ public class SimpleReplicationAgentTest 
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
-        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0],
+        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true,
                         transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler);
         ReplicationPackage item = mock(ReplicationPackage.class);
         assertTrue(agent.process(item));
@@ -68,7 +69,7 @@ public class SimpleReplicationAgentTest 
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
-        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0],
+        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true,
                         transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler);
         ReplicationRequest request = new ReplicationRequest(System.nanoTime(),
                         ReplicationActionType.ADD, "/");
@@ -92,7 +93,7 @@ public class SimpleReplicationAgentTest 
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
-        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0],
+        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true,
                 transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler);
         ReplicationRequest request = new ReplicationRequest(System.nanoTime(),
                 ReplicationActionType.ADD, "/");
@@ -100,7 +101,7 @@ public class SimpleReplicationAgentTest 
         ReplicationQueueItemState state = new ReplicationQueueItemState();
         state.setItemState(ReplicationQueueItemState.ItemState.SUCCEEDED);
         when(distributionHandler.add(replicationPackage, agent, queueProvider)).thenReturn(state);
-        when(packageBuilder.createPackage(request)).thenReturn(replicationPackage);
+        when(packageBuilder.createPackage(any(ReplicationRequest.class))).thenReturn(replicationPackage);
         when(queueProvider.getQueue(agent, replicationPackage)).thenReturn(
                 new SimpleReplicationQueue(agent, "name"));
         when(queueProvider.getDefaultQueue(agent)).thenReturn(
@@ -119,7 +120,7 @@ public class SimpleReplicationAgentTest 
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
-        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0],
+        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true,
                 transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler);
         ReplicationRequest request = new ReplicationRequest(System.nanoTime(),
                 ReplicationActionType.ADD, "/");
@@ -141,7 +142,7 @@ public class SimpleReplicationAgentTest 
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
-        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0],
+        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true,
                 transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler);
         ReplicationQueue queue = mock(ReplicationQueue.class);
         when(queueProvider.getDefaultQueue(agent)).thenReturn(queue);
@@ -157,7 +158,7 @@ public class SimpleReplicationAgentTest 
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
-        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0],
+        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true,
                 transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler);
         ReplicationQueue queue = mock(ReplicationQueue.class);
         when(queueProvider.getQueue(agent, "priority")).thenReturn(queue);
@@ -173,7 +174,7 @@ public class SimpleReplicationAgentTest 
         ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class);
         TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class);
         ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class);
-        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0],
+        SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true,
                 transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler);
         ReplicationQueue queue = mock(ReplicationQueue.class);
         when(queueProvider.getQueue(agent, "priority")).thenReturn(queue);

Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java (original)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java Thu Jan 16 13:47:55 2014
@@ -56,7 +56,7 @@ public class JobHandlingReplicationQueue
         ReplicationQueue queue = new JobHandlingReplicationQueue("aname", topic, jobManager);
         ReplicationPackage pkg = mock(ReplicationPackage.class);
         InputStream stream = new ByteArrayInputStream("rep".getBytes());
-        when(pkg.getInputStream()).thenReturn(stream);
+        when(pkg.createInputStream()).thenReturn(stream);
         assertTrue(queue.add(pkg));
     }
 
@@ -72,7 +72,7 @@ public class JobHandlingReplicationQueue
         ReplicationQueue queue = new JobHandlingReplicationQueue("aname", topic, jobManager);
         ReplicationPackage pkg = mock(ReplicationPackage.class);
         InputStream stream = new ByteArrayInputStream("rep".getBytes());
-        when(pkg.getInputStream()).thenReturn(stream);
+        when(pkg.createInputStream()).thenReturn(stream);
         assertTrue(queue.add(pkg));
         ReplicationQueueItemState status = queue.getStatus(pkg);
         assertNotNull(status);

Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java (original)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java Thu Jan 16 13:47:55 2014
@@ -37,7 +37,7 @@ public class JobHandlingUtilsTest {
     public void testFullPropertiesFromPackageCreation() throws Exception {
         ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
         InputStream stream = IOUtils.toInputStream("some text");
-        when(replicationPackage.getInputStream()).thenReturn(stream);
+        when(replicationPackage.createInputStream()).thenReturn(stream);
         when(replicationPackage.getAction()).thenReturn("ADD");
         when(replicationPackage.getId()).thenReturn("an-id");
         when(replicationPackage.getLength()).thenReturn(10l);

Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java (original)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java Thu Jan 16 13:47:55 2014
@@ -43,6 +43,6 @@ public class VoidReplicationPackageTest 
         assertEquals(createdPackage.getType(), readPackage.getType());
         assertEquals(createdPackage.getLength(), readPackage.getLength());
         assertEquals(Arrays.toString(createdPackage.getPaths()), Arrays.toString(readPackage.getPaths()));
-        assertTrue(IOUtils.contentEquals(createdPackage.getInputStream(), readPackage.getInputStream()));
+        assertTrue(IOUtils.contentEquals(createdPackage.createInputStream(), readPackage.createInputStream()));
     }
 }

Added: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java?rev=1558791&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java (added)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java Thu Jan 16 13:47:55 2014
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.replication.transport.impl;
+
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+
+@RunWith(Parameterized.class)
+public class HttpTransportHandlerCustomHeadersTest {
+
+    private final String[] inputTransportProperties;
+    private final String inputSelector;
+    private final String[] inputPaths;
+
+    private final String[] outputHeaders;
+
+
+    @Parameterized.Parameters
+    public static Iterable<Object[]> data() {
+        return Arrays.asList(new Object[][]{
+                { new String[]{}, "", new String[] {},
+                        new String[]{}},
+                { new String[]{}, "add", new String[] {},
+                        new String[]{}},
+                { new String[]{"add -> Header: Add" }, "add", new String[] {},
+                        new String[]{ "Header: Add" }},
+                { new String[]{"add -> Header: Add", "Header: Always" }, "add", new String[] {},
+                        new String[]{ "Header: Add", "Header: Always" }},
+                { new String[]{"add -> Header: Add", "* -> Header: Always", "delete -> Header:Del" }, "add", new String[] {},
+                        new String[]{"Header: Add", "Header: Always" }},
+                { new String[]{"add -> Header: Add", "Header: Always" }, "delete", new String[] {},
+                        new String[]{"Header: Always" }},
+                { new String[]{"add -> Header: Add", "Header: Always" }, "add", new String[] {},
+                        new String[] {"Header: Add", "Header: Always" }},
+                { new String[]{"add -> Header: Add", "Header: Always", "PathHeader: {path}" }, "add", new String[] { "/content"},
+                        new String[]{"Header: Add", "Header: Always", "PathHeader: /content"}},
+        });
+
+    }
+
+    public HttpTransportHandlerCustomHeadersTest(String[] inputTransportProperties, String inputSelector, String[] inputPaths,
+                                                 String[] outputHeaders){
+        this.inputTransportProperties = inputTransportProperties;
+        this.inputSelector = inputSelector;
+        this.outputHeaders = outputHeaders;
+        this.inputPaths = inputPaths;
+    }
+
+    @Test
+    public void testHttpTransportProperties () {
+        String[] headers = HttpTransportHandler.getCustomizedHeaders (inputTransportProperties, inputSelector, inputPaths);
+
+        assertArrayEquals(outputHeaders, headers);
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java (original)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java Thu Jan 16 13:47:55 2014
@@ -19,6 +19,8 @@
 package org.apache.sling.replication.transport.impl;
 
 import java.net.URI;
+
+
 import org.apache.http.client.fluent.Content;
 import org.apache.http.client.fluent.Executor;
 import org.apache.http.client.fluent.Request;
@@ -33,6 +35,8 @@ import org.junit.Test;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
 
 /**
  * Testcase for {@link HttpTransportHandler}
@@ -55,4 +59,27 @@ public class HttpTransportHandlerTest {
         when(transportAuthenticationProvider.authenticate(any(Executor.class), any(TransportAuthenticationContext.class))).thenReturn(executor);
         httpTransportHandler.transport(replicationPackage, replicationEndpoint, transportAuthenticationProvider);
     }
+
+    @Test
+    public void testHttpTransportWithMultipleCalls() throws Exception {
+        HttpTransportHandler httpTransportHandler = new HttpTransportHandler();
+
+        ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
+        when(replicationPackage.getAction()).thenReturn(ReplicationActionType.ADD.toString());
+        when(replicationPackage.getType()).thenReturn("test");
+        when(replicationPackage.getPaths()).thenReturn(new String[]{"/content/a", "/content/b"});
+
+        ReplicationEndpoint replicationEndpoint = new ReplicationEndpoint(new URI("http://localhost:8080/system/replication/receive"));
+        TransportAuthenticationProvider<Executor, Executor> transportAuthenticationProvider = mock(TransportAuthenticationProvider.class);
+        Executor executor = mock(Executor.class);
+        Response response = mock(Response.class);
+        Content content = mock(Content.class);
+        when(response.returnContent()).thenReturn(content);
+        when(executor.execute(any(Request.class))).thenReturn(response);
+        when(transportAuthenticationProvider.authenticate(any(Executor.class), any(TransportAuthenticationContext.class))).thenReturn(executor);
+
+        httpTransportHandler.transport(replicationPackage, replicationEndpoint, transportAuthenticationProvider);
+
+        verify(executor, times(1)).execute(any(Request.class));
+    }
 }



Mime
View raw message