jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sal...@apache.org
Subject [1/2] jena git commit: JENA-848 : Fix jena-text transaction subsystem (for Lucene, the isolation level is now serializable, and 2-phase commit is used to make it as atomic as possible)
Date Thu, 15 Jan 2015 23:20:00 GMT
Repository: jena
Updated Branches:
  refs/heads/master 273ef7708 -> b4da74768


JENA-848 : Fix jena-text transaction subsystem (for Lucene, the isolation level is now serializable,
and 2-phase commit is used to make it as atomic as possible)


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/767dd930
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/767dd930
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/767dd930

Branch: refs/heads/master
Commit: 767dd930b3285065db6091957e38c8831cf1e24d
Parents: 4ee8d9e
Author: Stephen Allen <sallen@apache.org>
Authored: Thu Jan 15 17:12:31 2015 -0500
Committer: Stephen Allen <sallen@apache.org>
Committed: Thu Jan 15 17:25:35 2015 -0500

----------------------------------------------------------------------
 jena-text/src/main/java/jena/textindexer.java   |   4 +-
 .../jena/query/text/DatasetGraphText.java       | 121 ++++++++++++-------
 .../jena/query/text/TextDatasetFactory.java     |   2 +-
 .../query/text/TextDocProducerEntities.java     |  13 +-
 .../jena/query/text/TextDocProducerTriples.java |  22 +++-
 .../org/apache/jena/query/text/TextIndex.java   |  20 +--
 .../apache/jena/query/text/TextIndexLucene.java |  73 +++++++----
 .../apache/jena/query/text/TextIndexSolr.java   |  47 ++++---
 .../text/TestLuceneWithMultipleThreads.java     | 113 ++++++++++++-----
 9 files changed, 283 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/jena/textindexer.java
----------------------------------------------------------------------
diff --git a/jena-text/src/main/java/jena/textindexer.java b/jena-text/src/main/java/jena/textindexer.java
index e5586fc..eab736f 100644
--- a/jena-text/src/main/java/jena/textindexer.java
+++ b/jena-text/src/main/java/jena/textindexer.java
@@ -107,7 +107,6 @@ public class textindexer extends CmdARQ {
     @Override
     protected void exec() {
         Set<Node> properties = getIndexedProperties() ;
-        textIndex.startIndexing() ;
 
         // there are various strategies possible here
         // what is implemented is a first cut simple approach
@@ -130,7 +129,8 @@ public class textindexer extends CmdARQ {
                 }
             }
         }
-        textIndex.finishIndexing() ;
+        
+        textIndex.commit();
         progressMonitor.close() ;
     }
 

http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java
----------------------------------------------------------------------
diff --git a/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java b/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java
index 1681323..c5655b5 100644
--- a/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java
+++ b/jena-text/src/main/java/org/apache/jena/query/text/DatasetGraphText.java
@@ -28,8 +28,11 @@ import org.slf4j.LoggerFactory ;
 import com.hp.hpl.jena.graph.Graph ;
 import com.hp.hpl.jena.graph.Node ;
 import com.hp.hpl.jena.query.ReadWrite ;
