jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [3/3] jena git commit: JENA-1552: Phased loader
Date Wed, 06 Jun 2018 11:03:28 GMT
JENA-1552: Phased loader


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/2934c550
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/2934c550
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/2934c550

Branch: refs/heads/master
Commit: 2934c5506f9caa237868fbbb5aabf247106ec16b
Parents: 205fac6
Author: Andy Seaborne <andy@apache.org>
Authored: Fri Jun 1 14:07:55 2018 +0100
Committer: Andy Seaborne <andy@apache.org>
Committed: Tue Jun 5 21:29:44 2018 +0100

----------------------------------------------------------------------
 jena-cmds/src/main/java/tdb2/tdbloader.java     |  23 +-
 .../org/apache/jena/tdb2/loader/DataLoader.java |  15 +-
 .../apache/jena/tdb2/loader/LoaderFactory.java  |  54 ++-
 .../jena/tdb2/loader/base/BulkStartFinish.java  |  25 ++
 .../org/apache/jena/tdb2/loader/base/CoLib.java |  54 +++
 .../jena/tdb2/loader/base/ProgressMonitor.java  |   7 +-
 .../jena/tdb2/loader/main/BulkProcesses.java    |  36 ++
 .../jena/tdb2/loader/main/DataBatcher.java      | 174 ++++++++++
 .../apache/jena/tdb2/loader/main/DataBlock.java |  37 +++
 .../jena/tdb2/loader/main/DataToTuples.java     | 207 ++++++++++++
 .../tdb2/loader/main/DataToTuplesInline.java    | 206 ++++++++++++
 .../loader/main/DataToTuplesInlineSingle.java   | 160 +++++++++
 .../jena/tdb2/loader/main/Destination.java      |  27 ++
 .../apache/jena/tdb2/loader/main/Indexer.java   | 140 ++++++++
 .../jena/tdb2/loader/main/IndexerInline.java    |  70 ++++
 .../jena/tdb2/loader/main/InputStage.java       |  39 +++
 .../jena/tdb2/loader/main/LoaderConst.java      |  41 +++
 .../jena/tdb2/loader/main/LoaderMain.java       | 330 +++++++++++++++++++
 .../jena/tdb2/loader/main/LoaderParallel.java   |  34 ++
 .../jena/tdb2/loader/main/LoaderPhased.java     |  34 ++
 .../jena/tdb2/loader/main/LoaderPlan.java       |  57 ++++
 .../jena/tdb2/loader/main/LoaderPlans.java      | 100 ++++++
 .../apache/jena/tdb2/loader/main/PhasedOps.java | 188 +++++++++++
 .../jena/tdb2/loader/main/PrefixHandler.java    |  70 ++++
 .../tdb2/loader/parallel/BulkProcesses.java     |  35 --
 .../tdb2/loader/parallel/BulkStartFinish.java   |  25 --
 .../apache/jena/tdb2/loader/parallel/CoLib.java |  56 ----
 .../jena/tdb2/loader/parallel/DataBatcher.java  | 173 ----------
 .../jena/tdb2/loader/parallel/DataBlock.java    |  37 ---
 .../jena/tdb2/loader/parallel/DataToTuples.java | 205 ------------
 .../loader/parallel/DataToTuplesInline.java     | 199 -----------
 .../jena/tdb2/loader/parallel/Destination.java  |  27 --
 .../jena/tdb2/loader/parallel/Indexer.java      | 138 --------
 .../jena/tdb2/loader/parallel/LoaderConst.java  |  41 ---
 .../tdb2/loader/parallel/LoaderParallel.java    | 159 ---------
 .../tdb2/loader/parallel/PrefixHandler.java     |  68 ----
 .../sequential/BuilderSecondaryIndexes.java     |  28 +-
 .../BuilderSecondaryIndexesSequential.java      |  57 ----
 .../loader/sequential/LoaderNodeTupleTable.java |  28 +-
 .../jena/tdb2/loader/AbstractTestLoader.java    | 226 +++++++++++++
 .../org/apache/jena/tdb2/loader/TS_Loader.java  |   3 +-
 .../org/apache/jena/tdb2/loader/TestLoader.java | 245 --------------
 .../apache/jena/tdb2/loader/TestLoaderMain.java |  58 ++++
 .../apache/jena/tdb2/loader/TestLoaderStd.java  |  55 ++++
 44 files changed, 2484 insertions(+), 1507 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-cmds/src/main/java/tdb2/tdbloader.java
----------------------------------------------------------------------
diff --git a/jena-cmds/src/main/java/tdb2/tdbloader.java b/jena-cmds/src/main/java/tdb2/tdbloader.java
index 6d057e7..52e541a 100644
--- a/jena-cmds/src/main/java/tdb2/tdbloader.java
+++ b/jena-cmds/src/main/java/tdb2/tdbloader.java
@@ -31,6 +31,7 @@ import org.apache.jena.query.ARQ;
 import org.apache.jena.riot.Lang;
 import org.apache.jena.riot.RDFLanguages;
 import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.system.Txn;
 import org.apache.jena.tdb2.loader.DataLoader;
 import org.apache.jena.tdb2.loader.LoaderFactory;
 import org.apache.jena.tdb2.loader.base.LoaderOps;
