archiva-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdu...@apache.org
Subject svn commit: r694244 - in /archiva/sandbox/archiva-event: ./ src/main/java/org/apache/archiva/event/ src/main/java/org/apache/archiva/event/impl/ src/test/java/com/ src/test/java/org/ src/test/java/org/apache/ src/test/java/org/apache/archiva/ src/test/...
Date Thu, 11 Sep 2008 13:27:24 GMT
Author: jdumay
Date: Thu Sep 11 06:27:23 2008
New Revision: 694244

URL: http://svn.apache.org/viewvc?rev=694244&view=rev
Log:
Archiva Event prototype. Allows events to be emitted async to observers.

Added:
    archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/AsynchronousEventBus.java
    archiva/sandbox/archiva-event/src/test/java/org/
    archiva/sandbox/archiva-event/src/test/java/org/apache/
    archiva/sandbox/archiva-event/src/test/java/org/apache/archiva/
    archiva/sandbox/archiva-event/src/test/java/org/apache/archiva/event/
    archiva/sandbox/archiva-event/src/test/java/org/apache/archiva/event/AsynchronousEventBusTest.java
Removed:
    archiva/sandbox/archiva-event/src/test/java/com/
Modified:
    archiva/sandbox/archiva-event/pom.xml
    archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/EventBus.java
    archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/GenericPredicate.java
    archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/impl/JmsEventBus.java

Modified: archiva/sandbox/archiva-event/pom.xml
URL: http://svn.apache.org/viewvc/archiva/sandbox/archiva-event/pom.xml?rev=694244&r1=694243&r2=694244&view=diff
==============================================================================
--- archiva/sandbox/archiva-event/pom.xml (original)
+++ archiva/sandbox/archiva-event/pom.xml Thu Sep 11 06:27:23 2008
@@ -1,12 +1,11 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
-  <groupId>com.mycompany</groupId>
-  <artifactId>archiva-events</artifactId>
+  <groupId>org.apache.archiva</groupId>
+  <artifactId>archiva-event</artifactId>
   <packaging>jar</packaging>
-  <version>1.0-SNAPSHOT</version>
-  <name>archiva-events</name>
-  <url>http://maven.apache.org</url>
+  <version>1.2-SNAPSHOT</version>
+  <name>Archiva Core :: Archiva Event</name>
   <build>
         <plugins>
             <plugin>
@@ -20,11 +19,6 @@
         </plugins>
     </build>
   <dependencies>
-      <dependency>
-          <groupId>commons-collections</groupId>
-          <artifactId>commons-collections</artifactId>
-          <version>3.2.1</version>
-      </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>

