Return-Path: X-Original-To: apmail-sling-commits-archive@www.apache.org Delivered-To: apmail-sling-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BB0EE10991 for ; Fri, 20 Dec 2013 10:01:19 +0000 (UTC) Received: (qmail 88754 invoked by uid 500); 20 Dec 2013 10:01:19 -0000 Delivered-To: apmail-sling-commits-archive@sling.apache.org Received: (qmail 88684 invoked by uid 500); 20 Dec 2013 10:01:18 -0000 Mailing-List: contact commits-help@sling.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sling.apache.org Delivered-To: mailing list commits@sling.apache.org Received: (qmail 88677 invoked by uid 99); 20 Dec 2013 10:01:18 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Dec 2013 10:01:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Dec 2013 10:01:14 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 673EB238890B; Fri, 20 Dec 2013 10:00:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1552575 - in /sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization: ReplicationPackageImporter.java impl/DefaultReplicationPackageImporter.java Date: Fri, 20 Dec 2013 10:00:53 -0000 To: commits@sling.apache.org From: bdelacretaz@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131220100053.673EB238890B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: bdelacretaz Date: Fri Dec 20 10:00:52 2013 New Revision: 1552575 URL: http://svn.apache.org/r1552575 Log: SLING-3281 - forgot two files on 
previous commit Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java?rev=1552575&view=auto ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java (added) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/ReplicationPackageImporter.java Fri Dec 20 10:00:52 2013 @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.sling.replication.serialization; + +import java.io.InputStream; + +/** + * A {@link org.apache.sling.replication.serialization.ReplicationPackage} importer + */ +public interface ReplicationPackageImporter { + + /** + * Synchronously import the stream of a {@link org.apache.sling.replication.serialization.ReplicationPackage} + * + * @param stream the InputStream of the given ReplicationPackage + * @param type the String representing the {@link ReplicationPackage#getType() type} of the given package + * @return true if successfully imported, false otherwise + */ + boolean importStream(InputStream stream, String type); + + /** + * Asynchronously schedules the import of the {@link org.apache.sling.replication.serialization.ReplicationPackage} + * + * @param stream the InputStream of the given ReplicationPackage + * @param type the String representing the {@link ReplicationPackage#getType() type} of the given package + */ + void scheduleImport(InputStream stream, String type) throws Exception; +} Added: sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java?rev=1552575&view=auto ============================================================================== --- sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java (added) +++ sling/trunk/contrib/extensions/replication/src/main/java/org/apache/sling/replication/serialization/impl/DefaultReplicationPackageImporter.java Fri Dec 20 10:00:52 2013 @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.replication.serialization.impl; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Dictionary; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; +import org.apache.commons.io.IOUtils; +import org.apache.felix.scr.annotations.Activate; +import org.apache.felix.scr.annotations.Component; +import org.apache.felix.scr.annotations.Deactivate; +import org.apache.felix.scr.annotations.Reference; +import org.apache.felix.scr.annotations.Service; +import org.apache.sling.event.impl.jobs.config.ConfigurationConstants; +import org.apache.sling.event.jobs.Job; +import org.apache.sling.event.jobs.JobManager; +import org.apache.sling.event.jobs.QueueConfiguration; +import org.apache.sling.event.jobs.consumer.JobConsumer; +import org.apache.sling.replication.event.ReplicationEventFactory; +import org.apache.sling.replication.event.ReplicationEventType; +import org.apache.sling.replication.queue.ReplicationQueueException; +import org.apache.sling.replication.serialization.ReplicationPackage; +import org.apache.sling.replication.serialization.ReplicationPackageBuilder; +import 
org.apache.sling.replication.serialization.ReplicationPackageBuilderProvider; +import org.apache.sling.replication.serialization.ReplicationPackageImporter; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceRegistration; +import org.osgi.service.cm.Configuration; +import org.osgi.service.cm.ConfigurationAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Default implementation of {@link org.apache.sling.replication.serialization.ReplicationPackageImporter} + */ +@Component +@Service(value = ReplicationPackageImporter.class) +public class DefaultReplicationPackageImporter implements ReplicationPackageImporter { + + private static final String QUEUE_NAME = "replication-package-import"; + private static final String QUEUE_TOPIC = "org/apache/sling/replication/import"; + + private final Logger log = LoggerFactory.getLogger(getClass()); + + @Reference + private ReplicationPackageBuilderProvider replicationPackageBuilderProvider; + + @Reference + private ReplicationEventFactory replicationEventFactory; + + @Reference + private JobManager jobManager; + + @Reference + private ConfigurationAdmin configAdmin; + + private ServiceRegistration jobReg; + + + @Activate + protected void activate(BundleContext context) throws Exception { + try { + if (jobManager.getQueue(QUEUE_NAME) == null) { + Configuration config = configAdmin.createFactoryConfiguration( + QueueConfiguration.class.getName(), null); + Dictionary props = new Hashtable(); + props.put(ConfigurationConstants.PROP_NAME, QUEUE_NAME); + props.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name()); + props.put(ConfigurationConstants.PROP_TOPICS, new String[]{QUEUE_TOPIC, QUEUE_TOPIC + "/*"}); + props.put(ConfigurationConstants.PROP_RETRIES, -1); + props.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L); + props.put(ConfigurationConstants.PROP_KEEP_JOBS, true); + props.put(ConfigurationConstants.PROP_PRIORITY, "MAX"); + config.update(props); + } + 
} catch (IOException e) { + throw new ReplicationQueueException("could not create an import queue", e); + } + + Dictionary jobProps = new Hashtable(); + jobProps.put(JobConsumer.PROPERTY_TOPICS, new String[]{QUEUE_TOPIC}); + jobReg = context.registerService(JobConsumer.class.getName(), new ReplicationPackageImporterJobConsumer(), jobProps); + } + + @Deactivate + public void deactivate() { + if (jobReg != null) { + jobReg.unregister(); + } + } + + public boolean importStream(InputStream stream, String type) { + boolean success = false; + try { + ReplicationPackage replicationPackage = null; + if (type != null) { + ReplicationPackageBuilder replicationPackageBuilder = replicationPackageBuilderProvider.getReplicationPackageBuilder(type); + if (replicationPackageBuilder != null) { + replicationPackage = replicationPackageBuilder.readPackage(stream, true); + } else { + if (log.isWarnEnabled()) { + log.warn("cannot read streams of type {}", type); + } + } + } else { + BufferedInputStream bufferedInputStream = new BufferedInputStream(stream); // needed to allow for multiple reads + for (ReplicationPackageBuilder replicationPackageBuilder : replicationPackageBuilderProvider.getAvailableReplicationPackageBuilders()) { + try { + replicationPackage = replicationPackageBuilder.readPackage(bufferedInputStream, true); + } catch (Exception e) { + if (log.isWarnEnabled()) { + log.warn("received stream cannot be read with {}", replicationPackageBuilder); + } + } + } + } + + if (replicationPackage != null) { + if (log.isInfoEnabled()) { + log.info("replication package read and installed for path(s) {}", + Arrays.toString(replicationPackage.getPaths())); + } + + Dictionary dictionary = new Hashtable(); + dictionary.put("replication.action", replicationPackage.getAction()); + dictionary.put("replication.path", replicationPackage.getPaths()); + replicationEventFactory.generateEvent(ReplicationEventType.PACKAGE_INSTALLED, dictionary); + success = true; + } else { + if 
(log.isWarnEnabled()) { + log.warn("could not read a replication package"); + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("cannot import a package from the given stream of type {}", type); + } + } + return success; + } + + public void scheduleImport(InputStream stream, String type) throws Exception { + try { + Map properties = new HashMap(); + properties.put("bin", IOUtils.toString(stream)); // TODO : compress / encode the stream + properties.put("type", type); + Job job = jobManager.createJob(QUEUE_TOPIC).properties(properties).add(); + if (log.isInfoEnabled()) { + log.info("job added {}", job); + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("could not add an item to the queue"); + } + throw e; + } + } + + private class ReplicationPackageImporterJobConsumer implements JobConsumer { + + public JobResult process(Job job) { + try { + InputStream stream = IOUtils.toInputStream(String.valueOf(job.getProperty("bin"))); // TODO : decompress / decode the stream string + boolean result = importStream(stream, String.valueOf(job.getProperty("type"))); + return result ? JobResult.OK : JobResult.FAILED; + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("could not process import job correctly", e); + } + return JobResult.FAILED; + } + } + } + + +}