@@ -42,7 +43,7 @@ public class tdbloader extends CmdTDBGraph {
     private static final ArgDecl argStats = new ArgDecl(ArgDecl.HasValue,  "stats");
     private static final ArgDecl argLoader = new ArgDecl(ArgDecl.HasValue, "loader");
     
-    private enum LoaderEnum { Basic, Parallel, Sequential }
+    private enum LoaderEnum { Basic, Parallel, Sequential, Phased }
     
     private boolean showProgress = true;
     private boolean generateStats = false;
@@ -56,7 +57,7 @@ public class tdbloader extends CmdTDBGraph {
     protected tdbloader(String[] argv) {
         super(argv);
 //        super.add(argStats, "Generate statistics");
-        super.add(argLoader, "--loader", "Loader to use");
+        super.add(argLoader, "--loader=", "Loader to use: 'basic', 'phased' (default), 'sequential' or 'parallel'");
     }
 
     @Override
@@ -67,6 +68,8 @@ public class tdbloader extends CmdTDBGraph {
             String loadername = getValue(argLoader).toLowerCase();
             if ( loadername.matches("basic.*") )
                 loader = LoaderEnum.Basic;
+            else if ( loadername.matches("phas.*") )
+                loader = LoaderEnum.Phased;
             else if ( loadername.matches("seq.*") )
                 loader = LoaderEnum.Sequential;
             else if ( loadername.matches("para.*") )
@@ -84,7 +87,7 @@ public class tdbloader extends CmdTDBGraph {
 
     @Override
     protected String getSummary() {
-        return getCommandName() + " [--desc DATASET | --loc DIR] FILE ...";
+        return getCommandName() + "--loader= [--desc DATASET | --loc DIR] FILE ...";
     }
 
     @Override
@@ -146,9 +149,15 @@ public class tdbloader extends CmdTDBGraph {
         if ( graphName != null )
             gn = NodeFactory.createURI(graphName);
         
-        LoaderEnum useLoader = loader; 
-        if ( useLoader == null )
-            useLoader = LoaderEnum.Parallel;
+        LoaderEnum useLoader = loader;
+        if ( useLoader == null ) {
+            // Default choice: phased if the dataset is empty, basic otherwise.
+            boolean isEmpty = Txn.calculateRead(dsg, ()->dsg.isEmpty());
+            if ( isEmpty )
+                useLoader = LoaderEnum.Phased;
+            else
+                useLoader = LoaderEnum.Basic;
+        }
         
         MonitorOutput output = isQuiet() ? LoaderOps.nullOutput() : LoaderOps.outputToLog();
         DataLoader loader = createLoader(useLoader, dsg, gn, output);
@@ -159,6 +168,8 @@ public class tdbloader extends CmdTDBGraph {
         
     private DataLoader createLoader(LoaderEnum useLoader, DatasetGraph dsg, Node gn, MonitorOutput output) {
         switch(useLoader) {
+            case Phased :
+                return LoaderFactory.phasedLoader(dsg, gn, output);
             case Parallel :
                 return LoaderFactory.parallelLoader(dsg, gn, output);
             case Sequential :

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/DataLoader.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/DataLoader.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/DataLoader.java
index 152af53..9759d83 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/DataLoader.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/DataLoader.java
@@ -41,17 +41,24 @@ import org.apache.jena.riot.system.StreamRDF;
  * to large datasets and doe sssot max up the hardware so it is suitable for runtime operation at larger scales.
  * 
  * <h4>sequential</h4>
- * A fully transactional laoder that loads the primary indexes then does multipel passes to load the secondary indexes.
+ * A fully transactional loader that loads the primary indexes then does multiple passes to load the secondary indexes.
  * This maximises RAM file system caching effects.
  * It can be useful when hardware is restricted and I/O is slow (disk, not non volatile storage liek SSDs).  
  *
+ * <h4>phased</h4>
+ * The phased loader uses multiple threads to process data and to index the {@code DatasetGraph}.
+ * It proceeds by loading data into the primary indexes, then, separately, builds the other indexes.
+ * Loading is not fully transaction-safe in the presence of persistent
+ * storage problems or a JVM/machine crash when finishing writing.
+ * Otherwise it is transactional.
+ * 
  * <h4>parallel</h4>
  * The parallel loader use multiple threads to process data and to index the {@code DatasetGraph}.  
- * Loading in parallel is not fully transaction-safe in the presence of persistent
+ * Loading is not fully transaction-safe in the presence of persistent
  * storage problems or a JVM/machine crash when finishing writing.
  * Otherwise it is transactional.
- * 
- * Because it uses multi-threads, it can interfer with performance of other applications on the machine it is run on.
+ * Because it uses many threads to write to persistent storage,
+ * it can interfere with the performance of other applications on the machine it is run on.
  * 
  * <h4>{@code DataLoader} API</h4>
  * 

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java
index affb923..d06e3d4 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/LoaderFactory.java
@@ -26,7 +26,8 @@ import org.apache.jena.sparql.core.DatasetGraph;
 import org.apache.jena.sparql.core.Quad;
 import org.apache.jena.tdb2.loader.base.MonitorOutput;
 import org.apache.jena.tdb2.loader.basic.LoaderBasic;
-import org.apache.jena.tdb2.loader.parallel.LoaderParallel;
+import org.apache.jena.tdb2.loader.main.LoaderParallel;
+import org.apache.jena.tdb2.loader.main.LoaderPhased;
 import org.apache.jena.tdb2.loader.sequential.LoaderSequential;
 
 /** Obtain a {@link DataLoader}.
@@ -124,6 +125,47 @@ public class LoaderFactory {
      * Supply a {@link MonitorOutput} for the desirabled progress and summary output messages
      * or {@code null} for no output.
      */
+    
+    public static DataLoader phasedLoader(DatasetGraph dsg, MonitorOutput output) {
+        Objects.requireNonNull(dsg);
+        return new LoaderPhased(dsg, null, output);
+    }
+
+    /** 
+     * A phased loader to load a single graph in the destination {@code DatasetGraph}.
+     * See {@link #phasedLoader(DatasetGraph, MonitorOutput)} for loader characteristics.
+     * <p>
+     * Use {@link Quad#defaultGraphIRI} to load the default graph.
+     * <p>
+     * No other graphs in the destination {@code DatasetGraph} are touched. If quads
+     * data is read, default graph data is sent to the destination named graph but all
+     * other quad data is discarded.
+     * <p>
+     * For other behaviours, use {@link #phasedLoader(DatasetGraph, MonitorOutput)} 
+     * and wrap the {@link StreamRDF} from {@link DataLoader#stream()}) with the required
+     * transformation.
+     * 
+     * @see #phasedLoader(DatasetGraph, MonitorOutput)
+     */
+    
+    public static DataLoader phasedLoader(DatasetGraph dsg, Node graphName, MonitorOutput output) {
+        Objects.requireNonNull(dsg);
+        return new LoaderPhased(dsg, graphName, output);
+    }
+
+    /** 
+     * A loader that uses multiple threads to reduce loading time,
+     * and makes parallel writes to persistent storage (disk or SSD).
+     * <p> 
+     * This loader will use all available resources of the machine,
+     * making other actions on the machine unresponsive.
+     * <p>
+     * The dataset cannot be used for other operations - the code will block other transactions
+     * as necessary and release them when loading has finished.
+     * <p>
+     * Supply a {@link MonitorOutput} for the desired progress and summary output messages
+     * or {@code null} for no output.
+     */
     public static DataLoader parallelLoader(DatasetGraph dsg, MonitorOutput output) {
         Objects.requireNonNull(dsg);
         return new LoaderParallel(dsg, null, output);
@@ -159,7 +201,7 @@ public class LoaderFactory {
         Objects.requireNonNull(dsg);
         return createDft(dsg, null, output);
     }
-    
+
     /**
      * Return a general purpose loader to load a single graph in the destination {@code DatasetGraph}.
      * Use {@link Quad#defaultGraphIRI} to load the default graph.
@@ -176,10 +218,10 @@ public class LoaderFactory {
         return createDft(dsg, graphName, output);
     }
     
+    // Choice of default loader.
     private static DataLoader createDft(DatasetGraph dsg, Node graphName, MonitorOutput output) {
-        // May be a conservatively tuned LoaderParallel so that it does not
-        // swamp the machine and copes with lower spec hardware while still
-        // providing a reasonable loading rate.  
-        return parallelLoader(dsg, graphName, output);
+        // The LoaderPhased does not swamp the machine and copes with lower spec hardware
+        // while still providing a reasonable loading rate.
+        return phasedLoader(dsg, graphName, output);
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/BulkStartFinish.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/BulkStartFinish.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/BulkStartFinish.java
new file mode 100644
index 0000000..0b477af
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/BulkStartFinish.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.base;
+
+public interface BulkStartFinish {
+    public void startBulk();
+    public void finishBulk();
+    
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/CoLib.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/CoLib.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/CoLib.java
new file mode 100644
index 0000000..216a249
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/CoLib.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.base;
+
+import org.apache.jena.dboe.base.file.Location;
+import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
+import org.apache.jena.dboe.transaction.txn.journal.Journal;
+import org.apache.jena.tdb2.store.nodetable.NodeTable;
+import org.apache.jena.tdb2.store.tupletable.TupleIndex;
+
+/** Per-thread TransactionCoordinator helpers. */
+public class CoLib {
+    
+    public static TransactionCoordinator newCoordinator() {
+        Journal journal = Journal.create(Location.mem());
+        return new TransactionCoordinator(journal);
+    }
+    
+    public static void add(TransactionCoordinator coordinator, NodeTable nodeTable) {
+        coordinator.add(LoaderOps.ntDataFile(nodeTable));
+        coordinator.add(LoaderOps.ntBPTree(nodeTable));
+    }
+    
+    public static void add(TransactionCoordinator coordinator, TupleIndex... indexes) {
+        for ( TupleIndex pIdx : indexes ) {
+            coordinator.add(LoaderOps.idxBTree(pIdx));
+        }
+    }
+
+    public static void start(TransactionCoordinator coordinator) {
+        coordinator.start();
+    }
+    
+    public static void finish(TransactionCoordinator coordinator) {
+        // Do not do this - it will shutdown the TransactionComponents as well.
+        //coordinator.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitor.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitor.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitor.java
index 59eac32..7f114b5 100644
--- a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitor.java
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/base/ProgressMonitor.java
@@ -33,10 +33,13 @@ public interface ProgressMonitor {
      */
     public void finishMessage(String message);
 
-    /** Start and start timing. TYhis should be pairs with a call to {@link #finish()}. */
+    /** Start and start timing. This should be paired with a call to {@link #finish()}. */
     public void start();
 
-    /** Finish - return time in milliseconds since the {@link #start()} call. */
+    /**
+     * Finish and stop timing. The total time is available with {@link #getTime} and the
+     * number of items processed with {@link #getTicks()}.
+     */
     public void finish();
 
     /** Something happened */

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/BulkProcesses.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/BulkProcesses.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/BulkProcesses.java
new file mode 100644
index 0000000..4b0bcbe
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/BulkProcesses.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import java.util.List;
+
+import org.apache.jena.ext.com.google.common.collect.Lists;
+import org.apache.jena.tdb2.loader.base.BulkStartFinish;
+
+public class BulkProcesses {
+    
+    public static void start(List<BulkStartFinish> list) {
+        list.forEach(x->x.startBulk());
+    }
+    
+    public static void finish(List<BulkStartFinish> list) {
+        Lists.reverse(list).forEach(x->x.finishBulk());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataBatcher.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataBatcher.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataBatcher.java
new file mode 100644
index 0000000..6f97ed5
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataBatcher.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.riot.lang.StreamRDFCounting;
+import org.apache.jena.riot.system.StreamRDF;
+import org.apache.jena.sparql.core.Quad;
+import org.apache.jena.tdb2.loader.base.BulkStartFinish;
+import org.apache.jena.tdb2.loader.base.MonitorOutput;
+
+/**
+ * A {@link StreamRDF} that groups triples and quads and dispatches them in batches. This
+ * class is a {@link StreamRDF} and runs on the calling thread; it does not create any
+ * threads.
+ */   
+public class DataBatcher implements StreamRDFCounting, BulkStartFinish {
+    
+    private List<Triple> triples = null;
+    private List<Quad> quads = null;
+    private long countTriples;
+    private long countQuads;
+    private final MonitorOutput output;
+    private  BiConsumer<String, String> prefixHandler;
+    private Consumer<DataBlock> batchDestination;
+    
+    public DataBatcher(Consumer<DataBlock> batchDestination,
+                       BiConsumer<String, String> prefixHandler,
+                       MonitorOutput output) {
+        this(batchDestination, prefixHandler, LoaderParallel.DataTickPoint, LoaderParallel.DataSuperTick, output);
+    }
+    
+    public DataBatcher(Consumer<DataBlock> batchDestination,
+                       BiConsumer<String, String> prefixHandler, 
+                       int tickPoint, int superTick, MonitorOutput output) {
+        this.batchDestination = batchDestination;
+        this.output = output;
+        this.prefixHandler = prefixHandler;
+    }
+    
+    @Override
+    public void startBulk() {}
+
+    @Override
+    public void finishBulk() {
+        DataBlock lastData = null;
+        if ( ! isEmpty(triples) || ! isEmpty(quads) ) {
+            lastData = new DataBlock(triples, quads);
+            dispatch(lastData);
+            triples = null;
+            quads = null;
+        }
+        dispatch(DataBlock.END);
+    }
+    
+    private <X> boolean isEmpty(List<X> list) {
+        return list == null || list.isEmpty() ;
+    }
+    
+    @Override public void start() {}
+
+    @Override public void finish() {}
+
+    @Override public long count()           { return countTriples() + countQuads(); }
+
+    @Override public long countTriples()    { return countTriples; }
+
+    @Override public long countQuads()      { return countQuads; }
+    
+    @Override
+    public void triple(Triple triple) {
+        if ( triples == null )
+            triples = allocChunkTriples();
+        triples.add(triple);
+        countTriples++;
+        maybeDispatch();
+    }
+
+    @Override
+    public void quad(Quad quad) {
+        if ( quad.isTriple() || quad.isDefaultGraph() ) {
+            // Shame about the object creation.
+            triple(quad.asTriple());
+            return;
+        }
+        if ( quads == null )
+            quads = allocChunkQuads();
+        quads.add(quad);
+        countQuads++;
+        maybeDispatch();
+    }
+
+    private void maybeDispatch() {
+        long x = 0;
+        if ( triples != null )
+            x += triples.size();
+        if ( quads != null )
+            x += quads.size();
+        if ( x <= LoaderConst.ChunkSize )
+            return ;
+        
+        DataBlock block = new DataBlock(triples, quads) ;
+        // Dispatch.
+        dispatch(block);
+        triples = null;
+        quads = null;
+    }
+
+    private void dispatch(DataBlock datablock) {
+        batchDestination.accept(datablock);
+    }
+
+    
+//    private void maybeDispatch3() {
+//        if ( triples.size() >= LoaderConst.ChunkSize ) {
+//            dispatchTriples(triples);
+//            triples = null;
+//        }
+//    }
+//    
+//    private void maybeDispatch4() {
+//        if ( quads.size() >= LoaderConst.ChunkSize ) {
+//            dispatchQuads(quads);
+//            quads = null;
+//        }
+//    }
+//
+//    private void dispatchTriples(List<Triple> triples) {
+//        destTriples.deliver(triples);
+//    }
+//
+//    private void dispatchQuads(List<Quad> quads) {
+//        destQuads.deliver(quads);
+//    }
+
+    @Override
+    public void base(String base) {}
+
+    @Override
+    public void prefix(String prefix, String iri) {
+        if ( prefixHandler != null )
+            prefixHandler.accept(prefix, iri);
+    }
+
+    private List<Triple>  allocChunkTriples() {
+        return new ArrayList<>(LoaderConst.ChunkSize); 
+    } 
+
+    private List<Quad>  allocChunkQuads() {
+        return new ArrayList<>(LoaderConst.ChunkSize); 
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataBlock.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataBlock.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataBlock.java
new file mode 100644
index 0000000..fe966f4
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataBlock.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import java.util.List;
+
+import org.apache.jena.graph.Triple;
+import org.apache.jena.sparql.core.Quad;
+
+/** Unit of output from the data batcher */ 
+public class DataBlock {
+    /** Unique end marker object */ 
+    static DataBlock END = new DataBlock(null, null);
+     
+    List<Triple> triples = null;
+    List<Quad> quads = null;
+    DataBlock( List<Triple> triples, List<Quad> quads) {
+        this.triples = triples;
+        this.quads = quads;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
new file mode 100644
index 0000000..02b163d
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuples.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+import org.apache.jena.atlas.lib.tuple.Tuple;
+import org.apache.jena.atlas.lib.tuple.TupleFactory;
+import org.apache.jena.dboe.transaction.txn.Transaction;
+import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.sparql.core.DatasetPrefixStorage;
+import org.apache.jena.sparql.core.Quad;
+import org.apache.jena.tdb2.loader.BulkLoaderException;
+import org.apache.jena.tdb2.loader.base.BulkStartFinish;
+import org.apache.jena.tdb2.loader.base.CoLib;
+import org.apache.jena.tdb2.loader.base.MonitorOutput;
+import org.apache.jena.tdb2.store.DatasetGraphTDB;
+import org.apache.jena.tdb2.store.NodeId;
+import org.apache.jena.tdb2.store.nodetable.NodeTable;
+
+/** Batch processing of {@link DataBlock}s (triples or quads), converting them into two outputs of
+ * blocks of {@code Tuple<NodeId>}: one for triples and one for quads.
+ * <p>
+ * This class runs one task thread.
+ * <p>
+ * Data is delivered into the process by calling the provided functions for {@code Destination<Tuple<NodeId>>}.
+ * <p>
+ * Assumes triples and quads share a node table.
+ */ 
+ 
+public class DataToTuples implements BulkStartFinish {
+    private long countTriples;
+    private long countQuads;
+
+    private final Destination<Tuple<NodeId>> dest3;
+    private final Destination<Tuple<NodeId>> dest4;
+    private final DatasetGraphTDB dsgtdb;
+    private final NodeTable nodeTable;
+    private final DatasetPrefixStorage prefixes;
+
+    // Chunk accumulators.
+    private List<Tuple<NodeId>> quads = null;
+    private List<Tuple<NodeId>> triples = null;
+    private final MonitorOutput output;
+    private BlockingQueue<DataBlock> input;
+
+    public DataToTuples(DatasetGraphTDB dsgtdb, Destination<Tuple<NodeId>> tuples3, Destination<Tuple<NodeId>> tuples4, MonitorOutput output) {
+        this.dsgtdb = dsgtdb;
+        this.dest3 = tuples3;
+        this.dest4 = tuples4;
+        this.input = new ArrayBlockingQueue<>(LoaderConst.QueueSizeData);
+        this.nodeTable = dsgtdb.getQuadTable().getNodeTupleTable().getNodeTable();
+        this.prefixes = dsgtdb.getPrefixes();
+        this.output = output;
+        
+        NodeTable nodeTable2 = dsgtdb.getTripleTable().getNodeTupleTable().getNodeTable();
+        if ( nodeTable != nodeTable2 )
+            throw new BulkLoaderException("Different node tables");
+    }
+    
+    private TransactionCoordinator coordinator;
+    private Transaction transaction; 
+    
+    public Consumer<DataBlock> data() {
+        return this::index; 
+    }
+    
+    /**
+     * Queue a {@link DataBlock} for the task thread; blocks while the input queue is full.
+     *
+     * @throws BulkLoaderException if the calling thread is interrupted while waiting
+     */
+    private void index(DataBlock dataBlock) {
+        try { input.put(dataBlock); }
+        catch (InterruptedException e) {
+            // Restore the interrupt status so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new BulkLoaderException("InterruptedException", e);
+        }
+    }
+    
+    @Override
+    public void startBulk() {
+        new Thread(()->action()).start();
+    }
+     
+    @Override
+    public void finishBulk() { }
+
+    /**
+     * Body of the task thread: take {@link DataBlock}s from the input queue, convert their
+     * triples and quads to tuples of {@link NodeId}s and pass them to the two destinations,
+     * all inside one write transaction on the node table.
+     * <p>
+     * The loop ends when {@link DataBlock#END} is taken from the queue; the end-of-tuples marker
+     * is then sent to both destinations and the transaction is committed.
+     * On any exception the transaction is aborted and no end marker is sent.
+     */
+    private void action() {
+        coordinator = CoLib.newCoordinator();
+        CoLib.add(coordinator, nodeTable);
+        CoLib.start(coordinator);
+        transaction = coordinator.begin(TxnType.WRITE);
+
+        try {
+            for (;;) {
+                DataBlock data = input.take();
+                // END is a sentinel object, recognised by identity.
+                if ( data == DataBlock.END )
+                    break;
+                if ( data.triples != null ) {
+                    List<Tuple<NodeId>> tuples = new ArrayList<>(data.triples.size());
+                    for ( Triple t : data.triples ) {
+                        countTriples++;
+                        accTuples(t, nodeTable, tuples);
+                    }
+                    // Never forward an empty chunk: downstream stages such as Indexer
+                    // treat an empty list as the end-of-stream marker.
+                    if ( ! tuples.isEmpty() )
+                        dispatchTuples3(tuples);
+                }
+                if ( data.quads != null ) {
+                    List<Tuple<NodeId>> tuples = new ArrayList<>(data.quads.size());
+                    for ( Quad q : data.quads ) {
+                        countQuads++;
+                        accTuples(q, nodeTable, tuples);
+                    }
+                    // Same guard as for triples.
+                    if ( ! tuples.isEmpty() )
+                        dispatchTuples4(tuples);
+                }
+            }
+            dispatchTuples3(LoaderConst.END_TUPLES);
+            dispatchTuples4(LoaderConst.END_TUPLES);
+            transaction.commit();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            transaction.abort();
+        }
+        transaction.end();
+        CoLib.finish(coordinator);
+    }
+
+    //@Override
+    public long getCountTriples()   { return countTriples; }
+    //@Override
+    public long getCountQuads()     { return countQuads; }
+
+    private void dispatchTuples3(List<Tuple<NodeId>> chunk) {
+        dest3.deliver(chunk);
+    }
+    
+    private void dispatchTuples4(List<Tuple<NodeId>> chunk) {
+        dest4.deliver(chunk);
+    }
+
+    private static void accTuples(Triple triple, NodeTable nodeTable, List<Tuple<NodeId>> acc) {
+        acc.add(nodes(nodeTable, triple));
+    }
+    
+    private static void accTuples(Quad quad, NodeTable nodeTable, List<Tuple<NodeId>> acc) {
+        acc.add(nodes(nodeTable, quad));
+    }
+    
+    // Recycle?
+    private List<Tuple<NodeId>> allocChunkTriples() {
+        return new ArrayList<>(LoaderConst.ChunkSize); 
+    } 
+
+    private List<Tuple<NodeId>> allocChunkQuads() {
+        return new ArrayList<>(LoaderConst.ChunkSize); 
+    }
+
+    private static Tuple<NodeId> nodes(NodeTable nt, Triple triple) {
+        NodeId s = idForNode(nt, triple.getSubject());
+        NodeId p = idForNode(nt, triple.getPredicate());
+        NodeId o = idForNode(nt, triple.getObject());
+        return TupleFactory.tuple(s,p,o);
+    }
+    
+    private Function<List<Quad>, List<Tuple<NodeId>>> quadsToNodeIds(NodeTable nodeTable) {
+        return (List<Quad> quads) -> {
+            List<Tuple<NodeId>> x = new ArrayList<>(quads.size()); 
+            for(Quad quad: quads) {
+                x.add(nodes(nodeTable, quad));
+            }
+            return x;
+        };
+    }
+
+    private static Tuple<NodeId> nodes(NodeTable nt, Quad quad) {
+        NodeId g = idForNode(nt, quad.getGraph());
+        NodeId s = idForNode(nt, quad.getSubject());
+        NodeId p = idForNode(nt, quad.getPredicate());
+        NodeId o = idForNode(nt, quad.getObject());
+        return TupleFactory.tuple(g,s,p,o);
+    }
+    
+    private static final NodeId idForNode(NodeTable nodeTable, Node node) {
+        return nodeTable.getAllocateNodeId(node);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java
new file mode 100644
index 0000000..c6b5075
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInline.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.jena.atlas.lib.tuple.Tuple;
+import org.apache.jena.atlas.lib.tuple.TupleFactory;
+import org.apache.jena.dboe.transaction.txn.Transaction;
+import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.riot.lang.StreamRDFCounting;
+import org.apache.jena.riot.system.StreamRDF;
+import org.apache.jena.sparql.core.DatasetPrefixStorage;
+import org.apache.jena.sparql.core.Quad;
+import org.apache.jena.tdb2.loader.BulkLoaderException;
+import org.apache.jena.tdb2.loader.base.BulkStartFinish;
+import org.apache.jena.tdb2.loader.base.CoLib;
+import org.apache.jena.tdb2.loader.base.MonitorOutput;
+import org.apache.jena.tdb2.store.DatasetGraphTDB;
+import org.apache.jena.tdb2.store.DatasetPrefixesTDB;
+import org.apache.jena.tdb2.store.NodeId;
+import org.apache.jena.tdb2.store.nodetable.NodeTable;
+import org.apache.jena.tdb2.store.nodetupletable.NodeTupleTable;
+
+/** Triple to chunks of Tuples.  
+ *  Same thread version.
+ *  This is a {@link StreamRDF}.
+ *  Also loads prefixes.
+ */ 
+public class DataToTuplesInline implements StreamRDFCounting, BulkStartFinish {
+    public static final int DataTickPoint   = 100_000;
+    public static final int DataSuperTick   = 10;
+    
+    private final Destination<Tuple<NodeId>> dest3;
+    private final Destination<Tuple<NodeId>> dest4;
+    private final DatasetGraphTDB dsgtdb;
+    private final NodeTable nodeTable;
+    private final DatasetPrefixStorage prefixes;
+
+    private final MonitorOutput output;
+    // Chunk accumulators.
+    private long countTriples = 0;
+    private long countQuads = 0;
+    private List<Tuple<NodeId>> quads = null;
+    private List<Tuple<NodeId>> triples = null;
+    // Prefix handler.
+    public DataToTuplesInline(DatasetGraphTDB dsgtdb,
+                              Destination<Tuple<NodeId>> dest3,
+                              Destination<Tuple<NodeId>> dest4, 
+                              MonitorOutput output) {
+        this.dsgtdb = dsgtdb;
+        this.dest3 = dest3;
+        this.dest4 = dest4;
+        this.output = output;
+        this.nodeTable = dsgtdb.getTripleTable().getNodeTupleTable().getNodeTable();
+        this.prefixes = dsgtdb.getPrefixes();
+        NodeTable nodeTable2 = dsgtdb.getQuadTable().getNodeTupleTable().getNodeTable();
+        if ( nodeTable != nodeTable2 )
+            throw new BulkLoaderException("Different node tables");
+    }
+    
+    // StreamRDF
+    private TransactionCoordinator coordinator;
+    private Transaction transaction; 
+    @Override
+    public void startBulk() {
+        coordinator = CoLib.newCoordinator();
+        CoLib.add(coordinator, nodeTable);
+        
+        // Prefixes
+        NodeTupleTable p = ((DatasetPrefixesTDB)prefixes).getNodeTupleTable();
+        CoLib.add(coordinator, p.getNodeTable());
+        CoLib.add(coordinator, p.getTupleTable().getIndexes());
+        CoLib.start(coordinator);
+        transaction = coordinator.begin(TxnType.WRITE);
+    }
+
+    /**
+     * Flush any partially filled chunks, send the end-of-tuples marker to both destinations,
+     * then commit and end the transaction and finish the coordinator.
+     */
+    @Override
+    public void finishBulk() {
+        if ( triples != null && ! triples.isEmpty() ) {
+            dispatchTriples(triples);
+            triples = null;
+        }
+        dispatchTriples(LoaderConst.END_TUPLES);
+        if ( quads != null && ! quads.isEmpty() ) {
+            // Quads are 4-tuples and must go to the quad destination, not the triple one
+            // (which has already been sent its end marker above).
+            dispatchQuads(quads);
+            quads = null;
+        }
+        dispatchQuads(LoaderConst.END_TUPLES);
+        transaction.commit();
+        transaction.end();
+        CoLib.finish(coordinator);
+    }
+
+    @Override public void start() {}
+
+    @Override public void finish() {}
+
+    @Override public long count()           { return countTriples() + countQuads(); }
+
+    @Override public long countTriples()    { return countTriples; }
+
+    @Override public long countQuads()      { return countQuads; }
+
+    /**
+     * Accept one triple: convert it to a tuple of {@link NodeId}s, append it to the current
+     * chunk and hand the chunk to the triple destination once it reaches the chunk size.
+     */
+    @Override
+    public void triple(Triple triple) {
+        countTriples++;
+        // A new chunk is allocated lazily after the previous one was handed off.
+        if ( triples == null )
+            triples = allocChunkTriples();
+        triples.add(nodes(nodeTable, triple));
+        if ( triples.size() < LoaderConst.ChunkSize )
+            return;
+        // Chunk is full: pass it downstream and drop our reference to it.
+        dispatchTriples(triples);
+        triples = null;
+    }
+
+    @Override
+    public void quad(Quad quad) {
+        if ( quad.isTriple() || quad.isDefaultGraph() ) {
+            triple(quad.asTriple());
+            return;
+        }
+        countQuads++;
+        if ( quads == null )
+            quads = allocChunkQuads();
+        accTuples(quad, nodeTable, quads);
+        if ( quads.size() >= LoaderConst.ChunkSize ) {
+            dispatchQuads(quads);
+            quads = null;
+        }
+    }
+    
+    private void dispatchQuads(List<Tuple<NodeId>> chunk) {
+        dest4.deliver(chunk);
+    }
+
+    private void dispatchTriples(List<Tuple<NodeId>> chunk) {
+        dest3.deliver(chunk);
+    }
+    
+    @Override
+    public void base(String base) {}
+
+    @Override
+    public void prefix(String prefix, String iri) {
+        // Clean constant handling.
+        prefixes.insertPrefix("", prefix, iri);
+    }
+
+    private static void accTuples(Triple triple, NodeTable nodeTable, List<Tuple<NodeId>> acc) {
+        acc.add(nodes(nodeTable, triple));
+    }
+    
+    private static void accTuples(Quad quad, NodeTable nodeTable, List<Tuple<NodeId>> acc) {
+        acc.add(nodes(nodeTable, quad));
+    }
+    
+    // Recycle?
+    private List<Tuple<NodeId>> allocChunkTriples() {
+        return new ArrayList<>(LoaderConst.ChunkSize); 
+    } 
+
+    private List<Tuple<NodeId>> allocChunkQuads() {
+        return new ArrayList<>(LoaderConst.ChunkSize); 
+    }
+
+    private static Tuple<NodeId> nodes(NodeTable nt, Triple triple) {
+        NodeId s = idForNode(nt, triple.getSubject());
+        NodeId p = idForNode(nt, triple.getPredicate());
+        NodeId o = idForNode(nt, triple.getObject());
+        return TupleFactory.tuple(s,p,o);
+    }
+    
+   private static Tuple<NodeId> nodes(NodeTable nt, Quad quad) {
+        NodeId g = idForNode(nt, quad.getGraph());
+        NodeId s = idForNode(nt, quad.getSubject());
+        NodeId p = idForNode(nt, quad.getPredicate());
+        NodeId o = idForNode(nt, quad.getObject());
+        return TupleFactory.tuple(g,s,p,o);
+    }
+    
+    private static final NodeId idForNode(NodeTable nodeTable, Node node) {
+        return nodeTable.getAllocateNodeId(node);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInlineSingle.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInlineSingle.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInlineSingle.java
new file mode 100644
index 0000000..4ea2226
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/DataToTuplesInlineSingle.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import java.util.List;
+import java.util.function.Consumer;
+
+import org.apache.jena.atlas.lib.tuple.Tuple;
+import org.apache.jena.atlas.lib.tuple.TupleFactory;
+import org.apache.jena.dboe.transaction.txn.Transaction;
+import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.Triple;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.riot.lang.StreamRDFCounting;
+import org.apache.jena.riot.system.StreamRDF;
+import org.apache.jena.sparql.core.DatasetPrefixStorage;
+import org.apache.jena.sparql.core.Quad;
+import org.apache.jena.tdb2.loader.BulkLoaderException;
+import org.apache.jena.tdb2.loader.base.BulkStartFinish;
+import org.apache.jena.tdb2.loader.base.CoLib;
+import org.apache.jena.tdb2.loader.base.MonitorOutput;
+import org.apache.jena.tdb2.store.DatasetGraphTDB;
+import org.apache.jena.tdb2.store.DatasetPrefixesTDB;
+import org.apache.jena.tdb2.store.NodeId;
+import org.apache.jena.tdb2.store.nodetable.NodeTable;
+import org.apache.jena.tdb2.store.nodetupletable.NodeTupleTable;
+
+/** Triple to Tuples, without chunking.  
+ *  Same thread version.
+ *  This is a {@link StreamRDF}.
+ *  Also loads prefixes.
+ */ 
+public class DataToTuplesInlineSingle implements StreamRDFCounting, BulkStartFinish {
+    public static final int DataTickPoint   = 100_000;
+    public static final int DataSuperTick   = 10;
+    
+    private final Consumer<Tuple<NodeId>> dest3;
+    private final Consumer<Tuple<NodeId>> dest4;
+    private final DatasetGraphTDB dsgtdb;
+    private final NodeTable nodeTable;
+    private final DatasetPrefixStorage prefixes;
+
+    private final MonitorOutput output;
+    // Chunk accumulators.
+    private long countTriples = 0;
+    private long countQuads = 0;
+    private List<Tuple<NodeId>> quads = null;
+    private List<Tuple<NodeId>> triples = null;
+
+    public DataToTuplesInlineSingle(DatasetGraphTDB dsgtdb,
+                                    Consumer<Tuple<NodeId>> dest3,
+                                    Consumer<Tuple<NodeId>> dest4, 
+                                    MonitorOutput output) {
+        this.dsgtdb = dsgtdb;
+        this.dest3 = dest3;
+        this.dest4 = dest4;
+        this.output = output;
+        this.nodeTable = dsgtdb.getTripleTable().getNodeTupleTable().getNodeTable();
+        this.prefixes = dsgtdb.getPrefixes();
+        NodeTable nodeTable2 = dsgtdb.getQuadTable().getNodeTupleTable().getNodeTable();
+        if ( nodeTable != nodeTable2 )
+            throw new BulkLoaderException("Different node tables");
+    }
+    
+    // StreamRDF
+    private TransactionCoordinator coordinator;
+    private Transaction transaction; 
+    @Override
+    public void startBulk() {
+        coordinator = CoLib.newCoordinator();
+        CoLib.add(coordinator, nodeTable);
+        
+        // Prefixes
+        NodeTupleTable p = ((DatasetPrefixesTDB)prefixes).getNodeTupleTable();
+        CoLib.add(coordinator, p.getNodeTable());
+        CoLib.add(coordinator, p.getTupleTable().getIndexes());
+        CoLib.start(coordinator);
+        transaction = coordinator.begin(TxnType.WRITE);
+    }
+
+    @Override
+    public void finishBulk() {
+        transaction.commit();
+        transaction.end();
+        CoLib.finish(coordinator);
+    }
+
+    @Override public void start() {}
+
+    @Override public void finish() {}
+
+    @Override public long count()           { return countTriples() + countQuads(); }
+
+    @Override public long countTriples()    { return countTriples; }
+
+    @Override public long countQuads()      { return countQuads; }
+
+    @Override
+    public void triple(Triple triple) {
+        countTriples++;
+        Tuple<NodeId> tuple = nodes(nodeTable, triple);
+        dest3.accept(tuple);
+    }
+
+    @Override
+    public void quad(Quad quad) {
+        if ( quad.isTriple() || quad.isDefaultGraph() ) {
+            triple(quad.asTriple());
+            return;
+        }
+        countQuads++;
+        Tuple<NodeId> tuple = nodes(nodeTable, quad);
+        dest4.accept(tuple);
+    }
+    
+    @Override
+    public void base(String base) {}
+
+    @Override
+    public void prefix(String prefix, String iri) {
+        // Clean constant handling.
+        prefixes.insertPrefix("", prefix, iri);
+    }
+
+    private static Tuple<NodeId> nodes(NodeTable nt, Triple triple) {
+        NodeId s = idForNode(nt, triple.getSubject());
+        NodeId p = idForNode(nt, triple.getPredicate());
+        NodeId o = idForNode(nt, triple.getObject());
+        return TupleFactory.tuple(s,p,o);
+    }
+    
+   private static Tuple<NodeId> nodes(NodeTable nt, Quad quad) {
+        NodeId g = idForNode(nt, quad.getGraph());
+        NodeId s = idForNode(nt, quad.getSubject());
+        NodeId p = idForNode(nt, quad.getPredicate());
+        NodeId o = idForNode(nt, quad.getObject());
+        return TupleFactory.tuple(g,s,p,o);
+    }
+    
+    private static final NodeId idForNode(NodeTable nodeTable, Node node) {
+        return nodeTable.getAllocateNodeId(node);
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Destination.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Destination.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Destination.java
new file mode 100644
index 0000000..72f6750
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Destination.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import java.util.List;
+
+/** Unit of delivery to a processing stage. */
+@FunctionalInterface
+public interface Destination<X> {
+    void deliver(List<X> block); 
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java
new file mode 100644
index 0000000..6982917
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/Indexer.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import org.apache.jena.atlas.lib.ArrayUtils;
+import org.apache.jena.atlas.lib.Timer;
+import org.apache.jena.atlas.lib.tuple.Tuple;
+import org.apache.jena.dboe.transaction.txn.Transaction;
+import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.tdb2.loader.base.BulkStartFinish;
+import org.apache.jena.tdb2.loader.base.CoLib;
+import org.apache.jena.tdb2.loader.base.MonitorOutput;
+import org.apache.jena.tdb2.store.NodeId;
+import org.apache.jena.tdb2.store.tupletable.TupleIndex;
+
+/**
+ * Build index(es).
+ * Provides a function {@link #index()} that should be called from another thread
+ * to deliver chunks of tuples ({@code  List<Tuple<NodeId>>}).
+ * Each chunk should be the same Tuple length and this must correspond to the length of the {@link TupleIndex}s being loaded.
+ * <p>
+ * This class creates one thread per {@link TupleIndex}.  
+ */
+public class Indexer implements BulkStartFinish {
+
+    private BlockingQueue<List<Tuple<NodeId>>>[] pipesTripleIndexers;
+    private final int N;
+    private final MonitorOutput output;
+    private TupleIndex[] indexes;
+    private final Semaphore termination = new Semaphore(0);
+    
+    @SuppressWarnings("unchecked")
+    public Indexer(MonitorOutput output, TupleIndex... idxTriples) {
+        pipesTripleIndexers = ArrayUtils.alloc(BlockingQueue.class, idxTriples.length);
+        this.N = idxTriples.length;
+        this.indexes = Arrays.copyOf(idxTriples, N); 
+        this.output = output; 
+            
+        for ( int i = 0 ; i < N ; i++ ) {
+            pipesTripleIndexers[i] = new ArrayBlockingQueue<List<Tuple<NodeId>>>(LoaderConst.QueueSizeTuples);
+        }
+    }
+    
+    /**
+     * Acquire {@code numPermits} permits from the semaphore, blocking until they are available.
+     * If the thread is interrupted while waiting, the stack trace is printed, the interrupt
+     * status is restored and the method returns without the permits.
+     *
+     * @return the value returned by {@code Timer.time} for the wait
+     */
+    private static long acquire(Semaphore semaphore, int numPermits) {
+        return Timer.time(()->{
+            try { semaphore.acquire(numPermits); }
+            catch (InterruptedException e) {
+                e.printStackTrace();
+                // Do not lose the interrupt: let callers see that the wait was cut short.
+                Thread.currentThread().interrupt();
+            }
+        });
+    }
+    
+    /** Return a function that delivers multiple {@code List<Tuple<NodeId>>} to this indexer */
+    public Destination<Tuple<NodeId>> index() {
+        return this::index; 
+    }
+    
+    /**
+     * Put the chunk on the queue of every index thread, blocking while a queue is full.
+     * The same list object is placed on all the queues.
+     */
+    private void index(List<Tuple<NodeId>> chunk) {
+        for ( int i = 0 ; i < N ; i++ ) {
+            try {
+                pipesTripleIndexers[i].put(chunk);
+            }
+            catch (InterruptedException e) {
+                // NOTE(review): an interrupt here is only printed, so this index never
+                // receives the chunk while the loop carries on with the remaining queues.
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /** Start the threads that will do the indexing */ 
+    @Override
+    public void startBulk() {
+        for ( int i = 0 ; i < N ; i++ ) {
+            TupleIndex idx = indexes[i];
+            BlockingQueue<List<Tuple<NodeId>>> pipe = pipesTripleIndexers[i];
+            new Thread(()->stageIndex(pipe, idx)).start();
+        }
+    }
+    
+    /** Wait for all the indexing threads to complete. */ 
+    @Override
+    public void finishBulk() {
+        //output.print("Wait for %d indexers", N);
+        acquire(termination, N);
+    }
+    
+    /**
+     * Body of one index thread: inside a write transaction on {@code idx}, take chunks from
+     * {@code pipe} and add their tuples to the index until an empty chunk arrives, then commit.
+     * On an exception the transaction is aborted. In both cases the coordinator is finished
+     * and one permit is released on {@code termination}.
+     *
+     * @param pipe queue of chunks for this index
+     * @param idx  the index to load
+     */
+    private void stageIndex(BlockingQueue<List<Tuple<NodeId>>> pipe, TupleIndex idx) {
+        TransactionCoordinator coordinator = CoLib.newCoordinator();
+        CoLib.add(coordinator, idx);
+        CoLib.start(coordinator);
+        Transaction transaction = coordinator.begin(TxnType.WRITE);
+        // Controls whether the "Finish" message is printed.
+        boolean workHasBeenDone; 
+        try {
+            Destination<Tuple<NodeId>> loader = loadTuples(idx);
+            for (;;) {
+                List<Tuple<NodeId>> tuples = pipe.take();
+                // An empty chunk is the end-of-input marker.
+                if ( tuples.isEmpty() )
+                    break;
+                loader.deliver(tuples);
+            }
+            workHasBeenDone = ! idx.isEmpty();
+            transaction.commit();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            transaction.abort();
+            workHasBeenDone = false;
+        }
+        // NOTE(review): no transaction.end() here, unlike DataToTuples.action - confirm whether it is needed.
+        CoLib.finish(coordinator);
+        if ( workHasBeenDone )
+            output.print("Finish - index %s", idx.getName());
+        // Signal finishBulk(), which waits for one permit per index thread.
+        termination.release();
+    }
+    
+    /** Build a destination that adds every tuple of a delivered chunk to {@code index}, in list order. */
+    private static Destination<Tuple<NodeId>> loadTuples(TupleIndex index) {
+        return chunk -> chunk.forEach(tuple -> index.add(tuple));
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/IndexerInline.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/IndexerInline.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/IndexerInline.java
new file mode 100644
index 0000000..0464a7c
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/IndexerInline.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import java.util.Arrays;
+
+import org.apache.jena.atlas.lib.tuple.Tuple;
+import org.apache.jena.dboe.transaction.txn.Transaction;
+import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.tdb2.loader.base.BulkStartFinish;
+import org.apache.jena.tdb2.loader.base.CoLib;
+import org.apache.jena.tdb2.loader.base.MonitorOutput;
+import org.apache.jena.tdb2.store.NodeId;
+import org.apache.jena.tdb2.store.tupletable.TupleIndex;
+
+/**
+ * Build index(es).
+ * <p>
+ * This is an inline indexer; it loads each {@code Tuple<NodeId>} on the calling thread.
+ */
+public class IndexerInline implements BulkStartFinish {
+    private final int N;
+    private final MonitorOutput output;
+    private TupleIndex[] indexes;
+    private TransactionCoordinator coordinator;
+    private Transaction transaction;
+    
+    public IndexerInline(MonitorOutput output, TupleIndex... idxTriples) {
+        this.N = idxTriples.length;
+        this.indexes = Arrays.copyOf(idxTriples, N); 
+        this.output = output; 
+    }
+    
+    /**
+     * Create a coordinator covering all the indexes and begin a write transaction.
+     * The coordinator is stored in the field so that {@link #finishBulk()} can finish it.
+     */
+    @Override
+    public void startBulk() {
+        // Assign the field: a local variable of the same name would leave it null for finishBulk().
+        coordinator = CoLib.newCoordinator();
+        Arrays.stream(indexes).forEach(idx->CoLib.add(coordinator, idx));
+        CoLib.start(coordinator);
+        transaction = coordinator.begin(TxnType.WRITE);
+    }
+    
+    @Override
+    public void finishBulk() { 
+        transaction.commit();
+        transaction.end();
+        CoLib.finish(coordinator);
+    }
+    
+    public void load(Tuple<NodeId> tuple) {
+        for ( TupleIndex idx : indexes )
+            idx.add(tuple);
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java
new file mode 100644
index 0000000..13cb6ff
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/InputStage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+/** 
+ * The first phase, parsing to at least one index each of triples and quads
+ * can be done in several ways.
+ * <ul>
+ * <li> {@code MULTI} - one thread parsing (caller), one for nodetable/tuples, and one for each index
+ * <li> {@code PARSE_NODE} - one thread parsing (caller) and also nodetable/tuples, and one for each index
+ * <li> {@code PARSE_NODE_INDEX} - use the caller thread for all operations
+ * </ul>
+ * {@code MULTI} is fastest when hardware allows.
+ * <br/>
+ * When data is triples or quads, not a mixture, {@code PARSE_NODE} uses two threads.
+ * <br/>
+ * {@code PARSE_NODE_INDEX} uses only the caller thread for all steps.
+ */
+enum InputStage { 
+    /** Caller thread parses; one thread for nodetable/tuples; one thread for each index. */
+    MULTI,
+    /** Caller thread parses and also does nodetable/tuples; one thread for each index. */
+    PARSE_NODE, 
+    /** Caller thread is used for all operations. */
+    PARSE_NODE_INDEX
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderConst.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderConst.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderConst.java
new file mode 100644
index 0000000..c168c93
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderConst.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.jena.atlas.lib.tuple.Tuple;
+import org.apache.jena.tdb2.store.NodeId;
+
+public class LoaderConst {
+
+    // ---- Public settings
+
+    /** Chunk size for the triple->tuples output pipe. */
+    public static final int ChunkSize = 100_000;
+
+    /** Queue size for chunks of tuples. */
+    public static final int QueueSizeTuples = 10;
+
+    // ---- Package-private settings
+
+    /** Queue size for data (NOTE(review): presumably for data blocks; confirm at the point of use). */
+    static final int QueueSizeData = 10;
+
+    /**
+     * A shared empty list of tuples
+     * (NOTE(review): presumably the end-of-input marker; confirm at the point of use).
+     */
+    static final List<Tuple<NodeId>> END_TUPLES = Collections.emptyList();
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java
new file mode 100644
index 0000000..373f24b
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderMain.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import org.apache.jena.atlas.lib.tuple.Tuple;
+import org.apache.jena.dboe.transaction.txn.Transaction;
+import org.apache.jena.dboe.transaction.txn.TransactionCoordinator;
+import org.apache.jena.graph.Node;
+import org.apache.jena.query.TxnType;
+import org.apache.jena.riot.lang.StreamRDFCounting;
+import org.apache.jena.riot.system.StreamRDF;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.tdb2.loader.DataLoader;
+import org.apache.jena.tdb2.loader.base.*;
+import org.apache.jena.tdb2.store.DatasetGraphTDB;
+import org.apache.jena.tdb2.store.DatasetPrefixesTDB;
+import org.apache.jena.tdb2.store.NodeId;
+import org.apache.jena.tdb2.store.tupletable.TupleIndex;
+import org.apache.jena.tdb2.sys.TDBInternal;
+
+/**
+ * The phased {@link DataLoader}, which runs loading in a number of phases. 
+ * Options are controlled by a {@link LoaderPlan}. 
+ * <p>
+ * The first phase is data to primary indexes, and maybe some additional indexes. 
+ * Then phases of replaying an index (triples or quads) to drive creating more secondary
+ * indexes.
+ * </p>
+ * <p>
+ * {@link LoaderPlans#loaderPlanParallel} is the parallel loader - do everything in
+ * the first phase, one thread for parsing, one for node table insertion and one for each index built.
+ * </p>
+ * <p>  
+ * {@link LoaderPlans#loaderPlanPhased} is the phased loader. 
+ * </p>
+ * <p>
+ * The process is: 
+ * <blockquote> 
+ * Data phase: {@code parser -> to NodeIds/Tuples -> TupleIndex+}
+ * <br/>
+ * Additional index phases: {@code primary index -> Indexer*}
+ * </blockquote>
+ * <p><b>Data Phase</b></p>
+ * <p>
+ * {@link DataBatcher} produces {@link DataBlock DataBlocks} - grouping of triples and
+ * quads and sends them to a handler {@code Consumer<DataBlock>}. This is wired up to be
+ * the feed for {@link DataToTuples}.
+ * </p><p>
+ * {@link DataToTuples} processes {@link DataBlock DataBlocks} to create two output blocks
+ * of {@code Tuple<NodeId>}, one output for triples, one for quads, and sends these to
+ * {@link Indexer}s for triples and quads.
+ * </p><p>
+ * {@link Indexer} processes blocks of {@code Tuple<NodeId>} (of the same tuple length)
+ * and writes them to a number of indexes. Each index being written is a separate thread.
+ * </p><p>
+ * The normal execution of {@link InputStage#MULTI} is provided by {@code executeData}
+ * and is {@code DataBatcher -> DataToTuples -> Indexer+} on separate threads.
+ * </p><p>
+ * One alternative is {@link InputStage#PARSE_NODE}, provided by {@code executeDataParseId},
+ * which uses one thread for both parsing and node table
+ * {@code DataToTuplesInline -> Indexer+}.
+ * </p><p>
+ * The third alternative is {@link InputStage#PARSE_NODE_INDEX}, provided by {@code executeDataOneThread},
+ * which does all input stage operations on the calling parser thread.
+ * </p>
+ * <p><b>Index Phase</b></p>
+ * <p>
+ * Additional indexes are built in a number of later phases. Each phase copies the primary index for triples
+ * to other indexes in controllable groups.  This happens for triples and for quads. See {@code executeSecondary}.
+ * </p>
+ * @see LoaderPlans
+ */
+public class LoaderMain extends LoaderBase implements DataLoader {
+    /** Tick point passed to {@code LoaderOps.inputFile} when reading a data file. */
+    public static final int DataTickPoint   = 500_000;
+    /** Super-tick value passed to {@code LoaderOps.inputFile} when reading a data file. */
+    public static final int DataSuperTick   = 10;
+    /** NOTE(review): presumably the progress tick point for index building; not referenced in this class. */
+    public static final int IndexTickPoint  = 1_000_000;
+    /** NOTE(review): presumably the progress super-tick for index building; not referenced in this class. */
+    public static final int IndexSuperTick  = 10;
+
+    private final LoaderPlan loaderPlan;
+
+    private final DatasetGraphTDB dsgtdb;
+    private final StreamRDF stream;
+    private final Map<String, TupleIndex> indexMap;
+
+    // Head of the data pipeline; also the source of the triple/quad counts.
+    private final StreamRDFCounting dataInput;
+    // The data-phase components, started in startBulk() and finished in finishBulk().
+    private final List<BulkStartFinish> dataProcess = new ArrayList<>();
+
+    /**
+     * Create a loader with no graph name ({@code null}).
+     *
+     * @param loaderPlan the plan selecting the input stage and the index groups
+     * @param dsg the dataset to load
+     * @param output destination for progress messages
+     */
+    public LoaderMain(LoaderPlan loaderPlan, DatasetGraph dsg, MonitorOutput output) {
+        this(loaderPlan, dsg, null, output);
+    }
+
+    /**
+     * Create a loader and wire up the data phase chosen by
+     * {@link LoaderPlan#dataInputType()}.
+     *
+     * @param loaderPlan the plan selecting the input stage and the index groups
+     * @param dsg the dataset to load
+     * @param graphName graph name handed to {@code LoaderOps.toNamedGraph}; may be {@code null}
+     * @param output destination for progress messages
+     * @throws IllegalStateException if the input stage of the plan is not recognized
+     */
+    public LoaderMain(LoaderPlan loaderPlan, DatasetGraph dsg, Node graphName, MonitorOutput output) {
+        super(dsg, graphName, output);
+        this.loaderPlan = loaderPlan;
+        dsgtdb = TDBInternal.getDatasetGraphTDB(dsg);
+        indexMap = PhasedOps.indexMap(dsgtdb);
+        // Phase 1.
+        switch ( loaderPlan.dataInputType() ) {
+            case MULTI :
+                dataInput = executeData(loaderPlan, dsgtdb, indexMap, dataProcess, output);
+                break;
+            case PARSE_NODE :
+                dataInput = executeDataParseId(loaderPlan, dsgtdb, indexMap, dataProcess, output);
+                break;
+            case PARSE_NODE_INDEX :
+                dataInput = executeDataOneThread(loaderPlan, dsgtdb, indexMap, dataProcess, output);
+                break;
+            default :
+                throw new IllegalStateException("Unrecognized input stage: " + loaderPlan.dataInputType());
+        }
+        stream = LoaderOps.toNamedGraph(dataInput, graphName);
+    }
+
+    /**
+     * Create data ingestion and primary index building of a {@link LoaderPlan}.
+     * This version wires {@code DataBatcher -> DataToTuples -> Indexer} and
+     * registers each component in {@code dataProcess}.
+     *
+     * @return the head of the pipeline, which receives the parser output
+     */
+    private static StreamRDFCounting executeData(LoaderPlan loaderPlan, DatasetGraphTDB dsgtdb, Map<String, TupleIndex> indexMap, List<BulkStartFinish> dataProcess, MonitorOutput output) {
+        DatasetPrefixesTDB dps = (DatasetPrefixesTDB)dsgtdb.getPrefixes();
+        PrefixHandler prefixHandler = new PrefixHandler(dps, output);
+        dataProcess.add(prefixHandler);
+
+        // Must be one index at least of each triples and quads.
+
+        TupleIndex[] idx3 = PhasedOps.indexSetFromNames(loaderPlan.primaryLoad3(), indexMap);
+        Indexer indexer3 = new Indexer(output, idx3);
+        TupleIndex[] idx4 = PhasedOps.indexSetFromNames(loaderPlan.primaryLoad4(), indexMap);
+        Indexer indexer4 = new Indexer(output, idx4);
+
+        dataProcess.add(indexer4);
+        dataProcess.add(indexer3);
+
+        Destination<Tuple<NodeId>> functionIndexer3 = indexer3.index();
+        Destination<Tuple<NodeId>> functionIndexer4 = indexer4.index();
+
+        DataToTuples dtt = new DataToTuples(dsgtdb, functionIndexer3, functionIndexer4, output);
+        Consumer<DataBlock> dest = dtt.data();
+        DataBatcher dataBatcher = new DataBatcher(dest, prefixHandler.handler(), output);
+
+        dataProcess.add(dtt);
+        dataProcess.add(dataBatcher);
+        return dataBatcher;
+    }
+
+    /**
+     * Create data ingestion and primary index building of a {@link LoaderPlan}.
+     * This version uses a thread for parse/NodeTable/Tuple and a thread for each of triple and quad index for phase one.
+     *
+     * @return the head of the pipeline, which receives the parser output
+     */
+    private static StreamRDFCounting executeDataParseId(LoaderPlan loaderPlan, DatasetGraphTDB dsgtdb, Map<String, TupleIndex> indexMap, List<BulkStartFinish> dataProcess, MonitorOutput output) {
+        // One thread for parse/NodeTable.
+        // Two steps of phase one on the invoking thread.
+        // Chunk and dispatch to indexers for the tuple loading.
+
+        TupleIndex[] idx3 = PhasedOps.indexSetFromNames(loaderPlan.primaryLoad3(), indexMap);
+        Indexer indexer3 = new Indexer(output, idx3);
+        TupleIndex[] idx4 = PhasedOps.indexSetFromNames(loaderPlan.primaryLoad4(), indexMap);
+        Indexer indexer4 = new Indexer(output, idx4);
+
+        DataToTuplesInline dttInline = new DataToTuplesInline(dsgtdb, indexer3.index(), indexer4.index(), output);
+        dataProcess.add(indexer3);
+        dataProcess.add(indexer4);
+        dataProcess.add(dttInline);
+        return dttInline;
+    }
+
+    /**
+     * Create data ingestion and primary index building of a {@link LoaderPlan}.
+     * This version uses a thread for parse/NodeTable/Tuple/Index.
+     *
+     * @return the head of the pipeline, which receives the parser output
+     */
+    private static StreamRDFCounting executeDataOneThread(LoaderPlan loaderPlan, DatasetGraphTDB dsgtdb, Map<String, TupleIndex> indexMap, List<BulkStartFinish> dataProcess, MonitorOutput output) {
+        // One thread input stage.
+        // All three phase one steps on the invoking thread.
+
+        TupleIndex[] idx3 = PhasedOps.indexSetFromNames(loaderPlan.primaryLoad3(), indexMap);
+        IndexerInline indexer3 = new IndexerInline(output, idx3);
+        Consumer<Tuple<NodeId>> dest3 = tuple->indexer3.load(tuple);
+
+        TupleIndex[] idx4 = PhasedOps.indexSetFromNames(loaderPlan.primaryLoad4(), indexMap);
+        IndexerInline indexer4 = new IndexerInline(output, idx4);
+        Consumer<Tuple<NodeId>> dest4 = tuple->indexer4.load(tuple);
+
+        DataToTuplesInlineSingle dataToTuples = new DataToTuplesInlineSingle(dsgtdb, dest3, dest4, output);
+        dataProcess.add(indexer3);
+        dataProcess.add(indexer4);
+        dataProcess.add(dataToTuples);
+        return dataToTuples;
+    }
+
+    /** The stream to send parser output to. */
+    @Override
+    public StreamRDF stream() {
+        return stream;
+    }
+
+    /** Always {@code false}: each component manipulates its transactions directly. */
+    @Override
+    public boolean bulkUseTransaction() {
+        // Manipulate the transactions directly by component.
+        return false;
+    }
+
+    /** Enter exclusive mode, run the superclass start step, then start the data pipeline. */
+    @Override
+    public void startBulk() {
+        // Lock everyone else out while we multithread.
+        dsgtdb.getTxnSystem().getTxnMgr().startExclusiveMode();
+        super.startBulk();
+        // Set the data pipeline
+        BulkProcesses.start(dataProcess);
+    }
+
+    /**
+     * Finish the data pipeline, build the secondary indexes for triples and
+     * for quads (each only if at least one was loaded), run the superclass
+     * finish step and leave exclusive mode.
+     */
+    @Override
+    public void finishBulk() {
+        // Close off the data pipeline
+        BulkProcesses.finish(dataProcess);
+
+        boolean doTriples = countTriples() != 0;
+        boolean doQuads = countQuads() != 0 ;
+
+        if ( doTriples ) {
+            // The first primary index is the source replayed into the secondary indexes.
+            TupleIndex srcIdx3 = PhasedOps.findInIndexMap(loaderPlan.primaryLoad3()[0], indexMap);
+            TupleIndex[][] indexSets3 = PhasedOps.indexSetsFromNames(loaderPlan.secondaryIndex3(), indexMap);
+            executeSecondary(srcIdx3, indexSets3, dsgtdb, output);
+        }
+
+        if ( doQuads ) {
+            TupleIndex srcIdx4 = PhasedOps.findInIndexMap(loaderPlan.primaryLoad4()[0], indexMap);
+            TupleIndex[][] indexSets4 = PhasedOps.indexSetsFromNames(loaderPlan.secondaryIndex4(), indexMap);
+            executeSecondary(srcIdx4, indexSets4, dsgtdb, output);
+        }
+        super.finishBulk();
+        dsgtdb.getTxnSystem().getTxnMgr().finishExclusiveMode();
+    }
+
+    /** Execute secondary index building of a {@link LoaderPlan} */
+    private static void executeSecondary(TupleIndex srcIdx, TupleIndex[][] indexSets, DatasetGraphTDB dsgtdb, MonitorOutput output) {
+
+        List<BulkStartFinish> processes = new ArrayList<>();
+        output.print("Start replay index %s", srcIdx.getName());
+        // For each phase.
+        for ( TupleIndex[] indexes : indexSets ) {
+            if ( indexes.length == 0 )
+                // Nothing in this phase.
+                continue;
+            indexPhase(processes, srcIdx, indexes, output);
+            // processes - wait now or wait later?
+        }
+        // Now make sure they are flushed.
+        BulkProcesses.finish(processes);
+    }
+
+    /**
+     * Run one index phase: replay {@code srcIdx} into a new {@link Indexer}
+     * over {@code indexes}. The indexer is started here and appended to
+     * {@code processes}; the caller finishes it later.
+     */
+    private static void indexPhase(List<BulkStartFinish> processes, TupleIndex srcIdx, TupleIndex[] indexes, MonitorOutput output) {
+        String indexSetLabel = PhasedOps.indexMappings(indexes);
+        output.print("Index set:  %s => %s", srcIdx.getName(), indexSetLabel);
+        Indexer indexer = new Indexer(output, indexes);
+        Destination<Tuple<NodeId>> dest = indexer.index();
+        indexer.startBulk();
+        TransactionCoordinator coordinator = CoLib.newCoordinator();
+        CoLib.add(coordinator, srcIdx);
+        CoLib.start(coordinator);
+        // READ transaction.
+        Transaction transaction = coordinator.begin(TxnType.READ);
+        // Add to processes - we can wait later if we do not touch indexes being built.
+        processes.add(indexer);
+        PhasedOps.ReplayResult result;
+        try {
+            result = PhasedOps.replay(srcIdx, dest, output);
+        } finally {
+            // End read transaction on srcIdx, even if the replay failed.
+            transaction.end();
+        }
+
+        String timeStr = "---";
+        if ( result.elapsed != 0 ) {
+            double time = result.elapsed / 1000.0;
+            //long AvgRate = (result.items * 1000L) / result.elapsed;
+            timeStr = String.format("%,.1f", time);
+        }
+        output.print("Index set:  %s => %s [%,d items, %s seconds]", srcIdx.getName(), indexSetLabel, result.items, timeStr);
+    }
+
+    /**
+     * Leave exclusive mode after a failed load. A failure while doing so is
+     * attached to {@code ex} as a suppressed exception.
+     */
+    @Override
+    public void finishException(Exception ex) {
+        try {
+            dsgtdb.getTxnSystem().getTxnMgr().finishExclusiveMode();
+        } catch (Exception ex2) {
+            ex.addSuppressed(ex2);
+        }
+    }
+
+    /** Number of triples, as counted by the head of the data pipeline. */
+    @Override
+    public long countTriples() {
+        return dataInput.countTriples();
+    }
+
+    /** Number of quads, as counted by the head of the data pipeline. */
+    @Override
+    public long countQuads() {
+        return dataInput.countQuads();
+    }
+
+    /** Read one file and send its contents to {@link #stream()}. */
+    @Override
+    protected void loadOne(String filename) {
+        LoaderOps.inputFile(stream, filename, output, DataTickPoint, DataSuperTick);
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderParallel.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderParallel.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderParallel.java
new file mode 100644
index 0000000..621c6d4
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderParallel.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import org.apache.jena.graph.Node;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.tdb2.loader.base.MonitorOutput;
+
+/** A {@link LoaderMain} that always runs {@link LoaderPlans#loaderPlanParallel}. */
+public class LoaderParallel extends LoaderMain {
+
+    /**
+     * Create a parallel loader with no graph name ({@code null}).
+     *
+     * @param dsg the dataset to load
+     * @param output destination for progress messages
+     */
+    public LoaderParallel(DatasetGraph dsg, MonitorOutput output) {
+        this(dsg, null, output);
+    }
+
+    /**
+     * Create a parallel loader.
+     *
+     * @param dsg the dataset to load
+     * @param graphName graph name passed through to {@link LoaderMain}; may be {@code null}
+     * @param output destination for progress messages
+     */
+    public LoaderParallel(DatasetGraph dsg, Node graphName, MonitorOutput output) {
+        super(LoaderPlans.loaderPlanParallel, dsg, graphName, output);
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPhased.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPhased.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPhased.java
new file mode 100644
index 0000000..074659d
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPhased.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import org.apache.jena.graph.Node;
+import org.apache.jena.sparql.core.DatasetGraph;
+import org.apache.jena.tdb2.loader.base.MonitorOutput;
+
+/** A {@link LoaderMain} that always runs {@link LoaderPlans#loaderPlanPhased}. */
+public class LoaderPhased extends LoaderMain {
+
+    /**
+     * Create a phased loader.
+     *
+     * @param dsg the dataset to load
+     * @param graphName graph name passed through to {@link LoaderMain}; may be {@code null}
+     * @param output destination for progress messages
+     */
+    public LoaderPhased(DatasetGraph dsg, Node graphName, MonitorOutput output) {
+        super(LoaderPlans.loaderPlanPhased, dsg, graphName, output);
+    }
+
+    /**
+     * Create a phased loader with no graph name ({@code null}).
+     *
+     * @param dsg the dataset to load
+     * @param output destination for progress messages
+     */
+    public LoaderPhased(DatasetGraph dsg, MonitorOutput output) {
+        this(dsg, null, output);
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/2934c550/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlan.java
----------------------------------------------------------------------
diff --git a/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlan.java b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlan.java
new file mode 100644
index 0000000..299eda2
--- /dev/null
+++ b/jena-db/jena-tdb2/src/main/java/org/apache/jena/tdb2/loader/main/LoaderPlan.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.tdb2.loader.main;
+
+import org.apache.jena.tdb2.store.NodeId;
+
+/** 
+ * A {@code LoaderPlan}
+ * <p>
+ * For triples and for quads there is a first phase to parse the input, 
+ * convert to tuples of {@link NodeId NodeIds}, including allocating the ids,
+ * and do at least one tuple index for each of triples and quads to capture the input.
+ * <p>   
+ * After that, a number of phases builds the other indexes. 
+ * <p>
+ * The {@link InputStage} setting controls whether the first phase is
+ * done in parallel (threads for parser, node table building and primary indexes)
+ * or on fewer threads, down to a single-threaded process.
+ */
+public class LoaderPlan {
+    private final InputStage inputStage;
+    private final String[] primary3;
+    private final String[] primary4;
+    private final String[][] secondary3;
+    private final String[][] secondary4;
+
+    /**
+     * Create a plan. The arrays are stored as given, without copying.
+     *
+     * @param dataInput how the first (data) phase is run
+     * @param loadGroup3 names of the triple indexes built in the first phase
+     * @param loadGroup4 names of the quad indexes built in the first phase
+     * @param secondaryGroups3 groups of triple index names, one group per later phase
+     * @param secondaryGroups4 groups of quad index names, one group per later phase
+     */
+    public LoaderPlan(InputStage dataInput,
+                      String[] loadGroup3, String[] loadGroup4,
+                      String[][] secondaryGroups3, String[][] secondaryGroups4) {
+        this.inputStage = dataInput;
+        this.primary3 = loadGroup3;
+        this.primary4 = loadGroup4;
+        this.secondary3 = secondaryGroups3;
+        this.secondary4 = secondaryGroups4;
+    }
+
+    /** The input stage given at construction. */
+    public InputStage dataInputType() {
+        return inputStage;
+    }
+
+    /** The first-phase triple index names; the stored array itself, not a copy. */
+    public String[] primaryLoad3() {
+        return primary3;
+    }
+
+    /** The first-phase quad index names; the stored array itself, not a copy. */
+    public String[] primaryLoad4() {
+        return primary4;
+    }
+
+    /** The later-phase triple index name groups; the stored array itself, not a copy. */
+    public String[][] secondaryIndex3() {
+        return secondary3;
+    }
+
+    /** The later-phase quad index name groups; the stored array itself, not a copy. */
+    public String[][] secondaryIndex4() {
+        return secondary4;
+    }
+}
\ No newline at end of file


Mime
View raw message