ace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From br...@apache.org
Subject svn commit: r1515342 - in /ace/trunk/org.apache.ace.agent: ./ src/org/apache/ace/agent/ src/org/apache/ace/agent/impl/
Date Mon, 19 Aug 2013 10:23:37 GMT
Author: bramk
Date: Mon Aug 19 10:23:36 2013
New Revision: 1515342

URL: http://svn.apache.org/r1515342
Log:
ACE-347 Initial feedback channel implementation

Added:
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
Modified:
    ace/trunk/org.apache.ace.agent/bnd.bnd
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/FeedbackChannel.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentContext.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentControlImpl.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java

Modified: ace/trunk/org.apache.ace.agent/bnd.bnd
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/bnd.bnd?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/bnd.bnd (original)
+++ ace/trunk/org.apache.ace.agent/bnd.bnd Mon Aug 19 10:23:36 2013
@@ -3,10 +3,12 @@ Bundle-Description: Implementation of th
 Bundle-Version: 1.0.0
 Bundle-Activator: org.apache.ace.agent.impl.Activator
 
-Private-Package: \
+Private-Package: org.apache.ace.range,\
+	org.apache.ace.log.util,\
+	org.apache.ace.log,\
 	org.apache.ace.agent.impl,\
 	org.apache.ace.agent.updater,\
-    org.apache.commons.codec,\
+	org.apache.commons.codec,\
 	org.apache.commons.codec.binary,\
 	org.apache.commons.codec.digest,\
 	org.apache.commons.codec.language,\
@@ -45,7 +47,9 @@ Export-Package: org.apache.ace.agent,\
 	org.easymock,\
 	org.apache.felix.dependencymanager,\
 	commons-codec;version=1.4.0,\
