wicket-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mgrigo...@apache.org
Subject svn commit: r1136037 - in /wicket/trunk/wicket-core/src: main/java/org/apache/wicket/pageStore/ test/java/org/apache/wicket/page/persistent/disk/ test/java/org/apache/wicket/versioning/
Date Wed, 15 Jun 2011 13:45:28 GMT
Author: mgrigorov
Date: Wed Jun 15 13:45:27 2011
New Revision: 1136037

URL: http://svn.apache.org/viewvc?rev=1136037&view=rev
Log:
WICKET-3791 Improve AsynchronousDataStore

Rework AsynchronousDataStore to use BlockingQueue.
This way it works as classic consumer-producer.
Add javadocs.


Modified:
    wicket/trunk/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
    wicket/trunk/wicket-core/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
    wicket/trunk/wicket-core/src/test/java/org/apache/wicket/versioning/PageVersioningTest.java

Modified: wicket/trunk/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
URL: http://svn.apache.org/viewvc/wicket/trunk/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java?rev=1136037&r1=1136036&r2=1136037&view=diff
==============================================================================
--- wicket/trunk/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
(original)
+++ wicket/trunk/wicket-core/src/main/java/org/apache/wicket/pageStore/AsynchronousDataStore.java
Wed Jun 15 13:45:27 2011
@@ -16,18 +16,29 @@
  */
 package org.apache.wicket.pageStore;
 
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.wicket.util.lang.Args;
+import org.apache.wicket.util.time.Duration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Facade for {@link IDataStore} that does the actual saving in worker thread.
+ * <p>
+ * Creates an {@link Entry} for each triple (sessionId, pageId, data) and puts it in
+ * {@link #entries} queue if there is room. Acts as producer.<br/>
+ * Later {@link PageSavingRunnable} reads in blocking manner from {@link #entries} and saves
each
+ * entry. Acts as consumer.
+ * </p>
+ * It starts only one instance of {@link PageSavingRunnable} because all we need is to make
the page
+ * storing asynchronous. We don't want to write concurrently in the wrapped {@link IDataStore},
+ * though it may happen in the extreme case when the queue is full. These cases should be
avoided.
  * 
  * @author Matej Knopp
  */
@@ -36,26 +47,55 @@ public class AsynchronousDataStore imple
 	/** Log for reporting. */
 	private static final Logger log = LoggerFactory.getLogger(AsynchronousDataStore.class);
 
-	private static final Object WRITE_LOCK = new Object();
+	/**
+	 * The time to wait when adding an {@link Entry} into the entries
+	 */
+	private static final Duration OFFER_WAIT = Duration.milliseconds(100);
 
-	private final AtomicBoolean destroy = new AtomicBoolean(false);
+	/**
+	 * The time to wait for an entry to save with the wrapped {@link IDataStore}
+	 */
+	private static final Duration POLL_WAIT = Duration.milliseconds(1000);
 
+	/**
+	 * A flag indicating that this {@link IDataStore} should stop
+	 */
+	private final AtomicBoolean destroy;
+
+	/**
+	 * The wrapped {@link IDataStore} that actually stores that pages
+	 */
 	private final IDataStore dataStore;
 
-	private final Queue<Entry> entries = new ConcurrentLinkedQueue<Entry>();
+	/**
+	 * The queue where the entries which have to be saved are temporary stored
+	 */
+	private final BlockingQueue<Entry> entries;
 
-	private final Map<String, Entry> entryMap = new ConcurrentHashMap<String, Entry>();
+	/**
+	 * A map 'sessionId:::pageId' -> {@link Entry}. Used for fast retrieval of {@link Entry}s
which
+	 * are not yet stored by the wrapped {@link IDataStore}
+	 */
+	private final ConcurrentMap<String, Entry> entryMap;
 
 	/**
 	 * Construct.
 	 * 
 	 * @param dataStore
+	 *            the wrapped {@link IDataStore} that actually saved the data
+	 * @param capacity
+	 *            the capacity of the queue that delays the saving
 	 */
-	public AsynchronousDataStore(final IDataStore dataStore)
+	public AsynchronousDataStore(final IDataStore dataStore, final int capacity)
 	{
 		this.dataStore = dataStore;
-
-		new Thread(new PageSavingRunnable(), "Wicket-PageSavingThread").start();
+		destroy = new AtomicBoolean(false);
+		entries = new ArrayBlockingQueue<Entry>(capacity);
+		entryMap = new ConcurrentHashMap<String, Entry>();
+
+		PageSavingRunnable savingRunnable = new PageSavingRunnable(dataStore, entries, entryMap,
+			destroy);
+		new Thread(savingRunnable, "Wicket-PageSavingThread").start();
 	}
 
 	/**
@@ -65,12 +105,6 @@ public class AsynchronousDataStore imple
 	{
 		destroy.set(true);
 
-		synchronized (entries)
-		{
-			// let the saving thread continue
-			entries.notify();
-		}
-
 		try
 		{
 			synchronized (destroy)
@@ -106,9 +140,17 @@ public class AsynchronousDataStore imple
 		Entry entry = getEntry(sessionId, id);
 		if (entry != null)
 		{
+			log.debug(
+				"Returning the data of a non-stored entry with sessionId '{}' and pageId '{}'",
+				sessionId, id);
 			return entry.getData();
 		}
-		return dataStore.getData(sessionId, id);
+		byte[] data = dataStore.getData(sessionId, id);
+
+		log.debug("Returning the data of a stored entry with sessionId '{}' and pageId '{}'",
+			sessionId, id);
+
+		return data;
 	}
 
 	/**
@@ -120,31 +162,20 @@ public class AsynchronousDataStore imple
 	}
 
 	/**
-	 * @return max queue size
-	 */
-	protected int getMaxQueuedEntries()
-	{
-		return 100;
-	}
-
-	/**
 	 * @see org.apache.wicket.pageStore.IDataStore#removeData(java.lang.String, int)
 	 */
 	public void removeData(final String sessionId, final int id)
 	{
-		synchronized (WRITE_LOCK)
+		String key = getKey(sessionId, id);
+		if (key != null)
 		{
-			String key = getKey(sessionId, id);
-			if (key != null)
+			Entry entry = entryMap.remove(key);
+			if (entry != null)
 			{
-				Entry entry = entryMap.remove(key);
-				if (entry != null)
-				{
-					entries.remove(entry);
-				}
+				entries.remove(entry);
 			}
-
 		}
+
 		dataStore.removeData(sessionId, id);
 	}
 
@@ -153,16 +184,14 @@ public class AsynchronousDataStore imple
 	 */
 	public void removeData(final String sessionId)
 	{
-		synchronized (WRITE_LOCK)
+		// make a copy to iterate to avoid ConcurrentModificationException
+		Entry[] entriesCopy = entries.toArray(new Entry[entries.size()]);
+		for (Entry entry : entriesCopy)
 		{
-			for (Iterator<Entry> iter = entries.iterator(); iter.hasNext();)
+			if (entry.getSessionId().equals(sessionId))
 			{
-				Entry e = iter.next();
-				if (e.getSessionId().equals(sessionId))
-				{
-					iter.remove();
-					entryMap.remove(getKey(e));
-				}
+				entryMap.remove(getKey(entry));
+				entries.remove(entry);
 			}
 		}
 
@@ -170,24 +199,33 @@ public class AsynchronousDataStore imple
 	}
 
 	/**
+	 * Save the entry in the queue if there is a room or directly pass it to the wrapped
+	 * {@link IDataStore} if there is no such
+	 * 
 	 * @see org.apache.wicket.pageStore.IDataStore#storeData(java.lang.String, int, byte[])
 	 */
 	public void storeData(final String sessionId, final int id, final byte[] data)
 	{
-		if (entryMap.size() > getMaxQueuedEntries())
-		{
-			dataStore.storeData(sessionId, id, data);
-		}
-		else
+		Entry entry = new Entry(sessionId, id, data);
+		try
 		{
-			Entry entry = new Entry(sessionId, id, data);
-			entryMap.put(getKey(sessionId, id), entry);
-			entries.add(entry);
-			synchronized (entries)
+			boolean added = entries.offer(entry, OFFER_WAIT.getMilliseconds(),
+				TimeUnit.MILLISECONDS);
+
+			if (added == false)
+			{
+				dataStore.storeData(sessionId, id, data);
+			}
+			else
 			{
-				entries.notify();
+				entryMap.put(getKey(entry), entry);
 			}
 		}
+		catch (InterruptedException e)
+		{
+			log.error(e.getMessage(), e);
+			dataStore.storeData(sessionId, id, data);
+		}
 	}
 
 	/**
@@ -196,9 +234,9 @@ public class AsynchronousDataStore imple
 	 * @param sessionId
 	 * @return generated key
 	 */
-	private String getKey(final String sessionId, final int pageId)
+	private static String getKey(final String sessionId, final int pageId)
 	{
-		return pageId + "::: " + sessionId;
+		return pageId + ":::" + sessionId;
 	}
 
 	/**
@@ -206,13 +244,13 @@ public class AsynchronousDataStore imple
 	 * @param entry
 	 * @return generated key
 	 */
-	private String getKey(final Entry entry)
+	private static String getKey(final Entry entry)
 	{
 		return getKey(entry.getSessionId(), entry.getPageId());
 	}
 
 	/**
-	 * 
+	 * The structure used for an entry in the queue
 	 */
 	private static class Entry
 	{
@@ -222,9 +260,9 @@ public class AsynchronousDataStore imple
 
 		public Entry(final String sessionId, final int pageId, final byte data[])
 		{
-			this.sessionId = sessionId;
+			this.sessionId = Args.notNull(sessionId, "sessionId");
 			this.pageId = pageId;
-			this.data = data;
+			this.data = Args.notNull(data, "data");
 		}
 
 		public String getSessionId()
@@ -241,51 +279,91 @@ public class AsynchronousDataStore imple
 		{
 			return data;
 		}
+
+		@Override
+		public int hashCode()
+		{
+			final int prime = 31;
+			int result = 1;
+			result = prime * result + pageId;
+			result = prime * result + ((sessionId == null) ? 0 : sessionId.hashCode());
+			return result;
+		}
+
+		@Override
+		public boolean equals(Object obj)
+		{
+			if (this == obj)
+				return true;
+			if (obj == null)
+				return false;
+			if (getClass() != obj.getClass())
+				return false;
+			Entry other = (Entry)obj;
+			if (pageId != other.pageId)
+				return false;
+			if (sessionId == null)
+			{
+				if (other.sessionId != null)
+					return false;
+			}
+			else if (!sessionId.equals(other.sessionId))
+				return false;
+			return true;
+		}
+
+		@Override
+		public String toString()
+		{
+			return "Entry [sessionId=" + sessionId + ", pageId=" + pageId + "]";
+		}
+
 	}
 
 	/**
-	 * 
+	 * The thread that acts as consumer of {@link Entry}ies
 	 */
-	private class PageSavingRunnable implements Runnable
+	private static class PageSavingRunnable implements Runnable
 	{
+		private static final Logger log = LoggerFactory.getLogger(PageSavingRunnable.class);
+
+		private final AtomicBoolean destroy;
+
+		private final BlockingQueue<Entry> entries;
+
+		private final ConcurrentMap<String, Entry> entryMap;
+
+		private final IDataStore dataStore;
+
+		private PageSavingRunnable(IDataStore dataStore, BlockingQueue<Entry> entries,
+			ConcurrentMap<String, Entry> entryMap, AtomicBoolean destroy)
+		{
+			this.dataStore = dataStore;
+			this.entries = entries;
+			this.entryMap = entryMap;
+			this.destroy = destroy;
+		}
+
 		public void run()
 		{
-			while (destroy.get() == false || !entries.isEmpty())
+			while (destroy.get() == false)
 			{
-				if (entries.isEmpty())
+				Entry entry = null;
+				try
 				{
-					try
-					{
-						synchronized (entries)
-						{
-							entries.wait();
-						}
-					}
-					catch (InterruptedException e)
-					{
-						log.error(e.getMessage(), e);
-					}
+					entry = entries.poll(POLL_WAIT.getMilliseconds(), TimeUnit.MILLISECONDS);
 				}
-
-				synchronized (WRITE_LOCK)
+				catch (InterruptedException e)
 				{
-					Entry entry = entries.poll();
-					if (entry != null)
-					{
-						dataStore.storeData(entry.getSessionId(), entry.getPageId(),
-							entry.getData());
-						entryMap.remove(getKey(entry));
-					}
+					log.error(e.getMessage(), e);
 				}
-			}
 
-			try
-			{
-				Thread.sleep(10);
-			}
-			catch (InterruptedException e)
-			{
-				log.error(e.getMessage(), e);
+				if (entry != null)
+				{
+					log.debug("Saving asynchronously: {}...", entry);
+					dataStore.storeData(entry.getSessionId(), entry.getPageId(), entry.getData());
+					entryMap.remove(getKey(entry));
+				}
 			}
 
 			synchronized (destroy)

Modified: wicket/trunk/wicket-core/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
URL: http://svn.apache.org/viewvc/wicket/trunk/wicket-core/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java?rev=1136037&r1=1136036&r2=1136037&view=diff
==============================================================================
--- wicket/trunk/wicket-core/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
(original)
+++ wicket/trunk/wicket-core/src/test/java/org/apache/wicket/page/persistent/disk/DiskDataStoreTest.java
Wed Jun 15 13:45:27 2011
@@ -302,7 +302,7 @@ public class DiskDataStoreTest extends T
 		{
 			try
 			{
-				Thread.sleep(5);
+				Thread.sleep(50);
 			}
 			catch (InterruptedException e)
 			{
@@ -339,7 +339,7 @@ public class DiskDataStoreTest extends T
 
 		dataStore = new DiskDataStore("app1", fileStoreFolder, MAX_SIZE_PER_SESSION,
 			FILE_CHANNEL_POOL_CAPACITY);
-		dataStore = new AsynchronousDataStore(dataStore);
+		dataStore = new AsynchronousDataStore(dataStore, 100);
 
 		doTestDataStore();
 

Modified: wicket/trunk/wicket-core/src/test/java/org/apache/wicket/versioning/PageVersioningTest.java
URL: http://svn.apache.org/viewvc/wicket/trunk/wicket-core/src/test/java/org/apache/wicket/versioning/PageVersioningTest.java?rev=1136037&r1=1136036&r2=1136037&view=diff
==============================================================================
--- wicket/trunk/wicket-core/src/test/java/org/apache/wicket/versioning/PageVersioningTest.java
(original)
+++ wicket/trunk/wicket-core/src/test/java/org/apache/wicket/versioning/PageVersioningTest.java
Wed Jun 15 13:45:27 2011
@@ -65,7 +65,8 @@ public class PageVersioningTest
 					{
 
 						final IDataStore dataStore = new InMemoryPageStore();
-						final AsynchronousDataStore asyncDS = new AsynchronousDataStore(dataStore);
+						final AsynchronousDataStore asyncDS = new AsynchronousDataStore(dataStore,
+							100);
 						final DefaultPageStore pageStore = new DefaultPageStore(new JavaSerializer(
 							application.getApplicationKey()), asyncDS, 40);
 						return new PageStoreManager(application.getName(), pageStore,



Mime
View raw message