labs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ppod...@apache.org
Subject svn commit: r606127 - in /labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc: DistributedStoreManager.java DistributedStoreQuery.java
Date Fri, 21 Dec 2007 07:55:41 GMT
Author: ppoddar
Date: Thu Dec 20 23:55:35 2007
New Revision: 606127

URL: http://svn.apache.org/viewvc?rev=606127&view=rev
Log:
Parrallel Flush execution

Modified:
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=606127&r1=606126&r2=606127&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
(original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
Thu Dec 20 23:55:35 2007
@@ -23,6 +23,12 @@
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.openjpa.enhance.PersistenceCapable;
 import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
@@ -47,6 +53,7 @@
 import org.apache.openjpa.meta.FieldMetaData;
 import org.apache.openjpa.slice.DistributionPolicy;
 import org.apache.openjpa.util.InternalException;
+import org.apache.openjpa.util.StoreException;
 import org.apache.openjpa.util.UserException;
 
 /**
@@ -63,6 +70,7 @@
 	private final Integer[] _storeIds;
 	private static final Localizer _loc = 
 		Localizer.forPackage(DistributedStoreManager.class);
+	private static ExecutorService threadPool = Executors.newCachedThreadPool();
 
 	/**
 	 * Constructs a set of child StoreManagers each connected to a physical
@@ -240,15 +248,38 @@
 	@SuppressWarnings("unchecked")
 	public Collection flush(Collection sms) {
 		Collection exceptions = new ArrayList();
-		for (int i = 0; i < _children.length; i++) {
-			Collection subset = new ArrayList();
-			for (Object x : sms) {
-				OpenJPAStateManager sm = (OpenJPAStateManager) x;
-				if (getSliceIndex(sm, null) == i)
-					subset.add(sm);
+		List<List<OpenJPAStateManager>> subsets = 
+			new ArrayList<List<OpenJPAStateManager>>();
+		for (int i=0; i<_children.length; i++)
+			subsets.add(new ArrayList<OpenJPAStateManager>());
+		for (Object x : sms) {
+			OpenJPAStateManager sm = (OpenJPAStateManager) x;
+			int i = getSliceIndex(sm, null);
+			subsets.get(i).add(sm);
+		}
+		List<Future<Collection>> futures = new ArrayList<Future<Collection>>();

+		int i = 0; 
+		for (JDBCStoreManager store:_children) {
+			List<OpenJPAStateManager> toBeFlushed = subsets.get(i++);
+			if (toBeFlushed.isEmpty()) 
+				continue;
+			Flusher flusher = new Flusher();
+			flusher.store   = store;
+			flusher.toFlush = toBeFlushed;
+			futures.add(threadPool.submit(flusher));
+		}
+		for (Future<Collection> future:futures) {
+			Collection error;
+			try {
+				error = future.get();
+				if (!(error == null || error.isEmpty())) {
+					exceptions.addAll(error);
+				}
+			} catch (InterruptedException e) {
+				throw new StoreException(e);
+			} catch (ExecutionException e) {
+				throw new StoreException(e.getCause());
 			}
-			exceptions.addAll(_children[i].flush(subset));
-			subset.clear();
 		}
 		return exceptions;
 	}
@@ -354,5 +385,13 @@
 
 	public boolean syncVersion(OpenJPAStateManager sm, Object edata) {
 		return selectStore(sm, edata).syncVersion(sm, edata);
+	}
+	
+	private static class Flusher implements Callable<Collection> {
+		JDBCStoreManager store;
+		Collection toFlush;
+		public Collection call() throws Exception {
+			return store.flush(toFlush);
+		}
 	}
 }

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=606127&r1=606126&r2=606127&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
(original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
Thu Dec 20 23:55:35 2007
@@ -122,7 +122,7 @@
 				} catch (InterruptedException e) {
 					throw new RuntimeException(e);
 				} catch (ExecutionException e) {
-					throw new StoreException(e);
+					throw new StoreException(e.getCause());
 				}
         	}
         	MergedResultObjectProvider ret = new MergedResultObjectProvider(tmp);
@@ -148,7 +148,7 @@
 				} catch (InterruptedException e) {
 					throw new RuntimeException(e);
 				} catch (ExecutionException e) {
-					throw new StoreException(e);
+					throw new StoreException(e.getCause());
 				}
         	}
         	return new Integer(N);
@@ -173,7 +173,7 @@
 				} catch (InterruptedException e) {
 					throw new RuntimeException(e);
 				} catch (ExecutionException e) {
-					throw new StoreException(e);
+					throw new StoreException(e.getCause());
 				}
         	}
         	return new Integer(N);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@labs.apache.org
For additional commands, e-mail: commits-help@labs.apache.org


Mime
View raw message