labs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ppod...@apache.org
Subject svn commit: r605340 [2/2] - in /labs/fluid/slice: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/openjpa/ src/main/java/org/apache/openjpa/slice/ src/main/java/org/apache/openjpa/slice/jdbc/ src/t...
Date Tue, 18 Dec 2007 22:08:01 GMT
Added: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java?rev=605340&view=auto
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
(added)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedStoreQuery.java
Tue Dec 18 14:07:57 2007
@@ -0,0 +1,108 @@
+package org.apache.openjpa.slice.jdbc;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.openjpa.jdbc.kernel.JDBCStore;
+import org.apache.openjpa.jdbc.kernel.JDBCStoreQuery;
+import org.apache.openjpa.kernel.ExpressionStoreQuery;
+import org.apache.openjpa.kernel.QueryContext;
+import org.apache.openjpa.kernel.StoreQuery;
+import org.apache.openjpa.kernel.exps.ExpressionParser;
+import org.apache.openjpa.lib.rop.MergedResultObjectProvider;
+import org.apache.openjpa.lib.rop.ResultObjectProvider;
+import org.apache.openjpa.meta.ClassMetaData;
+
+/**
+ * A query for distributed databases.
+ * 
+ * @author Pinaki Poddar 
+ *
+ */
+@SuppressWarnings("serial")
+public class DistributedStoreQuery extends JDBCStoreQuery {
+	private List<StoreQuery> _queries = new ArrayList<StoreQuery>();
+	private ExpressionParser _parser;
+	
+	public DistributedStoreQuery(JDBCStore store, ExpressionParser parser) {
+		super(store, parser);
+		_parser = parser;
+	}
+	
+	void add(StoreQuery q) {
+		_queries.add(q);
+	}
+	
+    public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
+    	ParallelExecutor ex = new ParallelExecutor(this, meta, subs, _parser, 
+    			ctx.getCompilation());
+        for (StoreQuery q:_queries) {
+        	ex.addExecutor(q.newDataStoreExecutor(meta, subs));
+        }
+        return ex;
+    }
+    
+    public void setContext(QueryContext ctx) {
+    	super.setContext(ctx);
+    	for (StoreQuery q:_queries) 
+    		q.setContext(ctx); 
+    }
+
+	
+	public static class ParallelExecutor extends 
+		ExpressionStoreQuery.DataStoreExecutor {
+		private List<Executor> executors = new ArrayList<Executor>();
+		private DistributedStoreQuery owner = null;
+		
+		public void addExecutor(Executor ex) {
+			executors.add(ex);
+		}
+		
+        public ParallelExecutor(DistributedStoreQuery q, ClassMetaData meta, 
+        		boolean subclasses, ExpressionParser parser, Object parsed) {
+        	super(q, meta, subclasses, parser, parsed);
+        	owner = q;
+        }
+        
+        /**
+         * Each child query must be executed with slice context and not the 
+         * given query context.
+         */
+        public ResultObjectProvider executeQuery(StoreQuery q,
+                Object[] params, Range range) {
+        	ResultObjectProvider[] tmp = new ResultObjectProvider[executors.size()];
+        	Iterator<StoreQuery> qs = owner._queries.iterator();
+        	int i = 0;
+        	for (Executor ex:executors) {
+        		tmp[i++] = ex.executeQuery(qs.next(), params, range);
+        	}
+        	MergedResultObjectProvider ret = new MergedResultObjectProvider(tmp);
+        	return ret;
+        }
+        
+        public Number executeDelete(StoreQuery q, Object[] params) {
+        	Iterator<StoreQuery> qs = owner._queries.iterator();
+        	int i = 0;
+        	for (Executor ex:executors) {
+        		Number n = ex.executeDelete(qs.next(), params).intValue();
+        		if (n != null) 
+        			i += n.intValue();
+        	}
+        	return new Integer(i);
+        }
+        
+        public Number executeUpdate(StoreQuery q, Object[] params) {
+        	Iterator<StoreQuery> qs = owner._queries.iterator();
+        	int i = 0;
+        	for (Executor ex:executors) {
+        		Number n = ex.executeUpdate(qs.next(), params).intValue();
+        		if (n != null) 
+        			i += n.intValue();
+        	}
+        	return new Integer(i);
+        }
+
+	}
+}
+

