incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomm...@apache.org
Subject svn commit: r1564382 [1/2] - in /sling/trunk/contrib/extensions/replication/src: main/java/org/apache/sling/replication/agent/ main/java/org/apache/sling/replication/agent/impl/ main/java/org/apache/sling/replication/queue/impl/jobhandling/ main/java/o...
Date Tue, 04 Feb 2014 17:16:36 GMT
Author: tommaso
Date: Tue Feb  4 17:16:36 2014
New Revision: 1564382

URL: http://svn.apache.org/r1564382
Log:
SLING-3336 - applied Marius patch for multiple endpoints support

Added:
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java   (with props)
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java   (with props)
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java   (with props)
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandlerFactory.java   (with props)
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/TransportEndpointStrategyType.java   (with props)
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http-publish-receive.json
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.transport.impl.PollingTransportHandlerFactory-http-publish-poll.json
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-author.json
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-cache-flush.json
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http-author-receive.json
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory-http-cache-flush.json
Modified:
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentsManager.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationAgentsManager.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/TransportHandler.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish.json
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProviderTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactoryTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgentTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandlerTest.java

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java Tue Feb  4 17:16:36 2014
@@ -18,7 +18,6 @@
  */
 package org.apache.sling.replication.agent;
 
-import java.net.URI;
 import org.apache.sling.replication.communication.ReplicationRequest;
 import org.apache.sling.replication.communication.ReplicationResponse;
 import org.apache.sling.replication.queue.ReplicationQueue;
@@ -47,21 +46,13 @@ public interface ReplicationAgent {
     ReplicationQueue getQueue(String name) throws ReplicationQueueException;
 
     /**
-     * get the rules defined for this {@link ReplicationAgent}
-     *
-     * @return an <code>Array</code> of <code>String</code>s for this agent's rules
-     */
-    String[] getRules();
-
-    /**
      * Synchronously sends a {@link ReplicationRequest} waiting for a {@link ReplicationResponse}
      *
      * @param replicationRequest the replication request
      * @return a {@link ReplicationResponse}
      * @throws AgentReplicationException
      */
-    ReplicationResponse execute(ReplicationRequest replicationRequest)
-            throws AgentReplicationException;
+    ReplicationResponse execute(ReplicationRequest replicationRequest) throws AgentReplicationException;
 
     /**
      * Asynchronously sends a {@link ReplicationRequest} without waiting for any response
@@ -72,14 +63,6 @@ public interface ReplicationAgent {
     void send(ReplicationRequest replicationRequest) throws AgentReplicationException;
 
     /**
-     * get the agent configured endpoint
-     *
-     * @return an <code>URI</code> specifying its endpoint
-     */
-    URI getEndpoint();
-
-
-    /**
      * removes a package from the top of the queue
      * @param queueName
      *          the name of a {@link ReplicationQueue} bound tothis agent
@@ -87,4 +70,14 @@ public interface ReplicationAgent {
      * @throws ReplicationQueueException
      */
     ReplicationPackage removeHead(String queueName) throws ReplicationQueueException;
+
+    /**
+     * enables the current  {@link ReplicationAgent}
+     */
+    void enable();
+
+    /**
+     * disables the current  {@link ReplicationAgent}
+     */
+    void disable();
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java Tue Feb  4 17:16:36 2014
@@ -21,8 +21,6 @@ package org.apache.sling.replication.age
 import java.util.Arrays;
 import java.util.Dictionary;
 import java.util.Enumeration;
-import java.util.Map;
-
 import org.apache.sling.commons.osgi.PropertiesUtil;
 
 /**
@@ -40,7 +38,9 @@ public class ReplicationAgentConfigurati
 
     public static final String NAME = "name";
 
-    public static final String ENDPOINT = "endpoint";
+    public static final String ENDPOINT = "endpoints";
+
+    public static final String ENDPOINT_STRATEGY = "endpoints.strategy";
 
     public static final String AUTHENTICATION_PROPERTIES = "authentication.properties";
 
@@ -51,7 +51,7 @@ public class ReplicationAgentConfigurati
 
     public static final String USE_AGGREGATE_PATHS = "useAggregatePaths";
 
-    public static final String[] COMPONENTS = { TRANSPORT, PACKAGING };
+    public static final String[] COMPONENTS = {TRANSPORT, PACKAGING};
 
     private final boolean enabled;
 
@@ -125,7 +125,9 @@ public class ReplicationAgentConfigurati
         return targetTransportHandler;
     }
 
-    public String getTargetReplicationQueueDistributionStrategy() { return targetReplicationQueueDistributionStrategy; }
+    public String getTargetReplicationQueueDistributionStrategy() {
+        return targetReplicationQueueDistributionStrategy;
+    }
 
     @Override
     public String toString() {
@@ -136,7 +138,7 @@ public class ReplicationAgentConfigurati
                 + TRANSPORT + "\":\"" + targetTransportHandler + "\", \""
                 + PACKAGING + "\":\"" + targetReplicationPackageBuilder + "\", \""
                 + QUEUEPROVIDER + "\":\"" + targetReplicationQueueProvider + "\", \""
-                + QUEUE_DISTRIBUTION + "\":\"" + targetReplicationQueueDistributionStrategy+ "\", \""
+                + QUEUE_DISTRIBUTION + "\":\"" + targetReplicationQueueDistributionStrategy + "\", \""
                 + TRANSPORT_AUTHENTICATION_FACTORY + "\":\"" + targetAuthenticationHandlerFactory + "\", \""
                 + USE_AGGREGATE_PATHS + "\":\"" + useAggregatePaths + "\", \""
                 + AUTHENTICATION_PROPERTIES + "\":\"" + Arrays.toString(authenticationProperties) + "\", \"";
@@ -152,36 +154,30 @@ public class ReplicationAgentConfigurati
 
         String result = "";
 
-        if(componentConfiguration == null)
+        if (componentConfiguration == null)
             return result;
 
-        for (String component : COMPONENTS){
+        for (String component : COMPONENTS) {
             Dictionary properties = componentConfiguration.get(component);
-            if(properties == null) continue;
+            if (properties == null) continue;
 
             Enumeration keys = properties.keys();
 
-            while (keys.hasMoreElements()){
+            while (keys.hasMoreElements()) {
                 String key = (String) keys.nextElement();
                 Object value = properties.get(key);
 
-                if(key.equals("service.pid")) continue;
+                if (key.equals("service.pid")) continue;
 
-                result += component + "." + key + "\":\"" +  PropertiesUtil.toString(value, "")  + "\", \"";
+                result += component + "." + key + "\":\"" + PropertiesUtil.toString(value, "") + "\", \"";
             }
         }
 
-        return  result;
+        return result;
     }
 
     public String toSimpleString() {
-        String result = "{";
-
-        result += "\"" + NAME + "\": \"" + name + "\""
-                + ", \"" + ENABLED + "\": " + enabled;
-
-        result += "}";
-
-        return result;
+        return "{\"" + NAME + "\": \"" + name + "\""
+                + ", \"" + ENABLED + "\": " + enabled + "}";
     }
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentsManager.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentsManager.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentsManager.java Tue Feb  4 17:16:36 2014
@@ -21,7 +21,6 @@ package org.apache.sling.replication.age
 import java.util.Collection;
 import java.util.SortedSet;
 import org.apache.sling.replication.communication.ReplicationActionType;
-import org.apache.sling.replication.communication.ReplicationEndpoint;
 
 /**
  * A manager for {@link ReplicationAgent}s
@@ -43,6 +42,4 @@ public interface ReplicationAgentsManage
      * @return a <code>Collection</code> of {@link ReplicationAgent}s
      */
     Collection<ReplicationAgent> getAllAvailableAgents();
-
-    Collection<ReplicationAgent> getAgentsFor(ReplicationEndpoint endpoint);
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationAgentsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationAgentsManager.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationAgentsManager.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationAgentsManager.java Tue Feb  4 17:16:36 2014
@@ -21,7 +21,6 @@ package org.apache.sling.replication.age
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -35,7 +34,6 @@ import org.apache.felix.scr.annotations.
 import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.agent.ReplicationAgentsManager;
 import org.apache.sling.replication.communication.ReplicationActionType;
-import org.apache.sling.replication.communication.ReplicationEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,16 +65,6 @@ public class DefaultReplicationAgentsMan
         return Collections.unmodifiableCollection(replicationAgents);
     }
 
