jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [2/2] jena git commit: JENA-1191: sdbload for bulk loading quads with optional progress reports
Date Thu, 09 Jun 2016 15:54:46 GMT
JENA-1191: sdbload for bulk loading quads with optional progress reports


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/7f2dfed0
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/7f2dfed0
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/7f2dfed0

Branch: refs/heads/master
Commit: 7f2dfed0ab2a12907cb67c76b37ed6d2cd12d6c2
Parents: a1b4c33
Author: Andy Seaborne <andy@apache.org>
Authored: Thu Jun 9 16:54:34 2016 +0100
Committer: Andy Seaborne <andy@apache.org>
Committed: Thu Jun 9 16:54:34 2016 +0100

----------------------------------------------------------------------
 jena-sdb/src/main/java/sdb/sdbload.java | 259 ++++++++++-----------------
 1 file changed, 95 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/7f2dfed0/jena-sdb/src/main/java/sdb/sdbload.java
----------------------------------------------------------------------
diff --git a/jena-sdb/src/main/java/sdb/sdbload.java b/jena-sdb/src/main/java/sdb/sdbload.java
index 8853c7e..23d3901 100644
--- a/jena-sdb/src/main/java/sdb/sdbload.java
+++ b/jena-sdb/src/main/java/sdb/sdbload.java
@@ -18,24 +18,31 @@
 
 package sdb;
 
-import java.util.Iterator ;
 import java.util.List ;
 
 import jena.cmd.ArgDecl;
 import jena.cmd.CmdException ;
 import org.apache.jena.atlas.lib.Lib ;
-import org.apache.jena.atlas.lib.Timer ;
+import org.apache.jena.atlas.lib.ProgressMonitor ;
 import org.apache.jena.graph.Graph ;
-import org.apache.jena.graph.GraphListener ;
+import org.apache.jena.graph.Node ;
+import org.apache.jena.graph.NodeFactory ;
 import org.apache.jena.graph.Triple ;
 import org.apache.jena.query.Dataset ;
 import org.apache.jena.rdf.model.Model ;
 import org.apache.jena.riot.Lang ;
 import org.apache.jena.riot.RDFDataMgr ;
 import org.apache.jena.riot.RDFLanguages ;
+import org.apache.jena.riot.system.ProgressStreamRDF ;
+import org.apache.jena.riot.system.StreamRDF ;
+import org.apache.jena.riot.system.StreamRDFLib ;
 import org.apache.jena.sdb.SDB ;
 import org.apache.jena.sdb.SDBFactory ;
+import org.apache.jena.sdb.Store ;
 import org.apache.jena.sdb.store.StoreBaseHSQL ;
+import org.apache.jena.sdb.store.StoreLoaderPlus ;
+import org.apache.jena.sparql.core.DatasetGraph ;
+import org.apache.jena.sparql.core.Quad ;
 import sdb.cmd.CmdArgsDB ;
 import sdb.cmd.ModGraph ;
  
@@ -49,216 +56,140 @@ import sdb.cmd.ModGraph ;
   *  </p>
   */ 
  
