labs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ppod...@apache.org
Subject svn commit: r606122 - in /labs/fluid/slice: ./ src/main/java/org/apache/openjpa/slice/jdbc/ src/main/resources/org/apache/openjpa/slice/jdbc/ src/test/resources/META-INF/
Date Fri, 21 Dec 2007 07:27:53 GMT
Author: ppoddar
Date: Thu Dec 20 23:27:48 2007
New Revision: 606122

URL: http://svn.apache.org/viewvc?rev=606122&view=rev
Log:
Parallel query execution

Modified:
    labs/fluid/slice/pom.xml
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
    labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
    labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties
    labs/fluid/slice/src/test/resources/META-INF/persistence.xml

Modified: labs/fluid/slice/pom.xml
URL: http://svn.apache.org/viewvc/labs/fluid/slice/pom.xml?rev=606122&r1=606121&r2=606122&view=diff
==============================================================================
--- labs/fluid/slice/pom.xml (original)
+++ labs/fluid/slice/pom.xml Thu Dec 20 23:27:48 2007
@@ -98,7 +98,7 @@
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>3.8.1</version>
-      <scope>compile</scope>
+      <scope>test-compile</scope>
     </dependency>
     
   </dependencies>

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java?rev=606122&r1=606121&r2=606122&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedJDBCConfigurationImpl.java Thu Dec 20 23:27:48 2007
@@ -18,7 +18,10 @@
  */
 package org.apache.openjpa.slice.jdbc;
 
+import java.sql.Connection;
+import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -26,13 +29,16 @@
 
 import javax.sql.DataSource;
 
+import org.apache.openjpa.conf.OpenJPAConfiguration;
 import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
 import org.apache.openjpa.jdbc.conf.JDBCConfigurationImpl;
 import org.apache.openjpa.jdbc.schema.DataSourceFactory;
 import org.apache.openjpa.kernel.BrokerImpl;
+import org.apache.openjpa.lib.conf.BooleanValue;
 import org.apache.openjpa.lib.conf.Configurations;
 import org.apache.openjpa.lib.conf.PluginValue;
 import org.apache.openjpa.lib.jdbc.DecoratingDataSource;
+import org.apache.openjpa.lib.log.Log;
 import org.apache.openjpa.lib.util.Localizer;
 import org.apache.openjpa.slice.DistributionPolicy;
 import org.apache.openjpa.util.UserException;
@@ -50,16 +56,21 @@
 		new ArrayList<JDBCConfiguration>();
 	private String[] urls;
 	private DecoratingDataSource dataSource;
-	public PluginValue distributionPolicyPlugin;
-	public DistributionPolicy distributionPolicy;
 	
