sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomm...@apache.org
Subject svn commit: r1568345 - in /sling/trunk/contrib/extensions/replication: ./ src/main/java/org/apache/sling/replication/agent/impl/ src/main/java/org/apache/sling/replication/queue/impl/jobhandling/ src/main/java/org/apache/sling/replication/rule/impl/ sr...
Date Fri, 14 Feb 2014 16:34:56 GMT
Author: tommaso
Date: Fri Feb 14 16:34:56 2014
New Revision: 1568345

URL: http://svn.apache.org/r1568345
Log:
SLING-3395 - event based reverse replication via Server Sent Events

Added:
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java   (with props)
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java   (with props)
Modified:
    sling/trunk/contrib/extensions/replication/pom.xml
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentQueueResource.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProvider.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ScheduleReplicateReplicationRule.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java
    sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json

Modified: sling/trunk/contrib/extensions/replication/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/pom.xml?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/pom.xml (original)
+++ sling/trunk/contrib/extensions/replication/pom.xml Fri Feb 14 16:34:56 2014
@@ -79,6 +79,7 @@
                     org.apache.sling.replication.rule,
                     org.apache.sling.replication.serialization
                   </Export-Package>
+                  <Embed-Dependency>httpasyncclient</Embed-Dependency>
                 </instructions>
               </configuration>
             </plugin>
@@ -178,12 +179,22 @@
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>fluent-hc</artifactId>
-      <version>4.2</version>
+      <version>4.3.2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpasyncclient</artifactId>
+      <version>4.0</version>
     </dependency>
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpcore-osgi</artifactId>
-      <version>4.2</version>
+      <version>4.3.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient-osgi</artifactId>
+      <version>4.3.2</version>
     </dependency>
     <!-- JACKRABBIT -->
     <dependency>

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentQueueResource.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentQueueResource.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentQueueResource.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentQueueResource.java Fri Feb 14 16:34:56 2014
@@ -32,6 +32,10 @@ public class ReplicationAgentQueueResour
 
     public static final String SUFFIX_PATH = "/queue";
 
+    public static final String EVENT_RESOURCE_TYPE = "sling/replication/agent/queue/event";
+
+    public static final String EVENT_SUFFIX_PATH = "/queue/event";
+
     private final ReplicationQueue queue;
 
     private final ResourceResolver resourceResolver;

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProvider.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentResourceProvider.java Fri Feb 14 16:34:56 2014
@@ -49,11 +49,11 @@ import org.slf4j.LoggerFactory;
 @Service(value = ResourceProvider.class)
 @Properties({
         @Property(name = ResourceProvider.ROOTS,
-            value = {
-                    ReplicationAgentResource.BASE_PATH,
-                    ReplicationAgentConfigurationResource.BASE_PATH,
-                    ReplicationAgentResource.IMPORTER_BASE_PATH
-            })
+                value = {
+                        ReplicationAgentResource.BASE_PATH,
+                        ReplicationAgentConfigurationResource.BASE_PATH,
+                        ReplicationAgentResource.IMPORTER_BASE_PATH
+                })
 
 })
 public class ReplicationAgentResourceProvider implements ResourceProvider {
@@ -82,7 +82,7 @@ public class ReplicationAgentResourcePro
 
     public Resource getResource(ResourceResolver resourceResolver, String path) {
 
-        if(!isAuthorized(resourceResolver)) return null;
+        if (!isAuthorized(resourceResolver)) return null;
 
         Resource resource = null;
 
@@ -118,7 +118,7 @@ public class ReplicationAgentResourcePro
             } else {
                 log.warn("could not find a configuration manager service");
             }
-        } else if(path.startsWith(ReplicationAgentResource.BASE_PATH+"/")) {
+        } else if (path.startsWith(ReplicationAgentResource.BASE_PATH + "/")) {
 
             if (path.endsWith(ReplicationAgentQueueResource.SUFFIX_PATH)) {
                 String agentPath = path.substring(0, path.lastIndexOf('/'));
@@ -131,10 +131,11 @@ public class ReplicationAgentResourcePro
                 } catch (ReplicationQueueException e) {
                     log.warn("could not find a queue for agent {}", agentPath);
                 }
+            } else if (path.endsWith(ReplicationAgentQueueResource.EVENT_SUFFIX_PATH)) {
+                resource = new SyntheticResource(resourceResolver, path, ReplicationAgentQueueResource.EVENT_RESOURCE_TYPE);
             } else {
-                String agentPath = path;
                 ReplicationAgent replicationAgent = getAgentAtPath(path);
-                log.info("resolving agent with path {}", agentPath);
+                log.info("resolving agent with path {}", path);
 
                 resource = replicationAgent != null ? new ReplicationAgentResource(replicationAgent,
                         resourceResolver) : null;
@@ -146,22 +147,20 @@ public class ReplicationAgentResourcePro
     }
 
 
