incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdelacre...@apache.org
Subject svn commit: r1547378 [2/5] - in /sling/trunk/contrib/extensions/replication: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/sling/ src/main/java/org/apache/sling/replication/ src/main/java/org/apa...
Date Tue, 03 Dec 2013 13:19:53 GMT
Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationActionType.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationActionType.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationActionType.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationActionType.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.communication;
+
+/**
+ * The type of a specific replication action, used to decide what to do with specific replication
+ * items / requests.
+ */
+public enum ReplicationActionType {
+
+    /**
+     * Content is activated
+     */
+    ACTIVATE("Activate"),
+
+    /**
+     * Content is deactivated
+     */
+    DEACTIVATE("Deactivate");
+
+    /**
+     * Human readable name of this type, as passed to the constructor.
+     */
+    private final String name;
+
+    /**
+     * Creates a type.
+     *
+     * @param name
+     *            the human readable name returned by {@link #getName()}
+     */
+    private ReplicationActionType(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Returns the human readable name of this type.
+     *
+     * @return the name
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Looks up the action type whose constant name equals the given name, ignoring case. Returns
+     * <code>null</code> if the name is <code>null</code> or matches no constant.
+     *
+     * @param n
+     *            the name, matched case-insensitively against the constant names
+     * @return the type or <code>null</code>
+     */
+    public static ReplicationActionType fromName(String n) {
+        if (n == null) {
+            return null;
+        }
+        try {
+            // fixed locale: the default one may map 'i' to a dotted capital I (e.g. Turkish)
+            return ReplicationActionType.valueOf(n.toUpperCase(java.util.Locale.ENGLISH));
+        } catch (IllegalArgumentException e) {
+            return null;
+        }
+    }
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationAgentHistory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationAgentHistory.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationAgentHistory.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationAgentHistory.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.communication;
+
+import java.util.SortedMap;
+
+/**
+ * The communication history for a certain agent
+ */
+public interface ReplicationAgentHistory {
+    /** @return the requests mapped to their responses; NOTE(review): the key ordering is not defined here -- confirm */
+    SortedMap<ReplicationRequest, ReplicationResponse> getCommunication();
+    /** @return the response for the given request (behaviour for an unknown request is not specified here) */
+    ReplicationResponse getResponseFor(ReplicationRequest request);
+    /** @return the response for the request with the given id (how ids are assigned is not visible here) */
+    ReplicationResponse getResponseFor(String requestId);
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationEndpoint.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationEndpoint.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationEndpoint.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationEndpoint.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.communication;
+
+import java.net.URI;
+
+/**
+ * A replication endpoint
+ */
+public class ReplicationEndpoint {
+    private final URI uri;
+
+    /** Creates an endpoint from a string, parsed with {@link URI#create(String)}. */
+    public ReplicationEndpoint(String uriString) {
+        this(URI.create(uriString));
+    }
+
+    /** Creates an endpoint for the given URI, which is stored as is (no validation). */
+    public ReplicationEndpoint(URI uri) {
+        this.uri = uri;
+    }
+    /** Returns the URI this endpoint was created with. */
+    public URI getUri() {
+        return this.uri;
+    }
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationRequest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationRequest.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationRequest.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationRequest.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.communication;
+
+/**
+ * A replication request
+ */
+public class ReplicationRequest {
+    private final long time;
+    private final ReplicationActionType action;
+    /** Private copy of the paths handed to the constructor; never shared with callers, may be null. */
+    private final String[] paths;
+
+    /** Creates a request; the paths array is copied so later changes made by the caller do not leak in. */
+    public ReplicationRequest(long time, ReplicationActionType action, String... paths) {
+        this.time = time;
+        this.action = action;
+        this.paths = paths == null ? null : paths.clone();
+    }
+
+    public long getTime() {
+        return time;
+    }
+
+    public ReplicationActionType getAction() {
+        return action;
+    }
+
+    /** Returns a fresh copy of the paths (null if a null array was given); changing it does not affect this request. */
+    public String[] getPaths() {
+        return paths == null ? null : paths.clone();
+    }
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationResponse.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationResponse.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationResponse.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/communication/ReplicationResponse.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.communication;
+
+/**
+ * A replication response
+ */
+public class ReplicationResponse {
+    /** Outcome flag; stays false until {@link #setSuccessfull(boolean)} is called. */
+    private boolean successfull;
+    /** Free-form status text; stays null until {@link #setStatus(String)} is called. */
+    private String status;
+
+    /** Returns the outcome flag. */
+    public boolean isSuccessfull() {
+        return this.successfull;
+    }
+
+    /** Sets the outcome flag. */
+    public void setSuccessfull(boolean successfull) {
+        this.successfull = successfull;
+    }
+
+    /** Returns the status text. */
+    public String getStatus() {
+        return this.status;
+    }
+
+    /** Sets the status text. */
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    /** Renders both fields as a JSON-like string; the status is inserted unescaped. */
+    @Override
+    public String toString() {
+        return "{\"success\":" + successfull + ", \"status\":\"" + status + "\"}";
+    }
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEvent.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEvent.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEvent.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEvent.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.event;
+
+import java.util.Dictionary;
+import org.osgi.service.event.Event;
+
+/**
+ * A replication related event
+ */
+public class ReplicationEvent extends Event {
+    /** Topic prefix shared by all replication events; the event type name follows as the last topic token. */
+    public static final String EVENT_TOPIC = "org/apache/sling/replication/event";
+    /** Creates an event with topic EVENT_TOPIC + "/" + type, so that EVENT_TOPIC + "/*" subscriptions match it. */
+    public ReplicationEvent(ReplicationEventType type, Dictionary properties) {
+        super(EVENT_TOPIC + '/' + type.toString(), properties);
+    }
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEventFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEventFactory.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEventFactory.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEventFactory.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.event;
+
+import java.util.Dictionary;
+
+/**
+ * generate replication related events
+ */
+public interface ReplicationEventFactory {
+
+    /**
+     * Generates a replication event of the given type carrying the given properties.
+     * @param replicationEventType the type of event to be generated
+     * @param properties a dictionary of properties to be attached to the event
+     */
+    void generateEvent(ReplicationEventType replicationEventType, Dictionary properties);
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEventType.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEventType.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEventType.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/ReplicationEventType.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.event;
+
+/**
+ * an enum of the possible types of events related to replication
+ */
+public enum ReplicationEventType {
+    // NOTE(review): the meanings below are inferred from the constant names only -- confirm with the firing code.
+    PACKAGE_CREATED,    // presumably: a replication package was created
+    PACKAGE_DELETED,    // presumably: a replication package was deleted
+    PACKAGE_REPLICATED, // presumably: a replication package was replicated
+    PACKAGE_INSTALLED,  // presumably: a replication package was installed
+    AGENT_CREATED,      // presumably: a replication agent was created
+    AGENT_MODIFIED,     // presumably: a replication agent was modified
+    AGENT_DELETED       // presumably: a replication agent was deleted
+
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/impl/DefaultReplicationEventFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/impl/DefaultReplicationEventFactory.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/impl/DefaultReplicationEventFactory.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/event/impl/DefaultReplicationEventFactory.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.event.impl;
+
+import java.util.Dictionary;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.replication.event.ReplicationEvent;
+import org.apache.sling.replication.event.ReplicationEventFactory;
+import org.apache.sling.replication.event.ReplicationEventType;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * {@link ReplicationEventFactory} OSGi service
+ */
+@Component(immediate = true)
+@Service(value = ReplicationEventFactory.class)
+public class DefaultReplicationEventFactory implements ReplicationEventFactory {
+    /** Event Admin service the events are handed to; declared as an SCR reference via the annotation below. */
+    @Reference
+    private EventAdmin eventAdmin;
+
+    /** Hands the given event to {@link EventAdmin#postEvent}. */
+    public void postEvent(ReplicationEvent event) {
+        eventAdmin.postEvent(event);
+    }
+    /** Wraps the type and properties in a new {@link ReplicationEvent} and passes it to {@link #postEvent}. */
+    public void generateEvent(ReplicationEventType replicationEventType, Dictionary properties) {
+        postEvent(new ReplicationEvent(replicationEventType, properties));
+    }
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueue.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue;
+
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+
+/**
+ * a queue for handling {@link org.apache.sling.replication.agent.ReplicationAgent}s' requests
+ */
+public interface ReplicationQueue {
+
+    /**
+     * Adds a replication package to this queue.
+     *
+     * @param replicationPackage
+     *            a replication package to replicate
+     * @return <code>true</code> if the replication package was added correctly to the queue,
+     *         <code>false</code> otherwise
+     * @throws ReplicationQueueException if the queue operation fails
+     */
+    boolean add(ReplicationPackage replicationPackage) throws ReplicationQueueException;
+
+    /**
+     * Gets the status of a certain package in the queue.
+     *
+     * @param replicationPackage
+     *            the replication package to get the status for
+     * @return the item status in the queue
+     * @throws ReplicationQueueException if the queue operation fails
+     */
+    ReplicationQueueItemState getStatus(ReplicationPackage replicationPackage)
+                    throws ReplicationQueueException;
+
+    /**
+     * Gets the agent this queue is used for.
+     *
+     * @return a replication agent
+     * @throws ReplicationQueueException if the queue operation fails
+     */
+    ReplicationAgent getAgent() throws ReplicationQueueException;
+
+    /**
+     * Gets the first item (FIFO wise, the next to be processed) in the queue.
+     *
+     * @return the first replication package in the queue
+     * @throws ReplicationQueueException if the queue operation fails
+     */
+    ReplicationPackage getHead() throws ReplicationQueueException;
+
+    /**
+     * Removes the first package from the queue.
+     *
+     * @throws ReplicationQueueException if the queue operation fails
+     */
+    void removeHead() throws ReplicationQueueException;
+
+    /**
+     * Checks whether the queue is empty.
+     *
+     * @return <code>true</code> if the queue is empty, <code>false</code> otherwise
+     * @throws ReplicationQueueException if the queue operation fails
+     */
+    boolean isEmpty() throws ReplicationQueueException;
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueDistributionStrategy.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue;
+
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+
+/**
+ * a {@link ReplicationQueueDistributionStrategy} implements an algorithm for the distribution of
+ * replication packages among the available queues for a certain agent
+ */
+public interface ReplicationQueueDistributionStrategy {
+    /** Presumably enqueues the package for the agent and reports the item's state -- TODO confirm with implementations. */
+    ReplicationQueueItemState add(ReplicationPackage replicationPackage, ReplicationAgent agent,
+                                  ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
+    /** NOTE(review): how offer differs from add, apart from returning nothing, is not visible here -- confirm. */
+    void offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+               ReplicationQueueProvider queueProvider) throws ReplicationQueueException;
+
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueException.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueException.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueException.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueException.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue;
+
+/**
+ * Represents errors happening during queue operations
+ */
+public class ReplicationQueueException extends Exception {
+    /** Creates an exception with a detail message and the underlying cause. */
+    public ReplicationQueueException(String message, Exception e) {
+        super(message, e);
+    }
+    /** Creates an exception with a detail message only. */
+    public ReplicationQueueException(String string) {
+        super(string);
+    }
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueItemState.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue;
+
+/**
+ * the current status of a certain item in a {@link ReplicationQueue}
+ */
+
+public class ReplicationQueueItemState {
+
+    private int attempts;
+    /** Current state of the item; <code>null</code> until one of the setters assigns it. */
+    private ItemState state;
+
+    /** Returns <code>true</code> only while the state is {@link ItemState#SUCCEEDED}. */
+    public boolean isSuccessfull() {
+        return ItemState.SUCCEEDED.equals(state);
+    }
+
+    /** Maps the flag onto the state: <code>true</code> gives SUCCEEDED, <code>false</code> gives ERROR. */
+    public void setSuccessfull(boolean successfull) {
+        state = successfull ? ItemState.SUCCEEDED : ItemState.ERROR;
+    }
+
+    public int getAttempts() {
+        return attempts;
+    }
+
+    public void setAttempts(int attempts) {
+        this.attempts = attempts;
+    }
+
+    public ItemState getItemState() {
+        return state;
+    }
+
+    public void setItemState(ItemState status) {
+        this.state = status;
+    }
+
+    /** Renders attempts, success flag and state as a JSON-like string; the flag is derived from the state. */
+    @Override
+    public String toString() {
+        return new StringBuilder("{\"attempts\":\"").append(attempts).append("\",\"")
+                        .append("successfull\":\"").append(isSuccessfull()).append("\",\"")
+                        .append("state\":\"").append(state).append("\"}").toString();
+    }
+
+    public enum ItemState {
+        QUEUED, // waiting in queue after adding or for restart after failing
+        ACTIVE, // job is currently in processing
+        SUCCEEDED, // processing finished successfully
+        STOPPED, // processing was stopped by a user
+        GIVEN_UP, // number of retries reached
+        ERROR, // processing signaled CANCELLED or threw an exception
+        DROPPED // dropped jobs
+    }
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/ReplicationQueueProvider.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue;
+
+import java.util.Collection;
+
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+
+/**
+ * A provider for {@link ReplicationQueue}s: it hands out, lists and removes the queues used by
+ * {@link ReplicationAgent}s.
+ */
+public interface ReplicationQueueProvider {
+
+    /**
+     * provides the queue with the given name for a certain agent, creating it if it doesn't exist
+     * 
+     * @param agent
+     *            the replication agent needing the queue
+     * @param name
+     *            the name of the queue to retrieve
+     * @return a replication queue to be used for the given parameters
+     * @throws ReplicationQueueException
+     *             if the queue cannot be retrieved or created
+     */
+    ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String name)
+                    throws ReplicationQueueException;
+
+    /**
+     * provides the queue to be used for a certain agent and package, creating it if it doesn't
+     * exist
+     * 
+     * @param agent
+     *            the replication agent needing the queue
+     * @param replicationPackage
+     *            the package for which the queue should be used
+     * @return a replication queue to be used for the given parameters
+     * @throws ReplicationQueueException
+     *             if the queue cannot be retrieved or created
+     */
+    ReplicationQueue getOrCreateQueue(ReplicationAgent agent, ReplicationPackage replicationPackage)
+                    throws ReplicationQueueException;
+
+    /**
+     * gets the default queue to be used for a certain agent, creating it if it doesn't exist
+     * 
+     * @param agent
+     *            a replication agent
+     * @return the default replication queue for the given agent
+     * @throws ReplicationQueueException
+     *             if the queue cannot be retrieved or created
+     */
+    ReplicationQueue getOrCreateDefaultQueue(ReplicationAgent agent)
+                    throws ReplicationQueueException;
+
+    /**
+     * gets all the available queues from this provider
+     * 
+     * @return a collection of replication queues
+     * @throws ReplicationQueueException
+     *             if the queues cannot be listed
+     */
+    Collection<ReplicationQueue> getAllQueues() throws ReplicationQueueException;
+
+    /**
+     * removes an existing queue owned by this provider
+     * 
+     * @param queue
+     *            a replication queue to be removed
+     * @throws ReplicationQueueException
+     *             if the queue cannot be removed
+     */
+    void removeQueue(ReplicationQueue queue) throws ReplicationQueueException;
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/AbstractReplicationQueueProvider.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue.impl;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+
+/**
+ * abstract base implementation of a {@link ReplicationQueueProvider} which caches the queues it
+ * hands out and delegates their actual creation and deletion to subclasses
+ */
+public abstract class AbstractReplicationQueueProvider implements ReplicationQueueProvider {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /** cache of the queues created so far, keyed by agent name and queue name */
+    private final Map<String, ReplicationQueue> queueMap = new HashMap<String, ReplicationQueue>();
+
+    /**
+     * provides the queue named after the action of the given package, creating it if needed
+     *
+     * @param agent
+     *            the replication agent needing the queue
+     * @param replicationPackage
+     *            the package whose action is used as queue name
+     * @return the cached or newly created queue
+     * @throws ReplicationQueueException
+     *             if the queue cannot be created
+     */
+    public ReplicationQueue getOrCreateQueue(ReplicationAgent agent,
+                    ReplicationPackage replicationPackage) throws ReplicationQueueException {
+        return getOrCreateQueue(agent, replicationPackage.getAction());
+    }
+
+    /**
+     * provides the queue with the given name for the given agent, creating and caching it on
+     * first request
+     *
+     * @param agent
+     *            the replication agent needing the queue
+     * @param queueName
+     *            the name of the queue to retrieve
+     * @return the cached or newly created queue
+     * @throws ReplicationQueueException
+     *             if the queue cannot be created
+     */
+    public ReplicationQueue getOrCreateQueue(ReplicationAgent agent, String queueName)
+                    throws ReplicationQueueException {
+        String key = createKey(agent.getName(), queueName);
+        ReplicationQueue queue = queueMap.get(key);
+        if (queue == null) {
+            // only report a creation when one actually happens, not on every cache hit
+            log.info("creating a queue with key {}", key);
+            queue = createQueue(agent, queueName);
+            queueMap.put(key, queue);
+            log.info("queue created {}", queue);
+        }
+        return queue;
+    }
+
+    /**
+     * builds an unambiguous cache key; prefixing the agent name length keeps e.g. agent "x" with
+     * queue "-error" apart from agent "x-error" with the default (empty) queue name
+     */
+    private static String createKey(String agentName, String queueName) {
+        return new StringBuilder().append(agentName.length()).append(':').append(agentName)
+                        .append(queueName).toString();
+    }
+
+    /**
+     * creates the actual queue for the given agent
+     *
+     * @param agent
+     *            the replication agent needing the queue
+     * @param selector
+     *            the name of the queue to create
+     * @return the newly created queue
+     * @throws ReplicationQueueException
+     *             if the queue cannot be created
+     */
+    protected abstract ReplicationQueue createQueue(ReplicationAgent agent, String selector) throws ReplicationQueueException;
+
+    /**
+     * provides the default queue of the given agent, which is the queue with an empty name
+     *
+     * @param agent
+     *            a replication agent
+     * @return the default queue for the given agent
+     * @throws ReplicationQueueException
+     *             if the queue cannot be created
+     */
+    public ReplicationQueue getOrCreateDefaultQueue(ReplicationAgent agent)
+                    throws ReplicationQueueException {
+        return getOrCreateQueue(agent, "");
+    }
+
+    /**
+     * gets the queues created so far by this provider
+     *
+     * @return a snapshot of the cached queues
+     */
+    public Collection<ReplicationQueue> getAllQueues() {
+        // hand out a snapshot: callers can iterate it while queues get created or removed and
+        // cannot alter the internal cache through it
+        return new HashMap<String, ReplicationQueue>(queueMap).values();
+    }
+
+    /**
+     * deletes the given queue and drops it from the cache
+     *
+     * @param queue
+     *            a replication queue to be removed
+     * @throws ReplicationQueueException
+     *             if the queue cannot be deleted
+     */
+    public void removeQueue(ReplicationQueue queue) throws ReplicationQueueException {
+        deleteQueue(queue);
+        // flush cache: drop every entry still pointing at the deleted queue, nothing happens if
+        // the queue was never cached
+        Collection<ReplicationQueue> cachedQueues = queueMap.values();
+        while (cachedQueues.remove(queue)) {
+            // keep removing until no entry references the queue anymore
+        }
+    }
+
+    /**
+     * deletes the actual queue
+     *
+     * @param queue
+     *            the queue to delete
+     * @throws ReplicationQueueException
+     *             if the queue cannot be deleted
+     */
+    protected abstract void deleteQueue(ReplicationQueue queue) throws ReplicationQueueException;
+
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/ErrorAwareQueueDistributionStrategy.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.PropertyOption;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
+import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+
+/**
+ * The error strategy for delivering packages to queues. Each agent manages a single queue for
+ * delivering packages with an error queue which can be used when an item is stuck in the default
+ * queue for too much time, then the stuck item is moved to the error queue or dropped.
+ */
+@Component(immediate = true, metatype = true)
+@Service(value = ReplicationQueueDistributionStrategy.class)
+@Property(name = "name", value = ErrorAwareQueueDistributionStrategy.NAME, propertyPrivate = true)
+public class ErrorAwareQueueDistributionStrategy implements ReplicationQueueDistributionStrategy {
+
+    private static final String ERROR = "ERROR";
+
+    public static final String NAME = "error";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Property
+    private static final String ATTEMPTS_THRESHOLD = "attempts.threshold";
+
+    @Property(name = "Stuck Queue Handling", options = {
+            @PropertyOption(name = ERROR, value = "Error"),
+            @PropertyOption(name = "DROP", value = "Drop") })
+    private static final String STUCK_HANDLING = "stuck.handling";
+
+    private String stuckQueueHandling;
+
+    private Integer attemptsThreshold;
+
+    protected void activate(final ComponentContext ctx) {
+        stuckQueueHandling = PropertiesUtil
+                        .toString(ctx.getProperties().get(STUCK_HANDLING), ERROR);
+        attemptsThreshold = PropertiesUtil.toInteger(ctx.getProperties().get(ATTEMPTS_THRESHOLD),
+                        100);
+    }
+
+    public ReplicationQueueItemState add(ReplicationPackage replicationPackage,
+                    ReplicationAgent agent, ReplicationQueueProvider queueProvider)
+                    throws ReplicationQueueException {
+        try {
+            if (log.isInfoEnabled()) {
+                log.info("using single queue distribution");
+            }
+            ReplicationQueueItemState state = new ReplicationQueueItemState();
+            ReplicationQueue queue = queueProvider.getOrCreateDefaultQueue(agent);
+            if (log.isInfoEnabled()) {
+                log.info("obtained queue {}", queue);
+            }
+            if (queue != null) {
+                if (queue.add(replicationPackage)) {
+                    if (log.isInfoEnabled()) {
+                        log.info("replication status: {}", state);
+                    }
+                    state = queue.getStatus(replicationPackage);
+                } else {
+                    if (log.isErrorEnabled()) {
+                        log.error("could not add the item to the queue {}", queue);
+                    }
+                    state.setItemState(ItemState.ERROR);
+                    state.setSuccessfull(false);
+                }
+                return state;
+            } else {
+                throw new ReplicationQueueException("could not get a queue for agent "
+                                + agent.getName());
+            }
+        } finally {
+            checkAndRemoveStuckItems(agent, queueProvider);
+        }
+    }
+
+    public void offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+                    ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+        ReplicationQueue queue = queueProvider.getOrCreateDefaultQueue(agent);
+        if (queue != null) {
+            queue.add(replicationPackage);
+        } else {
+            throw new ReplicationQueueException("could not get a queue for agent "
+                            + agent.getName());
+        }
+        checkAndRemoveStuckItems(agent, queueProvider);
+    }
+
+    private void checkAndRemoveStuckItems(ReplicationAgent agent,
+                    ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+        ReplicationQueue defaultQueue = queueProvider.getOrCreateDefaultQueue(agent);
+        // get first item in the queue with its status
+        ReplicationPackage firstItem = defaultQueue.getHead();
+        if (firstItem != null) {
+            ReplicationQueueItemState status = defaultQueue.getStatus(firstItem);
+            // if item is still in the queue after a max no. of attempts, move it to the error queue
+            int attempts = status.getAttempts();
+            if (log.isInfoEnabled()) {
+                log.info("attemps for item {}: {}", firstItem.getId(), attempts);
+            }
+            if (attempts > attemptsThreshold) {
+                if (ERROR.equals(stuckQueueHandling)) {
+                    if (log.isWarnEnabled()) {
+                        log.warn("item moved to the error queue");
+                    }
+                    ReplicationQueue errorQueue = queueProvider.getOrCreateQueue(agent, "-error");
+                    errorQueue.add(firstItem);
+                }
+                if (log.isWarnEnabled()) {
+                    log.warn("item dropped from the default queue");
+                }
+                defaultQueue.removeHead();
+            }
+        }
+    }
+
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/PriorityPathDistributionStrategy.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue.impl;
+
+import java.util.Arrays;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
+import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+
+/**
+ * Distribution algorithm which keeps one specific queue to handle specific paths and another queue
+ * for handling all the other paths
+ */
+@Component(immediate = true, metatype = true)
+@Service(value = ReplicationQueueDistributionStrategy.class)
+@Property(name = "name", value = PriorityPathDistributionStrategy.NAME, propertyPrivate = true)
+public class PriorityPathDistributionStrategy implements ReplicationQueueDistributionStrategy {
+
+    public static final String NAME = "priority";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Property(value = { "/content" })
+    private static final String PRIORITYPATHS = "priority.paths";
+
+    private String[] priorityPaths;
+
+    @Activate
+    protected void activate(ComponentContext context) throws Exception {
+        priorityPaths = PropertiesUtil.toStringArray(context.getProperties().get(PRIORITYPATHS));
+    }
+
+    public ReplicationQueueItemState add(ReplicationPackage replicationPackage,
+                    ReplicationAgent agent, ReplicationQueueProvider queueProvider)
+                    throws ReplicationQueueException {
+        if (log.isInfoEnabled()) {
+            log.info("using path priority based queue distribution");
+        }
+        ReplicationQueueItemState state = new ReplicationQueueItemState();
+
+        ReplicationQueue queue = getQueue(replicationPackage, agent, queueProvider);
+        if (log.isInfoEnabled()) {
+            log.info("obtained queue {}", queue);
+        }
+
+        if (queue != null) {
+            if (queue.add(replicationPackage)) {
+                if (log.isInfoEnabled()) {
+                    log.info("replication status: {}", state);
+                }
+                state = queue.getStatus(replicationPackage);
+            } else {
+                if (log.isErrorEnabled()) {
+                    log.error("could not add the item to the queue {}", queue);
+                }
+                state.setItemState(ItemState.ERROR);
+                state.setSuccessfull(false);
+            }
+            return state;
+        } else {
+            throw new ReplicationQueueException("could not get a queue for agent "
+                            + agent.getName());
+        }
+
+    }
+
+    private ReplicationQueue getQueue(ReplicationPackage replicationPackage,
+                    ReplicationAgent agent, ReplicationQueueProvider queueProvider)
+                    throws ReplicationQueueException {
+        String[] paths = replicationPackage.getPaths();
+
+        if (log.isInfoEnabled()) {
+            log.info("calculating priority for paths {}", Arrays.toString(paths));
+        }
+
+        boolean usePriorityQueue = false;
+        String pp = null;
+        for (String path : paths) {
+            for (String priorityPath : priorityPaths) {
+                if (path.startsWith(priorityPath)) {
+                    usePriorityQueue = true;
+                    pp = priorityPath;
+                    break;
+                }
+            }
+        }
+
+        ReplicationQueue queue;
+        if (usePriorityQueue) {
+            if (log.isInfoEnabled()) {
+                log.info("using priority queue for path {}", pp);
+            }
+            queue = queueProvider.getOrCreateQueue(agent, pp);
+        } else {
+            if (log.isInfoEnabled()) {
+                log.info("using default queue");
+            }
+            queue = queueProvider.getOrCreateDefaultQueue(agent);
+        }
+        return queue;
+    }
+
+    public void offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+                    ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+        ReplicationQueue queue = getQueue(replicationPackage, agent, queueProvider);
+        if (queue != null) {
+            queue.add(replicationPackage);
+        } else {
+            throw new ReplicationQueueException("could not get a queue for agent "
+                            + agent.getName());
+        }
+
+    }
+
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/SingleQueueDistributionStrategy.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueDistributionStrategy;
+import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+
+/**
+ * The default strategy for delivering packages to queues. Each agent just manages a single queue,
+ * no failure / stuck handling where each pacakge is put regardless of anything.
+ */
+@Component(immediate = true)
+@Service(value = ReplicationQueueDistributionStrategy.class)
+@Property(name = "name", value = SingleQueueDistributionStrategy.NAME)
+public class SingleQueueDistributionStrategy implements ReplicationQueueDistributionStrategy {
+
+    public static final String NAME = "single";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    public ReplicationQueueItemState add(ReplicationPackage replicationPackage,
+                    ReplicationAgent agent, ReplicationQueueProvider queueProvider)
+                    throws ReplicationQueueException {
+        if (log.isInfoEnabled()) {
+            log.info("using single queue distribution");
+        }
+        ReplicationQueueItemState state = new ReplicationQueueItemState();
+        ReplicationQueue queue = queueProvider.getOrCreateDefaultQueue(agent);
+        if (log.isInfoEnabled()) {
+            log.info("obtained queue {}", queue);
+        }
+        if (queue != null) {
+            if (queue.add(replicationPackage)) {
+                state = queue.getStatus(replicationPackage);
+                if (log.isInfoEnabled()) {
+                    log.info("replication status: {}", state);
+                }
+            } else {
+                if (log.isErrorEnabled()) {
+                    log.error("could not add the item to the queue {}", queue);
+                }
+                state.setItemState(ItemState.ERROR);
+                state.setSuccessfull(false);
+            }
+            return state;
+        } else {
+            throw new ReplicationQueueException("could not get a queue for agent "
+                            + agent.getName());
+        }
+
+    }
+
+    public void offer(ReplicationPackage replicationPackage, ReplicationAgent agent,
+                    ReplicationQueueProvider queueProvider) throws ReplicationQueueException {
+        ReplicationQueue queue = queueProvider.getOrCreateDefaultQueue(agent);
+        if (queue != null) {
+            queue.add(replicationPackage);
+        } else {
+            throw new ReplicationQueueException("could not get a queue for agent "
+                            + agent.getName());
+        }
+
+    }
+
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueue.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue.impl.jobhandling;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.JobManager.QueryType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+
+/**
+ * a {@link ReplicationQueue} based on Sling Job Handling facilities: each queued package is a job
+ * with the topic of this queue
+ */
+public class JobHandlingReplicationQueue implements ReplicationQueue {
+
+    public static final String REPLICATION_QUEUE_TOPIC = "org/apache/sling/replication/queue";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final String name;
+
+    private final String topic;
+
+    private final JobManager jobManager;
+
+    /**
+     * @param name
+     *            the name of the underlying job queue
+     * @param topic
+     *            the topic of the jobs created for the queued packages
+     * @param jobManager
+     *            the job manager used to create, find and remove jobs
+     */
+    protected JobHandlingReplicationQueue(String name, String topic, JobManager jobManager) {
+        this.name = name;
+        this.topic = topic;
+        this.jobManager = jobManager;
+    }
+
+    /**
+     * creates a job for the given package
+     *
+     * @param replicationPackage
+     *            the package to enqueue
+     * @return <code>true</code> if a job was created, <code>false</code> otherwise
+     */
+    public boolean add(ReplicationPackage replicationPackage) {
+        try {
+            Map<String, Object> properties = JobHandlingUtils
+                            .createFullPropertiesFromPackage(replicationPackage);
+
+            Job job = jobManager.createJob(topic).properties(properties).add();
+            if (job == null) {
+                // the job builder signals a failed creation with null, not with an exception
+                log.error("could not add an item to the queue, no job created for topic {}", topic);
+                return false;
+            }
+            log.info("job added {}", job);
+            return true;
+        } catch (Exception e) {
+            log.error("could not add an item to the queue", e);
+            return false;
+        }
+    }
+
+    /**
+     * looks up the job of the given package and maps its retry count and job state to a
+     * {@link ReplicationQueueItemState}
+     *
+     * @param replicationPackage
+     *            the package whose status is requested
+     * @return the status of the package; {@link ItemState#DROPPED} if no job is found for it
+     * @throws ReplicationQueueException
+     *             if the status cannot be retrieved
+     */
+    public ReplicationQueueItemState getStatus(ReplicationPackage replicationPackage)
+                    throws ReplicationQueueException {
+        ReplicationQueueItemState itemStatus = new ReplicationQueueItemState();
+        try {
+            Map<String, Object> properties = JobHandlingUtils
+                            .createIdPropertiesFromPackage(replicationPackage);
+            Job job = jobManager.getJob(topic, properties);
+            if (job != null) {
+                itemStatus.setAttempts(job.getRetryCount());
+                // the item states are looked up by the name of the job state
+                itemStatus.setItemState(ItemState.valueOf(job.getJobState().toString()));
+                log.info("status of job {} is {}", job.getId(), job.getJobState());
+            } else {
+                itemStatus.setItemState(ItemState.DROPPED);
+            }
+        } catch (Exception e) {
+            throw new ReplicationQueueException("unable to retrieve the queue status", e);
+        }
+        return itemStatus;
+    }
+
+    /**
+     * not supported by this implementation
+     *
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    public ReplicationAgent getAgent() {
+        throw new UnsupportedOperationException("not implemented");
+    }
+
+    /**
+     * @return the package of the first item in the queue or <code>null</code> if there is none
+     */
+    public ReplicationPackage getHead() {
+        Job firstItem = getFirstItem();
+        return firstItem != null ? JobHandlingUtils.getPackage(firstItem) : null;
+    }
+
+    /**
+     * removes the job of the first item in the queue, if any
+     */
+    public void removeHead() {
+        Job firstItem = getFirstItem();
+        if (firstItem != null) {
+            jobManager.removeJobById(firstItem.getId());
+        }
+    }
+
+    /**
+     * finds the queued or active job of this topic with the highest retry count; among jobs with
+     * the same retry count the first one found wins
+     *
+     * @return the job or <code>null</code> if there are no queued or active jobs
+     */
+    @SuppressWarnings("unchecked")
+    private Job getFirstItem() {
+        log.info("getting first item in the queue");
+
+        Map<String, Object> props = new HashMap<String, Object>();
+        // copy the results: the collections returned by the job manager are not ours to modify
+        Collection<Job> jobs = new ArrayList<Job>(
+                        jobManager.findJobs(QueryType.QUEUED, topic, -1, props));
+        jobs.addAll(jobManager.findJobs(QueryType.ACTIVE, topic, -1, props));
+
+        Job firstItem = null;
+        for (Job job : jobs) {
+            if (firstItem == null || job.getRetryCount() > firstItem.getRetryCount()) {
+                firstItem = job;
+            }
+        }
+        if (firstItem != null) {
+            log.info("first item in the queue is {} retried {} times", firstItem.getId(),
+                            firstItem.getRetryCount());
+        }
+        return firstItem;
+    }
+
+    /**
+     * @return <code>true</code> if the underlying job queue holds no jobs
+     */
+    public boolean isEmpty() {
+        org.apache.sling.event.jobs.Queue queue = jobManager.getQueue(name);
+        // NOTE(review): a null queue is treated as empty on the assumption that the job manager
+        // only knows queues which currently have jobs - confirm against the JobManager contract
+        return queue == null || queue.getStatistics().getNumberOfJobs() == 0;
+    }
+
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingReplicationQueueProvider.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue.impl.jobhandling;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.impl.AbstractReplicationQueueProvider;
+
+/**
+ * an OSGi service implementing {@link ReplicationQueueProvider} on top of the Sling Job Handling
+ * {@link JobManager}; queues are registered as ordered job queues through
+ * {@link ConfigurationAdmin} factory configurations
+ */
+@Component(metatype = false)
+@Service(value = ReplicationQueueProvider.class)
+@Property(name = "name", value = JobHandlingReplicationQueueProvider.NAME)
+public class JobHandlingReplicationQueueProvider extends AbstractReplicationQueueProvider
+                implements ReplicationQueueProvider {
+
+    /** value of the "name" service property identifying this provider */
+    public static final String NAME = "sjh";
+
+    @Reference
+    private JobManager jobManager;
+
+    @Reference
+    private ConfigurationAdmin configAdmin;
+
+    /**
+     * Creates a {@link JobHandlingReplicationQueue} for the given agent and queue name.
+     * <p>
+     * The job queue is named by concatenating the agent name and the queue name; its topic is
+     * {@link JobHandlingReplicationQueue#REPLICATION_QUEUE_TOPIC} followed by '/' and that same
+     * concatenation. If the job manager does not know a queue with that name yet, a new queue
+     * factory configuration is stored first.
+     *
+     * @param agent the agent the queue belongs to
+     * @param queueName the name of the queue within the agent
+     * @return a queue bound to the computed name and topic
+     * @throws ReplicationQueueException if the queue configuration cannot be created or updated
+     */
+    @Override
+    protected ReplicationQueue createQueue(ReplicationAgent agent, String queueName)
+                    throws ReplicationQueueException {
+        try {
+            String name = new StringBuilder(agent.getName()).append(queueName).toString();
+            String topic = new StringBuilder(
+                            JobHandlingReplicationQueue.REPLICATION_QUEUE_TOPIC).append('/')
+                            .append(agent.getName()).append(queueName).toString();
+            if (jobManager.getQueue(name) == null) {
+                // no such queue known to the job manager: register a configuration for it
+                Configuration config = configAdmin.createFactoryConfiguration(
+                                QueueConfiguration.class.getName(), null);
+                Dictionary<String, Object> props = new Hashtable<String, Object>();
+                props.put(ConfigurationConstants.PROP_NAME, name);
+                props.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
+                props.put(ConfigurationConstants.PROP_TOPICS, new String[] { topic });
+                props.put(ConfigurationConstants.PROP_RETRIES, -1);
+                props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+                props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
+                props.put(ConfigurationConstants.PROP_PRIORITY, "MAX");
+                config.update(props);
+            }
+            return new JobHandlingReplicationQueue(name, topic, jobManager);
+        } catch (IOException e) {
+            throw new ReplicationQueueException("could not create a queue", e);
+        }
+    }
+
+    /**
+     * Removes all the jobs from the job queue looked up by the name of the given queue's agent.
+     * Nothing is done if the job manager does not know such a queue.
+     *
+     * @param queue the replication queue to delete
+     * @throws ReplicationQueueException declared by the overridden method; not thrown here
+     */
+    @Override
+    protected void deleteQueue(ReplicationQueue queue) throws ReplicationQueueException {
+        // NOTE(review): createQueue() registers the queue as agent name + queue name, while this
+        // lookup uses the agent name only - confirm the two are meant to match
+        Queue q = jobManager.getQueue(queue.getAgent().getName());
+        if (q != null) {
+            // getQueue() may return null (see the check in createQueue())
+            q.removeAll(); // need to check if this is correct
+        }
+    }
+
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/jobhandling/JobHandlingUtils.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue.impl.jobhandling;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.ScheduledJobInfo;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static helpers mapping {@link ReplicationPackage}s to Sling {@link Job} properties and back.
+ */
+public class JobHandlingUtils {
+
+    private static final Logger log = LoggerFactory.getLogger(JobHandlingUtils.class.getName());
+
+    private static final String PATHS = "replication.package.paths";
+
+    private static final String ID = "replication.package.id";
+
+    private static final String LENGTH = "replication.package.length";
+
+//    private static final String BIN = "replication.package.stream";
+
+    private static final String TYPE = "replication.package.type";
+
+    protected static final String ACTION = "replication.package.action";
+
+    /**
+     * Resolves the job referenced by a scheduled job info and exposes it as a package.
+     *
+     * @param info the scheduled job info carrying the package id among its job properties
+     * @param jobManager the job manager used to look the job up
+     * @param topic the topic the job was registered with
+     * @return a {@link ReplicationPackage} view over the resolved job
+     */
+    public static ReplicationPackage getPackage(ScheduledJobInfo info, JobManager jobManager,
+                    String topic) {
+        Job job = getJob(info, jobManager, topic);
+        return JobHandlingUtils.getPackage(job);
+    }
+
+    /**
+     * Looks up the job whose package id property matches the one stored in the given scheduled
+     * job info.
+     *
+     * @param info the scheduled job info carrying the package id among its job properties
+     * @param jobManager the job manager used to look the job up
+     * @param topic the topic the job was registered with
+     * @return whatever {@link JobManager#getJob(String, Map)} returns for that topic and id
+     */
+    public static Job getJob(ScheduledJobInfo info, JobManager jobManager, String topic) {
+        String id = String.valueOf(info.getJobProperties().get(ID));
+        Map<String, Object> jobProps = JobHandlingUtils.createIdPropertiesFromId(id);
+        return jobManager.getJob(topic, jobProps);
+    }
+
+    /**
+     * Obtains the package a job refers to, first asking the package builder for the package id
+     * stored in the job and, if that yields nothing, falling back to a package view built
+     * directly from the job properties.
+     *
+     * @param packageBuilder the builder asked for the package by id
+     * @param job the job carrying the package properties
+     * @return the retrieved package, or <code>null</code> if neither attempt produced one
+     */
+    public static ReplicationPackage getPackage(ReplicationPackageBuilder packageBuilder,
+                    final Job job) {
+        ReplicationPackage pkg = null;
+        String id = String.valueOf(job.getProperty(ID));
+        try {
+            pkg = packageBuilder.getPackage(id);
+            if (pkg != null) {
+                if (log.isInfoEnabled()) {
+                    log.info("successfully retrieved a package with id {}", id);
+                }
+            }
+        } catch (Exception e) {
+            if (log.isWarnEnabled()) {
+                // pass the exception along so the cause of the failure is not lost
+                log.warn("failed retrieving a package with id {}", id, e);
+            }
+        }
+        if (pkg == null) {
+            // fallback: expose the job properties themselves as a package
+            try {
+                pkg = getPackage(job);
+                if (pkg != null) {
+                    if (log.isInfoEnabled()) {
+                        log.info("successfully deserialized a package from job {}", job);
+                    }
+                }
+            } catch (Exception e) {
+                if (log.isWarnEnabled()) {
+                    log.warn("failed deserializing package from job {}", job, e);
+                }
+            }
+        }
+        if (pkg == null) {
+            if (log.isErrorEnabled()) {
+                log.error("could not find a package from job {}", job);
+            }
+        }
+        return pkg;
+    }
+
+    /**
+     * Wraps a job into a {@link ReplicationPackage} whose getters read the corresponding job
+     * properties lazily, on each call.
+     *
+     * @param job the job carrying the package properties
+     * @return a package view over the job; its input stream is currently always
+     *         <code>null</code>
+     */
+    public static ReplicationPackage getPackage(final Job job) {
+        return new ReplicationPackage() {
+
+            private static final long serialVersionUID = 1L;
+
+            public String[] getPaths() {
+                return (String[]) job.getProperty(PATHS);
+            }
+
+            public long getLength() {
+                return (Long) job.getProperty(LENGTH);
+            }
+
+            public InputStream getInputStream() throws IOException {
+                // TODO : eventually re-enable it once SLING-3140 gets released
+                // return new ByteArrayInputStream((byte[]) job.getProperty(BIN));
+                return null;
+            }
+
+            public String getId() {
+                return String.valueOf(job.getProperty(ID));
+            }
+
+            public String getType() {
+                return String.valueOf(job.getProperty(TYPE));
+            }
+
+            public String getAction() {
+                return String.valueOf(job.getProperty(ACTION));
+            }
+        };
+
+    }
+
+    /**
+     * Builds the job properties describing a package: id, paths, length, action and type (the
+     * binary content is currently not included).
+     *
+     * @param replicationPackage the package to describe
+     * @return a new mutable map holding the package properties
+     * @throws IOException declared for the (currently disabled) serialization of the binary
+     */
+    public static Map<String, Object> createFullPropertiesFromPackage(
+                    ReplicationPackage replicationPackage) throws IOException {
+        Map<String, Object> properties = new HashMap<String, Object>();
+        properties.put(ID, replicationPackage.getId());
+        properties.put(PATHS, replicationPackage.getPaths());
+        properties.put(LENGTH, replicationPackage.getLength());
+        properties.put(ACTION, replicationPackage.getAction());
+//        properties.put(BIN, IOUtils.toByteArray(replicationPackage.getInputStream()));
+        properties.put(TYPE, replicationPackage.getType());
+        return properties;
+    }
+
+    /**
+     * Builds job properties holding only the id of the given package.
+     *
+     * @param replicationPackage the package whose id is used
+     * @return a new mutable map with the single id property
+     */
+    public static Map<String, Object> createIdPropertiesFromPackage(
+                    ReplicationPackage replicationPackage) {
+        Map<String, Object> properties = new HashMap<String, Object>();
+        properties.put(ID, replicationPackage.getId());
+        return properties;
+    }
+
+    /**
+     * Builds job properties holding only the given package id.
+     *
+     * @param id the package id
+     * @return a new mutable map with the single id property
+     */
+    public static Map<String, Object> createIdPropertiesFromId(String id) {
+        Map<String, Object> properties = new HashMap<String, Object>();
+        properties.put(ID, id);
+        return properties;
+    }
+
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/ScheduledReplicationQueueProcessor.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue.impl.simple;
+
+import java.util.Arrays;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.sling.replication.agent.AgentReplicationException;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+
+/**
+ * a simple scheduled {@link SimpleReplicationQueue}s processor
+ */
+@Component(
+    label = "In memory Replication Queues processor", 
+    description = "Service that trigger processing of elements in memory resident replication queues.",
+    metatype = true)
+@Service(value = Runnable.class)
+@Properties({
+        @Property(name = "scheduler.period", longValue = 10, label = "Frequency", description = "Processing frequency in seconds"),
+        @Property(name = "scheduler.concurrent", boolValue = false, propertyPrivate = true) })
+public class ScheduledReplicationQueueProcessor implements Runnable {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(name = "ReplicationQueueProvider", target = "(name="
+                    + SimpleReplicationQueueProvider.NAME + ")", policy = ReferencePolicy.DYNAMIC)
+    private ReplicationQueueProvider replicationQueueProvider;
+
+    public void run() {
+        try {
+            for (ReplicationQueue queue : replicationQueueProvider.getAllQueues()) {
+                while (!queue.isEmpty()) {
+                    // synchronized (queue) {
+                    ReplicationPackage item = queue.getHead();
+                    if (item != null) {
+                        try {
+                            if (queue.getAgent().process(item)) {
+                                queue.removeHead();
+                            } else {
+                                if (log.isWarnEnabled()) {
+                                    log.warn("processing of item {} failed",
+                                                    Arrays.toString(item.getPaths()));
+                                }
+                            }
+                        } catch (AgentReplicationException e) {
+                            if (log.isErrorEnabled()) {
+                                log.error("an error happened while processing an item {}",
+                                                Arrays.toString(item.getPaths()));
+                            }
+                        }
+                    }
+                    // }
+                }
+            }
+        } catch (ReplicationQueueException e) {
+            if (log.isErrorEnabled()) {
+                log.error("error while processing queue {}", e);
+            }
+        }
+
+    }
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueue.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue.impl.simple;
+
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueItemState;
+import org.apache.sling.replication.queue.ReplicationQueueItemState.ItemState;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+
+/**
+ * A simple implementation of a {@link ReplicationQueue}.
+ * 
+ * Note that, at the moment, this is a transient in memory queue not persisted on the repository and
+ * therefore not usable for production.
+ * 
+ * Item states are kept in a {@link WeakHashMap} keyed by package, so the state of a package that
+ * is no longer referenced elsewhere may disappear; all the accessors tolerate a missing state.
+ */
+public class SimpleReplicationQueue implements ReplicationQueue {
+
+    private Logger log = LoggerFactory.getLogger(getClass());
+
+    private ReplicationAgent agent;
+
+    private BlockingQueue<ReplicationPackage> queue;
+
+    private Map<ReplicationPackage, ReplicationQueueItemState> statusMap;
+
+    /**
+     * Creates an empty queue bound to the given agent.
+     *
+     * @param agent the agent owning this queue
+     */
+    public SimpleReplicationQueue(ReplicationAgent agent) {
+        if (log.isInfoEnabled()) {
+            log.info("starting a simple queue for agent {}", agent.getName());
+        }
+        this.agent = agent;
+        this.queue = new LinkedBlockingQueue<ReplicationPackage>();
+        this.statusMap = new WeakHashMap<ReplicationPackage, ReplicationQueueItemState>(10);
+    }
+
+    /**
+     * Appends a package to the queue, waiting up to 10 seconds for room, and records a state
+     * entry for it whether or not it could be enqueued.
+     *
+     * @param replicationPackage the package to enqueue
+     * @return <code>true</code> if the package was enqueued, <code>false</code> otherwise
+     */
+    public boolean add(ReplicationPackage replicationPackage) {
+        ReplicationQueueItemState status = new ReplicationQueueItemState();
+        boolean result = false;
+        try {
+            result = queue.offer(replicationPackage, 10, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("cannot add an item to the queue", e);
+            status.setSuccessfull(false);
+            // restore the interrupt flag so that callers can still react to the interruption
+            Thread.currentThread().interrupt();
+        } finally {
+            statusMap.put(replicationPackage, status);
+        }
+        return result;
+    }
+
+    /**
+     * Returns the state recorded for a package, with its item state refreshed: queued if the
+     * package is still in the queue, succeeded otherwise.
+     *
+     * @param replicationPackage the package to look up
+     * @return the recorded state, or a fresh one if no state is (any longer) recorded for the
+     *         package
+     */
+    public ReplicationQueueItemState getStatus(ReplicationPackage replicationPackage) {
+        ReplicationQueueItemState status = statusMap.get(replicationPackage);
+        if (status == null) {
+            // never added, or the weakly referenced entry is gone already
+            status = new ReplicationQueueItemState();
+        }
+        if (queue.contains(replicationPackage)) {
+            status.setItemState(ItemState.QUEUED);
+        } else {
+            status.setItemState(ItemState.SUCCEEDED);
+        }
+        return status;
+    }
+
+    /**
+     * @return the agent owning this queue
+     */
+    public ReplicationAgent getAgent() {
+        return agent;
+    }
+
+    /**
+     * Returns the first package of the queue without removing it and, if a state is recorded for
+     * it, increments its attempts counter.
+     *
+     * @return the head of the queue, or <code>null</code> if the queue is empty
+     */
+    public ReplicationPackage getHead() {
+        ReplicationPackage element = queue.peek();
+        if (element != null) {
+            ReplicationQueueItemState replicationQueueItemStatus = statusMap.get(element);
+            if (replicationQueueItemStatus != null) {
+                replicationQueueItemStatus
+                                .setAttempts(replicationQueueItemStatus.getAttempts() + 1);
+            }
+        }
+        return element;
+    }
+
+    /**
+     * Removes the first package of the queue and, if a state is recorded for it, flags it as
+     * successful.
+     *
+     * @throws java.util.NoSuchElementException if the queue is empty
+     */
+    public void removeHead() {
+        ReplicationPackage element = queue.remove();
+        ReplicationQueueItemState status = statusMap.get(element);
+        if (status != null) {
+            status.setSuccessfull(true);
+        }
+    }
+
+    /**
+     * @return <code>true</code> if the queue holds no packages
+     */
+    public boolean isEmpty() {
+        return queue.isEmpty();
+    }
+
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java?rev=1547378&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java (added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/queue/impl/simple/SimpleReplicationQueueProvider.java Tue Dec  3 13:19:50 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.queue.impl.simple;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+
+import org.apache.sling.replication.agent.ReplicationAgent;
+import org.apache.sling.replication.queue.ReplicationQueue;
+import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.queue.ReplicationQueueProvider;
+import org.apache.sling.replication.queue.impl.AbstractReplicationQueueProvider;
+
+/**
+ * an OSGi service implementing {@link ReplicationQueueProvider} for simple in memory
+ * {@link ReplicationQueue}s
+ */
+@Component(metatype = false)
+@Service(value = ReplicationQueueProvider.class)
+@Property(name = "name", value = SimpleReplicationQueueProvider.NAME)
+public class SimpleReplicationQueueProvider extends AbstractReplicationQueueProvider implements
+                ReplicationQueueProvider {
+
+    /** value of the "name" service property identifying this provider */
+    public static final String NAME = "simple";
+
+    /**
+     * Creates a new in memory queue for the given agent.
+     *
+     * @param agent the agent the queue belongs to
+     * @param selector not used by this implementation
+     * @return a new {@link SimpleReplicationQueue} bound to the agent
+     * @throws ReplicationQueueException declared by the overridden method; not thrown here
+     */
+    @Override
+    protected ReplicationQueue createQueue(ReplicationAgent agent, String selector)
+                    throws ReplicationQueueException {
+        return new SimpleReplicationQueue(agent);
+    }
+
+    /**
+     * Does nothing, there is no backing resource to release for an in memory queue.
+     *
+     * @param queue the queue being deleted
+     * @throws ReplicationQueueException declared by the overridden method; not thrown here
+     */
+    @Override
+    protected void deleteQueue(ReplicationQueue queue) throws ReplicationQueueException {
+        // do nothing as queues just exist in the cache
+    }
+
+}



Mime
View raw message