Return-Path: Delivered-To: apmail-sling-commits-archive@www.apache.org Received: (qmail 65499 invoked from network); 20 Jul 2010 15:26:57 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 20 Jul 2010 15:26:57 -0000 Received: (qmail 749 invoked by uid 500); 20 Jul 2010 15:26:57 -0000 Delivered-To: apmail-sling-commits-archive@sling.apache.org Received: (qmail 678 invoked by uid 500); 20 Jul 2010 15:26:57 -0000 Mailing-List: contact commits-help@sling.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@sling.apache.org Delivered-To: mailing list commits@sling.apache.org Received: (qmail 663 invoked by uid 99); 20 Jul 2010 15:26:56 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Jul 2010 15:26:56 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Jul 2010 15:26:53 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 6F51B2388A2C; Tue, 20 Jul 2010 15:25:29 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r965878 - in /sling/trunk/contrib/extensions/bgservlets: ./ src/main/java/org/apache/sling/bgservlets/impl/nodestream/ src/test/java/org/apache/sling/bgservlets/impl/nodestream/ Date: Tue, 20 Jul 2010 15:25:29 -0000 To: commits@sling.apache.org From: bdelacretaz@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100720152529.6F51B2388A2C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: bdelacretaz Date: Tue Jul 20 15:25:28 2010 New Revision: 965878 URL: http://svn.apache.org/viewvc?rev=965878&view=rev Log: SLING-550 - JCR-backed permanent streams, will be used to store the output of background servlets Added: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/ sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java (with props) sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java (with props) sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/ sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java (with props) Modified: sling/trunk/contrib/extensions/bgservlets/pom.xml Modified: sling/trunk/contrib/extensions/bgservlets/pom.xml URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/pom.xml?rev=965878&r1=965877&r2=965878&view=diff ============================================================================== --- sling/trunk/contrib/extensions/bgservlets/pom.xml (original) +++ sling/trunk/contrib/extensions/bgservlets/pom.xml Tue Jul 20 15:25:28 2010 @@ -97,10 +97,20 @@ provided + org.apache.sling + org.apache.sling.commons.testing + 2.0.4-incubator + test + + javax.servlet servlet-api + javax.jcr + jcr + + org.slf4j slf4j-api Added: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java?rev=965878&view=auto ============================================================================== --- sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java (added) +++ sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java Tue Jul 20 15:25:28 2010 @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.bgservlets.impl.nodestream; + +import java.io.IOException; +import java.io.InputStream; + +import javax.jcr.Node; +import javax.jcr.Property; +import javax.jcr.RepositoryException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Reads data stored by a {@link NodeOutputStream} + * and rebuilds a continuous stream out of the + * multiple Properties that it creates. + */ +public class NodeInputStream extends InputStream { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + /** The Node under which we read our data */ + private final Node node; + + /** Counter used to build the name of Property from + * which we currently read */ + private int counter; + + /** Current stream that we are reading */ + private InputStream currentStream; + + NodeInputStream(Node n) throws IOException { + node = n; + selectNextStream(); + } + + /** Select next property to read from and open its stream */ + private void selectNextStream() throws IOException { + counter++; + final String name = NodeOutputStream.STREAM_PROPERTY_NAME_PREFIX + counter; + try { + if(node.hasProperty(name)) { + final Property p = node.getProperty(name); + currentStream = p.getStream(); + log.debug("Switched to the InputStream of Property {}", p.getPath()); + } else { + currentStream = null; + log.debug("Property {} not found, end of stream", node.getPath() + "/" + name); + } + } catch(RepositoryException re) { + throw new IOException("RepositoryException in selectNextProperty()", re); + } + } + + @Override + public int available() throws IOException { + return currentStream == null ? 0 : currentStream.available(); + } + + @Override + public void close() throws IOException { + if(currentStream != null) { + currentStream.close(); + } + super.close(); + } + + @Override + public int read() throws IOException { + if(currentStream == null) { + return -1; + } + int result = currentStream.read(); + if(result == -1) { + selectNextStream(); + return read(); + } + return result; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if(currentStream == null) { + return 0; + } + int result = currentStream.read(b, off, len); + if(result == 0) { + selectNextStream(); + return read(b, off, len); + } + return result; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } +} Propchange: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Rev URL Added: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java?rev=965878&view=auto ============================================================================== --- sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java (added) +++ sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java Tue Jul 20 15:25:28 2010 @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.bgservlets.impl.nodestream; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import javax.jcr.Node; +import javax.jcr.RepositoryException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** An OutputStream stored in properties under + * a JCR node. The content is persisted on + * each flush() call, using sequentially-named + * properties so that {@link NodeInputStream} can + * reconstruct the stream from permanent storage. + * flush() is also called automatically every + * BUFFER_SWITCH_SIZE bytes, to keep our memory + * requirements low. + * + * Meant to be used when running background servlets: + * we want to save their output in a way that + * survives system restart. + */ +public class NodeOutputStream extends OutputStream { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + /** Prefix for Property names used to store our streams */ + public static final String STREAM_PROPERTY_NAME_PREFIX = "_NODE_STREAM_"; + + /** The Node under which we write our data */ + private final Node node; + + /** Counter used to build the name of Property to + * which we currently write */ + private int counter; + + /** Buffer to hold data before writing it to a Property */ + private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(BUFFER_SIZE); + + public static final int BUFFER_SIZE = 32768; + public static final int BUFFER_SWITCH_SIZE = BUFFER_SIZE * 100 / 90; + + public NodeOutputStream(Node n) { + node = n; + } + + /** Calls flush to persist our stream, before closing */ + @Override + public void close() throws IOException { + flush(); + buffer.close(); + } + + /** Store the contents of our buffer to a new Property under our + * node, numbered sequentially. + */ + @Override + public void flush() throws IOException { + counter++; + final String name = NodeOutputStream.STREAM_PROPERTY_NAME_PREFIX + counter; + try { + node.setProperty(name, new ByteArrayInputStream(buffer.toByteArray())); + log.debug("Saved {} bytes to Property {}", buffer.size(), node.getProperty(name).getPath()); + node.save(); + buffer.reset(); + } catch(RepositoryException re) { + throw new IOException("RepositoryException in flush()", re); + } + } + + private void flushIfNeeded() throws IOException { + if(buffer.size() >= BUFFER_SWITCH_SIZE) { + log.debug("Buffer size {} reached switch level {}, flushing", buffer.size(), BUFFER_SWITCH_SIZE); + flush(); + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + buffer.write(b, off, len); + flushIfNeeded(); + } + + @Override + public void write(byte[] b) throws IOException { + buffer.write(b); + flushIfNeeded(); + } + + @Override + public void write(int b) throws IOException { + buffer.write(b); + flushIfNeeded(); + } +} Propchange: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Rev URL Added: sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java?rev=965878&view=auto ============================================================================== --- sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java (added) +++ sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java Tue Jul 20 15:25:28 2010 @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.bgservlets.impl.nodestream; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import javax.jcr.Node; + +import org.apache.sling.commons.testing.jcr.RepositoryTestBase; + +public class NodeStreamTest extends RepositoryTestBase { + + public static final String ASCII_DATA = "0123456789abcdefgjijklmnoprqstuvwxyz"; + public static final byte [] BINARY_DATA = getBinaryData(); + public static final String NAME_PREFIX = "testNode"; + private int counter; + + private void assertStream(InputStream expected, InputStream actual) throws IOException { + int offset = 0; + while(true) { + final int exp = expected.read(); + if(exp == -1) { + assertEquals("Expecting end of actual stream at offset " + offset, -1, actual.read()); + break; + } else { + final int act = actual.read(); + assertEquals("Expecting same data at offset " + offset, exp, act); + } + offset++; + } + } + + public void testAsciiWriteAndRead() throws Exception { + final Node testNode = getTestRootNode().addNode(NAME_PREFIX + counter++); + testNode.getSession().save(); + final NodeOutputStream nos = new NodeOutputStream(testNode); + nos.write(ASCII_DATA.getBytes()); + nos.close(); + final NodeInputStream nis = new NodeInputStream(testNode); + assertStream(new ByteArrayInputStream(ASCII_DATA.getBytes()), nis); + } + + public void testBinaryWriteAndRead() throws Exception { + final Node testNode = getTestRootNode().addNode(NAME_PREFIX + counter++); + testNode.getSession().save(); + final NodeOutputStream nos = new NodeOutputStream(testNode); + nos.write(BINARY_DATA); + nos.close(); + final NodeInputStream nis = new NodeInputStream(testNode); + assertStream(new ByteArrayInputStream(BINARY_DATA), nis); + } + + public void testBigBinaryWriteAndRead() throws Exception { + final int FACTOR = 20; + final byte [] data = bigData(BINARY_DATA, FACTOR); + assertEquals("Expecting " + FACTOR + "x test data size", FACTOR * BINARY_DATA.length, data.length); + final Node testNode = getTestRootNode().addNode(NAME_PREFIX + counter++); + testNode.getSession().save(); + final NodeOutputStream nos = new NodeOutputStream(testNode); + nos.write(data); + nos.close(); + + assertFalse("Expecting no pending changes in testNode session", testNode.getSession().hasPendingChanges()); + + final NodeInputStream nis = new NodeInputStream(testNode); + assertStream(new ByteArrayInputStream(data), nis); + } + + public void testMultipleBinaryWrites() throws Exception { + final int FACTOR = 20; + final Node testNode = getTestRootNode().addNode(NAME_PREFIX + counter++); + testNode.getSession().save(); + final NodeOutputStream nos = new NodeOutputStream(testNode); + for(int i=0; i < FACTOR; i++) { + nos.write(BINARY_DATA); + } + nos.close(); + + assertFalse("Expecting no pending changes in testNode session", testNode.getSession().hasPendingChanges()); + final long propCount = testNode.getProperties().getSize(); + final long expect = 10; + assertTrue("Expecting > " + expect + " properties on test node", propCount > expect); + + final byte [] data = bigData(BINARY_DATA, FACTOR); + final NodeInputStream nis = new NodeInputStream(testNode); + assertStream(new ByteArrayInputStream(data), nis); + } + + public void testWriteWithOffset() throws Exception { + final int FACTOR = 20; + final byte [] data = bigData(BINARY_DATA, FACTOR); + assertEquals("Expecting " + FACTOR + "x test data size", FACTOR * BINARY_DATA.length, data.length); + + final Node testNode = getTestRootNode().addNode(NAME_PREFIX + counter++); + testNode.getSession().save(); + + final NodeOutputStream nos = new NodeOutputStream(testNode); + int offset = 0; + int step = 1271; + while(offset < data.length && step > 0) { + step = Math.min(step, data.length - offset); + nos.write(data, offset, step); + offset += step; + } + nos.close(); + + final NodeInputStream nis = new NodeInputStream(testNode); + assertStream(new ByteArrayInputStream(data), nis); + } + + private byte[] bigData(byte [] data, int multiplier) { + final byte [] result = new byte[data.length * multiplier]; + int destPos = 0; + for(int i=0; i < multiplier; i++) { + System.arraycopy(data, 0, result, destPos, data.length); + destPos += data.length; + } + return result; + } + + private static byte [] getBinaryData() { + final ByteArrayOutputStream os = new ByteArrayOutputStream(); + for(int i=0;i < 66000; i++) { + os.write(i); + } + return os.toByteArray(); + } +} Propchange: sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java ------------------------------------------------------------------------------ svn:keywords = Author Date Id Revision Rev URL