incubator-sling-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdelacre...@apache.org
Subject svn commit: r964017 - in /sling/trunk/contrib/extensions/bgservlets: ./ src/main/java/org/apache/sling/bgservlets/impl/ src/test/ src/test/java/ src/test/java/org/ src/test/java/org/apache/ src/test/java/org/apache/sling/ src/test/java/org/apache/sling...
Date Wed, 14 Jul 2010 12:03:01 GMT
Author: bdelacretaz
Date: Wed Jul 14 12:03:01 2010
New Revision: 964017

URL: http://svn.apache.org/viewvc?rev=964017&view=rev
Log:
SLING-550 - SuspendableOutputStream will be used to suspend/stop background servlets

Added:
    sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
  (with props)
    sling/trunk/contrib/extensions/bgservlets/src/test/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/org/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/
    sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
  (with props)
Modified:
    sling/trunk/contrib/extensions/bgservlets/pom.xml

Modified: sling/trunk/contrib/extensions/bgservlets/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/pom.xml?rev=964017&r1=964016&r2=964017&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/bgservlets/pom.xml (original)
+++ sling/trunk/contrib/extensions/bgservlets/pom.xml Wed Jul 14 12:03:01 2010
@@ -97,5 +97,10 @@
       <artifactId>org.apache.felix.webconsole</artifactId>
       <version>3.0.0</version>
     </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file

Added: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java?rev=964017&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
(added)
+++ sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
Wed Jul 14 12:03:01 2010
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.sling.bgservlets.JobStatus;
+
+/** Wraps an OutputStream with controls for suspending it or
+ * 	throwing an IOException next time it is written to.
+ * 	Used to suspend background servlets (by blocking the 
+ * 	stream) or stop them (by throwing an exception).
+ */
+public class SuspendableOutputStream extends FilterOutputStream implements JobStatus {
+	private State state = State.RUNNING;
+	private boolean closed = false;
+	
+	@SuppressWarnings("serial")
+	public static class StreamStoppedException extends IOException {
+		StreamStoppedException() {
+			super("Stopped by " + SuspendableOutputStream.class.getSimpleName());
+		}
+	}
+	
+	public SuspendableOutputStream(OutputStream os) {
+		super(os);
+	}
+
+	@Override
+	public void write(byte[] b, int off, int len) throws IOException {
+		checkWritePermission();
+		super.write(b, off, len);
+	}
+
+	@Override
+	public void write(byte[] b) throws IOException {
+		checkWritePermission();
+		super.write(b);
+	}
+
+	@Override
+	public void write(int b) throws IOException {
+		checkWritePermission();
+		super.write(b);
+	}
+	
+	@Override
+	public void close() throws IOException {
+		super.close();
+		state = State.DONE;
+		closed = true;
+	}
+	
+	private void checkWritePermission() throws IOException {
+		if(closed) {
+			throw new IOException("Attempt to write to closed stream");
+		}
+		
+		if(state == State.STOP_REQUESTED || state == State.STOPPED) {
+			state = State.STOPPED;
+			// stopped: throw exception to stop stream user
+			flush();
+			throw new StreamStoppedException();
+			
+		} else if(state == State.SUSPEND_REQUESTED || state == State.SUSPENDED) {
+			synchronized (this) {
+				if(state == State.SUSPEND_REQUESTED || state == State.SUSPENDED)
+					state = State.SUSPENDED;
+					try {
+						// suspended: block until resumed
+						wait();
+					} catch (InterruptedException e) {
+						throw new IOException("InterruptedException in checkWritePermission()", e);
+					}
+			}
+		}
+	}
+
+
+	public State getState() {
+		return state;
+	}
+
+	/** Only SUSPENDED, STOP, and RUNNING make sense here */ 
+	public synchronized void requestStateChange(State s) {
+		boolean illegal = false;
+		
+		if(s == State.SUSPENDED) {
+			if(state == State.RUNNING) {
+				state = State.SUSPEND_REQUESTED;
+			} else if(state == State.SUSPEND_REQUESTED || state == State.SUSPENDED) {
+				// ignore change
+			} else {
+				illegal = true;
+			}
+			
+		} else if(s == State.STOPPED) {
+			if(state == State.RUNNING) {
+				state = State.STOP_REQUESTED;
+			} else if (state == State.STOP_REQUESTED || state == State.STOPPED) {
+				// ignore change
+			} else {
+				illegal = true;
+			}
+			
+		} else if(s == State.RUNNING) {
+			if(state == State.SUSPEND_REQUESTED || state == State.SUSPENDED) {
+				state = State.RUNNING;
+				notify();
+			}
+		}
+		
+		if(illegal) {
+			throw new IllegalStateException("Illegal state change:" + state + " -> " + s);
+		}
+	}
+
+	/** Not implemented
+	 * 	@throws UnsupportedOperationException */
+	public String getPath() {
+		throw new UnsupportedOperationException("getPath() is not applicable to this class");
+	}
+}

