incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdelacre...@apache.org
Subject svn commit: r1552575 - in /sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization: ReplicationPackageImporter.java impl/DefaultReplicationPackageImporter.java
Date Fri, 20 Dec 2013 10:00:53 GMT
Author: bdelacretaz
Date: Fri Dec 20 10:00:52 2013
New Revision: 1552575

URL: http://svn.apache.org/r1552575
Log:
SLING-3281 - forgot two files on previous commit

Added:
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
    sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java?rev=1552575&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
(added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java
Fri Dec 20 10:00:52 2013
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.serialization;
+
+import java.io.InputStream;
+
+/**
+ * A {@link org.apache.sling.replication.serialization.ReplicationPackage} importer
+ */
+public interface ReplicationPackageImporter {
+
+    /**
+     * Synchronously import the stream of a {@link org.apache.sling.replication.serialization.ReplicationPackage}
+     *
+     * @param stream the <code>InputStream</code> of the given <code>ReplicationPackage</code>
+     * @param type   the <code>String</code> representing the ({@link ReplicationPackage#getType()
type} of the given package
+     * @return <code>true</code> if successfully imported, <code>false</code>
otherwise
+     */
+    boolean importStream(InputStream stream, String type);
+
+    /**
+     * Asynchronously schedules the import of the {@link org.apache.sling.replication.serialization.ReplicationPackage}
+     *
+     * @param stream the <code>InputStream</code> of the given <code>ReplicationPackage</code>
+     * @param type   the <code>String</code> representing the ({@link ReplicationPackage#getType()
type} of the given package
+     */
+    void scheduleImport(InputStream stream, String type) throws Exception;
+}

Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java?rev=1552575&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java
(added)
+++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java
Fri Dec 20 10:00:52 2013
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.replication.serialization.impl;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.apache.sling.replication.event.ReplicationEventFactory;
+import org.apache.sling.replication.event.ReplicationEventType;
+import org.apache.sling.replication.queue.ReplicationQueueException;
+import org.apache.sling.replication.serialization.ReplicationPackage;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilder;
+import org.apache.sling.replication.serialization.ReplicationPackageBuilderProvider;
+import org.apache.sling.replication.serialization.ReplicationPackageImporter;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.cm.Configuration;
+import org.osgi.service.cm.ConfigurationAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of {@link org.apache.sling.replication.serialization.ReplicationPackageImporter}
+ */
+@Component
+@Service(value = ReplicationPackageImporter.class)
+public class DefaultReplicationPackageImporter implements ReplicationPackageImporter {
+
+    private static final String QUEUE_NAME = "replication-package-import";
+    private static final String QUEUE_TOPIC = "org/apache/sling/replication/import";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference
+    private ReplicationPackageBuilderProvider replicationPackageBuilderProvider;
+
+    @Reference
+    private ReplicationEventFactory replicationEventFactory;
+
+    @Reference
+    private JobManager jobManager;
+
+    @Reference
+    private ConfigurationAdmin configAdmin;
+
+    private ServiceRegistration jobReg;
+
+
+    @Activate
+    protected void activate(BundleContext context) throws Exception {
+        try {
+            if (jobManager.getQueue(QUEUE_NAME) == null) {
+                Configuration config = configAdmin.createFactoryConfiguration(
+                        QueueConfiguration.class.getName(), null);
+                Dictionary<String, Object> props = new Hashtable<String, Object>();
+                props.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME);
+                props.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
+                props.put(ConfigurationConstants.PROP_TOPICS, new String[]{QUEUE_TOPIC, QUEUE_TOPIC
+ "/*"});
+                props.put(ConfigurationConstants.PROP_RETRIES, -1);
+                props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+                props.put(ConfigurationConstants.PROP_KEEP_JOBS, true);
+                props.put(ConfigurationConstants.PROP_PRIORITY, "MAX");
+                config.update(props);
+            }
+        } catch (IOException e) {
+            throw new ReplicationQueueException("could not create an import queue", e);
+        }
+
+        Dictionary<String, Object> jobProps = new Hashtable<String, Object>();
+        jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{QUEUE_TOPIC});
+        jobReg = context.registerService(JobConsumer.class.getName(), new ReplicationPackageImporterJobConsumer(),
jobProps);
+    }
+
+    @Deactivate
+    public void deactivate() {
+        if (jobReg != null) {
+            jobReg.unregister();
+        }
+    }
+
+    public boolean importStream(InputStream stream, String type) {
+        boolean success = false;
+        try {
+            ReplicationPackage replicationPackage = null;
+            if (type != null) {
+                ReplicationPackageBuilder replicationPackageBuilder = replicationPackageBuilderProvider.getReplicationPackageBuilder(type);
+                if (replicationPackageBuilder != null) {
+                    replicationPackage = replicationPackageBuilder.readPackage(stream, true);
+                } else {
+                    if (log.isWarnEnabled()) {
+                        log.warn("cannot read streams of type {}", type);
+                    }
+                }
+            } else {
+                BufferedInputStream bufferedInputStream = new BufferedInputStream(stream);
// needed to allow for multiple reads
+                for (ReplicationPackageBuilder replicationPackageBuilder : replicationPackageBuilderProvider.getAvailableReplicationPackageBuilders())
{
+                    try {
+                        replicationPackage = replicationPackageBuilder.readPackage(bufferedInputStream,
true);
+                    } catch (Exception e) {
+                        if (log.isWarnEnabled()) {
+                            log.warn("received stream cannot be read with {}", replicationPackageBuilder);
+                        }
+                    }
+                }
+            }
+
+            if (replicationPackage != null) {
+                if (log.isInfoEnabled()) {
+                    log.info("replication package read and installed for path(s) {}",
+                            Arrays.toString(replicationPackage.getPaths()));
+                }
+
+                Dictionary<String, Object> dictionary = new Hashtable<String, Object>();
+                dictionary.put("replication.action", replicationPackage.getAction());
+                dictionary.put("replication.path", replicationPackage.getPaths());
+                replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_INSTALLED,
dictionary);
+                success = true;
+            } else {
+                if (log.isWarnEnabled()) {
+                    log.warn("could not read a replication package");
+                }
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("cannot import a package from the given stream of type {}", type);
+            }
+        }
+        return success;
+    }
+
+    public void scheduleImport(InputStream stream, String type) throws Exception {
+        try {
+            Map<String, Object> properties = new HashMap<String, Object>();
+            properties.put("bin", IOUtils.toString(stream)); // TODO : compress / encode
the stream
+            properties.put("type", type);
+            Job job = jobManager.createJob(QUEUE_TOPIC).properties(properties).add();
+            if (log.isInfoEnabled()) {
+                log.info("job added {}", job);
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("could not add an item to the queue");
+            }
+            throw e;
+        }
+    }
+
+    private class ReplicationPackageImporterJobConsumer implements JobConsumer {
+
+        public JobResult process(Job job) {
+            try {
+                InputStream stream = IOUtils.toInputStream(String.valueOf(job.getProperty("bin")));
// TODO : decompress / decode the stream string
+                boolean result = importStream(stream, String.valueOf(job.getProperty("type")));
+                return result ? JobResult.OK : JobResult.FAILED;
+            } catch (Exception e) {
+                if (log.isErrorEnabled()) {
+                    log.error("could not process import job correctly", e);
+                }
+                return JobResult.FAILED;
+            }
+        }
+    }
+
+
+}



Mime
View raw message