clerezza-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rwes...@apache.org
Subject svn commit: r1457660 - in /clerezza/trunk/rdf.jena.tdb.storage/src: main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java
Date Mon, 18 Mar 2013 07:33:13 GMT
Author: rwesten
Date: Mon Mar 18 07:33:13 2013
New Revision: 1457660

URL: http://svn.apache.org/r1457660
Log:
CLEREZZA-745: Changes the SingleTdbDatasetTcProvider to use a single ReadWrite lock
for all Clerezza TripleCollections backed by the same Jena Dataset. Also adapts the MultiThreadedSingleTdbDatasetTest
to explicitly validate the problem reported in this issue.

Modified:
    clerezza/trunk/rdf.jena.tdb.storage/src/main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java
    clerezza/trunk/rdf.jena.tdb.storage/src/test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java

Modified: clerezza/trunk/rdf.jena.tdb.storage/src/main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java
URL: http://svn.apache.org/viewvc/clerezza/trunk/rdf.jena.tdb.storage/src/main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java?rev=1457660&r1=1457659&r2=1457660&view=diff
==============================================================================
--- clerezza/trunk/rdf.jena.tdb.storage/src/main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java
(original)
+++ clerezza/trunk/rdf.jena.tdb.storage/src/main/java/org/apache/clerezza/rdf/jena/tdb/storage/SingleTdbDatasetTcProvider.java
Mon Mar 18 07:33:13 2013
@@ -12,23 +12,32 @@ import java.math.BigDecimal;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.Charset;
+import java.util.Collection;
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.clerezza.rdf.core.Graph;
 import org.apache.clerezza.rdf.core.MGraph;
+import org.apache.clerezza.rdf.core.NonLiteral;
+import org.apache.clerezza.rdf.core.Resource;
+import org.apache.clerezza.rdf.core.Triple;
 import org.apache.clerezza.rdf.core.TripleCollection;
 import org.apache.clerezza.rdf.core.UriRef;
 import org.apache.clerezza.rdf.core.access.EntityAlreadyExistsException;
 import org.apache.clerezza.rdf.core.access.EntityUndeletableException;
+import org.apache.clerezza.rdf.core.access.LockableMGraph;
 import org.apache.clerezza.rdf.core.access.LockableMGraphWrapper;
 import org.apache.clerezza.rdf.core.access.NoSuchEntityException;
 import org.apache.clerezza.rdf.core.access.TcProvider;
 import org.apache.clerezza.rdf.core.access.WeightedTcProvider;
+import org.apache.clerezza.rdf.core.event.FilterTriple;
+import org.apache.clerezza.rdf.core.event.GraphListener;
 import org.apache.clerezza.rdf.core.impl.SimpleGraph;
 import org.apache.clerezza.rdf.core.impl.util.PrivilegedGraphWrapper;
 import org.apache.clerezza.rdf.core.impl.util.PrivilegedMGraphWrapper;
@@ -40,7 +49,6 @@ import org.apache.felix.scr.annotations.
 import org.apache.felix.scr.annotations.Properties;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Service;
-import org.openjena.atlas.lib.Tuple;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.service.cm.ConfigurationAdmin;
@@ -111,6 +119,7 @@ public class SingleTdbDatasetTcProvider 
     private SyncThread syncThread;
 
     private Dataset dataset;
+    private final ReadWriteLock datasetLock = new ReentrantReadWriteLock();;
     
     private File graphConfigFile;
     private File mGraphConfigFile;
@@ -182,7 +191,7 @@ public class SingleTdbDatasetTcProvider 
                 graph = new PrivilegedGraphWrapper(jenaAdapter.getGraph());
             } else { //construct an MGraph
                 jenaAdapter = new JenaGraphAdaptor(model.getGraph());
-                this.graph =  new LockableMGraphWrapper(
+                this.graph =  new DatasetLockedMGraph(
                     new PrivilegedMGraphWrapper(jenaAdapter));
             }
         }
@@ -275,12 +284,15 @@ public class SingleTdbDatasetTcProvider 
                     interrupt();
                 }
                 if (!stopRequested) {
-                    synchronized (dataset) {
+                	datasetLock.writeLock().lock();
+                	try {
                         for(ModelGraph mg : initModels.values()){
                             if(mg.isReadWrite()){
                                 mg.sync();
                             } //else we do not need to sync read-only models
                         }
+                    } finally {
+                    	datasetLock.writeLock().unlock();
                     }
                 }
             }