Added: labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedTemplate.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedTemplate.java?rev=605340&view=auto
==============================================================================
--- labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedTemplate.java
(added)
+++ labs/fluid/slice/src/main/java/org/apache/openjpa/slice/jdbc/DistributedTemplate.java
Tue Dec 18 14:07:57 2007
@@ -0,0 +1,255 @@
+package org.apache.openjpa.slice.jdbc;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A template for multiple Statements being executed by multiple connections.
+ * 
+ * @author Pinaki Poddar 
+ *
+ */
+public class DistributedTemplate<T extends Statement> 
+	implements Statement, Iterable<T> {
+	protected List<T> stmts = new ArrayList<T>();
+	protected final DistributedConnection con;
+	protected T master;
+	
+	public DistributedTemplate(DistributedConnection c) {
+		con = c;
+	}
+	
+	public Iterator<T> iterator() {
+		return stmts.iterator();
+	}
+	
+	public void add(T s) {
+		if (stmts.isEmpty())
+			master = s;
+		try {
+			if (!con.contains(s.getConnection()))
+				throw new IllegalArgumentException(s + " has different connection");
+			stmts.add(s);
+		} catch (SQLException e) {
+			e.printStackTrace();
+		}
+	}
+	
+	public void addBatch(String sql) throws SQLException {
+		for (T s:this)
+			s.addBatch(sql);
+	}
+
+	public void cancel() throws SQLException {
+		for (T s:this)
+			s.cancel();
+	}
+
+	public void clearBatch() throws SQLException {
+		for (T s:this)
+			s.clearBatch();
+	}
+
+	public void clearWarnings() throws SQLException {
+		for (T s:this)
+			s.clearWarnings();
+	}
+
+	public void close() throws SQLException {
+		for (T s:this)
+			s.close();
+	}
+
+	public boolean execute(String arg0) throws SQLException {
+		boolean ret = true;
+		for (T s:this)
+			ret = s.execute(arg0) & ret;
+		return ret;
+	}
+
+	public boolean execute(String arg0, int arg1) throws SQLException {
+		boolean ret = true;
+		for (T s:this)
+			ret = s.execute(arg0, arg1) & ret;
+		return ret;
+	}
+
+	public boolean execute(String arg0, int[] arg1) throws SQLException {
+		boolean ret = true;
+		for (T s:this)
+			ret = s.execute(arg0, arg1) & ret;
+		return ret;
+	}
+
+	public boolean execute(String arg0, String[] arg1) throws SQLException {
+		boolean ret = true;
+		for (T s:this)
+			ret = s.execute(arg0, arg1) & ret;
+		return ret;
+	}
+
+	public int[] executeBatch() throws SQLException {
+		int[] ret = new int[0];
+		for (Statement s:this) {
+			int[] tmp = s.executeBatch();
+			ret = new int[ret.length + tmp.length];
+			System.arraycopy(tmp, 0, ret, ret.length-tmp.length, tmp.length);
+		}
+		return ret;
+	}
+
+	public ResultSet executeQuery() throws SQLException {
+		DistributedResultSet rs = new DistributedResultSet();
+		for (T s:this)
+			rs.add(s.executeQuery(null));
+		return rs;
+	}
+
+	public ResultSet executeQuery(String arg0) throws SQLException {
+		DistributedResultSet rs = new DistributedResultSet();
+		for (T s:this)
+			rs.add(s.executeQuery(arg0));
+		return rs;
+	}
+
+	public int executeUpdate(String arg0) throws SQLException {
+		int ret = 0;
+		for (T s:this)
+			ret += s.executeUpdate(arg0);
+		return ret;
+	}
+
+	public int executeUpdate(String arg0, int arg1) throws SQLException {
+		int ret = 0;
+		for (T s:this)
+			ret += s.executeUpdate(arg0, arg1);
+		return ret;
+	}
+
+	public int executeUpdate(String arg0, int[] arg1) throws SQLException {
+		int ret = 0;
+		for (T s:this)
+			ret += s.executeUpdate(arg0, arg1);
+		return ret;
+	}
+
+	public int executeUpdate(String arg0, String[] arg1) throws SQLException {
+		int ret = 0;
+		for (T s:this)
+			ret += s.executeUpdate(arg0, arg1);
+		return ret;
+	}
+
+	public Connection getConnection() throws SQLException {
+		return con;
+	}
+
+	public int getFetchDirection() throws SQLException {
+		return master.getFetchDirection();
+	}
+
+	public int getFetchSize() throws SQLException {
+		return master.getFetchSize();
+	}
+
+	public ResultSet getGeneratedKeys() throws SQLException {
+		DistributedResultSet mrs = new DistributedResultSet();
+		for (T s:this)
+			mrs.add(s.getGeneratedKeys());
+		return mrs;
+	}
+
+	public int getMaxFieldSize() throws SQLException {
+		return master.getMaxFieldSize();
+	}
+
+	public int getMaxRows() throws SQLException {
+		return master.getMaxRows();
+	}
+
+	public boolean getMoreResults() throws SQLException {
+		for (T s:this)
+			if (s.getMoreResults())
+				return true;
+		return false;
+	}
+
+	public boolean getMoreResults(int arg0) throws SQLException {
+		for (T s:this)
+			if (s.getMoreResults(arg0))
+				return true;
+		return false;
+	}
+
+	public int getQueryTimeout() throws SQLException {
+		return master.getQueryTimeout();
+	}
+
+	public ResultSet getResultSet() throws SQLException {
+		DistributedResultSet rs = new DistributedResultSet();
+		for (T s:this)
+			rs.add(s.getResultSet());
+		return rs;
+	}
+
+	public int getResultSetConcurrency() throws SQLException {
+		return master.getResultSetConcurrency();
+	}
+
+	public int getResultSetHoldability() throws SQLException {
+		return master.getResultSetHoldability();
+	}
+
+	public int getResultSetType() throws SQLException {
+		return master.getResultSetType();
+	}
+
+	public int getUpdateCount() throws SQLException {
+		return master.getUpdateCount();
+	}
+
+	public SQLWarning getWarnings() throws SQLException {
+		return master.getWarnings();
+	}
+
+	public void setCursorName(String name) throws SQLException {
+		for (T s:this)
+			s.setCursorName(name);
+	}
+
+	public void setEscapeProcessing(boolean flag) throws SQLException {
+		for (T s:this)
+			s.setEscapeProcessing(flag);
+	}
+
+	public void setFetchDirection(int dir) throws SQLException {
+		for (T s:this)
+			s.setFetchDirection(dir);
+	}
+
+	public void setFetchSize(int size) throws SQLException {
+		for (T s:this)
+			s.setFetchSize(size);
+	}
+
+	public void setMaxFieldSize(int size) throws SQLException {
+		for (T s:this)
+			s.setMaxFieldSize(size);
+	}
+
+	public void setMaxRows(int n) throws SQLException {
+		for (T s:this)
+			s.setMaxFieldSize(n);
+	}
+	
+	public void setQueryTimeout(int n) throws SQLException {
+		for (T s:this)
+			s.setMaxFieldSize(n);
+	}
+}