-    public Collection<ReplicationAgent> getAgentsFor(ReplicationEndpoint endpoint) {
-        Collection<ReplicationAgent> agents = new LinkedList<ReplicationAgent>();
-        for (ReplicationAgent replicationAgent : replicationAgents) {
-            if (endpoint.getUri().equals(replicationAgent.getEndpoint())) {
-                agents.add(replicationAgent);
-            }
-        }
-        return Collections.unmodifiableCollection(agents);
-    }
-
     @Deactivate
     protected void deactivate() {
         replicationAgents.clear();

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java Tue Feb  4 17:16:36 2014
@@ -41,10 +41,6 @@ import org.apache.sling.replication.rule
 import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
 import org.apache.sling.replication.serialization.impl.vlt.FileVaultReplicationPackageBuilder;
 import org.apache.sling.replication.transport.TransportHandler;
-import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
-import org.apache.sling.replication.transport.authentication.TransportAuthenticationProviderFactory;
-import org.apache.sling.replication.transport.authentication.impl.UserCredentialsTransportAuthenticationProviderFactory;
-import org.apache.sling.replication.transport.impl.HttpTransportHandler;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
@@ -70,18 +66,12 @@ public class ReplicationAgentServiceFact
 
     private static final String TRANSPORT = ReplicationAgentConfiguration.TRANSPORT;
 
-    private static final String TRANSPORT_AUTHENTICATION_FACTORY = ReplicationAgentConfiguration.TRANSPORT_AUTHENTICATION_FACTORY;
-
     private static final String QUEUEPROVIDER = ReplicationAgentConfiguration.QUEUEPROVIDER;
 
     private static final String PACKAGING = ReplicationAgentConfiguration.PACKAGING;
 
     private static final String QUEUE_DISTRIBUTION = ReplicationAgentConfiguration.QUEUE_DISTRIBUTION;
 
-    private static final String DEFAULT_TRANSPORT = "(name=" + HttpTransportHandler.NAME + ")";
-
-    private static final String DEFAULT_AUTHENTICATION_FACTORY = "(name=" + UserCredentialsTransportAuthenticationProviderFactory.TYPE + ")";
-
     private static final String DEFAULT_PACKAGING = "(name="
             + FileVaultReplicationPackageBuilder.NAME + ")";
 
@@ -98,19 +88,13 @@ public class ReplicationAgentServiceFact
     private static final String NAME = ReplicationAgentConfiguration.NAME;
 
     @Property
-    private static final String ENDPOINT = ReplicationAgentConfiguration.ENDPOINT;
-
-    @Property
-    private static final String AUTHENTICATION_PROPERTIES = ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES;
-
-    @Property
     private static final String RULES = ReplicationAgentConfiguration.RULES;
 
     @Property(boolValue = true)
     private static final String USE_AGGREGATE_PATHS = ReplicationAgentConfiguration.USE_AGGREGATE_PATHS;
 
-    @Property(name = TRANSPORT, value = DEFAULT_TRANSPORT)
-    @Reference(name = "TransportHandler", target = DEFAULT_TRANSPORT, policy = ReferencePolicy.DYNAMIC)
+    @Property(name = TRANSPORT)
+    @Reference(name = "TransportHandler", policy = ReferencePolicy.DYNAMIC)
     private TransportHandler transportHandler;
 
     @Property(name = PACKAGING, value = DEFAULT_PACKAGING)
@@ -121,10 +105,6 @@ public class ReplicationAgentServiceFact
     @Reference(name = "ReplicationQueueProvider", target = DEFAULT_QUEUEPROVIDER, policy = ReferencePolicy.DYNAMIC)
     private ReplicationQueueProvider queueProvider;
 
-    @Property(name = TRANSPORT_AUTHENTICATION_FACTORY, value = DEFAULT_AUTHENTICATION_FACTORY)
-    @Reference(name = "TransportAuthenticationProviderFactory", target = DEFAULT_AUTHENTICATION_FACTORY, policy = ReferencePolicy.DYNAMIC)
-    private TransportAuthenticationProviderFactory transportAuthenticationProviderFactory;
-
     @Property(name = QUEUE_DISTRIBUTION, value = DEFAULT_DISTRIBUTION)
     @Reference(name = "ReplicationQueueDistributionStrategy", target = DEFAULT_DISTRIBUTION, policy = ReferencePolicy.DYNAMIC)
     private ReplicationQueueDistributionStrategy queueDistributionStrategy;
@@ -144,15 +124,12 @@ public class ReplicationAgentServiceFact
 
         boolean enabled = PropertiesUtil.toBoolean(config.get(ENABLED), true);
         if (enabled) {
-            props.put(ENABLED, enabled);
+            props.put(ENABLED, true);
 
             String name = PropertiesUtil
                     .toString(config.get(NAME), String.valueOf(new Random().nextInt(1000)));
             props.put(NAME, name);
 
-            String endpoint = PropertiesUtil.toString(config.get(ENDPOINT), "");
-            props.put(ENDPOINT, endpoint);
-
             String transport = PropertiesUtil.toString(config.get(TRANSPORT), "");
             props.put(TRANSPORT, transport);
 
@@ -165,15 +142,10 @@ public class ReplicationAgentServiceFact
             String distribution = PropertiesUtil.toString(config.get(QUEUE_DISTRIBUTION), "");
             props.put(QUEUE_DISTRIBUTION, distribution);
 
-            Map<String, String> authenticationProperties = PropertiesUtil.toMap(config.get(AUTHENTICATION_PROPERTIES), new String[0]);
-            props.put(AUTHENTICATION_PROPERTIES, authenticationProperties);
 
             String[] rules = PropertiesUtil.toStringArray(config.get(RULES), new String[0]);
             props.put(RULES, rules);
 
-            String af = PropertiesUtil.toString(config.get(TRANSPORT_AUTHENTICATION_FACTORY), "");
-            props.put(TRANSPORT_AUTHENTICATION_FACTORY, af);
-
 
             boolean useAggregatePaths = PropertiesUtil.toBoolean(config.get(USE_AGGREGATE_PATHS), true);
             props.put(USE_AGGREGATE_PATHS, useAggregatePaths);
@@ -183,32 +155,19 @@ public class ReplicationAgentServiceFact
                 throw new AgentConfigurationException("configuration for this agent is not valid");
             }
 
-            TransportAuthenticationProvider transportAuthenticationProvider = null;
-            if (transportAuthenticationProviderFactory != null) {
-                transportAuthenticationProvider = transportAuthenticationProviderFactory.createAuthenticationProvider(authenticationProperties);
-                if (transportHandler != null && !transportHandler.supportsAuthenticationProvider(transportAuthenticationProvider)) {
-                    throw new Exception("authentication handler " + transportAuthenticationProvider
-                            + " not supported by transport handler " + transportHandler);
-                }
-            }
 
             if (log.isInfoEnabled()) {
                 log.info("bound services for {} :  {} - {} - {} - {} - {} - {}", new Object[]{name,
-                        transportHandler, transportAuthenticationProvider, endpoint, packageBuilder, queueProvider, queueDistributionStrategy});
+                        transportHandler, packageBuilder, queueProvider, queueDistributionStrategy});
             }
 
-            SimpleReplicationAgent agent = new SimpleReplicationAgent(name, endpoint, rules, useAggregatePaths,
-                    transportHandler, packageBuilder, queueProvider, transportAuthenticationProvider, queueDistributionStrategy);
+            ReplicationAgent agent = new SimpleReplicationAgent(name, rules, useAggregatePaths,
+                    transportHandler, packageBuilder, queueProvider, queueDistributionStrategy, replicationRuleEngine);
 
             // register agent service
             agentReg = context.registerService(ReplicationAgent.class.getName(), agent, props);
 
-            // apply rules if any
-            if (rules.length > 0) {
-                replicationRuleEngine.applyRules(agent, rules);
-            }
-
-            queueProvider.enableQueueProcessing(agent, agent);
+            agent.enable();
         }
     }
 
@@ -218,12 +177,7 @@ public class ReplicationAgentServiceFact
             ServiceReference reference = agentReg.getReference();
             ReplicationAgent replicationAgent = (ReplicationAgent) context.getService(reference);
 
