jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From casta...@apache.org
Subject svn commit: r1159590 - /incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystemMultiDatasets.java
Date Fri, 19 Aug 2011 11:12:25 GMT
Author: castagna
Date: Fri Aug 19 11:12:25 2011
New Revision: 1159590

URL: http://svn.apache.org/viewvc?rev=1159590&view=rev
Log:
This is temporary, to help investigating JENA-91, JENA-96 and JENA-97. It duplicates a lot
of code from TestTransSystem, we can remove it later.

Added:
    incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystemMultiDatasets.java
  (with props)

Added: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystemMultiDatasets.java
URL: http://svn.apache.org/viewvc/incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystemMultiDatasets.java?rev=1159590&view=auto
==============================================================================
--- incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystemMultiDatasets.java
(added)
+++ incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystemMultiDatasets.java
Fri Aug 19 11:12:25 2011
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.hp.hpl.jena.tdb.transaction;
+
+import static com.hp.hpl.jena.tdb.transaction.TransTestLib.count ;
+import static java.lang.String.format ;
+
+import java.io.File ;
+import java.util.ArrayList ;
+import java.util.Random ;
+import java.util.concurrent.Callable ;
+import java.util.concurrent.ExecutorService ;
+import java.util.concurrent.Executors ;
+import java.util.concurrent.TimeUnit ;
+import java.util.concurrent.atomic.AtomicInteger ;
+
+import org.junit.AfterClass ;
+import org.junit.BeforeClass ;
+import org.openjena.atlas.lib.FileOps ;
+import org.openjena.atlas.lib.Lib ;
+import org.openjena.atlas.lib.RandomLib ;
+import org.slf4j.Logger ;
+import org.slf4j.LoggerFactory ;
+
+import com.hp.hpl.jena.datatypes.xsd.XSDDatatype ;
+import com.hp.hpl.jena.graph.Node ;
+import com.hp.hpl.jena.sparql.core.Quad ;
+import com.hp.hpl.jena.sparql.sse.SSE ;
+import com.hp.hpl.jena.tdb.ConfigTest ;
+import com.hp.hpl.jena.tdb.DatasetGraphTxn ;
+import com.hp.hpl.jena.tdb.ReadWrite ;
+import com.hp.hpl.jena.tdb.StoreConnection ;
+import com.hp.hpl.jena.tdb.base.block.FileMode ;
+import com.hp.hpl.jena.tdb.base.file.Location ;
+import com.hp.hpl.jena.tdb.sys.SystemTDB ;
+
+/** System testing using multiple datasets of the transactions. */
+public class TestTransSystemMultiDatasets
+{
+    // Use this to flip between FileMode.direct and FileMode.mapped
+    static { SystemTDB.setFileMode(FileMode.mapped) ; }
+    static { org.openjena.atlas.logging.Log.setLog4j() ; }
+    private static Logger log = LoggerFactory.getLogger(TestTransSystemMultiDatasets.class)
;
+
+    static boolean MEM = false ;
+    
+    static final int NUM_DATASETS = 3 ;
+    static final ArrayList<Location> LOCATIONS = new ArrayList<Location>() ;

+    
+    static {
+    	for ( int i = 0; i < NUM_DATASETS; i++ ) 
+    		LOCATIONS.add(createLocation()) ;
+    }
+
+    private static int count_datasets = 0 ;
+    static Location createLocation() {
+    	return MEM ? Location.mem() : new Location(ConfigTest.getTestingDirDB() + File.separator
+ "DB-" + ++count_datasets) ;
+    }
+
+    static final int Iterations             = MEM ? 1000 : 100 ;
+    // Output style.
+    static boolean inlineProgress           = true ; // (! log.isDebugEnabled()) &&
Iterations > 20 ;
+    static boolean logging                  = ! inlineProgress ; // (! log.isDebugEnabled())
&& Iterations > 20 ;
+    
+    static final int numReaderTasks         = 10 ;
+    static final int numWriterTasksA        = 10 ;
+    static final int numWriterTasksC        = 10 ;
+
+    static final int readerSeqRepeats       = 8 ; 
+    static final int readerMaxPause         = 50 ;
+
+    static final int writerAbortSeqRepeats  = 4 ;
+    static final int writerCommitSeqRepeats = 4 ;
+    static final int writerMaxPause         = 25 ;
+
+    
+    public static void main(String...args)
+    {
+        if ( logging )
+            log.info("START ("+ (MEM?"memory":"disk") + ", {} iterations)", Iterations) ;
+        else
+            printf("START (%s, %d iterations)\n", (MEM?"memory":"disk"), Iterations) ;
+        
+        int N = (Iterations < 10) ? 1 : Iterations / 10 ;
+        N = Math.min(N, 100) ;
+        int i ;
+        
+        for ( i = 0 ; i < Iterations ; i++ )
+        {
+            clean() ;
+            
+            if (!inlineProgress && logging)
+                log.info(format("Iteration: %d\n", i)) ;
+            if ( inlineProgress )
+            {
+                if ( i%N == 0 )
+                    printf("%03d: ",i) ;
+                printf(".") ;
+                if ( i%N == (N-1) )
+                    println() ;
+            }
+            new TestTransSystemMultiDatasets().manyReaderAndOneWriter() ;
+        }
+        if ( inlineProgress )
+        {
+            if ( i%N != 0 )
+                System.out.println() ;
+            println() ;
+            printf("DONE (%03d)\n",i) ;
+        }
+        if (logging)
+            log.info("FINISH ({})", i) ;
+        else
+            printf("FINISH") ;
+    }
+    
+    private static void clean()
+    {
+    	for ( Location location : LOCATIONS ) {
+            StoreConnection.release(location) ;
+            if ( ! location.isMem() )
+                FileOps.clearDirectory(location.getDirectoryPath()) ;			
+		}
+    }
+
+    static class Reader implements Callable<Object>
+    {
+        private final int repeats ;
+        private final int maxpause ;
+        private final TestTransSystemMultiDatasets tts ; 
+    
+        Reader(TestTransSystemMultiDatasets tts, int numSeqRepeats, int pause)
+        {
+            this.repeats = numSeqRepeats ;
+            this.maxpause = pause ;
+            this.tts = tts ;
+        }
+    
+        @Override
+        public Object call()
+        {
+        	StoreConnection sConn = tts.getStoreConnection() ;
+            DatasetGraphTxn dsg = null ;
+            try
+            {
+                int id = gen.incrementAndGet() ;
+                for (int i = 0; i < repeats; i++)
+                {
+                    dsg = sConn.begin(ReadWrite.READ) ;
+                    log.debug("reader start " + id + "/" + i) ;
+
+                    int x1 = count("SELECT * { ?s ?p ?o }", dsg) ;
+                    pause(maxpause) ;
+                    int x2 = count("SELECT * { ?s ?p ?o }", dsg) ;
+                    if (x1 != x2) log.warn(format("READER: %s Change seen: %d/%d : id=%d:
i=%d",
+                                                  dsg.getTransaction().getLabel(), x1, x2,
id, i)) ;
+                    log.debug("reader finish " + id + "/" + i) ;
+                    dsg.close() ;
+                    dsg = null ;
+                }
+                return null ;
+            } catch (RuntimeException ex)
+            {
+                System.err.println(ex.getMessage()) ;
+                if ( dsg != null )
+                {
+                    dsg.abort() ;
+                    dsg.close() ;
+                    dsg = null ;
+                }
+                return null ;
+            }
+        }
+    }
+
+    static abstract class Writer implements Callable<Object>
+    {
+        private final int repeats ;
+        private final int maxpause ;
+        private final TestTransSystemMultiDatasets tts ;
+        private final boolean commit ; 
+    
+        protected Writer(TestTransSystemMultiDatasets tts, int numSeqRepeats, int pause,
boolean commit)
+        {
+            this.repeats = numSeqRepeats ;
+            this.maxpause = pause ;
+            this.tts = tts ;
+            this.commit = commit ;
+        }
+        
+        @Override
+        public Object call()
+        {
+        	StoreConnection sConn = tts.getStoreConnection() ;
+            DatasetGraphTxn dsg = null ;
+            try { 
+                int id = gen.incrementAndGet() ;
+                for ( int i = 0 ; i < repeats ; i++ )
+                {
+                    log.debug("writer start "+id+"/"+i) ;                
+                    dsg = sConn.begin(ReadWrite.WRITE) ;
+
+                    int x1 = count("SELECT * { ?s ?p ?o }", dsg) ;
+                    int z = change(dsg, id, i) ;
+                    pause(maxpause) ;
+                    int x2 = count("SELECT * { ?s ?p ?o }", dsg) ;
+                    if ( x1+z != x2 )
+                    {
+                        TransactionManager txnMgr = dsg.getTransaction().getTxnMgr() ;
+                        SysTxnState state = txnMgr.state() ;
+                        String label = dsg.getTransaction().getLabel() ; 
+                        log.warn(format("WRITER: %s Change seen: %d + %d != %d : id=%d: i=%d",
label, x1, z, x2, id, i)) ;
+                        log.warn(state.toString()) ;
+                        dsg.abort() ;
+                        dsg.close() ;
+                        dsg = null ;
+                        return null ;
+                    }
+                    if (commit) 
+                        dsg.commit() ;
+                    else
+                        dsg.abort() ;
+                    SysTxnState state = sConn.getTransMgrState() ;
+                    log.debug(state.toString()) ;
+                    log.debug("writer finish "+id+"/"+i) ;                
+                    dsg.close() ;
+                    dsg = null ;
+                }
+                return null ;
+            }
+            catch (RuntimeException ex)
+            { 
+                System.err.println(ex.getMessage()) ;
+                if ( dsg != null )
+                {
+                    dsg.abort() ;
+                    dsg.close() ;
+                    dsg = null ;
+                }
+                return null ;
+            }
+        }
+    
+        // return the delta.
+        protected abstract int change(DatasetGraphTxn dsg, int id, int i) ;
+    }
+
+    @BeforeClass 
+    public static void beforeClass()
+    {
+    	for ( Location location : LOCATIONS ) {
+            if ( ! location.isMem() )
+                FileOps.clearDirectory(location.getDirectoryPath()) ;    		
+    	}
+        StoreConnection.reset() ;
+    }
+
+    @AfterClass 
+    public static void afterClass() {}
+
+    private StoreConnection sConn ;
+    private static Random random = new Random(System.currentTimeMillis()) ;
+    protected synchronized StoreConnection getStoreConnection()
+    {
+        StoreConnection sConn = StoreConnection.make(LOCATIONS.get(random.nextInt(NUM_DATASETS)))
;
+        //sConn.getTransMgr().recording(true) ;
+        return sConn ;
+    }
+    
+    public TestTransSystemMultiDatasets() {}
+        
+    //@Test
+    public void manyRead()
+    {
+        final StoreConnection sConn = getStoreConnection() ;
+        Callable<?> proc = new Reader(this, 50, 200)  ;        // Number of repeats,
max pause
+            
+        for ( int i = 0 ; i < 5 ; i++ )
+            execService.submit(proc) ;
+        try
+        {
+            execService.shutdown() ;
+            execService.awaitTermination(100, TimeUnit.SECONDS) ;
+        } catch (InterruptedException e)
+        {
+            e.printStackTrace();
+        }
+    }
+    
+    //@Test
+    public void manyReaderAndOneWriter()
+    {
+        Callable<?> procR = new Reader(this, readerSeqRepeats, readerMaxPause) ;  
   // Number of repeats, max pause
+        Callable<?> procW_a = new Writer(this, writerAbortSeqRepeats, writerMaxPause,
false)  // Number of repeats, max pause, commit. 
+        {
+            @Override
+            protected int change(DatasetGraphTxn dsg, int id, int i)
+            { return changeProc(dsg, id, i) ; }
+        } ;
+            
+        Callable<?> procW_c = new Writer(this, writerCommitSeqRepeats, writerMaxPause,
true)  // Number of repeats, max pause, commit. 
+        {
+            @Override
+            protected int change(DatasetGraphTxn dsg, int id, int i)
+            { return changeProc(dsg, id, i) ; }
+        } ;
+
+        submit(execService, procR,   numReaderTasks) ;
+        submit(execService, procW_c, numWriterTasksC) ;
+        submit(execService, procW_a, numWriterTasksA) ;
+        
+        try
+        {
+            execService.shutdown() ;
+            execService.awaitTermination(100, TimeUnit.SECONDS) ;
+        } catch (InterruptedException e)
+        {
+            e.printStackTrace();
+        } 
+    }
+
+    private void submit(ExecutorService execService2, Callable<?> proc, int numTasks)
+    {
+        for ( int i = 0 ; i < numTasks ; i++ )
+            execService.submit(proc) ;
+    }
+
+    static int changeProc(DatasetGraphTxn dsg, int id, int i)
+    {
+        int count = 0 ;
+        int maxN = 500 ;
+        int N = RandomLib.qrandom.nextInt(maxN) ;
+        for ( int j = 0 ; j < N; j++ )
+        {
+            Quad q = genQuad(id*maxN+j) ;
+            if ( ! dsg.contains(q) )
+            {
+                dsg.add(q) ;
+                count++ ;
+            }
+        }
+        log.debug("Change = "+dsg.getDefaultGraph().size()) ;
+        return count ;
+    }
+    
+    static void pause(int maxInternal)
+    {
+        int x = (int)Math.round(Math.random()*maxInternal) ;
+        Lib.sleep(x) ;
+    }
+    
+    static Quad genQuad(int value)
+    {
+        Quad q1 = SSE.parseQuad("(_ <s> <p> <o>)") ;
+        Node g1 = q.getGraph() ;
+        
+        Node g = Quad.defaultGraphNodeGenerated ; // urn:x-arq:DefaultGraphNode
+        Node s = Node.createURI("S") ;
+        Node p = Node.createURI("P") ;
+        Node o = Node.createLiteral(Integer.toString(value), null, XSDDatatype.XSDinteger)
;
+        return new Quad(g,s,p,o) ;
+    }
+
+    private static void println()
+    {
+        printf("\n") ; System.out.flush() ;
+    }
+
+    private static void printf(String string, Object...args)
+    {
+        System.out.printf(string, args) ;
+    }
+
+    private ExecutorService execService = Executors.newCachedThreadPool() ;
+
+    static Quad q  = SSE.parseQuad("(_ <s> <p> <o>) ") ;
+
+    static Quad q1 = SSE.parseQuad("(_ <s> <p> <o1>)") ;
+
+    static Quad q2 = SSE.parseQuad("(_ <s> <p> <o2>)") ;
+
+    static Quad q3 = SSE.parseQuad("(_ <s> <p> <o3>)") ;
+
+    static Quad q4 = SSE.parseQuad("(_ <s> <p> <o4>)") ;
+
+    private static int initCount = -1 ;
+
+    //static final Location LOC = new Location(ConfigTest.getTestingDirDB()) ;
+    static final AtomicInteger gen = new AtomicInteger() ;
+    
+}
+

Propchange: incubator/jena/Experimental/TxTDB/trunk/src/test/java/com/hp/hpl/jena/tdb/transaction/TestTransSystemMultiDatasets.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message