commons-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nutty...@apache.org
Subject svn commit: r468153 - in /jakarta/commons/sandbox/pipeline/trunk/src: main/java/org/apache/commons/pipeline/event/ main/java/org/apache/commons/pipeline/stage/ test/java/org/apache/commons/pipeline/stage/
Date Thu, 26 Oct 2006 21:47:26 GMT
Author: nuttycom
Date: Thu Oct 26 14:47:25 2006
New Revision: 468153

URL: http://svn.apache.org/viewvc?view=rev&rev=468153
Log:
Added functionality to allow inter-branch synchronization using event-based key passing.

Added:
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java
  (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
  (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java
  (with props)
    jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
  (with props)

Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java?view=auto&rev=468153
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java
(added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java
Thu Oct 26 14:47:25 2006
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */ 
+
+package org.apache.commons.pipeline.event;
+
+import java.util.EventObject;
+
+/**
+ * This event is used to signal the availability of the specified key. This is
+ * usually used for inter-branch synchronization using the
+ * {@link org.apache.commons.pipeline.stage.RaiseKeyAvailableEventStage RaiseKeyAvailableEventStage}
+ * and/or the {@link org.apache.commons.pipeline.stage.KeyWaitBufferStage KeyWaitBufferStage}.
+ * <p>
+ * The event is a plain value carrier: it stores the key handed to the
+ * constructor and exposes it through {@link #getKey()}. No setter is defined.
+ *
+ * @param <T> the type of the key carried by this event
+ */
+public class KeyAvailableEvent<T> extends EventObject {
+    private T key; // the key being announced; assigned only in the constructor
+    
+    /**
+     * Creates a new instance of KeyAvailableEvent.
+     *
+     * @param source the object on which the event initially occurred; handed
+     *        unchanged to the {@link EventObject} constructor, which rejects
+     *        {@code null} with an {@link IllegalArgumentException}
+     * @param key the key whose availability is being signalled; stored as-is
+     *        without any validation
+     */
+    public KeyAvailableEvent(Object source, T key) {
+        super(source);
+        this.key = key;
+    }
+    
+    /**
+     * Returns the key.
+     *
+     * @return the key that was supplied to the constructor
+     */
+    public T getKey() {
+        return this.key;
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/event/KeyAvailableEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java?view=auto&rev=468153
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
(added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
Thu Oct 26 14:47:25 2006
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */ 
+
+package org.apache.commons.pipeline.stage;
+
+import java.util.EventObject;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageEventListener;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.event.KeyAvailableEvent;
+import org.apache.commons.pipeline.util.KeyFactory;
+import org.apache.commons.pipeline.util.QueueFactory;
+
+/**
+ * Stage that holds back each processed object until a
+ * {@link KeyAvailableEvent} carrying that object's key has been received,
+ * and then passes the object on unchanged.
+ * <p>
+ * For every processed object a key is generated with the configured
+ * {@link KeyFactory}. If the key has not been announced yet, the object is
+ * appended to a per-key queue obtained from the configured
+ * {@link QueueFactory}; when the matching event arrives, that queue is
+ * drained and its contents are emitted. Objects whose key has already been
+ * announced are emitted immediately.
+ * <p>
+ * Notes:
+ * <ul>
+ *   <li>Keys are kept in a {@link TreeSet} / {@link TreeMap} without a
+ *       comparator, so they must be non-null and mutually comparable.</li>
+ *   <li>Announced keys are never discarded, so the set of received keys
+ *       grows for as long as new keys keep arriving.</li>
+ *   <li>All access to the key set and the buffer map is guarded by a single
+ *       monitor, so that {@link #notify(EventObject)} and
+ *       {@link #process(Object)} can be invoked from different threads.
+ *       Objects are emitted outside of that monitor.</li>
+ * </ul>
+ * Both the key factory and the queue factory must be set before objects are
+ * processed.
+ *
+ * @author kjn
+ */
+public class KeyWaitBufferStage extends BaseStage implements StageEventListener {
+
+    /** Keys for which a KeyAvailableEvent has been received; also serves as the lock for {@link #buffers}. */
+    private final Set<Object> receivedKeys = new TreeSet<Object>();
+
+    /** Objects waiting for their key to be announced, grouped by key. Guarded by {@link #receivedKeys}. */
+    private final Map<Object,Queue<Object>> buffers = new TreeMap<Object,Queue<Object>>();
+
+    /**
+     * Holds value of property keyFactory.
+     */
+    private KeyFactory<Object,? extends Object> keyFactory;
+
+    /**
+     * Holds value of property queueFactory.
+     */
+    private QueueFactory<Object> queueFactory;
+
+    /** Creates a new instance of KeyWaitBufferStage */
+    public KeyWaitBufferStage() {
+    }
+
+    /**
+     * Handles a pipeline event. Events other than {@link KeyAvailableEvent}
+     * are ignored. For a KeyAvailableEvent the carried key is recorded as
+     * available and every object buffered under that key is emitted.
+     *
+     * @param ev the event raised on the stage context
+     */
+    public void notify(EventObject ev) {
+        if (!(ev instanceof KeyAvailableEvent)) {
+            return;
+        }
+
+        Object key = ((KeyAvailableEvent<?>) ev).getKey();
+        Queue<Object> pending;
+
+        // Record the key and detach its pending queue in one critical section.
+        // process() mutates the same map under this lock, and once the key is
+        // in receivedKeys it will no longer add anything to the detached queue.
+        synchronized (receivedKeys) {
+            receivedKeys.add(key);
+            pending = buffers.remove(key);
+        }
+
+        // Emit outside of the lock so downstream processing never runs while
+        // the monitor is held. pending is null when nothing was buffered.
+        if (pending != null) {
+            for (Object obj : pending) {
+                this.emit(obj);
+            }
+        }
+    }
+
+    /**
+     * Initializes the stage and registers it on the supplied context as a
+     * listener, so that it is notified of events raised on that context.
+     *
+     * @param context the context this stage runs in
+     */
+    public void init(StageContext context) {
+        super.init(context);
+        context.registerListener(this);
+    }
+
+    /**
+     * Emits the object if its key has already been announced; otherwise
+     * stores it in the pending queue for that key until the corresponding
+     * {@link KeyAvailableEvent} is received.
+     *
+     * @param obj the object to pass on or buffer
+     * @throws StageException declared by the stage contract
+     */
+    public void process(Object obj) throws StageException {
+        Object key = keyFactory.generateKey(obj);
+
+        synchronized (receivedKeys) {
+            if (!receivedKeys.contains(key)) {
+                // Key not announced yet: park the object, creating the
+                // queue for this key on first use.
+                Queue<Object> pending = buffers.get(key);
+                if (pending == null) {
+                    pending = queueFactory.createQueue();
+                    buffers.put(key, pending);
+                }
+                pending.add(obj);
+                return;
+            }
+        }
+
+        this.emit(obj);
+    }
+
+    /**
+     * Getter for property keyFactory.
+     * @return Value of property keyFactory.
+     */
+    public KeyFactory<Object,? extends Object> getKeyFactory() {
+        return this.keyFactory;
+    }
+
+    /**
+     * Setter for property keyFactory.
+     * @param keyFactory New value of property keyFactory; used to derive the
+     *        key of every processed object.
+     */
+    public void setKeyFactory(KeyFactory<Object,? extends Object> keyFactory) {
+        this.keyFactory = keyFactory;
+    }
+
+    /**
+     * Getter for property queueFactory.
+     * @return Value of property queueFactory.
+     */
+    public QueueFactory<Object> getQueueFactory() {
+        return this.queueFactory;
+    }
+
+    /**
+     * Setter for property queueFactory.
+     * @param queueFactory New value of property queueFactory; used to create
+     *        the pending queue for each key that is not yet available.
+     */
+    public void setQueueFactory(QueueFactory<Object> queueFactory) {
+        this.queueFactory = queueFactory;
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/KeyWaitBufferStage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java?view=auto&rev=468153
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java
(added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java
Thu Oct 26 14:47:25 2006
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */ 
+
+
+package org.apache.commons.pipeline.stage;
+
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.event.KeyAvailableEvent;
+import org.apache.commons.pipeline.util.KeyFactory;
+
+/**
+ *
+ *
+ */
+public class RaiseKeyAvailableEventStage extends BaseStage {
+    
+    /** Creates a new instance of RaiseKeyAvailableEventStage */
+    public RaiseKeyAvailableEventStage() {
+    }
+    
+    /**
+     * This implementation of process() simply generates a key for the 
+     * processed object and raises a KeyAvailableEvent with the generated
+     * key, then emits the processed object unchanged.
+     */
+    public void process(Object obj) throws StageException {
+        this.context.raise(new KeyAvailableEvent(this, keyFactory.generateKey(obj)));
+        this.emit(obj);
+    }    
+    
+    /**
+     * Holds value of property keyFactory.
+     */
+     private KeyFactory<Object,Object> keyFactory;
+    
+    /**
+     * Returns the KeyFactory used to create keys for the objects processed
+     * by this stage.
+     */
+    public KeyFactory<Object,Object> getKeyFactory() {
+        return keyFactory;
+    }    
+    
+    /**
+     * Sets the KeyFactory used to create keys for the objects processed
+     * by this stage.
+     */
+    public void setKeyFactory(KeyFactory<Object,Object> keyFactory) {
+        this.keyFactory = keyFactory;
+    }    
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/main/java/org/apache/commons/pipeline/stage/RaiseKeyAvailableEventStage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
URL: http://svn.apache.org/viewvc/jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java?view=auto&rev=468153
==============================================================================
--- jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
(added)
+++ jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
Thu Oct 26 14:47:25 2006
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */ 
+
+package org.apache.commons.pipeline.stage;
+
+import junit.framework.*;
+import java.util.EventObject;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.commons.pipeline.StageContext;
+import org.apache.commons.pipeline.StageEventListener;
+import org.apache.commons.pipeline.StageException;
+import org.apache.commons.pipeline.event.KeyAvailableEvent;
+import org.apache.commons.pipeline.util.KeyFactory;
+import org.apache.commons.pipeline.util.QueueFactory;
+
+/**
+ * Test case for the KeyWaitBufferStage
+ * <p>
+ * Relies on members inherited from AbstractStageTest (init(...), testContext
+ * and testFeeder), which are not defined in this file. NOTE(review): it is
+ * assumed that testFeeder.receivedValues records the objects emitted by the
+ * stage under test and that testContext.listeners records registered
+ * listeners; confirm against AbstractStageTest.
+ */
+public class KeyWaitBufferStageTest extends AbstractStageTest {
+    /** Creates the test case with the given JUnit test name. */
+    public KeyWaitBufferStageTest(String testName) {
+        super(testName);
+    }
+    /** Returns a JUnit suite built from this test class. */
+    public static Test suite() {
+        TestSuite suite = new TestSuite(KeyWaitBufferStageTest.class);
+        
+        return suite;
+    }
+
+    /**
+     * Test of notify and process methods, of class org.apache.commons.pipeline.stage.KeyWaitBufferStage.
+     * The tests of these methods are coupled since the process method buffers
+     * data waiting for notify() to be called with an appropriate event.
+     * <p>
+     * Sequence: process an object whose key has not been announced and check
+     * that nothing was received downstream, then deliver the matching
+     * KeyAvailableEvent and check that the object was received.
+     */
+    public void testProcessAndNotify() throws Exception {
+        System.out.println("notify");
+        // Fixture: a sample object and an event announcing that object's key.
+        String obj = "Hello, World!";
+        KeyFactory<Object,Integer> keyFactory = new KeyFactory.HashKeyFactory();
+        EventObject ev = new KeyAvailableEvent(this, keyFactory.generateKey(obj));
+        // Stage under test, using the same key factory that produced the event's key.
+        KeyWaitBufferStage instance = new KeyWaitBufferStage();
+        instance.setKeyFactory(keyFactory);
+        instance.setQueueFactory(new QueueFactory.LinkedListFactory());
+        // init(...) is inherited from AbstractStageTest; presumably it wires the stage to the test context and feeder.
+        this.init(instance);
+        // The key has not been announced yet, so the object is expected to be held back.
+        instance.process(obj);
+        
+        assertTrue("The process object was not buffered correctly.", this.testFeeder.receivedValues.isEmpty());
+        // Announcing the key is expected to flush the buffered object downstream.
+        instance.notify(ev);
+        
+        assertTrue("The buffer was not properly flushed upon receiving the event.", this.testFeeder.receivedValues.contains(obj));
+    }
+
+    /**
+     * Test of init method, of class org.apache.commons.pipeline.stage.KeyWaitBufferStage.
+     * Checks that initializing the stage registers it as a listener on the
+     * test context.
+     */
+    public void testInit() {
+        KeyWaitBufferStage instance = new KeyWaitBufferStage();
+        
+        instance.init(this.testContext);
+        
+        assertTrue("The automatic registration of the stage as a StageEventListener failed.",
this.testContext.listeners.contains(instance));
+    }
+}

Propchange: jakarta/commons/sandbox/pipeline/trunk/src/test/java/org/apache/commons/pipeline/stage/KeyWaitBufferStageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: commons-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: commons-dev-help@jakarta.apache.org


Mime
View raw message