ace-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r1571687 - in /ace/trunk/org.apache.ace.log: src/org/apache/ace/log/server/store/impl/ test/org/apache/ace/log/server/store/impl/
Date Tue, 25 Feb 2014 13:45:54 GMT
Author: jawi
Date: Tue Feb 25 13:45:54 2014
New Revision: 1571687

URL: http://svn.apache.org/r1571687
Log:
ACE-461 - added additional concurrency tests for LogStoreImpl:

- also fixed Java7-style generics.


Added:
    ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java   (with props)
Modified:
    ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
    ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java

Modified: ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java?rev=1571687&r1=1571686&r2=1571687&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java (original)
+++ ace/trunk/org.apache.ace.log/src/org/apache/ace/log/server/store/impl/LogStoreImpl.java Tue Feb 25 13:45:54 2014
@@ -36,7 +36,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
 
 import org.apache.ace.feedback.Descriptor;
 import org.apache.ace.feedback.Event;
@@ -79,8 +79,8 @@ public class LogStoreImpl implements Log
     }
 
     public List<Event> get(Descriptor descriptor) throws IOException {
+        obtainLock(descriptor.getTargetID(), descriptor.getStoreID());
         try {
-            obtainLock(descriptor.getTargetID(), descriptor.getStoreID());
             return getInternal(descriptor);
         }
         finally {
@@ -278,7 +278,7 @@ public class LogStoreImpl implements Log
                     high = Long.MAX_VALUE;
                 }
                 // send (eventadmin)event about a new (log)event being stored
-                Dictionary props = new Hashtable();
+                Dictionary<String, Object> props = new Hashtable<String, Object>();
                 props.put(LogStore.EVENT_PROP_LOGNAME, m_name);
                 props.put(LogStore.EVENT_PROP_LOG_EVENT, event);
                 m_eventAdmin.postEvent(new org.osgi.service.event.Event(LogStore.EVENT_TOPIC, props));
@@ -419,8 +419,8 @@ public class LogStoreImpl implements Log
     }
 
     private void clean(String targetID, Long logID) throws IOException {
+        obtainLock(targetID, logID);
         try {
-            obtainLock(targetID, logID);
             List<Event> events = getInternal(new Descriptor(targetID, logID, SortedRangeSet.FULL_SET));
             if (events.size() > m_maxEvents) {
                 for (int i = 0; i < m_maxEvents; i++) {
@@ -454,15 +454,10 @@ public class LogStoreImpl implements Log
 
         // try to obtain the lock if we could not lock it on the first try
         if (alreadyLocked) {
-            int nrOfTries = 1;
-            while (alreadyLocked && nrOfTries < 10000) {
-                try {
-                    Thread.sleep(1);
-                }
-                catch (InterruptedException e) {
-                    break;
-                }
-                nrOfTries++;
+            int nrOfTries = 0;
+            while (alreadyLocked && nrOfTries++ < 10000) {
+                LockSupport.parkNanos(50);
+
                 synchronized (lockedLogs) {
                     alreadyLocked = lockedLogs.contains(logID);
                     if (!alreadyLocked) {

Added: ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java?rev=1571687&view=auto
==============================================================================
--- ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java (added)
+++ ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java Tue Feb 25 13:45:54 2014
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ace.log.server.store.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ace.feedback.Descriptor;
+import org.apache.ace.feedback.Event;
+import org.apache.ace.range.RangeIterator;
+import org.apache.ace.range.SortedRangeSet;
+import org.apache.ace.test.utils.TestUtils;
+import org.osgi.service.event.EventAdmin;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Test cases for {@link LogStoreImpl}.
+ */
+public class LogStoreImplConcurrencyTest {
+    private static final String TARGET_ID = "targetId";
+    private static final long STORE_ID = 12345;
+
+    private static class Reader implements Runnable {
+        private final String m_name;
+        private final CountDownLatch m_start;
+        private final CountDownLatch m_stop;
+        private final LogStoreImpl m_store;
+        private final ConcurrentMap<Long, Boolean> m_seen = new ConcurrentHashMap<Long, Boolean>();
+        private final int m_count;
+
+        public Reader(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count) {
+            this(store, start, stop, count, 0);
+        }
+
+        public Reader(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count, int initial) {
+            m_name = "Reader-" + initial;
+            m_store = store;
+            m_start = start;
+            m_stop = stop;
+            m_count = count;
+        }
+
+        @Override
+        public void run() {
+            Random rnd = new Random();
+
+            try {
+                m_start.await();
+
+                System.out.printf("Reader (%s) starting to read %d records...%n", m_name, m_count);
+
+                while (m_seen.size() < m_count) {
+                    try {
+                        if (rnd.nextInt(1000) >= 995) {
+                            // perform a random cleanup...
+                            m_store.clean();
+                        }
+                        List<Descriptor> descriptors = m_store.getDescriptors(TARGET_ID);
+                        for (Descriptor desc : descriptors) {
+                            SortedRangeSet rangeSet = desc.getRangeSet();
+                            RangeIterator rangeIter = rangeSet.iterator();
+                            while (rangeIter.hasNext()) {
+                                m_seen.putIfAbsent(Long.valueOf(rangeIter.next()), Boolean.TRUE);
+                            }
+                        }
+                    }
+                    catch (IOException e) {
+                        System.out.printf("I/O exception (%s) caught: %s in %s.%n", e.getClass().getSimpleName(), e.getMessage(), getCaller(e));
+                    }
+                }
+
+                System.out.printf("Reader (%s) finished with %d records read...%n", m_name, m_seen.size());
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            finally {
+                m_stop.countDown();
+
+                System.out.println("Ending reader (" + m_name + ")");
+            }
+        }
+    }
+
+    private static class Writer implements Runnable {
+        private final String m_name;
+        private final CountDownLatch m_start;
+        private final CountDownLatch m_stop;
+        private final LogStoreImpl m_store;
+        private final ConcurrentMap<Long, Event> m_written = new ConcurrentHashMap<Long, Event>();
+        private final int m_count;
+        private final int m_initValue;
+        private final int m_stepSize;
+
+        public Writer(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count) {
+            this(store, start, stop, count, 0, 1);
+        }
+
+        public Writer(LogStoreImpl store, CountDownLatch start, CountDownLatch stop, int count, int initial, int stepSize) {
+            m_name = "Writer-" + initial;
+            m_store = store;
+            m_start = start;
+            m_stop = stop;
+            m_count = count;
+            m_initValue = initial;
+            m_stepSize = stepSize;
+        }
+
+        @Override
+        public void run() {
+            Random rnd = new Random();
+
+            try {
+                m_start.await();
+
+                System.out.printf("Writer (%s) starts writing %d records...%n", m_name, m_count);
+
+                for (int i = m_initValue; i < m_count; i += m_stepSize) {
+                    long id = i;
+                    Event event = new Event(TARGET_ID, STORE_ID, id, id, rnd.nextInt(10));
+
+                    m_store.put(Arrays.asList(event));
+                    m_written.putIfAbsent(Long.valueOf(id), event);
+                }
+
+                System.out.printf("Writer (%s) finished with %d records written...%n", m_name, m_written.size());
+            }
+            catch (InterruptedException e) {
+                // ok, stop...
+            }
+            catch (IOException exception) {
+                exception.printStackTrace();
+            }
+            finally {
+                m_stop.countDown();
+                
+                System.out.println("Ending writer (" + m_name + ")");
+            }
+        }
+    }
+
+    private static String getCaller(Exception e) {
+        StringBuilder sb = new StringBuilder();
+        StackTraceElement[] st = e.getStackTrace();
+        int n = Math.min(st.length, 1);
+        int m = Math.min(st.length, 4);
+        for (int i = n; i < m; i++) {
+            if (i > n) {
+                sb.append(" -> ");
+            }
+            StackTraceElement ste = st[i];
+            sb.append(ste.getClassName()).append(".").append(ste.getMethodName()).append("(").append(ste.getLineNumber()).append(")");
+        }
+        return sb.toString();
+    }
+
+    private File m_baseDir;
+    private ExecutorService m_executor;
+    private CompletionService<Boolean> m_completionService;
+
+    /**
+     * Tests that concurrent use of a {@link LogStoreImpl} with multiple readers and multiple writers works as expected.
+     */
+    @Test(enabled = false)
+    public void testConcurrentUseMultipleReaderAndMultipleWriters() throws Exception {
+        File storeFile = File.createTempFile("feedback", ".store");
+        storeFile.deleteOnExit();
+
+        final int recordCount = 10000;
+        final int readerCount = 3; // Runtime.getRuntime().availableProcessors() + 1;
+        final int writerCount = 3; // Runtime.getRuntime().availableProcessors() + 1;
+
+        final LogStoreImpl store = createLogStore();
+
+        final CountDownLatch start = new CountDownLatch(1);
+        final CountDownLatch stop = new CountDownLatch(writerCount + readerCount);
+
+        Writer[] writers = new Writer[writerCount];
+        for (int i = 0; i < writerCount; i++) {
+            writers[i] = new Writer(store, start, stop, recordCount, i, writerCount);
+        }
+
+        Reader[] readers = new Reader[readerCount];
+        for (int i = 0; i < readerCount; i++) {
+            readers[i] = new Reader(store, start, stop, recordCount, i);
+        }
+
+        // gents, start your engines...
+        for (int i = 0; i < readers.length; i++) {
+            m_completionService.submit(readers[i], Boolean.TRUE);
+        }
+        for (int i = 0; i < writers.length; i++) {
+            m_completionService.submit(writers[i], Boolean.TRUE);
+        }
+
+        // 3, 2, 1... GO...
+        start.countDown();
+
+        // waiting for all threads to finish...
+        for (int i = 0, r = 0; r < 10 && i < writerCount + readerCount; i++) {
+            Future<Boolean> future = m_completionService.poll(1, TimeUnit.MINUTES);
+            if (future == null) {
+                r++;
+            }
+        }
+        assertTrue(stop.await(5, TimeUnit.SECONDS));
+
+        int readCount = 0;
+        for (int i = 0; i < readers.length; i++) {
+            readCount += readers[i].m_seen.size();
+        }
+        int writtenCount = 0;
+        for (int i = 0; i < writers.length; i++) {
+            writtenCount += writers[i].m_written.size();
+        }
+
+        assertEquals(recordCount, writtenCount, "Not all records were written?");
+        // All readers read the exact same data, so we've got N copies of it...
+        assertEquals(readCount, readerCount * writtenCount, "Not all records were seen?");
+
+        verifyStoreContents(store, recordCount, writers);
+    }
+
+    /**
+     * Tests that concurrent use of a {@link LogStoreImpl} with a single reader and multiple writers works as expected.
+     */
+    @Test(enabled = false)
+    public void testConcurrentUseSingleReaderAndMultipleWriters() throws Exception {
+        File storeFile = File.createTempFile("feedback", ".store");
+        storeFile.deleteOnExit();
+
+        final int recordCount = 10000;
+        final int writerCount = 3; // Runtime.getRuntime().availableProcessors() + 1;
+
+        final LogStoreImpl store = createLogStore();
+
+        final CountDownLatch start = new CountDownLatch(1);
+        final CountDownLatch stop = new CountDownLatch(writerCount + 1);
+
+        Writer[] writers = new Writer[writerCount];
+
+        for (int i = 0; i < writerCount; i++) {
+            writers[i] = new Writer(store, start, stop, recordCount, i, writerCount);
+        }
+
+        Reader reader = new Reader(store, start, stop, recordCount);
+
+        // gents, start your engines...
+        m_completionService.submit(reader, Boolean.TRUE);
+        for (int i = 0; i < writers.length; i++) {
+            m_completionService.submit(writers[i], Boolean.TRUE);
+        }
+
+        // 3, 2, 1... GO...
+        start.countDown();
+
+        // waiting for all threads to finish...
+        for (int i = 0, r = 0; r < 10 && i < writerCount + 1; i++) {
+            Future<Boolean> future = m_completionService.poll(1, TimeUnit.MINUTES);
+            if (future == null) {
+                r++;
+            }
+        }
+        assertTrue(stop.await(5, TimeUnit.SECONDS));
+
+        int writtenCount = 0;
+        for (int i = 0; i < writers.length; i++) {
+            writtenCount += writers[i].m_written.size();
+        }
+
+        int readCount = reader.m_seen.size();
+
+        assertEquals(recordCount, writtenCount, "Not all records were written?");
+        assertEquals(readCount, writtenCount, "Not all records were seen?");
+
+        verifyStoreContents(store, recordCount, writers);
+    }
+
+    /**
+     * Tests that concurrent use of a {@link LogStoreImpl} with a single reader and writer works as expected.
+     */
+    @Test
+    public void testConcurrentUseSingleReaderAndSingleWriter() throws Exception {
+        File storeFile = File.createTempFile("feedback", ".store");
+        storeFile.deleteOnExit();
+
+        final int recordCount = 10000;
+
+        final LogStoreImpl store = createLogStore();
+
+        final CountDownLatch start = new CountDownLatch(1);
+        final CountDownLatch stop = new CountDownLatch(2);
+
+        Writer writer = new Writer(store, start, stop, recordCount);
+        Reader reader = new Reader(store, start, stop, recordCount);
+
+        // gents, start your engines...
+        m_completionService.submit(writer, Boolean.TRUE);
+        m_completionService.submit(reader, Boolean.TRUE);
+
+        // 3, 2, 1... GO...
+        start.countDown();
+
+        // waiting both threads to finish...
+        assertTrue(stop.await(120, TimeUnit.SECONDS));
+
+        int writeCount = writer.m_written.size();
+        int readCount = reader.m_seen.size();
+
+        assertEquals(recordCount, writeCount, "Not all records were written?");
+        assertEquals(readCount, writeCount, "Not all records were seen?");
+
+        verifyStoreContents(store, recordCount, writer);
+    }
+
+    @Test
+    public void testTimedWrite() throws Exception {
+        File storeFile = File.createTempFile("feedback", ".store");
+        storeFile.deleteOnExit();
+
+        final int recordCount = 10000;
+
+        final LogStoreImpl store = createLogStore();
+
+        long start = System.nanoTime();
+        for (int i = 0; i < recordCount; i++) {
+            store.put(Arrays.asList(new Event("1,2,3,4,5")));
+        }
+        long end = System.nanoTime();
+        System.out.printf("Writing %d records took %.3f ms.%n", recordCount, (end - start) / 1.0e6);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    protected void setUp() throws Exception {
+        m_baseDir = File.createTempFile("logstore", "txt");
+        m_baseDir.delete();
+        m_baseDir.mkdirs();
+        
+        m_executor = Executors.newCachedThreadPool();
+        m_completionService = new ExecutorCompletionService<Boolean>(m_executor);
+    }
+    
+    @AfterMethod(alwaysRun = true)
+    protected void tearDown() throws InterruptedException {
+        m_executor.shutdownNow();
+        m_executor.awaitTermination(5, TimeUnit.SECONDS);
+    }
+
+    private LogStoreImpl createLogStore() throws Exception {
+        LogStoreImpl logStore = new LogStoreImpl(m_baseDir, "log");
+        TestUtils.configureObject(logStore, EventAdmin.class);
+        logStore.start();
+        return logStore;
+    }
+
+    private void verifyStoreContents(final LogStoreImpl store, final int count, Writer... writers) throws IOException {
+        // Verify the written file...
+        List<Descriptor> descriptors = store.getDescriptors();
+
+        long expectedID = 0;
+        for (Descriptor desc : descriptors) {
+            SortedRangeSet rangeSet = desc.getRangeSet();
+            RangeIterator rangeIter = rangeSet.iterator();
+
+            while (rangeIter.hasNext()) {
+                long id = rangeIter.next();
+
+                Event expectedEntry = null;
+                for (int i = 0; (expectedEntry == null) && i < writers.length; i++) {
+                    expectedEntry = writers[i].m_written.remove(id);
+                }
+                assertNotNull(expectedEntry, "Event ID #" + id + " never written?!");
+                // Test continuation of written data...
+                assertEquals(expectedEntry.getID(), expectedID++, "Entry ID mismatch?!");
+            }
+        }
+    }
+}

Propchange: ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/LogStoreImplConcurrencyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java
URL: http://svn.apache.org/viewvc/ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java?rev=1571687&r1=1571686&r2=1571687&view=diff
==============================================================================
--- ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java (original)
+++ ace/trunk/org.apache.ace.log/test/org/apache/ace/log/server/store/impl/ServerLogStoreTester.java Tue Feb 25 13:45:54 2014
@@ -218,7 +218,7 @@ public class ServerLogStoreTester {
                     es.execute(new Runnable() {
                         @Override
                         public void run() {
-                            List<Event> list = new ArrayList<>();
+                            List<Event> list = new ArrayList<Event>();
                             list.add(new Event(t, l, i, System.currentTimeMillis(), AuditEvent.FRAMEWORK_STARTED, props));
                             try {
                                 m_logStore.put(list);



Mime
View raw message