Return-Path: X-Original-To: apmail-sling-commits-archive@www.apache.org Delivered-To: apmail-sling-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 15B3410F43 for ; Thu, 16 Jan 2014 13:48:25 +0000 (UTC) Received: (qmail 25903 invoked by uid 500); 16 Jan 2014 13:48:23 -0000 Delivered-To: apmail-sling-commits-archive@sling.apache.org Received: (qmail 25857 invoked by uid 500); 16 Jan 2014 13:48:21 -0000 Mailing-List: contact commits-help@sling.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sling.apache.org Delivered-To: mailing list commits@sling.apache.org Received: (qmail 25850 invoked by uid 99); 16 Jan 2014 13:48:20 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Jan 2014 13:48:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Jan 2014 13:48:17 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 1656D23888FE; Thu, 16 Jan 2014 13:47:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1558791 - in /sling/trunk/contrib/extensions/replication/src: main/java/org/apache/sling/replication/agent/ main/java/org/apache/sling/replication/agent/impl/ main/java/org/apache/sling/replication/queue/impl/jobhandling/ main/java/org/apa... Date: Thu, 16 Jan 2014 13:47:56 -0000 To: commits@sling.apache.org From: tommaso@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140116134757.1656D23888FE@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tommaso Date: Thu Jan 16 13:47:55 2014 New Revision: 1558791 URL: http://svn.apache.org/r1558791 Log: SLING-3309 - Marius Petria's patch for flushing of external systems applied Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java (with props) sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-httpcacheflush.json sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-httpcache.json sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http.json sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java (with props) Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProviderTest.java sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java (original) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java Thu Jan 16 13:47:55 2014 @@ -45,11 +45,12 @@ public class ReplicationAgentConfigurati public static final String AUTHENTICATION_PROPERTIES = "authentication.properties"; public static final String QUEUE_DISTRIBUTION = "ReplicationQueueDistributionStrategy.target"; - public static final String RULES = "rules"; public static final String ENABLED = "enabled"; + public static final String USE_AGGREGATE_PATHS = "useAggregatePaths"; + public static final String[] COMPONENTS = { TRANSPORT, PACKAGING }; private final boolean enabled; @@ -72,6 +73,8 @@ public class ReplicationAgentConfigurati private final String[] rules; + private final boolean useAggregatePaths; + private final Dictionary componentConfiguration; public ReplicationAgentConfiguration(Dictionary dictionary, Dictionary componentConfiguration) { @@ -88,10 +91,12 @@ public class ReplicationAgentConfigurati String[] ap = PropertiesUtil.toStringArray(dictionary.get(AUTHENTICATION_PROPERTIES)); this.authenticationProperties = ap != null ? ap : new String[0]; this.rules = PropertiesUtil.toStringArray(dictionary.get(RULES), new String[0]); + this.useAggregatePaths = PropertiesUtil.toBoolean(dictionary.get(USE_AGGREGATE_PATHS), true); this.componentConfiguration = componentConfiguration; } + public String[] getAuthenticationProperties() { return authenticationProperties; } @@ -133,6 +138,7 @@ public class ReplicationAgentConfigurati + QUEUEPROVIDER + "\":\"" + targetReplicationQueueProvider + "\", \"" + QUEUE_DISTRIBUTION + "\":\"" + targetReplicationQueueDistributionStrategy+ "\", \"" + TRANSPORT_AUTHENTICATION_FACTORY + "\":\"" + targetAuthenticationHandlerFactory + "\", \"" + + USE_AGGREGATE_PATHS + "\":\"" + useAggregatePaths + "\", \"" + AUTHENTICATION_PROPERTIES + "\":\"" + Arrays.toString(authenticationProperties) + "\", \""; result += toComponentString(); Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java (original) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java Thu Jan 16 13:47:55 2014 @@ -18,10 +18,8 @@ */ package org.apache.sling.replication.agent.impl; -import java.util.Dictionary; -import java.util.Hashtable; -import java.util.Map; -import java.util.Random; +import java.util.*; + import org.apache.felix.scr.annotations.Activate; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.ConfigurationPolicy; @@ -108,6 +106,9 @@ public class ReplicationAgentServiceFact @Property private static final String RULES = ReplicationAgentConfiguration.RULES; + @Property(boolValue = true) + private static final String USE_AGGREGATE_PATHS = ReplicationAgentConfiguration.USE_AGGREGATE_PATHS; + @Property(name = TRANSPORT, value = DEFAULT_TRANSPORT) @Reference(name = "TransportHandler", target = DEFAULT_TRANSPORT, policy = ReferencePolicy.DYNAMIC) private TransportHandler transportHandler; @@ -173,6 +174,10 @@ public class ReplicationAgentServiceFact String af = PropertiesUtil.toString(config.get(TRANSPORT_AUTHENTICATION_FACTORY), ""); props.put(TRANSPORT_AUTHENTICATION_FACTORY, af); + + boolean useAggregatePaths = PropertiesUtil.toBoolean(config.get(USE_AGGREGATE_PATHS), true); + props.put(USE_AGGREGATE_PATHS, useAggregatePaths); + // check configuration is valid if (name == null || packageBuilder == null || queueProvider == null || queueDistributionStrategy == null) { throw new AgentConfigurationException("configuration for this agent is not valid"); @@ -192,7 +197,8 @@ public class ReplicationAgentServiceFact transportHandler, transportAuthenticationProvider, endpoint, packageBuilder, queueProvider, queueDistributionStrategy}); } - ReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, rules, transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, queueDistributionStrategy); + ReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, rules, useAggregatePaths, + transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, queueDistributionStrategy); // register agent service agentReg = context.registerService(ReplicationAgent.class.getName(), agent, props); Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Thu Jan 16 13:47:55 2014 @@ -38,6 +38,9 @@ import org.apache.sling.replication.tran import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + /** * Basic implementation of a {@link ReplicationAgent} */ @@ -61,8 +64,12 @@ public class SimpleReplicationAgent impl private final String[] rules; + private final boolean useAggregatePaths; + public SimpleReplicationAgent(String name, String endpoint, String[] rules, - TransportHandler transportHandler, ReplicationPackageBuilder packageBuilder, + boolean useAggregatePaths, + TransportHandler transportHandler, + ReplicationPackageBuilder packageBuilder, ReplicationQueueProvider queueProvider, TransportAuthenticationProvider transportAuthenticationProvider, ReplicationQueueDistributionStrategy queueDistributionHandler) { @@ -74,11 +81,29 @@ public class SimpleReplicationAgent impl this.queueProvider = queueProvider; this.transportAuthenticationProvider = transportAuthenticationProvider; this.queueDistributionStrategy = queueDistributionHandler; + this.useAggregatePaths = useAggregatePaths; } public ReplicationResponse execute(ReplicationRequest replicationRequest) throws AgentReplicationException { + // create packages from request + ReplicationPackage[] replicationPackages = buildPackages(replicationRequest); + + ReplicationResponse replicationResponse = schedule(replicationPackages, false); + + return replicationResponse; + } + + public void send(ReplicationRequest replicationRequest) throws AgentReplicationException { + // create packages from request + ReplicationPackage[] replicationPackages = buildPackages(replicationRequest); + + schedule(replicationPackages, true); + } + + + private ReplicationPackage buildPackage(ReplicationRequest replicationRequest) throws AgentReplicationException { // create package from request ReplicationPackage replicationPackage; try { @@ -87,43 +112,78 @@ public class SimpleReplicationAgent impl throw new AgentReplicationException(e); } - ReplicationResponse replicationResponse = new ReplicationResponse(); + return replicationPackage; + } - // send the replication package to the queue distribution handler - try { - ReplicationQueueItemState state = queueDistributionStrategy.add(replicationPackage, - this, queueProvider); - if (state != null) { - replicationResponse.setStatus(state.getItemState().toString()); - replicationResponse.setSuccessful(state.isSuccessful()); - } else { - replicationResponse.setStatus(ReplicationQueueItemState.ItemState.ERROR.toString()); - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("an error happened during queue processing", e); + private ReplicationPackage[] buildPackages(ReplicationRequest replicationRequest) throws AgentReplicationException { + + List packages = new ArrayList(); + + if(useAggregatePaths){ + ReplicationPackage replicationPackage = buildPackage(replicationRequest); + packages.add(replicationPackage); + } + else { + for (String path : replicationRequest.getPaths()){ + ReplicationPackage replicationPackage = buildPackage(new ReplicationRequest(replicationRequest.getTime(), + replicationRequest.getAction(), + new String[] { path })); + + packages.add(replicationPackage); } - replicationResponse.setSuccessful(false); + } + + return packages.toArray(new ReplicationPackage[0]); + } + + // offer option throws an exception at first error + private ReplicationResponse schedule(ReplicationPackage[] packages, boolean offer) throws AgentReplicationException { + ReplicationResponse replicationResponse = new ReplicationResponse(); + + for (ReplicationPackage replicationPackage : packages){ + ReplicationResponse currentReplicationResponse = schedule(replicationPackage, offer); + + replicationResponse.setSuccessful(currentReplicationResponse.isSuccessful()); + replicationResponse.setStatus(currentReplicationResponse.getStatus()); } return replicationResponse; } - public void send(ReplicationRequest replicationRequest) throws AgentReplicationException { - // create package from request - ReplicationPackage replicationPackage; - try { - replicationPackage = packageBuilder.createPackage(replicationRequest); - } catch (ReplicationPackageBuildingException e) { - throw new AgentReplicationException(e); + private ReplicationResponse schedule(ReplicationPackage replicationPackage, boolean offer) throws AgentReplicationException { + ReplicationResponse replicationResponse = new ReplicationResponse(); + + if(offer){ + try { + queueDistributionStrategy.offer(replicationPackage, this, queueProvider); + } catch (ReplicationQueueException e) { + replicationResponse.setSuccessful(false); + throw new AgentReplicationException(e); + } } - try { - queueDistributionStrategy.offer(replicationPackage, this, queueProvider); - } catch (ReplicationQueueException e) { - throw new AgentReplicationException(e); + else { + // send the replication package to the queue distribution handler + try { + ReplicationQueueItemState state = queueDistributionStrategy.add(replicationPackage, + this, queueProvider); + if (state != null) { + replicationResponse.setStatus(state.getItemState().toString()); + replicationResponse.setSuccessful(state.isSuccessful()); + } else { + replicationResponse.setStatus(ReplicationQueueItemState.ItemState.ERROR.toString()); + replicationResponse.setSuccessful(false); + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("an error happened during queue processing", e); + } + replicationResponse.setSuccessful(false); + } } + return replicationResponse; } + public boolean process(ReplicationPackage item) throws AgentReplicationException { try { if (transportHandler != null || (endpoint != null && endpoint.length() > 0)) { Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java (original) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java Thu Jan 16 13:47:55 2014 @@ -110,7 +110,7 @@ public class JobHandlingUtils { return (Long) job.getProperty(LENGTH); } - public InputStream getInputStream() throws IOException { + public InputStream createInputStream() throws IOException { return IOUtils.toInputStream(String.valueOf(job.getProperty(BIN))); // workaround to make void package work while we get SLING-3140 to be released @@ -139,7 +139,7 @@ public class JobHandlingUtils { properties.put(PATHS, replicationPackage.getPaths()); properties.put(LENGTH, replicationPackage.getLength()); properties.put(ACTION, replicationPackage.getAction()); - properties.put(BIN, IOUtils.toString(replicationPackage.getInputStream())); + properties.put(BIN, IOUtils.toString(replicationPackage.createInputStream())); properties.put(TYPE, replicationPackage.getType()); return properties; } Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java (original) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackage.java Thu Jan 16 13:47:55 2014 @@ -52,11 +52,12 @@ public interface ReplicationPackage exte String getType(); /** - * get package stream + * creates a package stream. + * a new stream is created for each call and it is the caller's obligation to close the stream. * @return an {@link InputStream} * @throws IOException */ - InputStream getInputStream() throws IOException; + InputStream createInputStream() throws IOException; /** * get package stream length Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java (original) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackage.java Thu Jan 16 13:47:55 2014 @@ -39,23 +39,36 @@ public class VoidReplicationPackage impl private final String id; + private final String action; + public VoidReplicationPackage(ReplicationRequest request, String type) { this.type = type; this.paths = request.getPaths(); - this.id = ReplicationActionType.DELETE.toString() + ':' + Arrays.toString(request.getPaths()) + ':' + request.getTime(); + this.action = request.getAction().toString(); + this.id = request.getAction().toString() + ':' + Arrays.toString(request.getPaths()) + ':' + request.getTime(); } public static VoidReplicationPackage fromStream(InputStream stream) throws IOException { VoidReplicationPackage replicationPackage = null; String streamString = IOUtils.toString(stream); + int beginIndex = streamString.indexOf(':'); int endIndex = streamString.lastIndexOf(':'); - if (beginIndex >= 0 && endIndex > beginIndex && streamString.startsWith(ReplicationActionType.DELETE.toString())) { - String pathsArrayString = Text.unescape(streamString.substring(beginIndex + 1, endIndex - 1)); - String[] paths = pathsArrayString.replaceAll("\\[", "").replaceAll("\\]", "").split(", "); - ReplicationRequest request = new ReplicationRequest(Long.valueOf(streamString.substring(streamString.lastIndexOf(':') + 1)), - ReplicationActionType.DELETE, paths); - replicationPackage = new VoidReplicationPackage(request, "VOID"); + if (beginIndex >= 0 && endIndex > beginIndex){ + String actionString = streamString.substring(0, beginIndex); + String pathsString = streamString.substring(beginIndex+1, endIndex); + String timeString = streamString.substring(endIndex + 1); + + ReplicationActionType replicationActionType = ReplicationActionType.fromName(actionString); + + if(replicationActionType != null){ + pathsString = Text.unescape(pathsString); + String[] paths = pathsString.replaceAll("\\[", "").replaceAll("\\]", "").split(", "); + + ReplicationRequest request = new ReplicationRequest(Long.valueOf(timeString), + replicationActionType, paths); + replicationPackage = new VoidReplicationPackage(request, "VOID"); + } } return replicationPackage; } @@ -75,7 +88,7 @@ public class VoidReplicationPackage impl return id.getBytes().length; } - public InputStream getInputStream() throws IOException { + public InputStream createInputStream() throws IOException { return new ByteArrayInputStream(id.getBytes()); } @@ -84,7 +97,6 @@ public class VoidReplicationPackage impl } public String getAction() { - return ReplicationActionType.DELETE.toString(); + return action; } - } Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java?rev=1558791&view=auto ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java (added) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java Thu Jan 16 13:47:55 2014 @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.replication.serialization.impl; + +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Property; +import org.apache.felix.scr.annotations.Service; +import org.apache.sling.replication.communication.ReplicationRequest; +import org.apache.sling.replication.serialization.ReplicationPackage; +import org.apache.sling.replication.serialization.ReplicationPackageBuilder; +import org.apache.sling.replication.serialization.ReplicationPackageBuildingException; +import org.apache.sling.replication.serialization.ReplicationPackageReadingException; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +@Component(metatype = false) +@Service(value = ReplicationPackageBuilder.class) +@Property(name = "name", value = VoidReplicationPackageBuilder.NAME) +public class VoidReplicationPackageBuilder implements ReplicationPackageBuilder { + public static final String NAME = "void"; + + public ReplicationPackage createPackage(ReplicationRequest request) throws ReplicationPackageBuildingException { + return new VoidReplicationPackage(request, "VOID"); + } + + public ReplicationPackage readPackage(InputStream stream, boolean install) throws ReplicationPackageReadingException { + try { + return VoidReplicationPackage.fromStream(stream); + } catch (Exception e) { + throw new ReplicationPackageReadingException(e); + } + } + + public ReplicationPackage getPackage(String id) { + try { + return VoidReplicationPackage.fromStream(new ByteArrayInputStream(id.getBytes())); + } + catch (IOException ex){ + return null; + } + } +} Propchange: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageBuilder.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java (original) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/vlt/FileVaultReplicationPackage.java Thu Jan 16 13:47:55 2014 @@ -64,7 +64,7 @@ public class FileVaultReplicationPackage return paths; } - public InputStream getInputStream() throws IOException { + public InputStream createInputStream() throws IOException { return new FileInputStream(pkg.getFile()); } Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java (original) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java Thu Jan 16 13:47:55 2014 @@ -19,6 +19,7 @@ package org.apache.sling.replication.servlet; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Enumeration; import java.util.List; @@ -137,8 +138,16 @@ public class ReplicationAgentServlet ext // get first item ReplicationPackage head = queue.getHead(); if (head != null) { - int bytesCopied = IOUtils.copy(head.getInputStream(), - response.getOutputStream()); + InputStream inputStream = null; + int bytesCopied = -1; + try { + inputStream = head.createInputStream(); + bytesCopied = IOUtils.copy(inputStream, response.getOutputStream()); + } + finally { + IOUtils.closeQuietly(inputStream); + } + response.setHeader(ReplicationHeader.TYPE.toString(), head.getType()); if (log.isInfoEnabled()) { log.info("{} bytes written into the response", bytesCopied); Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java (original) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java Thu Jan 16 13:47:55 2014 @@ -18,11 +18,12 @@ */ package org.apache.sling.replication.transport.impl; +import java.io.ByteArrayInputStream; import java.io.IOException; -import java.util.Arrays; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Property; -import org.apache.felix.scr.annotations.Service; +import java.io.InputStream; +import java.util.*; + +import org.apache.commons.io.IOUtils; import org.apache.http.client.fluent.Content; import org.apache.http.client.fluent.Executor; import org.apache.http.client.fluent.Request; @@ -41,14 +42,32 @@ import org.slf4j.LoggerFactory; /** * basic HTTP POST {@link TransportHandler} */ -@Component(metatype = false) -@Service(value = TransportHandler.class) -@Property(name = "name", value = HttpTransportHandler.NAME) public class HttpTransportHandler implements TransportHandler { public static final String NAME = "http"; - private final Logger log = LoggerFactory.getLogger(getClass()); + private static final String PATH_VARIABLE_NAME = "{path}"; + + private static final Logger log = LoggerFactory.getLogger(HttpTransportHandler.class); + + private final boolean useCustomHeaders; + + private final String[] customHeaders; + + private final boolean useCustomBody; + + private final String customBody; + + public HttpTransportHandler(boolean useCustomHeaders, String[] customHeaders, boolean useCustomBody, String customBody) { + this.useCustomHeaders = useCustomHeaders; + this.customHeaders = customHeaders; + this.useCustomBody = useCustomBody; + this.customBody = customBody; + } + + public HttpTransportHandler(){ + this(false, new String[0], false, ""); + } @SuppressWarnings("unchecked") public void transport(ReplicationPackage replicationPackage, @@ -67,31 +86,112 @@ public class HttpTransportHandler implem executor = ((TransportAuthenticationProvider) transportAuthenticationProvider) .authenticate(executor, context); - String[] paths = replicationPackage.getPaths(); - String type = replicationPackage.getType(); - String pathsString = Arrays.toString(paths); - Request req = Request.Post(replicationEndpoint.getUri()).useExpectContinue() - .addHeader(ReplicationHeader.TYPE.toString(), type); - if (replicationPackage.getInputStream() != null) { - req = req.bodyStream(replicationPackage.getInputStream(), - ContentType.APPLICATION_OCTET_STREAM); - } - Response response = executor.execute(req); - if (response != null) { - Content content = response.returnContent(); - if (log.isInfoEnabled()) { - log.info("Replication content of type {} for {} delivered: {}", new Object[]{ - type, pathsString, content}); + deliverPackage(executor, replicationPackage, replicationEndpoint); + + } catch (Exception e) { + throw new ReplicationTransportException(e); + } + } + + public static String[] getCustomizedHeaders(String[] additionalHeaders, String action, String[] paths){ + List headers = new ArrayList(); + + for(String additionalHeader : additionalHeaders){ + int idx = additionalHeader.indexOf("->"); + + if(idx < 0){ + headers.add(additionalHeader); + } else { + String actionSelector = additionalHeader.substring(0, idx).trim(); + String header = additionalHeader.substring(idx+2).trim(); + + if(actionSelector.equalsIgnoreCase(action) || actionSelector.equals("*")){ + headers.add(header); } } + } + + + + StringBuilder sb = new StringBuilder(); + + if(paths != null && paths.length > 0) { + sb.append(paths[0]); + for(int i=1; i < paths.length; i++){ + sb.append(", " + paths[i]); + } + } + + String path = sb.toString(); + + List boundHeaders = new ArrayList(); + + for(String header : headers){ + boundHeaders.add(header.replace(PATH_VARIABLE_NAME, path)); + } + + return boundHeaders.toArray(new String[0]); + } + + private void deliverPackage(Executor executor, ReplicationPackage replicationPackage, + ReplicationEndpoint replicationEndpoint) throws IOException{ + String type = replicationPackage.getType(); + + + Request req = Request.Post(replicationEndpoint.getUri()).useExpectContinue(); + + if(useCustomHeaders){ + String[] customizedHeaders = getCustomizedHeaders(customHeaders, replicationPackage.getAction(), replicationPackage.getPaths()); + for(String header : customizedHeaders){ + addHeader(req, header); + } + } + else { + req.addHeader(ReplicationHeader.TYPE.toString(), type); + } + + InputStream inputStream = null; + Response response = null; + try{ + if (useCustomBody) { + String body = customBody == null ? "" : customBody; + inputStream = new ByteArrayInputStream(body.getBytes()); + } else { - throw new IOException("response is empty"); + inputStream = replicationPackage.createInputStream(); } - } catch (Exception e) { - throw new ReplicationTransportException(e); + + if(inputStream != null) { + req = req.bodyStream(inputStream, ContentType.APPLICATION_OCTET_STREAM); + } + + response = executor.execute(req); + } + finally { + IOUtils.closeQuietly(inputStream); + } + + if (response != null) { + Content content = response.returnContent(); + if (log.isInfoEnabled()) { + log.info("Replication content of type {} for {} delivered: {}", new Object[]{ + type, Arrays.toString(replicationPackage.getPaths()), content}); + } + } + else { + throw new IOException("response is empty"); } } + private static void addHeader(Request req, String header){ + int idx = header.indexOf(":"); + if(idx < 0) return; + String headerName = header.substring(0, idx).trim(); + String headerValue = header.substring(idx+1).trim(); + req.addHeader(headerName, headerValue); + } + + public boolean supportsAuthenticationProvider(TransportAuthenticationProvider transportAuthenticationProvider) { return transportAuthenticationProvider.canAuthenticate(Executor.class); } Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java (original) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java Thu Jan 16 13:47:55 2014 @@ -18,6 +18,7 @@ */ package org.apache.sling.replication.transport.impl; +import java.io.InputStream; import java.net.URI; import java.util.Dictionary; import java.util.Properties; @@ -25,6 +26,7 @@ import javax.jcr.Node; import javax.jcr.Session; import javax.jcr.nodetype.NodeType; +import org.apache.commons.io.IOUtils; import org.apache.felix.scr.annotations.Component; import org.apache.felix.scr.annotations.Property; import org.apache.felix.scr.annotations.Reference; @@ -83,8 +85,15 @@ public class RepositoryTransportHandler NodeType.NT_FILE); Node contentNode = addedNode.addNode(JcrConstants.JCR_CONTENT, NodeType.NT_RESOURCE); if (contentNode != null) { - contentNode.setProperty(JcrConstants.JCR_DATA, session.getValueFactory().createBinary(replicationPackage.getInputStream())); - session.save(); + InputStream inputStream = null; + try { + inputStream = replicationPackage.createInputStream(); + contentNode.setProperty(JcrConstants.JCR_DATA, session.getValueFactory().createBinary(inputStream)); + session.save(); + } + finally { + IOUtils.closeQuietly(inputStream); + } } if (log.isInfoEnabled()) { log.info("package {} delivered to the repository as node {} ", Added: sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-httpcacheflush.json URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-httpcacheflush.json?rev=1558791&view=auto ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-httpcacheflush.json (added) +++ sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-httpcacheflush.json Thu Jan 16 13:47:55 2014 @@ -0,0 +1,13 @@ +{ + "jcr:primaryType" : "sling:OsgiConfig", + "name" : "httpcacheflush", + "endpoint" : "http://localhost:8000/invalidatecache", + "TransportHandler.target" : "(name=httpcache)", + "useAggregatePaths" : false, + "ReplicationPackageBuilder.target" : "(name=void)", + "ReplicationQueueProvider.target" : "(name=simple)", + "ReplicationQueueDistributionStrategy.target" : "(name=single)", + "TransportAuthenticationProviderFactory.target" : "(name=nop)", + "rules" : ["trigger on path: /content/usergenerated"], + "enabled" : false +} Added: sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-httpcache.json URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-httpcache.json?rev=1558791&view=auto ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-httpcache.json (added) +++ sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-httpcache.json Thu Jan 16 13:47:55 2014 @@ -0,0 +1,11 @@ +{ + "jcr:primaryType" : "sling:OsgiConfig", + "name" : "httpcache", + "useCustomHeaders" : true, + "customHeaders" : [ + "Path: {path}", + "add -> Action : REFRESH", + "delete -> Change : REMOVE" ], + "useCustomBody": true, + "customBody" : "" +} \ No newline at end of file Added: sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http.json URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http.json?rev=1558791&view=auto ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http.json (added) +++ sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http.json Thu Jan 16 13:47:55 2014 @@ -0,0 +1,4 @@ +{ + "jcr:primaryType" : "sling:OsgiConfig", + "name" : "http" +} \ No newline at end of file Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProviderTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProviderTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProviderTest.java (original) +++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProviderTest.java Thu Jan 16 13:47:55 2014 @@ -107,7 +107,7 @@ public class ReplicationAgentResourcePro String filter = "(name=" + path + ")"; when(context.getServiceReferences(ReplicationAgent.class.getName(), filter)).thenReturn( agentServiceReferences); - SimpleReplicationAgent replicationAgent = new SimpleReplicationAgent(path, null, null, + SimpleReplicationAgent replicationAgent = new SimpleReplicationAgent(path, null, null, true, null, null, null, null, null); when(context.getService(serviceReference)).thenReturn(replicationAgent); return replicationAgent; Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java (original) +++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java Thu Jan 16 13:47:55 2014 @@ -36,6 +36,7 @@ import static org.junit.Assert.assertEqu import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -53,7 +54,7 @@ public class SimpleReplicationAgentTest ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class); TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class); ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class); - SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], + SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true, transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler); ReplicationPackage item = mock(ReplicationPackage.class); assertTrue(agent.process(item)); @@ -68,7 +69,7 @@ public class SimpleReplicationAgentTest ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class); TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class); ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class); - SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], + SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true, transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler); ReplicationRequest request = new ReplicationRequest(System.nanoTime(), ReplicationActionType.ADD, "/"); @@ -92,7 +93,7 @@ public class SimpleReplicationAgentTest ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class); TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class); ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class); - SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], + SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true, transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler); ReplicationRequest request = new ReplicationRequest(System.nanoTime(), ReplicationActionType.ADD, "/"); @@ -100,7 +101,7 @@ public class SimpleReplicationAgentTest ReplicationQueueItemState state = new ReplicationQueueItemState(); state.setItemState(ReplicationQueueItemState.ItemState.SUCCEEDED); when(distributionHandler.add(replicationPackage, agent, queueProvider)).thenReturn(state); - when(packageBuilder.createPackage(request)).thenReturn(replicationPackage); + when(packageBuilder.createPackage(any(ReplicationRequest.class))).thenReturn(replicationPackage); when(queueProvider.getQueue(agent, replicationPackage)).thenReturn( new SimpleReplicationQueue(agent, "name")); when(queueProvider.getDefaultQueue(agent)).thenReturn( @@ -119,7 +120,7 @@ public class SimpleReplicationAgentTest ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class); TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class); ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class); - SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], + SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true, transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler); ReplicationRequest request = new ReplicationRequest(System.nanoTime(), ReplicationActionType.ADD, "/"); @@ -141,7 +142,7 @@ public class SimpleReplicationAgentTest ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class); TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class); ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class); - SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], + SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true, transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler); ReplicationQueue queue = mock(ReplicationQueue.class); when(queueProvider.getDefaultQueue(agent)).thenReturn(queue); @@ -157,7 +158,7 @@ public class SimpleReplicationAgentTest ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class); TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class); ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class); - SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], + SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true, transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler); ReplicationQueue queue = mock(ReplicationQueue.class); when(queueProvider.getQueue(agent, "priority")).thenReturn(queue); @@ -173,7 +174,7 @@ public class SimpleReplicationAgentTest ReplicationQueueProvider queueProvider = mock(ReplicationQueueProvider.class); TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class); ReplicationQueueDistributionStrategy distributionHandler = mock(ReplicationQueueDistributionStrategy.class); - SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], + SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, new String[0], true, transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, distributionHandler); ReplicationQueue queue = mock(ReplicationQueue.class); when(queueProvider.getQueue(agent, "priority")).thenReturn(queue); Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java (original) +++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java Thu Jan 16 13:47:55 2014 @@ -56,7 +56,7 @@ public class JobHandlingReplicationQueue ReplicationQueue queue = new JobHandlingReplicationQueue("aname", topic, jobManager); ReplicationPackage pkg = mock(ReplicationPackage.class); InputStream stream = new ByteArrayInputStream("rep".getBytes()); - when(pkg.getInputStream()).thenReturn(stream); + when(pkg.createInputStream()).thenReturn(stream); assertTrue(queue.add(pkg)); } @@ -72,7 +72,7 @@ public class JobHandlingReplicationQueue ReplicationQueue queue = new JobHandlingReplicationQueue("aname", topic, jobManager); ReplicationPackage pkg = mock(ReplicationPackage.class); InputStream stream = new ByteArrayInputStream("rep".getBytes()); - when(pkg.getInputStream()).thenReturn(stream); + when(pkg.createInputStream()).thenReturn(stream); assertTrue(queue.add(pkg)); ReplicationQueueItemState status = queue.getStatus(pkg); assertNotNull(status); Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java (original) +++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java Thu Jan 16 13:47:55 2014 @@ -37,7 +37,7 @@ public class JobHandlingUtilsTest { public void testFullPropertiesFromPackageCreation() throws Exception { ReplicationPackage replicationPackage = mock(ReplicationPackage.class); InputStream stream = IOUtils.toInputStream("some text"); - when(replicationPackage.getInputStream()).thenReturn(stream); + when(replicationPackage.createInputStream()).thenReturn(stream); when(replicationPackage.getAction()).thenReturn("ADD"); when(replicationPackage.getId()).thenReturn("an-id"); when(replicationPackage.getLength()).thenReturn(10l); Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java (original) +++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/VoidReplicationPackageTest.java Thu Jan 16 13:47:55 2014 @@ -43,6 +43,6 @@ public class VoidReplicationPackageTest assertEquals(createdPackage.getType(), readPackage.getType()); assertEquals(createdPackage.getLength(), readPackage.getLength()); assertEquals(Arrays.toString(createdPackage.getPaths()), Arrays.toString(readPackage.getPaths())); - assertTrue(IOUtils.contentEquals(createdPackage.getInputStream(), readPackage.getInputStream())); + assertTrue(IOUtils.contentEquals(createdPackage.createInputStream(), readPackage.createInputStream())); } } Added: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java?rev=1558791&view=auto ============================================================================== --- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java (added) +++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java Thu Jan 16 13:47:55 2014 @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sling.replication.transport.impl; + + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + + +@RunWith(Parameterized.class) +public class HttpTransportHandlerCustomHeadersTest { + + private final String[] inputTransportProperties; + private final String inputSelector; + private final String[] inputPaths; + + private final String[] outputHeaders; + + + @Parameterized.Parameters + public static Iterable data() { + return Arrays.asList(new Object[][]{ + { new String[]{}, "", new String[] {}, + new String[]{}}, + { new String[]{}, "add", new String[] {}, + new String[]{}}, + { new String[]{"add -> Header: Add" }, "add", new String[] {}, + new String[]{ "Header: Add" }}, + { new String[]{"add -> Header: Add", "Header: Always" }, "add", new String[] {}, + new String[]{ "Header: Add", "Header: Always" }}, + { new String[]{"add -> Header: Add", "* -> Header: Always", "delete -> Header:Del" }, "add", new String[] {}, + new String[]{"Header: Add", "Header: Always" }}, + { new String[]{"add -> Header: Add", "Header: Always" }, "delete", new String[] {}, + new String[]{"Header: Always" }}, + { new String[]{"add -> Header: Add", "Header: Always" }, "add", new String[] {}, + new String[] {"Header: Add", "Header: Always" }}, + { new String[]{"add -> Header: Add", "Header: Always", "PathHeader: {path}" }, "add", new String[] { "/content"}, + new String[]{"Header: Add", "Header: Always", "PathHeader: /content"}}, + }); + + } + + public HttpTransportHandlerCustomHeadersTest(String[] inputTransportProperties, String inputSelector, String[] inputPaths, + String[] outputHeaders){ + this.inputTransportProperties = inputTransportProperties; + this.inputSelector = inputSelector; + this.outputHeaders = outputHeaders; + this.inputPaths = inputPaths; + } + + @Test + public void testHttpTransportProperties () { + String[] headers = HttpTransportHandler.getCustomizedHeaders (inputTransportProperties, inputSelector, inputPaths); + + assertArrayEquals(outputHeaders, headers); + } +} Propchange: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerCustomHeadersTest.java ------------------------------------------------------------------------------ svn:eol-style = native Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java?rev=1558791&r1=1558790&r2=1558791&view=diff ============================================================================== --- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java (original) +++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java Thu Jan 16 13:47:55 2014 @@ -19,6 +19,8 @@ package org.apache.sling.replication.transport.impl; import java.net.URI; + + import org.apache.http.client.fluent.Content; import org.apache.http.client.fluent.Executor; import org.apache.http.client.fluent.Request; @@ -33,6 +35,8 @@ import org.junit.Test; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.times; /** * Testcase for {@link HttpTransportHandler} @@ -55,4 +59,27 @@ public class HttpTransportHandlerTest { when(transportAuthenticationProvider.authenticate(any(Executor.class), any(TransportAuthenticationContext.class))).thenReturn(executor); httpTransportHandler.transport(replicationPackage, replicationEndpoint, transportAuthenticationProvider); } + + @Test + public void testHttpTransportWithMultipleCalls() throws Exception { + HttpTransportHandler httpTransportHandler = new HttpTransportHandler(); + + ReplicationPackage replicationPackage = mock(ReplicationPackage.class); + when(replicationPackage.getAction()).thenReturn(ReplicationActionType.ADD.toString()); + when(replicationPackage.getType()).thenReturn("test"); + when(replicationPackage.getPaths()).thenReturn(new String[]{"/content/a", "/content/b"}); + + ReplicationEndpoint replicationEndpoint = new ReplicationEndpoint(new URI("http://localhost:8080/system/replication/receive")); + TransportAuthenticationProvider transportAuthenticationProvider = mock(TransportAuthenticationProvider.class); + Executor executor = mock(Executor.class); + Response response = mock(Response.class); + Content content = mock(Content.class); + when(response.returnContent()).thenReturn(content); + when(executor.execute(any(Request.class))).thenReturn(response); + when(transportAuthenticationProvider.authenticate(any(Executor.class), any(TransportAuthenticationContext.class))).thenReturn(executor); + + httpTransportHandler.transport(replicationPackage, replicationEndpoint, transportAuthenticationProvider); + + verify(executor, times(1)).execute(any(Request.class)); + } }