-    private boolean isAuthorized(ResourceResolver resourceResolver){
+    private boolean isAuthorized(ResourceResolver resourceResolver) {
         boolean isAuthorized = false;
         Session session = resourceResolver.adaptTo(Session.class);
-        if(session != null) {
-            try{
+        if (session != null) {
+            try {
                 isAuthorized = session.nodeExists(SECURITY_OBJECT);
-            }
-            catch (Exception ex){
+            } catch (Exception ex) {
             }
         }
 
 
-        if(isAuthorized){
+        if (isAuthorized) {
             log.debug("granting access to agent resources as user can read /system/replication/security");
-        }
-        else {
+        } else {
             log.debug("denying access to agent resources as user can't read /system/replication/security");
         }
 
@@ -169,9 +168,7 @@ public class ReplicationAgentResourcePro
     }
 
     private String getAgentNameAtPath(String path) {
-        String agentName = path.substring(path.lastIndexOf('/') + 1);
-
-        return agentName;
+        return path.substring(path.lastIndexOf('/') + 1);
     }
 
     private ReplicationAgent getAgentAtPath(String path) {

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/ReplicationAgentServiceFactory.java Fri Feb 14 16:34:56 2014
@@ -113,8 +113,6 @@ public class ReplicationAgentServiceFact
 
     private ServiceRegistration agentReg;
 
-    private ServiceRegistration jobReg;
-
     @Reference
     private ReplicationRuleEngine replicationRuleEngine;
 
@@ -183,9 +181,7 @@ public class ReplicationAgentServiceFact
 
             replicationAgent.disable();
 
-            if (agentReg != null) {
-                agentReg.unregister();
-            }
+            agentReg.unregister();
         }
     }
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/agent/impl/SimpleReplicationAgent.java Fri Feb 14 16:34:56 2014
@@ -208,7 +208,7 @@ public class SimpleReplicationAgent impl
         try {
             ReplicationPackage replicationPackage = packageBuilder.getPackage(itemInfo.getId());
 
-            if (replicationPackage == null || transportHandler == null) {
+            if (replicationPackage == null || isPassive()) {
                 log.info("agent {} processing skipped", name);
                 return false;
             } else {
@@ -254,22 +254,26 @@ public class SimpleReplicationAgent impl
 
 
     public void enable() {
+        log.info("enabling agent");
         // apply rules if any
         if (rules.length > 0) {
             ruleEngine.applyRules(this, rules);
         }
 
-        if (!isPassive())
+        if (!isPassive()) {
             queueProvider.enableQueueProcessing(this, this);
+        }
     }
 
     public void disable() {
+        log.info("disabling agent");
         if (rules != null) {
             ruleEngine.unapplyRules(this, rules);
         }
 
-        if (!isPassive())
+        if (!isPassive()) {
             queueProvider.disableQueueProcessing(this);
+        }
     }
 
 }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java Fri Feb 14 16:34:56 2014
@@ -23,55 +23,62 @@ import java.util.Dictionary;
 import java.util.Hashtable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.felix.scr.annotations.*;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.Queue;
 import org.apache.sling.event.jobs.QueueConfiguration;
 import org.apache.sling.event.jobs.consumer.JobConsumer;
-import org.apache.sling.replication.queue.ReplicationQueueProcessor;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.cm.Configuration;
-import org.osgi.service.cm.ConfigurationAdmin;
-
 import org.apache.sling.replication.agent.ReplicationAgent;
 import org.apache.sling.replication.queue.ReplicationQueue;
 import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueProcessor;
 import org.apache.sling.replication.queue.ReplicationQueueProvider;
 import org.apache.sling.replication.queue.impl.AbstractReplicationQueueProvider;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Component(metatype = false)
 @Service(value = ReplicationQueueProvider.class)
 @Property(name = "name", value = JobHandlingReplicationQueueProvider.NAME)
 public class JobHandlingReplicationQueueProvider extends AbstractReplicationQueueProvider
-                implements ReplicationQueueProvider {
+        implements ReplicationQueueProvider {
 
     public static final String NAME = "sjh";
 
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
     @Reference
     private JobManager jobManager;
 
     @Reference
     private ConfigurationAdmin configAdmin;
 
-    private Map<String, ServiceRegistration> jobs = new ConcurrentHashMap<String, ServiceRegistration>();
+    private final Map<String, ServiceRegistration> jobs = new ConcurrentHashMap<String, ServiceRegistration>();
     private BundleContext context;
 
     @Override
     protected ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String queueName)
-                    throws ReplicationQueueException {
+            throws ReplicationQueueException {
         try {
             String name = agent.getName() + queueName;
             String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + name;
             if (jobManager.getQueue(name) == null) {
                 Configuration config = configAdmin.createFactoryConfiguration(
-                                QueueConfiguration.class.getName(), null);
+                        QueueConfiguration.class.getName(), null);
                 Dictionary<String, Object> props = new Hashtable<String, Object>();
                 props.put(ConfigurationConstants.PROP_NAME, name);
                 props.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
-                props.put(ConfigurationConstants.PROP_TOPICS, new String[] { topic });
+                props.put(ConfigurationConstants.PROP_TOPICS, new String[]{topic});
                 props.put(ConfigurationConstants.PROP_RETRIES, -1);
                 props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
                 props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
@@ -96,26 +103,34 @@ public class JobHandlingReplicationQueue
         String topic = JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC + '/' + agent.getName();
         String childTopic = topic + "/*";
         jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{topic, childTopic});
-        ServiceRegistration jobReg = context.registerService(JobConsumer.class.getName(),
-                new ReplicationAgentJobConsumer(agent, queueProcessor), jobProps);
-        jobs.put(agent.getName(), jobReg);
+        synchronized (jobs) {
+            log.info("registering job consumer for agent {}", agent.getName());
+            ServiceRegistration jobReg = context.registerService(JobConsumer.class.getName(),
+                    new ReplicationAgentJobConsumer(agent, queueProcessor), jobProps);
+            jobs.put(agent.getName(), jobReg);
+            log.info("job consumer for agent {} registered", agent.getName());
+        }
     }
 
     public void disableQueueProcessing(ReplicationAgent agent) {
-        ServiceRegistration jobReg = jobs.remove(agent.getName());
-        if (jobReg != null) {
-            jobReg.unregister();
+        synchronized (jobs) {
+            log.info("unregistering job consumer for agent {}", agent.getName());
+            ServiceRegistration jobReg = jobs.remove(agent.getName());
+            if (jobReg != null) {
+                jobReg.unregister();
+                log.info("job consumer for agent {} unregistered", agent.getName());
+            }
         }
     }
 
     @Activate
-    private void activate(BundleContext context){
+    private void activate(BundleContext context) {
         this.context = context;
     }
 
     @Deactivate
-    private void deactivate(BundleContext context){
-        for (ServiceRegistration jobReg : jobs.values()){
+    private void deactivate(BundleContext context) {
+        for (ServiceRegistration jobReg : jobs.values()) {
             jobReg.unregister();
         }
     }

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java?rev=1568345&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java Fri Feb 14 16:34:56 2014
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.rule.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIUtils;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClients;
+import org.apache.http.nio.ContentDecoder;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
+import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.replication.agent.AgentReplicationException;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.agent.ReplicationAgentConfiguration;
+import org.apache.sling.replication.agent.ReplicationAgentConfigurationManager;
+import org.apache.sling.replication.agent.impl.ReplicationAgentQueueResource;
+import org.apache.sling.replication.agent.impl.ReplicationAgentResource;
+import org.apache.sling.replication.communication.ReplicationActionType;
+import org.apache.sling.replication.communication.ReplicationRequest;
+import org.apache.sling.replication.rule.ReplicationRule;
+import org.apache.sling.replication.transport.TransportHandler;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link org.apache.sling.replication.rule.ReplicationRule} to trigger
+ */
+@Component(immediate = true)
+@Service(value = ReplicationRule.class)
+public class ReplicateOnQueueEventRule implements ReplicationRule {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final String SIGNATURE = "queue event based {action} [on ${path}]";
+
+    private static final String SIGNATURE_REGEX = "(queue\\sevent\\sbased)\\s(add|delete|poll)(\\s(on)\\s(\\/\\w+)+)?";
+
+    private static final Pattern signaturePattern = Pattern.compile(SIGNATURE_REGEX);
+
+    @Reference
+    private ReplicationAgentConfigurationManager replicationAgentConfigurationManager;
+
+    @Reference
+    private Scheduler scheduler;
+
+    private BundleContext context;
+
+    private Map<String, Future<HttpResponse>> requests;
+
+    @Activate
+    protected void activate(BundleContext context) {
+        this.context = context;
+        this.requests = new ConcurrentHashMap<String, Future<HttpResponse>>();
+    }
+
+    public String getSignature() {
+        return SIGNATURE;
+    }
+
+    public boolean signatureMatches(String ruleString) {
+        return ruleString.matches(SIGNATURE_REGEX);
+    }
+
+    public void apply(String ruleString, ReplicationAgent agent) {
+        Matcher matcher = signaturePattern.matcher(ruleString);
+        if (matcher.find()) {
+            String action = matcher.group(2);
+            ReplicationActionType actionType = ReplicationActionType.fromName(action.toUpperCase());
+            String path = matcher.group(5); // can be null
+            try {
+                log.info("applying queue event replication rule");
+                // get configuration
+                ReplicationAgentConfiguration configuration = replicationAgentConfigurationManager.getConfiguration(agent.getName());
+
+                // get URI of the event queue
+                String targetTransport = configuration.getTargetTransportHandler();
+
+                log.info("found target transport {}", targetTransport);
+
+                ScheduleOptions options = scheduler.NOW();
+                options.name(agent.getName() + " " + ruleString);
+                scheduler.schedule(new EventBasedReplication(agent, actionType, path, targetTransport), options);
+
+            } catch (Exception e) {
+                log.error("{}", e);
+                log.error("cannot apply rule {} to agent {}", ruleString, agent);
+            }
+
+        }
+    }
+
+    public void undo(String ruleString, ReplicationAgent agent) {
+        Future<HttpResponse> httpResponseFuture = requests.remove(agent.getName());
+        if (httpResponseFuture != null) {
+            httpResponseFuture.cancel(true);
+        }
+    }
+
+    private class SSEResponseConsumer extends BasicAsyncResponseConsumer {
+
+        private final ReplicationAgent agent;
+        private final ReplicationActionType action;
+        private final String path;
+
+        private SSEResponseConsumer(ReplicationAgent agent, ReplicationActionType action, String path) {
+            this.agent = agent;
+            this.action = action;
+            this.path = path == null ? "/" : path;
+        }
+
+        @Override
+        protected void onContentReceived(ContentDecoder decoder, IOControl ioctrl) throws IOException {
+//            log.info("complete ? ", decoder.isCompleted());
+//            ByteBuffer buffer = ByteBuffer.allocate(1024);
+//            decoder.read(buffer);
+//            log.info("content {} received {},{}", new Object[]{buffer, decoder, ioctrl});
+            log.info("event received");
+
+            try {
+                asyncReplicate(agent, action, path);
+                log.info("replication request to agent {} sent ({} on {})", new Object[]{agent.getName(), action, path});
+            } catch (AgentReplicationException e) {
+                log.error("cannot replicate to agent {}, {}", agent.getName(), e);
+            }
+
+            super.onContentReceived(decoder, ioctrl);
+        }
+
+        @Override
+        protected void onResponseReceived(HttpResponse response) throws IOException {
+            log.info("response received {}", response);
+            super.onResponseReceived(response);
+        }
+    }
+
+    private void asyncReplicate(ReplicationAgent agent, ReplicationActionType action, String path) throws AgentReplicationException {
+        agent.send(new ReplicationRequest(System.currentTimeMillis(), action, path));
+    }
+
+    private class EventBasedReplication implements Runnable {
+        private final ReplicationAgent agent;
+        private final ReplicationActionType actionType;
+        private final String targetTransport;
+        private final String path;
+
+        public EventBasedReplication(ReplicationAgent agent, ReplicationActionType actionType, String path, String targetTransport) {
+            this.agent = agent;
+            this.actionType = actionType;
+            this.targetTransport = targetTransport;
+            this.path = path;
+        }
+
+        public void run() {
+            try {
+                ServiceReference[] serviceReferences = context.getServiceReferences(TransportHandler.class.getName(), targetTransport);
+
+                log.info("reference transport for {} found {}", targetTransport, serviceReferences != null);
+
+                if (serviceReferences != null && serviceReferences.length == 1) {
+
+                    Object endpointsProperty = serviceReferences[0].getProperty("endpoints");
+                    Object authenticationPropertiesProperty = serviceReferences[0].getProperty("authentication.properties");
+
+                    log.info("endpoint prop: {} authentication properties prop: {}", endpointsProperty, authenticationPropertiesProperty);
+
+                    String[] endpoints = (String[]) endpointsProperty;
+                    Map<String, String> authenticationProperties = (Map<String, String>) authenticationPropertiesProperty;
+
+                    log.info("endpoint {} props {}", endpoints, authenticationProperties);
+                    // only works with HTTP
+                    if (endpoints.length == 1 && endpoints[0].startsWith("http") && endpoints[0].contains(ReplicationAgentResource.BASE_PATH) && authenticationProperties != null) {
+                        log.info("getting event queue URI");
+                        URI eventEndpoint = URI.create(endpoints[0] + ReplicationAgentQueueResource.EVENT_SUFFIX_PATH);
+                        String userName = authenticationProperties.get("user");
+                        String password = authenticationProperties.get("password");
+
+                        log.info("preparing request");
+                        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                        credentialsProvider.setCredentials(
+                                new AuthScope(eventEndpoint.getHost(), eventEndpoint.getPort()),
+                                new UsernamePasswordCredentials(userName, password));
+                        CloseableHttpAsyncClient httpClient = HttpAsyncClients.custom()
+                                .setDefaultCredentialsProvider(credentialsProvider)
+                                .build();
+                        try {
+                            HttpGet get = new HttpGet(eventEndpoint);
+                            HttpHost target = URIUtils.extractHost(get.getURI());
+                            BasicAsyncRequestProducer basicAsyncRequestProducer = new BasicAsyncRequestProducer(target, get);
+                            httpClient.start();
+                            log.info("sending request");
+                            Future<HttpResponse> futureResponse = httpClient.execute(
+                                    basicAsyncRequestProducer,
+                                    new SSEResponseConsumer(agent, actionType, path), null);
+                            requests.put(agent.getName(), futureResponse);
+                            futureResponse.get();
+
+                        } finally {
+                            httpClient.close();
+                        }
+                        log.info("request finished");
+                    }
+                }
+            } catch (Exception e) {
+                log.error("cannot execute event based replication");
+            }
+        }
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ReplicateOnQueueEventRule.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ScheduleReplicateReplicationRule.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ScheduleReplicateReplicationRule.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ScheduleReplicateReplicationRule.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/rule/impl/ScheduleReplicateReplicationRule.java Fri Feb 14 16:34:56 2014
@@ -61,14 +61,15 @@ public class ScheduleReplicateReplicatio
     public void apply(String ruleString, ReplicationAgent agent) {
         if (signatureMatches(ruleString)) {
             Matcher matcher = signaturePattern.matcher(ruleString);
-            matcher.find();
-            String action = matcher.group(2);
-            ReplicationActionType actionType = ReplicationActionType.fromName(action.toUpperCase());
-            String path = matcher.group(5); // can be null
-            int seconds = Integer.parseInt(matcher.group(7));
-            ScheduleOptions options = scheduler.NOW(-1, seconds);
-            options.name(agent.getName() + " " + ruleString);
-            scheduler.schedule(new ScheduledReplication(agent, actionType, path), options);
+            if (matcher.find()) {
+                String action = matcher.group(2);
+                ReplicationActionType actionType = ReplicationActionType.fromName(action.toUpperCase());
+                String path = matcher.group(5); // can be null
+                int seconds = Integer.parseInt(matcher.group(7));
+                ScheduleOptions options = scheduler.NOW(-1, seconds);
+                options.name(agent.getName() + " " + ruleString);
+                scheduler.schedule(new ScheduledReplication(agent, actionType, path), options);
+            }
         }
     }
 

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java?rev=1568345&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java Fri Feb 14 16:34:56 2014
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.servlet;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.SlingHttpServletRequest;
+import org.apache.sling.api.SlingHttpServletResponse;
+import org.apache.sling.api.servlets.SlingAllMethodsServlet;
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.agent.ReplicationAgentsManager;
+import org.apache.sling.replication.agent.impl.ReplicationAgentQueueResource;
+import org.apache.sling.replication.event.ReplicationEvent;
+import org.apache.sling.replication.event.ReplicationEventType;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Queue Server Sent Events servlet
+ */
+@SuppressWarnings("serial")
+@Component(metatype = false)
+@Service(value = Servlet.class)
+@Properties({
+        @Property(name = "sling.servlet.resourceTypes", value = ReplicationAgentQueueResource.EVENT_RESOURCE_TYPE),
+        @Property(name = "sling.servlet.methods", value = "GET")
+})
+public class ReplicationAgentQueueEventServlet extends SlingAllMethodsServlet {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final Map<String, Collection<String>> cachedEvents = new ConcurrentHashMap<String, Collection<String>>();
+
+    @Reference
+    private ReplicationAgentsManager replicationAgentsManager;
+
+    private ServiceRegistration registration;
+
+    @Activate
+    protected void activate(BundleContext context) {
+        log.info("activating SSE");
+        Dictionary<String, Object> properties = new Hashtable<String, Object>();
+        properties.put(EventConstants.EVENT_TOPIC, ReplicationEvent.getTopic(ReplicationEventType.PACKAGE_QUEUED));
+        registration = context.registerService(EventHandler.class.getName(), new SSEListener(), properties);
+        if (log.isInfoEnabled()) {
+            log.info("SSE activated : {}", registration != null);
+        }
+    }
+
+    @Deactivate
+    protected void deactivate() throws Exception {
+        log.info("deactivating SSE");
+        if (registration != null) {
+            registration.unregister();
+        }
+        synchronized (cachedEvents) {
+            cachedEvents.clear();
+            cachedEvents.notifyAll();
+        }
+    }
+
+
+    @Override
+    protected void doGet(SlingHttpServletRequest request, SlingHttpServletResponse response)
+            throws ServletException, IOException {
+
+        // setup SSE headers
+        response.setContentType("text/event-stream");
+        response.setCharacterEncoding("UTF-8");
+        response.setHeader("Cache-Control", "no-cache");
+        response.setHeader("Connection", "keep-alive");
+        // needed to allow the JavaScript EventSource API to make a call from author to this server and listen for the events
+        response.setHeader("Access-Control-Allow-Origin", request.getHeader("Origin"));
+        response.setHeader("Access-Control-Allow-Credentials", "true");
+
+        String agentName = request.getResource().getParent().getParent().adaptTo(ReplicationAgent.class).getName();
+        PrintWriter writer = response.getWriter();
+        while (true) {
+            try {
+                synchronized (cachedEvents) {
+                    cachedEvents.wait();
+                    Collection<String> eventsForAgent = cachedEvents.get(agentName);
+                    if (eventsForAgent != null) {
+                        for (String event : eventsForAgent) {
+                            writeEvent(writer, agentName + "-queue-event", event);
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                if (log.isErrorEnabled()) {
+                    log.error("error during SSE", e);
+                }
+                throw new ServletException(e);
+            }
+        }
+
+    }
+
+    /* Write a single server-sent event to the response stream for the given event and message */
+    private void writeEvent(PrintWriter writer, String event, String message)
+            throws IOException {
+
+        // write the event type (make sure to include the double newline)
+        writer.write("id: " + System.nanoTime() + "\n");
+
+        // write the actual data
+        // this could be simple text or could be JSON-encoded text that the
+        // client then decodes
+        writer.write("data: " + message + "\n\n");
+
+        // flush the buffers to make sure the container sends the bytes
+        writer.flush();
+        if (log.isInfoEnabled()) {
+            log.info("SSE event {}: {}", event, message);
+        }
+    }
+
+    private class SSEListener implements EventHandler {
+        public void handleEvent(Event event) {
+            if (log.isInfoEnabled()) {
+                log.info("SSE listener running on event {}", event);
+            }
+            Object pathProperty = event.getProperty("replication.package.paths");
+            Object agentNameProperty = event.getProperty("replication.agent.name");
+            if (log.isInfoEnabled()) {
+                log.info("cached events {}", cachedEvents.size());
+            }
+            if (pathProperty != null && agentNameProperty != null) {
+                String agentName = String.valueOf(agentNameProperty);
+                String[] paths = (String[]) pathProperty;
+                synchronized (cachedEvents) {
+                    if (log.isInfoEnabled()) {
+                        log.info("queue event for agent {} on paths {}", agentName, Arrays.toString(paths));
+                    }
+                    Collection<String> eventsForAgent = cachedEvents.get(agentName);
+                    if (eventsForAgent == null) {
+                        eventsForAgent = new LinkedList<String>();
+                    }
+                    eventsForAgent.add(Arrays.toString(paths));
+                    cachedEvents.put(agentName, eventsForAgent);
+                    cachedEvents.notifyAll();
+                }
+            }
+        }
+    }
+}

Propchange: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/servlet/ReplicationAgentQueueEventServlet.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/AbstractTransportHandlerFactory.java Fri Feb 14 16:34:56 2014
@@ -18,6 +18,12 @@
  */
 package org.apache.sling.replication.transport.impl;
 
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import org.apache.sling.commons.osgi.PropertiesUtil;
 import org.apache.sling.replication.agent.ReplicationAgentConfiguration;
 import org.apache.sling.replication.communication.ReplicationEndpoint;
@@ -27,8 +33,6 @@ import org.apache.sling.replication.tran
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
 
-import java.util.*;
-
 public abstract class AbstractTransportHandlerFactory {
 
     private ServiceRegistration serviceRegistration;
@@ -44,7 +48,6 @@ public abstract class AbstractTransportH
                     .toString(config.get(ReplicationAgentConfiguration.NAME), String.valueOf(new Random().nextInt(1000)));
             props.put(ReplicationAgentConfiguration.NAME, name);
 
-
             Map<String, String> authenticationProperties = PropertiesUtil.toMap(config.get(ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES), new String[0]);
             props.put(ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES, authenticationProperties);
 
@@ -65,8 +68,8 @@ public abstract class AbstractTransportH
 
             List<ReplicationEndpoint> replicationEndpoints = new ArrayList<ReplicationEndpoint>();
 
-            for(String endpoint : endpoints){
-                if(endpoint != null && endpoint.length() > 0){
+            for (String endpoint : endpoints) {
+                if (endpoint != null && endpoint.length() > 0) {
                     replicationEndpoints.add(new ReplicationEndpoint(endpoint));
                 }
             }
@@ -88,7 +91,7 @@ public abstract class AbstractTransportH
     }
 
     protected void deactivate() {
-        if(serviceRegistration != null){
+        if (serviceRegistration != null) {
             serviceRegistration.unregister();
             serviceRegistration = null;
         }

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/HttpTransportHandlerFactory.java Fri Feb 14 16:34:56 2014
@@ -44,7 +44,6 @@ public class HttpTransportHandlerFactory
 
     private static final String DEFAULT_AUTHENTICATION_FACTORY = "(name=" + UserCredentialsTransportAuthenticationProviderFactory.TYPE + ")";
 
-
     @Property(boolValue = true)
     private static final String ENABLED = "enabled";
 
@@ -54,7 +53,6 @@ public class HttpTransportHandlerFactory
     @Property(cardinality = 1000)
     private static final String ENDPOINT = ReplicationAgentConfiguration.ENDPOINT;
 
-
     @Property(options = {
             @PropertyOption(name = "All",
                     value = "all endpoints"
@@ -76,7 +74,6 @@ public class HttpTransportHandlerFactory
     @Property
     private static final String AUTHENTICATION_PROPERTIES = ReplicationAgentConfiguration.AUTHENTICATION_PROPERTIES;
 
-
     @Property(boolValue = false)
     private static final String USE_CUSTOM_HEADERS = "useCustomHeaders";
 

Modified: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java (original)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/transport/impl/PollingTransportHandlerFactory.java Fri Feb 14 16:34:56 2014
@@ -43,17 +43,14 @@ import java.util.Map;
 public class PollingTransportHandlerFactory extends AbstractTransportHandlerFactory {
     static final String SERVICE_PID = "org.apache.sling.replication.transport.impl.PollingTransportHandlerFactory";
 
-
     private static final String DEFAULT_AUTHENTICATION_FACTORY = "(name=" + UserCredentialsTransportAuthenticationProviderFactory.TYPE + ")";
 
-
     @Property(boolValue = true)
     private static final String ENABLED = "enabled";
 
     @Property
     private static final String NAME = "name";
 
-
     @Property(cardinality = 1000)
     private static final String ENDPOINT = ReplicationAgentConfiguration.ENDPOINT;
 
@@ -71,7 +68,6 @@ public class PollingTransportHandlerFact
     private ReplicationPackageImporter replicationPackageImporter;
 
 
-
     protected TransportHandler createTransportHandler(Map<String, ?> config,
                                                       Dictionary<String, Object> props,
                                                       TransportAuthenticationProvider transportAuthenticationProvider,
@@ -92,8 +88,6 @@ public class PollingTransportHandlerFact
         return transportAuthenticationProviderFactory;
     }
 
-
-
     @Activate
     protected void activate(BundleContext context, Map<String, ?> config) throws Exception {
         super.activate(context, config);

Modified: sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json?rev=1568345&r1=1568344&r2=1568345&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json (original)
+++ sling/trunk/contrib/extensions/replication/src/main/resources/SLING-CONTENT/libs/sling/replication/config.author/org.apache.sling.replication.agent.impl.ReplicationAgentServiceFactory-publish-reverse.json Fri Feb 14 16:34:56 2014
@@ -4,6 +4,6 @@
     "TransportHandler.target" : "(name=http-publish-poll)",
     "ReplicationPackageBuilder.target" : "(name=void)",
     "ReplicationQueueProvider.target" : "(name=sjh)",
-    "ReplicationQueueDistributionStrategy.target" : "(name=single)",
+    "ReplicationQueueDistributionStrategy.target" : "(name=error)",
     "rules" : ["scheduled poll every 30 sec"]
 }



Mime
View raw message