Added: labs/fluid/slice/src/test/java/domain/Address.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/test/java/domain/Address.java?rev=605340&view=auto
==============================================================================
--- labs/fluid/slice/src/test/java/domain/Address.java (added)
+++ labs/fluid/slice/src/test/java/domain/Address.java Tue Dec 18 14:07:57 2007
@@ -0,0 +1,53 @@
+package domain;
+
+import javax.persistence.*;
+
+@Entity
+public class Address {
+	@Id
+	@GeneratedValue
+	private long id;
+
+	private String city;
+	private int zip;
+
+	@OneToOne(mappedBy = "address")
+	Person owner;
+
+	public Address() {
+		this("?", 0);
+	}
+
+	public Address(String city, int zip) {
+		setCity(city);
+		setZip(zip);
+	}
+
+	public String getCity() {
+		return city;
+	}
+
+	public void setCity(String city) {
+		this.city = city;
+	}
+
+	public int getZip() {
+		return zip;
+	}
+
+	public void setZip(int zip) {
+		this.zip = zip;
+	}
+
+	public Person getOwner() {
+		return owner;
+	}
+
+	public void setOwner(Person owner) {
+		this.owner = owner;
+	}
+
+	public long getId() {
+		return id;
+	}
+}

Added: labs/fluid/slice/src/test/java/domain/PObject.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/test/java/domain/PObject.java?rev=605340&view=auto
==============================================================================
--- labs/fluid/slice/src/test/java/domain/PObject.java (added)
+++ labs/fluid/slice/src/test/java/domain/PObject.java Tue Dec 18 14:07:57 2007
@@ -0,0 +1,32 @@
+package domain;
+
+import javax.persistence.Entity;
+import javax.persistence.Id;
+
+@Entity
+public class PObject {
+	@Id
+	private long id;
+	
+	private int value;
+	
+	public PObject() {
+		this(System.currentTimeMillis());
+	}
+	
+	public PObject(long id) {
+		this.id = id;
+	}
+	
+	public long getId() {
+		return id;
+	}
+	
+	public int getValue() {
+		return value;
+	}
+	
+	public void setValue(int i) {
+		value = i;
+	}
+}

