ace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject svn commit: r1524723 - in /ace/trunk/org.apache.ace.agent: src/org/apache/ace/agent/impl/ test/org/apache/ace/agent/impl/
Date Thu, 19 Sep 2013 13:02:22 GMT
Author: marrs
Date: Thu Sep 19 13:02:22 2013
New Revision: 1524723

URL: http://svn.apache.org/r1524723
Log:
ACE-381 Applied the supplied patch, with a few minor formatting tweaks and one modified comment. Thanks Ronald.

Added:
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStore.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStoreManager.java
    ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackChannelImplTest.java
    ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreManagerTest.java
Modified:
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
    ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackHandlerImpl.java

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java?rev=1524723&r1=1524722&r2=1524723&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java Thu Sep 19 13:02:22 2013
@@ -18,31 +18,23 @@
  */
 package org.apache.ace.agent.impl;
 
-import static org.apache.ace.agent.impl.ConnectionUtil.*;
+import static org.apache.ace.agent.impl.ConnectionUtil.close;
+import static org.apache.ace.agent.impl.ConnectionUtil.closeSilently;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
-import java.io.RandomAccessFile;
 import java.io.Writer;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URLConnection;
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.SortedSet;
-import java.util.TreeSet;
 
 import org.apache.ace.agent.AgentContext;
 import org.apache.ace.agent.ConnectionHandler;
@@ -58,268 +50,28 @@ import org.apache.ace.range.SortedRangeS
 
 /**
  * FeedbackChannel implementation
- * 
  */
