incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdelacre...@apache.org
Subject svn commit: r1552571 - in /sling/trunk/contrib/extensions/replication/src: main/java/org/apache/sling/replication/agent/ main/java/org/apache/sling/replication/agent/impl/ main/java/org/apache/sling/replication/communication/ main/java/org/apache/sling...
Date Fri, 20 Dec 2013 09:56:52 GMT
Author: bdelacretaz
Date: Fri Dec 20 09:56:51 2013
New Revision: 1552571

URL: http://svn.apache.org/r1552571
Log:
SLING-3281 - expose more information on replication queues, contributed by Tommaso Teofili, thanks!

Added:
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporterTest.java
Modified:
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentsManager.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationAgentsManager.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationActionType.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/TriggerPathReplicationRule.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentPollServlet.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationQueueServlet.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationReceiverServlet.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-reverserepo.json
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/ReplicationAgentConfigurationTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java
    sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueTest.java

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgent.java Fri Dec 20 09:56:51 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.replication.agent;
 
+import java.net.URI;
 import org.apache.sling.replication.communication.ReplicationRequest;
 import org.apache.sling.replication.communication.ReplicationResponse;
 import org.apache.sling.replication.queue.ReplicationQueue;
@@ -53,7 +54,7 @@ public interface ReplicationAgent {
     String[] getRules();
 
     /**
-     * execute a {@link ReplicationRequest} against this agent waiting for a {@link ReplicationResponse}
+     * Synchronously sends a {@link ReplicationRequest} waiting for a {@link ReplicationResponse}
      *
      * @param replicationRequest the replication request
      * @return a {@link ReplicationResponse}
@@ -71,7 +72,7 @@ public interface ReplicationAgent {
     void send(ReplicationRequest replicationRequest) throws AgentReplicationException;
 
     /**
-     * process the replication of a certain item
+     * synchronously process the replication of a certain item skipping the underlying queue(s)
      *
      * @param item a {@link ReplicationPackage} to process
      * @return <code>true</code> if process was successful, <code>false</code> otherwise
@@ -79,4 +80,10 @@ public interface ReplicationAgent {
      */
     boolean process(ReplicationPackage item) throws AgentReplicationException;
 
+    /**
+     * get the agent configured endpoint
+     *
+     * @return a <code>URI</code> specifying its endpoint
+     */
+    URI getEndpoint();
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentConfiguration.java Fri Dec 20 09:56:51 2013
@@ -20,7 +20,6 @@ package org.apache.sling.replication.age
 
 import java.util.Arrays;
 import java.util.Dictionary;
-
 import org.apache.sling.commons.osgi.PropertiesUtil;
 
 /**
@@ -42,7 +41,7 @@ public class ReplicationAgentConfigurati
 
     public static final String AUTHENTICATION_PROPERTIES = "authentication.properties";
 
-    public static final String DISTRIBUTION = "ReplicationQueueDistributionStrategy.target";
+    public static final String QUEUE_DISTRIBUTION = "ReplicationQueueDistributionStrategy.target";
 
     public static final String RULES = "rules";
 
@@ -54,10 +53,12 @@ public class ReplicationAgentConfigurati
 
     private final String targetTransportHandler;
 
-    private final String targetReplicationBuilder;
+    private final String targetReplicationPackageBuilder;
 
     private final String targetReplicationQueueProvider;
 
+    private final String targetReplicationQueueDistributionStrategy;
+
     private final String targetAuthenticationHandlerFactory;
 
     private final String[] authenticationProperties;
@@ -68,10 +69,11 @@ public class ReplicationAgentConfigurati
         this.name = PropertiesUtil.toString(dictionary.get(NAME), "");
         this.endpoint = PropertiesUtil.toString(dictionary.get(ENDPOINT), "");
         this.targetAuthenticationHandlerFactory = PropertiesUtil.toString(
-                        dictionary.get(TRANSPORT_AUTHENTICATION_FACTORY), "");
-        this.targetReplicationBuilder = PropertiesUtil.toString(dictionary.get(PACKAGING), "");
+                dictionary.get(TRANSPORT_AUTHENTICATION_FACTORY), "");
+        this.targetReplicationPackageBuilder = PropertiesUtil.toString(dictionary.get(PACKAGING), "");
         this.targetReplicationQueueProvider = PropertiesUtil.toString(
-                        dictionary.get(QUEUEPROVIDER), "");
+                dictionary.get(QUEUEPROVIDER), "");
+        this.targetReplicationQueueDistributionStrategy = PropertiesUtil.toString(dictionary.get(QUEUE_DISTRIBUTION), "");
         this.targetTransportHandler = PropertiesUtil.toString(dictionary.get(TRANSPORT), "");
         String[] ap = PropertiesUtil.toStringArray(dictionary.get(AUTHENTICATION_PROPERTIES));
         this.authenticationProperties = ap != null ? ap : new String[0];
@@ -94,8 +96,8 @@ public class ReplicationAgentConfigurati
         return targetAuthenticationHandlerFactory;
     }
 
-    public String getTargetReplicationBuilder() {
-        return targetReplicationBuilder;
+    public String getTargetReplicationPackageBuilder() {
+        return targetReplicationPackageBuilder;
     }
 
     public String getTargetReplicationQueueProvider() {
@@ -106,15 +108,19 @@ public class ReplicationAgentConfigurati
         return targetTransportHandler;
     }
 
+    public String getTargetReplicationQueueDistributionStrategy() { return targetReplicationQueueDistributionStrategy; }
+
     @Override
     public String toString() {
-        return "{\"name\":\"" + name + "\", \"endpoint\":\"" + endpoint + "\", \"targetTransportHandler\":\""
-                        + targetTransportHandler + "\", \"targetReplicationBuilder\":\""
-                        + targetReplicationBuilder + "\", \"targetReplicationQueueProvider\":\""
-                        + targetReplicationQueueProvider + "\", \"targetAuthenticationHandlerFactory\":\""
-                        + targetAuthenticationHandlerFactory + "\", \"authenticationProperties\":\""
-                        + Arrays.toString(authenticationProperties) + "\", \"rules\":\""
-                        + Arrays.toString(rules) + "\"}";
+        return "{\"" + NAME + "\":\"" + name + "\", \""
+                + ENDPOINT + "\":\"" + endpoint + "\", \""
+                + TRANSPORT + "\":\"" + targetTransportHandler + "\", \""
+                + PACKAGING + "\":\"" + targetReplicationPackageBuilder + "\", \""
+                + QUEUEPROVIDER + "\":\"" + targetReplicationQueueProvider + "\", \""
+                + QUEUE_DISTRIBUTION + "\":\"" + targetReplicationQueueDistributionStrategy+ "\", \""
+                + TRANSPORT_AUTHENTICATION_FACTORY + "\":\"" + targetAuthenticationHandlerFactory + "\", \""
+                + AUTHENTICATION_PROPERTIES + "\":\"" + Arrays.toString(authenticationProperties) + "\", \""
+                + RULES + "\":\"" + Arrays.toString(rules) + "\"}";
     }
 
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentsManager.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentsManager.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/ReplicationAgentsManager.java Fri Dec 20 09:56:51 2013
@@ -21,6 +21,7 @@ package org.apache.sling.replication.age
 import java.util.Collection;
 import java.util.SortedSet;
 import org.apache.sling.replication.communication.ReplicationActionType;
+import org.apache.sling.replication.communication.ReplicationEndpoint;
 
 /**
  * A manager for {@link ReplicationAgent}s
@@ -42,4 +43,6 @@ public interface ReplicationAgentsManage
      * @return a <code>Collection</code> of {@link ReplicationAgent}s
      */
     Collection<ReplicationAgent> getAllAvailableAgents();
+
+    Collection<ReplicationAgent> getAgentsFor(ReplicationEndpoint endpoint);
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationAgentsManager.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationAgentsManager.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationAgentsManager.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/DefaultReplicationAgentsManager.java Fri Dec 20 09:56:51 2013
@@ -21,6 +21,7 @@ package org.apache.sling.replication.age
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -34,6 +35,7 @@ import org.apache.felix.scr.annotations.
 import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.agent.ReplicationAgentsManager;
 import org.apache.sling.replication.communication.ReplicationActionType;
+import org.apache.sling.replication.communication.ReplicationEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +67,16 @@ public class DefaultReplicationAgentsMan
         return Collections.unmodifiableCollection(replicationAgents);
     }
 