Added: labs/fluid/slice/src/test/java/domain/Person.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/test/java/domain/Person.java?rev=605340&view=auto
==============================================================================
--- labs/fluid/slice/src/test/java/domain/Person.java (added)
+++ labs/fluid/slice/src/test/java/domain/Person.java Tue Dec 18 14:07:57 2007
@@ -0,0 +1,44 @@
+package domain;
+
+import javax.persistence.*;
+
+@Entity
+public class Person {
+	@Id
+	@GeneratedValue
+	private long id;
+
+	private String name;
+
+	@OneToOne(cascade=CascadeType.ALL)
+	private Address address;
+
+	public Person() {
+		this("?");
+	}
+	
+	public Person(String name) {
+		setName(name);
+	}
+	
+	public String getName() {
+		return name;
+	}
+
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	public Address getAddress() {
+		return address;
+	}
+
+	public void setAddress(Address address) {
+		this.address = address;
+		address.setOwner(this);
+	}
+
+	public long getId() {
+		return id;
+	}
+}

Added: labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/OddEvenDistributionPolicy.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/OddEvenDistributionPolicy.java?rev=605340&view=auto
==============================================================================
--- labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/OddEvenDistributionPolicy.java
(added)
+++ labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/OddEvenDistributionPolicy.java
Tue Dec 18 14:07:57 2007
@@ -0,0 +1,15 @@
+package org.apache.openjpa.distributed;
+
+import org.apache.openjpa.slice.DistributionPolicy;
+
+import domain.PObject;
+
+public class OddEvenDistributionPolicy implements DistributionPolicy {
+
+	public int distribute(Object pc, String[] urls) {
+		if (pc instanceof PObject)
+			return (int)(((PObject)pc).getId()%2);
+		return 0;
+	}
+
+}

