abdera-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jmsn...@apache.org
Subject svn commit: r1173209 [14/49] - in /abdera/abdera2: ./ .settings/ activities/ activities/src/ activities/src/main/ activities/src/main/java/ activities/src/main/java/org/ activities/src/main/java/org/apache/ activities/src/main/java/org/apache/abdera2/ ...
Date Tue, 20 Sep 2011 15:57:20 GMT
Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/AbstractAbderaServlet.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/AbstractAbderaServlet.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/AbstractAbderaServlet.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/AbstractAbderaServlet.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.protocol.servlet;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.activation.MimeType;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.abdera2.common.http.CacheControl;
+import org.apache.abdera2.common.protocol.RequestContext;
+import org.apache.abdera2.common.protocol.ResponseContext;
+import org.apache.abdera2.common.protocol.FilterChain;
+import org.apache.abdera2.common.protocol.Provider;
+import org.apache.abdera2.common.protocol.ServiceManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public abstract class AbstractAbderaServlet
+  extends HttpServlet {
+
+  private static final long serialVersionUID = 2722733242417632126L;
+
+    private final static Log log = LogFactory.getLog(AbstractAbderaServlet.class);
+
+    protected ServiceManager<Provider> manager;
+    protected Provider provider;
+
+    @Override
+    public void init() throws ServletException {
+      log.debug("Initialing Abdera Servlet");
+        manager = createServiceManager();
+        provider = createProvider();
+        log.debug("Using manager - " + manager);
+        log.debug("Using provider - " + provider);
+    }
+
+    protected ServiceManager<Provider> getServiceManager() {
+        return manager;
+    }
+
+    @SuppressWarnings("unchecked")
+    protected ServiceManager<Provider> createServiceManager() {
+        String prop = this.getInitParameter(ServiceManager.class.getName());
+        return prop != null ? 
+          ServiceManager.Factory.getInstance(prop) :  
+          ServiceManager.Factory.getInstance();
+    }
+
+    protected Provider createProvider() {
+        return manager.newProvider(getProperties(getServletConfig()));
+    }
+
+    protected void process(
+      HttpServletRequest request,
+      HttpServletResponse response,
+      ServletContext context) {
+      RequestContext reqcontext = new ServletRequestContext(provider, request, context);
+      FilterChain chain = 
+        new FilterChain(provider, reqcontext);
+      try {
+          output(request, response, chain.next(reqcontext));
+      } catch (Throwable t) {
+          error("Error servicing request", t, response);
+          return;
+      }
+      log.debug("Request complete");
+    }
+    
+    protected void output(HttpServletRequest request, HttpServletResponse response, ResponseContext context)
+        throws IOException {
+        if (context != null) {
+            response.setStatus(context.getStatus());
+            long cl = context.getContentLength();
+            CacheControl cc = context.getCacheControl();
+            if (cl > -1)
+                response.setHeader("Content-Length", Long.toString(cl));
+            if (cc != null)
+                response.setHeader("Cache-Control", cc.toString());
+            try {
+                MimeType ct = context.getContentType();
+                if (ct != null)
+                    response.setContentType(ct.toString());
+            } catch (Exception e) {
+            }
+            Iterable<String> names = context.getHeaderNames();
+            for (String name : names) {
+                Iterable<Object> headers = context.getHeaders(name);
+                for (Object value : headers) {
+                    if (value instanceof Date)
+                        response.addDateHeader(name, ((Date)value).getTime());
+                    else
+                        response.addHeader(name, value.toString());
+                }
+            }
+            if (!request.getMethod().equals("HEAD") && context.hasEntity()) {
+                context.writeTo(response.getOutputStream());
+            }
+        } else {
+            error("Internal Server Error", null, response);
+        }
+    }
+
+    protected void error(String message, Throwable t, HttpServletResponse response) {
+      try {
+        if (t != null)
+            log.error(message, t);
+        else
+            log.error(message);
+
+        if (response.isCommitted()) {
+            log.error("Could not write an error message as the headers & HTTP status were already committed!");
+        } else {
+            ResponseContext resp = 
+              provider.createErrorResponse(500, message, t);
+            response.setStatus(500);
+            response.setCharacterEncoding("UTF-8");
+            resp.writeTo(response.getOutputStream());
+        }
+      } catch (IOException e) {
+        log.error("Error writing to output stream",e);
+      }
+    }
+
+    protected Map<String, String> getProperties(ServletConfig config) {
+        Map<String, String> properties = new HashMap<String, String>();
+        Enumeration<String> e = config.getInitParameterNames();
+        while (e.hasMoreElements()) {
+            String key = e.nextElement();
+            String val = config.getInitParameter(key);
+            properties.put(key, val);
+        }
+        return properties;
+    }
+
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/AbstractAbderaServlet.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/ServletRequestContext.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/ServletRequestContext.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/ServletRequestContext.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/ServletRequestContext.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.protocol.servlet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Locale;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpSession;
+
+
+import org.apache.abdera2.common.Localizer;
+import org.apache.abdera2.common.iri.IRI;
+import org.apache.abdera2.common.protocol.AbstractBaseRequestContext;
+import org.apache.abdera2.common.protocol.Provider;
+
+/**
+ * Request context backed by an {@link HttpServletRequest}.
+ * <p>
+ * Most accessors delegate directly to the wrapped servlet request. The
+ * request IRI and base IRI are computed once, in the constructor, from the
+ * servlet request and the provider's configuration properties.
+ */
+@SuppressWarnings({ "unchecked" })
+public class ServletRequestContext 
+  extends AbstractBaseRequestContext {
+
+    private final HttpServletRequest request;
+    private final ServletContext servletContext;
+    private HttpSession session;
+
+    /**
+     * @param provider       the provider servicing the request; also consulted
+     *                       for host/port overrides and subject resolution
+     * @param request        the servlet request to wrap
+     * @param servletContext the servlet context, used for container-scoped
+     *                       attributes; the accessors tolerate {@code null}
+     */
+    public ServletRequestContext(Provider provider, HttpServletRequest request, ServletContext servletContext) {
+        super(provider, request.getMethod(), initRequestUri(request), initBaseUri(provider, request));
+        this.request = request;
+        this.servletContext = servletContext;
+        this.session = request.getSession(false);
+        this.principal = request.getUserPrincipal();
+        this.subject = provider.resolveSubject(this);
+        this.target = initTarget();
+    }
+
+    public Reader getReader() throws IOException {
+        return request.getReader();
+    }
+
+    public InputStream getInputStream() throws IOException {
+        return request.getInputStream();
+    }
+
+    /** @return the wrapped servlet request */
+    public HttpServletRequest getRequest() {
+        return request;
+    }
+
+    public ServletContext getServletContext() {
+        return servletContext;
+    }
+
+    /**
+     * @return the current session, or {@code null} if there is none; never
+     *         creates a session
+     */
+    public synchronized HttpSession getSession() {
+        return getSession(false);
+    }
+
+    /**
+     * Returns the cached session, looking it up on the servlet request when
+     * none has been cached yet.
+     *
+     * @param create whether a new session may be created if none exists
+     */
+    public synchronized HttpSession getSession(boolean create) {
+        if (session == null)
+            session = request.getSession(create);
+        return session;
+    }
+
+    /**
+     * Stores an attribute in the given scope. Setting a session attribute
+     * creates the session if necessary; a container attribute is dropped
+     * silently when no servlet context is available.
+     *
+     * @return this context, for call chaining
+     */
+    public ServletRequestContext setAttribute(Scope scope, String name, Object value) {
+        switch (scope) {
+            case REQUEST:
+                request.setAttribute(name, value);
+                break;
+            case SESSION:
+                getSession(true).setAttribute(name, value);
+                break;
+            case CONTAINER: {
+                ServletContext scontext = getServletContext();
+                if (scontext != null)
+                    scontext.setAttribute(name, value);
+                break;
+            }
+        }
+        return this;
+    }
+
+    /**
+     * @return the attribute value, or {@code null} if the attribute or the
+     *         scope's backing object (session, servlet context) is absent
+     */
+    public Object getAttribute(Scope scope, String name) {
+        switch (scope) {
+            case REQUEST:
+                return request.getAttribute(name);
+            case SESSION: {
+                // Go through getSession() rather than the raw field so that a
+                // session created after this context was built is still seen.
+                HttpSession current = getSession();
+                return (current != null) ? current.getAttribute(name) : null;
+            }
+            case CONTAINER: {
+                ServletContext scontext = getServletContext();
+                return scontext != null ? scontext.getAttribute(name) : null;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return the attribute names of the scope, or {@code null} when the
+     *         scope's backing object (session, servlet context) is absent
+     */
+    public Iterable<String> getAttributeNames(Scope scope) {
+        switch (scope) {
+            case REQUEST:
+                return enum2array(request.getAttributeNames());
+            case SESSION: {
+                HttpSession current = getSession();
+                return (current != null) ? enum2array(current.getAttributeNames()) : null;
+            }
+            case CONTAINER: {
+                ServletContext scontext = getServletContext();
+                return scontext != null ? enum2array(scontext.getAttributeNames()) : null;
+            }
+        }
+        return null;
+    }
+
+    public String getParameter(String name) {
+        return request.getParameter(name);
+    }
+
+    public Iterable<String> getParameterNames() {
+        return enum2array(request.getParameterNames());
+    }
+
+    /**
+     * @return all values of the named parameter, or {@code null} if the
+     *         parameter is not present
+     */
+    public List<String> getParameters(String name) {
+        String[] values = request.getParameterValues(name);
+        return values != null ? java.util.Arrays.asList(values) : null;
+    }
+
+    /**
+     * @return the header parsed as a date, or {@code null} when the servlet
+     *         request reports {@code -1} (header absent)
+     */
+    public Date getDateHeader(String name) {
+        long value = request.getDateHeader(name);
+        return value != -1 ? new Date(value) : null;
+    }
+
+    public String getHeader(String name) {
+        return request.getHeader(name);
+    }
+
+    public Iterable<String> getHeaderNames() {
+        return enum2array(request.getHeaderNames());
+    }
+
+    /**
+     * @return all values of the named header; empty when there are none or
+     *         when the container withholds header information
+     */
+    public Iterable<Object> getHeaders(String name) {
+        List<Object> list = new ArrayList<Object>();
+        // The Servlet API allows a null enumeration when the container does
+        // not permit access to header information.
+        Enumeration<?> e = request.getHeaders(name);
+        if (e != null) {
+            while (e.hasMoreElements())
+                list.add(e.nextElement());
+        }
+        return list;
+    }
+
+    /** Copies an enumeration into a list; a {@code null} enumeration yields an empty list. */
+    private static Iterable<String> enum2array(Enumeration<String> e) {
+        if (e == null)
+            return Collections.<String>emptyList();
+        return java.util.Collections.list(e);
+    }
+
+    /** Host from the provider's Host property if set, otherwise from the request. */
+    private static String getHost(Provider provider, HttpServletRequest request) {
+        String host = provider.getProperty("org.apache.abdera.protocol.server.Host");
+        return (host != null) ? host : request.getServerName();
+    }
+
+    /**
+     * Port from the provider's Port property if set, otherwise from the request.
+     *
+     * @throws NumberFormatException if the configured property is not an integer
+     */
+    private static int getPort(Provider provider, HttpServletRequest request) {
+        String port = provider.getProperty("org.apache.abdera.protocol.server.Port");
+        return (port != null) ? Integer.parseInt(port) : request.getServerPort();
+    }
+
+    /**
+     * Builds {@code scheme://host[:port]contextPath/}. The port is omitted
+     * only when it is the default port of the scheme in use (80 for http,
+     * 443 for https); e.g. https served on port 80 keeps its explicit port.
+     */
+    private static IRI initBaseUri(Provider provider, HttpServletRequest request) {
+        boolean secure = request.isSecure();
+        StringBuilder buffer = new StringBuilder(secure ? "https" : "http");
+        buffer.append("://");
+        buffer.append(getHost(provider, request));
+        int port = getPort(provider, request);
+        int defaultPort = secure ? 443 : 80;
+        if (port != defaultPort) {
+            buffer.append(":");
+            buffer.append(port);
+        }
+        buffer.append(request.getContextPath());
+        // So that .resolve() works appropriately.
+        buffer.append("/");
+        return new IRI(buffer.toString());
+    }
+
+    /** Builds the request IRI from the request URI plus the query string, if any. */
+    private static IRI initRequestUri(HttpServletRequest request) {
+        StringBuilder buf = new StringBuilder(request.getRequestURI());
+        String qs = request.getQueryString();
+        if (qs != null && qs.length() != 0)
+            buf.append("?").append(qs);
+        return new IRI(buf.toString());
+    }
+
+    public boolean isUserInRole(String role) {
+        return request.isUserInRole(role);
+    }
+
+    public String getContextPath() {
+        return request.getContextPath();
+    }
+
+    public Locale getPreferredLocale() {
+        return request.getLocale();
+    }
+
+    public Iterable<Locale> getPreferredLocales() {
+        return Collections.list(request.getLocales());
+    }
+
+    /** @return the context path followed by the servlet path */
+    public String getTargetBasePath() {
+        return request.getContextPath() + request.getServletPath();
+    }
+
+    /**
+     * Looks up a request property on the wrapped servlet request or session.
+     * Session properties are {@code null} when there is no session, except
+     * {@code SESSIONTIMEOUT}, which is {@code -1}.
+     *
+     * @throws UnsupportedOperationException for properties not handled here
+     * @throws RuntimeException wrapping any failure while reading {@code PARTS}
+     */
+    public Object getProperty(Property property) {
+        switch (property) {
+            case SESSIONID: {
+                HttpSession current = getSession();
+                return (current != null) ? current.getId() : null;
+            }
+            case SESSIONCREATED: {
+                HttpSession current = getSession();
+                return (current != null) ? new Date(current.getCreationTime()) : null;
+            }
+            case SESSIONACCESSED: {
+                HttpSession current = getSession();
+                return (current != null) ? new Date(current.getLastAccessedTime()) : null;
+            }
+            case SESSIONTIMEOUT: {
+                HttpSession current = getSession();
+                return Integer.valueOf((current != null) ? current.getMaxInactiveInterval() : -1);
+            }
+            case CHARACTERENCODING:
+                return request.getCharacterEncoding();
+            case LOCALES:
+                return request.getLocales();
+            case PROTOCOL:
+                return request.getProtocol();
+            case REMOTEADDRESS:
+                return request.getRemoteAddr();
+            case REMOTEHOST:
+                return request.getRemoteHost();
+            case REMOTEUSER:
+                return request.getRemoteUser();
+            case SCHEME:
+                return request.getScheme();
+            case PRINCIPAL:
+                return request.getUserPrincipal();
+            case AUTHTYPE:
+                return request.getAuthType();
+            case CONTENTLENGTH:
+                return Integer.valueOf(request.getContentLength());
+            case CONTENTTYPE:
+                return request.getContentType();
+            case CONTEXTPATH:
+                return request.getContextPath();
+            case LOCALADDR:
+                return request.getLocalAddr();
+            case LOCALNAME:
+                return request.getLocalName();
+            case SERVERNAME:
+                return request.getServerName();
+            case SERVERPORT:
+                return Integer.valueOf(request.getServerPort());
+            case SECURE:
+                return Boolean.valueOf(request.isSecure());
+            case PARTS: {
+                try {
+                    return request.getParts();
+                } catch (Throwable t) {
+                    // Kept as-is for callers: every failure surfaces as a
+                    // RuntimeException carrying the original cause.
+                    throw new RuntimeException(t);
+                }
+            }
+            default:
+                throw new UnsupportedOperationException(Localizer.get("PROPERTY.NOT.SUPPORTED"));
+        }
+    }
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/ServletRequestContext.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaAsyncService.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaAsyncService.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaAsyncService.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaAsyncService.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.protocol.servlet.async;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+import javax.servlet.annotation.WebListener;
+
+import org.apache.abdera2.common.protocol.Provider;
+import org.apache.abdera2.common.protocol.ServiceManager;
+import org.apache.abdera2.common.pusher.ChannelManager;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+@WebListener
+public class AbderaAsyncService 
+  implements ServletContextListener, Runnable {
+
+    private static final int DEFAULT_WORKER_THREADS = 10;
+    public static final String PROPERTY_WORKER_THREADS = "AbderaWorkerThreadCount";
+    public static final String PROPERTY_ATOMPUB_SERVICE = "AbderaAtompubService";
+    public static final String PROPERTY_CHANNEL_SERVICE = "AbderaChannelService";
+  
+    public static final String RUNNER = "AbderaRunner";
+    public static final String SERVICEMANAGER = "AbderaServiceManager";
+    public static final String PROVIDER = "AbderaProvider";
+    public static final String QUEUE = "AbderaProcessorQueue";
+    public static final String CM = "AbderaChannelManager";
+  
+    private final static Log log = LogFactory.getLog(AbderaAsyncService.class);
+    
+    private ServletContext context;
+    private TaskExecutor exec;
+    private ProcessorQueue queue;
+    private ChannelManager cm;
+    private boolean deployAtompubService;
+    private boolean deployChannelService;
+  
+    public AbderaAsyncService() {
+      log.debug("Abdera Async Service Created");
+    }
+    
+    protected Map<String, String> getProperties(ServletContext context) {
+      Map<String, String> properties = new HashMap<String, String>();
+      Enumeration<String> e = context.getInitParameterNames();
+      while (e.hasMoreElements()) {
+          String key = e.nextElement();
+          String val = context.getInitParameter(key);
+          properties.put(key, val);
+      }
+      return properties;
+    }
+    
+    private int worker_threads(Map<String,String> properties) {
+      int c = DEFAULT_WORKER_THREADS;
+      if (properties.containsKey(PROPERTY_WORKER_THREADS)) {
+        String val = properties.get(PROPERTY_WORKER_THREADS);
+        c = Math.max(1,Integer.parseInt(val));
+      }
+      return c;
+    }
+    
+    private boolean getBooleanProperty(Map<String, String> properties, String name, boolean def) {
+      boolean answer = def;
+      if (properties.containsKey(name)) {
+        String val = properties.get(name);
+        answer = "TRUE".equalsIgnoreCase(val) || 
+                 "1".equals(val) ||
+                 "YES".equalsIgnoreCase(val);
+      }
+      return answer;
+    }
+    
+    public boolean isDeployAtompubService(Map<String,String> properties) {
+      return getBooleanProperty(properties,PROPERTY_ATOMPUB_SERVICE,false);
+    }
+    
+    public boolean isDeployChannelService(Map<String,String> properties) {
+      return getBooleanProperty(properties,PROPERTY_CHANNEL_SERVICE,false);
+    }
+    
+    @SuppressWarnings("unchecked")
+    protected ServiceManager<Provider> createServiceManager(ServletContext context) {
+      String prop = context.getInitParameter(ServiceManager.class.getName());
+      return prop != null ? 
+        ServiceManager.Factory.getInstance(prop) :  
+        ServiceManager.Factory.getInstance();
+    }
+
+    
+    public void contextInitialized(ServletContextEvent event) {   
+      this.context = event.getServletContext();
+      Map<String,String> properties = getProperties(context);
+      this.deployAtompubService = isDeployAtompubService(properties);
+      this.deployChannelService = isDeployChannelService(properties);
+      ServiceManager<Provider> manager = 
+        createServiceManager(context);
+      
+      if (manager == null) {
+          log.error("Service Manager is null. Application can not function correctly");
+          throw new IllegalStateException("Service Manager is null");
+      }
+      
+      if (deployAtompubService) {
+        log.debug("Initializing Abdera Atompub Service...");
+        queue = manager.newProcessorQueue(properties);
+        exec = manager.newTaskExecutor(properties);
+        Provider provider = manager.newProvider(properties);
+        Processor processor = queue != null ? queue.getProcessor() : null;
+        
+        log.debug(String.format("Queue:           %s",queue));
+        log.debug(String.format("Processor:       %s",processor));
+        log.debug(String.format("Executor:        %s",exec));
+        log.debug(String.format("Service Manager: %s",manager));
+        log.debug(String.format("Provider:        %s",provider));
+        
+        if (processor != null)
+          context.setAttribute(Processor.NAME, processor);
+        else {
+          log.error("Queue Processor is null. Application can not function correctly");
+          throw new IllegalStateException("Queue Processor is null");
+        }
+        if (exec != null)
+          context.setAttribute(RUNNER, exec);
+        else {
+          log.error("Task Executor is null. Application can not function correctly");
+          throw new IllegalStateException("Task Executor is null");
+        }
+        if (provider != null)
+          context.setAttribute(PROVIDER, provider);
+        else {
+          log.error("Provider is null. Application can not function correctly");
+          throw new IllegalStateException("Provider is null");
+        }
+        if (queue != null)
+          context.setAttribute(QUEUE, queue);
+        else {
+          log.error("Queue is null. Application can not function correctly");
+          throw new IllegalStateException("Queue is null");
+        }
+        context.setAttribute(SERVICEMANAGER, manager);
+  
+        int ct = worker_threads(properties);
+        log.debug(String.format("Launching watcher threads [%d]",ct));
+        
+        for (int c = 0; c < ct; c++)
+          exec.startWorker(this);
+        
+        log.debug("Abdera Atompub Service is ready...");
+      }
+      
+      if (deployChannelService) {
+        log.debug("Initializing Abdera Channel Service");
+        cm = manager.newChannelManager(properties);
+        log.debug(String.format("Channel Manager: %s", cm));
+        if (cm != null) {
+          context.setAttribute(CM, cm);
+          log.debug("Abdera Channel Service is ready...");
+        } else log.debug("Abdera Channel Service could not be started");
+      }
+    }
+    
+    public void contextDestroyed(ServletContextEvent event) {
+      ServletContext context = event.getServletContext();
+      if (deployAtompubService) {
+        log.debug("Shutting down the Abdera Service...");
+        if (exec != null)
+          exec.shutdown();
+        // if there are remaining outstanding requests after 
+        // shutdown we need to deal with them
+        if (queue != null)
+          queue.cancelRemaining();
+        
+        context.removeAttribute(Processor.NAME);
+        context.removeAttribute(RUNNER);
+        context.removeAttribute(SERVICEMANAGER);
+        context.removeAttribute(PROVIDER);
+        context.removeAttribute(QUEUE);
+      }
+      if (deployChannelService) {
+        if (cm != null)
+          cm.shutdown();
+        context.removeAttribute(CM);
+      }
+    }
+
+    public void run() {
+      TaskExecutor exec = 
+        (TaskExecutor) context.getAttribute(RUNNER);
+      ProcessorQueue processor =
+        (ProcessorQueue) context.getAttribute(QUEUE);
+      while(exec.isRunning()) {
+        if (processor.hasNext()) {
+          final AbderaTask task = processor.next();
+          log.debug(String.format("Processing New AbderaTask (%s)...",task.getId())); 
+          exec.execute(new Runnable() {
+            public void run() {
+              try {
+                task.invoke();
+              } catch (Throwable t) {
+                log.error(String.format("Error invoking AbderaTask (%s)",task.getId()),t);
+              }
+              log.debug(String.format("AbderaTask (%s) is complete",task.getId()));
+            }
+          });
+        }
+      }
+    }
+	
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaAsyncService.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaChannelServlet.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaChannelServlet.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaChannelServlet.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaChannelServlet.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.protocol.servlet.async;
+
+import java.io.IOException;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.annotation.WebServlet;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.abdera2.common.pusher.ChannelManager;
+import org.apache.abdera2.common.pusher.Listener;
+import org.apache.abdera2.common.pusher.Receiver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+@WebServlet(asyncSupported=true)
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public abstract class AbderaChannelServlet extends HttpServlet {
+
+  private final static Log log = LogFactory.getLog(AbderaChannelServlet.class);
+  
+  private static final long serialVersionUID = 3751815744618869423L;
+
+  protected abstract String getChannel(AsyncContext context);
+  
+  protected abstract AsyncListener<?> createListener(AsyncContext context);
+  
+  protected abstract long getTimeout(ServletConfig config, ServletContext context);
+  
+  protected void doGet(
+      final HttpServletRequest request, 
+      final HttpServletResponse response) 
+        throws ServletException, IOException {
+    final ServletContext sc = getServletContext();
+    final ChannelManager cm = (ChannelManager) sc.getAttribute(AbderaAsyncService.CM);
+    if (cm == null || !cm.isShutdown()) {
+      final AsyncContext context = request.startAsync(request, response);
+      context.setTimeout(getTimeout(getServletConfig(),sc));
+      context.start(
+        new Runnable() {
+          public void run() {
+            String channel = getChannel(context);
+            log.debug(String.format("Selected Channel Name: %s",channel));
+            if (channel != null) {
+              final Receiver receiver = cm.getReceiver(channel);
+              log.debug(String.format("Selected Receiver: %s",receiver));
+              if (receiver != null) {
+                final Listener listener = createListener(context);
+                context.addListener(
+                  new javax.servlet.AsyncListener() {
+                    public void onComplete(AsyncEvent event) throws IOException {
+                      System.out.println("on complete");
+                      try {
+                        receiver.stopListening(listener);
+                      } catch (Throwable t) {}
+                    }
+                    public void onError(AsyncEvent event) throws IOException {
+                      System.out.println("on error");
+                      event.getThrowable().printStackTrace();
+                      try {
+                        receiver.stopListening(listener);
+                      } catch (Throwable t) {}
+                    }
+                    public void onStartAsync(AsyncEvent event)
+                        throws IOException {
+                    }
+                    public void onTimeout(AsyncEvent event) throws IOException {
+                      System.out.println("on timeout");
+                      try {
+                        receiver.stopListening(listener);
+                      } catch (Throwable t) {}
+                    }
+                  }
+                );
+                log.debug(String.format("Listener: %s",listener));
+                if (listener != null) {
+                  request.setAttribute("AbderaChannel", channel);
+                  request.setAttribute("AbderaReceiver", receiver);
+                  request.setAttribute("AbderaListener", listener);
+                  receiver.startListening(listener);      
+                }
+              } 
+            }
+          }
+        }
+      );
+    } else {
+      response.sendError(
+        HttpServletResponse.SC_SERVICE_UNAVAILABLE, 
+        "Abdera Service in unavailable");
+    }
+    
+  }
+  
+  public abstract static class AsyncListener<T> implements Listener<T> {
+
+    private final AsyncContext context;
+    private boolean done = false;
+    
+    protected AsyncListener(AsyncContext context) {
+      this.context = context;
+    }
+    
+    protected HttpServletRequest getRequest() {
+      return (HttpServletRequest) context.getRequest();
+    }
+    
+    protected HttpServletResponse getResponse() {
+      return (HttpServletResponse) context.getResponse();
+    }
+    
+    protected boolean isDone() {
+      return done;
+    }
+    
+    public void afterItems() {
+      if (!done) {
+      try {
+        finish();
+        getResponse().flushBuffer();
+      } catch (Throwable t) {
+        // whoops, must have lost the connection before the request completed.
+      }
+      context.complete();
+      done = true;
+      }
+    }
+    
+    protected void finish() {
+      // by default do nothing
+    }
+    
+    public void beforeItems() {
+      // by default do nothing
+    }
+  }
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaChannelServlet.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaTask.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaTask.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaTask.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaTask.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.protocol.servlet.async;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
+
+import javax.activation.MimeType;
+import javax.servlet.AsyncContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.abdera2.common.http.CacheControl;
+import org.apache.abdera2.common.protocol.RequestContext;
+import org.apache.abdera2.common.protocol.ResponseContext;
+import org.apache.abdera2.common.protocol.FilterChain;
+import org.apache.abdera2.common.protocol.Provider;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class AbderaTask {
+
+  private final static Log log = LogFactory.getLog(AbderaTask.class);
+  
+  private final String id;
+  private final AsyncContext context;
+  private final RequestContext requestContext;
+  private final Provider provider;
+  
+  AbderaTask(AsyncContext context, Provider provider, RequestContext requestContext) {
+    this.context = context;
+    this.provider = provider;
+    this.requestContext = requestContext;
+    this.id = UUID.randomUUID().toString();
+  }
+    
+  public String getId() {
+    return id;
+  }
+  
+  public void cancel() {
+    cancel(context,getId());
+  }
+  
+  public static void cancel(AsyncContext context, String id) {
+    try {
+      HttpServletResponse resp = 
+        (HttpServletResponse) context.getResponse();
+      if (!resp.isCommitted())
+        resp.reset();
+      resp.sendError(
+        HttpServletResponse.SC_SERVICE_UNAVAILABLE, 
+        "Server is shutting down. Unable to process request");
+      resp.flushBuffer();
+    } catch (Throwable t) {
+      log.error(String.format("Unrecoverable error canceling Abdera Task (%s)",id), t);
+    } finally {
+      context.complete();
+    }
+  }
+  
+  public void invoke() {
+    try {
+      log.debug(String.format("Invoking Abdera Task (%s)",getId()));
+      HttpServletRequest req = 
+        (HttpServletRequest) context.getRequest();
+      HttpServletResponse resp = 
+        (HttpServletResponse) context.getResponse();
+      process(provider,req,resp);
+      resp.flushBuffer();
+    } catch (Throwable t) {
+      log.error(String.format("Unrecoverable error processing Abdera Task (%s)",getId()), t);
+    } finally {
+      context.complete();
+    }
+  }
+  
+  protected void process(
+    Provider provider,
+    HttpServletRequest request,
+    HttpServletResponse response) {
+    FilterChain chain = 
+      new FilterChain(provider, requestContext);
+    try {
+        log.debug(String.format("Using RequestContext: %s",requestContext.getClass().getName()));
+        output(request, response, chain.next(requestContext), provider);
+    } catch (Throwable t) {
+        error("Error servicing request", t, response, provider);
+        return;
+    }
+  }
+    
+    protected void output(HttpServletRequest request, HttpServletResponse response, ResponseContext context, Provider provider)
+        throws IOException {
+        log.debug(String.format("Received ResponseContext: %s", context));
+        if (context != null) {
+          log.debug(String.format("Status: %d",context.getStatus()));
+          response.setStatus(context.getStatus());
+            long cl = context.getContentLength();
+            CacheControl cc = context.getCacheControl();
+            if (cl > -1)
+                response.setHeader("Content-Length", Long.toString(cl));
+            if (cc != null)
+                response.setHeader("Cache-Control", cc.toString());
+            try {
+                MimeType ct = context.getContentType();
+                if (ct != null) {
+                  log.debug(String.format("Content-Type: %s",ct.toString()));
+                    response.setContentType(ct.toString());
+                }
+            } catch (Exception e) {
+            }
+            Iterable<String> names = context.getHeaderNames();
+            for (String name : names) {
+                Iterable<Object> headers = context.getHeaders(name);
+                for (Object value : headers) {
+                  log.debug(String.format("Header [%s]: %s", name, value.toString()));
+                    if (value instanceof Date)
+                        response.addDateHeader(name, ((Date)value).getTime());
+                    else
+                        response.addHeader(name, value.toString());
+                }
+            }
+            if (!request.getMethod().equals("HEAD") && context.hasEntity()) {
+                log.debug("Writing entity...");
+                context.writeTo(response.getOutputStream());
+            } else {
+              log.debug("No entity to write...");
+            }
+        } else {
+            error("Internal Server Error", null, response, provider);
+        }
+    }
+
+    protected void error(String message, Throwable t, HttpServletResponse response, Provider provider) {
+      try {
+        message = 
+          String.format(
+            "Error in Abdera Task (%s): %s", 
+            getId(), message);
+        if (t != null)
+            log.error(message, t);
+        else
+            log.error(message);
+
+        if (response.isCommitted()) {
+            log.error("Could not write an error message as the headers & HTTP status were already committed!");
+        } else {
+          ResponseContext resp = 
+            provider.createErrorResponse(500, message, t);
+          response.setStatus(500);
+          response.setCharacterEncoding("UTF-8");
+          resp.writeTo(response.getOutputStream());
+        }
+      } catch (IOException e) {
+        log.error(String.format("Error writing to output stream (%s)",getId()),e);
+      }
+    }
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AbderaTask.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AsyncAbderaServlet.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AsyncAbderaServlet.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AsyncAbderaServlet.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AsyncAbderaServlet.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.protocol.servlet.async;
+
+import java.io.IOException;
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.annotation.WebServlet;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.abdera2.common.protocol.Provider;
+import org.apache.abdera2.common.protocol.servlet.ServletRequestContext;
+
+/**
+ * Servlet that hands every request to the Processor stored in the
+ * ServletContext under Processor.NAME for asynchronous handling.
+ */
+@WebServlet(asyncSupported=true)
+public class AsyncAbderaServlet 
+  extends HttpServlet {
+
+    private static final long serialVersionUID = 2086707888078611321L;
+
+    /**
+     * Starts async processing and submits the request to the Processor.
+     * Responds with 503 when no Processor is registered or it has been
+     * shut down.
+     */
+    @Override
+    protected void service(
+        final HttpServletRequest request, 
+        final HttpServletResponse response) 
+          throws ServletException, IOException {
+      final ServletContext sc = getServletContext();
+      final Processor proc = (Processor) sc.getAttribute(Processor.NAME);
+      // A missing Processor is handled like a shut-down one; previously a
+      // null Processor passed this check and proc.submit() threw a
+      // NullPointerException after async processing had been started.
+      if (proc == null || proc.isShutdown()) {
+        response.sendError(
+          HttpServletResponse.SC_SERVICE_UNAVAILABLE, 
+          "Abdera Service in unavailable");
+        return;
+      }
+      final AsyncContext context = request.startAsync(request, response);
+      Provider provider = (Provider) sc.getAttribute(AbderaAsyncService.PROVIDER);
+      ServletRequestContext reqcontext = new ServletRequestContext(provider, request, sc);
+      proc.submit(context,provider,reqcontext);
+    }
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/AsyncAbderaServlet.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/DefaultProcessor.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/DefaultProcessor.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/DefaultProcessor.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/DefaultProcessor.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.protocol.servlet.async;
+
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import javax.servlet.AsyncContext;
+
+import org.apache.abdera2.common.protocol.Provider;
+import org.apache.abdera2.common.protocol.RequestContext;
+
+/**
+ * Processor and ProcessorQueue backed by a ConcurrentLinkedQueue. Submitted
+ * requests are queued as AbderaTasks until cancelRemaining() is called, after
+ * which new submissions are canceled immediately.
+ */
+public class DefaultProcessor 
+  implements Processor, ProcessorQueue {
+
+  private final Queue<AbderaTask> queue = 
+    new ConcurrentLinkedQueue<AbderaTask>();
+  // volatile: written by the thread calling cancelRemaining() and read by
+  // the threads calling submit() / isShutdown().
+  volatile boolean rejectNew = false;
+  
+  /**
+   * Queues a new task for the request, or cancels the request right away
+   * when this processor no longer accepts work.
+   */
+  public void submit(AsyncContext context, Provider provider, RequestContext requestContext) {
+    if (rejectNew) {
+      AbderaTask.cancel(context, "NEW");
+      return;
+    }
+    final AbderaTask task = 
+      new AbderaTask(context,provider,requestContext);
+    queue.offer(task);
+    // cancelRemaining() may have drained the queue between the check above
+    // and the offer; if the task is still queued, pull it back out and cancel
+    // it so it is not left behind with nobody to complete its request.
+    if (rejectNew && queue.remove(task))
+      task.cancel();
+  }
+
+  /** Removes and returns the head of the queue, or null if it is empty. */
+  public AbderaTask next() {
+    return queue.poll();
+  }
+  
+  /** Returns true if at least one task is currently queued. */
+  public boolean hasNext() {
+    return !queue.isEmpty();
+  }
+
+  public Processor getProcessor() {
+    return this;
+  }
+  
+  /** Returns true once cancelRemaining() has been called. */
+  public boolean isShutdown() {
+    return rejectNew;
+  }
+
+  /**
+   * Stops accepting new tasks and cancels every task still queued.
+   */
+  public void cancelRemaining() {
+    rejectNew = true;
+    // Poll until empty rather than hasNext()/next(): another thread may take
+    // the last task in between, in which case next() returns null.
+    for (AbderaTask task = next(); task != null; task = next())
+      task.cancel();
+  }
+
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/DefaultProcessor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/DefaultTaskExecutor.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/DefaultTaskExecutor.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/DefaultTaskExecutor.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/DefaultTaskExecutor.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.protocol.servlet.async;
+
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * TaskExecutor backed by a cached thread pool that is created in
+ * {@link #init(Map)}.
+ */
+public class DefaultTaskExecutor 
+  implements TaskExecutor {
+
+  private ThreadPoolExecutor exec;
+
+  /** Property key for the shutdown wait time, in seconds. */
+  public static final String TERMINATION_TIMEOUT = "AbderaDefaultTaskExecutorTerminationTimout";
+  /** Shutdown wait time, in seconds, used when the property is absent. */
+  public static final long DEFAULT_TERMINATION_TIMEOUT = 10;
+  
+  private long terminationTimeout = DEFAULT_TERMINATION_TIMEOUT;
+  
+  public DefaultTaskExecutor() {}
+  
+  public void execute(Runnable task) {
+    exec.execute(task);
+  }
+
+  /**
+   * Creates the thread pool and reads the optional TERMINATION_TIMEOUT
+   * property; values below 1 are raised to 1.
+   *
+   * @param properties configuration values; may be null
+   * @throws NumberFormatException if the timeout property is not a valid long
+   */
+  public void init(Map<String, String> properties) {
+    exec = (ThreadPoolExecutor) Executors.newCachedThreadPool();
+    if (properties != null && properties.containsKey(TERMINATION_TIMEOUT)) {
+      String val = properties.get(TERMINATION_TIMEOUT);
+      terminationTimeout = Math.max(1,Long.parseLong(val));
+    }
+  }
+
+  public void startWorker(Runnable worker) {
+    exec.execute(worker);
+  }
+
+  /**
+   * Initiates an orderly shutdown and waits up to the configured number of
+   * seconds for running tasks to finish. Does nothing if init() was never
+   * called.
+   */
+  public void shutdown() {
+    final ThreadPoolExecutor pool = exec;
+    if (pool == null)
+      return;
+    pool.shutdown();
+    try {
+      pool.awaitTermination(
+        terminationTimeout, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      // Stop waiting, but keep the interrupt visible to the caller.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Returns true if the pool has been created and shutdown has not been
+   * requested.
+   */
+  public boolean isRunning() {
+    final ThreadPoolExecutor pool = exec;
+    // isShutdown() is already true while terminating and once terminated,
+    // so this single check covers all three states.
+    return pool != null && !pool.isShutdown();
+  }
+
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/DefaultTaskExecutor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/Processor.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/Processor.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/Processor.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/Processor.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.protocol.servlet.async;
+
+import javax.servlet.AsyncContext;
+
+import org.apache.abdera2.common.protocol.Provider;
+import org.apache.abdera2.common.protocol.RequestContext;
+
+/**
+ * Accepts requests for asynchronous processing.
+ */
+public interface Processor {
+
+  /** Name under which a Processor is looked up in the ServletContext. */
+  String NAME = "AbderaProcessor";
+  
+  /**
+   * Submits a request for processing.
+   *
+   * @param context the async context of the request
+   * @param provider the provider to process the request with
+   * @param requestContext the request to process
+   */
+  void submit(AsyncContext context, Provider provider, RequestContext requestContext);
+  
+  /**
+   * Reports whether this processor has been shut down.
+   */
+  boolean isShutdown();
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/Processor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/ProcessorQueue.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/ProcessorQueue.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/ProcessorQueue.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/ProcessorQueue.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.protocol.servlet.async;
+
+/**
+ * Consumer-side view of the queue of pending AbderaTasks.
+ */
+public interface ProcessorQueue {
+
+  /** Reports whether a task is waiting in the queue. */
+  boolean hasNext();
+  
+  /** Takes the next task from the queue. */
+  AbderaTask next();
+  
+  /** Returns the Processor associated with this queue. */
+  Processor getProcessor();
+  
+  /** Cancels the tasks that are still waiting in the queue. */
+  void cancelRemaining();
+  
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/ProcessorQueue.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/TaskExecutor.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/TaskExecutor.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/TaskExecutor.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/TaskExecutor.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.protocol.servlet.async;
+
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/**
+ * An Executor with an explicit lifecycle: it is initialized from a set of
+ * properties, can start a worker, and can be shut down.
+ */
+public interface TaskExecutor extends Executor {
+
+  /** Initializes the executor from the given configuration properties. */
+  void init(Map<String,String> properties);
+  
+  /** Starts the given worker. */
+  void startWorker(Runnable runnable);
+  
+  /** Shuts the executor down. */
+  void shutdown();
+  
+  /** Reports whether the executor is currently running. */
+  boolean isRunning();
+  
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/protocol/servlet/async/TaskExecutor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/AbstractPusher.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/AbstractPusher.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/AbstractPusher.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/AbstractPusher.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.pusher;
+
+/**
+ * Base Pusher that implements pushAll in terms of push.
+ */
+public abstract class AbstractPusher<T> 
+  implements Pusher<T> {
+
+  /**
+   * Pushes every item of the iterable, one at a time and in iteration order.
+   */
+  public void pushAll(Iterable<T> t) {
+    for (T item : t) {
+      push(item);
+    }
+  }
+
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/AbstractPusher.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/ChannelManager.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/ChannelManager.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/ChannelManager.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/ChannelManager.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.pusher;
+
+import java.util.Map;
+
+/**
+ * A "Channel" is a logical pairing of related Pusher and Receiver interfaces.
+ * The ChannelManager interface allows applications to access the Pushers
+ * and Receivers for specific named channels. This is designed to be as 
+ * generic as possible, allowing channels to be implemented in a broad variety
+ * of ways.
+ */
+public interface ChannelManager extends Iterable<String> {
+
+  <T>Pusher<T> getPusher(String channel);
+  
+  <T>Receiver<T> getReceiver(String channel);
+  
+  void init(Map<String,String> properties);
+  
+  void shutdown();
+  
+  boolean isShutdown();
+  
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/ChannelManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Listener.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Listener.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Listener.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Listener.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.pusher;
+
+public interface Listener<T> {
+
+  /**
+   * Called when the Listener is added to a Receiver before any items are
+   * passed to the onItem method
+   */
+  void beforeItems();
+  
+  void onItem(T t);
+  
+  /**
+   * Called by the Receiver when this listener is done receiving items
+   */
+  void afterItems();
+  
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Listener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/MapChannelManager.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/MapChannelManager.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/MapChannelManager.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/MapChannelManager.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.pusher;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * ChannelManager implementation based on an internal concurrent Map of
+ * Pusher/Receiver pairs. Channels are created lazily, on first lookup,
+ * by {@link #createChannel()}.
+ */
+public abstract class MapChannelManager 
+  implements ChannelManager {
+
+  /**
+   * Holds the Pusher and the Receiver that together form one channel.
+   */
+  public static abstract class Channel {
+    private final Pusher<?> pusher;
+    private final Receiver<?> receiver;
+    public Channel(Pusher<?> pusher, Receiver<?> receiver) {
+      this.pusher = pusher;
+      this.receiver = receiver;
+    }
+    /** Invoked when the channel is shut down by its manager. */
+    abstract protected void shutdown();
+  }
+  
+  private final Map<String,Channel> map = 
+    new ConcurrentHashMap<String,Channel>();
+  
+  // volatile: isShutdown() reads this flag without holding the monitor
+  private volatile boolean shuttingDown = false;
+  
+  /** Iterates over the names of the channels currently in the map. */
+  public Iterator<String> iterator() {
+    return map.keySet().iterator();
+  }
+
+  /**
+   * Returns the named channel, creating and registering it if it does
+   * not exist yet.
+   * 
+   * @throws IllegalStateException if shutdown() has been called
+   */
+  protected synchronized Channel get(String channel) {
+    if (shuttingDown)
+      throw new IllegalStateException(
+        "Channel Manager is shutting down");
+    Channel c = map.get(channel);
+    if (c == null) {
+      c = createChannel();
+      map.put(channel,c);
+    }
+    return c;
+  }
+  
+  @SuppressWarnings("unchecked") // channels are untyped; the caller picks T
+  public <T> Pusher<T> getPusher(String channel) {
+    return (Pusher<T>)get(channel).pusher;
+  }
+
+  @SuppressWarnings("unchecked") // channels are untyped; the caller picks T
+  public <T> Receiver<T> getReceiver(String channel) {
+    return (Receiver<T>)get(channel).receiver;
+  }
+
+  /** Creates the Pusher/Receiver pair for a channel that is being created. */
+  protected abstract Channel createChannel();
+  
+  /** Drops every channel from the map without shutting any of them down. */
+  protected void clearChannels() {
+    map.clear();
+  }
+  
+  /**
+   * Shuts down and removes the named channel; does nothing if no such
+   * channel exists.
+   */
+  public void shutdownChannel(String channel) {
+    // remove() is atomic, so two racing callers cannot both shut it down
+    Channel c = map.remove(channel);
+    if (c != null)
+      c.shutdown();
+  }
+  
+  /**
+   * Marks the manager as shut down, shuts down every existing channel
+   * and then clears the map.
+   */
+  public void shutdown() {
+    synchronized (this) {
+      // Set under get()'s monitor: once we leave this block no lookup is
+      // still creating a channel, and later lookups are rejected.
+      shuttingDown = true;
+    }
+    // Walk the values directly; going through get() could lazily create
+    // a channel that was removed concurrently.
+    for (Channel c : map.values())
+      c.shutdown();
+    clearChannels();
+  }
+  
+  /** Returns true once shutdown() has been called. */
+  public boolean isShutdown() {
+    return shuttingDown;
+  }
+  
+  /** No configuration is needed; the properties are ignored. */
+  public void init(Map<String,String> properties) {}
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/MapChannelManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Pusher.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Pusher.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Pusher.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Pusher.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.pusher;
+
+/**
+ * Used to push items into a channel. This will cause all registered
+ * Listeners to receive notification of the item
+ */
+public interface Pusher<T> {
+
+  void push(T t);
+
+  void pushAll(Iterable<T> t);
+  
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Pusher.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Receiver.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Receiver.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Receiver.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Receiver.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.pusher;
+
+/**
+ * The Receiver interface provides the mechanisms for adding and removing
+ * listeners to a channel. 
+ */
+public interface Receiver<T> {
+
+  void startListening(Listener<T> listener);
+  
+  void stopListening(Listener<T> listener);
+  
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/Receiver.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SelectorListener.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SelectorListener.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SelectorListener.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SelectorListener.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.pusher;
+import org.apache.abdera2.common.selector.Selector;
+
+public class SelectorListener<T> 
+  implements Listener<T> {
+
+  private final Selector selector;
+  private final Listener<T> listener;
+
+  public SelectorListener(
+    Selector selector,
+    Listener<T> listener) {
+    this.selector = selector;
+    this.listener = listener;
+  }
+  
+  public void onItem(T t) {
+    if (selector.select(t))
+      listener.onItem(t);
+  }
+
+  public void afterItems() {
+    listener.afterItems();
+  }
+
+  public void beforeItems() {
+    listener.beforeItems();
+  }
+  
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SelectorListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SelectorPusher.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SelectorPusher.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SelectorPusher.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SelectorPusher.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.pusher;
+
+import org.apache.abdera2.common.selector.Selector;
+
+public class SelectorPusher<T> 
+  extends AbstractPusher<T> {
+
+  private final Pusher<T> pusher;
+  private final Selector selector;
+  
+  public SelectorPusher(
+      Selector selector, 
+      Pusher<T> pusher) {
+    this.selector = selector;
+    this.pusher = pusher;
+  }
+  
+  public void push(T t) {
+    if (selector.select(t))
+      pusher.push(t);
+  }
+
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SelectorPusher.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SimpleChannelManager.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SimpleChannelManager.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SimpleChannelManager.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SimpleChannelManager.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.pusher;
+
+/**
+ * MapChannelManager whose channels are each backed by a single
+ * SimplePusher acting as both the Pusher and the Receiver.
+ */
+public class SimpleChannelManager 
+  extends MapChannelManager {
+
+  /** Creates a channel around a new SimplePusher. */
+  @Override
+  protected Channel createChannel() {
+    // Object is a placeholder element type: callers select T through
+    // getPusher()/getReceiver(), so no raw type is needed here
+    final SimplePusher<?> sp = new SimplePusher<Object>();
+    return new Channel(sp,sp) {
+      protected void shutdown() {
+        sp.shutdown();
+      }
+    };
+  }
+
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SimpleChannelManager.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SimplePusher.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SimplePusher.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SimplePusher.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SimplePusher.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.pusher;
+
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Simple in-memory channel that is both the Pusher and the Receiver.
+ * Pushed items are queued; a background task takes them off the queue and
+ * schedules one onItem() call per registered listener on the executor.
+ */
+public class SimplePusher<T> 
+  extends AbstractPusher<T> implements Pusher<T>, Receiver<T> {
+
+  private final static Log log = LogFactory.getLog(SimplePusher.class);
+  
+  // How long the dispatcher waits for an item before re-checking isRunning()
+  private static final long POLL_MILLIS = 100;
+  
+  // Blocking queue: the dispatcher sleeps while idle instead of spinning
+  private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+  private final ExecutorService exec = Executors.newCachedThreadPool();
+  // Copy-on-write: caller threads add and remove listeners while the
+  // dispatcher thread iterates over them
+  private final Set<Listener<T>> listeners = 
+    new CopyOnWriteArraySet<Listener<T>>();
+  
+  /** Calls beforeItems() on the listener, then registers it. */
+  public void startListening(Listener<T> listener) {
+    listener.beforeItems();
+    listeners.add(listener);
+  }
+  
+  /** Unregisters the listener, then calls afterItems() on it. */
+  public void stopListening(Listener<T> listener) {
+    // Unregister first so new items stop being routed to a finished listener
+    listeners.remove(listener);
+    listener.afterItems();
+  }
+  
+  /** Queues the item for asynchronous delivery to the listeners. */
+  public void push(T entry) {
+    queue.add(entry);
+  }
+ 
+  /** Returns true until the executor has been shut down. */
+  public boolean isRunning() {
+    return !exec.isShutdown();
+  }
+  
+  /** Creates the pusher and starts its background dispatcher task. */
+  public SimplePusher() {
+    exec.execute(
+      new Runnable() {
+        public void run() {
+          dispatch();
+        }
+      }
+    );
+  }
+  
+  /** Takes items off the queue until shutdown, fanning each one out. */
+  private void dispatch() {
+    while (isRunning()) {
+      final T el;
+      try {
+        // Timed wait: exec.shutdown() does not interrupt this thread, so
+        // wake up regularly to notice that the pusher has been stopped
+        el = queue.poll(POLL_MILLIS, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+      if (el == null)
+        continue;
+      log.info("Processing item...");
+      for (final Listener<T> l : listeners)
+        deliver(l, el);
+    }
+  }
+  
+  /** Schedules one onItem() call; failures are logged, not propagated. */
+  private void deliver(final Listener<T> l, final T el) {
+    try {
+      exec.execute(new Runnable() {
+        public void run() { 
+          try {
+            l.onItem(el);
+          } catch (Throwable t) {
+            log.error("Listener failed to process item", t);
+          }
+        }
+      });
+    } catch (Throwable t) {
+      // e.g. the task was rejected because the executor is shut down
+      log.error("Unable to schedule item delivery", t);
+    }
+  }
+  
+  /**
+   * Shuts down the executor, calls afterItems() on every registered
+   * listener and unregisters them all.
+   */
+  public void shutdown() {
+    exec.shutdown();
+    for (Listener<?> listener : listeners)
+      listener.afterItems();
+    listeners.clear(); // remove all the listeners
+  }
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/pusher/SimplePusher.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/InvertedSelector.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/InvertedSelector.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/InvertedSelector.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/InvertedSelector.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,19 @@
+package org.apache.abdera2.common.selector;
+
+/**
+ * Selector that inverts the results of the wrapped selector
+ */
+public class InvertedSelector 
+  implements Selector {
+
+  private final Selector selector;
+  
+  public InvertedSelector(Selector selector) {
+    this.selector = selector;
+  }
+  
+  public boolean select(Object item) {
+    return !selector.select(item);
+  }
+
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/InvertedSelector.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/MultiSelector.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/MultiSelector.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/MultiSelector.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/MultiSelector.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.selector;
+
+/**
+ * Selector that is based on an internal array of Selectors that are
+ * invoked in order. The item is accepted unless one of the selectors
+ * rejects it; with no selectors at all, every item is accepted.
+ */
+public class MultiSelector 
+  implements Selector {
+
+  private final Selector[] selectors;
+  
+  /**
+   * @param selectors the selectors to consult, in order
+   * @throws NullPointerException if the array itself is null
+   */
+  public MultiSelector(Selector... selectors) {
+    // Copy so that later changes to the caller's array cannot alter us
+    this.selectors = selectors.clone();
+  }
+  
+  /** Returns false as soon as one selector rejects the item, else true. */
+  public boolean select(Object item) {
+    for (Selector selector : selectors)
+        if (!selector.select(item))
+            return false;
+    return true;
+  }
+
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/MultiSelector.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/Selector.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/Selector.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/Selector.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/Selector.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.selector;
+
+/**
+ * Utility interface that is used as a Filter in several places 
+ * throughout the Abdera API.
+ */
+public interface Selector {
+
+    /** Returns true if the item is to be selected **/
+    boolean select(Object item);
+
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/selector/Selector.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/templates/AbstractContext.java
URL: http://svn.apache.org/viewvc/abdera/abdera2/common/src/main/java/org/apache/abdera2/common/templates/AbstractContext.java?rev=1173209&view=auto
==============================================================================
--- abdera/abdera2/common/src/main/java/org/apache/abdera2/common/templates/AbstractContext.java (added)
+++ abdera/abdera2/common/src/main/java/org/apache/abdera2/common/templates/AbstractContext.java Tue Sep 20 15:56:46 2011
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+package org.apache.abdera2.common.templates;
+
+import org.apache.abdera2.common.templates.Context;
+
+/**
+ * Abstract base for custom Context implementations
+ */
+public abstract class AbstractContext implements Context {
+
+    private static final long serialVersionUID = -2511167897930514261L;
+
+    protected boolean iri = false;
+
+    public boolean isIri() {
+        return iri;
+    }
+
+    public void setIri(boolean isiri) {
+        this.iri = isiri;
+    }
+
+}

Propchange: abdera/abdera2/common/src/main/java/org/apache/abdera2/common/templates/AbstractContext.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message