-import com.hp.hpl.jena.sparql.JenaTransactionException ;
-import com.hp.hpl.jena.sparql.core.* ;
+import com.hp.hpl.jena.sparql.core.DatasetGraph ;
+import com.hp.hpl.jena.sparql.core.DatasetGraphMonitor ;
+import com.hp.hpl.jena.sparql.core.DatasetGraphWithLock ;
+import com.hp.hpl.jena.sparql.core.GraphView ;
+import com.hp.hpl.jena.sparql.core.Transactional ;
 
 public class DatasetGraphText extends DatasetGraphMonitor implements Transactional
 {
@@ -38,7 +41,13 @@ public class DatasetGraphText extends DatasetGraphMonitor implements Transaction
     private final Transactional dsgtxn ;
     private final Graph         dftGraph ;
     private final boolean       closeIndexOnClose;
-
+    
+    
+    // If we are going to implement Transactional, then we are going to have to do as DatasetGraphWithLock
and
+    // TDB's DatasetGraphTransaction do and track transaction state in a ThreadLocal
+    private final ThreadLocal<ReadWrite> readWriteMode = new ThreadLocal<ReadWrite>();
+    
+    
     public DatasetGraphText(DatasetGraph dsg, TextIndex index, TextDocProducer producer)
     { 
         this(dsg, index, producer, false);
@@ -56,7 +65,7 @@ public class DatasetGraphText extends DatasetGraphMonitor implements Transaction
         this.closeIndexOnClose = closeIndexOnClose;
     }
 
-    // ---- Intecept these and force the use of views.
+    // ---- Intercept these and force the use of views.
     @Override
     public Graph getDefaultGraph() {
         return dftGraph ;
@@ -99,66 +108,94 @@ public class DatasetGraphText extends DatasetGraphMonitor implements
Transaction
         return results.iterator() ;
     }
 
-    // Imperfect.
-    private boolean needFinish = false ;
-
     @Override
     public void begin(ReadWrite readWrite) {
+        readWriteMode.set(readWrite);
         dsgtxn.begin(readWrite) ;
-        // textIndex.begin(readWrite) ;
-        if ( readWrite == ReadWrite.WRITE ) {
-            // WRONG design
-            super.getMonitor().start() ;
-            // Right design.
-            // textIndex.startIndexing() ;
-            needFinish = true ;
-        }
+        super.getMonitor().start() ;
     }
-
+    
+    /**
+     * Rollback all changes, discarding any exceptions that occur.
+     */
+    @Override
+    public void abort() {
+        // Roll back all both objects, discarding any exceptions that occur
+        try { dsgtxn.abort(); } catch (Throwable t) { log.warn("Exception in abort: " + t.getMessage(),
t); }
+        try { textIndex.rollback(); } catch (Throwable t) { log.warn("Exception in abort:
" + t.getMessage(), t); }
+        
+        readWriteMode.set(null) ;
+        super.getMonitor().finish() ;
+    }
+    
+    /**
+     * Perform a 2-phase commit by first calling prepareCommit() on the TextIndex
+     * followed by committing the Transaction object, and then calling commit()
+     * on the TextIndex().
+     * <p> 
+     * If either of the objects fail on either the preparation or actual commit,
+     * it terminates and calls {@link #rollback()} on both of them.
+     * <p>
+     * <b>NOTE:</b> it may happen that the TextIndex fails to commit, after the
+     * Transactional has already successfully committed.  A rollback instruction will
+     * still be issued, but depending on the implementation, it may not have any effect.
+     */
     @Override
     public void commit() {
-        try {
-            if ( needFinish ) {
-                super.getMonitor().finish() ;
-                // textIndex.finishIndexing() ;
+        // Phase 1
+        if (readWriteMode.get() == ReadWrite.WRITE) {
+            try {
+                textIndex.prepareCommit();
+            }
+            catch (Throwable t) {
+                log.error("Exception in prepareCommit: " + t.getMessage(), t) ;
+                abort();
+                throw new TextIndexException(t);
             }
-            needFinish = false ;
-            // textIndex.commit() ;
-            dsgtxn.commit() ;
-        }
-        catch (Throwable ex) {
-            log.warn("Exception in commit: " + ex.getMessage(), ex) ;
-            dsgtxn.abort() ;
-            throw ex;
         }
-    }
-
-    @Override
-    public void abort() {
+        
+        // Phase 2
         try {
-            if ( needFinish )
-                textIndex.abortIndexing() ;
-            dsgtxn.abort() ;
+            dsgtxn.commit();
+            if (readWriteMode.get() == ReadWrite.WRITE) {
+                textIndex.commit();
+            }
         }
-        catch (JenaTransactionException ex) { throw ex ; }
-        catch (RuntimeException ex) { 
-            log.warn("Exception in abort: " + ex.getMessage(), ex) ;
-            throw ex ;
+        catch (Throwable t) {
+            log.error("Exception in commit: " + t.getMessage(), t) ;
+            abort();
+            throw new TextIndexException(t);
         }
+        readWriteMode.set(null);
+        super.getMonitor().finish() ;
     }
 
     @Override
     public boolean isInTransaction() {
-        return dsgtxn.isInTransaction() ;
+        return readWriteMode.get() != null;
     }
 
     @Override
     public void end() {
+        // If we are still in a write transaction at this point, then commit was never called,
so rollback the TextIndex
+        if (readWriteMode.get() == ReadWrite.WRITE) {
+            try {
+                textIndex.rollback();
+            }
+            catch (Throwable t) {
+                log.warn("Exception in end: " + t.getMessage(), t) ;
+            }
+        }
+        
         try {
-            // textIndex.end() ;
             dsgtxn.end() ;
         }
-        catch (Throwable ex) { log.warn("Exception in end: " + ex.getMessage(), ex) ; }
+        catch (Throwable t) {
+            log.warn("Exception in end: " + t.getMessage(), t) ;
+        }
+        
+        readWriteMode.set(null) ;
+        super.getMonitor().finish() ;
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextDatasetFactory.java
----------------------------------------------------------------------
diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextDatasetFactory.java b/jena-text/src/main/java/org/apache/jena/query/text/TextDatasetFactory.java
index aad5e93..f6ab21b 100644
--- a/jena-text/src/main/java/org/apache/jena/query/text/TextDatasetFactory.java
+++ b/jena-text/src/main/java/org/apache/jena/query/text/TextDatasetFactory.java
@@ -66,8 +66,8 @@ public class TextDatasetFactory
         DatasetGraph dsgt = new DatasetGraphText(dsg, textIndex, producer, closeIndexOnDSGClose)
;
         // Also set on dsg
         Context c = dsgt.getContext() ;
+        c.set(TextQuery.textIndex, textIndex) ;
         
-        dsgt.getContext().set(TextQuery.textIndex, textIndex) ;
         return dsgt ;
     }
     

http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerEntities.java
----------------------------------------------------------------------
diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerEntities.java
b/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerEntities.java
index 9a3b1f0..9e2de8d 100644
--- a/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerEntities.java
+++ b/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerEntities.java
@@ -35,27 +35,30 @@ public class TextDocProducerEntities extends DatasetChangesBatched implements
Te
     private static Logger          log     = LoggerFactory.getLogger(TextDocProducer.class)
;
     private final EntityDefinition defn ;
     private final TextIndex        indexer ;
-    private boolean                started = false ;
+    
+    // Also have to have a ThreadLocal here to keep track of whether or not we are in a transaction,
+    // therefore whether or not we have to do autocommit
+    private final ThreadLocal<Boolean> inTransaction = new ThreadLocal<Boolean>()
;
 
     public TextDocProducerEntities(EntityDefinition defn, TextIndex indexer) {
         this.defn = defn ;
         this.indexer = indexer ;
+        inTransaction.set(false) ;
     }
 
     @Override
     protected void startBatched() {
-        indexer.startIndexing() ;
-        started = true ;
+        inTransaction.set(true) ;
     }
 
     @Override
     protected void finishBatched() {
-        indexer.finishIndexing() ;
+        inTransaction.set(false) ;
     }
 
     @Override
     protected void dispatch(QuadAction quadAction, List<Quad> batch) {
-        if ( !started )
+        if ( !inTransaction.get() )
             throw new IllegalStateException("Not started") ;
         if ( !QuadAction.ADD.equals(quadAction) )
             return ;

http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerTriples.java
----------------------------------------------------------------------
diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerTriples.java
b/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerTriples.java
index 2f0a679..3700eb0 100644
--- a/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerTriples.java
+++ b/jena-text/src/main/java/org/apache/jena/query/text/TextDocProducerTriples.java
@@ -28,22 +28,26 @@ public class TextDocProducerTriples implements TextDocProducer {
     private static Logger          log     = LoggerFactory.getLogger(TextDocProducerTriples.class)
;
     private final EntityDefinition defn ;
     private final TextIndex        indexer ;
-    private boolean                started = false ;
+    
+    // Also have to have a ThreadLocal here to keep track of whether or not we are in a transaction,
+    // therefore whether or not we have to do autocommit
+    private final ThreadLocal<Boolean> inTransaction = new ThreadLocal<Boolean>()
;
+    
 
     public TextDocProducerTriples(EntityDefinition defn, TextIndex indexer) {
         this.defn = defn ;
         this.indexer = indexer ;
+        inTransaction.set(false) ;
     }
 
     @Override
     public void start() {
-        indexer.startIndexing() ;
-        started = true ;
+        inTransaction.set(true) ;
     }
 
     @Override
     public void finish() {
-        indexer.finishIndexing() ;
+        inTransaction.set(false) ;
     }
 
     @Override
@@ -54,8 +58,14 @@ public class TextDocProducerTriples implements TextDocProducer {
             return ;
 
         Entity entity = TextQueryFuncs.entityFromQuad(defn, g, s, p, o) ;
-        if ( entity != null )
-            // Null means does not match defn
+        // Null means does not match defn
+        if ( entity != null ) {
             indexer.addEntity(entity) ;
+            
+            // Auto commit the entity if we aren't in a transaction
+            if (!inTransaction.get()) {
+                indexer.commit() ;
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextIndex.java
----------------------------------------------------------------------
diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextIndex.java b/jena-text/src/main/java/org/apache/jena/query/text/TextIndex.java
index 26acdf5..3aabd5b 100644
--- a/jena-text/src/main/java/org/apache/jena/query/text/TextIndex.java
+++ b/jena-text/src/main/java/org/apache/jena/query/text/TextIndex.java
@@ -28,23 +28,27 @@ import com.hp.hpl.jena.graph.Node ;
 /** TextIndex abstraction */ 
 public interface TextIndex extends Closeable //, Transactional 
 {
+    // Transactional operations
+    void prepareCommit() ;
+    void commit() ;
+    void rollback() ;
+    
+    
     // Update operations
-    public abstract void startIndexing() ;
-    public abstract void addEntity(Entity entity) ;
-    public abstract void finishIndexing() ;
-    public abstract void abortIndexing() ;
+    void addEntity(Entity entity) ;
+    
     
     // read operations
     /** Get all entries for uri */
-    public abstract Map<String, Node> get(String uri) ;
+    Map<String, Node> get(String uri) ;
 
     //** score
     // Need to have more complex results.
     
     /** Access the index - limit if -1 for as many as possible */ 
-    public abstract List<Node> query(String qs, int limit) ;
+    List<Node> query(String qs, int limit) ;
     
-    public abstract List<Node> query(String qs) ;
+    List<Node> query(String qs) ;
 
-    public abstract EntityDefinition getDocDef() ;
+    EntityDefinition getDocDef() ;
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextIndexLucene.java
----------------------------------------------------------------------
diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextIndexLucene.java b/jena-text/src/main/java/org/apache/jena/query/text/TextIndexLucene.java
index 47a804d..36d4050 100644
--- a/jena-text/src/main/java/org/apache/jena/query/text/TextIndexLucene.java
+++ b/jena-text/src/main/java/org/apache/jena/query/text/TextIndexLucene.java
@@ -74,9 +74,20 @@ public class TextIndexLucene implements TextIndex {
 
     private final EntityDefinition docDef ;
     private final Directory        directory ;
-    private final IndexWriter      indexWriter ;
     private final Analyzer         analyzer ;
     
+    // The IndexWriter can't be final because we may have to recreate it if rollback() is
called.
+    // However, it needs to be volatile in case the next write transaction is on a different
thread,
+    // but we do not need locking because we are assuming that there can only be one writer
+    // at a time (enforced elsewhere).
+    private volatile IndexWriter   indexWriter ;
+    
+    /**
+     * Constructs a new TextIndexLucene.
+     * 
+     * @param directory The Lucene Directory for the index
+     * @param def The EntityDefinition that defines how entities are stored in the index
+     */
     public TextIndexLucene(Directory directory, EntityDefinition def) {
         this.directory = directory ;
         this.docDef = def ;
@@ -97,17 +108,23 @@ public class TextIndexLucene implements TextIndex {
         
         this.analyzer = new PerFieldAnalyzerWrapper(new StandardAnalyzer(VER), analyzerPerField)
;
 
+        openIndexWriter();
+    }
+    
+    private void openIndexWriter() {
         IndexWriterConfig wConfig = new IndexWriterConfig(VER, analyzer) ;
         try
         {
             indexWriter = new IndexWriter(directory, wConfig) ;
+            // Force a commit to create the index, otherwise querying before writing will
cause an exception
+            indexWriter.commit();
         }
         catch (IOException e)
         {
             throw new TextIndexException(e) ;
         }
     }
-
+    
     public Directory getDirectory() {
         return directory ;
     }
@@ -119,28 +136,40 @@ public class TextIndexLucene implements TextIndex {
     public IndexWriter getIndexWriter() {
         return indexWriter;
     }
-
+    
     @Override
-    public void startIndexing() { }
-
+    public void prepareCommit() {
+        try {
+            indexWriter.prepareCommit();
+        }
+        catch (IOException e) {
+            throw new TextIndexException(e);
+        }
+    }
+    
     @Override
-    public void finishIndexing() {
+    public void commit() {
         try {
-            indexWriter.commit() ;
+            indexWriter.commit();
         }
         catch (IOException e) {
-            exception(e) ;
+            throw new TextIndexException(e);
         }
     }
-
+    
     @Override
-    public void abortIndexing() {
+    public void rollback() {
+        IndexWriter idx = indexWriter;
+        indexWriter = null;
         try {
-            indexWriter.rollback() ;
+            idx.rollback();
         }
-        catch (IOException ex) {
-            exception(ex) ;
+        catch (IOException e) {
+            throw new TextIndexException(e);
         }
+        
+        // The rollback will close the indexWriter, so we need to reopen it
+        openIndexWriter();
     }
 
     @Override
@@ -149,7 +178,7 @@ public class TextIndexLucene implements TextIndex {
             indexWriter.close() ;
         }
         catch (IOException ex) {
-            exception(ex) ;
+            throw new TextIndexException(ex) ;
         }
     }
 
@@ -162,7 +191,7 @@ public class TextIndexLucene implements TextIndex {
             indexWriter.addDocument(doc) ;
         }
         catch (IOException e) {
-            exception(e) ;
+            throw new TextIndexException(e) ;
         }
     }
 
@@ -187,7 +216,7 @@ public class TextIndexLucene implements TextIndex {
     @Override
     public Map<String, Node> get(String uri) {
         try {
-            IndexReader indexReader = DirectoryReader.open(indexWriter, true);
+            IndexReader indexReader = DirectoryReader.open(directory);
             List<Map<String, Node>> x = get$(indexReader, uri) ;
             if ( x.size() == 0 )
                 return null ;
@@ -196,8 +225,7 @@ public class TextIndexLucene implements TextIndex {
             return x.get(0) ;
         }
         catch (Exception ex) {
-            exception(ex) ;
-            return null ;
+            throw new TextIndexException(ex) ;
         }
     }
 
@@ -248,12 +276,11 @@ public class TextIndexLucene implements TextIndex {
     @Override
     public List<Node> query(String qs, int limit) {
         //** score
-        try (IndexReader indexReader = DirectoryReader.open(indexWriter, true)) {
+        try (IndexReader indexReader = DirectoryReader.open(directory)) {
             return query$(indexReader, qs, limit) ;
         } 
         catch (Exception ex) {
-            exception(ex) ;
-            return null ;
+            throw new TextIndexException(ex) ;
         }
     }
 
@@ -287,8 +314,4 @@ public class TextIndexLucene implements TextIndex {
         // TEMP
         return NodeFactoryExtra.createLiteralNode(v, null, null) ;
     }
-
-    private static void exception(Exception ex) {
-        throw new TextIndexException(ex) ;
-    }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/main/java/org/apache/jena/query/text/TextIndexSolr.java
----------------------------------------------------------------------
diff --git a/jena-text/src/main/java/org/apache/jena/query/text/TextIndexSolr.java b/jena-text/src/main/java/org/apache/jena/query/text/TextIndexSolr.java
index 41d32ba..45571bc 100644
--- a/jena-text/src/main/java/org/apache/jena/query/text/TextIndexSolr.java
+++ b/jena-text/src/main/java/org/apache/jena/query/text/TextIndexSolr.java
@@ -18,7 +18,11 @@
 
 package org.apache.jena.query.text;
 
-import java.util.* ;
+import java.io.IOException ;
+import java.util.ArrayList ;
+import java.util.HashMap ;
+import java.util.List ;
+import java.util.Map ;
 import java.util.Map.Entry ;
 
 import org.apache.solr.client.solrj.SolrQuery ;
@@ -39,10 +43,10 @@ import com.hp.hpl.jena.sparql.util.NodeFactoryExtra ;
 
 public class TextIndexSolr implements TextIndex
 {
-    private static Logger log = LoggerFactory.getLogger(TextIndexSolr.class) ;
+    private static final Logger log = LoggerFactory.getLogger(TextIndexSolr.class) ;
     private final SolrServer solrServer ;
-    private EntityDefinition docDef ;
-    private static int MAX_N    = 10000 ;
+    private final EntityDefinition docDef ;
+    private static final int MAX_N    = 10000 ;
 
     public TextIndexSolr(SolrServer server, EntityDefinition def)
     {
@@ -51,21 +55,32 @@ public class TextIndexSolr implements TextIndex
     }
     
     @Override
-    public void startIndexing()
-    {}
-
+    public void prepareCommit() { }
+    
     @Override
-    public void finishIndexing()
-    {
-        try { solrServer.commit() ; }
-        catch (Exception ex) { exception(ex) ; }
+    public void commit() {
+        try {
+            solrServer.commit();
+        }
+        catch (SolrServerException e) {
+            throw new TextIndexException(e);
+        }
+        catch (IOException e) {
+            throw new TextIndexException(e);
+        }
     }
-
+    
     @Override
-    public void abortIndexing()
-    {
-        try { solrServer.rollback() ; }
-        catch (Exception ex) { exception(ex) ; }
+    public void rollback() {
+        try {
+            solrServer.rollback();
+        }
+        catch (SolrServerException e) {
+            throw new TextIndexException(e);
+        }
+        catch (IOException e) {
+            throw new TextIndexException(e);
+        }
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/jena/blob/767dd930/jena-text/src/test/java/org/apache/jena/query/text/TestLuceneWithMultipleThreads.java
----------------------------------------------------------------------
diff --git a/jena-text/src/test/java/org/apache/jena/query/text/TestLuceneWithMultipleThreads.java
b/jena-text/src/test/java/org/apache/jena/query/text/TestLuceneWithMultipleThreads.java
index 4788596..b60110d 100644
--- a/jena-text/src/test/java/org/apache/jena/query/text/TestLuceneWithMultipleThreads.java
+++ b/jena-text/src/test/java/org/apache/jena/query/text/TestLuceneWithMultipleThreads.java
@@ -18,18 +18,21 @@
 
 package org.apache.jena.query.text;
 
+import java.util.ArrayList ;
+import java.util.List ;
 import java.util.concurrent.ExecutionException ;
 import java.util.concurrent.ExecutorService ;
 import java.util.concurrent.Executors ;
 import java.util.concurrent.Future ;
 import java.util.concurrent.TimeUnit ;
 
+import org.apache.jena.query.text.assembler.TextVocab ;
 import org.apache.lucene.analysis.standard.StandardAnalyzer ;
 import org.apache.lucene.store.RAMDirectory ;
 import org.apache.lucene.util.Version ;
-import org.junit.Before ;
 import org.junit.Test ;
 
+import com.hp.hpl.jena.graph.NodeFactory ;
 import com.hp.hpl.jena.query.Dataset ;
 import com.hp.hpl.jena.query.DatasetFactory ;
 import com.hp.hpl.jena.query.QueryExecution ;
@@ -38,8 +41,7 @@ import com.hp.hpl.jena.query.ReadWrite ;
 import com.hp.hpl.jena.query.ResultSet ;
 import com.hp.hpl.jena.rdf.model.Model ;
 import com.hp.hpl.jena.rdf.model.ResourceFactory ;
-import com.hp.hpl.jena.sparql.core.DatasetGraph ;
-import com.hp.hpl.jena.sparql.core.Transactional ;
+import com.hp.hpl.jena.sparql.core.DatasetGraphFactory ;
 import com.hp.hpl.jena.sparql.modify.GraphStoreNullTransactional ;
 import com.hp.hpl.jena.vocabulary.RDFS ;
 
@@ -58,20 +60,10 @@ public class TestLuceneWithMultipleThreads
         entDef.setAnalyzer("label", analyzer);
     }
     
-    private DatasetGraph dsg;
-    private Transactional tx;
-    
-    @Before
-    public void setup()
-    {
-        dsg = TextDatasetFactory.createLucene(new GraphStoreNullTransactional(), new RAMDirectory(),
entDef);
-        tx = (Transactional)dsg;
-    }
-    
-    
     @Test
     public void testReadInMiddleOfWrite() throws InterruptedException, ExecutionException
     {
+        final DatasetGraphText dsg = (DatasetGraphText)TextDatasetFactory.createLucene(new
GraphStoreNullTransactional(), new RAMDirectory(), entDef);
         final Dataset ds = DatasetFactory.create(dsg);
         final ExecutorService execService = Executors.newSingleThreadExecutor();
         final Future<?> f = execService.submit(new Runnable()
@@ -82,7 +74,7 @@ public class TestLuceneWithMultipleThreads
                 // Hammer the dataset with a series of read queries
                 while (!Thread.interrupted())
                 {
-                    tx.begin(ReadWrite.READ);
+                    dsg.begin(ReadWrite.READ);
                     try
                     {
                         QueryExecution qExec = QueryExecutionFactory.create("select * where
{ ?s ?p ?o }", ds);
@@ -91,17 +83,17 @@ public class TestLuceneWithMultipleThreads
                         {
                             rs.next();
                         }
-                        tx.commit();
+                        dsg.commit();
                     }
                     finally
                     {
-                        tx.end();
+                        dsg.end();
                     }
                 }
             }
         });
         
-        tx.begin(ReadWrite.WRITE);
+        dsg.begin(ReadWrite.WRITE);
         try
         {
             Model m = ds.getDefaultModel();
@@ -110,11 +102,11 @@ public class TestLuceneWithMultipleThreads
             Thread.sleep(100);
             m.add(ResourceFactory.createResource("http://example.org/"), RDFS.comment, "comment");
             
-            tx.commit();
+            dsg.commit();
         }
         finally
         {
-            tx.end();
+            dsg.end();
         }
         
         execService.shutdownNow();
@@ -127,9 +119,10 @@ public class TestLuceneWithMultipleThreads
     @Test
     public void testWriteInMiddleOfRead() throws InterruptedException, ExecutionException
     {
+        final DatasetGraphText dsg = (DatasetGraphText)TextDatasetFactory.createLucene(new
GraphStoreNullTransactional(), new RAMDirectory(), entDef);
         final int numReads = 10;
         final Dataset ds = DatasetFactory.create(dsg);
-        final ExecutorService execService = Executors.newFixedThreadPool(10); //.newSingleThreadExecutor();
+        final ExecutorService execService = Executors.newFixedThreadPool(10);
         final Future<?> f = execService.submit(new Runnable()
         {
             @Override
@@ -137,7 +130,7 @@ public class TestLuceneWithMultipleThreads
             {
                 while (!Thread.interrupted())
                 {
-                    tx.begin(ReadWrite.WRITE);
+                    dsg.begin(ReadWrite.WRITE);
                     try
                     {
                         Model m = ds.getDefaultModel();
@@ -153,11 +146,11 @@ public class TestLuceneWithMultipleThreads
                         }
                         m.add(ResourceFactory.createResource("http://example.org/"), RDFS.comment,
"comment");
                         
-                        tx.commit();
+                        dsg.commit();
                     }
                     finally
                     {
-                        tx.end();
+                        dsg.end();
                     }
                 }
             }
@@ -165,7 +158,7 @@ public class TestLuceneWithMultipleThreads
         
         for (int i=0; i<numReads; i++)
         {
-            tx.begin(ReadWrite.READ);
+            dsg.begin(ReadWrite.READ);
             try
             {
                 QueryExecution qExec = QueryExecutionFactory.create("select * where { ?s
?p ?o }", ds);
@@ -176,11 +169,11 @@ public class TestLuceneWithMultipleThreads
                 }
                 // Sleep for a bit so that the writer thread can get in between the reads
                 Thread.sleep(100);
-                tx.commit();
+                dsg.commit();
             }
             finally
             {
-                tx.end();
+                dsg.end();
             }
         }
         
@@ -190,4 +183,70 @@ public class TestLuceneWithMultipleThreads
         // If there was an exception in the write thread then Future.get() will throw an
ExecutionException
         assertTrue(f.get() == null);
     }
+    
+    @Test
+    public void testIsolation() throws InterruptedException, ExecutionException {
+        
+        final DatasetGraphText dsg = (DatasetGraphText)TextDatasetFactory.createLucene(DatasetGraphFactory.createMem(),
new RAMDirectory(), entDef);
+        
+        final int numReaders = 2;
+        final List<Future<?>> futures = new ArrayList<Future<?>>(numReaders);
+        final ExecutorService execService = Executors.newFixedThreadPool(numReaders);
+        final Dataset ds = DatasetFactory.create(dsg);
+        
+        
+        for (int i=0; i<numReaders; i++) {
+            futures.add(execService.submit(new Runnable() {
+                @Override
+                public void run()
+                {
+                    while (!Thread.interrupted()) {
+                        dsg.begin(ReadWrite.READ);
+                        try {
+                            QueryExecution qExec = QueryExecutionFactory.create(
+                                    "select * where { graph <http://example.org/graph>
{ ?s <" + TextVocab.pfQuery + "> (<" + RDFS.label.getURI() + "> \"test\") } }",
ds);
+//                                    "select * where { graph <http://example.org/graph>
{ ?s <" + RDFS.label.getURI() + "> \"test\" } }", ds);
+                            ResultSet rs = qExec.execSelect();
+                            assertFalse(rs.hasNext());
+                            dsg.commit();
+                        }
+                        finally {
+                            dsg.end();
+                        }
+                        
+                        try {
+                            Thread.sleep(10);
+                        }
+                        catch (InterruptedException e) {
+                            break;
+                        }
+                    }
+                }
+            }));
+        }
+        
+        // Give the read threads a chance to start up
+        Thread.sleep(500);
+        dsg.begin(ReadWrite.WRITE);
+        try {
+            dsg.add(NodeFactory.createURI("http://example.org/graph"), NodeFactory.createURI("http://example.org/test"),
RDFS.label.asNode(), NodeFactory.createLiteral("test"));
+            
+            // Now give the read threads a chance to note the change
+            Thread.sleep(500);
+            
+            // Don't commit this change
+        }
+        finally {
+            dsg.end();
+        }
+        // Just in case dsg.end() inappropriately commits the change
+        Thread.sleep(500);
+        
+        execService.shutdownNow();
+        execService.awaitTermination(1000, TimeUnit.MILLISECONDS);
+        for(Future<?> f : futures) {
+            assertTrue(f.get() == null);
+        }
+    }
+    
 }


Mime
View raw message