Added: labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/TestConfiguration.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/TestConfiguration.java?rev=605340&view=auto
==============================================================================
--- labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/TestConfiguration.java (added)
+++ labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/TestConfiguration.java Tue
Dec 18 14:07:57 2007
@@ -0,0 +1,163 @@
+package org.apache.openjpa.distributed;
+
+import java.util.List;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Persistence;
+
+import org.apache.openjpa.conf.OpenJPAConfiguration;
+import org.apache.openjpa.persistence.EntityManagerImpl;
+import org.apache.openjpa.persistence.OpenJPAEntityManager;
+import org.apache.openjpa.persistence.OpenJPAEntityManagerFactory;
+import org.apache.openjpa.persistence.OpenJPAEntityManagerSPI;
+import org.apache.openjpa.persistence.OpenJPAPersistence;
+import org.apache.openjpa.persistence.test.SingleEMFTestCase;
+import org.apache.openjpa.slice.DistributedConfiguration;
+import org.apache.openjpa.slice.jdbc.DistributedJDBCBrokerFactory;
+import org.apache.openjpa.slice.jdbc.DistributedJDBCConfiguration;
+import org.apache.openjpa.slice.jdbc.DistributedJDBCConfigurationImpl;
+
+import domain.Address;
+import domain.PObject;
+import domain.Person;
+
+import junit.framework.TestCase;
+
+public class TestConfiguration extends SingleEMFTestCase {
+	private static String persistenceUnitName = "slice";
+
+	public void setUp() throws Exception {
+		super.setUp(PObject.class, Person.class, Address.class);
+	}
+	
+	public void tearDown() throws Exception {
+		// do not invoke super -- it deletes the data
+	}
+	
+	/**
+	 * Tests that user-level configurations are set.
+	 */
+	public void testConfig() {
+		assertTrue(emf.getConfiguration() instanceof DistributedConfiguration);
+		DistributedConfiguration conf = (DistributedConfiguration)emf.getConfiguration();
+		assertTrue(conf.getConnectionURLs().length>1);
+		String brokerFactory = conf.getBrokerFactory();
+		assertTrue(brokerFactory.equals("slice") || 
+				   brokerFactory.equals(DistributedJDBCBrokerFactory.class.getName()));
+		assertNotNull(conf.getDistributionPolicyInstance());
+	}
+	
+	PObject persist() {
+		EntityManager em = emf.createEntityManager();
+		int value = (int)(System.currentTimeMillis()%100);
+		PObject pc = new PObject();
+		em.getTransaction().begin();
+		em.persist(pc);
+		pc.setValue(value);
+		em.getTransaction().commit();
+		em.clear();
+		return pc;
+	}
+
+	
+	/**
+	 * Stores and finds the same object.
+	 */
+	public void testFind() {
+		PObject pc = persist();
+		int value = pc.getValue();
+		
+		EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		PObject pc2 = em.find(PObject.class, pc.getId());
+		assertNotNull(pc2);
+		assertNotEquals(pc, pc2);
+		assertEquals(pc.getId(), pc2.getId());
+		assertEquals(value, pc2.getValue());
+	}
+	
+	public void testPersistIndependentObjects() {
+		int before = count(PObject.class);
+		EntityManager em = emf.createEntityManager();
+		int N = 2;
+		long start = System.currentTimeMillis();
+		em.getTransaction().begin();
+		for (int i=0; i<N; i++)
+			em.persist(new PObject(start++));
+		em.getTransaction().commit();
+		em.clear();
+		int after = count(PObject.class);
+		assertEquals(before+N, after);
+	}
+	
+	public void testPersistConnectedObjectGraph() {
+		Person p1 = new Person("A");
+		Person p2 = new Person("B");
+		Person p3 = new Person("C");
+		Address a1 = new Address("Rome", 12345);
+		Address a2 = new Address("San Francisco", 23456);
+		Address a3 = new Address("New York", 34567);
+		p1.setAddress(a1);
+		p2.setAddress(a2);
+		p3.setAddress(a3);
+		
+		EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		em.persist(p1);
+		em.persist(p2);
+		em.persist(p3);
+		em.getTransaction().commit();
+		
+		em.clear();
+		
+		em = emf.createEntityManager();
+		List<Person> persons = em.createQuery("SELECT p FROM Person p WHERE p.name=?1").
+			setParameter(1, "A").getResultList();
+		List<Address> addresses = em.createQuery("SELECT a FROM Address a").getResultList();
+		for (Address pc:addresses) {
+			System.err.println(pc.getCity() + ":" + pc.getOwner().getName());
+		}
+		for (Person pc:persons) {
+			System.err.println(pc.getName() + ":" + pc.getAddress().getCity());
+		}
+	}
+	
+	/**
+	 * Merge only works if the distribution policy assigns the correct slice
+	 * from which the instance was fetched.
+	 */
+	public void testMerge() {
+		PObject pc = persist();
+		int value = pc.getValue();
+		pc.setValue(value+1);
+		assertNotNull(pc);
+		EntityManager em = emf.createEntityManager();
+		em.getTransaction().begin();
+		PObject pc2 = em.merge(pc);
+		em.getTransaction().commit();
+		em.clear();
+		
+		assertNotNull(pc2);
+		assertNotEquals(pc, pc2);
+		assertEquals(pc.getId(), pc2.getId());
+		assertEquals(value+1, pc2.getValue());
+	}
+	
+	/**
+	 * Count number of instances of given type.
+	 * @return
+	 */
+	int count(Class type) {
+		return count("SELECT p FROM " + type.getSimpleName() + " p");
+	}
+	
+	int count(String query) {
+		EntityManager em = emf.createEntityManager();
+		return em.createQuery(query).getResultList().size();
+	}
+	
+    protected String getPersistenceUnitName() {
+        return persistenceUnitName;
+    }
+
+}

Added: labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/UserDistributionPolicy.java
URL: http://svn.apache.org/viewvc/labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/UserDistributionPolicy.java?rev=605340&view=auto
==============================================================================
--- labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/UserDistributionPolicy.java
(added)
+++ labs/fluid/slice/src/test/java/org/apache/openjpa/distributed/UserDistributionPolicy.java
Tue Dec 18 14:07:57 2007
@@ -0,0 +1,25 @@
+package org.apache.openjpa.distributed;
+
+import org.apache.openjpa.slice.DistributionPolicy;
+
+import domain.Address;
+import domain.PObject;
+import domain.Person;
+
+public class UserDistributionPolicy implements DistributionPolicy {
+
+	public int distribute(Object pc, String[] urls) {
+		if (pc instanceof PObject)
+			return Math.abs((int)((PObject)pc).getId()%urls.length);
+		if (pc instanceof Person) {
+			Person p = (Person)pc;
+			return (p.getName().startsWith("A"))? 0:1;
+		}
+		if (pc instanceof Address) {
+			Person p = ((Address)pc).getOwner();
+			return (p.getName().startsWith("A"))? 0:1;
+		}
+		return 0;
+	}
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@labs.apache.org
For additional commands, e-mail: commits-help@labs.apache.org


Mime
View raw message