-            String[] rules = replicationAgent.getRules();
-            if (rules != null) {
-                replicationRuleEngine.unapplyRules(replicationAgent, rules);
-            }
-
-           queueProvider.disableQueueProcessing(replicationAgent);
+            replicationAgent.disable();
 
             if (agentReg != null) {
                 agentReg.unregister();

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Tue Feb  4 17:16:36 2014
@@ -18,20 +18,18 @@
  */
 package org.apache.sling.replication.agent.impl;
 
-import java.net.URI;
 import org.apache.sling.replication.agent.AgentReplicationException;
 import org.apache.sling.replication.agent.ReplicationAgent;
-import org.apache.sling.replication.communication.ReplicationEndpoint;
 import org.apache.sling.replication.communication.ReplicationRequest;
 import org.apache.sling.replication.communication.ReplicationResponse;
 import org.apache.sling.replication.queue.*;
+import org.apache.sling.replication.rule.ReplicationRuleEngine;
 import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
 import org.apache.sling.replication.serialization.ReplicationPackageBuildingException;
 import org.apache.sling.replication.queue.ReplicationQueueItem;
 import org.apache.sling.replication.transport.ReplicationTransportException;
 import org.apache.sling.replication.transport.TransportHandler;
-import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,34 +49,32 @@ public class SimpleReplicationAgent impl
 
     private final TransportHandler transportHandler;
 
-    private final TransportAuthenticationProvider<?, ?> transportAuthenticationProvider;
-
     private final ReplicationQueueDistributionStrategy queueDistributionStrategy;
 
     private final String name;
 
-    private final String endpoint;
-
     private final String[] rules;
 
+    String endpoint;
+
     private final boolean useAggregatePaths;
+    private final ReplicationRuleEngine ruleEngine;
 
-    public SimpleReplicationAgent(String name, String endpoint, String[] rules,
+    public SimpleReplicationAgent(String name, String[] rules,
                                   boolean useAggregatePaths,
                                   TransportHandler transportHandler,
                                   ReplicationPackageBuilder packageBuilder,
                                   ReplicationQueueProvider queueProvider,
-                                  TransportAuthenticationProvider<?, ?> transportAuthenticationProvider,
-                                  ReplicationQueueDistributionStrategy queueDistributionHandler) {
+                                  ReplicationQueueDistributionStrategy queueDistributionHandler,
+                                  ReplicationRuleEngine ruleEngine) {
         this.name = name;
-        this.endpoint = endpoint;
         this.rules = rules;
         this.transportHandler = transportHandler;
         this.packageBuilder = packageBuilder;
         this.queueProvider = queueProvider;
-        this.transportAuthenticationProvider = transportAuthenticationProvider;
         this.queueDistributionStrategy = queueDistributionHandler;
         this.useAggregatePaths = useAggregatePaths;
+        this.ruleEngine = ruleEngine;
     }
 
     public ReplicationResponse execute(ReplicationRequest replicationRequest)
@@ -87,9 +83,7 @@ public class SimpleReplicationAgent impl
         // create packages from request
         ReplicationPackage[] replicationPackages = buildPackages(replicationRequest);
 
-        ReplicationResponse replicationResponse = schedule(replicationPackages, false);
-
-        return replicationResponse;
+        return schedule(replicationPackages, false);
     }
 
     public void send(ReplicationRequest replicationRequest) throws AgentReplicationException {
@@ -99,6 +93,10 @@ public class SimpleReplicationAgent impl
         schedule(replicationPackages, true);
     }
 
+    public boolean isPassive() {
+        return transportHandler == null; // TODO : improve this
+    }
+
 
     private ReplicationPackage buildPackage(ReplicationRequest replicationRequest) throws AgentReplicationException {
         // create package from request
@@ -124,13 +122,13 @@ public class SimpleReplicationAgent impl
             for (String path : replicationRequest.getPaths()){
                 ReplicationPackage replicationPackage = buildPackage(new ReplicationRequest(replicationRequest.getTime(),
                         replicationRequest.getAction(),
-                        new String[] { path }));
+                        path));
 
                 packages.add(replicationPackage);
             }
         }
 
-        return packages.toArray(new ReplicationPackage[0]);
+        return packages.toArray(new ReplicationPackage[packages.size()]);
     }
 
     // offer option throws an exception at first error