+	protected BooleanValue lenient;
+	protected PluginValue distributionPolicyPlugin;
+	protected DistributionPolicy distributionPolicy;
+	
+	private Log _log;
 	private static Localizer _loc = 
 		Localizer.forPackage(DistributedJDBCConfigurationImpl.class);
 	
 	public DistributedJDBCConfigurationImpl() {
 		super(true, false);
+		_log = getLog(LOG_RUNTIME);
 		distributionPolicyPlugin = addPlugin("DistributionPolicy", true);
 		distributionPolicyPlugin.setDynamic(true);
+		lenient = addBoolean("Lenient");
 	}
 	
 	public Iterator<JDBCConfiguration> iterator() {
@@ -71,9 +82,10 @@
 	 */
 	public String[] getConnectionURLs() {
 		if (urls == null) {
-		    urls = getURLs();
+		    urls = parseURLs();
 		    Map parent = toProperties(true);
 		    Configurations.removeProperty("DistributionPolicy", parent);
+		    Configurations.removeProperty("Lenient", parent);
 		    for (String url:urls) {
 		    	JDBCConfigurationImpl child = new JDBCConfigurationImpl();
 		    	child.fromProperties(parent);
@@ -109,21 +121,82 @@
         return dataSource;
     }
     
+    /**
+     * Create a virtual DistributedDataSource as a composite of individual 
+     * physical data sources as per configuration, optionally ignoring 
+     * any non-reachable sources.
+     */
     private DistributedDataSource createDistributedDataStore() {
     	List<DataSource> dataSources = new ArrayList<DataSource>();
-    	for (JDBCConfiguration conf:children)
-    		dataSources.add(DataSourceFactory.newDataSource(conf, false));
+    	boolean isLenient = lenient.get();
+    	List<JDBCConfiguration> badConfs = new ArrayList<JDBCConfiguration>();
+    	for (JDBCConfiguration conf:children) {
+    		String url = conf.getConnectionURL();
+    		if (_log.isInfoEnabled())
+    			_log.info(_loc.get("connect", url));
+    		DataSource ds = DataSourceFactory.newDataSource(conf, false);
+    		if (verifyDataSource(isLenient, badConfs, conf, ds)) {
+    			dataSources.add(ds);
+    		}
+    	}
+    	removeBadConnection(badConfs);
     	return new DistributedDataSource(dataSources);
     }
+    
+    private boolean verifyDataSource(boolean isLenient, List<JDBCConfiguration>
+    	badConfs, JDBCConfiguration conf, DataSource ds) {
+		Connection con = null;
+		try {
+			con = ds.getConnection();
+			if (con == null) {
+				handleBadConnection(isLenient, badConfs, conf);
+				return false;
+			}
+			return true;
+		} catch (SQLException ex) {
+			handleBadConnection(isLenient, badConfs, conf);
+			return false;
+		} finally {
+			if (con != null) 
+			try {
+				con.close();
+			} catch (SQLException ex) {
+				
+			}
+		}
+    }
+    
+    private void removeBadConnection(List<JDBCConfiguration> badConfs) {
+    	if (badConfs == null || badConfs.isEmpty())
+    		return;
+    	children.removeAll(badConfs);
+    	List<String> badURLs = new ArrayList<String>(badConfs.size());
+    	for (JDBCConfiguration bad:badConfs)
+    		badURLs.add(bad.getConnectionURL());
+    	List<String> allURLs =  Arrays.asList(getConnectionURLs());
+    	urls = new String[allURLs.size()-badURLs.size()];
+    	int i = 0;
+    	for (String url:allURLs)
+    		if (!badURLs.contains(url))
+    			urls[i++] = url;
+    }
+    
+    
+    private void handleBadConnection(boolean isLenient, 
+    		List<JDBCConfiguration> badConfs, JDBCConfiguration conf) {
+		String url = conf.getConnectionURL();
+		if (isLenient) {
+			_log.error(_loc.get("connect-error-lenient", url));
+			badConfs.add(conf);
+		} else
+			throw new UserException(_loc.get("connect-error", url));
+    }
 
-    private String[] getURLs() {
-    	StringTokenizer tokenizer = new StringTokenizer(getConnectionURL(),"|");
-    	String[] tokens = new String[tokenizer.countTokens()];
-	    if (tokens.length == 0)
-	    	throw new UserException(_loc.get("wrong-url", getConnectionURL()));
-    	for (int i=0; i<tokens.length; i++) {
-    		tokens[i] = tokenizer.nextToken();
-    	}
-    	return tokens;
+    private String[] parseURLs() {
+    	String url = getConnectionURL();
+    	if (url == null || url.trim().length() == 0)
+    		throw new UserException("no-url");
+    	String regex = "\\|";
+    	return url.split(regex);
     }
 }

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java?rev=606122&r1=606121&r2=606122&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreManager.java Thu Dec 20 23:27:48 2007
@@ -19,6 +19,7 @@
 package org.apache.openjpa.slice.jdbc;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Iterator;
@@ -46,6 +47,7 @@
 import org.apache.openjpa.meta.FieldMetaData;
 import org.apache.openjpa.slice.DistributionPolicy;
 import org.apache.openjpa.util.InternalException;
+import org.apache.openjpa.util.UserException;
 
 /**
  * A Store manager for multiple physical databases.
@@ -117,6 +119,14 @@
 		PersistenceCapable pc = sm.getPersistenceCapable();
 		int newi = _conf.getDistributionPolicyInstance().distribute(pc,
 				_conf.getConnectionURLs());
+		if (newi<0 || newi>=_storeIds.length) {
+			String[] urls = _conf.getConnectionURLs();
+			throw new UserException(_loc.get("bad-policy-slice", 
+				new Object[]{
+				_conf.getDistributionPolicyInstance().getClass().getName(), 
+				newi, sm.getPersistenceCapable(), 
+				urls.length-1, Arrays.toString(urls)}));
+		}
 		sm.setImplData(_storeIds[newi], true);
 		return newi;
 	}

Modified: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=606122&r1=606121&r2=606122&view=diff
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java (original)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java Thu Dec 20 23:27:48 2007
@@ -21,6 +21,12 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.openjpa.jdbc.kernel.JDBCStore;
 import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
@@ -31,6 +37,7 @@
 import org.apache.openjpa.lib.rop.MergedResultObjectProvider;
 import org.apache.openjpa.lib.rop.ResultObjectProvider;
 import org.apache.openjpa.meta.ClassMetaData;
+import org.apache.openjpa.util.StoreException;
 
 /**
  * A query for distributed databases.
@@ -77,15 +84,17 @@
 		ExpressionStoreQuery.DataStoreExecutor {
 		private List<Executor> executors = new ArrayList<Executor>();
 		private DistributedStoreQuery owner = null;
+		private static ExecutorService threadPool = 
+			Executors.newCachedThreadPool();
 		
 		public void addExecutor(Executor ex) {
 			executors.add(ex);
 		}
 		
-        public ParallelExecutor(DistributedStoreQuery q, ClassMetaData meta, 
+        public ParallelExecutor(DistributedStoreQuery dsq, ClassMetaData meta, 
         		boolean subclasses, ExpressionParser parser, Object parsed) {
-        	super(q, meta, subclasses, parser, parsed);
-        	owner = q;
+        	super(dsq, meta, subclasses, parser, parsed);
+        	owner = dsq;
         }
         
         /**
@@ -93,12 +102,28 @@
          * given query context.
          */
         public ResultObjectProvider executeQuery(StoreQuery q,
-                Object[] params, Range range) {
+                final Object[] params, final Range range) {
         	ResultObjectProvider[] tmp = new ResultObjectProvider[executors.size()];
-        	Iterator<StoreQuery> qs = owner._queries.iterator();
+        	final Iterator<StoreQuery> qs = owner._queries.iterator();
+        	final List<Future<ResultObjectProvider>> futures = 
+        		new ArrayList<Future<ResultObjectProvider>>();
         	int i = 0;
-        	for (Executor ex:executors) {
-        		tmp[i++] = ex.executeQuery(qs.next(), params, range);
+        	for (Executor ex:executors)  {
+        		QueryExecutor call = new QueryExecutor();
+        		call.executor = ex;
+        		call.query    = qs.next();
+        		call.params   = params;
+        		call.range    = range;
+        		futures.add(threadPool.submit(call)); 
+        	}
+        	for (Future<ResultObjectProvider> future:futures) {
+        		try {
+					tmp[i++] = future.get();
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				} catch (ExecutionException e) {
+					throw new StoreException(e);
+				}
         	}
         	MergedResultObjectProvider ret = new MergedResultObjectProvider(tmp);
         	return ret;
@@ -106,26 +131,82 @@
         
         public Number executeDelete(StoreQuery q, Object[] params) {
         	Iterator<StoreQuery> qs = owner._queries.iterator();
-        	int i = 0;
+        	final List<Future<Number>> futures = new ArrayList<Future<Number>>();
         	for (Executor ex:executors) {
-        		Number n = ex.executeDelete(qs.next(), params).intValue();
-        		if (n != null) 
-        			i += n.intValue();
+        		DeleteExecutor call = new DeleteExecutor();
+        		call.executor = ex;
+        		call.query    = qs.next();
+        		call.params   = params;
+        		futures.add(threadPool.submit(call)); 
         	}
-        	return new Integer(i);
+        	int N = 0;
+        	for (Future<Number> future:futures) {
+        		try {
+            		Number n = future.get();
+            		if (n != null) 
+            			N += n.intValue();
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				} catch (ExecutionException e) {
+					throw new StoreException(e);
+				}
+        	}
+        	return new Integer(N);
         }
         
         public Number executeUpdate(StoreQuery q, Object[] params) {
         	Iterator<StoreQuery> qs = owner._queries.iterator();
-        	int i = 0;
+        	final List<Future<Number>> futures = new ArrayList<Future<Number>>();
         	for (Executor ex:executors) {
-        		Number n = ex.executeUpdate(qs.next(), params).intValue();
-        		if (n != null) 
-        			i += n.intValue();
+        		UpdateExecutor call = new UpdateExecutor();
+        		call.executor = ex;
+        		call.query    = qs.next();
+        		call.params   = params;
+        		futures.add(threadPool.submit(call)); 
         	}
-        	return new Integer(i);
+        	int N = 0;
+        	for (Future<Number> future:futures) {
+        		try {
+            		Number n = future.get();
+            		if (n != null) 
+            			N += n.intValue();
+				} catch (InterruptedException e) {
+					throw new RuntimeException(e);
+				} catch (ExecutionException e) {
+					throw new StoreException(e);
+				}
+        	}
+        	return new Integer(N);
         }
 
+	}
+	
+	static  class QueryExecutor implements Callable<ResultObjectProvider> {
+		StoreQuery query;
+		Executor executor;
+		Object[] params;
+		Range range;
+		public ResultObjectProvider call() throws Exception {
+			return executor.executeQuery(query, params, range);
+		}
+	}
+	
+	static  class DeleteExecutor implements Callable<Number> {
+		StoreQuery query;
+		Executor executor;
+		Object[] params;
+		public Number call() throws Exception {
+			return executor.executeDelete(query, params);
+		}
+	}
+	
+	static  class UpdateExecutor implements Callable<Number> {
+		StoreQuery query;
+		Executor executor;
+		Object[] params;
+		public Number call() throws Exception {
+			return executor.executeDelete(query, params);
+		}
 	}
 }
 

Modified: labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties?rev=606122&r1=606121&r2=606122&view=diff
==============================================================================
--- labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties (original)
+++ labs/fluid/slice/src/main/resources/org/apache/openjpa/slice/jdbc/localizer.properties Thu Dec 20 23:27:48 2007
@@ -1,2 +1,12 @@
+no-url: No ConnectionURL property has been specified.
 wrong-url: URL "{0}" is invalid as database URL.
-wrong-slice: Wrong slice "{0}" for "{1}"
\ No newline at end of file
+wrong-slice: Wrong slice "{0}" for "{1}"
+connect: Connecting to "{0}"
+connect-error-lenient: Failed to connect to "{0}". This database will be \
+	ignored as Lenient property is set to true.
+connect-error: Failed to connect to "{0}"
+bad-policy-slice:Distribution policy "{0}" has returned invalid slice index \
+	"{1}" for "{2}". The valid slice index is between 0 and {3} both \
+	inclusive representing index of active databases "{4}". This can happen \
+	when one or more of the originally configured databases are not available \
+	and Lenient property is set to true.
\ No newline at end of file

Modified: labs/fluid/slice/src/test/resources/META-INF/persistence.xml
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/test/resources/META-INF/persistence.xml?rev=606122&r1=606121&r2=606122&view=diff
==============================================================================
--- labs/fluid/slice/src/test/resources/META-INF/persistence.xml (original)
+++ labs/fluid/slice/src/test/resources/META-INF/persistence.xml Thu Dec 20 23:27:48 2007
@@ -11,12 +11,13 @@
             <property name="openjpa.ConnectionUserName" value="root"/>
             <property name="openjpa.ConnectionPassword" value="hello"/>
             <property name="slice.DistributionPolicy" value="org.apache.openjpa.slice.policy.UserDistributionPolicy"/>
+            <property name="slice.Lenient" value="false"/>
             
             <property name="openjpa.Connection2URL" value="jdbc:mysql://localhost/slice1"/>
-	    <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema"/>
+	        <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema"/>
             <property name="openjpa.QueryCompilationCache" value="false"/>
-   	    <property name="openjpa.Log" value="DefaultLevel=WARN, Enhance=TRACE, SQL=TRACE,
Query=TRACE"/>
-   	    <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
+   	        <property name="openjpa.Log" value="DefaultLevel=WARN, Enhance=TRACE"/>
+   	        <property name="openjpa.RuntimeUnenhancedClasses" value="unsupported"/>
             <property name="openjpa.jdbc.MappingDefaults" value="DefaultMissingInfo=true"/>
       	</properties>
    </persistence-unit>



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@labs.apache.org
For additional commands, e-mail: commits-help@labs.apache.org


Mime
View raw message