Added: archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/AsynchronousEventBus.java
URL: http://svn.apache.org/viewvc/archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/AsynchronousEventBus.java?rev=694244&view=auto
==============================================================================
--- archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/AsynchronousEventBus.java
(added)
+++ archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/AsynchronousEventBus.java
Thu Sep 11 06:27:23 2008
@@ -0,0 +1,108 @@
+package org.apache.archiva.event;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.archiva.event.Event;
+import org.apache.archiva.event.EventBus;
+import org.apache.archiva.event.EventEmitter;
+import org.apache.archiva.event.EventMessage;
+import org.apache.archiva.event.EventObserver;
+
+/**
+ * Simple asynchronous {@link EventBus}: emitted events are queued and a worker
+ * thread hands each one to every subscribed observer on a fixed thread pool.
+ *
+ * @author jdumay
+ */
+public class AsynchronousEventBus implements EventBus
+{
+    private final Set<EventObserver> observers = Collections.synchronizedSet(new HashSet<EventObserver>());
+
+    private final BlockingQueue<Event> events = new LinkedBlockingQueue<Event>();
+
+    private final Thread workerThread;
+    private final int threads;
+
+    /** Starts the dispatching thread; {@code threads} sizes the pool that runs observers. */
+    public AsynchronousEventBus(int threads)
+    {
+        this.threads = threads;
+        workerThread = new Thread(new WorkerRunnable());
+        workerThread.start();
+    }
+
+    /** Queues the event for later delivery and returns without waiting for observers. */
+    public void emit(EventEmitter emitter, EventMessage message)
+    {
+        events.offer(new Event(emitter, message));
+    }
+
+    public void subscribe(EventObserver observer)
+    {
+        observers.add(observer);
+    }
+
+    public void unsubscribe(EventObserver observer)
+    {
+        observers.remove(observer);
+    }
+
+    /** Returns a copy taken under the set's lock, as iterating a synchronizedSet requires. */
+    public Set<EventObserver> getObservers()
+    {
+        synchronized (observers)
+        {
+            return new HashSet<EventObserver>(observers);
+        }
+    }
+
+    /** Reports whether any emitted event is still waiting in the queue. */
+    public boolean hasEvents()
+    {
+        return !events.isEmpty();
+    }
+
+    /** Queue consumer: runs until its thread is interrupted, then shuts its pool down. */
+    class WorkerRunnable implements Runnable
+    {
+        private final ExecutorService service = Executors.newFixedThreadPool(threads);
+
+        public void run()
+        {
+            while (!Thread.currentThread().isInterrupted())
+            {
+                dequeueAndExecute();
+            }
+            service.shutdown();
+        }
+
+        private void dequeueAndExecute()
+        {
+            try
+            {
+                final Event event = events.take();
+                // Fan out over a snapshot so subscribe/unsubscribe cannot break the iteration.
+                for (final EventObserver observer : getObservers())
+                {
+                    service.execute(new Runnable()
+                    {
+                        public void run()
+                        {
+                            observer.observe(event);
+                        }
+                    });
+                }
+            }
+            catch (InterruptedException e)
+            {
+                // Restore the flag so run() sees it and ends instead of looping forever.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}

Modified: archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/EventBus.java
URL: http://svn.apache.org/viewvc/archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/EventBus.java?rev=694244&r1=694243&r2=694244&view=diff
==============================================================================
--- archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/EventBus.java (original)
+++ archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/EventBus.java Thu
Sep 11 06:27:23 2008
@@ -1,7 +1,9 @@
 package org.apache.archiva.event;
 
+import java.util.Set;
+
 /**
- * Allows a emitter to publish
+ * Lets emitters publish events, and lets EventObservers be subscribed and unsubscribed
  */
 public interface EventBus
 {
@@ -23,4 +25,10 @@
      * @param observer
      */
     void unsubscribe(EventObserver observer);
+
+    /**
+     * Get the set of registered EventObservers
+     * @return the currently subscribed observers
+     */
+    Set<EventObserver> getObservers();
 }

Modified: archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/GenericPredicate.java
URL: http://svn.apache.org/viewvc/archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/GenericPredicate.java?rev=694244&r1=694243&r2=694244&view=diff
==============================================================================
--- archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/GenericPredicate.java
(original)
+++ archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/GenericPredicate.java
Thu Sep 11 06:27:23 2008
@@ -1,12 +0,0 @@
-package org.apache.archiva.event;
-
-import org.apache.commons.collections.Predicate;
-
-/**
- *
- * @author jdumay
- */
-public interface GenericPredicate<T> extends Predicate
-{
-    public boolean evaluate(T object);
-}

Modified: archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/impl/JmsEventBus.java
URL: http://svn.apache.org/viewvc/archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/impl/JmsEventBus.java?rev=694244&r1=694243&r2=694244&view=diff
==============================================================================
--- archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/impl/JmsEventBus.java
(original)
+++ archiva/sandbox/archiva-event/src/main/java/org/apache/archiva/event/impl/JmsEventBus.java
Thu Sep 11 06:27:23 2008
@@ -1,54 +0,0 @@
-package org.apache.archiva.event.impl;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.archiva.event.Event;
-import org.apache.archiva.event.EventBus;
-import org.apache.archiva.event.EventEmitter;
-import org.apache.archiva.event.EventMessage;
-import org.apache.archiva.event.EventObserver;
-
-public class JmsEventBus implements EventBus
-{
-    private final Set<EventObserver> observers = Collections.synchronizedSet(new HashSet());
-
-    public JmsEventBus()
-    {
-        
-    }
-
-    public void emit(EventEmitter emitter, EventMessage message)
-    {
-        final Event event = new Event(emitter, message);
-        for (EventObserver observer : observers)
-        {
-            observer.observe(event);
-        }
-    }
-
-    public void subscribe(EventObserver observer)
-    {
-        //todo
-    }
-
-    public void unsubscribe(EventObserver observer)
-    {
-        //todo
-    }
-
-    private class PointToPointObserver implements EventObserver
-    {
-        private final EventObserver observer;
-
-        public PointToPointObserver(EventObserver observer)
-        {
-            this.observer = observer;
-        }
-
-        public void observe(Event event)
-        {
-            throw new UnsupportedOperationException("Not supported yet.");
-        }
-    }
-}

Added: archiva/sandbox/archiva-event/src/test/java/org/apache/archiva/event/AsynchronousEventBusTest.java
URL: http://svn.apache.org/viewvc/archiva/sandbox/archiva-event/src/test/java/org/apache/archiva/event/AsynchronousEventBusTest.java?rev=694244&view=auto
==============================================================================
--- archiva/sandbox/archiva-event/src/test/java/org/apache/archiva/event/AsynchronousEventBusTest.java
(added)
+++ archiva/sandbox/archiva-event/src/test/java/org/apache/archiva/event/AsynchronousEventBusTest.java
Thu Sep 11 06:27:23 2008
@@ -0,0 +1,51 @@
+package org.apache.archiva.event;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import junit.framework.TestCase;
+
+public class AsynchronousEventBusTest extends TestCase
+{
+    /** A subscribed observer is listed by the bus and is gone again after unsubscribing. */
+    public void testSubscribeUnsubscribe() throws Exception
+    {
+        AsynchronousEventBus bus = new AsynchronousEventBus(1);
+        MockObserver observer = new MockObserver();
+        assertEquals(0, bus.getObservers().size());
+        bus.subscribe(observer);
+        assertTrue(bus.getObservers().contains(observer));
+        bus.unsubscribe(observer);
+        // Check the observer itself: the bus is never in the set, so testing for it cannot fail.
+        assertFalse(bus.getObservers().contains(observer));
+    }
+
+    /** Ten emitted events must all reach the subscribed observer. */
+    public void testAllEventsAreObserved() throws Exception
+    {
+        AsynchronousEventBus bus = new AsynchronousEventBus(1);
+        MockObserver observer = new MockObserver();
+        bus.subscribe(observer);
+        for (int i = 0; i < 10; i++)
+        {
+            bus.emit(new EventEmitter() {}, new EventMessage() {});
+        }
+        // Delivery is asynchronous: poll, but give up after 5s instead of spinning forever.
+        final long deadline = System.currentTimeMillis() + 5000;
+        while (observer.observedEvents.size() != 10 && System.currentTimeMillis() < deadline)
+        {
+            Thread.sleep(10);
+        }
+        assertEquals(10, observer.observedEvents.size());
+    }
+
+    /** Observer that records every event it is given. */
+    class MockObserver implements EventObserver
+    {
+        final List<Event> observedEvents = Collections.synchronizedList(new ArrayList<Event>());
+        public void observe(Event event)
+        {
+            observedEvents.add(event);
+        }
+    }
+}



Mime
View raw message