-	org.apache.felix.http.jetty;version=2.2.1
+	org.apache.felix.http.jetty;version=2.2.1,\
+	org.apache.ace.range.api;version=latest,\
+	org.apache.ace.log.api;version=latest
 
 -sources false
 -runfw: org.apache.felix.framework;version='[4.0.3,4.0.3]'

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/FeedbackChannel.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/FeedbackChannel.java?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/FeedbackChannel.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/FeedbackChannel.java Mon Aug 19
10:23:36 2013
@@ -40,5 +40,5 @@ public interface FeedbackChannel {
      * @param type
      * @param properties
      */
-    void write(int type, Map<String, String> properties);
+    void write(int type, Map<String, String> properties) throws IOException;
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/Activator.java Mon Aug 19
10:23:36 2013
@@ -18,6 +18,8 @@
  */
 package org.apache.ace.agent.impl;
 
+import java.io.File;
+import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -33,6 +35,8 @@ import org.apache.felix.dm.Component;
 import org.apache.felix.dm.DependencyActivatorBase;
 import org.apache.felix.dm.DependencyManager;
 import org.osgi.framework.BundleContext;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
 
 // TODO Decouple from DM to save 170k in agent size. Or: just include what we use
 public class Activator extends DependencyActivatorBase implements AgentContext {
@@ -47,15 +51,19 @@ public class Activator extends Dependenc
     private volatile AgentControlImpl m_agentControl;
     private volatile AgentUpdateHandlerImpl m_agentUpdateHandler; // we use the implementation
type here on purpose
 
-    private volatile DefaultController m_controller;
+    private volatile EventLoggerImpl m_eventLogger;
+    private volatile DefaultController m_defaultController;
 
-    private DependencyManager m_manager;
-    private Component m_component;
+    private BundleContext m_bundleContext;
+    private DependencyManager m_dependencyManager;
+    private Component m_agentControlComponent;
+    private Component m_eventLoggerComponent;
 
     @Override
     public void init(BundleContext context, DependencyManager manager) throws Exception {
 
-        m_manager = manager;
+        m_bundleContext = context;
+        m_dependencyManager = manager;
 
         m_executorService = Executors.newScheduledThreadPool(1);
         m_configurationHandler = new ConfigurationHandlerImpl(this);
@@ -66,6 +74,7 @@ public class Activator extends Dependenc
 
         Component service = createComponent().setImplementation(this)
             .setCallbacks("initAgent", "startAgent", "stopAgent", "destroyAgent")
+            .setAutoConfig(BundleContext.class, false)
             .setAutoConfig(DependencyManager.class, false)
             .setAutoConfig(Component.class, false);
 
@@ -91,9 +100,11 @@ public class Activator extends Dependenc
         }
 
         if (!Boolean.parseBoolean(System.getProperty("agent.defaultcontroller.disabled")))
{
-            m_controller = new DefaultController(m_agentControl, m_executorService);
+            m_defaultController = new DefaultController(m_agentControl, m_executorService);
         }
-        
+
+        m_eventLogger = new EventLoggerImpl(m_agentControl, m_bundleContext);
+
         manager.add(service);
     }
 
@@ -103,12 +114,25 @@ public class Activator extends Dependenc
 
     void startAgent() throws Exception {
         System.out.println("Starting agent!");
-        m_component = createComponent()
+
+        m_agentControlComponent = createComponent()
             .setInterface(AgentControl.class.getName(), null)
             .setImplementation(m_agentControl);
-        m_manager.add(m_component);
-        if (m_controller != null) {
-            m_controller.start();
+        m_dependencyManager.add(m_agentControlComponent);
+
+        m_eventLoggerComponent = createComponent()
+            .setInterface(EventHandler.class.getName(), new Properties() {
+                {
+                    put(EventConstants.EVENT_TOPIC, EventLoggerImpl.TOPICS_INTEREST);
+                }
+            })
+            .setImplementation(m_eventLogger);
+        m_dependencyManager.add(m_eventLoggerComponent);
+        m_bundleContext.addBundleListener(m_eventLogger);
+        m_bundleContext.addFrameworkListener(m_eventLogger);
+
+        if (m_defaultController != null) {
+            m_defaultController.start();
         }
         // at this point we know the agent has started, so any updater bundle that
         // might still be running can be uninstalled
@@ -117,10 +141,15 @@ public class Activator extends Dependenc
 
     void stopAgent() throws Exception {
         System.out.println("Stopping agent");
-        if (m_controller != null) {
-            m_controller.stop();
+        if (m_defaultController != null) {
+            m_defaultController.stop();
         }
-        m_manager.remove(m_component);
+
+        m_bundleContext.removeFrameworkListener(m_eventLogger);
+        m_bundleContext.removeBundleListener(m_eventLogger);
+        m_dependencyManager.remove(m_eventLoggerComponent);
+
+        m_dependencyManager.remove(m_agentControlComponent);
     }
 
     @Override
@@ -157,9 +186,14 @@ public class Activator extends Dependenc
     public DownloadHandler getDownloadHandler() {
         return m_downloadHandler;
     }
-    
+
     @Override
     public AgentUpdateHandler getAgentUpdateHandler() {
         return m_agentUpdateHandler;
     }
+
+    @Override
+    public File getWorkDir() {
+        return m_bundleContext.getDataFile("");
+    }
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentContext.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentContext.java?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentContext.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentContext.java Mon Aug
19 10:23:36 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.ace.agent.impl;
 
+import java.io.File;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.ace.agent.AgentUpdateHandler;
@@ -31,18 +32,20 @@ import org.apache.ace.agent.Identificati
 public interface AgentContext {
 
     IdentificationHandler getIdentificationHandler();
-    
+
     DiscoveryHandler getDiscoveryHandler();
 
     ConnectionHandler getConnectionHandler();
-    
+
     DeploymentHandler getDeploymentHandler();
-    
+
     DownloadHandler getDownloadHandler();
-    
+
     ScheduledExecutorService getExecutorService();
-    
+
     ConfigurationHandler getConfigurationHandler();
 
     AgentUpdateHandler getAgentUpdateHandler();
+
+    File getWorkDir();
 }

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentControlImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentControlImpl.java?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentControlImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/AgentControlImpl.java Mon
Aug 19 10:23:36 2013
@@ -18,7 +18,11 @@
  */
 package org.apache.ace.agent.impl;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.ace.agent.AgentControl;
 import org.apache.ace.agent.AgentUpdateHandler;
@@ -31,8 +35,12 @@ public class AgentControlImpl implements
 
     private final AgentContext m_agentContext;
 
-    public AgentControlImpl(AgentContext agentContext) {
+    private final Map<String, FeedbackChannelImpl> m_feedbackChannels = new HashMap<String,
FeedbackChannelImpl>();
+
+    public AgentControlImpl(AgentContext agentContext) throws IOException {
         m_agentContext = agentContext;
+        // TODO get from configuration
+        m_feedbackChannels.put("auditlog", new FeedbackChannelImpl(m_agentContext, "auditlog"));
     }
 
     @Override
@@ -52,16 +60,17 @@ public class AgentControlImpl implements
 
     @Override
     public List<String> getFeedbackChannelNames() {
-        // TODO Auto-generated method stub
-        return null;
+        // TODO get from configuration
+        List<String> channels = new ArrayList<String>();
+        channels.addAll(m_feedbackChannels.keySet());
+        return channels;
     }
 
     @Override
     public FeedbackChannel getFeedbackChannel(String name) {
-        // TODO Auto-generated method stub
-        return null;
+        return m_feedbackChannels.get(name);
     }
-    
+
     @Override
     public AgentUpdateHandler getAgentUpdateHandler() {
         return m_agentContext.getAgentUpdateHandler();

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java?rev=1515342&r1=1515341&r2=1515342&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/DefaultController.java Mon
Aug 19 10:23:36 2013
@@ -20,6 +20,7 @@ package org.apache.ace.agent.impl;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.List;
 import java.util.SortedSet;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -29,6 +30,7 @@ import org.apache.ace.agent.AgentControl
 import org.apache.ace.agent.AgentUpdateHandler;
 import org.apache.ace.agent.ConfigurationHandler;
 import org.apache.ace.agent.DeploymentHandler;
+import org.apache.ace.agent.FeedbackChannel;
 import org.apache.ace.agent.RetryAfterException;
 import org.osgi.framework.Version;
 
@@ -60,7 +62,8 @@ public class DefaultController implement
         long syncInterval = getSyncInterval();
         try {
             runSafeAgent();
-            //runSafe();
+            // runSafeUpdate();
+            runSafeFeedback();
         }
         catch (RetryAfterException e) {
             syncInterval = e.getSeconds();
@@ -76,7 +79,7 @@ public class DefaultController implement
         reSchedule(syncInterval);
     }
 
-    public void runSafe() throws RetryAfterException, IOException {
+    private void runSafeUpdate() throws RetryAfterException, IOException {
 
         DeploymentHandler deploymentHandler = getDeploymentHandler();
 
@@ -97,7 +100,17 @@ public class DefaultController implement
             }
         }
     }
-    public void runSafeAgent() throws RetryAfterException, IOException {
+
+    private void runSafeFeedback() throws RetryAfterException, IOException {
+        List<String> channelNames = m_agentControl.getFeedbackChannelNames();
+        for (String channelName : channelNames) {
+            FeedbackChannel channel = m_agentControl.getFeedbackChannel(channelName);
+            if (channel != null)
+                channel.sendFeedback();
+        }
+    }
+
+    private void runSafeAgent() throws RetryAfterException, IOException {
 
         AgentUpdateHandler deploymentHandler = getAgentUpdateHandler();
 

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java?rev=1515342&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/EventLoggerImpl.java Mon
Aug 19 10:23:36 2013
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ace.agent.AgentControl;
+import org.apache.ace.agent.FeedbackChannel;
+import org.apache.ace.log.AuditEvent;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleEvent;
+import org.osgi.framework.BundleListener;
+import org.osgi.framework.Constants;
+import org.osgi.framework.FrameworkEvent;
+import org.osgi.framework.FrameworkListener;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.Version;
+import org.osgi.service.deploymentadmin.DeploymentAdmin;
+import org.osgi.service.deploymentadmin.DeploymentPackage;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+
+/**
+ * Service component that listens for
+ * 
+ */
+// TODO quick copy & paste & simplify from org.apache.ace.log.listener.*
+// TODO Which event types to log must be configurable
+// TODO split into separate listeners
+public class EventLoggerImpl implements BundleListener, FrameworkListener, EventHandler {
+
+    /*
+     * FIXME This is a simplified quick copy and paste of org.apache.ace.log.listener.* without
caching and async. I
+     * think that is OK. However we need to revisit all logging/monitoring and this logic
should probably be made
+     * configurable split up is separate components.
+     * 
+     * @see EvenLoggerFactory as well
+     */
+
+    public static final String EVENTLOGGER_FEEDBACKCHANNEL = "auditlog";
+
+    public static final String[] TOPICS_INTEREST = new String[] { "org/osgi/service/deployment/*",
"org/apache/ace/deployment/*" };
+
+    public static final String TOPIC_INSTALL = "org/osgi/service/deployment/INSTALL";
+    public static final String TOPIC_UNINSTALL = "org/osgi/service/deployment/UNINSTALL";
+    public static final String TOPIC_COMPLETE = "org/osgi/service/deployment/COMPLETE";
+    public static final String TOPIC_DEPLOYMENTPACKAGE_INSTALL = "org/apache/ace/deployment/INSTALL";
+
+    private final BundleContext m_bundleContext;
+    private final AgentControl m_agentControl;
+
+    public EventLoggerImpl(AgentControl agentControl, BundleContext bundleContext) {
+        m_agentControl = agentControl;
+        m_bundleContext = bundleContext;
+    }
+
+    @Override
+    public void handleEvent(Event event) {
+        int eventType = AuditEvent.DEPLOYMENTADMIN_BASE;
+        Map<String, String> props = new HashMap<String, String>();
+
+        String topic = event.getTopic();
+
+        if (topic.equals(TOPIC_DEPLOYMENTPACKAGE_INSTALL)) {
+            String url = (String) event.getProperty("deploymentpackage.url");
+            String version = (String) event.getProperty("deploymentpackage.version");
+            eventType = AuditEvent.DEPLOYMENTCONTROL_INSTALL;
+            props.put(AuditEvent.KEY_VERSION, version);
+            props.put(AuditEvent.KEY_NAME, url);
+        }
+        else if (topic.equals(TOPIC_INSTALL)) {
+            String deplPackName = (String) event.getProperty("deploymentpackage.name");
+            eventType = AuditEvent.DEPLOYMENTADMIN_INSTALL;
+            props.put(AuditEvent.KEY_NAME, deplPackName);
+        }
+
+        else if (topic.equals(TOPIC_UNINSTALL)) {
+            String deplPackName = (String) event.getProperty("deploymentpackage.name");
+            eventType = AuditEvent.DEPLOYMENTADMIN_UNINSTALL;
+            props.put(AuditEvent.KEY_NAME, deplPackName);
+        }
+        else if (topic.equals(TOPIC_COMPLETE)) {
+            String deplPackName = (String) event.getProperty("deploymentpackage.name");
+            // to retrieve the version, DeploymentAdmin has to be used
+            ServiceReference ref = m_bundleContext.getServiceReference(DeploymentAdmin.class.getName());
+            if (ref != null) {
+                DeploymentAdmin deplAdmin = (DeploymentAdmin) m_bundleContext.getService(ref);
+                if (deplAdmin != null) {
+                    DeploymentPackage dp = deplAdmin.getDeploymentPackage(deplPackName);
+                    if (dp != null) {
+                        Version version = dp.getVersion();
+                        if (version != null) {
+                            props.put(AuditEvent.KEY_VERSION, version.toString());
+                        }
+                    }
+                    // after use, release the service as is it not needed anymore
+                    m_bundleContext.ungetService(ref);
+                }
+            }
+            eventType = AuditEvent.DEPLOYMENTADMIN_COMPLETE;
+            props.put(AuditEvent.KEY_NAME, deplPackName);
+            Boolean success = (Boolean) event.getProperty("successful");
+            props.put(AuditEvent.KEY_SUCCESS, success.toString());
+        }
+        writeEvent(eventType, props);
+    }
+
+    @Override
+    public void frameworkEvent(FrameworkEvent event) {
+        int eventType = AuditEvent.FRAMEWORK_BASE;
+        Map<String, String> props = new HashMap<String, String>();
+        Bundle bundle = event.getBundle();
+
+        if (bundle != null) {
+            props.put(AuditEvent.KEY_ID, Long.toString(bundle.getBundleId()));
+        }
+
+        String msg = null;
+        String type = null;
+        Throwable exception = event.getThrowable();
+        if (exception != null) {
+            msg = exception.getMessage();
+            type = exception.getClass().getName();
+        }
+
+        switch (event.getType()) {
+            case FrameworkEvent.INFO:
+                eventType = AuditEvent.FRAMEWORK_INFO;
+                if (msg != null) {
+                    props.put(AuditEvent.KEY_MSG, msg);
+                }
+                if (type != null) {
+                    props.put(AuditEvent.KEY_TYPE, type);
+                }
+                break;
+            case FrameworkEvent.WARNING:
+                eventType = AuditEvent.FRAMEWORK_WARNING;
+                if (msg != null) {
+                    props.put(AuditEvent.KEY_MSG, msg);
+                }
+                if (type != null) {
+                    props.put(AuditEvent.KEY_TYPE, type);
+                }
+                break;
+            case FrameworkEvent.ERROR:
+                eventType = AuditEvent.FRAMEWORK_ERROR;
+                if (msg != null) {
+                    props.put(AuditEvent.KEY_MSG, msg);
+                }
+                if (type != null) {
+                    props.put(AuditEvent.KEY_TYPE, type);
+                }
+                break;
+            case FrameworkEvent.PACKAGES_REFRESHED:
+                eventType = AuditEvent.FRAMEWORK_REFRESH;
+                break;
+            case FrameworkEvent.STARTED:
+                eventType = AuditEvent.FRAMEWORK_STARTED;
+                break;
+            case FrameworkEvent.STARTLEVEL_CHANGED:
+                eventType = AuditEvent.FRAMEWORK_STARTLEVEL;
+                break;
+        }
+        writeEvent(eventType, props);
+    }
+
+    @Override
+    public void bundleChanged(BundleEvent event) {
+        int eventType = AuditEvent.BUNDLE_BASE;
+        Map<String, String> props = new HashMap<String, String>();
+        Bundle bundle = event.getBundle();
+        props.put(AuditEvent.KEY_ID, Long.toString(bundle.getBundleId()));
+
+        switch (event.getType()) {
+            case BundleEvent.INSTALLED:
+                eventType = AuditEvent.BUNDLE_INSTALLED;
+                if (bundle.getSymbolicName() != null) {
+                    props.put(AuditEvent.KEY_NAME, bundle.getSymbolicName());
+                }
+                String version = (String) bundle.getHeaders().get(Constants.BUNDLE_VERSION);
+                if (version != null) {
+                    props.put(AuditEvent.KEY_VERSION, version);
+                }
+                props.put(AuditEvent.KEY_LOCATION, bundle.getLocation());
+                break;
+            case BundleEvent.RESOLVED:
+                eventType = AuditEvent.BUNDLE_RESOLVED;
+                break;
+            case BundleEvent.STARTED:
+                eventType = AuditEvent.BUNDLE_STARTED;
+                break;
+            case BundleEvent.STOPPED:
+                eventType = AuditEvent.BUNDLE_STOPPED;
+                break;
+            case BundleEvent.UNRESOLVED:
+                eventType = AuditEvent.BUNDLE_UNRESOLVED;
+                break;
+            case BundleEvent.UPDATED:
+                eventType = AuditEvent.BUNDLE_UPDATED;
+                version = (String) bundle.getHeaders().get(Constants.BUNDLE_VERSION);
+                if (version != null) {
+                    props.put(AuditEvent.KEY_VERSION, version);
+                }
+                props.put(AuditEvent.KEY_LOCATION, bundle.getLocation());
+                break;
+            case BundleEvent.UNINSTALLED:
+                eventType = AuditEvent.BUNDLE_UNINSTALLED;
+                break;
+            case BundleEvent.STARTING:
+                eventType = AuditEvent.BUNDLE_STARTING;
+                break;
+            case BundleEvent.STOPPING:
+                eventType = AuditEvent.BUNDLE_STOPPING;
+                break;
+        }
+        writeEvent(eventType, props);
+    }
+
+    private void writeEvent(int eventType, Map<String, String> payload) {
+        FeedbackChannel channel = m_agentControl.getFeedbackChannel(EVENTLOGGER_FEEDBACKCHANNEL);
+        if (channel != null) {
+            try {
+                channel.write(eventType, payload);
+            }
+            catch (IOException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        }
+    }
+}

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java?rev=1515342&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
(added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
Mon Aug 19 10:23:36 2013
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.impl;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
+import java.io.Writer;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.ace.agent.ConnectionHandler;
+import org.apache.ace.agent.FeedbackChannel;
+import org.apache.ace.agent.RetryAfterException;
+import org.apache.ace.log.LogDescriptor;
+import org.apache.ace.log.LogEvent;
+import org.apache.ace.range.RangeIterator;
+import org.apache.ace.range.SortedRangeSet;
+
+/**
+ * FeedbackChannel implementation
+ * 
+ */
+// TODO: rotate/truncate<br/>
+// TODO: test(coverage)<br/>
+// TODO: decouple from range/log API?
+public class FeedbackChannelImpl implements FeedbackChannel {
+
+    private static final String DIRECTORY_NAME = "feedback";
+    private static final String COMMAND_QUERY = "query";
+    private static final String COMMAND_SEND = "send";
+    private static final String PARAMETER_TARGETID = "tid";
+    private static final String PARAMETER_LOGID = "logid";
+
+    private final AgentContext m_agentContext;
+    private final String m_name;
+    private final File m_baseDir;
+    private final FileFilter m_fileFilter = new FileFilter() {
+        @Override
+        public boolean accept(File file) {
+            return file.getName().startsWith(m_name);
+        }
+    };
+
+    private Store m_store = null;
+    private long m_highest;
+
+    public FeedbackChannelImpl(AgentContext agentContext, String name) throws IOException
{
+        m_agentContext = agentContext;
+        m_name = name;
+        m_baseDir = new File(m_agentContext.getWorkDir(), DIRECTORY_NAME);
+        if (!m_baseDir.isDirectory() && !m_baseDir.mkdirs())
+            throw new IllegalArgumentException("Need valid dir");
+        initStore();
+    }
+
+    @Override
+    public synchronized void sendFeedback() throws RetryAfterException, IOException {
+        String identification = getIdentification();
+        URL serverURL = getServerURL();
+        if (identification == null || serverURL == null)
+            return;
+        URLConnection sendConnection = null;
+        Writer writer = null;
+        try {
+            URL sendURL = new URL(serverURL, m_name + "/" + COMMAND_SEND);
+            sendConnection = getConnectionHandler().getConnection(sendURL);
+            sendConnection.setDoOutput(true);
+            if (sendConnection instanceof HttpURLConnection)
+                ((HttpURLConnection) sendConnection).setChunkedStreamingMode(8192);
+            writer = new BufferedWriter(new OutputStreamWriter(sendConnection.getOutputStream()));
+            SortedSet<Long> storeIDs = getStoreIDs();
+            for (Long storeID : storeIDs) {
+                URL queryURL = new URL(serverURL, m_name + "/" + COMMAND_QUERY + "?" + PARAMETER_TARGETID
+ "=" + identification + "&" + PARAMETER_LOGID + "=" + storeID);
+                URLConnection queryConnection = getConnectionHandler().getConnection(queryURL);
+                synchronizeStore(storeID, queryConnection.getInputStream(), writer);
+            }
+            writer.flush();
+            sendConnection.getContent();
+        }
+        catch (ConnectException e) {
+            e.printStackTrace();
+        }
+        catch (IOException e) {
+            e.printStackTrace();
+        }
+        finally {
+            if (writer != null)
+                writer.close();
+            if (sendConnection instanceof HttpURLConnection)
+                ((HttpURLConnection) sendConnection).disconnect();
+        }
+    }
+
+    @Override
+    public synchronized void write(int type, Map<String, String> properties) throws
IOException {
+        try {
+            LogEvent result = new LogEvent(null, m_store.getId(), getNextEventID(), System.currentTimeMillis(),
type, mapToDictionary(properties));
+            m_store.append(result.getID(), result.toRepresentation().getBytes());
+        }
+        catch (IOException ex) {
+            handleException(m_store, ex);
+        }
+    }
+
+    // TODO Is this called?
+    public synchronized void closeStore() throws IOException {
+        m_store.close();
+        m_store = null;
+    }
+
+    private void initStore() throws IOException {
+        SortedSet<Long> storeIDs = getStoreIDs();
+        if (storeIDs.isEmpty()) {
+            m_store = newFeedbackStore();
+        }
+        else {
+            m_store = createStore(storeIDs.last());
+            try {
+                m_store.init();
+            }
+            catch (IOException ex) {
+                handleException(m_store, ex);
+            }
+        }
+    }
+
+    private void synchronizeStore(long storeID, InputStream queryInput, Writer sendWriter)
throws IOException {
+        long highestLocal = getHighestEventID(storeID);
+        if (highestLocal == 0)
+            return;
+        SortedRangeSet localRange = new SortedRangeSet("1-" + highestLocal);
+        SortedRangeSet remoteRange = getQueryDescriptor(queryInput).getRangeSet();
+        SortedRangeSet delta = remoteRange.diffDest(localRange);
+        RangeIterator rangeIterator = delta.iterator();
+        if (!rangeIterator.hasNext())
+            return;
+        String identification = getIdentification();
+        long lowest = rangeIterator.next();
+        long highest = delta.getHigh();
+        if (lowest <= highest) {
+            List<LogEvent> events = getEvents(storeID, lowest, highestLocal > highest
? highest : highestLocal);
+            Iterator<LogEvent> iter = events.iterator();
+            while (iter.hasNext()) {
+                LogEvent current = (LogEvent) iter.next();
+                while ((current.getID() > lowest) && rangeIterator.hasNext())
{
+                    lowest = rangeIterator.next();
+                }
+                if (current.getID() == lowest) {
+                    LogEvent event = new LogEvent(identification, current);
+                    sendWriter.write(event.toRepresentation());
+                    sendWriter.write("\n");
+                }
+            }
+        }
+    }
+
+    private LogDescriptor getQueryDescriptor(InputStream queryInput) throws IOException {
+        BufferedReader queryReader = null;
+        try {
+            queryReader = new BufferedReader(new InputStreamReader(queryInput));
+            String rangeString = queryReader.readLine();
+            if (rangeString == null) {
+                throw new IOException("Could not construct LogDescriptor from stream because
stream is empty");
+            }
+            try {
+                return new LogDescriptor(rangeString);
+            }
+            catch (IllegalArgumentException e) {
+                throw new IOException("Could not determine highest remote event id, received
malformed event range (" + rangeString + ")");
+            }
+        }
+        finally {
+            if (queryReader != null) {
+                try {
+                    queryReader.close();
+                }
+                catch (Exception ex) {
+                    // not much we can do
+                }
+            }
+        }
+    }
+
+    private Store newFeedbackStore() throws IOException {
+        long storeId = System.currentTimeMillis();
+        while (!(new File(m_baseDir, getStoreName(storeId))).createNewFile()) {
+            storeId++;
+        }
+        return new Store(new File(m_baseDir, getStoreName(storeId)), storeId);
+    }
+
+    private Store createStore(long storeId) throws IOException {
+        return new Store(new File(m_baseDir, getStoreName(storeId)), storeId);
+    }
+
+    private String getStoreName(long storeId) {
+        return m_name + "-" + storeId;
+    }
+
+    private long getStoreId(String storeName) {
+        return Long.parseLong(storeName.replace(m_name + "-", ""));
+    }
+
+    private List<LogEvent> getEvents(long storeID, long fromEventID, long toEventID)
throws IOException {
+        Store store = getStore(storeID);
+        List<LogEvent> result = new ArrayList<LogEvent>();
+        try {
+            if (store.getCurrent() > fromEventID) {
+                store.reset();
+            }
+            while (store.hasNext()) {
+                long eventID = store.readCurrentID();
+                if ((eventID >= fromEventID) && (eventID <= toEventID)) {
+                    result.add(new LogEvent(new String(store.read())));
+                }
+                else {
+                    store.skip();
+                }
+            }
+        }
+        catch (Exception ex) {
+            handleException(store, ex);
+        }
+        finally {
+            closeIfNeeded(store);
+        }
+        return result;
+    }
+
+    private void handleException(Store store, Exception exception) throws IOException {
+        // System.err.println(LogService.LOG_WARNING, "Exception accessing the log: "
+        // + store.getId(), exception);
+        if (store == m_store)
+            m_store = newFeedbackStore();
+
+        try {
+            store.truncate();
+        }
+        catch (IOException ex) {
+            // m_log.log(LogService.LOG_WARNING, "Exception during truncate: "
+            // + store.getId(), ex);
+        }
+        try {
+            store.close();
+        }
+        catch (IOException ex) {
+            // Not much we can do
+        }
+        if (exception instanceof IOException) {
+            throw (IOException) exception;
+        }
+        throw new IOException("Unable to read log entry: "
+            + exception.getMessage());
+    }
+
+    private File[] getStoreFiles() throws IOException {
+        File[] files = (File[]) m_baseDir.listFiles(m_fileFilter);
+        if (files == null)
+            throw new IOException("Unable to list store files in " + m_baseDir.getAbsolutePath());
+        return files;
+    }
+
+    private SortedSet<Long> getStoreIDs() throws IOException {
+        File[] files = getStoreFiles();
+        SortedSet<Long> storeIDs = new TreeSet<Long>();
+        for (int i = 0; i < files.length; i++)
+            storeIDs.add(getStoreId(files[i].getName()));
+        return storeIDs;
+    }
+
+    private long getHighestEventID(long storeID) throws IOException {
+        Store store = getStore(storeID);
+        try {
+            if (m_highest == 0) {
+                store.init();
+                return (m_highest = store.getCurrent());
+            }
+            else {
+                return m_highest;
+            }
+        }
+        catch (IOException ex) {
+            handleException(store, ex);
+        }
+        finally {
+            closeIfNeeded(store);
+        }
+        return -1;
+    }
+
+    private void closeIfNeeded(Store store) {
+        if (store != m_store) {
+            try {
+                store.close();
+            }
+            catch (IOException ex) {
+                // Not much we can do;
+            }
+        }
+    }
+
+    private Store getStore(long storeID) throws IOException {
+        if (m_store.getId() == storeID) {
+            return m_store;
+        }
+        return createStore(storeID);
+    }
+
+    private long getNextEventID() throws IOException {
+        return (m_highest = getHighestEventID(m_store.m_id) + 1);
+    }
+
+    private ConnectionHandler getConnectionHandler() {
+        return m_agentContext.getConnectionHandler();
+    }
+
+    private String getIdentification() {
+        return m_agentContext.getIdentificationHandler().getIdentification();
+    }
+
+    private URL getServerURL() {
+        return m_agentContext.getDiscoveryHandler().getServerUrl();
+    }
+
+    // bridging to log api
+    private static Dictionary<String, String> mapToDictionary(Map<String, String>
map) {
+        Dictionary<String, String> dictionary = new Hashtable<String, String>();
+        for (Entry<String, String> entry : map.entrySet()) {
+            dictionary.put(entry.getKey(), entry.getValue());
+        }
+        return dictionary;
+    }
+
+    /**
+     * The general idea is to provide easy access to a file of records. It supports iterating
over records both by
+     * skipping and by reading. Furthermore, files can be truncated. Most methods will make
an effort to reset to the
+     * last good record in case of an error -- hence, a call to truncate after an IOException
might make the store
+     * readable again.
+     */
+    static class Store {
+        private final RandomAccessFile m_store;
+        private final long m_id;
+        private long m_current;
+
+        /**
+         * Create a new File based Store.
+         * 
+         * @param store
+         *            the file to use as backend.
+         * @param id
+         *            the log id of the store
+         * @throws java.io.IOException
+         *             in case the file is not rw.
+         */
+        Store(File store, long id) throws IOException {
+            m_store = new RandomAccessFile(store, "rwd");
+            m_id = id;
+        }
+
+        /**
+         * Get the id of the current record.
+         * 
+         * @return the idea of the current record.
+         */
+        public long getCurrent() throws IOException {
+            long pos = m_store.getFilePointer();
+            if (m_store.length() == 0) {
+                return 0;
+            }
+            long result = 0;
+            try {
+                m_store.seek(m_current);
+                result = readCurrentID();
+                m_store.seek(pos);
+            }
+            catch (IOException ex) {
+                handle(pos, ex);
+            }
+            return result;
+        }
+
+        /**
+         * Get the log id of this store.
+         * 
+         * @return the log id of this store.
+         */
+        public long getId() {
+            return m_id;
+        }
+
+        /**
+         * Reset the store to the beginning of the records
+         * 
+         * @throws java.io.IOException
+         *             in case of an IO error.
+         */
+        public void reset() throws IOException {
+            m_store.seek(0);
+            m_current = 0;
+        }
+
+        /**
+         * Determine whether there are any records left based on the current postion.
+         * 
+         * @return <code>true</code> if there are still records to be read.
+         * @throws java.io.IOException
+         *             in case of an IO error.
+         */
+        public boolean hasNext() throws IOException {
+            return m_store.getFilePointer() < m_store.length();
+        }
+
+        public byte[] read() throws IOException {
+            long pos = m_store.getFilePointer();
+            try {
+                if (pos < m_store.length()) {
+                    long current = m_store.getFilePointer();
+                    long id = m_store.readLong();
+                    int next = m_store.readInt();
+                    byte[] entry = new byte[next];
+                    m_store.readFully(entry);
+                    m_current = current;
+                    return entry;
+                }
+            }
+            catch (IOException ex) {
+                handle(pos, ex);
+            }
+            return null;
+        }
+
+        public long readCurrentID() throws IOException {
+            long pos = m_store.getFilePointer();
+            try {
+                if (pos < m_store.length()) {
+                    long id = m_store.readLong();
+                    m_store.seek(pos);
+                    return id;
+                }
+            }
+            catch (IOException ex) {
+                handle(pos, ex);
+            }
+            return -1;
+        }
+
+        /**
+         * Make sure the store is readable. As a result, the store is at the end of the records.
+         * 
+         * @throws java.io.IOException
+         *             in case of any IO error.
+         */
+        public void init() throws IOException {
+            reset();
+            try {
+                while (true) {
+                    skip();
+                }
+            }
+            catch (EOFException ex) {
+                // done
+            }
+        }
+
+        /**
+         * Skip the next record if there is any.
+         * 
+         * @throws java.io.IOException
+         *             in case of any IO error or if there is no record left.
+         */
+        public void skip() throws IOException {
+            long pos = m_store.getFilePointer();
+            try {
+                long id = m_store.readLong();
+                int next = m_store.readInt();
+                if (m_store.length() < next + m_store.getFilePointer()) {
+                    throw new IOException("Unexpected end of file");
+                }
+                m_store.seek(m_store.getFilePointer() + next);
+                m_current = pos;
+                pos = m_store.getFilePointer();
+            }
+            catch (IOException ex) {
+                handle(pos, ex);
+            }
+        }
+
+        /**
+         * Store the given record data as the next record.
+         * 
+         * @param entry
+         *            the data of the record to store.
+         * @throws java.io.IOException
+         *             in case of any IO error.
+         */
+        public void append(long id, byte[] entry) throws IOException {
+            long pos = m_store.getFilePointer();
+            try {
+                m_store.seek(m_store.length());
+                long current = m_store.getFilePointer();
+                m_store.writeLong(id);
+                m_store.writeInt(entry.length);
+                m_store.write(entry);
+                m_store.seek(pos);
+            }
+            catch (IOException ex) {
+                handle(pos, ex);
+            }
+        }
+
+        /**
+         * Try to truncate the store at the current record.
+         * 
+         * @throws java.io.IOException
+         *             in case of any IO error.
+         */
+        public void truncate() throws IOException {
+            m_store.setLength(m_store.getFilePointer());
+        }
+
+        /**
+         * Release any resources.
+         * 
+         * @throws java.io.IOException
+         *             in case of any IO error.
+         */
+        public void close() throws IOException {
+            m_store.close();
+        }
+
+        private void handle(long pos, IOException exception) throws IOException {
+            try {
+                m_store.seek(pos);
+            }
+            catch (IOException ex) {
+                // m_log.log(LogService.LOG_WARNING, "Exception during seek!", ex);
+            }
+            throw exception;
+        }
+    }
+}



Mime
View raw message