-public class sdbload extends CmdArgsDB
-{
-    private static final String usage = "sdbload --sdb <SPEC> [--graph=IRI] file" ;
-    
-    private static ModGraph modGraph = new ModGraph() ;
-    private static ArgDecl argDeclTruncate = new ArgDecl(false, "truncate") ;
-    private static ArgDecl argDeclReplace = new ArgDecl(false, "replace") ;
-    
-    public static void main(String... argv)
-    {
-        SDB.init();
+public class sdbload extends CmdArgsDB {
+    private static final String usage           = "sdbload --sdb <SPEC> [--graph=IRI]
file" ;
+
+    private static ModGraph     modGraph        = new ModGraph() ;
+    private static ArgDecl      argDeclTruncate = new ArgDecl(false, "truncate") ;
+    private static ArgDecl      argDeclReplace  = new ArgDecl(false, "replace") ;
+
+    public static void main(String... argv) {
+        SDB.init() ;
         new sdbload(argv).mainRun() ;
     }
-    
+
     String filename = null ;
 
-    public sdbload(String... args)
-    {
-        super(args);
+    public sdbload(String... args) {
+        super(args) ;
         addModule(modGraph) ;
         add(argDeclTruncate) ;
         add(argDeclReplace) ;
     }
 
     @Override
-    protected String getCommandName() { return Lib.className(this) ; }
-    
+    protected String getCommandName() {
+        return Lib.className(this) ;
+    }
+
     @Override
-    protected String getSummary()  { return getCommandName()+" <SPEC> [--graph IRI]
file ..."; }
-    
+    protected String getSummary() {
+        return getCommandName() + " <SPEC> [--graph IRI] file ..." ;
+    }
+
     @Override
-    protected void processModulesAndArgs()
-    {
+    protected void processModulesAndArgs() {
         if ( getNumPositional() == 0 )
             cmdError("Need filenames of RDF data to load", true) ;
     }
-    
+
     @Override
-    protected void execCmd(List<String> args)
-    {
-        if ( contains(argDeclTruncate) ) 
+    protected void execCmd(List<String> args) {
+        if ( contains(argDeclTruncate) )
             getStore().getTableFormatter().truncate() ;
         for ( String x : args )
             loadOne(x, contains(argDeclReplace)) ;
         StoreBaseHSQL.close(getStore()) ;
     }
     
-    private void loadOne(String filename, boolean replace)
-    {
-        Monitor monitor = null ;
-        
+    private static ProgressMonitor.Output output = (fmt, args)-> {
+        System.out.printf(fmt, args) ;
+        if ( ! fmt.endsWith("\n") )
+            System.out.println() ;
+    } ;
+
+    private void loadOne(String filename, boolean replace) {
         Model model = null ;
         Dataset dataset = null ;
         Lang lang = RDFLanguages.filenameToLang(filename) ;
         if ( lang == null )
-            throw new CmdException("Data syntax not recognized: "+filename) ;
-        
+            throw new CmdException("Data syntax not recognized: " + filename) ;
+
         if ( modGraph.getGraphName() != null )
             model = modGraph.getModel(getStore()) ;
         else
             dataset = SDBFactory.connectDataset(getStore()) ;
-        
+
         // For monitoring only.
-        Graph monitorGraph = (model==null) ? null : model.getGraph() ;
-        
-        if ( isVerbose() && replace )
-            System.out.println("Emptying: "+filename) ;
-        if (replace) {
-            if ( model != null ) 
-                model.removeAll();
-            else
-                dataset.asDatasetGraph().clear();
-        }
+        Graph monitorGraph = (model == null) ? null : model.getGraph() ;
 
-        if ( isVerbose() || getModTime().timingEnabled() )
-            System.out.println("Start load: "+filename) ;
-        if ( getModTime().timingEnabled() ) {
-            if ( monitorGraph == null ) {
-                // This old monitor code only works for graphs.  
-                // See TDB for a better way using org.apache.jena.atlas.lib.ProgressMonitor
-                throw new CmdException("Timing only implemented for graphs, not whole datasets")
;
-            }
-            monitor = new Monitor(getStore().getLoader().getChunkSize(), isVerbose()) ;
-            monitorGraph.getEventManager().register(monitor) ;
+        if ( replace ) {
+            if ( model != null )
+                model.removeAll() ;
+            else
+                dataset.asDatasetGraph().clear() ;
         }
-
-        // Crude but convenient
-        if ( filename.indexOf(':') == -1 )
-            filename = "file:"+filename ;
-
-        
-        // Always time, only print if enabled. 
-        getModTime().startTimer() ;
         
-        // Load here
-        if ( model == null )
-            RDFDataMgr.read(dataset, filename, lang);
-        else
-            RDFDataMgr.read(model, filename, lang);
+        boolean showProgress = isVerbose() || getModTime().timingEnabled() ;
 
-        long timeMilli = getModTime().endTimer() ;
-            
-        if ( monitor != null )
-        {
-            System.out.println("Added "+monitor.addCount+" triples") ; 
+        if ( showProgress )
+            output.print("Start load: %s", filename) ;
         
-            if ( getModTime().timingEnabled() && !isQuiet() )
-                System.out.printf("Loaded in %.3f seconds [%d triples/s]\n", 
-                                  timeMilli/1000.0, (1000*monitor.addCount/timeMilli)) ;
-            monitorGraph.getEventManager().unregister(monitor) ;
+        StreamRDF stream = streamToStore(dataset.asDatasetGraph(), getStore()) ;
+        if ( modGraph.getGraphName() != null ) {
+            Node gn = NodeFactory.createURI(modGraph.getGraphName()) ;
+            stream = StreamRDFLib.extendTriplesToQuads(gn, stream) ;
         }
-    }
-        
-    static class Monitor implements GraphListener
-    {
-        int addNotePoint ;
-        long addCount = 0 ;
-        int outputCount = 0 ;
         
-        private Timer timer = null ;
-		private long lastTime = 0 ;
-        private boolean displayMemory = false ; 
-            
-        Monitor(int addNotePoint, boolean displayMemory)
-        {
-            this.addNotePoint = addNotePoint ;
-            this.displayMemory = displayMemory ;
-            this.timer = new Timer() ;
-            this.timer.startTimer() ;
+        ProgressMonitor progress = null ;
+        if ( showProgress ) {
+            progress = new ProgressMonitor(filename, 100_000, 10, output) ;
+            stream = new ProgressStreamRDF(stream, progress) ;
         }
         
+        if ( progress != null )
+            progress.start(); 
         
-        @Override
-        public void notifyAddTriple(Graph g, Triple t) { addEvent(t) ; }
-
-        @Override
-        public void notifyAddArray(Graph g, Triple[] triples)
-        { 
-            for ( Triple t : triples )
-                addEvent(t) ;
-        }
-
-        @Override
-        public void notifyAddList(Graph g, List<Triple> triples) 
-        { 
-            notifyAddIterator(g, triples.iterator()) ;
-        }
-
-        @Override
-        public void notifyAddIterator(Graph g, Iterator<Triple> it)
-        {
-            for ( ; it.hasNext() ; )
-                addEvent(it.next()) ;
+        // Load!
+        RDFDataMgr.parse(stream, filename, lang) ;
+        
+        if ( progress != null ) {
+            progress.finish() ; 
+            progress.finishMessage();
         }
+    }
 
-        @Override
-        public void notifyAddGraph(Graph g, Graph added)
-        {}
-
-        @Override
-        public void notifyDeleteTriple(Graph g, Triple t)
-        {}
+    private StreamRDF streamToStore(DatasetGraph dsg, Store store) {
+        StoreLoaderPlus sl = (StoreLoaderPlus)store.getLoader() ;
+        return new StreamRDF() {
 
-        @Override
-        public void notifyDeleteList(Graph g, List<Triple> L)
-        {}
+            @Override
+            public void start() {
+                sl.startBulkUpdate();
+            }
 
-        @Override
-        public void notifyDeleteArray(Graph g, Triple[] triples)
-        {}
+            @Override
+            public void triple(Triple triple) {
+                sl.addTriple(triple);
+            }
 
-        @Override
-        public void notifyDeleteIterator(Graph g, Iterator<Triple> it)
-        {}
+            @Override
+            public void quad(Quad quad) {
+                sl.addQuad(quad.getGraph(), quad.getSubject(), quad.getPredicate(), quad.getObject())
;
+            }
 
-        @Override
-        public void notifyDeleteGraph(Graph g, Graph removed)
-        {}
+            @Override
+            public void base(String base) {}
 
-        @Override
-        public void notifyEvent(Graph source, Object value)
-        {}
+            @Override
+            public void prefix(String prefix, String iri) {
+                dsg.getDefaultGraph().getPrefixMapping().setNsPrefix(prefix, iri) ;
+            }
 
-        private void addEvent(Triple t)
-        {
-            addCount++ ;
-            if ( addNotePoint > 0 && (addCount%addNotePoint) == 0 )
-            {
-                outputCount++ ;
-                long soFar = timer.readTimer() ;
-                long thisTime = soFar - lastTime ;
-                
-                // *1000L is milli to second conversion
-                //   addNotePoint/ (thisTime/1000L)
-                long tpsBatch = (addNotePoint * 1000L) / thisTime;
-                long tpsAvg = (addCount * 1000L) / soFar;
-                
-                String msg = String.format("Add: %,d triples  (Batch: %d / Run: %d)", addCount,
tpsBatch, tpsAvg) ;
-                if ( displayMemory )
-                {
-                  long mem = Runtime.getRuntime().totalMemory() ;
-                  long free = Runtime.getRuntime().freeMemory() ;
-                  msg = msg+String.format("   [M:%,d/F:%,d]", mem,free) ;
-                }
-                System.out.println(msg) ;
-                if ( outputCount > 0 && (outputCount%10) == 0 )
-                    System.out.printf("  Elapsed: %.1f seconds\n", (soFar/1000F)) ;
-                lastTime = soFar ;
+            @Override
+            public void finish() {
+                sl.finishBulkUpdate();
             }
-        }
-        
+        } ;
     }
 }


Mime
View raw message