Author: jawi
Date: Wed Oct 2 08:54:50 2013
New Revision: 1528378
URL: http://svn.apache.org/r1528378
Log:
Refactored the agent feedback store implementation:
- made the store thread-safe on its own, allowing multiple threads to
access it without fear of corruption;
- added some test cases to test the thread-safety of the store;
- simplified some constructs in the feedback store manager.
Added:
ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreTest.java (with props)
Modified:
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStore.java
ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStoreManager.java
ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackChannelImplTest.java
ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreManagerTest.java
ace/trunk/org.apache.ace.authentication.itest/src/org/apache/ace/it/authentication/LogAuthenticationTest.java
ace/trunk/org.apache.ace.client.repository.itest/src/org/apache/ace/it/repositoryadmin/ClientAutomationTest.java
ace/trunk/org.apache.ace.client.repository.itest/src/org/apache/ace/it/repositoryadmin/StatefulTargetRepositoryTest.java
ace/trunk/org.apache.ace.client.repository/src/org/apache/ace/client/repository/stateful/impl/StatefulTargetObjectImpl.java
ace/trunk/org.apache.ace.client.repository/test/org/apache/ace/client/repository/impl/ACE308Test.java
ace/trunk/org.apache.ace.client.rest/src/org/apache/ace/client/rest/LogEventSerializer.java
ace/trunk/org.apache.ace.log.itest/src/org/apache/ace/it/log/LogIntegrationTest.java
ace/trunk/org.apache.ace.log.server.store.itest/src/org/apache/ace/log/server/store/tests/MongoLogStoreTest.java
ace/trunk/org.apache.ace.log.server.ui/src/org/apache/ace/log/server/ui/LogViewerExtension.java
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/mongo/MongoLogStore.java
ace/trunk/org.apache.ace.log/src/org/apache/ace/log/target/store/impl/LogStoreImpl.java
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/LogEventTest.java
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/servlet/LogServletTest.java
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
ace/trunk/org.apache.ace.log/test/org/apache/ace/log/target/task/LogSyncTaskTest.java
Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java?rev=1528378&r1=1528377&r2=1528378&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackChannelImpl.java Wed Oct 2 08:54:50 2013
@@ -31,7 +31,6 @@ import java.io.Writer;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
@@ -70,12 +69,12 @@ public class FeedbackChannelImpl impleme
m_storeManager = new FeedbackStoreManager(agentContext, name);
}
- public synchronized void stop() throws IOException {
+ public void stop() throws IOException {
m_storeManager.close();
}
@Override
- public synchronized void sendFeedback() throws RetryAfterException, IOException {
+ public void sendFeedback() throws RetryAfterException, IOException {
String identification = getIdentification();
URL serverURL = getServerURL();
@@ -96,8 +95,8 @@ public class FeedbackChannelImpl impleme
if (sendConnection instanceof HttpURLConnection) {
((HttpURLConnection) sendConnection).setChunkedStreamingMode(8192);
}
-
writer = new BufferedWriter(new OutputStreamWriter(sendConnection.getOutputStream()));
+
SortedSet<Long> storeIDs = m_storeManager.getAllFeedbackStoreIDs();
for (Long storeID : storeIDs) {
URL queryURL = new URL(serverURL, m_name + "/" + COMMAND_QUERY + "?" + PARAMETER_TARGETID + "=" + identification + "&" + PARAMETER_LOGID + "=" + storeID);
@@ -121,7 +120,7 @@ public class FeedbackChannelImpl impleme
}
@Override
- public synchronized void write(int type, Map<String, String> properties) throws IOException {
+ public void write(int type, Map<String, String> properties) throws IOException {
m_storeManager.write(type, properties);
}
@@ -174,32 +173,41 @@ public class FeedbackChannelImpl impleme
private void synchronizeStore(long storeID, InputStream queryInput, Writer sendWriter) throws IOException {
long highestLocal = m_storeManager.getHighestEventID(storeID);
- if (highestLocal == 0) {
+ if (highestLocal <= 0) {
+ // manager is closed...
return;
}
+
SortedRangeSet localRange = new SortedRangeSet("1-" + highestLocal);
SortedRangeSet remoteRange = getQueryDescriptor(queryInput).getRangeSet();
SortedRangeSet delta = remoteRange.diffDest(localRange);
RangeIterator rangeIterator = delta.iterator();
if (!rangeIterator.hasNext()) {
+ // nothing to sync...
return;
}
- String identification = getIdentification();
long lowest = rangeIterator.next();
long highest = delta.getHigh();
- if (lowest <= highest) {
- List<Event> events = m_storeManager.getEvents(storeID, lowest, highestLocal > highest ? highest : highestLocal);
- Iterator<Event> iter = events.iterator();
- while (iter.hasNext()) {
- Event current = (Event) iter.next();
- while ((current.getID() > lowest) && rangeIterator.hasNext()) {
- lowest = rangeIterator.next();
- }
- if (current.getID() == lowest) {
- Event event = new Event(identification, current);
- sendWriter.write(event.toRepresentation());
- sendWriter.write("\n");
- }
+ if (lowest > highest) {
+ // nothing to sync...
+ return;
+ }
+
+ List<Event> events = m_storeManager.getEvents(storeID, lowest, highestLocal > highest ? highest : highestLocal);
+ if (events == null) {
+ // manager is closed...
+ return;
+ }
+
+ String identification = getIdentification();
+ for (Event current : events) {
+ while ((current.getID() > lowest) && rangeIterator.hasNext()) {
+ lowest = rangeIterator.next();
+ }
+ if (current.getID() == lowest) {
+ Event event = new Event(identification, current);
+ sendWriter.write(event.toRepresentation());
+ sendWriter.write("\n");
}
}
}
Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStore.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStore.java?rev=1528378&r1=1528377&r2=1528378&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStore.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStore.java Wed Oct 2 08:54:50 2013
@@ -22,7 +22,12 @@ import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* The general idea is to provide easy access to a file of records. It supports iterating over records both by skipping
@@ -30,9 +35,35 @@ import java.util.concurrent.atomic.Atomi
* record in case of an error -- hence, a call to truncate after an IOException might make the store readable again.
*/
public class FeedbackStore {
+ /**
+ * Denotes a single record stored in a FeedbackStore.
+ */
+ public static class Record implements Comparable<Record> {
+ public final long m_id;
+ public final byte[] m_entry;
+
+ /**
+ * Creates a new {@link Record} instance.
+ */
+ public Record(long id, byte[] entry) {
+ m_id = id;
+ m_entry = entry;
+ }
+
+ @Override
+ public int compareTo(Record other) {
+ return (m_id < other.m_id ? -1 : (m_id == other.m_id ? 0 : 1));
+ }
+ }
+
+ private final File m_storeFile;
private final RandomAccessFile m_store;
private final long m_id;
- private final AtomicLong m_current;
+
+ private long m_lowestEventID;
+ private long m_highestEventID;
+
+ private final ReadWriteLock m_rwLock = new ReentrantReadWriteLock();
/**
* Create a new File based Store.
@@ -45,179 +76,252 @@ public class FeedbackStore {
* in case the file is not rw.
*/
FeedbackStore(File store, long id) throws IOException {
- m_store = new RandomAccessFile(store, "rwd");
+ m_storeFile = store;
+ m_store = new RandomAccessFile(store, "rw");
m_id = id;
- m_current = new AtomicLong(0);
+
+ init();
}
/**
- * Get the id of the current record.
+ * Store the given record data as the next record.
*
- * @return the idea of the current record.
+ * @param entry
+ * the data of the record to store.
+ * @throws IOException
+ * in case of any IO error.
*/
- public long getCurrent() throws IOException {
- long pos = m_store.getFilePointer();
- if (m_store.length() == 0) {
- return 0;
- }
- long result = 0;
+ public void append(long id, byte[] entry) throws IOException {
+ Lock writeLock = m_rwLock.writeLock();
+
+ writeLock.lock();
try {
- m_store.seek(m_current.get());
- result = readCurrentID();
- m_store.seek(pos);
+ long pos = m_store.getFilePointer();
+ try {
+ long current = m_store.length();
+ // Go to end of file...
+ m_store.seek(current);
+
+ m_store.writeLong(id);
+ m_store.writeInt(entry.length);
+ m_store.write(entry);
+
+ // System.out.printf("Appended %d bytes for record #%d at %d (current = %d).%n", entry.length, id,
+ // current, m_store.getFilePointer());
+
+ // Go back to start of record...
+ m_store.seek(current);
+
+ updateIDs(id);
+ }
+ catch (IOException ex) {
+ handle(pos, ex);
+ }
}
- catch (IOException ex) {
- handle(pos, ex);
+ finally {
+ writeLock.unlock();
}
- return result;
}
/**
- * Get the log id of this store.
+ * Release all resources.
*
- * @return the log id of this store.
+ * @throws IOException
+ * in case of any IO error.
*/
- public long getId() {
- return m_id;
+ public void close() throws IOException {
+ Lock writeLock = m_rwLock.writeLock();
+
+ writeLock.lock();
+ try {
+ m_store.close();
+ }
+ finally {
+ writeLock.unlock();
+ }
}
/**
- * Reset the store to the beginning of the records
+ * Return the filesize for this file
*
- * @throws java.io.IOException
- * in case of an IO error.
+ * @return the size in bytes
+ * @throws IOException
+ * in case of any IO error.
*/
- public void reset() throws IOException {
- m_store.seek(0);
- m_current.set(0);
+ public long getFileSize() throws IOException {
+ Lock readLock = m_rwLock.readLock();
+
+ readLock.lock();
+ try {
+ return m_store.length();
+ }
+ finally {
+ readLock.unlock();
+ }
}
/**
- * Determine whether there are any records left based on the current postion.
+ * Get the ID of the first written event record, which is most of the time the first event record.
*
- * @return <code>true</code> if there are still records to be read.
- * @throws IOException
- * in case of an IO error.
+ * @return the ID of the first record, >= 0.
*/
- public boolean hasNext() throws IOException {
- return m_store.getFilePointer() < m_store.length();
+ public long getFirstEventID() {
+ Lock readLock = m_rwLock.readLock();
+
+ readLock.lock();
+ try {
+ return m_lowestEventID;
+ }
+ finally {
+ readLock.unlock();
+ }
}
/**
- * Read a single logevent from this file
+ * Get the log id of this store.
*
- * @return the bytes for a single logevent
- * @throws IOException
- * in case of an IO error.
+ * @return the log id of this store.
*/
- @SuppressWarnings("unused")
- public byte[] read() throws IOException {
- long pos = m_store.getFilePointer();
- try {
- if (pos < m_store.length()) {
- long current = m_store.getFilePointer();
- long id = m_store.readLong();
- int next = m_store.readInt();
- byte[] entry = new byte[next];
- m_store.readFully(entry);
- setCurrent(current);
- return entry;
- }
- }
- catch (IOException ex) {
- handle(pos, ex);
- }
- return null;
+ public long getId() {
+ return m_id;
}
/**
- * Return the id for the logevent at the current position in the file
+ * Get the ID of the last written event record, which is most of the time the current event record.
*
- * @return the event id
- * @throws IOException
- * in case of an IO error.
+ * @return the ID of the current record, >= 0.
*/
- public long readCurrentID() throws IOException {
- long pos = m_store.getFilePointer();
+ public long getLastEventID() {
+ Lock readLock = m_rwLock.readLock();
+
+ readLock.lock();
try {
- if (pos < m_store.length()) {
- long id = m_store.readLong();
- return id;
- }
- }
- catch (IOException ex) {
- handle(pos, ex);
+ return m_highestEventID;
}
finally {
- m_store.seek(pos);
+ readLock.unlock();
}
- return -1;
}
- /**
- * Make sure the store is readable. As a result, the store is at the end of the records.
- *
- * @throws IOException
- * in case of any IO error.
- */
- public void init() throws IOException {
- reset();
+ public List<Record> getRecords(long fromId, long toId) throws IOException {
+ RandomAccessFile raf = null;
+
+ List<Record> result = new ArrayList<Record>();
+
try {
- while (true) {
- skip();
+ // Take a NEW file instance as to ensure we do not
+ // disturb any concurrent writes while initializing...
+ raf = new RandomAccessFile(m_storeFile, "r");
+
+ // the length is live-updated, so we should be able
+ // to get as close as possible to the last written record...
+ while (raf.getFilePointer() < raf.length()) {
+ int headerSize = 12; // 8 for long, 4 for int...
+ waitToRead(raf, headerSize);
+
+ long id = raf.readLong();
+ int entrySize = raf.readInt();
+ waitToRead(raf, entrySize);
+
+ if ((id >= fromId) && (id <= toId)) {
+ byte[] buffer = new byte[entrySize];
+ raf.readFully(buffer);
+
+ result.add(new Record(id, buffer));
+ }
+ else {
+ int actual = 0;
+ do {
+ actual += raf.skipBytes(entrySize - actual);
+ }
+ while (actual < entrySize);
+ }
}
}
- catch (EOFException ex) {
- // done
+ finally {
+ try {
+ if (raf != null) {
+ raf.close();
+ }
+ }
+ catch (IOException ignored) {
+ }
}
+
+ return result;
}
/**
- * Skip the next record if there is any.
+ * Make sure the store is readable. As a result, the store is at the end of the records.
*
* @throws IOException
- * in case of any IO error or if there is no record left.
+ * in case of any IO error.
*/
- @SuppressWarnings("unused")
- public void skip() throws IOException {
- long pos = m_store.getFilePointer();
+ void init() throws IOException {
+ Lock writeLock = m_rwLock.writeLock();
+ RandomAccessFile raf = null;
+
try {
- long id = m_store.readLong();
- int next = m_store.readInt();
- if (m_store.length() < next + m_store.getFilePointer()) {
- throw new IOException("Unexpected end of file");
+ // Take a NEW file instance as to ensure we do not
+ // disturb any concurrent writes while initializing...
+ raf = new RandomAccessFile(m_storeFile, "r");
+
+ long lowest = Long.MAX_VALUE;
+ long highest = Long.MIN_VALUE;
+ boolean empty = true;
+
+ // the length is live-updated, so we should be able
+ // to get as close as possible to the last written record...
+ while (raf.getFilePointer() < raf.length()) {
+ long id = skip(raf);
+
+ lowest = Math.min(lowest, id);
+ highest = Math.max(highest, id);
+ empty = false;
+ }
+
+ if (empty) {
+ lowest = Long.MAX_VALUE;
+ highest = 0;
+ }
+
+ writeLock.lock();
+ try {
+ m_lowestEventID = lowest;
+ m_highestEventID = highest;
+ }
+ finally {
+ writeLock.unlock();
}
- m_store.skipBytes(next);
- setCurrent(pos);
- pos = m_store.getFilePointer();
+
+// System.out.printf("Init, range = %d..%d.%n", m_lowestEventID, m_highestEventID);
}
- catch (IOException ex) {
- handle(pos, ex);
+ finally {
+ try {
+ if (raf != null) {
+ raf.close();
+ }
+ }
+ catch (IOException ignored) {
+ }
}
}
/**
- * Store the given record data as the next record.
+ * Reset the store to the beginning of the records
*
- * @param entry
- * the data of the record to store.
- * @throws IOException
- * in case of any IO error.
+ * @throws java.io.IOException
+ * in case of an IO error.
*/
- public void append(long id, byte[] entry) throws IOException {
- long pos = m_store.getFilePointer();
- long length = m_store.length();
+ void reset() throws IOException {
+ Lock writeLock = m_rwLock.writeLock();
+
+ writeLock.lock();
try {
- m_store.seek(length);
- long current = m_store.getFilePointer();
- m_store.writeLong(id);
- m_store.writeInt(entry.length);
- m_store.write(entry);
- m_store.seek(pos);
- setCurrent(current);
+ m_store.seek(0);
}
- catch (IOException ex) {
- handle(pos, ex);
+ finally {
+ writeLock.unlock();
}
}
@@ -228,28 +332,15 @@ public class FeedbackStore {
* in case of any IO error.
*/
public void truncate() throws IOException {
- m_store.setLength(m_store.getFilePointer());
- }
-
- /**
- * Release any resources.
- *
- * @throws IOException
- * in case of any IO error.
- */
- public void close() throws IOException {
- m_store.close();
- }
+ Lock writeLock = m_rwLock.writeLock();
- /**
- * Return the filesize for this file
- *
- * @return the size in bytes
- * @throws IOException
- * in case of any IO error.
- */
- public long getFileSize() throws IOException {
- return m_store.length();
+ writeLock.lock();
+ try {
+ m_store.setLength(m_store.getFilePointer());
+ }
+ finally {
+ writeLock.unlock();
+ }
}
private void handle(long pos, IOException exception) throws IOException {
@@ -265,11 +356,52 @@ public class FeedbackStore {
throw exception;
}
- private void setCurrent(long pos) {
- long old;
+ /**
+ * Skips an entire record for the given {@link RandomAccessFile}, assuming it is placed at the beginning of a
+ * record!
+ *
+ * @param raf
+ * the {@link RandomAccessFile} to skip a record in, cannot be <code>null</code>.
+ * @return the event ID of the skipped record.
+ * @throws IOException
+ * in case of I/O errors.
+ */
+ private long skip(RandomAccessFile raf) throws IOException {
+ int headerSize = 12; // 8 for long, 4 for int...
+ waitToRead(raf, headerSize);
+
+ long lastId = raf.readLong();
+ int entrySize = raf.readInt();
+
+ waitToRead(raf, entrySize);
+
+ int actual = 0;
do {
- old = m_current.get();
+ actual += raf.skipBytes(entrySize - actual);
+ }
+ while (actual < entrySize);
+
+ return lastId;
+ }
+
+ private void updateIDs(long eventID) {
+ if (eventID < m_lowestEventID) {
+ m_lowestEventID = eventID;
+ }
+ if (eventID > m_highestEventID) {
+ m_highestEventID = eventID;
+ }
+ }
+
+ private void waitToRead(RandomAccessFile raf, int bytesNeeded) throws IOException {
+ int tryCount = 2000;
+ while (tryCount-- > 0 && (raf.getFilePointer() + bytesNeeded) > raf.length()) {
+ // Looking at a file that is changing, so it might well be that the size is changing, wait a little and try
+ // again (wait 1 usec, 2 msec total waiting time)...
+ LockSupport.parkNanos(1000L);
+ }
+ if (tryCount <= 0) {
+ throw new EOFException("Unexpected end of file, expected at least " + bytesNeeded + " additional bytes!");
}
- while (!m_current.compareAndSet(old, pos));
}
}
Modified: ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStoreManager.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStoreManager.java?rev=1528378&r1=1528377&r2=1528378&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStoreManager.java (original)
+++ ace/trunk/org.apache.ace.agent/src/org/apache/ace/agent/impl/FeedbackStoreManager.java Wed Oct 2 08:54:50 2013
@@ -23,17 +23,23 @@ import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
-import java.util.Dictionary;
-import java.util.Hashtable;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
+import java.util.SortedMap;
import java.util.SortedSet;
+import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.ace.agent.AgentContext;
+import org.apache.ace.agent.IdentificationHandler;
import org.apache.ace.agent.LoggingHandler;
+import org.apache.ace.agent.impl.FeedbackStore.Record;
import org.apache.ace.feedback.Event;
/**
@@ -46,18 +52,21 @@ import org.apache.ace.feedback.Event;
* This managerclass takes care of the splitting of logs over multiple files, cleanup of old files, etc.
*/
public class FeedbackStoreManager {
-
- // This is the number of files the maxFileSize of the is split over
- private static final int NUMBER_OF_FILES = 10;
private static final String DIRECTORY_NAME = "feedback";
+ private static final int DEFAULT_STORE_SIZE = 1024 * 1024; // 1 MB
+ private static final int DEFAULT_FILE_SIZE = DEFAULT_STORE_SIZE / 10;
private final AgentContext m_agentContext;
private final String m_name;
private final File m_baseDir;
+ /** the maximum size of all store files together. */
+ private final int m_maxStoreSize;
+ /** the maximum size of a single store file. */
private final int m_maxFileSize;
- private FeedbackStore m_currentStore;
- private long m_highest;
+ private final AtomicBoolean m_closed;
+ private final AtomicReference<FeedbackStore> m_currentStoreRef;
+ private final SortedMap<Long, SortedSet<Integer>> m_storeFileIdx;
private final FileFilter m_fileFilter = new FileFilter() {
@Override
@@ -75,41 +84,74 @@ public class FeedbackStoreManager {
* the name of the feedbackstore
*/
public FeedbackStoreManager(AgentContext agentContext, String name) throws IOException {
- this(agentContext, name, 1024 * 1024);
+ this(agentContext, name, DEFAULT_STORE_SIZE, DEFAULT_FILE_SIZE);
}
/**
* Create and initialize a store
*
* @param agentContext
- * the agentcontext
+ * the agent context
* @param name
- * the name of the feedbackstore
+ * the name of the store
+ * @param maxStoreSize
+ * the maximum size for this store, in bytes;
* @param maxFileSize
- * the maximum size for this feedbackstore in kB
+ * the maximum size for one file, in bytes.
*/
- public FeedbackStoreManager(AgentContext agentContext, String name, int maxFileSize) throws IOException {
+ public FeedbackStoreManager(AgentContext agentContext, String name, int maxStoreSize, int maxFileSize) throws IOException {
m_agentContext = agentContext;
m_name = name;
+ m_maxStoreSize = maxStoreSize;
m_maxFileSize = maxFileSize;
+
+ if (m_maxFileSize > m_maxStoreSize) {
+ throw new IllegalArgumentException("Maximum file size cannot exceed maximum store size!");
+ }
+
+ m_closed = new AtomicBoolean(false);
+
m_baseDir = new File(m_agentContext.getWorkDir(), DIRECTORY_NAME);
if (!m_baseDir.isDirectory() && !m_baseDir.mkdirs()) {
throw new IllegalArgumentException("Need valid dir");
}
- // Identify and initialize the latest store
- SortedSet<Long> storeIDs = getAllFeedbackStoreIDs();
- if (storeIDs.isEmpty()) {
- m_currentStore = newFeedbackStore();
- }
- else {
- m_currentStore = getLastStore(storeIDs.last());
- try {
- m_currentStore.init();
+ m_currentStoreRef = new AtomicReference<FeedbackStore>();
+ m_storeFileIdx = new TreeMap<Long, SortedSet<Integer>>();
+
+ Pattern p = Pattern.compile(m_name + "-(\\d+).(\\d+)");
+ File[] allFiles = m_baseDir.listFiles(m_fileFilter);
+ for (File file : allFiles) {
+ Matcher m = p.matcher(file.getName());
+ if (m.matches()) {
+ long storeId = Long.valueOf(m.group(1));
+ int fileNumber = Integer.valueOf(m.group(2));
+
+ SortedSet<Integer> storeFileNos = m_storeFileIdx.get(storeId);
+ if (storeFileNos == null) {
+ storeFileNos = new TreeSet<Integer>();
+ m_storeFileIdx.put(storeId, storeFileNos);
+ }
+ storeFileNos.add(fileNumber);
}
- catch (IOException ex) {
- handleException(m_currentStore, ex);
+ }
+
+ // Identify and initialize the latest store...
+ FeedbackStore store = null;
+ try {
+ if (m_storeFileIdx.isEmpty()) {
+ store = newFeedbackStore();
+ }
+ else {
+ Long lastStoreId = m_storeFileIdx.lastKey();
+ Integer fileNo = m_storeFileIdx.get(lastStoreId).last();
+
+ store = createStore(lastStoreId, fileNo);
}
+ setStore(store);
+ }
+ catch (IOException ex) {
+ handleException(store, ex);
}
}
@@ -120,10 +162,9 @@ public class FeedbackStoreManager {
* if something goed wrong
*/
public void close() throws IOException {
- if (m_currentStore != null) {
- m_currentStore.close();
+ if (m_closed.compareAndSet(false, true)) {
+ setStore(null); // will close automatically the previously set store...
}
- m_currentStore = null;
}
/**
@@ -132,77 +173,7 @@ public class FeedbackStoreManager {
* @return a ordered set of all storeIds, oldest first
*/
public SortedSet<Long> getAllFeedbackStoreIDs() throws IOException {
- File[] files = getStoreFiles();
- SortedSet<Long> storeIDs = new TreeSet<Long>();
- for (int i = 0; i < files.length; i++) {
- storeIDs.add(getStoreId(files[i]));
- }
- return storeIDs;
- }
-
- /**
- * Write to the currently active store
- *
- * @param type
- * the type of message
- * @param properties
- * the properties to be logged
- */
- public void write(int type, Map<String, String> properties) throws IOException {
- try {
- // check if we exceed the maximum allowed store size, if we do cleanup an old file
- // check if the current store file maximum filesize is reached, if it is the current store should be rotated
- if (isCurrentStoreMaximumFileSizeReached()) {
- if (isCleanupRequired()) {
- cleanup();
- }
- m_currentStore.close();
- m_currentStore = createStore(m_currentStore.getId(), getLastLogfileNumber(m_currentStore.getId()) + 1);
- }
- // convert the map of properties to a dictionary
- Dictionary<String, String> dictionary = new Hashtable<String, String>();
- for (Entry<String, String> entry : properties.entrySet()) {
- dictionary.put(entry.getKey(), entry.getValue());
- }
- // log the event
- long nextEventId = (m_highest = getHighestEventID(m_currentStore.getId()) + 1);
- Event result = new Event(null, m_currentStore.getId(), nextEventId, System.currentTimeMillis(), type, dictionary);
- m_currentStore.append(result.getID(), result.toRepresentation().getBytes());
- }
- catch (IOException ex) {
- handleException(m_currentStore, ex);
- }
- }
-
- public void forceCreateNewStore() throws IOException {
- m_currentStore = newFeedbackStore();
- }
-
- /**
- * Give the highest eventId that is is present is the specified store
- *
- * @param the
- * storeId
- * @return the highest event present in the store
- */
- public long getHighestEventID(long storeID) throws IOException {
- FeedbackStore store = getLastStore(storeID);
- try {
- if (m_highest == 0) {
- store.init();
- return (m_highest = store.getCurrent());
- }
- else {
- return m_highest;
- }
- }
- catch (IOException ex) {
- handleException(store, ex);
- }
- finally {
- closeIfNeeded(new FeedbackStore[] { store });
- }
- return -1;
+ return new TreeSet<Long>(m_storeFileIdx.keySet());
}
/**
@@ -217,114 +188,208 @@ public class FeedbackStoreManager {
* the end of the range of events
*/
public List<Event> getEvents(long storeID, long fromEventID, long toEventID) throws IOException {
+ if (m_closed.get()) {
+ return Collections.emptyList();
+ }
+
FeedbackStore[] stores = getAllStores(storeID);
- List<Event> result = new ArrayList<Event>();
try {
+ List<Record> records = new ArrayList<Record>();
for (FeedbackStore store : stores) {
try {
-
- if (store.getCurrent() > fromEventID) {
- store.reset();
- }
- while (store.hasNext()) {
- long eventID = store.readCurrentID();
- if ((eventID >= fromEventID) && (eventID <= toEventID)) {
- result.add(new Event(new String(store.read())));
- }
- else {
- store.skip();
- }
+ if (store.getFirstEventID() <= toEventID && store.getLastEventID() >= fromEventID) {
+ records.addAll(store.getRecords(fromEventID, toEventID));
}
}
catch (Exception ex) {
handleException(store, ex);
}
}
+
+ // Sort the records by their event ID...
+ Collections.sort(records);
+ // Unmarshal the records into concrete log events...
+ List<Event> result = new ArrayList<Event>();
+ for (Record record : records) {
+ result.add(new Event(record.m_entry));
+ }
+ return result;
}
finally {
closeIfNeeded(stores);
}
- return result;
}
/**
- * Handle exceptions. This method will truncate the store from the point the error occurred. If the error occurred
- * in the currently active store it will close the current store and create a new one. The original error is
- * rethrowed.
+ * Give the highest eventId that is present in the specified store
*
- * @param store
- * the store where the exception happened
- * @param exception
- * the original exception
+ * @param storeID
+ * the ID of the store to query
+ * @return the highest event present in the store, >= 0, or <tt>-1</tt> if this manager is already closed.
+ * @throws IOException
+ * in case of I/O problems accessing the store(s).
*/
- private void handleException(FeedbackStore store, Exception exception) throws IOException {
- logError("Exception caught while accessing feedback channel store #%d", exception, store.getId());
- if (store == m_currentStore) {
- m_currentStore = newFeedbackStore();
+ public long getHighestEventID(long storeID) throws IOException {
+ if (m_closed.get()) {
+ return -1L;
}
+ FeedbackStore store = getLastStore(storeID);
try {
- store.truncate();
+ return store.getLastEventID();
}
- catch (IOException ex) {
- logError("Exception caught while truncating feedback channel store #%d", ex, store.getId());
+ finally {
+ closeIfNeeded(store);
+ }
+ }
+
+ /**
+ * Write to the currently active store
+ *
+ * @param type
+ * the type of message
+ * @param properties
+ * the properties to be logged
+ */
+ public void write(int type, Map<String, String> properties) throws IOException {
+ if (m_closed.get()) {
+ // Nothing we can do here...
+ return;
}
+
+ FeedbackStore currentStore = getCurrentStore();
+
try {
- store.close();
+ long storeID = currentStore.getId();
+ // make sure to continue with the last written event in case we're rotating to a new file...
+ long nextEventId = currentStore.getLastEventID() + 1;
+
+ // check if the current store file maximum filesize is reached, if it is the current store should be rotated
+ if (isMaximumStoreSizeReached(currentStore)) {
+ int newFileNo = getLastLogfileNumber(storeID) + 1;
+ currentStore = setStore(createStore(storeID, newFileNo));
+
+ // check if we exceed the maximum allowed store size, if so, we do clean up old files...
+ cleanupOldStoreFiles();
+ }
+
+ // log the event XXX shouldn't the target ID be filled in?
+ Event result = new Event(getTargetID(), storeID, nextEventId, System.currentTimeMillis(), type, properties);
+
+ currentStore.append(result.getID(), result.toRepresentation().getBytes());
}
catch (IOException ex) {
- // Not much we can do
+ handleException(currentStore, ex);
}
- if (exception instanceof IOException) {
- throw (IOException) exception;
+ }
+
+ void forceCreateNewStore() throws IOException {
+ setStore(newFeedbackStore());
+ }
+
+ /**
+ * @throws IOException
+ */
+ private void cleanupOldStoreFiles() throws IOException {
+ int maxFiles = (int) Math.ceil(m_maxStoreSize / m_maxFileSize);
+
+ File[] storeFiles = getStoreFiles();
+ if (storeFiles.length > maxFiles) {
+ // we've exceeded our total storage limit...
+ int deleteTo = storeFiles.length - maxFiles;
+ // delete the files...
+ for (int i = 0; i < deleteTo; i++) {
+ storeFiles[i].delete();
+ }
}
- throw new IOException("Unable to read log entry: " + exception.getMessage());
}
/**
- * Check if the maximum allowed size for the current store file is reached
+ * Close all the feedbackstores if necessary
*
- * @return is the maximum reached
+ * @param stores
+ * a list of stores
*/
- private boolean isCurrentStoreMaximumFileSizeReached() throws IOException {
- return (m_currentStore.getFileSize()) >= (m_maxFileSize / NUMBER_OF_FILES);
+ private void closeIfNeeded(FeedbackStore... stores) {
+ for (FeedbackStore store : stores) {
+ if (store != getCurrentStore()) {
+ try {
+ store.close();
+ }
+ catch (IOException ex) {
+ // Not much we can do
+ }
+ }
+ }
}
/**
- * Check if the maximum fileSize for all the logfiles together is reached
+ * Create a new feedbackstore with the specified storeId and fileNumber.
*
- * @return is the cleanup required
+ * @param storeId
+ * the storeId
+ * @param fileNumber
+ * the new sequence number for this storeID
+ * @return a feedbackstore
*/
- private boolean isCleanupRequired() throws IOException {
- return getFileSize(getStoreFiles()) >= (m_maxFileSize);
+ private FeedbackStore createStore(long storeId, int fileNumber) throws IOException {
+ File storeFile = new File(m_baseDir, getStoreName(storeId, fileNumber));
+ m_storeFileIdx.get(storeId).add(Integer.valueOf(fileNumber));
+ return new FeedbackStore(storeFile, storeId);
}
/**
- * Removes old logfiles starting from the oldest file. It stops when there is 10% free space.
+ * Return all feedbackstores for a single storeId.
+ *
+ * @param storeId
+ * the storeId
+ * @return a list of all feedbackstores for this storeId
*/
- private void cleanup() throws IOException {
- File[] files = getStoreFiles();
- while (getFileSize(files) > ((m_maxFileSize) / (NUMBER_OF_FILES - 1))) {
- File oldestFile = files[0];
- if (oldestFile != null) {
- oldestFile.delete();
- }
- files = getStoreFiles();
+ private FeedbackStore[] getAllStores(long storeId) throws IOException {
+ List<FeedbackStore> stores = new ArrayList<FeedbackStore>();
+
+ SortedSet<Integer> storeFileNos = m_storeFileIdx.get(storeId);
+
+ FeedbackStore currentStore = getCurrentStore();
+ if (currentStore.getId() == storeId) {
+ // The last one is the current store...
+ storeFileNos = storeFileNos.headSet(storeFileNos.last());
+ }
+ for (Integer fileNo : storeFileNos) {
+ stores.add(createStore(storeId, fileNo));
}
+ if (currentStore.getId() == storeId) {
+ stores.add(currentStore);
+ }
+
+ return stores.toArray(new FeedbackStore[stores.size()]);
}
/**
- * Return the filesize of the given files in Kb
+ * Get the name of the store for a storeId
*
- * @param files
- * a list of files
+ * @param storeId
+ * the storeId
+ * @return the basename of the file
*/
- private long getFileSize(File[] files) {
- long size = 0;
- for (File file : files) {
- size += file.length();
- }
- return size;
+ private String getBaseStoreName(long storeId) {
+ return String.format("%s-%d.", m_name, storeId);
+ }
+
+ private FeedbackStore getCurrentStore() {
+ return m_currentStoreRef.get();
+ }
+
+ /**
+ * Returns the last file for the specified storeId
+ *
+ * @param storeId
+ * the storeID
+ * @return the latest (newest) file backing the specified storeID
+ */
+ private int getLastLogfileNumber(long storeId) throws IOException {
+ SortedSet<Integer> fileNos = m_storeFileIdx.get(storeId);
+ return (fileNos == null) ? 1 : fileNos.last();
}
/**
@@ -336,11 +401,18 @@ public class FeedbackStoreManager {
* @return the feedbackstore for that storeID
*/
private FeedbackStore getLastStore(long storeID) throws IOException {
- if (m_currentStore != null && m_currentStore.getId() == storeID) {
- return m_currentStore;
+ FeedbackStore currentStore = getCurrentStore();
+ if (currentStore != null && currentStore.getId() == storeID) {
+ return currentStore;
}
- return createStore(storeID);
+ int lastFileNo = getLastLogfileNumber(storeID);
+ return createStore(storeID, lastFileNo);
+ }
+
+ private int getLogfileNumber(String logfileName, long storeId) {
+ String extension = logfileName.replace(m_name + "-" + storeId + ".", "");
+ return Integer.parseInt(extension);
}
/**
@@ -356,156 +428,136 @@ public class FeedbackStoreManager {
// sort files on storeId and fileNumber
Arrays.sort(files, new Comparator<File>() {
public int compare(File f1, File f2) {
- int result = (int) (getStoreId(f1) - getStoreId(f2));
+ long storeId1 = getStoreId(f1);
+ long storeId2 = getStoreId(f2);
+
+ int result = (int) (storeId1 - storeId2);
if (result == 0) {
- int f1Number = getLogfileNumber(f1.getName(), getStoreName(getStoreId(f1)));
- int f2Number = getLogfileNumber(f2.getName(), getStoreName(getStoreId(f2)));
+ int f1Number = getLogfileNumber(f1.getName(), storeId1);
+ int f2Number = getLogfileNumber(f2.getName(), storeId2);
result = f1Number - f2Number;
}
return result;
}
-
});
return files;
}
/**
- * Create a new empty feedbackstore with a new storeId.
+ * Parse the storeId from the specified fileName
*
- * @return A new feedbackstore with a new storeID
+ * @param storeFile
+ * a store file
+ * @return the storeId
*/
- private FeedbackStore newFeedbackStore() throws IOException {
- long storeId = System.currentTimeMillis();
- while (!(new File(m_baseDir, getStoreName(storeId) + ".1")).createNewFile()) {
- storeId++;
+ private long getStoreId(File storeFile) {
+ Pattern p = Pattern.compile(m_name + "-(\\d+)");
+ Matcher m = p.matcher(storeFile.getName());
+ if (m.find()) {
+ return Long.parseLong(m.group(1));
}
- return new FeedbackStore(new File(m_baseDir, getStoreName(storeId) + ".1"), storeId);
+ throw new RuntimeException("Invalid store file name: " + storeFile.getName());
}
/**
- * Return all feedbackstores for a single storeId.
+ * Get the name of the store for a storeId
*
* @param storeId
* the storeId
- * @return a list of all feedbackstores for this storeId
+ * @return the file name of the store, i.e., the base name followed by the file number
*/
- private FeedbackStore[] getAllStores(long storeId) throws IOException {
- List<FeedbackStore> stores = new ArrayList<FeedbackStore>();
- File[] files = getStoreFiles();
- for (File file : files) {
- if (storeId == getStoreId(file)) {
- stores.add(new FeedbackStore(file, storeId));
- }
- }
-
- // replace the last reference to the current store to make sure there are no multiple FeedbackStores for one
- // file
- if (stores.size() >= 1 && m_currentStore.getId() == storeId) {
- stores.set(stores.size() - 1, m_currentStore);
- }
- return stores.toArray(new FeedbackStore[stores.size()]);
+ private String getStoreName(long storeId, int fileNo) {
+ return String.format("%s%d", getBaseStoreName(storeId), fileNo);
}
- /**
- * Create the feedbackstore for the specified storeId. If this storeId is already backed by files on disk then the
- * last file will be used to create this feedbackstore.
- *
- * @param storeId
- * the storeId
- * @return the newest feedbackstore for this storeID
- */
- private FeedbackStore createStore(long storeId) throws IOException {
- return createStore(storeId, getLastLogfileNumber(storeId));
+ private String getTargetID() {
+ IdentificationHandler idHandler = m_agentContext.getHandler(IdentificationHandler.class);
+ return idHandler.getAgentId();
}
/**
- * Create a new feedbackstore with the specified storeId and fileNumber.
+ * Handle exceptions. This method will truncate the store from the point the error occurred. If the error occurred
+ * in the currently active store it will close the current store and create a new one. The original error is
+ * rethrown.
*
- * @param storeId
- * the storeId
- * @param fileNumber
- * the new sequence number for this storeID
- * @return a feedbackstore
+ * @param store
+ * the store where the exception happened
+ * @param exception
+ * the original exception
*/
- private FeedbackStore createStore(long storeId, int fileNumber) throws IOException {
- if (isCleanupRequired()) {
- cleanup();
+ private void handleException(FeedbackStore store, Exception exception) throws IOException {
+ logError("Exception caught while accessing feedback channel store #%d", exception, store.getId());
+ if (store == getCurrentStore()) {
+ setStore(newFeedbackStore()); // XXX
}
- return new FeedbackStore(new File(m_baseDir, getStoreName(storeId) + "." + fileNumber), storeId);
- }
-
- /**
- * Returns the last file for the specified storeId
- *
- * @param storeId
- * the storeID
- * @return the latest (newest) file backing the specified storeID
- */
- private int getLastLogfileNumber(long storeId) throws IOException {
- File[] storeFiles = getStoreFiles();
- String storeName = getStoreName(storeId);
- int lastNumber = 1;
- for (File file : storeFiles) {
- String fileName = file.getName();
- if (fileName.contains(storeName)) {
- lastNumber = getLogfileNumber(fileName, storeName);
- }
+ try {
+ store.truncate();
+ }
+ catch (IOException ex) {
+ logError("Exception caught while truncating feedback channel store #%d", ex, store.getId());
+ }
+ try {
+ store.close();
+ }
+ catch (IOException ex) {
+ // Not much we can do
+ }
+ if (exception instanceof IOException) {
+ throw (IOException) exception;
}
- return lastNumber;
+ throw new IOException("Unable to read log entry: " + exception.getMessage());
}
/**
- * Get the name of the store for a storeId
+ * Check if the maximum allowed size for the given store file is reached
*
- * @param storeId
- * the storeId
- * @return the basename of the file
+ * @return <code>true</code> if the maximum size is reached, <code>false</code> otherwise
*/
- private String getStoreName(long storeId) {
- return m_name + "-" + storeId;
+ private boolean isMaximumStoreSizeReached(FeedbackStore store) throws IOException {
+ return store.getFileSize() >= m_maxFileSize;
}
- private int getLogfileNumber(String logfileName, String storeName) {
- String extension = logfileName.replace(storeName + ".", "");
- return Integer.parseInt(extension);
-
+ private void logError(String msg, Exception cause, Object... args) {
+ m_agentContext.getHandler(LoggingHandler.class).logError("feedbackChannel(" + m_name + ")", msg, cause, args);
}
/**
- * Parse the storeId from the specified fileName
+ * Create a new empty feedbackstore with a new storeId.
*
- * @param storeFile
- * a store file
- * @return the storeId
+ * @return A new feedbackstore with a new storeID
*/
- private long getStoreId(File storeFile) {
- // remove the extension from the filename
- String storeName = storeFile.getName().replaceFirst("[.][^.]+$", "");
- return Long.parseLong(storeName.replace(m_name + "-", ""));
- }
+ private FeedbackStore newFeedbackStore() throws IOException {
+ long storeId = System.currentTimeMillis();
- /**
- * Close all the feedbackstores if necessary
- *
- * @param stores
- * a list of stores
- */
- private void closeIfNeeded(FeedbackStore[] stores) {
- for (FeedbackStore store : stores) {
- if (store != m_currentStore) {
- try {
- store.close();
- }
- catch (IOException ex) {
- // Not much we can do
- }
+ String storeFilename;
+ File storeFile;
+ do {
+ storeFilename = getStoreName(storeId, 1);
+ storeFile = new File(m_baseDir, storeFilename);
+ if (storeFile.createNewFile()) {
+ break;
}
+ storeId++;
}
- }
+ while (true);
- private void logError(String msg, Exception cause, Object... args) {
- m_agentContext.getHandler(LoggingHandler.class).logError("feedbackChannel(" + m_name + ")", msg, cause, args);
+ m_storeFileIdx.put(storeId, new TreeSet<Integer>(Arrays.asList(1)));
+
+ return new FeedbackStore(storeFile, storeId);
}
+ private FeedbackStore setStore(FeedbackStore store) throws IOException {
+ FeedbackStore old;
+ do {
+ old = m_currentStoreRef.get();
+ }
+ while (!m_currentStoreRef.compareAndSet(old, store));
+
+ if (old != null) {
+ old.close();
+ }
+
+ return store;
+ }
}
Modified: ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackChannelImplTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackChannelImplTest.java?rev=1528378&r1=1528377&r2=1528378&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackChannelImplTest.java (original)
+++ ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackChannelImplTest.java Wed Oct 2 08:54:50 2013
@@ -63,7 +63,6 @@ public class FeedbackChannelImplTest ext
List<Event> m_events = new ArrayList<Event>();
- @SuppressWarnings("deprecation")
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
resp.setContentType("text/plain");
@@ -73,20 +72,19 @@ public class FeedbackChannelImplTest ext
Event event = new Event(eventString);
m_events.add(event);
}
- resp.setStatus(200, "voila");
+ resp.setStatus(200);
}
}
static class TestQueryFeedbackServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
- @SuppressWarnings("deprecation")
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
String targetID = request.getParameter("tid");
long logID = Long.parseLong(request.getParameter("logid"));
response.getOutputStream().print(new Descriptor(targetID, logID, new SortedRangeSet(new long[0])).toRepresentation());
- response.setStatus(200, "voila");
+ response.setStatus(200);
}
}
Modified: ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreManagerTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreManagerTest.java?rev=1528378&r1=1528377&r2=1528378&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreManagerTest.java (original)
+++ ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreManagerTest.java Wed Oct 2 08:54:50 2013
@@ -27,10 +27,12 @@ import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.List;
import java.util.SortedSet;
import org.apache.ace.agent.AgentContext;
import org.apache.ace.agent.testutil.BaseAgentTest;
+import org.apache.ace.feedback.Event;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -88,44 +90,46 @@ public class FeedbackStoreManagerTest ex
long storeID = getStoreID(feedbackStoreManager);
assertEquals(feedbackStoreManager.getHighestEventID(storeID), 0);
+
feedbackStoreManager.write(1, new HashMap<String, String>());
- assertEquals(feedbackStoreManager.getHighestEventID(storeID), 1);
- assertEquals(feedbackStoreManager.getEvents(storeID, 1, 1).size(), 1);
+ assertEquals(feedbackStoreManager.getHighestEventID(storeID), 1);
+ assertEquals(feedbackStoreManager.getEvents(storeID, 0, 1).size(), 1);
feedbackStoreManager.forceCreateNewStore();
- assertEquals(feedbackStoreManager.getEvents(storeID, 1, 1).size(), 1);
+ assertEquals(feedbackStoreManager.getEvents(storeID, 0, 1).size(), 1);
}
@Test
public void testLogfileRotation() throws Exception {
- int maxSize = 20;
+ int maxSize = 100 * 1024;
- FeedbackStoreManager feedbackStoreManager = new FeedbackStoreManager(m_agentContext, "test", maxSize);
+ FeedbackStoreManager feedbackStoreManager = new FeedbackStoreManager(m_agentContext, "test", maxSize, maxSize / 5);
long storeID = getStoreID(feedbackStoreManager);
+
+ int recordCount = 1000;
assertEquals(feedbackStoreManager.getHighestEventID(storeID), 0);
// absolutely exceed the set filesize for this store
- for (int i = 0; i < 1000; i++) {
- feedbackStoreManager.write(1, new HashMap<String, String>());
+ for (int i = 0; i < recordCount; i++) {
+ HashMap<String, String> eventProps = new HashMap<String, String>();
+ eventProps.put("key", "value" + i);
+ feedbackStoreManager.write(i, eventProps);
}
File[] logFiles = getLogFiles();
+ assertTrue(logFiles.length > 1);
- assertTrue(logFiles.length <= 10);
- // first available id
- FeedbackStore first = new FeedbackStore(logFiles[0], getStoreId(logFiles[0]));
- first.reset();
- long firstId = first.readCurrentID() - 1;
-
- assertEquals(feedbackStoreManager.getEvents(storeID, 1, 1000).size(), (1000 - firstId));
+ // take the last 1000 events...
+ List<Event> events = feedbackStoreManager.getEvents(storeID, 1, 1000);
+ assertEquals(events.size(), 1000);
long logFileSize = 0;
for (File file : logFiles) {
logFileSize += file.length();
}
- assertTrue(logFileSize < (maxSize * 1024));
+ assertTrue(logFileSize < maxSize);
}
private long getStoreID(FeedbackStoreManager feedbackStoreManager) throws Exception {
Added: ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreTest.java?rev=1528378&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreTest.java (added)
+++ ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreTest.java Wed Oct 2 08:54:50 2013
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.agent.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ace.agent.impl.FeedbackStore.Record;
+import org.testng.annotations.Test;
+
+/**
+ * Test cases for {@link FeedbackStore}.
+ */
+public class FeedbackStoreTest {
+ private static class Reader extends Thread {
+ private final CountDownLatch m_start;
+ private final CountDownLatch m_stop;
+ private final FeedbackStore m_store;
+ private final ConcurrentMap<Long, Boolean> m_seen = new ConcurrentHashMap<Long, Boolean>();
+ private final int m_count;
+
+ public Reader(FeedbackStore store, CountDownLatch start, CountDownLatch stop, int count) {
+ this(store, start, stop, count, 0);
+ }
+
+ public Reader(FeedbackStore store, CountDownLatch start, CountDownLatch stop, int count, int initial) {
+ setName("Reader-" + initial);
+ m_store = store;
+ m_start = start;
+ m_stop = stop;
+ m_count = count;
+ }
+
+ @Override
+ public void run() {
+ try {
+ m_start.await();
+
+ System.out.printf("Reader (%s) starting to read %d records...%n", getName(), m_count);
+
+ Random rnd = new Random();
+
+ long oldID = 0;
+ while (m_seen.size() < m_count) {
+ try {
+ // roughly half of the time, re-initialize the store before reading...
+ if (rnd.nextInt(10) >= 5) {
+ // reset all...
+ m_store.init();
+ }
+ long id = m_store.getLastEventID();
+ if (id >= m_count) {
+ throw new IOException(String.format("Invalid record ID: %1$d (0x%1$x)!%n", id));
+ }
+ for (long j = oldID; j <= id; j++) {
+ m_seen.putIfAbsent(Long.valueOf(j), Boolean.TRUE);
+ }
+ oldID = id;
+ }
+ catch (IOException e) {
+ System.out.printf("I/O exception (%s) caught: %s in %s.%n", e.getClass().getSimpleName(), e.getMessage(), getCaller(e));
+ }
+ }
+
+ System.out.printf("Reader (%s) finished with %d records read...%n", getName(), m_seen.size());
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ finally {
+ m_stop.countDown();
+ }
+ }
+ }
+
+ private static class Writer extends Thread {
+ private final CountDownLatch m_start;
+ private final CountDownLatch m_stop;
+ private final FeedbackStore m_store;
+ private final ConcurrentMap<Long, byte[]> m_written = new ConcurrentHashMap<Long, byte[]>();
+ private final int m_count;
+ private final int m_initValue;
+ private final int m_stepSize;
+
+ public Writer(FeedbackStore store, CountDownLatch start, CountDownLatch stop, int count) {
+ this(store, start, stop, count, 0, 1);
+ }
+
+ public Writer(FeedbackStore store, CountDownLatch start, CountDownLatch stop, int count, int initial, int stepSize) {
+ setName("Writer-" + initial);
+ m_store = store;
+ m_start = start;
+ m_stop = stop;
+ m_count = count;
+ m_initValue = initial;
+ m_stepSize = stepSize;
+ }
+
+ @Override
+ public void run() {
+ try {
+ m_start.await();
+
+ System.out.printf("Writer (%s) starts writing %d records...%n", getName(), m_count);
+
+ for (int i = m_initValue; i < m_count; i += m_stepSize) {
+ long id = i;
+ byte[] entry = String.format("record-data-%d", i).getBytes();
+
+ m_store.append(id, entry);
+ m_written.putIfAbsent(Long.valueOf(id), entry);
+ }
+
+ System.out.printf("Writer (%s) finished with %d records written...%n", getName(), m_written.size());
+ }
+ catch (InterruptedException e) {
+ // ok, stop...
+ }
+ catch (IOException exception) {
+ exception.printStackTrace();
+ }
+ finally {
+ m_stop.countDown();
+ }
+ }
+ }
+
+ private static String getCaller(Exception e) {
+ StringBuilder sb = new StringBuilder();
+ StackTraceElement[] st = e.getStackTrace();
+ int n = Math.min(st.length, 1);
+ int m = Math.min(st.length, 4);
+ for (int i = n; i < m; i++) {
+ if (i > n) {
+ sb.append(" -> ");
+ }
+ StackTraceElement ste = st[i];
+ sb.append(ste.getClassName()).append(".").append(ste.getMethodName()).append("(").append(ste.getLineNumber()).append(")");
+ }
+ return sb.toString();
+ }
+
+ @Test
+ public void testTimedWrite() throws Exception {
+ File storeFile = File.createTempFile("feedback", ".store");
+ storeFile.deleteOnExit();
+
+ final int recordCount = 1000000;
+
+ final FeedbackStore store = new FeedbackStore(storeFile, 1);
+
+ long start = System.nanoTime();
+ for (int i = 0; i < recordCount; i++) {
+ store.append(i, "Hello World!".getBytes());
+ }
+ long end = System.nanoTime();
+ System.out.printf("Writing %d records took %.3f ms.%n", recordCount, (end - start) / 1.0e6);
+ }
+
+ /**
+ * Tests that concurrent use of a {@link FeedbackStore} with a single reader and multiple writers works as expected.
+ */
+ @Test
+ public void testConcurrentUseSingleReaderAndMultipleWriters() throws Exception {
+ File storeFile = File.createTempFile("feedback", ".store");
+ storeFile.deleteOnExit();
+
+ final int recordCount = 100000;
+ final int writerCount = Runtime.getRuntime().availableProcessors() + 1;
+
+ final FeedbackStore store = new FeedbackStore(storeFile, 1);
+
+ final CountDownLatch start = new CountDownLatch(1);
+ final CountDownLatch stop = new CountDownLatch(writerCount + 1);
+
+ Writer[] writers = new Writer[writerCount];
+
+ for (int i = 0; i < writerCount; i++) {
+ writers[i] = new Writer(store, start, stop, recordCount, i, writerCount);
+ }
+
+ Reader reader = new Reader(store, start, stop, recordCount);
+
+ // gents, start your engines...
+ reader.start();
+ for (int i = 0; i < writers.length; i++) {
+ writers[i].start();
+ }
+
+ // 3, 2, 1... GO...
+ start.countDown();
+
+ // wait for all threads to finish...
+ assertTrue(stop.await(30, TimeUnit.SECONDS));
+
+ int writtenCount = 0;
+ for (int i = 0; i < writers.length; i++) {
+ writers[i].join();
+ writtenCount += writers[i].m_written.size();
+ }
+
+ reader.join();
+
+ int readCount = reader.m_seen.size();
+
+ assertEquals(recordCount, writtenCount, "Not all records were written?");
+ assertEquals(readCount, writtenCount, "Not all records were seen?");
+
+ verifyStoreContents(store, recordCount, writers);
+ }
+
+ /**
+ * Tests that concurrent use of a {@link FeedbackStore} with multiple readers and multiple writers works as
+ * expected.
+ */
+ @Test
+ public void testConcurrentUseMultipleReaderAndMultipleWriters() throws Exception {
+ File storeFile = File.createTempFile("feedback", ".store");
+ storeFile.deleteOnExit();
+
+ final int recordCount = 100000;
+ final int readerCount = Runtime.getRuntime().availableProcessors() + 1;
+ final int writerCount = Runtime.getRuntime().availableProcessors() + 1;
+
+ final FeedbackStore store = new FeedbackStore(storeFile, 1);
+
+ final CountDownLatch start = new CountDownLatch(1);
+ final CountDownLatch stop = new CountDownLatch(writerCount + readerCount);
+
+ Writer[] writers = new Writer[writerCount];
+ for (int i = 0; i < writerCount; i++) {
+ writers[i] = new Writer(store, start, stop, recordCount, i, writerCount);
+ }
+
+ Reader[] readers = new Reader[readerCount];
+ for (int i = 0; i < readerCount; i++) {
+ readers[i] = new Reader(store, start, stop, recordCount, i);
+ }
+
+ // gents, start your engines...
+ for (int i = 0; i < readers.length; i++) {
+ readers[i].start();
+ }
+ for (int i = 0; i < writers.length; i++) {
+ writers[i].start();
+ }
+
+ // 3, 2, 1... GO...
+ start.countDown();
+
+ // wait for all threads to finish...
+ assertTrue(stop.await(30, TimeUnit.SECONDS));
+
+ int readCount = 0;
+ for (int i = 0; i < readers.length; i++) {
+ readers[i].join();
+ readCount += readers[i].m_seen.size();
+ }
+ int writtenCount = 0;
+ for (int i = 0; i < writers.length; i++) {
+ writers[i].join();
+ writtenCount += writers[i].m_written.size();
+ }
+
+ assertEquals(recordCount, writtenCount, "Not all records were written?");
+ // All readers read the exact same data, so we've got N copies of it...
+ assertEquals(readCount, readerCount * writtenCount, "Not all records were seen?");
+
+ verifyStoreContents(store, recordCount, writers);
+ }
+
+ /**
+ * Tests that concurrent use of a {@link FeedbackStore} with a single reader and writer works as expected.
+ */
+ @Test
+ public void testConcurrentUseSingleReaderAndSingleWriter() throws Exception {
+ File storeFile = File.createTempFile("feedback", ".store");
+ storeFile.deleteOnExit();
+
+ final int recordCount = 10000;
+
+ final FeedbackStore store = new FeedbackStore(storeFile, 1);
+
+ final CountDownLatch start = new CountDownLatch(1);
+ final CountDownLatch stop = new CountDownLatch(2);
+
+ Writer writer = new Writer(store, start, stop, recordCount);
+ Reader reader = new Reader(store, start, stop, recordCount);
+
+ // gents, start your engines...
+ writer.start();
+ reader.start();
+
+ // 3, 2, 1... GO...
+ start.countDown();
+
+ // wait for both threads to finish...
+ assertTrue(stop.await(15, TimeUnit.SECONDS));
+
+ writer.join();
+ reader.join();
+
+ int writeCount = writer.m_written.size();
+ int readCount = reader.m_seen.size();
+
+ assertEquals(recordCount, writeCount, "Not all records were written?");
+ assertEquals(readCount, writeCount, "Not all records were seen?");
+
+ verifyStoreContents(store, recordCount, writer);
+ }
+
+ private void verifyStoreContents(final FeedbackStore store, final int count, Writer... writers) throws IOException {
+ store.reset();
+ store.init();
+
+ assertEquals(store.getFirstEventID(), 0, "First record ID is different");
+ assertEquals(store.getLastEventID(), count - 1, "Last record ID is different");
+
+ // Verify the written file...
+ List<Record> records = store.getRecords(0, count - 1);
+ Collections.sort(records);
+
+ long expectedID = 0;
+ for (Record record : records) {
+ long id = record.m_id;
+
+ byte[] expectedEntry = null;
+ for (int i = 0; (expectedEntry == null) && i < writers.length; i++) {
+ expectedEntry = writers[i].m_written.remove(id);
+ }
+ assertNotNull(expectedEntry, "Event ID #" + id + " never written?!");
+ // Test consistency of written data...
+ assertEquals(record.m_entry, expectedEntry, "Entry mismatch?!");
+ // Test continuation of written data...
+ assertEquals(record.m_id, expectedID++, "Entry ID mismatch?!");
+ }
+ }
+}
Propchange: ace/trunk/org.apache.ace.agent/test/org/apache/ace/agent/impl/FeedbackStoreTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: ace/trunk/org.apache.ace.authentication.itest/src/org/apache/ace/it/authentication/LogAuthenticationTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.authentication.itest/src/org/apache/ace/it/authentication/LogAuthenticationTest.java?rev=1528378&r1=1528377&r2=1528378&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.authentication.itest/src/org/apache/ace/it/authentication/LogAuthenticationTest.java (original)
+++ ace/trunk/org.apache.ace.authentication.itest/src/org/apache/ace/it/authentication/LogAuthenticationTest.java Wed Oct 2 08:54:50 2013
@@ -22,19 +22,19 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
-import java.util.Dictionary;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.ace.client.repository.SessionFactory;
import org.apache.ace.connectionfactory.ConnectionFactory;
import org.apache.ace.discovery.property.constants.DiscoveryConstants;
-import org.apache.ace.log.Log;
import org.apache.ace.feedback.Descriptor;
import org.apache.ace.feedback.Event;
import org.apache.ace.http.listener.constants.HttpConstants;
import org.apache.ace.identification.property.constants.IdentificationConstants;
+import org.apache.ace.log.Log;
import org.apache.ace.log.server.store.LogStore;
import org.apache.ace.repository.Repository;
import org.apache.ace.repository.impl.constants.RepositoryConstants;
@@ -198,8 +198,8 @@ public class LogAuthenticationTest exten
// prepare the store
List<Event> events = new ArrayList<Event>();
- events.add(new Event(tid1, 1, 1, 1, 1, new Properties()));
- events.add(new Event(tid2, 1, 1, 1, 1, new Properties()));
+ events.add(new Event(tid1, 1, 1, 1, 1));
+ events.add(new Event(tid2, 1, 1, 1, 1));
m_serverStore.put(events);
List<String> result = getResponse("http://localhost:" + TestConstants.PORT + "/auditlog/query");
@@ -243,7 +243,7 @@ public class LogAuthenticationTest exten
List<Event> events = m_serverStore.get(descriptor);
for (Event event : events) {
if (event.getType() == type) {
- Dictionary properties = event.getProperties();
+ Map<String, String> properties = event.getProperties();
assertEquals("value1", properties.get("one"));
assertEquals("value2", properties.get("two"));
found = true;
Modified: ace/trunk/org.apache.ace.client.repository.itest/src/org/apache/ace/it/repositoryadmin/ClientAutomationTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.client.repository.itest/src/org/apache/ace/it/repositoryadmin/ClientAutomationTest.java?rev=1528378&r1=1528377&r2=1528378&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.client.repository.itest/src/org/apache/ace/it/repositoryadmin/ClientAutomationTest.java (original)
+++ ace/trunk/org.apache.ace.client.repository.itest/src/org/apache/ace/it/repositoryadmin/ClientAutomationTest.java Wed Oct 2 08:54:50 2013
@@ -86,8 +86,7 @@ public class ClientAutomationTest extend
private void doAutoTargetReg() throws Exception {
List<Event> events = new ArrayList<Event>();
- Properties props = new Properties();
- events.add(new Event("anotherTarget", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED, props));
+ events.add(new Event("anotherTarget", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED));
// fill auditlog; no install data
m_auditLogStore.put(events);
@@ -122,7 +121,7 @@ public class ClientAutomationTest extend
// add a target which will not be autoregistered
events.clear();
- events.add(new Event("secondTarget", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED, props));
+ events.add(new Event("secondTarget", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED));
m_auditLogStore.put(events);
// do auto target action
Modified: ace/trunk/org.apache.ace.client.repository.itest/src/org/apache/ace/it/repositoryadmin/StatefulTargetRepositoryTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.client.repository.itest/src/org/apache/ace/it/repositoryadmin/StatefulTargetRepositoryTest.java?rev=1528378&r1=1528377&r2=1528378&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.client.repository.itest/src/org/apache/ace/it/repositoryadmin/StatefulTargetRepositoryTest.java (original)
+++ ace/trunk/org.apache.ace.client.repository.itest/src/org/apache/ace/it/repositoryadmin/StatefulTargetRepositoryTest.java Wed Oct 2 08:54:50 2013
@@ -31,7 +31,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.Callable;
import org.apache.ace.client.repository.RepositoryAdminLoginContext;
@@ -184,15 +183,14 @@ public class StatefulTargetRepositoryTes
// add auditlog data
List<Event> events = new ArrayList<Event>();
- Properties props = new Properties();
- events.add(new Event("myNewTarget3", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED, props));
+ events.add(new Event("myNewTarget3", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED));
// add an (old) set of target properties
- Properties props2 = new Properties();
+ Map<String, String> props2 = new HashMap<String, String>();
props2.put("mykey", "myoldvalue");
props2.put("myoldkey", "myoldvalue");
events.add(new Event("myNewTarget3", 1, 2, 2, AuditEvent.TARGETPROPERTIES_SET, props2));
// add a new set of target properties
- Properties props3 = new Properties();
+ Map<String, String> props3 = new HashMap<String, String>();
props3.put("mykey", "myvalue");
events.add(new Event("myNewTarget3", 1, 3, 3, AuditEvent.TARGETPROPERTIES_SET, props3));
m_auditLogStore.put(events);
@@ -207,8 +205,7 @@ public class StatefulTargetRepositoryTes
// add auditlog data for other target
events = new ArrayList<Event>();
- props = new Properties();
- events.add(new Event("myNewTarget4", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED, props));
+ events.add(new Event("myNewTarget4", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED));
m_auditLogStore.put(events);
runAndWaitForEvent(new Callable<Object>() {
public Object call() throws Exception {
@@ -337,8 +334,7 @@ public class StatefulTargetRepositoryTes
// add auditlog data
List<Event> events = new ArrayList<Event>();
- Properties props = new Properties();
- events.add(new Event("myNewGatewayA", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED, props));
+ events.add(new Event("myNewGatewayA", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED));
m_auditLogStore.put(events);
m_statefulTargetRepository.refresh();
@@ -352,8 +348,7 @@ public class StatefulTargetRepositoryTes
}
// add auditlog data for other gateway
events = new ArrayList<Event>();
- props = new Properties();
- events.add(new Event("myNewGatewayB", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED, props));
+ events.add(new Event("myNewGatewayB", 1, 1, 1, AuditEvent.FRAMEWORK_STARTED));
m_auditLogStore.put(events);
runAndWaitForEvent(new Callable<Object>() {
public Object call() throws Exception {
@@ -450,9 +445,8 @@ public class StatefulTargetRepositoryTes
final String targetId = String.format("target-%s", Long.toHexString(System.nanoTime()));
List<Event> events = new ArrayList<Event>();
- Properties props = new Properties();
- events.add(new Event(targetId, 1, 1, 1, AuditEvent.FRAMEWORK_STARTED, props));
+ events.add(new Event(targetId, 1, 1, 1, AuditEvent.FRAMEWORK_STARTED));
// fill auditlog; no install data
m_auditLogStore.put(events);
@@ -470,7 +464,7 @@ public class StatefulTargetRepositoryTes
// fill auditlog with complete-data
events = new ArrayList<Event>();
- props = new Properties();
+ Map<String, String> props = new HashMap<String, String>();
props.put(AuditEvent.KEY_NAME, "mypackage");
props.put(AuditEvent.KEY_VERSION, "123");
events.add(new Event(targetId, 1, 2, 2, AuditEvent.DEPLOYMENTCONTROL_INSTALL, props));
@@ -489,7 +483,7 @@ public class StatefulTargetRepositoryTes
// fill auditlog with install data
events = new ArrayList<Event>();
- props = new Properties();
+ props = new HashMap<String, String>();
props.put(AuditEvent.KEY_NAME, "mypackage");
props.put(AuditEvent.KEY_VERSION, "123");
props.put(AuditEvent.KEY_SUCCESS, "false");
@@ -514,7 +508,7 @@ public class StatefulTargetRepositoryTes
// add another install event.
events = new ArrayList<Event>();
- props = new Properties();
+ props = new HashMap<String, String>();
props.put(AuditEvent.KEY_NAME, "mypackage");
props.put(AuditEvent.KEY_VERSION, "124");
events.add(new Event(targetId, 1, 4, 4, AuditEvent.DEPLOYMENTCONTROL_INSTALL, props));
@@ -533,7 +527,7 @@ public class StatefulTargetRepositoryTes
// fill auditlog with install data
events = new ArrayList<Event>();
- props = new Properties();
+ props = new HashMap<String, String>();
props.put(AuditEvent.KEY_NAME, "mypackage");
props.put(AuditEvent.KEY_VERSION, "124");
props.put(AuditEvent.KEY_SUCCESS, "true");
@@ -630,10 +624,9 @@ public class StatefulTargetRepositoryTes
final String targetID = ":)";
List<Event> events = new ArrayList<Event>();
- Properties props = new Properties();
// add a target with a weird name.
- events.add(new Event(targetID, 1, 1, 1, AuditEvent.FRAMEWORK_STARTED, props));
+ events.add(new Event(targetID, 1, 1, 1, AuditEvent.FRAMEWORK_STARTED));
// fill auditlog; no install data
m_auditLogStore.put(events);
Modified: ace/trunk/org.apache.ace.client.repository/src/org/apache/ace/client/repository/stateful/impl/StatefulTargetObjectImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.client.repository/src/org/apache/ace/client/repository/stateful/impl/StatefulTargetObjectImpl.java?rev=1528378&r1=1528377&r2=1528378&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.client.repository/src/org/apache/ace/client/repository/stateful/impl/StatefulTargetObjectImpl.java (original)
+++ ace/trunk/org.apache.ace.client.repository/src/org/apache/ace/client/repository/stateful/impl/StatefulTargetObjectImpl.java Wed Oct 2 08:54:50 2013
@@ -54,7 +54,7 @@ public class StatefulTargetObjectImpl im
private final Object m_lock = new Object();
private TargetObject m_targetObject;
private List<Descriptor> m_processedAuditEvents = new ArrayList<Descriptor>();
- private Dictionary m_processedTargetProperties;
+ private Map<String, String> m_processedTargetProperties;
private Map<String, String> m_attributes = new HashMap<String, String>();
/** This boolean is used to suppress STATUS_CHANGED events during the creation of the object. */
private boolean m_inConstructor = true;
@@ -386,7 +386,7 @@ public class StatefulTargetObjectImpl im
private void determineTargetPropertiesState() {
// only process them if the target is already registered
if (isRegistered() && m_processedTargetProperties != null) {
- Dictionary tags = m_processedTargetProperties;
+ Map<String, String> tags = m_processedTargetProperties;
m_processedTargetProperties = null;
// clear "old" tags starting with the prefix
Enumeration<String> keys = m_targetObject.getTagKeys();
@@ -401,10 +401,8 @@ public class StatefulTargetObjectImpl im
m_targetObject.removeTag(keyToDelete);
}
// add new tags and prefix them
- Enumeration newKeys = tags.keys();
- while (newKeys.hasMoreElements()) {
- String newKey = (String) newKeys.nextElement();
- m_targetObject.addTag(TARGETPROPERTIES_PREFIX + newKey, (String) tags.get(newKey));
+ for (String newKey : tags.keySet()) {
+ m_targetObject.addTag(TARGETPROPERTIES_PREFIX + newKey, tags.get(newKey));
}
}
}
Modified: ace/trunk/org.apache.ace.client.repository/test/org/apache/ace/client/repository/impl/ACE308Test.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.client.repository/test/org/apache/ace/client/repository/impl/ACE308Test.java?rev=1528378&r1=1528377&r2=1528378&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.client.repository/test/org/apache/ace/client/repository/impl/ACE308Test.java (original)
+++ ace/trunk/org.apache.ace.client.repository/test/org/apache/ace/client/repository/impl/ACE308Test.java Wed Oct 2 08:54:50 2013
@@ -24,19 +24,17 @@ import org.apache.ace.test.utils.TestUti
import org.testng.Assert;
import org.testng.annotations.Test;
-
/**
- * Before fixing ACE-308 the comparator could "overflow" when casting a long to an
- * int, changing the sign of the result. For this specific case, it would fail. After
- * the fix, it no longer fails.
+ * Before fixing ACE-308 the comparator could "overflow" when casting a long to an int, changing the sign of the result.
+ * For this specific case, it would fail. After the fix, it no longer fails.
*/
public class ACE308Test {
- @Test( groups = { TestUtils.UNIT } )
+ @Test(groups = { TestUtils.UNIT })
public void testLogEvents() {
- LogEventComparator c = new LogEventComparator();
- Event left = new Event("t", 1, 1, -1000000000000000000L, 0, null);
- Event right = new Event("t", 1, 1, 1, 0, null);
- Assert.assertTrue((left.getTime() - right.getTime()) < 0L);
- Assert.assertTrue(c.compare(left, right) < 0L);
+ LogEventComparator c = new LogEventComparator();
+ Event left = new Event("t", 1, 1, -1000000000000000000L, 0);
+ Event right = new Event("t", 1, 1, 1, 0);
+ Assert.assertTrue((left.getTime() - right.getTime()) < 0L);
+ Assert.assertTrue(c.compare(left, right) < 0L);
}
}
Modified: ace/trunk/org.apache.ace.client.rest/src/org/apache/ace/client/rest/LogEventSerializer.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.client.rest/src/org/apache/ace/client/rest/LogEventSerializer.java?rev=1528378&r1=1528377&r2=1528378&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.client.rest/src/org/apache/ace/client/rest/LogEventSerializer.java (original)
+++ ace/trunk/org.apache.ace.client.rest/src/org/apache/ace/client/rest/LogEventSerializer.java Wed Oct 2 08:54:50 2013
@@ -22,8 +22,7 @@ import java.lang.reflect.Type;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
-import java.util.Dictionary;
-import java.util.Enumeration;
+import java.util.Map;
import org.apache.ace.feedback.AuditEvent;
import org.apache.ace.feedback.Event;
@@ -43,11 +42,9 @@ public class LogEventSerializer implemen
event.addProperty("time", format.format(new Date(e.getTime())));
event.addProperty("type", toAuditEventType(e.getType()));
JsonObject eventProperties = new JsonObject();
- Dictionary p = e.getProperties();
- Enumeration keyEnumeration = p.keys();
- while (keyEnumeration.hasMoreElements()) {
- Object key = keyEnumeration.nextElement();
- eventProperties.addProperty(key.toString(), p.get(key).toString());
+ Map<String, String> p = e.getProperties();
+ for (String key : p.keySet()) {
+ eventProperties.addProperty(key, p.get(key));
}
event.add("properties", eventProperties);
return event;
|