@@ -415,7 +427,8 @@ public class SingleTdbDatasetTcProvider 
         } //else exists and is a directory ... nothing to do
         TDB.getContext().set(TDB.symUnionDefaultGraph, true);
         dataset = TDBFactory.createDataset(dataDir.getAbsolutePath());
-
+        //init the read/write lock
+        
         //init the graph config (stores the graph and mgraph names in a config file)
         initGraphConfigs(dataDir,config);
         
@@ -458,13 +471,16 @@ public class SingleTdbDatasetTcProvider 
     @Deactivate
     protected void deactivate(ComponentContext ctx) {
         if(dataset != null){ //avoid NPE on multiple calls
-            synchronized (dataset) {
+            datasetLock.writeLock().lock();
+            try {
                 for(ModelGraph mg : initModels.values()){
                     mg.close(); //close also syncs!
                 }
                 TDB.sync(dataset);
                 dataset.close();
                 dataset = null;
+            } finally {
+            	datasetLock.writeLock().unlock();
             }
         }
         if(syncThread != null){
@@ -472,7 +488,6 @@ public class SingleTdbDatasetTcProvider 
             syncThread = null;
         }
         initModels = null;
-        dataset = null;
         graphConfigFile = null;
         graphNames = null;
         mGraphConfigFile = null;
@@ -495,7 +510,8 @@ public class SingleTdbDatasetTcProvider 
      */
     private ModelGraph getModelGraph(UriRef name, boolean readWrite,boolean create) throws
NoSuchEntityException {
         ModelGraph modelGraph;
-        synchronized (dataset) {
+        datasetLock.readLock().lock();
+        try {
             modelGraph = initModels.get(name);
             if(modelGraph != null && create){
                 throw new EntityAlreadyExistsException(name);
@@ -506,6 +522,8 @@ public class SingleTdbDatasetTcProvider 
                             dataset.getNamedModel(modelName),readWrite);
                 this.initModels.put(name, modelGraph);
             }
+        } finally {
+        	datasetLock.readLock().unlock();
         }
         return modelGraph;
     }
@@ -518,12 +536,15 @@ public class SingleTdbDatasetTcProvider 
         if(name == null){
             throw new IllegalArgumentException("The parsed Graph UriRef MUST NOT be NULL!");
         }
-        synchronized (dataset) {
+        datasetLock.readLock().lock();
+        try {
             if(graphNames.contains(name) || name.equals(defaultGraphName)){
                 return getModelGraph(name,false,false).getGraph();
             } else {
                 throw new NoSuchEntityException(name);
             }
+        } finally {
+        	datasetLock.readLock().unlock();
         }
     }
     /*
@@ -535,12 +556,15 @@ public class SingleTdbDatasetTcProvider 
         if(name == null){
             throw new IllegalArgumentException("The parsed Graph UriRef MUST NOT be NULL!");
         }
-        synchronized (dataset) {
+        datasetLock.readLock().lock();
+        try {
             if(mGraphNames.contains(name)){
                 return getModelGraph(name,true,false).getMGraph();
             } else {
                 throw new NoSuchEntityException(name);
             }
+        } finally {
+        	datasetLock.readLock().unlock();
         }
     }
     /*
@@ -552,7 +576,8 @@ public class SingleTdbDatasetTcProvider 
         if(name == null){
             throw new IllegalArgumentException("The parsed Graph UriRef MUST NOT be NULL!");
         }
-        synchronized (dataset) {
+        datasetLock.readLock().lock();
+        try {
             if(graphNames.contains(name) || name.equals(defaultGraphName)){
                 return getGraph(name);
             } else if(mGraphNames.contains(name)){
@@ -560,6 +585,8 @@ public class SingleTdbDatasetTcProvider 
             } else {
                 throw new NoSuchEntityException(name);
             }
+        } finally {
+        	datasetLock.readLock().unlock();
         }
     }
     /*
@@ -594,10 +621,13 @@ public class SingleTdbDatasetTcProvider 
     @Override
     public Set<UriRef> listTripleCollections() {
         Set<UriRef> graphNames = new HashSet<UriRef>();
-        synchronized (dataset) {
+        datasetLock.readLock().lock();
+        try {
             for(Iterator<String> names = dataset.listNames(); 
                 names.hasNext();
                     graphNames.add(new UriRef(names.next())));
+        } finally {
+        	datasetLock.readLock().unlock();
         }
         if(defaultGraphName != null){
             graphNames.add(defaultGraphName);
@@ -614,7 +644,8 @@ public class SingleTdbDatasetTcProvider 
         if(name == null){
             throw new IllegalArgumentException("The parsed MGrpah name MUST NOT be NULL!");
         }
-        synchronized (dataset) {
+        datasetLock.writeLock().lock();
+        try {
             if(graphNames.contains(name) || mGraphNames.contains(name) || name.equals(defaultGraphName)){
                 throw new EntityAlreadyExistsException(name);
             }
@@ -627,6 +658,8 @@ public class SingleTdbDatasetTcProvider 
                         + mGraphConfigFile+"'!",e);
             }
             return graph;
+        } finally {
+        	datasetLock.writeLock().unlock();
         }
     }
     /*
@@ -640,7 +673,8 @@ public class SingleTdbDatasetTcProvider 
             throw new IllegalArgumentException("The parsed Grpah name MUST NOT be NULL!");
         }
         ModelGraph mg;
-        synchronized (dataset) {
+        datasetLock.writeLock().lock();
+        try {
             if(graphNames.contains(name) || mGraphNames.contains(name) || name.equals(defaultGraphName)){
                 throw new EntityAlreadyExistsException(name);
             }
@@ -652,13 +686,13 @@ public class SingleTdbDatasetTcProvider 
                 throw new IllegalStateException("Unable to wirte GraphName config file '"
                         + graphConfigFile+"'!",e);
             }
-        }
-        //add the parsed data!
-        if(triples != null) { //load the initial and final set of triples
-            mg.getJenaAdapter().addAll(triples);
-            synchronized(dataset){
-                mg.sync();
+            //add the parsed data!
+            if(triples != null) { //load the initial and final set of triples
+                mg.getJenaAdapter().addAll(triples);
+                    mg.sync();
             }
+        } finally {
+        	datasetLock.writeLock().unlock();
         }
         return mg.getGraph();
     }
@@ -673,7 +707,8 @@ public class SingleTdbDatasetTcProvider 
         if(name == null){
             throw new IllegalArgumentException("The parsed MGrpah name MUST NOT be NULL!");
         }
-        synchronized (dataset) {
+        datasetLock.writeLock().lock();
+        try {
             if(mGraphNames.remove(name)){
                 try {
                     writeMGraphConfig();
@@ -699,6 +734,8 @@ public class SingleTdbDatasetTcProvider 
             }
             //delete the graph from the initModels list
             initModels.remove(name);
+        } finally {
+        	datasetLock.writeLock().unlock();
         }
     }
     /*
@@ -822,8 +859,13 @@ public class SingleTdbDatasetTcProvider 
         graphNames = new HashSet<UriRef>();
         boolean configPresent = readGraphConfig(graphConfigFile, graphNames);
         log.info("Present named Models");
-        for(Iterator<String> it = dataset.listNames();it.hasNext();){
-            log.info(" > {}",it.next());
+        datasetLock.readLock().lock();
+        try {
+	        for(Iterator<String> it = dataset.listNames();it.hasNext();){
+	            log.info(" > {}",it.next());
+	        }
+        } finally {
+        	datasetLock.readLock().unlock();
         }
         if(configPresent) {
             //validate that all Graphs and MGraphs in the configFile also are 
@@ -880,11 +922,14 @@ public class SingleTdbDatasetTcProvider 
 //                }
 //            }
         } else { //read pre-existing models in the dataset
-            synchronized (dataset) {
+            datasetLock.readLock().lock();
+            try {
                 for(Iterator<String> it = dataset.listNames();it.hasNext();){
                     mGraphNames.add(new UriRef(it.next()));
                 }
                 writeMGraphConfig();
+            } finally {
+            	datasetLock.readLock().unlock();
             }
         }
     }
@@ -913,5 +958,206 @@ public class SingleTdbDatasetTcProvider 
             return false;
         }
     }
+    /**
+     * {@link LockableMGraph} wrapper that uses a single {@link ReadWriteLock} for
+     * the Jena TDB {@link SingleTdbDatasetTcProvider#dataset}
+     * @author Rupert Westenthaler
+     *
+     */
+    private class DatasetLockedMGraph implements LockableMGraph {
+
+    	private final MGraph wrapped;
+
+    	/**
+    	 * Constructs a LocalbleMGraph for an MGraph.
+    	 *
+    	 * @param providedMGraph a non-lockable mgraph
+    	 */
+    	public DatasetLockedMGraph(final MGraph providedMGraph) {
+    		this.wrapped = providedMGraph;
+    	}
+
+    	@Override
+    	public ReadWriteLock getLock() {
+    		return datasetLock;
+    	}
+
+    	@Override
+    	public Graph getGraph() {
+    		datasetLock.readLock().lock();
+    		try {
+    			return wrapped.getGraph();
+    		} finally {
+    			datasetLock.readLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public Iterator<Triple> filter(NonLiteral subject, UriRef predicate, Resource
object) {
+			//users will need to aquire a readlock while iterating
+			return wrapped.filter(subject, predicate, object);
+    	}
+
+    	@Override
+    	public int size() {
+    		datasetLock.readLock().lock();
+    		try {
+    			return wrapped.size();
+    		} finally {
+    			datasetLock.readLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public boolean isEmpty() {
+    		datasetLock.readLock().lock();
+    		try {
+    			return wrapped.isEmpty();
+    		} finally {
+    			datasetLock.readLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public boolean contains(Object o) {
+    		datasetLock.readLock().lock();
+    		try {
+    			return wrapped.contains(o);
+    		} finally {
+    			datasetLock.readLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public Iterator<Triple> iterator() {
+    		//users will need it acquire a read lock while iterating!
+			return wrapped.iterator();
+    	}
+
+    	@Override
+    	public Object[] toArray() {
+    		datasetLock.readLock().lock();
+    		try {
+    			return wrapped.toArray();
+    		} finally {
+    			datasetLock.readLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public <T> T[] toArray(T[] a) {
+    		datasetLock.readLock().lock();
+    		try {
+    			return wrapped.toArray(a);
+    		} finally {
+    			datasetLock.readLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public boolean containsAll(Collection<?> c) {
+    		datasetLock.readLock().lock();
+    		try {
+    			return wrapped.containsAll(c);
+    		} finally {
+    			datasetLock.readLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public boolean add(Triple e) {
+    		datasetLock.writeLock().lock();
+    		try {
+    			return wrapped.add(e);
+    		} finally {
+    			datasetLock.writeLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public boolean remove(Object o) {
+    		datasetLock.writeLock().lock();
+    		try {
+    			return wrapped.remove(o);
+    		} finally {
+    			datasetLock.writeLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public boolean addAll(Collection<? extends Triple> c) {
+    		datasetLock.writeLock().lock();
+    		try {
+    			return wrapped.addAll(c);
+    		} finally {
+    			datasetLock.writeLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public boolean removeAll(Collection<?> c) {
+    		datasetLock.writeLock().lock();
+    		try {
+    			return wrapped.removeAll(c);
+    		} finally {
+    			datasetLock.writeLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public boolean retainAll(Collection<?> c) {
+    		datasetLock.writeLock().lock();
+    		try {
+    			return wrapped.retainAll(c);
+    		} finally {
+    			datasetLock.writeLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public void clear() {
+    		datasetLock.writeLock().lock();
+    		try {
+    			wrapped.clear();
+    		} finally {
+    			datasetLock.writeLock().unlock();
+    		}
+    	}
+
+    	@Override
+    	public void addGraphListener(GraphListener listener, FilterTriple filter, long delay)
{
+    		wrapped.addGraphListener(listener, filter, delay);
+    	}
+
+    	@Override
+    	public void addGraphListener(GraphListener listener, FilterTriple filter) {
+    		wrapped.addGraphListener(listener, filter);
+    	}
+
+    	@Override
+    	public void removeGraphListener(GraphListener listener) {
+    		wrapped.removeGraphListener(listener);
+    	}
+
+    	@Override
+    	public int hashCode() {
+    		return wrapped.hashCode();
+    	}
+
+    	@Override
+    	public boolean equals(Object obj) {
+    		if(obj instanceof DatasetLockedMGraph){
+    			DatasetLockedMGraph other = (DatasetLockedMGraph) obj;
+    			return wrapped.equals(other.wrapped);
+    		} else {
+    			return false;
+    		}
+    	}
+
+    	@Override
+    	public String toString() {
+    		return wrapped.toString();
+    	}    	
+    }
 
 }

Modified: clerezza/trunk/rdf.jena.tdb.storage/src/test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java
URL: http://svn.apache.org/viewvc/clerezza/trunk/rdf.jena.tdb.storage/src/test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java?rev=1457660&r1=1457659&r2=1457660&view=diff
==============================================================================
--- clerezza/trunk/rdf.jena.tdb.storage/src/test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java
(original)
+++ clerezza/trunk/rdf.jena.tdb.storage/src/test/java/org/apache/clerezza/rdf/jena/tdb/storage/MultiThreadedSingleTdbDatasetTest.java
Mon Mar 18 07:33:13 2013
@@ -23,11 +23,17 @@ package org.apache.clerezza.rdf.jena.tdb
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.HashSet;
 import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import junit.framework.Assert;
 
@@ -37,6 +43,7 @@ import org.apache.clerezza.rdf.core.MGra
 import org.apache.clerezza.rdf.core.Resource;
 import org.apache.clerezza.rdf.core.Triple;
 import org.apache.clerezza.rdf.core.UriRef;
+import org.apache.clerezza.rdf.core.access.LockableMGraph;
 import org.apache.clerezza.rdf.core.impl.PlainLiteralImpl;
 import org.apache.clerezza.rdf.core.impl.TripleImpl;
 import org.apache.felix.scr.annotations.Activate;
@@ -44,35 +51,41 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.openjena.atlas.logging.Log;
 import org.osgi.service.cm.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.wymiwyg.commons.util.Util;
 
 
 public class MultiThreadedSingleTdbDatasetTest {
 	
+	private Logger log = LoggerFactory.getLogger(MultiThreadedSingleTdbDatasetTest.class);
 	
-	
+	private static final String TEST_GRAPH_URI_PREFIX = "http://www.example.org/multiThreadTest";
+	private int[] graphNum = new int[]{0};
 	/** 
 	 * how many threads to start
 	 */
-	private static final int THREAD_COUNT = 100;
+	private static final int TEST_THREAD_COUNT = 25;
+	private static final int VALIDATE_THREAD_COUNT = 2;
 	/**
 	 * how many seconds to let them run
 	 */
-	private static final int DELAY = 30;
+	private static final int DELAY = 15;
 	
 	
-	private MGraph mGraph;
-	private Set<Triple> testTriples = Collections.synchronizedSet(new HashSet<Triple>());

+	protected final List<MGraph> mGraphs = new ArrayList<MGraph>();
+	protected final List<Set<Triple>> testTriplesList = new ArrayList<Set<Triple>>();
+	private Random random = new Random();
 
 	class TestThread extends Thread {
 
-		private final int id;
 		private boolean stopRequested;
 		private int addedTripleCount = 0;
 
 		public TestThread(final int id) {
-			this.id = id;
+			setName("Test Thread "+id);
 			start();
 		}
 
@@ -83,9 +96,35 @@ public class MultiThreadedSingleTdbDatas
 		@Override
 		public void run() {
 			while (!stopRequested) {
+				float r;
+				synchronized (random) {
+					r = random.nextFloat();
+				}
+				MGraph graph;
+				Set<Triple> testTriples;
+				if(r > 0.995){
+					int num;
+					synchronized (graphNum) {
+						num = graphNum[0];
+						graphNum[0]++;
+					}
+					graph = provider.createMGraph(new UriRef(TEST_GRAPH_URI_PREFIX+num));
+					log.info(" ... creating the {}. Grpah", num+1);
+					testTriples = new HashSet<Triple>();
+					synchronized (mGraphs) {
+						mGraphs.add(graph);
+						testTriplesList.add(testTriples);
+					}
+				} else { //map the range [0..0.995] to the mGraphs
+					synchronized (mGraphs) {
+						int num = Math.round(r*(float)(mGraphs.size()-1)/0.995f);
+						graph = mGraphs.get(num);
+						testTriples = testTriplesList.get(num);
+					}
+				}
 				Literal randomLiteral = new PlainLiteralImpl(Util.createRandomString(22));
 				Triple triple = new TripleImpl(new BNode(), new UriRef("http://example.com/property"),
randomLiteral);
-				mGraph.add(triple);
+				graph.add(triple);
 				addedTripleCount++;
 				if ((addedTripleCount % 100) == 0) {
 					testTriples.add(triple);
@@ -98,6 +137,60 @@ public class MultiThreadedSingleTdbDatas
 		}
 
 	}
+	/**
+	 * Iterates over max. the first 10 triples of a Graph
+	 * while acquiring a read lock on the graph.
+	 * @author westei
+	 *
+	 */
+	class ValidatorThread extends Thread {
+		
+		boolean stopRequested = false;
+		
+		public ValidatorThread(int id) {
+			setName("Validator Thread "+id);
+			start();
+		}
+		
+		public void requestStop() {
+			stopRequested = true;
+		}
+
+		@Override
+		public void run() {
+			while (!stopRequested) {
+				float r;
+				synchronized (random) {
+					r = random.nextFloat();
+				}
+				int num = Math.round(r*(float)(mGraphs.size()-1));
+				LockableMGraph graph;
+				synchronized (mGraphs) {
+					graph = (LockableMGraph)mGraphs.get(num);
+				}
+				int elem = 0;
+				graph.getLock().readLock().lock();
+				try {
+					Iterator<Triple> it = graph.iterator();
+					while(it.hasNext() && elem < 10){
+						elem++;
+						it.next();
+					}
+				} finally {
+					graph.getLock().readLock().unlock();
+				}
+				//iterate inly every 200ms
+				try {
+					Thread.sleep(200);
+				} catch (InterruptedException e) {
+					//ignore
+				}
+			}
+		}
+		
+		
+		
+	}
 
 	
 	
@@ -114,30 +207,52 @@ public class MultiThreadedSingleTdbDatas
         provider = new SingleTdbDatasetTcProvider(config);
     }
     @Before
-    public void createGraph(){
-        this.mGraph = provider.createMGraph(new UriRef("http://www.example.org/multiThreadTest"));
+    public void createGraphs(){
+        mGraphs.add(provider.createMGraph(new UriRef(TEST_GRAPH_URI_PREFIX+graphNum[0])));
+		testTriplesList.add(new HashSet<Triple>());
+        graphNum[0]++;
+        mGraphs.add(provider.createMGraph(new UriRef(TEST_GRAPH_URI_PREFIX+graphNum[0])));
+		testTriplesList.add(new HashSet<Triple>());
+        graphNum[0]++;
     }
 	@Test
 	public void perform() throws InterruptedException {
-		TestThread[] threads =  new TestThread[THREAD_COUNT];
+		TestThread[] threads =  new TestThread[TEST_THREAD_COUNT];
 		for (int i = 0; i < threads.length; i++) {
 			threads[i] = new TestThread(i);
 		}
+		ValidatorThread[] validators = new ValidatorThread[VALIDATE_THREAD_COUNT];
+		for(int i = 0; i < validators.length; i++) {
+			validators [i] = new ValidatorThread(i);
+		}
 		Thread.sleep(DELAY*1000);
 		for (TestThread testThread : threads) {
 			testThread.requestStop();
 		}
+		for (ValidatorThread validator : validators) {
+			validator.requestStop();
+		}
 		for (TestThread testThread : threads) {
 			testThread.join();
 		}
+		for (ValidatorThread validator : validators) {
+			validator.join();
+		}
 		int addedTriples = 0;
 		for (TestThread testThread : threads) {
 			addedTriples += testThread.getAddedTripleCount();
 		}
-		Assert.assertEquals(addedTriples, mGraph.size());
-		for (Triple testTriple : testTriples) {
-			Assert.assertTrue(mGraph.contains(testTriple));
+		int graphTriples = 0;
+		log.info("Test created {} graphs with {} triples", mGraphs.size(), addedTriples);
+		for(int i = 0;i < mGraphs.size(); i++){
+			MGraph graph = mGraphs.get(i);
+			graphTriples += graph.size();
+			log.info("  > Grpah {}: {} triples",i,graph.size());
+			for (Triple testTriple : testTriplesList.get(i)) {
+				Assert.assertTrue(graph.contains(testTriple));
+			}
 		}
+		Assert.assertEquals(addedTriples, graphTriples);
 	}
     @AfterClass
     public static void cleanUpDirectory() {



Mime
View raw message