@@ -188,25 +186,20 @@ public class SimpleReplicationAgent impl
     public boolean process(ReplicationQueueItem itemInfo)  {
         try {
             ReplicationPackage replicationPackage = packageBuilder.getPackage(itemInfo.getId());
+
             if(replicationPackage == null){
                 return false;
             }
 
-            try {
-                if (transportHandler != null || (endpoint != null && endpoint.length() > 0)) {
-                    transportHandler.transport(replicationPackage,
-                            new ReplicationEndpoint(endpoint),
-                            transportAuthenticationProvider);
-
-                    return true;
-                } else {
-                    log.info("agent {} processing skipped", name);
-                    return false;
-                }
-            }
-            finally {
+            if (transportHandler != null) {
+                transportHandler.transport(replicationPackage);
                 replicationPackage.delete();
+                return true;
+            } else {
+                log.info("agent {} processing skipped", name);
+                return false;
             }
+
         } catch (ReplicationTransportException e) {
             log.error("transport error", e);
             return false;
@@ -214,18 +207,15 @@ public class SimpleReplicationAgent impl
     }
 
     public ReplicationPackage removeHead(String queueName) throws ReplicationQueueException {
+        if(isPassive()) return null;
+
         ReplicationQueue queue = getQueue(queueName);
         ReplicationQueueItem info = queue.getHead();
         if(info == null) return null;
 
         queue.removeHead();
 
-        ReplicationPackage replicationPackage = packageBuilder.getPackage(info.getId());
-        return replicationPackage;
-    }
-
-    public URI getEndpoint() {
-        return new ReplicationEndpoint(endpoint).getUri();
+        return packageBuilder.getPackage(info.getId());
     }
 
     public String getName() {
@@ -242,8 +232,23 @@ public class SimpleReplicationAgent impl
         return queue;
     }
 
-    public String[] getRules() {
-        return rules;
+    /** Applies the configured rules, if any, then enables queue processing unless the agent is passive. */
+    public void enable() {
+        if (rules != null && rules.length > 0) { // rules may be unset; disable() null-checks them too
+            ruleEngine.applyRules(this, rules);
+        }
+        if (!isPassive()) {
+            queueProvider.enableQueueProcessing(this, this);
+        }
+    }
+    /** Unapplies the agent's rules (when set) and disables queue processing unless the agent is passive. */
+    public void disable() {
+        if (rules != null) {
+            ruleEngine.unapplyRules(this, rules);
+        }
+        if (!isPassive()) {
+            queueProvider.disableQueueProcessing(this);
+        }
     }
 
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java Tue Feb  4 17:16:36 2014
@@ -59,7 +59,6 @@ public class JobHandlingReplicationQueue
     private Map<String, ServiceRegistration> jobs = new ConcurrentHashMap<String, ServiceRegistration>();
     private BundleContext context;
 
-
     @Override
     protected ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String queueName)
                     throws ReplicationQueueException {
@@ -92,9 +91,6 @@ public class JobHandlingReplicationQueue
     }
 
     public void enableQueueProcessing(ReplicationAgent agent, ReplicationQueueProcessor queueProcessor) {
-        // TODO: make this configurable (whether to create self processing queues or not)
-        if (agent.getEndpoint() == null || agent.getEndpoint().toString().length() == 0) return;
-
         // eventually register job consumer for sling job handling based queues
         Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
         String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + agent.getName();
@@ -105,7 +101,6 @@ public class JobHandlingReplicationQueue
         jobs.put(agent.getName(), jobReg);
     }
 
-
     public void disableQueueProcessing(ReplicationAgent agent) {
         ServiceRegistration jobReg = jobs.remove(agent.getName());
         if (jobReg != null) {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentServlet.java Tue Feb  4 17:16:36 2014
@@ -69,11 +69,9 @@ public class ReplicationAgentServlet ext
 
         if(ReplicationActionType.POLL.getName().equalsIgnoreCase(action)){
             doRemove(request, response);
-            return;
         }
         else {
             doCreate(request, response);
-            return;
         }
     }
 
@@ -127,7 +125,7 @@ public class ReplicationAgentServlet ext
 
         /* directly polling an agent queue is only possible if such an agent doesn't have its own endpoint
         (that is it just adds items to its queue to be polled remotely)*/
-        if (agent != null && (agent.getEndpoint() == null || agent.getEndpoint().toString().length() == 0 )) {
+        if (agent != null) {
             try {
                 // TODO : consider using queue distribution strategy and validating who's making this request
                 log.info("getting item from queue {}", queueName);

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/TransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/TransportHandler.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/TransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/TransportHandler.java Tue Feb  4 17:16:36 2014
@@ -33,22 +33,8 @@ public interface TransportHandler {
      * transport and the supplied {@link TransportAuthenticationProvider} for authenticating the endpoint
      *
      * @param replicationPackage  a {@link ReplicationPackage} to transport
-     * @param replicationEndpoint a {@link ReplicationEndpoint} to transport the package to
-     * @param transportAuthenticationProvider
-     *                            a {@link TransportAuthenticationProvider} to authenticate the endpoint
      * @throws ReplicationTransportException if any error occurs during the transport
      */
-    void transport(ReplicationPackage replicationPackage, ReplicationEndpoint replicationEndpoint,
-                   TransportAuthenticationProvider<?, ?> transportAuthenticationProvider)
-            throws ReplicationTransportException;
-
-    /**
-     * defines if this {@link TransportHandler} can be authenticated using the given {@link TransportAuthenticationProvider}
-     *
-     * @param transportAuthenticationProvider
-     *         a {@link TransportAuthenticationProvider} to be used to authenticate {@link ReplicationEndpoint}s with this {@link TransportHandler}
-     * @return <code>true</code> if the given {@link TransportAuthenticationProvider} is supported by this {@link TransportHandler}
-     */
-    boolean supportsAuthenticationProvider(TransportAuthenticationProvider<?, ?> transportAuthenticationProvider);
+    void transport(ReplicationPackage replicationPackage) throws ReplicationTransportException;
 
 }
\ No newline at end of file

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java?rev=1564382&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java Tue Feb  4 17:16:36 2014
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.transport.impl;
+
+import org.apache.sling.replication.communication.ReplicationEndpoint;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.transport.ReplicationTransportException;
+import org.apache.sling.replication.transport.TransportHandler;
+
+/** Base {@link TransportHandler} that delivers a package to its endpoints as dictated by a {@link TransportEndpointStrategyType}. */
+public abstract class AbstractTransportHandler implements TransportHandler {
+    private final ReplicationEndpoint[] endpoints;
+    private final TransportEndpointStrategyType endpointStrategyType;
+    private int lastSuccessfulEndpointId = 0; // index of the endpoint that most recently accepted a package
+
+    /** Stores a private copy of {@code endpoints} (must not be null) together with the strategy used by {@link #transport}. */
+    public AbstractTransportHandler(ReplicationEndpoint[] endpoints, TransportEndpointStrategyType endpointStrategyType) {
+        this.endpoints = endpoints.clone(); // defensive copy: later changes to the caller's array must not leak in
+        this.endpointStrategyType = endpointStrategyType;
+    }
+
+    /**
+     * Tries the endpoints in order (starting at the last successful one for OneSuccessful); FirstSuccessful and
+     * OneSuccessful return after the first successful delivery, any other strategy attempts every endpoint.
+     * @throws ReplicationTransportException the most recent failure, if the endpoints are exhausted and a delivery failed
+     */
+    public void transport(ReplicationPackage replicationPackage) throws ReplicationTransportException {
+        boolean resumeFromLast = endpointStrategyType == TransportEndpointStrategyType.OneSuccessful;
+        boolean stopOnSuccess = resumeFromLast || endpointStrategyType == TransportEndpointStrategyType.FirstSuccessful;
+        int offset = resumeFromLast ? lastSuccessfulEndpointId : 0;
+        ReplicationTransportException lastException = null;
+        for (int i = 0; i < endpoints.length; i++) {
+            int currentId = (offset + i) % endpoints.length;
+            try {
+                deliverPackage(replicationPackage, endpoints[currentId]);
+                lastSuccessfulEndpointId = currentId;
+                if (stopOnSuccess) {
+                    return;
+                }
+            } catch (ReplicationTransportException ex) {
+                lastException = ex; // remember the failure but keep trying the remaining endpoints
+            }
+        }
+        if (lastException != null) {
+            throw lastException;
+        }
+    }
+
+    /** Validates the endpoint, delivers to it, and wraps any delivery failure in a {@link ReplicationTransportException}. */
+    private void deliverPackage(ReplicationPackage replicationPackage, ReplicationEndpoint replicationEndpoint)
+            throws ReplicationTransportException {
+        if (!validateEndpoint(replicationEndpoint)) {
+            throw new ReplicationTransportException("invalid endpoint " + replicationEndpoint.getUri());
+        }
+        try {
+            deliverPackageToEndpoint(replicationPackage, replicationEndpoint);
+        } catch (Exception e) {
+            throw new ReplicationTransportException(e);
+        }
+    }
+
+    /** Performs the transport-specific delivery of {@code replicationPackage} to a single endpoint. */
+    protected abstract void deliverPackageToEndpoint(ReplicationPackage replicationPackage,
+                                                     ReplicationEndpoint replicationEndpoint) throws Exception;
+    /** Returns whether this handler accepts the given endpoint; rejected endpoints are reported as transport failures. */
+    protected abstract boolean validateEndpoint(ReplicationEndpoint endpoint);
+}

Propchange: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java?rev=1564382&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java Tue Feb  4 17:16:36 2014
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.transport.impl;
+
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.replication.agent.ReplicationAgentConfiguration;
+import org.apache.sling.replication.communication.ReplicationEndpoint;
+import org.apache.sling.replication.transport.TransportHandler;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationProviderFactory;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+import java.util.*;
+
+/** Shared activation logic for transport handler factories: builds a {@link TransportHandler} from configuration and registers it as a service. */
+public abstract class AbstractTransportHandlerFactory {
+
+    private ServiceRegistration serviceRegistration;
+
+    /**
+     * Creates the handler described by {@code config} and registers it; does nothing when the configuration is disabled.
+     *
+     * @throws Exception if {@link #createTransportHandler} yields {@code null}
+     */
+    protected void activate(BundleContext context, Map<String, ?> config) throws Exception {
+        if (!PropertiesUtil.toBoolean(config.get(ReplicationAgentConfiguration.ENABLED), true)) {
+            return;
+        }
+
+        // the effective configuration values double as the registered service's properties
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        String fallbackName = String.valueOf(new Random().nextInt(1000));
+        String name = PropertiesUtil.toString(config.get(ReplicationAgentConfiguration.NAME), fallbackName);
+        props.put(ReplicationAgentConfiguration.NAME, name);
+
+        Map<String, String> authenticationProperties = PropertiesUtil.toMap(
+                config.get(ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES), new String[0]);
+        props.put(ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES, authenticationProperties);
+
+        String[] endpoints = PropertiesUtil.toStringArray(config.get(ReplicationAgentConfiguration.ENDPOINT), new String[0]);
+        props.put(ReplicationAgentConfiguration.ENDPOINT, endpoints);
+
+        String endpointStrategyName = PropertiesUtil.toString(config.get(ReplicationAgentConfiguration.ENDPOINT_STRATEGY), "All");
+        props.put(ReplicationAgentConfiguration.ENDPOINT_STRATEGY, endpointStrategyName);
+        TransportEndpointStrategyType strategyType = TransportEndpointStrategyType.valueOf(endpointStrategyName);
+
+        TransportAuthenticationProviderFactory authenticationFactory = getAuthenticationFactory();
+        TransportAuthenticationProvider authenticationProvider = authenticationFactory == null ? null
+                : authenticationFactory.createAuthenticationProvider(authenticationProperties);
+
+        TransportHandler transportHandler = createTransportHandler(config, props, authenticationProvider,
+                toReplicationEndpoints(endpoints), strategyType);
+        if (transportHandler == null) {
+            throw new Exception("could not create transport handler");
+        }
+        serviceRegistration = context.registerService(TransportHandler.class.getName(), transportHandler, props);
+    }
+
+    /** Wraps each non-empty endpoint string in a {@link ReplicationEndpoint}, keeping the configured order. */
+    private static ReplicationEndpoint[] toReplicationEndpoints(String[] endpoints) {
+        List<ReplicationEndpoint> result = new ArrayList<ReplicationEndpoint>();
+        for (String endpoint : endpoints) {
+            if (endpoint != null && endpoint.length() > 0) {
+                result.add(new ReplicationEndpoint(endpoint));
+            }
+        }
+        return result.toArray(new ReplicationEndpoint[result.size()]);
+    }
+
+    /** Unregisters the transport handler service if one is currently registered. */
+    protected void deactivate() {
+        if (serviceRegistration == null) {
+            return;
+        }
+        serviceRegistration.unregister();
+        serviceRegistration = null;
+    }
+
+    /** Builds the concrete handler; called by {@link #activate} with the service properties collected so far. */
+    protected abstract TransportHandler createTransportHandler(Map<String, ?> config,
+                                                               Dictionary<String, Object> props,
+                                                               TransportAuthenticationProvider transportAuthenticationProvider,
+                                                               ReplicationEndpoint[] endpoints,
+                                                               TransportEndpointStrategyType endpointStrategyType);
+
+    /** Supplies the factory used to create the authentication provider; {@code null} means no provider is created. */
+    protected abstract TransportAuthenticationProviderFactory getAuthenticationFactory();
+}

Propchange: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandler.java Tue Feb  4 17:16:36 2014
@@ -21,8 +21,9 @@ package org.apache.sling.replication.tra
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.*;
-
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.commons.io.IOUtils;
 import org.apache.http.client.fluent.Content;
 import org.apache.http.client.fluent.Executor;
@@ -32,7 +33,6 @@ import org.apache.http.entity.ContentTyp
 import org.apache.sling.replication.communication.ReplicationEndpoint;
 import org.apache.sling.replication.communication.ReplicationHeader;
 import org.apache.sling.replication.serialization.ReplicationPackage;
-import org.apache.sling.replication.transport.ReplicationTransportException;
 import org.apache.sling.replication.transport.TransportHandler;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationContext;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
@@ -42,14 +42,15 @@ import org.slf4j.LoggerFactory;
 /**
  * basic HTTP POST {@link TransportHandler}
  */
-public class HttpTransportHandler implements TransportHandler {
-
-    public static final String NAME = "http";
+public class HttpTransportHandler extends AbstractTransportHandler
+        implements TransportHandler {
 
     private static final String PATH_VARIABLE_NAME = "{path}";
 
     private static final Logger log = LoggerFactory.getLogger(HttpTransportHandler.class);
 
+    private final TransportAuthenticationProvider<Executor, Executor> transportAuthenticationProvider;
+
     private final boolean useCustomHeaders;
 
     private final String[] customHeaders;
@@ -58,39 +59,43 @@ public class HttpTransportHandler implem
 
     private final String customBody;
 
-    public HttpTransportHandler(boolean useCustomHeaders, String[] customHeaders, boolean useCustomBody, String customBody) {
+    public HttpTransportHandler(boolean useCustomHeaders,
+                                String[] customHeaders,
+                                boolean useCustomBody,
+                                String customBody,
+                                TransportAuthenticationProvider<Executor, Executor> transportAuthenticationProvider,
+                                ReplicationEndpoint[] replicationEndpoints,
+                                TransportEndpointStrategyType endpointStrategyType) {
+
+        super(replicationEndpoints, endpointStrategyType);
+
         this.useCustomHeaders = useCustomHeaders;
         this.customHeaders = customHeaders;
         this.useCustomBody = useCustomBody;
         this.customBody = customBody;
-    }
+        this.transportAuthenticationProvider = transportAuthenticationProvider;
 
-    public HttpTransportHandler(){
-        this(false, new String[0], false, "");
     }
 
-    @SuppressWarnings("unchecked")
-    public void transport(ReplicationPackage replicationPackage,
-                          ReplicationEndpoint replicationEndpoint,
-                          TransportAuthenticationProvider<?, ?> transportAuthenticationProvider)
-            throws ReplicationTransportException {
-        if (log.isInfoEnabled()) {
-            log.info("delivering package {} to {} using auth {}",
-                    new Object[]{replicationPackage.getId(),
-                            replicationEndpoint.getUri(), transportAuthenticationProvider});
-        }
-        try {
-            Executor executor = Executor.newInstance();
-            TransportAuthenticationContext context = new TransportAuthenticationContext();
-            context.addAttribute("endpoint", replicationEndpoint);
-            executor = ((TransportAuthenticationProvider<Executor, Executor>) transportAuthenticationProvider)
-                    .authenticate(executor, context);
+    @Override
+    public void deliverPackageToEndpoint(ReplicationPackage replicationPackage,
+                                         ReplicationEndpoint replicationEndpoint) throws Exception {
+        log.info("delivering package {} to {} using auth {}",
+                new Object[]{replicationPackage.getId(),
+                        replicationEndpoint.getUri(), transportAuthenticationProvider});
 
-            deliverPackage(executor, replicationPackage, replicationEndpoint);
 
-        } catch (Exception e) {
-            throw new ReplicationTransportException(e);
-        }
+        Executor executor = Executor.newInstance();
+        TransportAuthenticationContext context = new TransportAuthenticationContext();
+        context.addAttribute("endpoint", replicationEndpoint);
+        executor =  transportAuthenticationProvider.authenticate(executor, context);
+
+        deliverPackage(executor, replicationPackage, replicationEndpoint);
+    }
+
+    @Override
+    protected boolean validateEndpoint(ReplicationEndpoint endpoint) {
+        return true; // every endpoint is accepted; this handler performs no endpoint-specific validation
     }
 
     public static String[] getCustomizedHeaders(String[] additionalHeaders, String action, String[] paths){
@@ -111,14 +116,12 @@ public class HttpTransportHandler implem
             }
         }
 
-
-
         StringBuilder sb = new StringBuilder();
 
         if(paths != null && paths.length > 0) {
             sb.append(paths[0]);
             for(int i=1; i < paths.length; i++){
-                sb.append(", " + paths[i]);
+                sb.append(", ").append(paths[i]);
             }
         }
 
@@ -130,7 +133,7 @@ public class HttpTransportHandler implem
             boundHeaders.add(header.replace(PATH_VARIABLE_NAME, path));
         }
 
-        return boundHeaders.toArray(new String[0]);
+        return boundHeaders.toArray(new String[boundHeaders.size()]);
     }
 
     private void deliverPackage(Executor executor, ReplicationPackage replicationPackage,
@@ -190,9 +193,4 @@ public class HttpTransportHandler implem
         String headerValue = header.substring(idx+1).trim();
         req.addHeader(headerName, headerValue);
     }
-
-
-    public boolean supportsAuthenticationProvider(TransportAuthenticationProvider<?, ?> transportAuthenticationProvider) {
-        return transportAuthenticationProvider.canAuthenticate(Executor.class);
-    }
 }
\ No newline at end of file

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java Tue Feb  4 17:16:36 2014
@@ -19,39 +19,63 @@
 package org.apache.sling.replication.transport.impl;
 
 import org.apache.felix.scr.annotations.*;
+import org.apache.http.client.fluent.Executor;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.replication.agent.ReplicationAgentConfiguration;
 import org.apache.sling.replication.communication.ReplicationEndpoint;
-import org.apache.sling.replication.serialization.ReplicationPackage;
-import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
-import org.apache.sling.replication.serialization.impl.vlt.FileVaultReplicationPackageBuilder;
-import org.apache.sling.replication.transport.ReplicationTransportException;
 import org.apache.sling.replication.transport.TransportHandler;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationProviderFactory;
+import org.apache.sling.replication.transport.authentication.impl.UserCredentialsTransportAuthenticationProviderFactory;
 import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
 
 import java.util.Dictionary;
-import java.util.Hashtable;
 import java.util.Map;
-import java.util.Random;
 
 @Component(metatype = true,
-        label = "Replication Http Transport Handler Factory",
+        label = "Replication Transport Handler Factory - Http Push",
         description = "OSGi configuration based HttpTransportHandler service factory",
         name = HttpTransportHandlerFactory.SERVICE_PID,
         configurationFactory = true,
         specVersion = "1.1",
         policy = ConfigurationPolicy.REQUIRE)
-public class HttpTransportHandlerFactory {
+public class HttpTransportHandlerFactory extends AbstractTransportHandlerFactory {
     static final String SERVICE_PID = "org.apache.sling.replication.transport.impl.HttpTransportHandlerFactory";
 
+    private static final String DEFAULT_AUTHENTICATION_FACTORY = "(name=" + UserCredentialsTransportAuthenticationProviderFactory.TYPE + ")";
+
 
     @Property(boolValue = true)
     private static final String ENABLED = "enabled";
 
     @Property
-    private static final String NAME = ReplicationAgentConfiguration.NAME;
+    private static final String NAME = "name";
+
+    @Property(cardinality = 1000)
+    private static final String ENDPOINT = ReplicationAgentConfiguration.ENDPOINT;
+
+
+    @Property(options = {
+            @PropertyOption(name = "All",
+                    value = "all endpoints"
+            ),
+            @PropertyOption(name = "OneSuccessful",
+                value = "one successful endpoint"
+            ),
+            @PropertyOption(name = "FirstSuccessful",
+                    value = "first successful endpoint"
+            )},
+            value = "All"
+    )
+    private static final String ENDPOINT_STRATEGY = ReplicationAgentConfiguration.ENDPOINT_STRATEGY;
+
+    @Property(name = ReplicationAgentConfiguration.TRANSPORT_AUTHENTICATION_FACTORY, value = DEFAULT_AUTHENTICATION_FACTORY)
+    @Reference(name = "TransportAuthenticationProviderFactory", target = DEFAULT_AUTHENTICATION_FACTORY, policy = ReferencePolicy.DYNAMIC)
+    private TransportAuthenticationProviderFactory transportAuthenticationProviderFactory;
+
+    @Property
+    private static final String AUTHENTICATION_PROPERTIES = ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES;
+
 
     @Property(boolValue = false)
     private static final String USE_CUSTOM_HEADERS = "useCustomHeaders";
@@ -65,44 +89,45 @@ public class HttpTransportHandlerFactory
     @Property
     private static final String CUSTOM_BODY = "customBody";
 
-    private ServiceRegistration serviceRegistration;
-
-    @Activate
-    public void activate(BundleContext context, Map<String, ?> config) throws Exception {
-
-        // inject configuration
-        Dictionary<String, Object> props = new Hashtable<String, Object>();
-
-        boolean enabled = PropertiesUtil.toBoolean(config.get(ENABLED), true);
-        if (enabled) {
-            String name = PropertiesUtil
-                    .toString(config.get(NAME), String.valueOf(new Random().nextInt(1000)));
-            props.put(NAME, name);
-
-            boolean useCustomHeaders = PropertiesUtil.toBoolean(config.get(USE_CUSTOM_HEADERS), false);
-            props.put(USE_CUSTOM_HEADERS, useCustomHeaders);
-
-            String[] customHeaders = PropertiesUtil.toStringArray(config.get(CUSTOM_HEADERS), new String[0]);
-            props.put(CUSTOM_HEADERS, customHeaders);
-
-            boolean useCustomBody = PropertiesUtil.toBoolean(config.get(USE_CUSTOM_BODY), false);
-            props.put(USE_CUSTOM_BODY, useCustomBody);
-
-            String customBody = PropertiesUtil.toString(config.get(CUSTOM_BODY), "");
-            props.put(CUSTOM_BODY, customBody);
+    /** Reads the HTTP-specific options from {@code config}, mirrors them into {@code props} and builds the {@link HttpTransportHandler}. */
+    @Override
+    @SuppressWarnings("unchecked") // the provider arrives as a raw type and is narrowed to Executor-based authentication below
+    protected TransportHandler createTransportHandler(Map<String, ?> config,
+                                                      Dictionary<String, Object> props,
+                                                      TransportAuthenticationProvider transportAuthenticationProvider,
+                                                      ReplicationEndpoint[] endpoints,
+                                                      TransportEndpointStrategyType endpointStrategyType) {
+        boolean useCustomHeaders = PropertiesUtil.toBoolean(config.get(USE_CUSTOM_HEADERS), false);
+        props.put(USE_CUSTOM_HEADERS, useCustomHeaders);
+
+        String[] customHeaders = PropertiesUtil.toStringArray(config.get(CUSTOM_HEADERS), new String[0]);
+        props.put(CUSTOM_HEADERS, customHeaders);
+
+        boolean useCustomBody = PropertiesUtil.toBoolean(config.get(USE_CUSTOM_BODY), false);
+        props.put(USE_CUSTOM_BODY, useCustomBody);
+
+        String customBody = PropertiesUtil.toString(config.get(CUSTOM_BODY), "");
+        props.put(CUSTOM_BODY, customBody);
+
+        return new HttpTransportHandler(useCustomHeaders, customHeaders, useCustomBody, customBody,
+                (TransportAuthenticationProvider<Executor, Executor>) transportAuthenticationProvider,
+                endpoints, endpointStrategyType);
+    }
 
-            // register transport handler
-            TransportHandler transportHandler = new HttpTransportHandler(useCustomHeaders, customHeaders,  useCustomBody, customBody);
+    @Override
+    protected TransportAuthenticationProviderFactory getAuthenticationFactory() {
+        return transportAuthenticationProviderFactory; // the @Reference-annotated factory field of this component
+    }
 
-            serviceRegistration = context.registerService(TransportHandler.class.getName(), transportHandler , props);
-        }
+    @Activate
+    protected void activate(BundleContext context, Map<String, ?> config) throws Exception {
+        super.activate(context, config); // pure delegation; presumably re-declared only to carry @Activate here - confirm
     }
 
     @Deactivate
-    private void deactivate() {
-        if(serviceRegistration != null){
-            serviceRegistration.unregister();
-            serviceRegistration = null;
-        }
+    protected void deactivate() {
+        super.deactivate();
     }
+
+
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java Tue Feb  4 17:16:36 2014
@@ -18,106 +18,110 @@
  */
 package org.apache.sling.replication.transport.impl;
 
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.Service;
 import org.apache.http.Header;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.fluent.Executor;
 import org.apache.http.client.fluent.Request;
-import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.replication.communication.ReplicationActionType;
 import org.apache.sling.replication.communication.ReplicationEndpoint;
 import org.apache.sling.replication.communication.ReplicationHeader;
 import org.apache.sling.replication.serialization.ReplicationPackage;
 import org.apache.sling.replication.serialization.ReplicationPackageImporter;
-import org.apache.sling.replication.transport.ReplicationTransportException;
 import org.apache.sling.replication.transport.TransportHandler;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationContext;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
-import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * basic HTTP GET {@link TransportHandler}
+ * {@link TransportHandler} that polls a remote endpoint over HTTP: it issues POST requests carrying the
+ * {@code POLL} replication action and schedules the import of the package streams it receives.
  */
-@Component(metatype = true)
-@Service(value = TransportHandler.class)
-@Property(name = "name", value = PollingTransportHandler.NAME, propertyPrivate = true)
-public class PollingTransportHandler implements TransportHandler {
-
-    public static final String NAME = "poll";
+public class PollingTransportHandler extends AbstractTransportHandler
+        implements TransportHandler {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    @Property(name = "poll items", description = "number of subsequent poll requests to make", intValue = -1)
-    private static final String POLL_ITEMS = "poll.items";
-
-    private int pollItems;
-
-    @Reference
-    private ReplicationPackageImporter replicationPackageImporter;
-
-    @Activate
-    protected void activate(ComponentContext context) {
-        pollItems = PropertiesUtil.toInteger(context.getProperties().get(POLL_ITEMS), -1);
+    private final TransportAuthenticationProvider<Executor, Executor> transportAuthenticationProvider;
+    private final ReplicationPackageImporter replicationPackageImporter;
+    private final int pollItems;
+
+    /**
+     * Creates a handler polling the given endpoints; the endpoint strategy is fixed to
+     * {@code TransportEndpointStrategyType.All}.
+     *
+     * @param replicationPackageImporter importer that received package streams are scheduled on
+     * @param pollItems maximum number of packages to import per delivery, negative for no limit
+     * @param transportAuthenticationProvider provider used to authenticate the HTTP executor
+     * @param replicationEndpoints endpoints handed to the parent handler
+     */
+    public PollingTransportHandler(ReplicationPackageImporter replicationPackageImporter,
+                                   int pollItems,
+                                   TransportAuthenticationProvider<Executor, Executor> transportAuthenticationProvider,
+                                   ReplicationEndpoint[] replicationEndpoints){
+        super(replicationEndpoints, TransportEndpointStrategyType.All);
+
+        this.replicationPackageImporter = replicationPackageImporter;
+        this.pollItems = pollItems;
+        this.transportAuthenticationProvider = transportAuthenticationProvider;
     }
 
-    @SuppressWarnings("unchecked")
-    public void transport(ReplicationPackage replicationPackage,
-                          ReplicationEndpoint replicationEndpoint,
-                          TransportAuthenticationProvider<?, ?> transportAuthenticationProvider)
-            throws ReplicationTransportException {
-        if (log.isInfoEnabled()) {
-            log.info("polling from {}", replicationEndpoint.getUri());
-        }
-
-        try {
-            Executor executor = Executor.newInstance();
-            TransportAuthenticationContext context = new TransportAuthenticationContext();
-            context.addAttribute("endpoint", replicationEndpoint);
-            executor = ((TransportAuthenticationProvider<Executor, Executor>) transportAuthenticationProvider)
-                    .authenticate(executor, context);
-
-            Request req = Request.Post(replicationEndpoint.getUri())
-                    .addHeader(ReplicationHeader.ACTION.toString(), ReplicationActionType.POLL.getName())
-                    .useExpectContinue();
-            // TODO : add queue header
-
-            int polls = pollItems;
-
-            // continuously requests package streams as long as type header is received with the response (meaning there's a package of a certain type)
-            HttpResponse httpResponse;
-            while ((httpResponse = executor.execute(req).returnResponse()).containsHeader(ReplicationHeader.TYPE.toString())
-                    && polls != 0) {
-                HttpEntity entity = httpResponse.getEntity();
-                Header typeHeader = httpResponse.getFirstHeader(ReplicationHeader.TYPE.toString());
-
-                if (entity.getContentLength() > 0) {
-                    replicationPackageImporter.scheduleImport(entity.getContent(), typeHeader.getValue());
-                    polls--;
-                    if (log.isInfoEnabled()) {
-                        log.info("scheduled import of package stream");
-                    }
-
-                } else {
-                    if (log.isInfoEnabled()) {
-                        log.info("nothing to fetch");
-                    }
-                    break;
-                }
+    /**
+     * Polls the endpoint for packages and schedules the import of each one received.
+     * Polling stops when {@code pollItems} packages have been scheduled, when a response lacks the
+     * type header, or when a response carries no content. The {@code replicationPackage} argument
+     * is not used.
+     *
+     * @param replicationPackage ignored by this handler
+     * @param replicationEndpoint endpoint to poll
+     * @throws Exception if authentication, the HTTP exchange or scheduling the import fails
+     */
+    @Override
+    public void deliverPackageToEndpoint(ReplicationPackage replicationPackage, ReplicationEndpoint replicationEndpoint)
+            throws Exception {
+        log.info("polling from {}", replicationEndpoint.getUri());
+
+        Executor executor = Executor.newInstance();
+        TransportAuthenticationContext context = new TransportAuthenticationContext();
+        context.addAttribute("endpoint", replicationEndpoint);
+        executor = transportAuthenticationProvider.authenticate(executor, context);
+
+        Request req = Request.Post(replicationEndpoint.getUri())
+                .addHeader(ReplicationHeader.ACTION.toString(), ReplicationActionType.POLL.getName())
+                .useExpectContinue();
+        // TODO : add queue header
+
+        int polls = pollItems;
+
+        // keep requesting package streams while the poll budget is not exhausted and the response carries a type
+        // header (meaning there's a package of a certain type); the budget is checked first so that no request is
+        // issued - and no package fetched just to be dropped - once pollItems packages have been scheduled
+        HttpResponse httpResponse;
+        while (polls != 0
+                && (httpResponse = executor.execute(req).returnResponse()).containsHeader(ReplicationHeader.TYPE.toString())) {
+            HttpEntity entity = httpResponse.getEntity();
+            Header typeHeader = httpResponse.getFirstHeader(ReplicationHeader.TYPE.toString());
+
+            // getEntity() returns null when the response has no body
+            if (entity != null && entity.getContentLength() > 0) {
+                replicationPackageImporter.scheduleImport(entity.getContent(), typeHeader.getValue());
+                polls--;
+                log.info("scheduled import of package stream");
+
+            } else {
+                log.info("nothing to fetch");
+                break;
             }
-        } catch (Exception e) {
-            throw new ReplicationTransportException(e);
         }
 
     }
 
-    public boolean supportsAuthenticationProvider(TransportAuthenticationProvider<?, ?> transportAuthenticationProvider) {
-        return transportAuthenticationProvider.canAuthenticate(Executor.class);
+    /**
+     * Accepts every endpoint; no validation is performed for polling.
+     */
+    @Override
+    protected boolean validateEndpoint(ReplicationEndpoint endpoint) {
+        return true;
     }
 }

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java?rev=1564382&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java Tue Feb  4 17:16:36 2014
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.transport.impl;
+
+import org.apache.felix.scr.annotations.*;
+import org.apache.http.client.fluent.Executor;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.replication.agent.ReplicationAgentConfiguration;
+import org.apache.sling.replication.communication.ReplicationEndpoint;
+import org.apache.sling.replication.serialization.ReplicationPackageImporter;
+import org.apache.sling.replication.transport.TransportHandler;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
+import org.apache.sling.replication.transport.authentication.TransportAuthenticationProviderFactory;
+import org.apache.sling.replication.transport.authentication.impl.UserCredentialsTransportAuthenticationProviderFactory;
+import org.osgi.framework.BundleContext;
+
+import java.util.Dictionary;
+import java.util.Map;
+
+/**
+ * OSGi configuration factory that builds {@link PollingTransportHandler} instances.
+ * Activation and deactivation are delegated to {@link AbstractTransportHandlerFactory};
+ * this class contributes the polling specific configuration ({@code poll.items}) and the
+ * {@link ReplicationPackageImporter} handed to the created handler.
+ */
+@Component(metatype = true,
+        label = "Replication Transport Handler Factory - Http Poll",
+        description = "OSGi configuration based PollingTransportHandler service factory",
+        name = PollingTransportHandlerFactory.SERVICE_PID,
+        configurationFactory = true,
+        specVersion = "1.1",
+        policy = ConfigurationPolicy.REQUIRE)
+public class PollingTransportHandlerFactory extends AbstractTransportHandlerFactory {
+    static final String SERVICE_PID = "org.apache.sling.replication.transport.impl.PollingTransportHandlerFactory";
+
+    private static final String DEFAULT_AUTHENTICATION_FACTORY = "(name=" + UserCredentialsTransportAuthenticationProviderFactory.TYPE + ")";
+
+    @Property(boolValue = true)
+    private static final String ENABLED = "enabled";
+
+    @Property
+    private static final String NAME = "name";
+
+    @Property(cardinality = 1000)
+    private static final String ENDPOINT = ReplicationAgentConfiguration.ENDPOINT;
+
+    @Property(name = ReplicationAgentConfiguration.TRANSPORT_AUTHENTICATION_FACTORY, value = DEFAULT_AUTHENTICATION_FACTORY)
+    @Reference(name = "TransportAuthenticationProviderFactory", target = DEFAULT_AUTHENTICATION_FACTORY, policy = ReferencePolicy.DYNAMIC)
+    private TransportAuthenticationProviderFactory transportAuthenticationProviderFactory;
+
+    @Property
+    private static final String AUTHENTICATION_PROPERTIES = ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES;
+
+    // "poll items" is the human readable label; the property name must stay "poll.items"
+    // (the constant's value) because that is the key createTransportHandler reads.
+    @Property(label = "poll items", description = "number of subsequent poll requests to make", intValue = -1)
+    private static final String POLL_ITEMS = "poll.items";
+
+    @Reference
+    private ReplicationPackageImporter replicationPackageImporter;
+
+    /**
+     * Creates a {@link PollingTransportHandler} from the component configuration.
+     *
+     * @param config component configuration; {@code poll.items} is read from it (default -1)
+     * @param props service properties of the handler; the resolved {@code poll.items} value is added
+     * @param transportAuthenticationProvider provider passed on to the handler
+     * @param endpoints endpoints passed on to the handler
+     * @param endpointStrategyType not used by this factory
+     * @return the new polling transport handler
+     */
+    protected TransportHandler createTransportHandler(Map<String, ?> config,
+                                                      Dictionary<String, Object> props,
+                                                      TransportAuthenticationProvider transportAuthenticationProvider,
+                                                      ReplicationEndpoint[] endpoints, TransportEndpointStrategyType endpointStrategyType) {
+        int pollItems = PropertiesUtil.toInteger(config.get(POLL_ITEMS), -1);
+        props.put(POLL_ITEMS, pollItems);
+
+        // the parameter is a raw type, while the handler requires the Executor based provider
+        @SuppressWarnings("unchecked")
+        TransportAuthenticationProvider<Executor, Executor> executorAuthenticationProvider =
+                (TransportAuthenticationProvider<Executor, Executor>) transportAuthenticationProvider;
+
+        return new PollingTransportHandler(replicationPackageImporter,
+                pollItems,
+                executorAuthenticationProvider,
+                endpoints);
+    }
+
+    /** Returns the authentication provider factory bound through the dynamic reference. */
+    @Override
+    protected TransportAuthenticationProviderFactory getAuthenticationFactory() {
+        return transportAuthenticationProviderFactory;
+    }
+
+    /** Forwards activation to the parent factory. */
+    @Activate
+    protected void activate(BundleContext context, Map<String, ?> config) throws Exception {
+        super.activate(context, config);
+    }
+
+    /** Forwards deactivation to the parent factory. */
+    @Deactivate
+    protected void deactivate() {
+        super.deactivate();
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java?rev=1564382&r1=1564381&r2=1564382&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/RepositoryTransportHandler.java Tue Feb  4 17:16:36 2014
@@ -25,12 +25,7 @@ import java.util.Properties;
 import javax.jcr.Node;
 import javax.jcr.Session;
 import javax.jcr.nodetype.NodeType;
-
 import org.apache.commons.io.IOUtils;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.Service;
 import org.apache.jackrabbit.JcrConstants;
 import org.apache.jackrabbit.util.Text;
 import org.apache.sling.jcr.api.SlingRepository;
@@ -38,95 +33,107 @@ import org.apache.sling.replication.comm
 import org.apache.sling.replication.event.ReplicationEventFactory;
 import org.apache.sling.replication.event.ReplicationEventType;
 import org.apache.sling.replication.serialization.ReplicationPackage;
-import org.apache.sling.replication.transport.ReplicationTransportException;
 import org.apache.sling.replication.transport.TransportHandler;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationContext;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationProvider;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Component(metatype = false)
-@Service(value = TransportHandler.class)
-@Property(name = "name", value = RepositoryTransportHandler.NAME)
-public class RepositoryTransportHandler implements TransportHandler {
-
-    public static final String NAME = "repository";
+/**
+ * {@link TransportHandler} that delivers replication packages into the JCR repository, storing each
+ * package as a {@code NodeType.NT_FILE} node below the path taken from the endpoint URI.
+ */
+public class RepositoryTransportHandler extends AbstractTransportHandler
+        implements TransportHandler {
 
     private static final String REPO_SCHEME = "repo";
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    @Reference
-    private SlingRepository repository;
+    private final SlingRepository repository;
+    private final ReplicationEventFactory replicationEventFactory;
+    private final TransportAuthenticationProvider<SlingRepository, Session> transportAuthenticationProvider;
 
-    @Reference
-    private ReplicationEventFactory replicationEventFactory;
 
-    public void transport(ReplicationPackage replicationPackage,
-                          ReplicationEndpoint replicationEndpoint,
-                          TransportAuthenticationProvider<?, ?> transportAuthenticationProvider)
-            throws ReplicationTransportException {
-        if (validateEndpoint(replicationEndpoint)) {
-            Session session = null;
-            try {
-                TransportAuthenticationContext transportAuthenticationContext = new TransportAuthenticationContext();
-                String path = replicationEndpoint.getUri().toString().replace("repo:/", "");
-                transportAuthenticationContext.addAttribute("path", path);
-                session = ((TransportAuthenticationProvider<SlingRepository, Session>) transportAuthenticationProvider)
-                        .authenticate(repository, transportAuthenticationContext);
-                int lastSlash = replicationPackage.getId().lastIndexOf('/');
-                String nodeName = Text.escape(lastSlash < 0 ? replicationPackage.getId() : replicationPackage.getId().substring(lastSlash + 1));
-                if (log.isInfoEnabled()) {
-                    log.info("creating node {} in {}", replicationPackage.getId(), nodeName);
-                }
-                if (session != null) {
-                    Node addedNode = session.getNode(path).addNode(nodeName,
-                            NodeType.NT_FILE);
-                    Node contentNode = addedNode.addNode(JcrConstants.JCR_CONTENT, NodeType.NT_RESOURCE);
-                    if (contentNode != null) {
-                        InputStream inputStream = null;
-                        try {
-                            inputStream = replicationPackage.createInputStream();
-                            contentNode.setProperty(JcrConstants.JCR_DATA, session.getValueFactory().createBinary(inputStream));
-                            session.save();
-                        }
-                        finally {
-                            IOUtils.closeQuietly(inputStream);
-                        }
+    /**
+     * Creates a handler for the given endpoints; the endpoint strategy is fixed to
+     * {@code TransportEndpointStrategyType.All}.
+     *
+     * @param repository repository handed to the authentication provider to obtain a session
+     * @param replicationEventFactory factory used to emit the {@code PACKAGE_REPLICATED} event
+     * @param transportAuthenticationProvider provider turning the repository into a {@link Session}
+     * @param replicationEndpoints endpoints handed to the parent handler
+     */
+    public RepositoryTransportHandler(SlingRepository repository, ReplicationEventFactory replicationEventFactory,
+                                      TransportAuthenticationProvider<SlingRepository, Session> transportAuthenticationProvider,
+                                      ReplicationEndpoint[] replicationEndpoints){
+        super(replicationEndpoints, TransportEndpointStrategyType.All);
+
+        this.repository = repository;
+        this.replicationEventFactory = replicationEventFactory;
+        this.transportAuthenticationProvider = transportAuthenticationProvider;
+    }
+
+    /**
+     * Stores the package stream as a file node under the endpoint path and emits a
+     * {@code PACKAGE_REPLICATED} event. A session that was obtained is always logged out afterwards.
+     *
+     * @param replicationPackage package whose stream is written to the repository
+     * @param replicationEndpoint endpoint whose URI, stripped of {@code repo:/}, gives the parent path
+     * @throws Exception if no session can be obtained or the repository operations fail
+     */
+    @Override
+    public void deliverPackageToEndpoint(ReplicationPackage replicationPackage, ReplicationEndpoint replicationEndpoint)
+            throws Exception {
+
+        Session session = null;
+        try {
+            TransportAuthenticationContext transportAuthenticationContext = new TransportAuthenticationContext();
+            String path = replicationEndpoint.getUri().toString().replace("repo:/", "");
+            transportAuthenticationContext.addAttribute("path", path);
+            session =  transportAuthenticationProvider.authenticate(repository, transportAuthenticationContext);
+            int lastSlash = replicationPackage.getId().lastIndexOf('/');
+            String nodeName = Text.escape(lastSlash < 0 ? replicationPackage.getId() : replicationPackage.getId().substring(lastSlash + 1));
+            log.info("creating node {} in {}", nodeName, path);
+
+            if (session != null) {
+                Node addedNode = session.getNode(path).addNode(nodeName,
+                        NodeType.NT_FILE);
+                Node contentNode = addedNode.addNode(JcrConstants.JCR_CONTENT, NodeType.NT_RESOURCE);
+                if (contentNode != null) {
+                    InputStream inputStream = null;
+                    try {
+                        inputStream = replicationPackage.createInputStream();
+                        contentNode.setProperty(JcrConstants.JCR_DATA, session.getValueFactory().createBinary(inputStream));
+                        session.save();
                     }
-                    if (log.isInfoEnabled()) {
-                        log.info("package {} delivered to the repository as node {} ",
-                                replicationPackage.getId(), addedNode.getPath());
+                    finally {
+                        IOUtils.closeQuietly(inputStream);
                     }
-                    Dictionary<Object, Object> props = new Properties();
-                    props.put("transport", NAME);
-                    props.put("path", replicationPackage.getPaths());
-                    replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_REPLICATED, props);
-
-                } else {
-                    throw new ReplicationTransportException(
-                            "could not get a Session to deliver package to the repository");
-                }
-            } catch (Exception e) {
-                throw new ReplicationTransportException(e);
-            } finally {
-                if (session != null) {
-                    session.logout();
                 }
+                log.info("package {} delivered to the repository as node {} ",
+                        replicationPackage.getId(), addedNode.getPath());
+
+                Dictionary<Object, Object> props = new Properties();
+                props.put("path", replicationPackage.getPaths());
+                replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_REPLICATED, props);
+
+            } else {
+                throw new Exception("could not get a Session to deliver package to the repository");
+            }
+        } finally {
+            if (session != null) {
+                session.logout();
             }
-        } else {
-            throw new ReplicationTransportException("invalid endpoint "
-                    + replicationEndpoint.getUri());
         }
     }
 
-    private boolean validateEndpoint(ReplicationEndpoint replicationEndpoint) {
+    /**
+     * An endpoint is valid when its URI uses the {@code repo} scheme and has a host component.
+     */
+    @Override
+    protected boolean validateEndpoint(ReplicationEndpoint replicationEndpoint) {
         URI uri = replicationEndpoint.getUri();
         return REPO_SCHEME.equals(uri.getScheme()) && uri.getHost() != null;
     }
-
-    public boolean supportsAuthenticationProvider(TransportAuthenticationProvider<?, ?> transportAuthenticationProvider) {
-        return transportAuthenticationProvider.canAuthenticate(SlingRepository.class);
-    }
 }



Mime
View raw message