Propchange: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/contrib/extensions/bgservlets/src/main/java/org/apache/sling/bgservlets/impl/SuspendableOutputStream.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL

Added: sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java?rev=964017&view=auto
==============================================================================
--- sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
(added)
+++ sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
Wed Jul 14 12:03:01 2010
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.bgservlets.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.sling.bgservlets.JobStatus;
+import org.junit.Test;
+
+public class SuspendableOutputStreamTest {
+	public final static String TEST = "0123456789abcdefghijklmnopqrstuvwxyz";
+	
+	static class WriterThread extends Thread {
+		private final OutputStream os;
+		private final byte [] toWrite = "TEST".getBytes();
+		private Exception runException;
+		final static int WRITE_DELAY = 50;
+		
+		WriterThread(OutputStream os) {
+			this.os = os;
+		}
+		
+		@Override
+		public void run() {
+			try {
+				while(true) {
+					os.write(toWrite);
+					Thread.sleep(WRITE_DELAY);
+				}
+			} catch(Exception e) {
+				runException = e;
+			}
+		}
+ 	}
+	
+	@Test
+	public void testNoSuspend() throws IOException {
+		final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+		final SuspendableOutputStream f = new SuspendableOutputStream(bos);
+		f.write(TEST.getBytes());
+		f.flush();
+		assertEquals("String should be fully written", TEST, bos.toString());
+	}
+	
+	@Test
+	public void testStop() throws IOException {
+		final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+		final SuspendableOutputStream f = new SuspendableOutputStream(bos);
+		assertEquals("Expecting RUNNING state first", JobStatus.State.RUNNING, f.getState());
+		f.write(TEST.getBytes());
+		f.flush();
+		
+		f.requestStateChange(JobStatus.State.STOPPED);
+		assertEquals("Expecting STOP_REQUESTED state before write", JobStatus.State.STOP_REQUESTED,
f.getState());
+		try {
+			f.write("nothing".getBytes());
+			fail("Expected StreamStoppedException when writing to STOPPED stream");
+		} catch(SuspendableOutputStream.StreamStoppedException asExpected) {
+		}
+		
+		assertEquals("Expecting STOPPED state after write", JobStatus.State.STOPPED, f.getState());
+	}
+	
+	@Test
+	public void testSuspend() throws Exception {
+		final ByteArrayOutputStream bos = new ByteArrayOutputStream();
+		final SuspendableOutputStream f = new SuspendableOutputStream(bos);
+		final WriterThread t = new WriterThread(f);
+		t.setDaemon(true);
+		t.start();
+		
+		final long delay = WriterThread.WRITE_DELAY * 3;
+		Thread.sleep(delay);
+		assertTrue("Expecting data to be written by WriterThread", bos.size() > 0);
+		
+		f.requestStateChange(JobStatus.State.SUSPENDED);
+		Thread.sleep(delay);
+		assertEquals("Expecting SUSPEND state after a few writes", JobStatus.State.SUSPENDED, f.getState());
+		
+		final int count = bos.size();
+		Thread.sleep(delay);
+		assertEquals("Expecting no writes in SUSPEND state", count, bos.size());
+		
+		f.requestStateChange(JobStatus.State.RUNNING);
+		Thread.sleep(delay);
+		assertEquals("Expecting RUNNING state", JobStatus.State.RUNNING, f.getState());
+		assertTrue("Expecting data to be written after resuming", bos.size() > count);
+		
+		f.close();
+		Thread.sleep(delay);
+		assertFalse("Expecting WriterThread to end after closing stream", t.isAlive());
+		assertNotNull("Expecting non-null Exception in WriterThread", t.runException);
+		assertTrue("Expecting IOException in WriterThread", t.runException instanceof IOException);
+	}
+}

Propchange: sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: sling/trunk/contrib/extensions/bgservlets/src/test/java/org/apache/sling/bgservlets/impl/SuspendableOutputStreamTest.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision Rev URL



Mime
View raw message