+    public Collection<ReplicationAgent> getAgentsFor(ReplicationEndpoint endpoint) {
+        Collection<ReplicationAgent> agents = new LinkedList<ReplicationAgent>();
+        for (ReplicationAgent replicationAgent : replicationAgents) {
+            if (endpoint.getUri().equals(replicationAgent.getEndpoint())) {
+                agents.add(replicationAgent);
+            }
+        }
+        return Collections.unmodifiableCollection(agents);
+    }
+
     @Deactivate
     protected void deactivate() {
         replicationAgents.clear();

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java Fri Dec 20 09:56:51 2013
@@ -48,6 +48,7 @@ import org.apache.sling.replication.tran
 import org.apache.sling.replication.transport.authentication.impl.UserCredentialsTransportAuthenticationProviderFactory;
 import org.apache.sling.replication.transport.impl.HttpTransportHandler;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
 import org.osgi.framework.ServiceRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,14 +78,12 @@ public class ReplicationAgentServiceFact
 
     private static final String PACKAGING = ReplicationAgentConfiguration.PACKAGING;
 
-    private static final String DISTRIBUTION = ReplicationAgentConfiguration.DISTRIBUTION;
+    private static final String QUEUE_DISTRIBUTION = ReplicationAgentConfiguration.QUEUE_DISTRIBUTION;
 
     private static final String DEFAULT_TRANSPORT = "(name=" + HttpTransportHandler.NAME + ")";
 
     private static final String DEFAULT_AUTHENTICATION_FACTORY = "(name=" + UserCredentialsTransportAuthenticationProviderFactory.TYPE + ")";
 
-    private static final String DEFAULT_ENDPOINT = "http://localhost:4503/system/replication/receive";
-
     private static final String DEFAULT_PACKAGING = "(name="
             + FileVaultReplicationPackageBuilder.NAME + ")";
 
@@ -125,7 +124,7 @@ public class ReplicationAgentServiceFact
     @Reference(name = "TransportAuthenticationProviderFactory", target = DEFAULT_AUTHENTICATION_FACTORY, policy = ReferencePolicy.DYNAMIC)
     private TransportAuthenticationProviderFactory transportAuthenticationProviderFactory;
 
-    @Property(name = DISTRIBUTION, value = DEFAULT_DISTRIBUTION)
+    @Property(name = QUEUE_DISTRIBUTION, value = DEFAULT_DISTRIBUTION)
     @Reference(name = "ReplicationQueueDistributionStrategy", target = DEFAULT_DISTRIBUTION, policy = ReferencePolicy.DYNAMIC)
     private ReplicationQueueDistributionStrategy queueDistributionStrategy;
 
@@ -150,20 +149,20 @@ public class ReplicationAgentServiceFact
                     .toString(config.get(NAME), String.valueOf(new Random().nextInt(1000)));
             props.put(NAME, name);
 
-            String endpoint = PropertiesUtil.toString(config.get(ENDPOINT), DEFAULT_ENDPOINT);
+            String endpoint = PropertiesUtil.toString(config.get(ENDPOINT), "");
             props.put(ENDPOINT, endpoint);
 
-            String transport = PropertiesUtil.toString(config.get(TRANSPORT), DEFAULT_TRANSPORT);
+            String transport = PropertiesUtil.toString(config.get(TRANSPORT), "");
             props.put(TRANSPORT, transport);
 
-            String packaging = PropertiesUtil.toString(config.get(PACKAGING), DEFAULT_PACKAGING);
+            String packaging = PropertiesUtil.toString(config.get(PACKAGING), "");
             props.put(PACKAGING, packaging);
 
-            String queue = PropertiesUtil.toString(config.get(QUEUEPROVIDER), DEFAULT_QUEUEPROVIDER);
+            String queue = PropertiesUtil.toString(config.get(QUEUEPROVIDER), "");
             props.put(QUEUEPROVIDER, queue);
 
-            String distribution = PropertiesUtil.toString(config.get(DISTRIBUTION), DEFAULT_DISTRIBUTION);
-            props.put(DISTRIBUTION, distribution);
+            String distribution = PropertiesUtil.toString(config.get(QUEUE_DISTRIBUTION), "");
+            props.put(QUEUE_DISTRIBUTION, distribution);
 
             Map<String, String> authenticationProperties = PropertiesUtil.toMap(config.get(AUTHENTICATION_PROPERTIES), new String[0]);
             props.put(AUTHENTICATION_PROPERTIES, authenticationProperties);
@@ -171,19 +170,21 @@ public class ReplicationAgentServiceFact
             String[] rules = PropertiesUtil.toStringArray(config.get(RULES), new String[0]);
             props.put(RULES, rules);
 
-            String af = PropertiesUtil.toString(config.get(TRANSPORT_AUTHENTICATION_FACTORY), DEFAULT_AUTHENTICATION_FACTORY);
+            String af = PropertiesUtil.toString(config.get(TRANSPORT_AUTHENTICATION_FACTORY), "");
             props.put(TRANSPORT_AUTHENTICATION_FACTORY, af);
 
             // check configuration is valid
-            if (name == null || transportHandler == null || endpoint == null || packageBuilder == null || queueProvider == null || transportAuthenticationProviderFactory == null || queueDistributionStrategy == null) {
+            if (name == null || packageBuilder == null || queueProvider == null || queueDistributionStrategy == null) {
                 throw new AgentConfigurationException("configuration for this agent is not valid");
             }
 
-            TransportAuthenticationProvider<?, ?> transportAuthenticationProvider = transportAuthenticationProviderFactory.createAuthenticationProvider(authenticationProperties);
-
-            if (!transportHandler.supportsAuthenticationProvider(transportAuthenticationProvider)) {
-                throw new Exception("authentication handler " + transportAuthenticationProvider
-                        + " not supported by transport handler " + transportHandler);
+            TransportAuthenticationProvider transportAuthenticationProvider = null;
+            if (transportAuthenticationProviderFactory != null) {
+                transportAuthenticationProvider = transportAuthenticationProviderFactory.createAuthenticationProvider(authenticationProperties);
+                if (transportHandler != null && !transportHandler.supportsAuthenticationProvider(transportAuthenticationProvider)) {
+                    throw new Exception("authentication handler " + transportAuthenticationProvider
+                            + " not supported by transport handler " + transportHandler);
+                }
             }
 
             if (log.isInfoEnabled()) {
@@ -202,7 +203,7 @@ public class ReplicationAgentServiceFact
             }
 
             // eventually register job consumer for sling job handling based queues
-            if (DEFAULT_QUEUEPROVIDER.equals(queue)) {
+            if (DEFAULT_QUEUEPROVIDER.equals(queue) && (transportHandler != null && endpoint != null && endpoint.length() > 0)) {
                 Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
                 String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
                 String childTopic = topic + "/*";
@@ -213,19 +214,23 @@ public class ReplicationAgentServiceFact
     }
 
     @Deactivate
-    private void deactivate() {
-        ReplicationAgent replicationAgent = (ReplicationAgent) agentReg.getReference();
-        String[] rules = replicationAgent.getRules();
-        if (rules != null) {
-            replicationRuleEngine.unapplyRules(replicationAgent, rules);
-        }
+    private void deactivate(BundleContext context) throws Exception {
+        if (agentReg != null) {
+            ServiceReference reference = agentReg.getReference();
+            ReplicationAgent replicationAgent = (ReplicationAgent) context.getService(reference);
 
-        if (jobReg != null) {
-            jobReg.unregister();
-        }
+            String[] rules = replicationAgent.getRules();
+            if (rules != null) {
+                replicationRuleEngine.unapplyRules(replicationAgent, rules);
+            }
 
-        if (agentReg != null) {
-            agentReg.unregister();
+            if (jobReg != null) {
+                jobReg.unregister();
+            }
+
+            if (agentReg != null) {
+                agentReg.unregister();
+            }
         }
     }
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Fri Dec 20 09:56:51 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.replication.agent.impl;
 
+import java.net.URI;
 import org.apache.sling.replication.agent.AgentReplicationException;
 import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.communication.ReplicationEndpoint;
@@ -125,14 +126,13 @@ public class SimpleReplicationAgent impl
 
     public boolean process(ReplicationPackage item) throws AgentReplicationException {
         try {
-            if (transportHandler != null) {
+            if (transportHandler != null || (endpoint != null && endpoint.length() > 0)) {
                 transportHandler.transport(item, new ReplicationEndpoint(endpoint),
                         transportAuthenticationProvider);
                 return true;
             } else {
-                if (log.isWarnEnabled()) {
-                    log.warn("could not process an item as a transport handler is not bound to agent {}",
-                            name);
+                if (log.isInfoEnabled()) {
+                    log.info("agent {} processing skipped", name);
                 }
                 return false;
             }
@@ -141,6 +141,10 @@ public class SimpleReplicationAgent impl
         }
     }
 
+    public URI getEndpoint() {
+        return new ReplicationEndpoint(endpoint).getUri();
+    }
+
     public String getName() {
         return name;
     }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationActionType.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationActionType.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationActionType.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationActionType.java Fri Dec 20 09:56:51 2013
@@ -32,7 +32,12 @@ public enum ReplicationActionType {
     /**
      * Content is deleted
      */
-    DELETE("Delete");
+    DELETE("Delete"),
+
+    /**
+     * Content is polled
+     */
+    POLL("Poll");
 
     /**
      * internal human readable name

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java Fri Dec 20 09:56:51 2013
@@ -18,7 +18,7 @@
  */
 package org.apache.sling.replication.queue;
 
-import org.apache.sling.replication.agent.ReplicationAgent;
+import java.util.Collection;
 import org.apache.sling.replication.serialization.ReplicationPackage;
 
 /**
@@ -28,6 +28,7 @@ public interface ReplicationQueue {
 
     /**
      * get this queue name
+     *
      * @return queue name as a <code>String</code>
      */
     String getName();
@@ -35,10 +36,9 @@ public interface ReplicationQueue {
     /**
      * add a replication package to this queue
      *
-     * @param replicationPackage
-     *            a replication package to replicate
+     * @param replicationPackage a replication package to replicate
      * @return <code>true</code> if the replication package was added correctly to the queue,
-     *         <code>false</code otherwise
+     * <code>false</code> otherwise
      * @throws ReplicationQueueException
      */
     boolean add(ReplicationPackage replicationPackage) throws ReplicationQueueException;
@@ -46,20 +46,12 @@ public interface ReplicationQueue {
     /**
      * get the status of a certain package in the queue
      *
-     * @param replicationPackage
-     *            the replication package to get the status for
+     * @param replicationPackage the replication package to get the status for
      * @return the item status in the queue
      * @throws ReplicationQueueException
      */
     ReplicationQueueItemState getStatus(ReplicationPackage replicationPackage)
-                    throws ReplicationQueueException;
-
-    /**
-     * get the agent this queue is used for
-     *
-     * @return a replication agent
-     */
-    ReplicationAgent getAgent();
+            throws ReplicationQueueException;
 
     /**
      * get the first item (FIFO wise, the next to be processed) into the queue
@@ -81,4 +73,17 @@ public interface ReplicationQueue {
      * @return <code>true</code> if the queue is empty, <code>false</code> otherwise
      */
     boolean isEmpty();
+
+    /**
+     * get the items in the queue
+     *
+     * @return a <code>Collection</code> of {@link org.apache.sling.replication.serialization.ReplicationPackage}s
+     */
+    Collection<ReplicationPackage> getItems();
+
+    /**
+     * remove an item from the queue by specifying its id
+     * @param id <code>String</code> representing an item's identifier
+     */
+    void remove(String id);
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java Fri Dec 20 09:56:51 2013
@@ -18,6 +18,8 @@
  */
 package org.apache.sling.replication.queue;
 
+import java.util.Calendar;
+
 /**
  * the current status of a certain item in a {@link ReplicationQueue}
  */
@@ -28,6 +30,8 @@ public class ReplicationQueueItemState {
 
     private ItemState state;
 
+    private Calendar entered;
+
     public boolean isSuccessful() {
         return ItemState.SUCCEEDED.equals(state);
     }
@@ -57,6 +61,14 @@ public class ReplicationQueueItemState {
         return "{\"attempts\":\"" + attempts + "\",\"" + "successful\":\"" + isSuccessful() + "\",\"" + "state\":\"" + state + "\"}";
     }
 
+    public Calendar getEntered() {
+        return entered;
+    }
+
+    public void setEntered(Calendar entered) {
+        this.entered = entered;
+    }
+
     public enum ItemState {
         QUEUED, // waiting in queue after adding or for restart after failing
         ACTIVE, // job is currently in processing

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java Fri Dec 20 09:56:51 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.replication.queue.impl;
 
+import java.util.Calendar;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.PropertyOption;
@@ -54,8 +55,11 @@ public class ErrorAwareQueueDistribution
     @Property
     private static final String ATTEMPTS_THRESHOLD = "attempts.threshold";
 
+    @Property
+    private static final String TIME_THRESHOLD = "time.threshold";
+
     @Property(name = "Stuck Queue Handling", options = {
-            @PropertyOption(name = ERROR, value = "Error"),
+            @PropertyOption(name = "ERROR", value = "Error"),
             @PropertyOption(name = "DROP", value = "Drop")})
     private static final String STUCK_HANDLING = "stuck.handling";
 
@@ -63,11 +67,13 @@ public class ErrorAwareQueueDistribution
 
     private Integer attemptsThreshold;
 
+    private Integer timeThreshold;
+
     protected void activate(final ComponentContext ctx) {
         stuckQueueHandling = PropertiesUtil
                 .toString(ctx.getProperties().get(STUCK_HANDLING), ERROR);
-        attemptsThreshold = PropertiesUtil.toInteger(ctx.getProperties().get(ATTEMPTS_THRESHOLD),
-                100);
+        attemptsThreshold = PropertiesUtil.toInteger(ctx.getProperties().get(ATTEMPTS_THRESHOLD), 100);
+        timeThreshold = PropertiesUtil.toInteger(ctx.getProperties().get(TIME_THRESHOLD), 10800000);
     }
 
     public ReplicationQueueItemState add(ReplicationPackage replicationPackage,
@@ -128,13 +134,11 @@ public class ErrorAwareQueueDistribution
             ReplicationQueueItemState status = defaultQueue.getStatus(firstItem);
             // if item is still in the queue after a max no. of attempts, move it to the error queue
             int attempts = status.getAttempts();
-            if (log.isInfoEnabled()) {
-                log.info("attempts for item {}: {}", firstItem.getId(), attempts);
-            }
-            if (attempts > attemptsThreshold) {
+            Calendar entered = status.getEntered();
+            if (attempts > attemptsThreshold || Calendar.getInstance().getTimeInMillis() - entered.getTimeInMillis() > timeThreshold) {
                 if (ERROR.equals(stuckQueueHandling)) {
                     if (log.isWarnEnabled()) {
-                        log.warn("item moved to the error queue");
+                        log.warn("item {} moved to the error queue", firstItem);
                     }
                     ReplicationQueue errorQueue = queueProvider.getQueue(agent, "-error");
                     if (!errorQueue.add(firstItem)) {
@@ -145,7 +149,7 @@ public class ErrorAwareQueueDistribution
                     }
                 }
                 if (log.isWarnEnabled()) {
-                    log.warn("item dropped from the default queue");
+                    log.warn("item {} dropped from the default queue", firstItem);
                 }
                 defaultQueue.removeHead();
             }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java Fri Dec 20 09:56:51 2013
@@ -23,20 +23,18 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
-
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.JobManager.QueryType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.queue.ReplicationQueue;
 import org.apache.sling.replication.queue.ReplicationQueueException;
 import org.apache.sling.replication.queue.ReplicationQueueItemState;
 import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
 import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * a {@link ReplicationQueue} based on Sling Job Handling facilities
@@ -67,7 +65,7 @@ public class JobHandlingReplicationQueue
         boolean result = true;
         try {
             Map<String, Object> properties = JobHandlingUtils
-                            .createFullPropertiesFromPackage(replicationPackage);
+                    .createFullPropertiesFromPackage(replicationPackage);
 
             Job job = jobManager.createJob(topic).properties(properties).add();
             if (log.isInfoEnabled()) {
@@ -83,15 +81,16 @@ public class JobHandlingReplicationQueue
     }
 
     public ReplicationQueueItemState getStatus(ReplicationPackage replicationPackage)
-                    throws ReplicationQueueException {
+            throws ReplicationQueueException {
         ReplicationQueueItemState itemStatus = new ReplicationQueueItemState();
         try {
             Map<String, Object> properties = JobHandlingUtils
-                            .createIdPropertiesFromPackage(replicationPackage);
+                    .createIdPropertiesFromPackage(replicationPackage);
             Job job = jobManager.getJob(topic, properties);
             if (job != null) {
                 itemStatus.setAttempts(job.getRetryCount());
                 itemStatus.setItemState(ItemState.valueOf(job.getJobState().toString()));
+                itemStatus.setEntered(job.getCreated());
                 if (log.isInfoEnabled()) {
                     log.info("status of job {} is {}", job.getId(), job.getJobState());
                 }
@@ -104,10 +103,6 @@ public class JobHandlingReplicationQueue
         return itemStatus;
     }
 
-    public ReplicationAgent getAgent() {
-        throw new RuntimeException("not implemented");
-    }
-
     public ReplicationPackage getHead() {
         Job firstItem = getFirstItem();
         if (firstItem != null) {
@@ -136,15 +131,14 @@ public class JobHandlingReplicationQueue
         if (jobs.size() > 0) {
             ArrayList<Job> list = new ArrayList<Job>(jobs);
             Collections.sort(list, new Comparator<Job>() {
-
                 public int compare(Job o1, Job o2) {
                     return o2.getRetryCount() - o1.getRetryCount();
                 }
             });
             Job firstItem = list.get(0);
             if (log.isInfoEnabled()) {
-                log.info("first item in the queue is {} retried {} times", firstItem.getId(),
-                                firstItem.getRetryCount());
+                log.info("first item in the queue is {}, retried {} times", firstItem.getId(),
+                        firstItem.getRetryCount());
             }
             return firstItem;
         }
@@ -152,9 +146,23 @@ public class JobHandlingReplicationQueue
     }
 
     public boolean isEmpty() {
-        // TODO : fix this
-//        return jobManager.getQueue(name).getStatistics().getNumberOfJobs() == 0;
-        return false;
+        return getItems().isEmpty();
+    }
+
+    public Collection<ReplicationPackage> getItems() {
+        Collection<ReplicationPackage> items = new LinkedList<ReplicationPackage>();
+        Collection<Job> jobs = jobManager.findJobs(QueryType.ALL, topic, -1);
+        for (Job job : jobs) {
+            items.add(JobHandlingUtils.getPackage(job));
+        }
+        return Collections.unmodifiableCollection(items);
+    }
+
+    public void remove(String id) {
+        boolean removed = jobManager.removeJobById(id);
+        if (log.isInfoEnabled()) {
+            log.info("item with id {} removed from the queue: {}", id, removed);
+        }
     }
 
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java Fri Dec 20 09:56:51 2013
@@ -58,7 +58,7 @@ public class JobHandlingReplicationQueue
                     throws ReplicationQueueException {
         try {
             String name = agent.getName() + queueName;
-            String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + agent.getName() + queueName;
+            String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
             if (jobManager.getQueue(name) == null) {
                 Configuration config = configAdmin.createFactoryConfiguration(
                                 QueueConfiguration.class.getName(), null);
@@ -80,8 +80,8 @@ public class JobHandlingReplicationQueue
 
     @Override
     protected void deleteQueue(ReplicationQueue queue) throws ReplicationQueueException {
-        Queue q = jobManager.getQueue(queue.getAgent().getName());
-        q.removeAll(); // need to check if this is correct
+        Queue q = jobManager.getQueue(queue.getName());
+        q.removeAll();
     }
 
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java Fri Dec 20 09:56:51 2013
@@ -41,7 +41,7 @@ public class JobHandlingUtils {
 
     private static final String LENGTH = "replication.package.length";
 
-//    private static final String BIN = "replication.package.stream";
+    private static final String BIN = "replication.package.stream";
 
     private static final String TYPE = "replication.package.type";
 
@@ -111,11 +111,10 @@ public class JobHandlingUtils {
             }
 
             public InputStream getInputStream() throws IOException {
-                // TODO : use this once SLING-3140 gets released
-                // return new ByteArrayInputStream((byte[]) job.getProperty(BIN));
+                return IOUtils.toInputStream(String.valueOf(job.getProperty(BIN)));
 
                 // workaround to make void package work while we get SLING-3140 to be released
-                return IOUtils.toInputStream(String.valueOf(job.getProperty(ID)));
+//                return IOUtils.toInputStream(String.valueOf(job.getProperty(ID)));
             }
 
             public String getId() {
@@ -134,13 +133,13 @@ public class JobHandlingUtils {
     }
 
     public static Map<String, Object> createFullPropertiesFromPackage(
-                    ReplicationPackage replicationPackage) {
+                    ReplicationPackage replicationPackage) throws IOException {
         Map<String, Object> properties = new HashMap<String, Object>();
         properties.put(ID, replicationPackage.getId());
         properties.put(PATHS, replicationPackage.getPaths());
         properties.put(LENGTH, replicationPackage.getLength());
         properties.put(ACTION, replicationPackage.getAction());
-//        properties.put(BIN, IOUtils.toByteArray(replicationPackage.getInputStream()));
+        properties.put(BIN, IOUtils.toString(replicationPackage.getInputStream()));
         properties.put(TYPE, replicationPackage.getType());
         return properties;
     }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java Fri Dec 20 09:56:51 2013
@@ -59,7 +59,7 @@ public class ScheduledReplicationQueuePr
                     ReplicationPackage item = queue.getHead();
                     if (item != null) {
                         try {
-                            if (queue.getAgent().process(item)) {
+                            if (((SimpleReplicationQueue)queue).getAgent().process(item)) {
                                 queue.removeHead();
                             } else {
                                 if (log.isWarnEnabled()) {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java Fri Dec 20 09:56:51 2013
@@ -18,24 +18,24 @@
  */
 package org.apache.sling.replication.queue.impl.simple;
 
+import java.util.Calendar;
+import java.util.Collection;
 import java.util.Map;
 import java.util.WeakHashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.queue.ReplicationQueue;
 import org.apache.sling.replication.queue.ReplicationQueueItemState;
 import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
 import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A simple implementation of a {@link ReplicationQueue}.
- * 
+ * <p/>
  * Note that, at the moment, this is a transient in memory queue not persisted on the repository and
  * therefore not usable for production.
  */
@@ -70,6 +70,7 @@ public class SimpleReplicationQueue impl
         boolean result = false;
         try {
             result = queue.offer(replicationPackage, 10, TimeUnit.SECONDS);
+            status.setEntered(Calendar.getInstance());
         } catch (InterruptedException e) {
             log.error("cannot add an item to the queue", e);
             status.setSuccessful(false);
@@ -111,4 +112,24 @@ public class SimpleReplicationQueue impl
         return queue.isEmpty();
     }
 
+    public Collection<ReplicationPackage> getItems() {
+        return queue;
+    }
+
+    public void remove(String id) {
+        ReplicationPackage toRemove = null;
+        for (ReplicationPackage item : queue) {
+            if (id.equals(item.getId())) {
+                toRemove = item;
+            }
+        }
+        boolean removed = false;
+        if (toRemove != null) {
+            removed = queue.remove(toRemove);
+        }
+        if (log.isInfoEnabled()) {
+            log.info("item with id {} removed from the queue: {}", id, removed);
+        }
+    }
+
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/TriggerPathReplicationRule.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/TriggerPathReplicationRule.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/TriggerPathReplicationRule.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/TriggerPathReplicationRule.java Fri Dec 20 09:56:51 2013
@@ -141,7 +141,7 @@ public class TriggerPathReplicationRule 
                     agent.send(new ReplicationRequest(System.currentTimeMillis(), action, replicatingPath));
                 } catch (AgentReplicationException e) {
                     if (log.isErrorEnabled()) {
-                        log.error("triggered replication resulted in an error {}", e);
+                        log.error("triggered replication resulted in an error", e);
                     }
                 }
             }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/AbstractReplicationPackageBuilder.java Fri Dec 20 09:56:51 2013
@@ -45,6 +45,8 @@ public abstract class AbstractReplicatio
             replicationPackage = createPackageForAdd(request);
         } else if (ReplicationActionType.DELETE.equals(request.getAction())) {
             replicationPackage = new VoidReplicationPackage(request, getName());
+        } else if (ReplicationActionType.POLL.equals(request.getAction())) {
+            replicationPackage = new VoidReplicationPackage(request, getName()); // TODO : change this
         } else {
             throw new ReplicationPackageBuildingException("unknown action type "
                     + request.getAction());

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentPollServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentPollServlet.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentPollServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentPollServlet.java Fri Dec 20 09:56:51 2013
@@ -77,7 +77,6 @@ public class ReplicationAgentPollServlet
                         log.info("{} bytes written into the response", bytesCopied);
                     }
                     // remove the item from the queue
-                    // TODO : this should be conditional to successful import on polling instance
                     queue.removeHead();
                 } else {
                     if (log.isInfoEnabled()) {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationQueueServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationQueueServlet.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationQueueServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationQueueServlet.java Fri Dec 20 09:56:51 2013
@@ -31,6 +31,8 @@ import org.apache.sling.api.resource.Res
 import org.apache.sling.api.servlets.SlingAllMethodsServlet;
 import org.apache.sling.replication.agent.impl.ReplicationAgentQueueResource;
 import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.serialization.ReplicationPackage;
 
 @SuppressWarnings("serial")
 @Component(metatype = false)
@@ -48,14 +50,56 @@ public class ReplicationQueueServlet ext
         ReplicationQueue queue = resource
                 .adaptTo(ReplicationQueue.class);
         if (queue != null) {
-            response.getWriter().write(toJSoN(queue));
+            try {
+                response.getWriter().write(toJSoN(queue));
+            } catch (Exception e) {
+                response.getWriter().write("{\"status\" : \"error\",\"message\":\"error reading from the queue\"}");
+            }
         } else {
-            response.getWriter().write("{\"status\" : \"error\", \"message\":\"queue not found\"}");
+            response.getWriter().write("{\"status\" : \"error\",\"message\":\"queue not found\"}");
         }
     }
 
-    private String toJSoN(ReplicationQueue queue) {
-        return "{" + "\"name\":\"" + queue.getName() + "\",\"empty\":" + queue.isEmpty() + "}";
+    private String toJSoN(ReplicationQueue queue) throws Exception {
+        StringBuilder builder = new StringBuilder("{\"name\":\"" + queue.getName() + "\",\"empty\":" + queue.isEmpty());
+        if (!queue.isEmpty()) {
+            builder.append(",\"items\":[");
+            for (ReplicationPackage item : queue.getItems()) {
+                builder.append('{');
+                builder.append(toJSoN(item));
+                builder.append(',');
+                builder.append(toJSoN(queue.getStatus(item)));
+                builder.append("},");
+            }
+            if (queue.getItems().size() > 0) {
+                builder.deleteCharAt(builder.length() - 1);
+            }
+            builder.append(']');
+        }
+        builder.append('}');
+        return builder.toString();
+    }
+
+    private String toJSoN(ReplicationQueueItemState status) {
+        return "\"attempts\":" + status.getAttempts() + ",\"state\":\"" + status.getItemState().name() + "\",\"entered\":\"" + status.getEntered().getTime() + "\"";
+    }
+
+    private String toJSoN(ReplicationPackage item) {
+        StringBuilder builder = new StringBuilder();
+        builder.append("\"id\":\"").append(item.getId());
+        builder.append("\",\"paths\":[");
+        for (int i = 0; i < item.getPaths().length; i++) {
+            builder.append("\"");
+            builder.append(item.getPaths()[i]);
+            builder.append("\",");
+        }
+        builder.deleteCharAt(builder.length() - 1);
+        builder.append(']');
+        builder.append(",\"action\":\"").append(item.getAction());
+        builder.append("\",\"type\":\"").append(item.getType());
+        builder.append("\",\"length\":").append(item.getLength());
+
+        return builder.toString();
     }
 
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationReceiverServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationReceiverServlet.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationReceiverServlet.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationReceiverServlet.java Fri Dec 20 09:56:51 2013
@@ -18,14 +18,10 @@
  */
 package org.apache.sling.replication.servlet;
 
-import java.io.BufferedInputStream;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Dictionary;
-import java.util.Hashtable;
+import java.io.InputStream;
 import javax.servlet.Servlet;
 import javax.servlet.ServletException;
-import javax.servlet.ServletInputStream;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Properties;
 import org.apache.felix.scr.annotations.Property;
@@ -35,11 +31,7 @@ import org.apache.sling.api.SlingHttpSer
 import org.apache.sling.api.SlingHttpServletResponse;
 import org.apache.sling.api.servlets.SlingAllMethodsServlet;
 import org.apache.sling.replication.communication.ReplicationHeader;
-import org.apache.sling.replication.event.ReplicationEventFactory;
-import org.apache.sling.replication.event.ReplicationEventType;
-import org.apache.sling.replication.serialization.ReplicationPackage;
-import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
-import org.apache.sling.replication.serialization.ReplicationPackageBuilderProvider;
+import org.apache.sling.replication.serialization.ReplicationPackageImporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,10 +48,7 @@ public class ReplicationReceiverServlet 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Reference
-    private ReplicationPackageBuilderProvider replicationPackageBuilderProvider;
-
-    @Reference
-    private ReplicationEventFactory replicationEventFactory;
+    private ReplicationPackageImporter replicationPackageImporter;
 
     @Override
     protected void doPost(SlingHttpServletRequest request, SlingHttpServletResponse response)
@@ -69,48 +58,10 @@ public class ReplicationReceiverServlet 
         response.setContentType("text/plain");
         response.setCharacterEncoding("utf-8");
 
+        InputStream stream = request.getInputStream();
+        String type = request.getHeader(ReplicationHeader.TYPE.toString());
         try {
-            ReplicationPackage replicationPackage = null;
-            ServletInputStream stream = request.getInputStream();
-            String typeHeader = request.getHeader(ReplicationHeader.TYPE.toString());
-            if (typeHeader != null) {
-                ReplicationPackageBuilder replicationPackageBuilder = replicationPackageBuilderProvider.getReplicationPackageBuilder(typeHeader);
-                if (replicationPackageBuilder != null) {
-                    replicationPackage = replicationPackageBuilder.readPackage(stream, true);
-                } else {
-                    if (log.isWarnEnabled()) {
-                        log.warn("cannot read streams of type {}", typeHeader);
-                    }
-                }
-            } else {
-                BufferedInputStream bufferedInputStream = new BufferedInputStream(stream); // needed to allow for multiple reads
-                for (ReplicationPackageBuilder replicationPackageBuilder : replicationPackageBuilderProvider.getAvailableReplicationPackageBuilders()) {
-                    try {
-                        replicationPackage = replicationPackageBuilder.readPackage(bufferedInputStream, true);
-                    } catch (Exception e) {
-                        if (log.isWarnEnabled()) {
-                            log.warn("received stream cannot be read with {}", replicationPackageBuilder);
-                        }
-                    }
-                }
-            }
-
-            if (replicationPackage != null) {
-                if (log.isInfoEnabled()) {
-                    log.info("replication package read and installed for path(s) {}",
-                            Arrays.toString(replicationPackage.getPaths()));
-                }
-                success = true;
-
-                Dictionary<String, Object> dictionary = new Hashtable<String, Object>();
-                dictionary.put("replication.action", replicationPackage.getAction());
-                dictionary.put("replication.path", replicationPackage.getPaths());
-                replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_INSTALLED, dictionary);
-            } else {
-                if (log.isWarnEnabled()) {
-                    log.warn("could not read a replication package");
-                }
-            }
+           success = replicationPackageImporter.importStream(stream, type);
         } catch (final Exception e) {
             response.setStatus(400);
             if (log.isErrorEnabled()) {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandler.java Fri Dec 20 09:56:51 2013
@@ -27,11 +27,10 @@ import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.fluent.Executor;
 import org.apache.http.client.fluent.Request;
-import org.apache.http.client.fluent.Response;
 import org.apache.sling.replication.communication.ReplicationEndpoint;
 import org.apache.sling.replication.communication.ReplicationHeader;
 import org.apache.sling.replication.serialization.ReplicationPackage;
-import org.apache.sling.replication.serialization.ReplicationPackageBuilderProvider;
+import org.apache.sling.replication.serialization.ReplicationPackageImporter;
 import org.apache.sling.replication.transport.ReplicationTransportException;
 import org.apache.sling.replication.transport.TransportHandler;
 import org.apache.sling.replication.transport.authentication.TransportAuthenticationContext;
@@ -52,42 +51,43 @@ public class PollingTransportHandler imp
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     @Reference
-    private ReplicationPackageBuilderProvider packageBuilderProvider;
+    private ReplicationPackageImporter replicationPackageImporter;
 
     @SuppressWarnings("unchecked")
     public void transport(ReplicationPackage replicationPackage,
-                    ReplicationEndpoint replicationEndpoint,
-                    TransportAuthenticationProvider<?, ?> transportAuthenticationProvider)
-                    throws ReplicationTransportException {
+                          ReplicationEndpoint replicationEndpoint,
+                          TransportAuthenticationProvider<?, ?> transportAuthenticationProvider)
+            throws ReplicationTransportException {
         if (log.isInfoEnabled()) {
             log.info("polling from {}", replicationEndpoint.getUri());
         }
+
         try {
             Executor executor = Executor.newInstance();
             TransportAuthenticationContext context = new TransportAuthenticationContext();
             context.addAttribute("endpoint", replicationEndpoint);
             executor = ((TransportAuthenticationProvider<Executor, Executor>) transportAuthenticationProvider)
-                            .authenticate(executor, context);
+                    .authenticate(executor, context);
 
             Request req = Request.Get(replicationEndpoint.getUri()).useExpectContinue();
-            // TODO : missing queue header
-            Response response = executor.execute(req);
-            HttpResponse httpResponse = response.returnResponse();
-            HttpEntity entity = httpResponse.getEntity();
-            Header typeHeader = httpResponse.getFirstHeader(ReplicationHeader.TYPE.toString());
-
-            if (typeHeader != null) {
-                String type = typeHeader.getValue();
-                ReplicationPackage readPackage = packageBuilderProvider
-                                .getReplicationPackageBuilder(type).readPackage(entity.getContent(), true);
-
-                if (log.isInfoEnabled()) {
-                    log.info("package {} fetched and installed", readPackage.getId());
-                }
+            // TODO : add queue header
 
-            } else {
-                if (log.isInfoEnabled()) {
-                    log.info("nothing to fetch");
+            // continuously requests package streams as long as type header is received with the response (meaning there's a package of a certain type)
+            HttpResponse httpResponse;
+            while ((httpResponse = executor.execute(req).returnResponse()).containsHeader(ReplicationHeader.TYPE.toString())) {
+                HttpEntity entity = httpResponse.getEntity();
+                Header typeHeader = httpResponse.getFirstHeader(ReplicationHeader.TYPE.toString());
+
+                if (entity.getContentLength() > 0) {
+                    replicationPackageImporter.scheduleImport(entity.getContent(), typeHeader.getValue());
+                    if (log.isInfoEnabled()) {
+                        log.info("scheduled import of package stream");
+                    }
+
+                } else {
+                    if (log.isInfoEnabled()) {
+                        log.info("nothing to fetch");
+                    }
                 }
             }
         } catch (Exception e) {

Modified: sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json (original)
+++ sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json Fri Dec 20 09:56:51 2013
@@ -7,5 +7,6 @@
     "ReplicationQueueProvider.target" : "(name=sjh)",
     "TransportAuthenticationProviderFactory.target" : "(name=user)",
     "authentication.properties" : ["user=admin","password=admin"],
-    "ReplicationQueueDistributionStrategy.target" : "(name=single)"
+    "ReplicationQueueDistributionStrategy.target" : "(name=single)",
+    "rules" : ["scheduled poll every 30 sec"]
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-reverserepo.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-reverserepo.json?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-reverserepo.json (original)
+++ sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.publish/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-reverserepo.json Fri Dec 20 09:56:51 2013
@@ -1,12 +1,8 @@
 {
     "jcr:primaryType" : "sling:OsgiConfig",
     "name" : "reverserepo",
-    "endpoint" : "repo://var/replication/outbox/reverserepo",
-    "TransportHandler.target" : "(name=repository)", 
     "ReplicationPackageBuilder.target" : "(name=vlt)",
     "ReplicationQueueProvider.target" : "(name=sjh)",
-    "TransportAuthenticationProviderFactory.target" : "(name=repo-user)",
-    "authentication.properties" : ["user=admin","password=admin"],
     "ReplicationQueueDistributionStrategy.target" : "(name=error)",
     "rules" : ["trigger on path: /content/usergenerated"]
 }

Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/ReplicationAgentConfigurationTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/ReplicationAgentConfigurationTest.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/ReplicationAgentConfigurationTest.java (original)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/agent/ReplicationAgentConfigurationTest.java Fri Dec 20 09:56:51 2013
@@ -34,8 +34,9 @@ public class ReplicationAgentConfigurati
         assertNotNull(replicationAgentConfiguration.getEndpoint());
         assertNotNull(replicationAgentConfiguration.getName());
         assertNotNull(replicationAgentConfiguration.getTargetAuthenticationHandlerFactory());
-        assertNotNull(replicationAgentConfiguration.getTargetReplicationBuilder());
+        assertNotNull(replicationAgentConfiguration.getTargetReplicationPackageBuilder());
         assertNotNull(replicationAgentConfiguration.getTargetReplicationQueueProvider());
+        assertNotNull(replicationAgentConfiguration.getTargetReplicationQueueDistributionStrategy());
         assertNotNull(replicationAgentConfiguration.getTargetTransportHandler());
     }
 

Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java (original)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueTest.java Fri Dec 20 09:56:51 2013
@@ -18,26 +18,26 @@
  */
 package org.apache.sling.replication.queue.impl.jobhandling;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
+import java.util.Collections;
 import java.util.Map;
-
+import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobBuilder;
 import org.apache.sling.event.jobs.JobManager;
-import org.junit.Test;
-
 import org.apache.sling.replication.queue.ReplicationQueue;
 import org.apache.sling.replication.queue.ReplicationQueueItemState;
 import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
 import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Testcase for {@link JobHandlingReplicationQueue}
@@ -51,6 +51,7 @@ public class JobHandlingReplicationQueue
         JobBuilder builder = mock(JobBuilder.class);
         String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + "/aname";
         when(jobManager.createJob(topic)).thenReturn(builder);
+        when(jobManager.findJobs(JobManager.QueryType.ALL, topic, -1)).thenReturn(Collections.<Job>emptySet());
         when(builder.properties(any(Map.class))).thenReturn(builder);
         ReplicationQueue queue = new JobHandlingReplicationQueue("aname", topic, jobManager);
         ReplicationPackage pkg = mock(ReplicationPackage.class);
@@ -66,6 +67,7 @@ public class JobHandlingReplicationQueue
         JobBuilder builder = mock(JobBuilder.class);
         String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + "/aname";
         when(jobManager.createJob(topic)).thenReturn(builder);
+        when(jobManager.findJobs(JobManager.QueryType.ALL, topic, -1)).thenReturn(Collections.<Job>emptySet());
         when(builder.properties(any(Map.class))).thenReturn(builder);
         ReplicationQueue queue = new JobHandlingReplicationQueue("aname", topic, jobManager);
         ReplicationPackage pkg = mock(ReplicationPackage.class);

Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java (original)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtilsTest.java Fri Dec 20 09:56:51 2013
@@ -45,11 +45,11 @@ public class JobHandlingUtilsTest {
         when(replicationPackage.getType()).thenReturn("vlt");
         Map<String,Object> fullPropertiesFromPackage = JobHandlingUtils.createFullPropertiesFromPackage(replicationPackage);
         assertNotNull(fullPropertiesFromPackage);
-        assertEquals(5, fullPropertiesFromPackage.size());
+        assertEquals(6, fullPropertiesFromPackage.size());
         assertNotNull(fullPropertiesFromPackage.get("replication.package.paths"));
         assertNotNull(fullPropertiesFromPackage.get("replication.package.id"));
         assertNotNull(fullPropertiesFromPackage.get("replication.package.length"));
-//        assertNotNull(fullPropertiesFromPackage.get("replication.package.stream"));
+        assertNotNull(fullPropertiesFromPackage.get("replication.package.stream"));
         assertNotNull(fullPropertiesFromPackage.get("replication.package.type"));
         assertNotNull(fullPropertiesFromPackage.get("replication.package.action"));
     }

Modified: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueTest.java?rev=1552571&r1=1552570&r2=1552571&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueTest.java (original)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueTest.java Fri Dec 20 09:56:51 2013
@@ -25,6 +25,7 @@ import org.apache.sling.replication.seri
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -40,6 +41,7 @@ public class SimpleReplicationQueueTest 
         ReplicationQueue queue = new SimpleReplicationQueue(agent, "default");
         ReplicationPackage pkg = mock(ReplicationPackage.class);
         assertTrue(queue.add(pkg));
+        assertFalse(queue.isEmpty());
     }
 
     @Test
@@ -48,7 +50,9 @@ public class SimpleReplicationQueueTest 
         ReplicationQueue queue = new SimpleReplicationQueue(agent, "default");
         ReplicationPackage pkg = mock(ReplicationPackage.class);
         assertTrue(queue.add(pkg));
+        assertFalse(queue.isEmpty());
         queue.removeHead();
+        assertTrue(queue.isEmpty());
         ReplicationQueueItemState status = queue.getStatus(pkg);
         assertNotNull(status);
         assertTrue(status.isSuccessful());
@@ -60,8 +64,11 @@ public class SimpleReplicationQueueTest 
         ReplicationQueue queue = new SimpleReplicationQueue(agent, "default");
         ReplicationPackage pkg = mock(ReplicationPackage.class);
         assertTrue(queue.add(pkg));
+        assertFalse(queue.isEmpty());
         assertEquals(pkg, queue.getHead());
+        assertFalse(queue.isEmpty());
         queue.removeHead();
+        assertTrue(queue.isEmpty());
         ReplicationQueueItemState status = queue.getStatus(pkg);
         assertNotNull(status);
         assertTrue(status.isSuccessful());

Added: sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporterTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporterTest.java?rev=1552571&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporterTest.java (added)
+++ sling/trunk/contrib/extensions/replication/src/test/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporterTest.java Fri Dec 20 09:56:51 2013
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.serialization.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobBuilder;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.replication.communication.ReplicationActionType;
+import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.event.ReplicationEventFactory;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilderProvider;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Testcase for {@link org.apache.sling.replication.serialization.impl.DefaultReplicationPackageImporter}
+ */
+public class DefaultReplicationPackageImporterTest {
+
+    @Test
+    public void testSynchronousImportWithoutServices() throws Exception {
+        DefaultReplicationPackageImporter importer = new DefaultReplicationPackageImporter();
+        InputStream stream = mock(InputStream.class);
+        assertFalse(importer.importStream(stream, "some-type"));
+        assertFalse(importer.importStream(stream, null));
+    }
+
+    @Test
+    public void testSynchronousImportWithNotExistingType() throws Exception {
+        DefaultReplicationPackageImporter importer = new DefaultReplicationPackageImporter();
+        Field replicationEventFactoryField = importer.getClass().getDeclaredField("replicationEventFactory");
+        replicationEventFactoryField.setAccessible(true);
+        ReplicationEventFactory replicationEventFactory = mock(ReplicationEventFactory.class);
+        replicationEventFactoryField.set(importer, replicationEventFactory);
+
+        Field replicationPackageBuilderProviderField = importer.getClass().getDeclaredField("replicationPackageBuilderProvider");
+        replicationPackageBuilderProviderField.setAccessible(true);
+        ReplicationPackageBuilderProvider replicationPackageBuilderProvider = mock(ReplicationPackageBuilderProvider.class);
+        ReplicationPackageBuilder packageBuilder = mock(ReplicationPackageBuilder.class);
+        ReplicationPackage replicationPackage = mock(ReplicationPackage.class);
+        when(packageBuilder.readPackage(any(InputStream.class), eq(true))).thenReturn(replicationPackage);
+        when(replicationPackageBuilderProvider.getReplicationPackageBuilder("void")).thenReturn(packageBuilder);
+        replicationPackageBuilderProviderField.set(importer, replicationPackageBuilderProvider);
+
+        InputStream stream = mock(InputStream.class);
+        assertFalse(importer.importStream(stream, "some-type"));
+        assertFalse(importer.importStream(stream, null));
+    }
+
+    @Test
+    public void testSynchronousImportWithTypeParameter() throws Exception {
+        DefaultReplicationPackageImporter importer = new DefaultReplicationPackageImporter();
+        Field replicationEventFactoryField = importer.getClass().getDeclaredField("replicationEventFactory");
+        replicationEventFactoryField.setAccessible(true);
+        ReplicationEventFactory replicationEventFactory = mock(ReplicationEventFactory.class);
+        replicationEventFactoryField.set(importer, replicationEventFactory);
+
+        Field replicationPackageBuilderProviderField = importer.getClass().getDeclaredField("replicationPackageBuilderProvider");
+        replicationPackageBuilderProviderField.setAccessible(true);
+        ReplicationPackageBuilderProvider replicationPackageBuilderProvider = mock(ReplicationPackageBuilderProvider.class);
+        ReplicationPackageBuilder packageBuilder = mock(ReplicationPackageBuilder.class);
+        ReplicationPackage replicationPackage = new VoidReplicationPackage(new ReplicationRequest(System.currentTimeMillis(), ReplicationActionType.DELETE, "/content"), "void");
+        when(packageBuilder.readPackage(any(InputStream.class), eq(true))).thenReturn(replicationPackage);
+        when(replicationPackageBuilderProvider.getReplicationPackageBuilder("void")).thenReturn(packageBuilder);
+        replicationPackageBuilderProviderField.set(importer, replicationPackageBuilderProvider);
+
+        InputStream stream = mock(InputStream.class);
+        assertTrue(importer.importStream(stream, "void"));
+    }
+
+    @Test
+    public void testSynchronousImportWithoutTypeParameter() throws Exception {
+        DefaultReplicationPackageImporter importer = new DefaultReplicationPackageImporter();
+        Field replicationEventFactoryField = importer.getClass().getDeclaredField("replicationEventFactory");
+        replicationEventFactoryField.setAccessible(true);
+        ReplicationEventFactory replicationEventFactory = mock(ReplicationEventFactory.class);
+        replicationEventFactoryField.set(importer, replicationEventFactory);
+
+        Field replicationPackageBuilderProviderField = importer.getClass().getDeclaredField("replicationPackageBuilderProvider");
+        replicationPackageBuilderProviderField.setAccessible(true);
+        ReplicationPackageBuilderProvider replicationPackageBuilderProvider = mock(ReplicationPackageBuilderProvider.class);
+        ReplicationPackageBuilder packageBuilder = mock(ReplicationPackageBuilder.class);
+        ReplicationPackage replicationPackage = new VoidReplicationPackage(new ReplicationRequest(System.currentTimeMillis(), ReplicationActionType.DELETE, "/content"), "void");
+        when(packageBuilder.readPackage(any(InputStream.class), eq(true))).thenReturn(replicationPackage);
+        Collection<ReplicationPackageBuilder> packageBuilders = new LinkedList<ReplicationPackageBuilder>();
+        packageBuilders.add(packageBuilder);
+        when(replicationPackageBuilderProvider.getAvailableReplicationPackageBuilders()).thenReturn(packageBuilders);
+        replicationPackageBuilderProviderField.set(importer, replicationPackageBuilderProvider);
+
+        InputStream stream = mock(InputStream.class);
+        assertTrue(importer.importStream(stream, null));
+    }
+
+    @Test
+    public void testAsynchronousImportWithoutServices() throws Exception {
+        try {
+            DefaultReplicationPackageImporter importer = new DefaultReplicationPackageImporter();
+            InputStream stream = new ByteArrayInputStream("something".getBytes());
+            importer.scheduleImport(stream, "some-type");
+            importer.scheduleImport(stream, null);
+            fail("cannot work without a JobManager");
+        } catch (Exception e) {
+            // expected
+        }
+    }
+
+    @Test
+    public void testAsynchronousImportWithTypeParameter() throws Exception {
+        DefaultReplicationPackageImporter importer = new DefaultReplicationPackageImporter();
+        Field jobManagerField = importer.getClass().getDeclaredField("jobManager");
+        jobManagerField.setAccessible(true);
+        JobManager jobManager = mock(JobManager.class);
+        JobBuilder jobBuilder = mock(JobBuilder.class);
+        when(jobBuilder.properties(any(Map.class))).thenReturn(jobBuilder);
+        Job job = mock(Job.class);
+        when(jobBuilder.add()).thenReturn(job);
+        when(jobManager.createJob("org/apache/sling/replication/import")).thenReturn(jobBuilder);
+        jobManagerField.set(importer, jobManager);
+
+        InputStream stream = new ByteArrayInputStream("something".getBytes());
+        importer.scheduleImport(stream, "void");
+    }
+
+    @Test
+    public void testAsynchronousImportWithoutTypeParameter() throws Exception {
+        DefaultReplicationPackageImporter importer = new DefaultReplicationPackageImporter();
+        Field jobManagerField = importer.getClass().getDeclaredField("jobManager");
+        jobManagerField.setAccessible(true);
+        JobManager jobManager = mock(JobManager.class);
+        JobBuilder jobBuilder = mock(JobBuilder.class);
+        when(jobBuilder.properties(any(Map.class))).thenReturn(jobBuilder);
+        Job job = mock(Job.class);
+        when(jobBuilder.add()).thenReturn(job);
+        when(jobManager.createJob("org/apache/sling/replication/import")).thenReturn(jobBuilder);
+        jobManagerField.set(importer, jobManager);
+
+        InputStream stream = new ByteArrayInputStream("something".getBytes());
+        importer.scheduleImport(stream, null);
+    }
+}



Mime
View raw message