incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdelacre...@apache.org
Subject svn commit: r965878 - in /sling/trunk/contrib/extensions/bgservlets: ./ src/main/java/org/apache/sling/bgservlets/impl/nodestream/ src/test/java/org/apache/sling/bgservlets/impl/nodestream/
Date Tue, 20 Jul 2010 15:25:29 GMT
Author: bdelacretaz
Date: Tue Jul 20 15:25:28 2010
New Revision: 965878

URL: http://svn.apache.org/viewvc?rev=965878&view=rev
Log:
SLING-550 - JCR-backed permanent streams, will be used to store the output of background servlets

Added:
    sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/
    sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
  (with props)
    sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
  (with props)
    sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
  (with props)
Modified:
    sling/trunk/contrib/extensions/bgservlets/pom.xml

Modified: sling/trunk/contrib/extensions/bgservlets/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/pom.xml?rev=965878&r1=965877&r2=965878&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/bgservlets/pom.xml (original)
+++ sling/trunk/contrib/extensions/bgservlets/pom.xml Tue Jul 20 15:25:28 2010
@@ -97,10 +97,20 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.sling</groupId>
+      <artifactId>org.apache.sling.commons.testing</artifactId>
+      <version>2.0.4-incubator</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>javax.servlet</groupId>
       <artifactId>servlet-api</artifactId>
     </dependency>
     <dependency>
+      <groupId>javax.jcr</groupId>
+      <artifactId>jcr</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

