Return-Path: Delivered-To: apmail-jakarta-commons-dev-archive@www.apache.org Received: (qmail 68348 invoked from network); 26 Oct 2006 21:47:50 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 26 Oct 2006 21:47:50 -0000 Received: (qmail 45202 invoked by uid 500); 26 Oct 2006 21:47:59 -0000 Delivered-To: apmail-jakarta-commons-dev-archive@jakarta.apache.org Received: (qmail 45129 invoked by uid 500); 26 Oct 2006 21:47:59 -0000 Mailing-List: contact commons-dev-help@jakarta.apache.org; run by ezmlm Precedence: bulk List-Unsubscribe: List-Help: List-Post: List-Id: "Jakarta Commons Developers List" Reply-To: "Jakarta Commons Developers List" Delivered-To: mailing list commons-dev@jakarta.apache.org Received: (qmail 45118 invoked by uid 500); 26 Oct 2006 21:47:59 -0000 Received: (qmail 45115 invoked by uid 99); 26 Oct 2006 21:47:59 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Oct 2006 14:47:59 -0700 X-ASF-Spam-Status: No, hits=0.6 required=10.0 tests=NO_REAL_NAME X-Spam-Check-By: apache.org Received-SPF: pass (herse.apache.org: local policy) Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Oct 2006 14:47:47 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 89CF81A984D; Thu, 26 Oct 2006 14:47:26 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r468153 - in /jakarta/commons/sandbox/pipeline/trunk/src: main/java/org/apache/commons/pipeline/event/ main/java/org/apache/commons/pipeline/stage/ test/java/org/apache/commons/pipeline/stage/ Date: Thu, 26 Oct 2006 21:47:26 -0000 To: commons-cvs@jakarta.apache.org From: nuttycom@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20061026214726.89CF81A984D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: nuttycom Date: Thu Oct 26 14:47:25 2006 New Revision: 468153 URL: http://svn.apache.org/viewvc?view=rev&rev=468153 Log: Added functionality to allow inter-branch synchronization using event-based key passing. Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java (with props) jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java (with props) jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java (with props) jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java (with props) Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java?view=auto&rev=468153 ============================================================================== --- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java (added) +++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java Thu Oct 26 14:47:25 2006 @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.commons.pipeline.event; + +import java.util.EventObject; + +/** + * This event is used to signal the availability of the specified key. This is + * usually used for inter-branch synchronization using the + * {@link org.apache.commons.pipeline.stage.RaiseKeyAvailableEventStage RaiseKeyAvailableEventStage} + * and/or the {@link org.apache.commons.pipeline.stage.KeyWaitBufferStage KeyWaitBufferStage}. + */ +public class KeyAvailableEvent extends EventObject { + private T key; + + /** Creates a new instance of KeyAvailableEvent */ + public KeyAvailableEvent(Object source, T key) { + super(source); + this.key = key; + } + + /** + * Returns the key. + */ + public T getKey() { + return this.key; + } +} Propchange: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java?view=auto&rev=468153 ============================================================================== --- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java (added) +++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java Thu Oct 26 14:47:25 2006 @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.commons.pipeline.stage; + +import java.util.EventObject; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import org.apache.commons.pipeline.StageContext; +import org.apache.commons.pipeline.StageEventListener; +import org.apache.commons.pipeline.StageException; +import org.apache.commons.pipeline.event.KeyAvailableEvent; +import org.apache.commons.pipeline.util.KeyFactory; +import org.apache.commons.pipeline.util.QueueFactory; + +/** + * + * @author kjn + */ +public class KeyWaitBufferStage extends BaseStage implements StageEventListener { + + private Set receivedKeys = new TreeSet(); + private Map> buffers = new TreeMap>(); + + /** Creates a new instance of KeyWaitBufferStage */ + public KeyWaitBufferStage() { + } + + public void notify(EventObject ev) { + if (ev instanceof KeyAvailableEvent) { + KeyAvailableEvent e = (KeyAvailableEvent) ev; + synchronized(receivedKeys) { + receivedKeys.add(e.getKey()); + } + + //at this point, we know that no more objects will be added to + //the pending queue for the key, so we can remove and empty it. + if (buffers.containsKey(e.getKey())) { + for (Object obj : buffers.remove(e.getKey())) this.emit(obj); + } + } + } + + public void init(StageContext context) { + super.init(context); + context.registerListener(this); + } + + public void process(Object obj) throws StageException { + Object key = keyFactory.generateKey(obj); + synchronized(receivedKeys) { + if (!receivedKeys.contains(key)) { + //store the object in a pending queue. + if (!buffers.containsKey(key)) buffers.put(key, queueFactory.createQueue()); + buffers.get(key).add(obj); + return; + } + } + + this.emit(obj); + } + + /** + * Holds value of property keyFactory. + */ + private KeyFactory keyFactory; + + /** + * Getter for property keyFactory. + * @return Value of property keyFactory. + */ + public KeyFactory getKeyFactory() { + return this.keyFactory; + } + + /** + * Setter for property keyFactory. + * @param keyFactory New value of property keyFactory. + */ + public void setKeyFactory(KeyFactory keyFactory) { + this.keyFactory = keyFactory; + } + + /** + * Holds value of property queueFactory. + */ + private QueueFactory queueFactory; + + /** + * Getter for property queueFactory. + * @return Value of property queueFactory. + */ + public QueueFactory getQueueFactory() { + return this.queueFactory; + } + + /** + * Setter for property queueFactory. + * @param queueFactory New value of property queueFactory. + */ + public void setQueueFactory(QueueFactory queueFactory) { + this.queueFactory = queueFactory; + } +} Propchange: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java?view=auto&rev=468153 ============================================================================== --- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java (added) +++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java Thu Oct 26 14:47:25 2006 @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.commons.pipeline.stage; + +import org.apache.commons.pipeline.StageException; +import org.apache.commons.pipeline.event.KeyAvailableEvent; +import org.apache.commons.pipeline.util.KeyFactory; + +/** + * + * + */ +public class RaiseKeyAvailableEventStage extends BaseStage { + + /** Creates a new instance of RaiseKeyAvailableEventStage */ + public RaiseKeyAvailableEventStage() { + } + + /** + * This implementation of process() simply generates a key for the + * processed object and raises a KeyAvailableEvent with the generated + * key, then emits the processed object unchanged. + */ + public void process(Object obj) throws StageException { + this.context.raise(new KeyAvailableEvent(this, keyFactory.generateKey(obj))); + this.emit(obj); + } + + /** + * Holds value of property keyFactory. + */ + private KeyFactory keyFactory; + + /** + * Returns the KeyFactory used to create keys for the objects processed + * by this stage. + */ + public KeyFactory getKeyFactory() { + return keyFactory; + } + + /** + * Sets the KeyFactory used to create keys for the objects processed + * by this stage. + */ + public void setKeyFactory(KeyFactory keyFactory) { + this.keyFactory = keyFactory; + } +} Propchange: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java ------------------------------------------------------------------------------ svn:eol-style = native Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java?view=auto&rev=468153 ============================================================================== --- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java (added) +++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java Thu Oct 26 14:47:25 2006 @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.commons.pipeline.stage; + +import junit.framework.*; +import java.util.EventObject; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import org.apache.commons.pipeline.StageContext; +import org.apache.commons.pipeline.StageEventListener; +import org.apache.commons.pipeline.StageException; +import org.apache.commons.pipeline.event.KeyAvailableEvent; +import org.apache.commons.pipeline.util.KeyFactory; +import org.apache.commons.pipeline.util.QueueFactory; + +/** + * Test case for the KeyWaitBufferStage + */ +public class KeyWaitBufferStageTest extends AbstractStageTest { + + public KeyWaitBufferStageTest(String testName) { + super(testName); + } + + public static Test suite() { + TestSuite suite = new TestSuite(KeyWaitBufferStageTest.class); + + return suite; + } + + /** + * Test of notify and process methods, of class org.apache.commons.pipeline.stage.KeyWaitBufferStage. + * The tests of these methods are coupled since the process method buffers + * data waiting for notify() to be called with an appropriate event. + */ + public void testProcessAndNotify() throws Exception { + System.out.println("notify"); + + String obj = "Hello, World!"; + KeyFactory keyFactory = new KeyFactory.HashKeyFactory(); + EventObject ev = new KeyAvailableEvent(this, keyFactory.generateKey(obj)); + + KeyWaitBufferStage instance = new KeyWaitBufferStage(); + instance.setKeyFactory(keyFactory); + instance.setQueueFactory(new QueueFactory.LinkedListFactory()); + + this.init(instance); + + instance.process(obj); + + assertTrue("The process object was not buffered correctly.", this.testFeeder.receivedValues.isEmpty()); + + instance.notify(ev); + + assertTrue("The buffer was not properly flushed upon receiving the event.", this.testFeeder.receivedValues.contains(obj)); + } + + /** + * Test of init method, of class org.apache.commons.pipeline.stage.KeyWaitBufferStage. + */ + public void testInit() { + KeyWaitBufferStage instance = new KeyWaitBufferStage(); + + instance.init(this.testContext); + + assertTrue("The automatic registration of the stage as a StageEventListener failed.", this.testContext.listeners.contains(instance)); + } +} Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java ------------------------------------------------------------------------------ svn:eol-style = native --------------------------------------------------------------------- To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org For additional commands, e-mail: commons-dev-help@jakarta.apache.org