-// TODO: rotate/truncate<br/>
-// TODO: test(coverage)<br/>
 // TODO: decouple from range/log API?
 public class FeedbackChannelImpl implements FeedbackChannel {
-    /**
-     * The general idea is to provide easy access to a file of records. It supports iterating over records both by
-     * skipping and by reading. Furthermore, files can be truncated. Most methods will make an effort to reset to the
-     * last good record in case of an error -- hence, a call to truncate after an IOException might make the store
-     * readable again.
-     */
-    static class Store {
-        private final RandomAccessFile m_store;
-        private final long m_id;
-        private long m_current;
-
-        /**
-         * Create a new File based Store.
-         * 
-         * @param store
-         *            the file to use as backend.
-         * @param id
-         *            the log id of the store
-         * @throws java.io.IOException
-         *             in case the file is not rw.
-         */
-        Store(File store, long id) throws IOException {
-            m_store = new RandomAccessFile(store, "rwd");
-            m_id = id;
-        }
-
-        /**
-         * Store the given record data as the next record.
-         * 
-         * @param entry
-         *            the data of the record to store.
-         * @throws java.io.IOException
-         *             in case of any IO error.
-         */
-        public void append(long id, byte[] entry) throws IOException {
-            long pos = m_store.getFilePointer();
-            try {
-                m_store.seek(m_store.length());
-                m_store.writeLong(id);
-                m_store.writeInt(entry.length);
-                m_store.write(entry);
-                m_store.seek(pos);
-            }
-            catch (IOException ex) {
-                handle(pos, ex);
-            }
-        }
-
-        /**
-         * Release any resources.
-         * 
-         * @throws java.io.IOException
-         *             in case of any IO error.
-         */
-        public void close() throws IOException {
-            m_store.close();
-        }
 
-        /**
-         * Get the id of the current record.
-         * 
-         * @return the idea of the current record.
-         */
-        public long getCurrent() throws IOException {
-            long pos = m_store.getFilePointer();
-            if (m_store.length() == 0) {
-                return 0;
-            }
-            long result = 0;
-            try {
-                m_store.seek(m_current);
-                result = readCurrentID();
-                m_store.seek(pos);
-            }
-            catch (IOException ex) {
-                handle(pos, ex);
-            }
-            return result;
-        }
-
-        /**
-         * Get the log id of this store.
-         * 
-         * @return the log id of this store.
-         */
-        public long getId() {
-            return m_id;
-        }
-
-        /**
-         * Determine whether there are any records left based on the current postion.
-         * 
-         * @return <code>true</code> if there are still records to be read.
-         * @throws java.io.IOException
-         *             in case of an IO error.
-         */
-        public boolean hasNext() throws IOException {
-            return m_store.getFilePointer() < m_store.length();
-        }
-
-        /**
-         * Make sure the store is readable. As a result, the store is at the end of the records.
-         * 
-         * @throws java.io.IOException
-         *             in case of any IO error.
-         */
-        public void init() throws IOException {
-            reset();
-            try {
-                while (true) {
-                    skip();
-                }
-            }
-            catch (EOFException ex) {
-                // done
-            }
-        }
-
-        @SuppressWarnings("unused")
-        public byte[] read() throws IOException {
-            long pos = m_store.getFilePointer();
-            try {
-                if (pos < m_store.length()) {
-                    long current = m_store.getFilePointer();
-                    long id = m_store.readLong();
-                    int next = m_store.readInt();
-                    byte[] entry = new byte[next];
-                    m_store.readFully(entry);
-                    m_current = current;
-                    return entry;
-                }
-            }
-            catch (IOException ex) {
-                handle(pos, ex);
-            }
-            return null;
-        }
-
-        public long readCurrentID() throws IOException {
-            long pos = m_store.getFilePointer();
-            try {
-                if (pos < m_store.length()) {
-                    long id = m_store.readLong();
-                    m_store.seek(pos);
-                    return id;
-                }
-            }
-            catch (IOException ex) {
-                handle(pos, ex);
-            }
-            return -1;
-        }
-
-        /**
-         * Reset the store to the beginning of the records
-         * 
-         * @throws java.io.IOException
-         *             in case of an IO error.
-         */
-        public void reset() throws IOException {
-            m_store.seek(0);
-            m_current = 0;
-        }
-
-        /**
-         * Skip the next record if there is any.
-         * 
-         * @throws java.io.IOException
-         *             in case of any IO error or if there is no record left.
-         */
-        @SuppressWarnings("unused")
-        public void skip() throws IOException {
-            long pos = m_store.getFilePointer();
-            try {
-                long id = m_store.readLong();
-                int next = m_store.readInt();
-                if (m_store.length() < next + m_store.getFilePointer()) {
-                    throw new IOException("Unexpected end of file");
-                }
-                m_store.seek(m_store.getFilePointer() + next);
-                m_current = pos;
-                pos = m_store.getFilePointer();
-            }
-            catch (IOException ex) {
-                handle(pos, ex);
-            }
-        }
-
-        /**
-         * Try to truncate the store at the current record.
-         * 
-         * @throws java.io.IOException
-         *             in case of any IO error.
-         */
-        public void truncate() throws IOException {
-            m_store.setLength(m_store.getFilePointer());
-        }
-
-        private void handle(long pos, IOException exception) throws IOException {
-            try {
-                m_store.seek(pos);
-            }
-            catch (IOException ex) {
-                // m_log.log(LogService.LOG_WARNING, "Exception during seek!", ex);
-            }
-            throw exception;
-        }
-    }
-
-    private static final String DIRECTORY_NAME = "feedback";
     private static final String COMMAND_QUERY = "query";
     private static final String COMMAND_SEND = "send";
     private static final String PARAMETER_TARGETID = "tid";
 
     private static final String PARAMETER_LOGID = "logid";
 
-    // bridging to log api
-    private static Dictionary<String, String> mapToDictionary(Map<String, String> map) {
-        Dictionary<String, String> dictionary = new Hashtable<String, String>();
-        for (Entry<String, String> entry : map.entrySet()) {
-            dictionary.put(entry.getKey(), entry.getValue());
-        }
-        return dictionary;
-    }
-
     private final AgentContext m_agentContext;
     private final String m_name;
-    private final File m_baseDir;
-    private final FileFilter m_fileFilter = new FileFilter() {
-        @Override
-        public boolean accept(File file) {
-            return file.getName().startsWith(m_name);
-        }
-    };
-
-    private Store m_store = null;
-
-    private long m_highest;
+    private final FeedbackStoreManager m_storeManager;
 
     public FeedbackChannelImpl(AgentContext agentContext, String name) throws IOException {
         m_agentContext = agentContext;
         m_name = name;
-        m_baseDir = new File(m_agentContext.getWorkDir(), DIRECTORY_NAME);
-        if (!m_baseDir.isDirectory() && !m_baseDir.mkdirs()) {
-            throw new IllegalArgumentException("Need valid dir");
-        }
-        initStore();
+        m_storeManager = new FeedbackStoreManager(agentContext, name);
     }
 
-    public void closeStore() throws IOException {
-        Store store;
-        synchronized (m_store) {
-            store = m_store;
-        }
-        store.close();
-        m_store = null;
+    public synchronized void stop() throws IOException {
+        m_storeManager.close();
     }
 
     @Override
@@ -346,7 +98,7 @@ public class FeedbackChannelImpl impleme
             }
 
             writer = new BufferedWriter(new OutputStreamWriter(sendConnection.getOutputStream()));
-            SortedSet<Long> storeIDs = getStoreIDs();
+            SortedSet<Long> storeIDs = m_storeManager.getAllFeedbackStoreIDs();
             for (Long storeID : storeIDs) {
                 URL queryURL = new URL(serverURL, m_name + "/" + COMMAND_QUERY + "?" + PARAMETER_TARGETID + "=" + identification + "&" + PARAMETER_LOGID + "=" + storeID);
                 URLConnection queryConnection = connectionHandler.getConnection(queryURL);
@@ -370,82 +122,13 @@ public class FeedbackChannelImpl impleme
 
     @Override
     public void write(int type, Map<String, String> properties) throws IOException {
-        synchronized (m_store) {
-            try {
-                LogEvent result = new LogEvent(null, m_store.getId(), getNextEventID(), System.currentTimeMillis(), type, mapToDictionary(properties));
-                m_store.append(result.getID(), result.toRepresentation().getBytes());
-            }
-            catch (IOException ex) {
-                handleException(m_store, ex);
-            }
-        }
-    }
-
-    private void closeIfNeeded(Store store) {
-        if (store != m_store) {
-            try {
-                store.close();
-            }
-            catch (IOException ex) {
-                // Not much we can do;
-            }
-        }
-    }
-
-    private Store createStore(long storeId) throws IOException {
-        return new Store(new File(m_baseDir, getStoreName(storeId)), storeId);
+        m_storeManager.write(type, properties);
     }
 
     private ConnectionHandler getConnectionHandler() {
         return m_agentContext.getHandler(ConnectionHandler.class);
     }
 
-    private List<LogEvent> getEvents(long storeID, long fromEventID, long toEventID) throws IOException {
-        Store store = getStore(storeID);
-        List<LogEvent> result = new ArrayList<LogEvent>();
-        try {
-            if (store.getCurrent() > fromEventID) {
-                store.reset();
-            }
-            while (store.hasNext()) {
-                long eventID = store.readCurrentID();
-                if ((eventID >= fromEventID) && (eventID <= toEventID)) {
-                    result.add(new LogEvent(new String(store.read())));
-                }
-                else {
-                    store.skip();
-                }
-            }
-        }
-        catch (Exception ex) {
-            handleException(store, ex);
-        }
-        finally {
-            closeIfNeeded(store);
-        }
-        return result;
-    }
-
-    private long getHighestEventID(long storeID) throws IOException {
-        Store store = getStore(storeID);
-        try {
-            if (m_highest == 0) {
-                store.init();
-                return (m_highest = store.getCurrent());
-            }
-            else {
-                return m_highest;
-            }
-        }
-        catch (IOException ex) {
-            handleException(store, ex);
-        }
-        finally {
-            closeIfNeeded(store);
-        }
-        return -1;
-    }
-
     private String getIdentification() {
         return m_agentContext.getHandler(IdentificationHandler.class).getAgentId();
     }
@@ -454,8 +137,12 @@ public class FeedbackChannelImpl impleme
         return m_agentContext.getHandler(LoggingHandler.class);
     }
 
-    private long getNextEventID() throws IOException {
-        return (m_highest = getHighestEventID(m_store.m_id) + 1);
+    private URL getServerURL() {
+        return m_agentContext.getHandler(DiscoveryHandler.class).getServerUrl();
+    }
+
+    private void logWarning(String msg, Object... args) {
+        getLoggingHandler().logWarning("feedbackChannel(" + m_name + ")", msg, null, args);
     }
 
     private LogDescriptor getQueryDescriptor(InputStream queryInput) throws IOException {
@@ -485,104 +172,8 @@ public class FeedbackChannelImpl impleme
         }
     }
 
-    private URL getServerURL() {
-        return m_agentContext.getHandler(DiscoveryHandler.class).getServerUrl();
-    }
-
-    private Store getStore(long storeID) throws IOException {
-        if (m_store.getId() == storeID) {
-            return m_store;
-        }
-        return createStore(storeID);
-    }
-
-    private File[] getStoreFiles() throws IOException {
-        File[] files = (File[]) m_baseDir.listFiles(m_fileFilter);
-        if (files == null) {
-            throw new IOException("Unable to list store files in " + m_baseDir.getAbsolutePath());
-        }
-        return files;
-    }
-
-    private long getStoreId(String storeName) {
-        return Long.parseLong(storeName.replace(m_name + "-", ""));
-    }
-
-    private SortedSet<Long> getStoreIDs() throws IOException {
-        File[] files = getStoreFiles();
-        SortedSet<Long> storeIDs = new TreeSet<Long>();
-        for (int i = 0; i < files.length; i++) {
-            storeIDs.add(getStoreId(files[i].getName()));
-        }
-        return storeIDs;
-    }
-
-    private String getStoreName(long storeId) {
-        return m_name + "-" + storeId;
-    }
-
-    private void handleException(Store store, Exception exception) throws IOException {
-        logError("Exception caught while accessing feedback channel store #%d", exception, store.getId());
-
-        if (store == m_store) {
-            m_store = newFeedbackStore();
-        }
-        try {
-            store.truncate();
-        }
-        catch (IOException ex) {
-            logError("Exception caught while truncating feedback channel store #%d", ex, store.getId());
-        }
-
-        try {
-            store.close();
-        }
-        catch (IOException ex) {
-            // Not much we can do
-        }
-
-        if (exception instanceof IOException) {
-            throw (IOException) exception;
-        }
-
-        throw new IOException("Unable to read log entry: " + exception.getMessage());
-    }
-
-    private void initStore() throws IOException {
-        SortedSet<Long> storeIDs = getStoreIDs();
-        if (storeIDs.isEmpty()) {
-            m_store = newFeedbackStore();
-        }
-        else {
-            m_store = createStore(storeIDs.last());
-            try {
-                m_store.init();
-            }
-            catch (IOException ex) {
-                handleException(m_store, ex);
-            }
-        }
-    }
-
-    private void logError(String msg, Exception cause, Object... args) {
-        getLoggingHandler().logError("feedbackChannel(" + m_name + ")", msg, cause, args);
-    }
-
-    private void logWarning(String msg, Object... args) {
-        getLoggingHandler().logWarning("feedbackChannel(" + m_name + ")", msg, null, args);
-    }
-
-    private Store newFeedbackStore() throws IOException {
-        long storeId = System.currentTimeMillis();
-        // XXX this can fail in case of high concurrent situations!
-        while (!(new File(m_baseDir, getStoreName(storeId))).createNewFile()) {
-            storeId++;
-        }
-        return new Store(new File(m_baseDir, getStoreName(storeId)), storeId);
-    }
-
     private void synchronizeStore(long storeID, InputStream queryInput, Writer sendWriter) throws IOException {
-        long highestLocal = getHighestEventID(storeID);
+        long highestLocal = m_storeManager.getHighestEventID(storeID);
         if (highestLocal == 0) {
             return;
         }
@@ -597,7 +188,7 @@ public class FeedbackChannelImpl impleme
         long lowest = rangeIterator.next();
         long highest = delta.getHigh();
         if (lowest <= highest) {
-            List<LogEvent> events = getEvents(storeID, lowest, highestLocal > highest ? highest : highestLocal);
+            List<LogEvent> events = m_storeManager.getEvents(storeID, lowest, highestLocal > highest ? highest : highestLocal);
             Iterator<LogEvent> iter = events.iterator();
             while (iter.hasNext()) {
                 LogEvent current = (LogEvent) iter.next();

Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackHandlerImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackHandlerImpl.java?rev=1524723&r1=1524722&r2=1524723&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackHandlerImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackHandlerImpl.java Thu Sep 19 13:02:22 2013
@@ -134,7 +134,7 @@ public class FeedbackHandlerImpl extends
     private void unregisterFeedbackChannel(String oldChannelName) throws IOException {
         FeedbackChannelImpl channel = m_channels.remove(oldChannelName);
         if (channel != null) {
-            channel.closeStore();
+            channel.stop();
         }
     }
 }

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStore.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStore.java?rev=1524723&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStore.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStore.java Thu Sep 19 13:02:22 2013
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.impl;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/**
+ * The general idea is to provide easy access to a file of records. It supports iterating over records both by skipping
+ * and by reading. Furthermore, files can be truncated. Most methods will make an effort to reset to the last good
+ * record in case of an error -- hence, a call to truncate after an IOException might make the store readable again.
+ */
+public class FeedbackStore {
+    private final RandomAccessFile m_store;
+    private final long m_id;
+    private long m_current;
+
+    /**
+     * Create a new File based Store.
+     * 
+     * @param store
+     *            the file to use as backend.
+     * @param id
+     *            the log id of the store
+     * @throws java.io.IOException
+     *             in case the file is not rw.
+     */
+    FeedbackStore(File store, long id) throws IOException {
+        m_store = new RandomAccessFile(store, "rwd");
+        m_id = id;
+    }
+
+    /**
+     * Get the id of the current record.
+     * 
+     * @return the idea of the current record.
+     */
+    public long getCurrent() throws IOException {
+        long pos = m_store.getFilePointer();
+        if (m_store.length() == 0) {
+            return 0;
+        }
+        long result = 0;
+        try {
+            m_store.seek(m_current);
+            result = readCurrentID();
+            m_store.seek(pos);
+        }
+        catch (IOException ex) {
+            handle(pos, ex);
+        }
+        return result;
+    }
+
+    /**
+     * Get the log id of this store.
+     * 
+     * @return the log id of this store.
+     */
+    public long getId() {
+        return m_id;
+    }
+
+    /**
+     * Reset the store to the beginning of the records
+     * 
+     * @throws java.io.IOException
+     *             in case of an IO error.
+     */
+    public void reset() throws IOException {
+        m_store.seek(0);
+        m_current = 0;
+    }
+
+    /**
+     * Determine whether there are any records left based on the current postion.
+     * 
+     * @return <code>true</code> if there are still records to be read.
+     * @throws IOException
+     *             in case of an IO error.
+     */
+    public boolean hasNext() throws IOException {
+        return m_store.getFilePointer() < m_store.length();
+    }
+
+    /**
+     * Read a single logevent from this file
+     * 
+     * @return the bytes for a single logevent
+     * @throws IOException
+     *             in case of an IO error.
+     */
+    public byte[] read() throws IOException {
+        long pos = m_store.getFilePointer();
+        try {
+            if (pos < m_store.length()) {
+                long current = m_store.getFilePointer();
+                long id = m_store.readLong();
+                int next = m_store.readInt();
+                byte[] entry = new byte[next];
+                m_store.readFully(entry);
+                m_current = current;
+                return entry;
+            }
+        }
+        catch (IOException ex) {
+            handle(pos, ex);
+        }
+        return null;
+    }
+
+    /**
+     * Return the id for the logevent at the current position in the file
+     * 
+     * @return the event id
+     * @throws IOException
+     *             in case of an IO error.
+     */
+    public long readCurrentID() throws IOException {
+        long pos = m_store.getFilePointer();
+        try {
+            if (pos < m_store.length()) {
+                long id = m_store.readLong();
+                m_store.seek(pos);
+                return id;
+            }
+        }
+        catch (IOException ex) {
+            handle(pos, ex);
+        }
+        return -1;
+    }
+
+    /**
+     * Make sure the store is readable. As a result, the store is at the end of the records.
+     * 
+     * @throws IOException
+     *             in case of any IO error.
+     */
+    public void init() throws IOException {
+        reset();
+        try {
+            while (true) {
+                skip();
+            }
+        }
+        catch (EOFException ex) {
+            // done
+        }
+    }
+
+    /**
+     * Skip the next record if there is any.
+     * 
+     * @throws IOException
+     *             in case of any IO error or if there is no record left.
+     */
+    public void skip() throws IOException {
+        long pos = m_store.getFilePointer();
+        try {
+            long id = m_store.readLong();
+            int next = m_store.readInt();
+            if (m_store.length() < next + m_store.getFilePointer()) {
+                throw new IOException("Unexpected end of file");
+            }
+            m_store.seek(m_store.getFilePointer() + next);
+            m_current = pos;
+            pos = m_store.getFilePointer();
+        }
+        catch (IOException ex) {
+            handle(pos, ex);
+        }
+    }
+
+    /**
+     * Store the given record data as the next record.
+     * 
+     * @param entry
+     *            the data of the record to store.
+     * @throws IOException
+     *             in case of any IO error.
+     */
+    public void append(long id, byte[] entry) throws IOException {
+        long pos = m_store.getFilePointer();
+        try {
+            m_store.seek(m_store.length());
+            long current = m_store.getFilePointer();
+            m_store.writeLong(id);
+            m_store.writeInt(entry.length);
+            m_store.write(entry);
+            m_store.seek(pos);
+        }
+        catch (IOException ex) {
+            handle(pos, ex);
+        }
+    }
+
+    /**
+     * Try to truncate the store at the current record.
+     * 
+     * @throws IOException
+     *             in case of any IO error.
+     */
+    public void truncate() throws IOException {
+        m_store.setLength(m_store.getFilePointer());
+    }
+
+    /**
+     * Release any resources.
+     * 
+     * @throws IOException
+     *             in case of any IO error.
+     */
+    public void close() throws IOException {
+        m_store.close();
+    }
+
+    /**
+     * Return the filesize for this file
+     * 
+     * @return the size in bytes
+     * @throws IOException
+     *             in case of any IO error.
+     */
+    public long getFileSize() throws IOException {
+        return m_store.length();
+    }
+
+    private void handle(long pos, IOException exception) throws IOException {
+        try {
+            m_store.seek(pos);
+        }
+        catch (IOException ex) {
+        	// we don't log this, seeking back to pos is a 'best effort'
+        	// attempt to keep the file consistent and it would be very
+        	// strange for it to fail (in which case we have no code to
+        	// deal with that anyway)
+        }
+        throw exception;    
+    }
+}

Added: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStoreManager.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStoreManager.java?rev=1524723&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStoreManager.java (added)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStoreManager.java Thu Sep 19 13:02:22 2013
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.impl;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.ace.agent.AgentContext;
+import org.apache.ace.agent.LoggingHandler;
+import org.apache.ace.log.LogEvent;
+
+/**
+ * This class acts as a factory for retrieving/creating stores and it also is an adapter for the feedbackstore that is
+ * currently active.
+ * 
+ * The filenames backing the feedbackstore are : storename-timestamp.sequencenumber, e.g. feedback-1378716629402.1 When
+ * the maximum allowed filesize for a logfile is reached a new file is created : feedback-1378716629402.2
+ * 
+ * This managerclass takes care of the splitting of logs over multiple files, cleanup of old files, etc.
+ */
+public class FeedbackStoreManager {
+
+    // This is the number of files the maxFileSize of the is split over
+    private static final int NUMBER_OF_FILES = 10;
+    private static final String DIRECTORY_NAME = "feedback";
+
+    private final AgentContext m_agentContext;
+    private final String m_name;
+    private final File m_baseDir;
+    private final int m_maxFileSize;
+
+    private FeedbackStore m_currentStore;
+    private long m_highest;
+
+    private final FileFilter m_fileFilter = new FileFilter() {
+        @Override
+        public boolean accept(File file) {
+            return file.getName().startsWith(m_name);
+        }
+    };
+
+    /**
+     * Create and initialize a store based on a default maxFileSize of 1024 kB (=1 MB)
+     * 
+     * @param agentContext
+     *            the agentcontext
+     * @param name
+     *            the name of the feedbackstore
+     */
+    public FeedbackStoreManager(AgentContext agentContext, String name) throws IOException {
+        this(agentContext, name, 1024 * 1024);
+    }
+
+    /**
+     * Create and initialize a store
+     * 
+     * @param agentContext
+     *            the agentcontext
+     * @param name
+     *            the name of the feedbackstore
+     * @param maxFileSize
+     *            the maximum size for this feedbackstore in kB
+     */
+    public FeedbackStoreManager(AgentContext agentContext, String name, int maxFileSize) throws IOException {
+        m_agentContext = agentContext;
+        m_name = name;
+        m_maxFileSize = maxFileSize;
+        m_baseDir = new File(m_agentContext.getWorkDir(), DIRECTORY_NAME);
+        if (!m_baseDir.isDirectory() && !m_baseDir.mkdirs()) {
+            throw new IllegalArgumentException("Need valid dir");
+        }
+
+        // Identify and initialize the latest store
+        SortedSet<Long> storeIDs = getAllFeedbackStoreIDs();
+        if (storeIDs.isEmpty()) {
+            m_currentStore = newFeedbackStore();
+        }
+        else {
+            m_currentStore = getLastStore(storeIDs.last());
+            try {
+                m_currentStore.init();
+            }
+            catch (IOException ex) {
+                handleException(m_currentStore, ex);
+            }
+        }
+    }
+
+    /**
+     * Close the current active store to make sure it's nice and consistent on disk
+     * 
+     * @throws IOException
+     *             if something goed wrong
+     */
+    public void close() throws IOException {
+        if (m_currentStore != null) {
+            m_currentStore.close();
+        }
+        m_currentStore = null;
+    }
+
+    /**
+     * Return a sorted set of all the feedback stores
+     * 
+     * @return a ordered set of all storeIds, oldest first
+     */
+    public SortedSet<Long> getAllFeedbackStoreIDs() throws IOException {
+        File[] files = getStoreFiles();
+        SortedSet<Long> storeIDs = new TreeSet<Long>();
+        for (int i = 0; i < files.length; i++) {
+            storeIDs.add(getStoreId(files[i]));
+        }
+        return storeIDs;
+    }
+
+    /**
+     * Write to the currently active store
+     * 
+     * @param type
+     *            the type of message
+     * @param properties
+     *            the properties to be logged
+     */
+    public void write(int type, Map<String, String> properties) throws IOException {
+        try {
+            // check if we exceed the maximum allowed store size, if we do cleanup an old file
+            // check if the current store file maximum filesize is reached, if it is the current store should be rotated
+            if (isCurrentStoreMaximumFileSizeReached()) {
+                if (isCleanupRequired()) {
+                    cleanup();
+                }
+                m_currentStore.close();
+                m_currentStore = createStore(m_currentStore.getId(), getLastLogfileNumber(m_currentStore.getId()) + 1);
+            }
+            // convert the map of properties to a dictionary
+            Dictionary<String, String> dictionary = new Hashtable<String, String>();
+            for (Entry<String, String> entry : properties.entrySet()) {
+                dictionary.put(entry.getKey(), entry.getValue());
+            }
+            // log the event
+            long nextEventId = (m_highest = getHighestEventID(m_currentStore.getId()) + 1);
+            LogEvent result = new LogEvent(null, m_currentStore.getId(), nextEventId, System.currentTimeMillis(), type, dictionary);
+            m_currentStore.append(result.getID(), result.toRepresentation().getBytes());
+        }
+        catch (IOException ex) {
+            handleException(m_currentStore, ex);
+        }
+    }
+
+    public void forceCreateNewStore() throws IOException {
+        m_currentStore = newFeedbackStore();
+    }
+
+    /**
+     * Give the highest eventId that is is present is the specified store
+     * 
+     * @param the
+     *            storeId
+     * @return the highest event present in the store
+     */
+    public long getHighestEventID(long storeID) throws IOException {
+        FeedbackStore store = getLastStore(storeID);
+        try {
+            if (m_highest == 0) {
+                store.init();
+                return (m_highest = store.getCurrent());
+            }
+            else {
+                return m_highest;
+            }
+        }
+        catch (IOException ex) {
+            handleException(store, ex);
+        }
+        finally {
+            closeIfNeeded(new FeedbackStore[] { store });
+        }
+        return -1;
+    }
+
+    /**
+     * Return all events in the store in the given range. This list might contains gaps for entries that are not
+     * present. From/to are inclusive.
+     * 
+     * @param storeId
+     *            the storeId
+     * @param eventId
+     *            the start of the range of events
+     * @param toEventId
+     *            the end of the range of events
+     */
+    public List<LogEvent> getEvents(long storeID, long fromEventID, long toEventID) throws IOException {
+        FeedbackStore[] stores = getAllStores(storeID);
+        List<LogEvent> result = new ArrayList<LogEvent>();
+        try {
+            for (FeedbackStore store : stores) {
+                try {
+
+                    if (store.getCurrent() > fromEventID) {
+                        store.reset();
+                    }
+                    while (store.hasNext()) {
+                        long eventID = store.readCurrentID();
+                        if ((eventID >= fromEventID) && (eventID <= toEventID)) {
+                            result.add(new LogEvent(new String(store.read())));
+                        }
+                        else {
+                            store.skip();
+                        }
+                    }
+                }
+                catch (Exception ex) {
+                    handleException(store, ex);
+                }
+            }
+        }
+        finally {
+            closeIfNeeded(stores);
+        }
+        return result;
+    }
+
+    /**
+     * Handle exceptions. This method will truncate the store from the point the error occurred. If the error occurred
+     * in the currently active store it will close the current store and create a new one. The original error is
+     * rethrowed.
+     * 
+     * @param store
+     *            the store where the exception happened
+     * @param exception
+     *            the original exception
+     */
+    private void handleException(FeedbackStore store, Exception exception) throws IOException {
+        logError("Exception caught while accessing feedback channel store #%d", exception, store.getId());
+        if (store == m_currentStore) {
+            m_currentStore = newFeedbackStore();
+        }
+
+        try {
+            store.truncate();
+        }
+        catch (IOException ex) {
+            logError("Exception caught while truncating feedback channel store #%d", ex, store.getId());
+        }
+        try {
+            store.close();
+        }
+        catch (IOException ex) {
+            // Not much we can do
+        }
+        if (exception instanceof IOException) {
+            throw (IOException) exception;
+        }
+        throw new IOException("Unable to read log entry: " + exception.getMessage());
+    }
+
+    /**
+     * Check if the maximum allowed size for the current store file is reached
+     * 
+     * @return is the maximum reached
+     */
+    private boolean isCurrentStoreMaximumFileSizeReached() throws IOException {
+        return (m_currentStore.getFileSize()) >= (m_maxFileSize / NUMBER_OF_FILES);
+    }
+
+    /**
+     * Check if the maximum fileSize for all the logfiles together is reached
+     * 
+     * @return is the cleanup required
+     */
+    private boolean isCleanupRequired() throws IOException {
+        return getFileSize(getStoreFiles()) >= (m_maxFileSize);
+    }
+
+    /**
+     * Removes old logfiles starting from the oldest file. It stops when there is 10% free space.
+     */
+    private void cleanup() throws IOException {
+        File[] files = getStoreFiles();
+        while (getFileSize(files) > ((m_maxFileSize) / (NUMBER_OF_FILES - 1))) {
+            File oldestFile = files[0];
+            if (oldestFile != null) {
+                oldestFile.delete();
+            }
+            files = getStoreFiles();
+        }
+    }
+
+    /**
+     * Return the filesize of the given files in Kb
+     * 
+     * @param files
+     *            a list of files
+     */
+    private long getFileSize(File[] files) {
+        long size = 0;
+        for (File file : files) {
+            size += file.length();
+        }
+        return size;
+    }
+
+    /**
+     * Return the feedbackstore for the specified storeId. If there are multiple files for this storeId the last one is
+     * returned
+     * 
+     * @param the
+     *            storeId
+     * @return the feedbackstore for that storeID
+     */
+    private FeedbackStore getLastStore(long storeID) throws IOException {
+        if (m_currentStore != null && m_currentStore.getId() == storeID) {
+            return m_currentStore;
+        }
+
+        return createStore(storeID);
+    }
+
+    /**
+     * Return all store files for this store name, sorted by lastModifiedDate
+     * 
+     * @return a sorted list of files, oldest file first
+     */
+    private File[] getStoreFiles() throws IOException {
+        File[] files = (File[]) m_baseDir.listFiles(m_fileFilter);
+        if (files == null) {
+            throw new IOException("Unable to list store files in " + m_baseDir.getAbsolutePath());
+        }
+        // sort files on storeId and fileNumber
+        Arrays.sort(files, new Comparator<File>() {
+            public int compare(File f1, File f2) {
+                int result = (int) (getStoreId(f1) - getStoreId(f2));
+                if (result == 0) {
+                    int f1Number = getLogfileNumber(f1.getName(), getStoreName(getStoreId(f1)));
+                    int f2Number = getLogfileNumber(f2.getName(), getStoreName(getStoreId(f2)));
+                    result = f1Number - f2Number;
+                }
+                return result;
+            }
+
+        });
+        return files;
+    }
+
+    /**
+     * Create a new empty feedbackstore with a new storeId.
+     * 
+     * @return A new feedbackstore with a new storeID
+     */
+    private FeedbackStore newFeedbackStore() throws IOException {
+        long storeId = System.currentTimeMillis();
+        while (!(new File(m_baseDir, getStoreName(storeId) + ".1")).createNewFile()) {
+            storeId++;
+        }
+        return new FeedbackStore(new File(m_baseDir, getStoreName(storeId) + ".1"), storeId);
+    }
+
+    /**
+     * Return all feedbackstores for a single storeId.
+     * 
+     * @param storeId
+     *            the storeId
+     * @return a list of all feedbackstores for this storeId
+     */
+    private FeedbackStore[] getAllStores(long storeId) throws IOException {
+        List<FeedbackStore> stores = new ArrayList<FeedbackStore>();
+        File[] files = getStoreFiles();
+        for (File file : files) {
+            if (storeId == getStoreId(file)) {
+                stores.add(new FeedbackStore(file, storeId));
+            }
+        }
+
+        return stores.toArray(new FeedbackStore[0]);
+    }
+
+    /**
+     * Create the feedbackstore for the specified storeId. If this storeId is already backed by files on disk then the
+     * last file will be used to create this feedbackstore.
+     * 
+     * @param storeId
+     *            the storeId
+     * @return the newest feedbackstore for this storeID
+     */
+    private FeedbackStore createStore(long storeId) throws IOException {
+        return createStore(storeId, getLastLogfileNumber(storeId));
+    }
+
+    /**
+     * Create a new feedbackstore with the specified storeId and fileNumber.
+     * 
+     * @param storeId
+     *            the storeId
+     * @param fileNumber
+     *            the new sequence number for this storeID
+     * @return a feedbackstore
+     */
+    private FeedbackStore createStore(long storeId, int fileNumber) throws IOException {
+        if (isCleanupRequired()) {
+            cleanup();
+        }
+        return new FeedbackStore(new File(m_baseDir, getStoreName(storeId) + "." + fileNumber), storeId);
+    }
+
+    /**
+     * Returns the last file for the specified storeId
+     * 
+     * @param storeId
+     *            the storeID
+     * @return the latest (newest) file backing the specified storeID
+     */
+    private int getLastLogfileNumber(long storeId) throws IOException {
+        File[] storeFiles = getStoreFiles();
+        String storeName = getStoreName(storeId);
+
+        int lastNumber = 1;
+        for (File file : storeFiles) {
+            String fileName = file.getName();
+            if (fileName.contains(storeName)) {
+                lastNumber = getLogfileNumber(fileName, storeName);
+            }
+        }
+        return lastNumber;
+    }
+
+    /**
+     * Get the name of the store for a storeId
+     * 
+     * @param storeId
+     *            the storeId
+     * @return the basename of the file
+     */
+    private String getStoreName(long storeId) {
+        return m_name + "-" + storeId;
+    }
+
+    private int getLogfileNumber(String logfileName, String storeName) {
+        String extension = logfileName.replace(storeName + ".", "");
+        return Integer.parseInt(extension);
+
+    }
+
+    /**
+     * Parse the storeId from the specified fileName
+     * 
+     * @param storeFile
+     *            a store file
+     * @return the storeId
+     */
+    private long getStoreId(File storeFile) {
+        // remove the extension from the filename
+        String storeName = storeFile.getName().replaceFirst("[.][^.]+$", "");
+        return Long.parseLong(storeName.replace(m_name + "-", ""));
+    }
+
+    /**
+     * Close all the feedbackstores if necessary
+     * 
+     * @param stores
+     *            a list of stores
+     */
+    private void closeIfNeeded(FeedbackStore[] stores) {
+        for (FeedbackStore store : stores) {
+            if (store != m_currentStore) {
+                try {
+                    store.close();
+                }
+                catch (IOException ex) {
+                    // Not much we can do
+                }
+            }
+        }
+    }
+
+    private void logError(String msg, Exception cause, Object... args) {
+        m_agentContext.getHandler(LoggingHandler.class).logError("feedbackChannel(" + m_name + ")", msg, cause, args);
+    }
+
+}

Added: ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackChannelImplTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackChannelImplTest.java?rev=1524723&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackChannelImplTest.java (added)
+++ ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackChannelImplTest.java Thu Sep 19 13:02:22 2013
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.impl;
+
+import static org.easymock.EasyMock.expect;
+import static org.testng.Assert.assertEquals;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.ace.agent.ConnectionHandler;
+import org.apache.ace.agent.DiscoveryHandler;
+import org.apache.ace.agent.IdentificationHandler;
+import org.apache.ace.agent.testutil.BaseAgentTest;
+import org.apache.ace.agent.testutil.TestWebServer;
+import org.apache.ace.log.LogDescriptor;
+import org.apache.ace.log.LogEvent;
+import org.apache.ace.range.SortedRangeSet;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Testing {@link FeedbackChannelImpl}.
+ */
+public class FeedbackChannelImplTest extends BaseAgentTest {
+
+    private static final int PORT = 8884;
+
+    private AgentContextImpl m_agentContext;
+    private TestWebServer m_webServer;
+    private FeedbackChannelImpl m_feedbackChannelImpl;
+
+    static class TestSendFeedbackServlet extends HttpServlet {
+        private static final long serialVersionUID = 1L;
+
+        List<LogEvent> m_events = new ArrayList<LogEvent>();
+
+        @SuppressWarnings("deprecation")
+        @Override
+        protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+            resp.setContentType("text/plain");
+            BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
+            String eventString;
+            while ((eventString = reader.readLine()) != null) {
+                LogEvent event = new LogEvent(eventString);
+                m_events.add(event);
+            }
+            resp.setStatus(200, "voila");
+        }
+    }
+
+    static class TestQueryFeedbackServlet extends HttpServlet {
+        private static final long serialVersionUID = 1L;
+
+        @SuppressWarnings("deprecation")
+        @Override
+        protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+            String targetID = request.getParameter("tid");
+            long logID = Long.parseLong(request.getParameter("logid"));
+            response.getOutputStream().print(new LogDescriptor(targetID, logID, new SortedRangeSet(new long[0])).toRepresentation());
+            response.setStatus(200, "voila");
+        }
+    }
+
+    @BeforeMethod
+    public void setUpAgain(Method method) throws Exception {
+        // this setup is needed because a real Feedbackstore is initialized in the constructor.
+        m_agentContext = mockAgentContext();
+
+        URL serverURL = new URL("http://localhost:" + PORT + "/");
+
+        m_webServer = new TestWebServer(PORT, "/", "generated");
+        m_webServer.start();
+
+        DiscoveryHandler discoveryHandler = addTestMock(DiscoveryHandler.class);
+        expect(discoveryHandler.getServerUrl()).andReturn(serverURL).anyTimes();
+
+        IdentificationHandler identificationHandler = addTestMock(IdentificationHandler.class);
+        expect(identificationHandler.getAgentId()).andReturn("identification").anyTimes();
+
+        replayTestMocks();
+        m_agentContext.setHandler(DiscoveryHandler.class, discoveryHandler);
+        m_agentContext.setHandler(IdentificationHandler.class, identificationHandler);
+        m_agentContext.setHandler(ConnectionHandler.class, new ConnectionHandlerImpl());
+        m_feedbackChannelImpl = new FeedbackChannelImpl(m_agentContext, "test");
+        m_agentContext.start();
+    }
+
+    @AfterTest
+    public void tearDownAgain() throws Exception {
+        m_webServer.stop();
+        m_agentContext.stop();
+        verifyTestMocks();
+        clearTestMocks();
+    }
+
+    @Test
+    public void testSendFeedback() throws Exception {
+        TestSendFeedbackServlet sendServlet = new TestSendFeedbackServlet();
+        m_webServer.addServlet(sendServlet, "/test/send");
+        TestQueryFeedbackServlet queryServlet = new TestQueryFeedbackServlet();
+        m_webServer.addServlet(queryServlet, "/test/query");
+
+        m_feedbackChannelImpl.write(1, new HashMap<String, String>());
+        m_feedbackChannelImpl.sendFeedback();
+
+        assertEquals(sendServlet.m_events.size(), 1);
+    }
+}

Added: ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreManagerTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreManagerTest.java?rev=1524723&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreManagerTest.java (added)
+++ ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreManagerTest.java Thu Sep 19 13:02:22 2013
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.SortedSet;
+
+import org.apache.ace.agent.AgentContext;
+import org.apache.ace.agent.testutil.BaseAgentTest;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Testing {@link FeedbackStoreManager}.
+ */
+public class FeedbackStoreManagerTest extends BaseAgentTest {
+
+    private AgentContext m_agentContext;
+
+    @BeforeMethod
+    public void setUpAgain(Method method) throws Exception {
+        m_agentContext = mockAgentContext();
+        replayTestMocks();
+    }
+
+    @AfterMethod
+    public void tearDownAgain(Method method) throws Exception {
+        verifyTestMocks();
+        clearTestMocks();
+    }
+
+    @Test
+    public void testEmptyRepository() throws Exception {
+        FeedbackStoreManager feedbackStoreManager = new FeedbackStoreManager(m_agentContext, "test");
+        assertNotNull(getStoreID(feedbackStoreManager));
+    }
+
+    @Test
+    public void testExceptionHandling() throws Exception {
+        FeedbackStoreManager feedbackStoreManager = new FeedbackStoreManager(m_agentContext, "test");
+
+        feedbackStoreManager.forceCreateNewStore();
+
+        SortedSet<Long> allFeedbackStoreIDs = feedbackStoreManager.getAllFeedbackStoreIDs();
+        assertEquals(allFeedbackStoreIDs.size(), 2);
+    }
+
+    @Test
+    public void testReadWriteLogEvents() throws Exception {
+        FeedbackStoreManager feedbackStoreManager = new FeedbackStoreManager(m_agentContext, "test");
+        long storeID = getStoreID(feedbackStoreManager);
+        assertEquals(feedbackStoreManager.getHighestEventID(storeID), 0);
+
+        feedbackStoreManager.write(1, new HashMap<String, String>());
+        assertEquals(feedbackStoreManager.getHighestEventID(storeID), 1);
+
+        assertEquals(feedbackStoreManager.getEvents(storeID, 1, 1).size(), 1);
+    }
+
+    @Test
+    public void testReadFromOldStore() throws Exception {
+        FeedbackStoreManager feedbackStoreManager = new FeedbackStoreManager(m_agentContext, "test");
+        long storeID = getStoreID(feedbackStoreManager);
+
+        assertEquals(feedbackStoreManager.getHighestEventID(storeID), 0);
+        feedbackStoreManager.write(1, new HashMap<String, String>());
+        assertEquals(feedbackStoreManager.getHighestEventID(storeID), 1);
+
+        assertEquals(feedbackStoreManager.getEvents(storeID, 1, 1).size(), 1);
+
+        feedbackStoreManager.forceCreateNewStore();
+
+        assertEquals(feedbackStoreManager.getEvents(storeID, 1, 1).size(), 1);
+    }
+
+    @Test
+    public void testLogfileRotation() throws Exception {
+        int maxSize = 20;
+
+        FeedbackStoreManager feedbackStoreManager = new FeedbackStoreManager(m_agentContext, "test", maxSize);
+        long storeID = getStoreID(feedbackStoreManager);
+
+        assertEquals(feedbackStoreManager.getHighestEventID(storeID), 0);
+        // absolutely exceed the set filesize for this store
+        for (int i = 0; i < 1000; i++) {
+            feedbackStoreManager.write(1, new HashMap<String, String>());
+        }
+
+        File[] logFiles = getLogFiles();
+
+        assertTrue(logFiles.length <= 10);
+        // first available id
+        FeedbackStore first = new FeedbackStore(logFiles[0], getStoreId(logFiles[0]));
+        first.reset();
+        long firstId = first.readCurrentID() - 1;
+
+        assertEquals(feedbackStoreManager.getEvents(storeID, 1, 1000).size(), (1000 - firstId));
+
+        long logFileSize = 0;
+        for (File file : logFiles) {
+            logFileSize += file.length();
+        }
+        assertTrue(logFileSize < (maxSize * 1024));
+    }
+
+    private long getStoreID(FeedbackStoreManager feedbackStoreManager) throws Exception {
+        SortedSet<Long> allFeedbackStoreIDs = feedbackStoreManager.getAllFeedbackStoreIDs();
+        assertEquals(allFeedbackStoreIDs.size(), 1);
+        return allFeedbackStoreIDs.first();
+    }
+
+    private File[] getLogFiles() {
+        File[] files = new File(m_agentContext.getWorkDir(), "feedback").listFiles();
+        // sort files on storeId and fileNumber
+        Arrays.sort(files, new Comparator<File>() {
+            public int compare(File f1, File f2) {
+                int result = (int) (getStoreId(f1) - getStoreId(f2));
+                if (result == 0) {
+                    int f1Number = getLogfileNumber(f1.getName(), getStoreName(getStoreId(f1)));
+                    int f2Number = getLogfileNumber(f2.getName(), getStoreName(getStoreId(f2)));
+                    result = f1Number - f2Number;
+                }
+                return result;
+            }
+
+        });
+        return files;
+    }
+
+    private String getStoreName(long storeId) {
+        return "test-" + storeId;
+    }
+
+    private long getStoreId(File storeFile) {
+        // remove the extension from the filename
+        String storeName = storeFile.getName().replaceFirst("[.][^.]+$", "");
+        return Long.parseLong(storeName.replace("test-", ""));
+    }
+
+    private int getLogfileNumber(String logfileName, String storeName) {
+        String extension = logfileName.replace(storeName + ".", "");
+        return Integer.parseInt(extension);
+
+    }
+}



Mime
View raw message