Added: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java?rev=965878&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
(added)
+++ sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
Tue Jul 20 15:25:28 2010
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl.nodestream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jcr.Node;
+import javax.jcr.Property;
+import javax.jcr.RepositoryException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Reads data stored by a {@link NodeOutputStream}
+ *  and rebuilds a continuous stream out of the
+ *  multiple Properties that it creates.
+ *
+ *  Properties are read in sequence: their names are built
+ *  from {@link NodeOutputStream#STREAM_PROPERTY_NAME_PREFIX}
+ *  followed by a counter that starts at 1. The first missing
+ *  Property marks the end of the stream.
+ */
+public class NodeInputStream extends InputStream {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /** The Node under which we read our data */
+    private final Node node;
+
+    /** Counter used to build the name of Property from
+     *  which we currently read */
+    private int counter;
+
+    /** Current stream that we are reading, null once
+     *  the end of our data has been reached (or after close) */
+    private InputStream currentStream;
+
+    /** Open the stream of the first Property found under n
+     *  @param n the Node under which our data is stored
+     *  @throws IOException if the repository cannot be accessed
+     */
+    NodeInputStream(Node n) throws IOException {
+        node = n;
+        selectNextStream();
+    }
+
+    /** Close the stream that we were reading, if any, then
+     *  select the next property to read from and open its stream.
+     *  Sets currentStream to null if there is no such property.
+     *  @throws IOException if closing the previous stream fails,
+     *      or wrapping a RepositoryException
+     */
+    private void selectNextStream() throws IOException {
+        // Release the exhausted stream before switching, it is not used anymore
+        if(currentStream != null) {
+            currentStream.close();
+            currentStream = null;
+        }
+
+        counter++;
+        final String name = NodeOutputStream.STREAM_PROPERTY_NAME_PREFIX + counter;
+        try {
+            if(node.hasProperty(name)) {
+                final Property p = node.getProperty(name);
+                currentStream = p.getStream();
+                log.debug("Switched to the InputStream of Property {}", p.getPath());
+            } else {
+                log.debug("Property {} not found, end of stream", node.getPath() + "/" + name);
+            }
+        } catch(RepositoryException re) {
+            throw new IOException("RepositoryException in selectNextStream()", re);
+        }
+    }
+
+    /** @return what the stream of the current Property reports as
+     *      available, 0 if the end of our data has been reached
+     */
+    @Override
+    public int available() throws IOException {
+        return currentStream == null ? 0 : currentStream.available();
+    }
+
+    /** Close the stream of the current Property, if any.
+     *  Calling this more than once has no further effect.
+     */
+    @Override
+    public void close() throws IOException {
+        if(currentStream != null) {
+            try {
+                currentStream.close();
+            } finally {
+                // Do not try to close or read that stream again
+                currentStream = null;
+            }
+        }
+        super.close();
+    }
+
+    /** Read a single byte, switching to the next Property
+     *  as many times as needed if the current one is exhausted.
+     *  @return the byte read, or -1 at the end of our data
+     */
+    @Override
+    public int read() throws IOException {
+        // Loop instead of recursing, so that a long series of
+        // empty Properties cannot grow the call stack
+        while(currentStream != null) {
+            final int result = currentStream.read();
+            if(result != -1) {
+                return result;
+            }
+            selectNextStream();
+        }
+        return -1;
+    }
+
+    /** Read up to len bytes from the current Property, switching to
+     *  the next one if it is exhausted. A single call does not read
+     *  across Property boundaries, so it can return less than len
+     *  bytes even if more data follows.
+     *  @return the number of bytes read, 0 if len is 0,
+     *      or -1 at the end of our data
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if(len == 0) {
+            return 0;
+        }
+        while(currentStream != null) {
+            final int result = currentStream.read(b, off, len);
+            // -1 (not 0) signals that the current Property is exhausted
+            if(result != -1) {
+                return result;
+            }
+            selectNextStream();
+        }
+        // End of data must be reported as -1: returning 0 would
+        // make callers that loop until -1 spin forever
+        return -1;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+}

Propchange: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeInputStream.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java?rev=965878&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
(added)
+++ sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
Tue Jul 20 15:25:28 2010
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl.nodestream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import javax.jcr.Node;
+import javax.jcr.RepositoryException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** An OutputStream stored in properties under
+ *  a JCR node. The content is persisted on
+ *  each flush() call, using sequentially-named
+ *  properties so that {@link NodeInputStream} can
+ *  reconstruct the stream from permanent storage.
+ *  flush() is also called automatically as soon as
+ *  the buffer holds BUFFER_SWITCH_SIZE bytes or more,
+ *  to keep our memory requirements low.
+ *
+ *  Meant to be used when running background servlets:
+ *  we want to save their output in a way that
+ *  survives system restart.
+ */
+public class NodeOutputStream extends OutputStream {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /** Prefix for Property names used to store our streams */
+    public static final String STREAM_PROPERTY_NAME_PREFIX = "_NODE_STREAM_";
+
+    /** Initial capacity of our buffer */
+    public static final int BUFFER_SIZE = 32768;
+
+    /** Buffer fill level that triggers an automatic flush: 90% of
+     *  BUFFER_SIZE, so that a series of small writes is flushed
+     *  before the buffer has to grow. */
+    public static final int BUFFER_SWITCH_SIZE = BUFFER_SIZE * 90 / 100;
+
+    /** The Node under which we write our data */
+    private final Node node;
+
+    /** Number of Properties written so far, used to build
+     *  the name of the next Property that we write */
+    private int counter;
+
+    /** Buffer to hold data before writing it to a Property */
+    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(BUFFER_SIZE);
+
+    /** @param n the Node under which our data is stored */
+    public NodeOutputStream(Node n) {
+        node = n;
+    }
+
+    /** Calls flush to persist our stream, before closing */
+    @Override
+    public void close() throws IOException {
+        flush();
+        buffer.close();
+    }
+
+    /** Store the contents of our buffer to a new Property under our
+     *  node, numbered sequentially, and save the node. Does nothing
+     *  if the buffer is empty.
+     *  @throws IOException wrapping a RepositoryException. In that case
+     *      the buffer and counter are left untouched, so that the same
+     *      data can be flushed again under the same Property name.
+     */
+    @Override
+    public void flush() throws IOException {
+        if(buffer.size() == 0) {
+            // Nothing to persist: avoid creating empty Properties
+            // and useless repository saves
+            return;
+        }
+
+        final String name = STREAM_PROPERTY_NAME_PREFIX + (counter + 1);
+        try {
+            node.setProperty(name, new ByteArrayInputStream(buffer.toByteArray()));
+            node.save();
+
+            // Only advance once the data is persisted: a gap in the
+            // numbering would be seen as end of stream by NodeInputStream
+            final int saved = buffer.size();
+            counter++;
+            buffer.reset();
+
+            // Guarded, as building the message requires a repository lookup
+            if(log.isDebugEnabled()) {
+                log.debug("Saved {} bytes to Property {}", saved, node.getProperty(name).getPath());
+            }
+        } catch(RepositoryException re) {
+            throw new IOException("RepositoryException in flush()", re);
+        }
+    }
+
+    /** Flush if our buffer has reached BUFFER_SWITCH_SIZE */
+    private void flushIfNeeded() throws IOException {
+        if(buffer.size() >= BUFFER_SWITCH_SIZE) {
+            log.debug("Buffer size {} reached switch level {}, flushing", buffer.size(), BUFFER_SWITCH_SIZE);
+            flush();
+        }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        buffer.write(b, off, len);
+        flushIfNeeded();
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        buffer.write(b);
+        flushIfNeeded();
+    }
+}

Propchange: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/nodestream/NodeOutputStream.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java?rev=965878&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
(added)
+++ sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
Tue Jul 20 15:25:28 2010
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl.nodestream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jcr.Node;
+
+import org.apache.sling.commons.testing.jcr.RepositoryTestBase;
+
+public class NodeStreamTest extends RepositoryTestBase {
+    
+    public static final String ASCII_DATA = "0123456789abcdefgjijklmnoprqstuvwxyz";
+    public static final byte [] BINARY_DATA = getBinaryData();
+    public static final String NAME_PREFIX = "testNode";
+    private int counter;
+    
+    private void assertStream(InputStream expected, InputStream actual) throws IOException
{
+        int offset = 0;
+        while(true) {
+            final int exp = expected.read();
+            if(exp == -1) {
+                assertEquals("Expecting end of actual stream at offset " + offset, -1, actual.read());
+                break;
+            } else {
+                final int act = actual.read();
+                assertEquals("Expecting same data at offset " + offset, exp, act);
+            }
+            offset++;
+        }
+    }
+    
+    public void testAsciiWriteAndRead() throws Exception {
+        final Node testNode = getTestRootNode().addNode(NAME_PREFIX + counter++);
+        testNode.getSession().save();
+        final NodeOutputStream nos = new NodeOutputStream(testNode);
+        nos.write(ASCII_DATA.getBytes());
+        nos.close();
+        final NodeInputStream nis = new NodeInputStream(testNode);
+        assertStream(new ByteArrayInputStream(ASCII_DATA.getBytes()), nis);
+    }
+    
+    public void testBinaryWriteAndRead() throws Exception {
+        final Node testNode = getTestRootNode().addNode(NAME_PREFIX + counter++);
+        testNode.getSession().save();
+        final NodeOutputStream nos = new NodeOutputStream(testNode);
+        nos.write(BINARY_DATA);
+        nos.close();
+        final NodeInputStream nis = new NodeInputStream(testNode);
+        assertStream(new ByteArrayInputStream(BINARY_DATA), nis);
+    }
+    
+    public void testBigBinaryWriteAndRead() throws Exception {
+        final int FACTOR = 20;
+        final byte [] data = bigData(BINARY_DATA, FACTOR);
+        assertEquals("Expecting " + FACTOR + "x test data size", FACTOR * BINARY_DATA.length,
data.length);
+        final Node testNode = getTestRootNode().addNode(NAME_PREFIX + counter++);
+        testNode.getSession().save();
+        final NodeOutputStream nos = new NodeOutputStream(testNode);
+        nos.write(data);
+        nos.close();
+        
+        assertFalse("Expecting no pending changes in testNode session", testNode.getSession().hasPendingChanges());
+        
+        final NodeInputStream nis = new NodeInputStream(testNode);
+        assertStream(new ByteArrayInputStream(data), nis);
+    }
+    
+    public void testMultipleBinaryWrites() throws Exception {
+        final int FACTOR = 20;
+        final Node testNode = getTestRootNode().addNode(NAME_PREFIX + counter++);
+        testNode.getSession().save();
+        final NodeOutputStream nos = new NodeOutputStream(testNode);
+        for(int i=0; i < FACTOR; i++) {
+            nos.write(BINARY_DATA);
+        }
+        nos.close();
+        
+        assertFalse("Expecting no pending changes in testNode session", testNode.getSession().hasPendingChanges());
+        final long propCount = testNode.getProperties().getSize();
+        final long expect = 10;
+        assertTrue("Expecting > " + expect + " properties on test node", propCount >
expect);
+
+        final byte [] data = bigData(BINARY_DATA, FACTOR);
+        final NodeInputStream nis = new NodeInputStream(testNode);
+        assertStream(new ByteArrayInputStream(data), nis);
+    }
+    
+    public void testWriteWithOffset() throws Exception {
+        final int FACTOR = 20;
+        final byte [] data = bigData(BINARY_DATA, FACTOR);
+        assertEquals("Expecting " + FACTOR + "x test data size", FACTOR * BINARY_DATA.length,
data.length);
+
+        final Node testNode = getTestRootNode().addNode(NAME_PREFIX + counter++);
+        testNode.getSession().save();
+
+        final NodeOutputStream nos = new NodeOutputStream(testNode);
+        int offset = 0;
+        int step = 1271;
+        while(offset < data.length && step > 0) {
+            step = Math.min(step, data.length - offset);
+            nos.write(data, offset, step);
+            offset += step;
+        }
+        nos.close();
+        
+        final NodeInputStream nis = new NodeInputStream(testNode);
+        assertStream(new ByteArrayInputStream(data), nis);
+    }
+    
+    private byte[] bigData(byte [] data, int multiplier) {
+        final byte [] result = new byte[data.length * multiplier];
+        int destPos = 0;
+        for(int i=0; i < multiplier; i++) {
+            System.arraycopy(data, 0, result, destPos, data.length);
+            destPos += data.length;
+        }
+        return result;
+    }
+    
+    private static byte [] getBinaryData() {
+        final ByteArrayOutputStream os = new ByteArrayOutputStream();
+        for(int i=0;i  < 66000; i++) {
+            os.write(i);
+        }
+        return os.toByteArray();
+    }
+}

Propchange: sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/nodestream/NodeStreamTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL



Mime
View raw message