jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [25/40] jena git commit: Fix line endings
Date Sat, 14 May 2016 17:22:21 GMT
http://git-wip-us.apache.org/repos/asf/jena/blob/d6ae87fd/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
index db1cf03..a5c9bfc 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/LangCSV.java
@@ -1,152 +1,152 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang;
-
-import java.io.InputStream;
-import java.io.Reader;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.jena.atlas.csv.CSVParser;
-import org.apache.jena.atlas.lib.IRILib ;
-import org.apache.jena.datatypes.xsd.XSDDatatype ;
-import org.apache.jena.graph.Node ;
-import org.apache.jena.graph.NodeFactory ;
-import org.apache.jena.riot.Lang;
-import org.apache.jena.riot.RDFLanguages;
-import org.apache.jena.riot.system.ErrorHandler;
-import org.apache.jena.riot.system.IRIResolver;
-import org.apache.jena.riot.system.ParserProfile;
-import org.apache.jena.riot.system.RiotLib;
-import org.apache.jena.riot.system.StreamRDF;
-
-public class LangCSV implements LangRIOT {
-
-	public static final String CSV_PREFIX = "http://w3c/future-csv-vocab/";
-	public static final String CSV_ROW = CSV_PREFIX + "row";
-
-	private InputStream input = null;
-	private Reader reader = null;
-	private String base;
-	private String filename;
-	private StreamRDF sink;
-	private ParserProfile profile; // Warning - we don't use all of this.
-
-	@Override
-	public Lang getLang() {
-		return RDFLanguages.CSV;
-
-	}
-
-	@Override
-	public ParserProfile getProfile() {
-		return profile;
-	}
-
-	@Override
-	public void setProfile(ParserProfile profile) {
-		this.profile = profile;
-	}
-
-	public LangCSV(Reader reader, String base, String filename,
-			ErrorHandler errorHandler, StreamRDF sink) {
-		this.reader = reader;
-		this.base = base;
-		this.filename = filename;
-		this.sink = sink;
-		this.profile = RiotLib.profile(getLang(), base, errorHandler);
-	}
-
-	public LangCSV(InputStream in, String base, String filename,
-			ErrorHandler errorHandler, StreamRDF sink) {
-		this.input = in;
-		this.base = base;
-		this.filename = filename;
-		this.sink = sink;
-		this.profile = RiotLib.profile(getLang(), base, errorHandler);
-	}
-
-	@Override
-	public void parse() {
-		sink.start();
-		CSVParser parser = (input != null) ? CSVParser.create(input)
-				: CSVParser.create(reader);
-		ArrayList<Node> predicates = new ArrayList<Node>();
-		int rowNum = 0;
-		for (List<String> row : parser) {
-			
-			if (rowNum == 0) {
-				for (String column : row) {
-					String uri = IRIResolver.resolveString(filename) + "#"
-							+ toSafeLocalname(column);
-					Node predicate = this.profile.createURI(uri, rowNum, 0);
-					predicates.add(predicate);
-				}
-			} else {
-				//Node subject = this.profile.createBlankNode(null, -1, -1);
-				Node subject = caculateSubject(rowNum, filename);
-				Node predicateRow = this.profile.createURI(CSV_ROW, -1, -1);
-				Node objectRow = this.profile
-						.createTypedLiteral((rowNum + ""),
-								XSDDatatype.XSDinteger, rowNum, 0);
-				sink.triple(this.profile.createTriple(subject, predicateRow,
-						objectRow, rowNum, 0));
-				for (int col = 0; col < row.size() && col<predicates.size(); col++) {
-					Node predicate = predicates.get(col);
-					String columnValue = row.get(col).trim();
-					if("".equals(columnValue)){
-						continue;
-					}					
-					Node o;
-					try {
-						// Try for a double.
-						Double.parseDouble(columnValue);
-						o = NodeFactory.createLiteral(columnValue,
-								XSDDatatype.XSDdouble);
-					} catch (Exception e) {
-						o = NodeFactory.createLiteral(columnValue);
-					}
-					sink.triple(this.profile.createTriple(subject, predicate,
-							o, rowNum, col));
-				}
-
-			}
-			rowNum++;
-		}
-		sink.finish();
-
-	}
-
-	public static String toSafeLocalname(String raw) {
-		String ret = raw.trim();
-		return encodeURIComponent(ret);
-		
-	}
-	
-	public static String encodeURIComponent(String s) {
-	    return IRILib.encodeUriComponent(s);
-	}
-	
-	public static Node caculateSubject(int rowNum, String filename){
-		Node subject = NodeFactory.createBlankNode();
-//		String uri = IRIResolver.resolveString(filename) + "#Row_" + rowNum; 
-//		Node subject =  NodeFactory.createURI(uri);
-		return subject;
-	}
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang;
+
+import java.io.InputStream;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.jena.atlas.csv.CSVParser;
+import org.apache.jena.atlas.lib.IRILib ;
+import org.apache.jena.datatypes.xsd.XSDDatatype ;
+import org.apache.jena.graph.Node ;
+import org.apache.jena.graph.NodeFactory ;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+import org.apache.jena.riot.system.ErrorHandler;
+import org.apache.jena.riot.system.IRIResolver;
+import org.apache.jena.riot.system.ParserProfile;
+import org.apache.jena.riot.system.RiotLib;
+import org.apache.jena.riot.system.StreamRDF;
+
+public class LangCSV implements LangRIOT {
+
+	public static final String CSV_PREFIX = "http://w3c/future-csv-vocab/";
+	public static final String CSV_ROW = CSV_PREFIX + "row";
+
+	private InputStream input = null;
+	private Reader reader = null;
+	private String base;
+	private String filename;
+	private StreamRDF sink;
+	private ParserProfile profile; // Warning - we don't use all of this.
+
+	@Override
+	public Lang getLang() {
+		return RDFLanguages.CSV;
+
+	}
+
+	@Override
+	public ParserProfile getProfile() {
+		return profile;
+	}
+
+	@Override
+	public void setProfile(ParserProfile profile) {
+		this.profile = profile;
+	}
+
+	public LangCSV(Reader reader, String base, String filename,
+			ErrorHandler errorHandler, StreamRDF sink) {
+		this.reader = reader;
+		this.base = base;
+		this.filename = filename;
+		this.sink = sink;
+		this.profile = RiotLib.profile(getLang(), base, errorHandler);
+	}
+
+	public LangCSV(InputStream in, String base, String filename,
+			ErrorHandler errorHandler, StreamRDF sink) {
+		this.input = in;
+		this.base = base;
+		this.filename = filename;
+		this.sink = sink;
+		this.profile = RiotLib.profile(getLang(), base, errorHandler);
+	}
+
+	@Override
+	public void parse() {
+		sink.start();
+		CSVParser parser = (input != null) ? CSVParser.create(input)
+				: CSVParser.create(reader);
+		ArrayList<Node> predicates = new ArrayList<Node>();
+		int rowNum = 0;
+		for (List<String> row : parser) {
+			
+			if (rowNum == 0) {
+				for (String column : row) {
+					String uri = IRIResolver.resolveString(filename) + "#"
+							+ toSafeLocalname(column);
+					Node predicate = this.profile.createURI(uri, rowNum, 0);
+					predicates.add(predicate);
+				}
+			} else {
+				//Node subject = this.profile.createBlankNode(null, -1, -1);
+				Node subject = caculateSubject(rowNum, filename);
+				Node predicateRow = this.profile.createURI(CSV_ROW, -1, -1);
+				Node objectRow = this.profile
+						.createTypedLiteral((rowNum + ""),
+								XSDDatatype.XSDinteger, rowNum, 0);
+				sink.triple(this.profile.createTriple(subject, predicateRow,
+						objectRow, rowNum, 0));
+				for (int col = 0; col < row.size() && col<predicates.size(); col++) {
+					Node predicate = predicates.get(col);
+					String columnValue = row.get(col).trim();
+					if("".equals(columnValue)){
+						continue;
+					}					
+					Node o;
+					try {
+						// Try for a double.
+						Double.parseDouble(columnValue);
+						o = NodeFactory.createLiteral(columnValue,
+								XSDDatatype.XSDdouble);
+					} catch (Exception e) {
+						o = NodeFactory.createLiteral(columnValue);
+					}
+					sink.triple(this.profile.createTriple(subject, predicate,
+							o, rowNum, col));
+				}
+
+			}
+			rowNum++;
+		}
+		sink.finish();
+
+	}
+
+	public static String toSafeLocalname(String raw) {
+		String ret = raw.trim();
+		return encodeURIComponent(ret);
+		
+	}
+	
+	public static String encodeURIComponent(String s) {
+	    return IRILib.encodeUriComponent(s);
+	}
+	
+	public static Node caculateSubject(int rowNum, String filename){
+		Node subject = NodeFactory.createBlankNode();
+//		String uri = IRIResolver.resolveString(filename) + "#Row_" + rowNum; 
+//		Node subject =  NodeFactory.createURI(uri);
+		return subject;
+	}
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/d6ae87fd/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
index 5f9d6a6..ff9ba63 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedQuadsStream.java
@@ -1,53 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang ;
-
-import org.apache.jena.graph.Triple ;
-import org.apache.jena.riot.system.StreamRDF ;
-import org.apache.jena.sparql.core.Quad ;
-
-/**
- * Implementation of a producer class that sends Quads; must be connected to a {@code PipedRDFIterator<Quad>}. 
- */
-public class PipedQuadsStream extends PipedRDFStream<Quad> implements StreamRDF
-{
-    /**
-     * Creates a piped quads stream connected to the specified piped 
-     * RDF iterator.  Quads written to this stream will then be 
-     * available as input from <code>sink</code>.
-     *
-     * @param sink The piped RDF iterator to connect to.
-     */
-    public PipedQuadsStream(PipedRDFIterator<Quad> sink)
-    {
-        super(sink) ;
-    }
-
-    @Override
-    public void triple(Triple triple)
-    {
-        // Triples are discarded
-    }
-
-    @Override
-    public void quad(Quad quad)
-    {
-        receive(quad) ;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.graph.Triple ;
+import org.apache.jena.riot.system.StreamRDF ;
+import org.apache.jena.sparql.core.Quad ;
+
+/**
+ * Implementation of a producer class that sends Quads; must be connected to a {@code PipedRDFIterator<Quad>}. 
+ */
+public class PipedQuadsStream extends PipedRDFStream<Quad> implements StreamRDF
+{
+    /**
+     * Creates a piped quads stream connected to the specified piped 
+     * RDF iterator.  Quads written to this stream will then be 
+     * available as input from <code>sink</code>.
+     *
+     * @param sink The piped RDF iterator to connect to.
+     */
+    public PipedQuadsStream(PipedRDFIterator<Quad> sink)
+    {
+        super(sink) ;
+    }
+
+    @Override
+    public void triple(Triple triple)
+    {
+        // Triples are discarded
+    }
+
+    @Override
+    public void quad(Quad quad)
+    {
+        receive(quad) ;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/d6ae87fd/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
index 3259b9d..a79ae6f 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFIterator.java
@@ -1,392 +1,392 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang;
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.jena.atlas.lib.Closeable;
-import org.apache.jena.riot.RiotException;
-import org.apache.jena.riot.system.PrefixMap;
-import org.apache.jena.riot.system.PrefixMapFactory;
-
-/**
- * <p>
- * A {@code PipedRDFIterator} should be connected to a {@link PipedRDFStream}
- * implementation; the piped iterator then provides whatever RDF primitives are
- * written to the {@code PipedRDFStream}
- * </p>
- * <p>
- * Typically, data is read from a {@code PipedRDFIterator} by one thread (the
- * consumer) and data is written to the corresponding {@code PipedRDFStream} by
- * some other thread (the producer). Attempting to use both objects from a
- * single thread is not recommended, as it may deadlock the thread. The
- * {@code PipedRDFIterator} contains a buffer, decoupling read operations from
- * write operations, within limits.
- * </p>
- * <p>
- * Inspired by Java's {@link java.io.PipedInputStream} and
- * {@link java.io.PipedOutputStream}
- * </p>
- * 
- * @param <T>
- *            The type of the RDF primitive, should be one of {@code Triple},
- *            {@code Quad}, or {@code Tuple<Node>}
- * 
- * @see PipedTriplesStream
- * @see PipedQuadsStream
- * @see PipedTuplesStream
- */
-public class PipedRDFIterator<T> implements Iterator<T>, Closeable {
-    /**
-     * Constant for default buffer size
-     */
-    public static final int DEFAULT_BUFFER_SIZE = 10000;
-
-    /**
-     * Constant for default poll timeout in milliseconds, used to stop the
-     * consumer deadlocking in certain circumstances
-     */
-    public static final int DEFAULT_POLL_TIMEOUT = 1000; // one second
-    /**
-     * Constant for max number of failed poll attempts before the producer will
-     * be declared as dead
-     */
-    public static final int DEFAULT_MAX_POLLS = 10;
-
-    private final BlockingQueue<T> queue;
-
-    @SuppressWarnings("unchecked")
-    private final T endMarker = (T) new Object();
-
-    private volatile boolean closedByConsumer = false;
-    private volatile boolean closedByProducer = false;
-    private volatile boolean finished = false;
-    private volatile boolean threadReused = false;
-    private volatile Thread consumerThread;
-    private volatile Thread producerThread;
-
-    private boolean connected = false;
-    private int pollTimeout = DEFAULT_POLL_TIMEOUT;
-    private int maxPolls = DEFAULT_MAX_POLLS;
-
-    private T slot;
-
-    private final Object lock = new Object(); // protects baseIri and prefixes
-    private String baseIri;
-    private final PrefixMap prefixes = PrefixMapFactory.createForInput();
-
-    /**
-     * Creates a new piped RDF iterator with the default buffer size of
-     * {@code DEFAULT_BUFFER_SIZE}.
-     * <p>
-     * Buffer size must be chosen carefully in order to avoid performance
-     * problems, if you set the buffer size too low you will experience a lot of
-     * blocked calls so it will take longer to consume the data from the
-     * iterator. For best performance the buffer size should be at least 10% of
-     * the expected input size though you may need to tune this depending on how
-     * fast your consumer thread is.
-     * </p>
-     */
-    public PipedRDFIterator() {
-        this(DEFAULT_BUFFER_SIZE);
-    }
-
-    /**
-     * Creates a new piped RDF iterator
-     * <p>
-     * Buffer size must be chosen carefully in order to avoid performance
-     * problems, if you set the buffer size too low you will experience a lot of
-     * blocked calls so it will take longer to consume the data from the
-     * iterator. For best performance the buffer size should be roughly 10% of
-     * the expected input size though you may need to tune this depending on how
-     * fast your consumer thread is.
-     * </p>
-     * 
-     * @param bufferSize
-     *            Buffer size
-     */
-    public PipedRDFIterator(int bufferSize) {
-        this(bufferSize, false, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
-    }
-
-    /**
-     * Creates a new piped RDF iterator
-     * <p>
-     * Buffer size must be chosen carefully in order to avoid performance
-     * problems, if you set the buffer size too low you will experience a lot of
-     * blocked calls so it will take longer to consume the data from the
-     * iterator. For best performance the buffer size should be roughly 10% of
-     * the expected input size though you may need to tune this depending on how
-     * fast your consumer thread is.
-     * </p>
-     * <p>
-     * The fair parameter controls whether the locking policy used for the
-     * buffer is fair. When enabled this reduces throughput but also reduces the
-     * chance of thread starvation. This likely need only be set to {@code true}
-     * if there will be multiple consumers.
-     * </p>
-     * 
-     * @param bufferSize
-     *            Buffer size
-     * @param fair
-     *            Whether the buffer should use a fair locking policy
-     */
-    public PipedRDFIterator(int bufferSize, boolean fair) {
-        this(bufferSize, fair, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
-    }
-
-    /**
-     * Creates a new piped RDF iterator
-     * <p>
-     * Buffer size must be chosen carefully in order to avoid performance
-     * problems, if you set the buffer size too low you will experience a lot of
-     * blocked calls so it will take longer to consume the data from the
-     * iterator. For best performance the buffer size should be roughly 10% of
-     * the expected input size though you may need to tune this depending on how
-     * fast your consumer thread is.
-     * </p>
-     * <p>
-     * The {@code fair} parameter controls whether the locking policy used for
-     * the buffer is fair. When enabled this reduces throughput but also reduces
-     * the chance of thread starvation. This likely need only be set to
-     * {@code true} if there will be multiple consumers.
-     * </p>
-     * <p>
-     * The {@code pollTimeout} parameter controls how long each poll attempt
-     * waits for data to be produced. This prevents the consumer thread from
-     * blocking indefinitely and allows it to detect various potential deadlock
-     * conditions e.g. dead producer thread, another consumer closed the
-     * iterator etc. and errors out accordingly. It is unlikely that you will
-     * ever need to adjust this from the default value provided by
-     * {@link #DEFAULT_POLL_TIMEOUT}.
-     * </p>
-     * <p>
-     * The {@code maxPolls} parameter controls how many poll attempts will be
-     * made by a single consumer thread within the context of a single call to
-     * {@link #hasNext()} before the iterator declares the producer to be dead
-     * and errors out accordingly. You may need to adjust this if you have a
-     * slow producer thread or many consumer threads.
-     * </p>
-     * 
-     * @param bufferSize
-     *            Buffer size
-     * @param fair
-     *            Whether the buffer should use a fair locking policy
-     * @param pollTimeout
-     *            Poll timeout in milliseconds
-     * @param maxPolls
-     *            Max poll attempts
-     */
-    public PipedRDFIterator(int bufferSize, boolean fair, int pollTimeout, int maxPolls) {
-        if (pollTimeout <= 0)
-            throw new IllegalArgumentException("Poll Timeout must be > 0");
-        if (maxPolls <= 0)
-            throw new IllegalArgumentException("Max Poll attempts must be > 0");
-        this.queue = new ArrayBlockingQueue<>(bufferSize, fair);
-        this.pollTimeout = pollTimeout;
-        this.maxPolls = maxPolls;
-    }
-
-    @Override
-    public boolean hasNext() {
-        if (!connected)
-            throw new IllegalStateException("Pipe not connected");
-
-        if (closedByConsumer)
-            throw new RiotException("Pipe closed");
-
-        if (finished)
-            return false;
-
-        consumerThread = Thread.currentThread();
-
-        // Depending on how code and/or the JVM schedules the threads involved
-        // there is a scenario that exists where a producer can finish/die
-        // before theconsumer is started and the consumer is scheduled onto the
-        // same thread thus resulting in a deadlock on the consumer because it
-        // will never be able to detect that the producer died
-        // In this scenario we need to set a special flag to indicate the
-        // possibility
-        if (producerThread != null && producerThread == consumerThread)
-            threadReused = true;
-
-        if (slot != null)
-            return true;
-
-        int attempts = 0;
-        while (true) {
-            attempts++;
-            try {
-                slot = queue.poll(this.pollTimeout, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                throw new CancellationException();
-            }
-
-            if (null != slot)
-                break;
-
-            // If the producer thread died and did not call finish() then
-            // declare this pipe to be "broken"
-            // Since check is after the break, we will drain as much as possible
-            // out of the queue before throwing this exception
-            if (threadReused || (producerThread != null && !producerThread.isAlive() && !closedByProducer)) {
-                closedByConsumer = true;
-                throw new RiotException("Producer dead");
-            }
-
-            // Need to check this inside the loop as otherwise outside code that
-            // attempts to break the deadlock by causing close() on the iterator
-            // cannot do so
-            if (closedByConsumer)
-                throw new RiotException("Pipe closed");
-
-            // Need to check whether polling attempts have been exceeded
-            // If so declare the producer dead and exit
-            if (attempts >= this.maxPolls) {
-                closedByConsumer = true;
-                if (producerThread != null) {
-                    throw new RiotException(
-                            "Producer failed to produce any data within the specified number of polling attempts, declaring producer dead");
-                } else {
-                    throw new RiotException("Producer failed to ever call start(), declaring producer dead");
-                }
-            }
-        }
-
-        // When the end marker is seen set slot to null
-        if (slot == endMarker) {
-            finished = true;
-            slot = null;
-            return false;
-        }
-        return true;
-    }
-
-    @Override
-    public T next() {
-        if (!hasNext())
-            throw new NoSuchElementException();
-        T item = slot;
-        slot = null;
-        return item;
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    private void checkStateForReceive() {
-        if (closedByProducer || closedByConsumer) {
-            throw new RiotException("Pipe closed");
-        } else if (consumerThread != null && !consumerThread.isAlive()) {
-            throw new RiotException("Consumer dead");
-        }
-    }
-
-    protected void connect() {
-        this.connected = true;
-    }
-
-    protected void receive(T t) {
-        checkStateForReceive();
-        producerThread = Thread.currentThread();
-
-        try {
-            queue.put(t);
-        } catch (InterruptedException e) {
-            throw new CancellationException();
-        }
-    }
-
-    protected void base(String base) {
-        synchronized (lock) {
-            this.baseIri = base;
-        }
-    }
-
-    /**
-     * Gets the most recently seen Base IRI
-     * 
-     * @return Base IRI
-     */
-    public String getBaseIri() {
-        synchronized (lock) {
-            return baseIri;
-        }
-    }
-
-    protected void prefix(String prefix, String iri) {
-        synchronized (lock) {
-            prefixes.add(prefix, iri);
-        }
-    }
-
-    /**
-     * Gets the prefix map which contains the prefixes seen so far in the stream
-     * 
-     * @return Prefix Map
-     */
-    public PrefixMap getPrefixes() {
-        synchronized (lock) {
-            // Need to return a copy since PrefixMap is not concurrent
-            return PrefixMapFactory.create(this.prefixes);
-        }
-    }
-
-    /**
-     * Should be called by the producer when it begins writing to the iterator.
-     * If the producer fails to call this for whatever reason and never produces
-     * any output or calls {@code finish()} consumers may be blocked for a short
-     * period before they detect this state and error out.
-     */
-    protected void start() {
-        // Track the producer thread in case it never delivers us anything and
-        // dies before calling finish
-        producerThread = Thread.currentThread();
-    }
-
-    /**
-     * Should be called by the producer when it has finished writing to the
-     * iterator. If the producer fails to call this for whatever reason
-     * consumers may be blocked for a short period before they detect this state
-     * and error out.
-     */
-    protected void finish() {
-        if ( closedByProducer )
-            return ;
-        receive(endMarker);
-        closedByProducer = true;
-    }
-
-    /**
-     * May be called by the consumer when it is finished reading from the
-     * iterator, if the producer thread has not finished it will receive an
-     * error the next time it tries to write to the iterator
-     */
-    @Override
-    public void close() {
-        closedByConsumer = true;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.jena.atlas.lib.Closeable;
+import org.apache.jena.riot.RiotException;
+import org.apache.jena.riot.system.PrefixMap;
+import org.apache.jena.riot.system.PrefixMapFactory;
+
+/**
+ * <p>
+ * A {@code PipedRDFIterator} should be connected to a {@link PipedRDFStream}
+ * implementation; the piped iterator then provides whatever RDF primitives are
+ * written to the {@code PipedRDFStream}
+ * </p>
+ * <p>
+ * Typically, data is read from a {@code PipedRDFIterator} by one thread (the
+ * consumer) and data is written to the corresponding {@code PipedRDFStream} by
+ * some other thread (the producer). Attempting to use both objects from a
+ * single thread is not recommended, as it may deadlock the thread. The
+ * {@code PipedRDFIterator} contains a buffer, decoupling read operations from
+ * write operations, within limits.
+ * </p>
+ * <p>
+ * Inspired by Java's {@link java.io.PipedInputStream} and
+ * {@link java.io.PipedOutputStream}
+ * </p>
+ * 
+ * @param <T>
+ *            The type of the RDF primitive, should be one of {@code Triple},
+ *            {@code Quad}, or {@code Tuple<Node>}
+ * 
+ * @see PipedTriplesStream
+ * @see PipedQuadsStream
+ * @see PipedTuplesStream
+ */
+public class PipedRDFIterator<T> implements Iterator<T>, Closeable {
+    /**
+     * Constant for default buffer size
+     */
+    public static final int DEFAULT_BUFFER_SIZE = 10000;
+
+    /**
+     * Constant for default poll timeout in milliseconds, used to stop the
+     * consumer deadlocking in certain circumstances
+     */
+    public static final int DEFAULT_POLL_TIMEOUT = 1000; // one second
+    /**
+     * Constant for max number of failed poll attempts before the producer will
+     * be declared as dead
+     */
+    public static final int DEFAULT_MAX_POLLS = 10;
+
+    private final BlockingQueue<T> queue;
+
+    @SuppressWarnings("unchecked")
+    private final T endMarker = (T) new Object();
+
+    private volatile boolean closedByConsumer = false;
+    private volatile boolean closedByProducer = false;
+    private volatile boolean finished = false;
+    private volatile boolean threadReused = false;
+    private volatile Thread consumerThread;
+    private volatile Thread producerThread;
+
+    private boolean connected = false;
+    private int pollTimeout = DEFAULT_POLL_TIMEOUT;
+    private int maxPolls = DEFAULT_MAX_POLLS;
+
+    private T slot;
+
+    private final Object lock = new Object(); // protects baseIri and prefixes
+    private String baseIri;
+    private final PrefixMap prefixes = PrefixMapFactory.createForInput();
+
+    /**
+     * Creates a new piped RDF iterator with the default buffer size of
+     * {@code DEFAULT_BUFFER_SIZE}.
+     * <p>
+     * Buffer size must be chosen carefully in order to avoid performance
+     * problems, if you set the buffer size too low you will experience a lot of
+     * blocked calls so it will take longer to consume the data from the
+     * iterator. For best performance the buffer size should be at least 10% of
+     * the expected input size though you may need to tune this depending on how
+     * fast your consumer thread is.
+     * </p>
+     */
+    public PipedRDFIterator() {
+        this(DEFAULT_BUFFER_SIZE);
+    }
+
+    /**
+     * Creates a new piped RDF iterator
+     * <p>
+     * Buffer size must be chosen carefully in order to avoid performance
+     * problems, if you set the buffer size too low you will experience a lot of
+     * blocked calls so it will take longer to consume the data from the
+     * iterator. For best performance the buffer size should be roughly 10% of
+     * the expected input size though you may need to tune this depending on how
+     * fast your consumer thread is.
+     * </p>
+     * 
+     * @param bufferSize
+     *            Buffer size
+     */
+    public PipedRDFIterator(int bufferSize) {
+        this(bufferSize, false, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
+    }
+
+    /**
+     * Creates a new piped RDF iterator
+     * <p>
+     * Buffer size must be chosen carefully in order to avoid performance
+     * problems, if you set the buffer size too low you will experience a lot of
+     * blocked calls so it will take longer to consume the data from the
+     * iterator. For best performance the buffer size should be roughly 10% of
+     * the expected input size though you may need to tune this depending on how
+     * fast your consumer thread is.
+     * </p>
+     * <p>
+     * The fair parameter controls whether the locking policy used for the
+     * buffer is fair. When enabled this reduces throughput but also reduces the
+     * chance of thread starvation. This likely need only be set to {@code true}
+     * if there will be multiple consumers.
+     * </p>
+     * 
+     * @param bufferSize
+     *            Buffer size
+     * @param fair
+     *            Whether the buffer should use a fair locking policy
+     */
+    public PipedRDFIterator(int bufferSize, boolean fair) {
+        this(bufferSize, fair, DEFAULT_POLL_TIMEOUT, DEFAULT_MAX_POLLS);
+    }
+
+    /**
+     * Creates a new piped RDF iterator
+     * <p>
+     * Buffer size must be chosen carefully in order to avoid performance
+     * problems, if you set the buffer size too low you will experience a lot of
+     * blocked calls so it will take longer to consume the data from the
+     * iterator. For best performance the buffer size should be roughly 10% of
+     * the expected input size though you may need to tune this depending on how
+     * fast your consumer thread is.
+     * </p>
+     * <p>
+     * The {@code fair} parameter controls whether the locking policy used for
+     * the buffer is fair. When enabled this reduces throughput but also reduces
+     * the chance of thread starvation. This likely need only be set to
+     * {@code true} if there will be multiple consumers.
+     * </p>
+     * <p>
+     * The {@code pollTimeout} parameter controls how long each poll attempt
+     * waits for data to be produced. This prevents the consumer thread from
+     * blocking indefinitely and allows it to detect various potential deadlock
+     * conditions e.g. dead producer thread, another consumer closed the
+     * iterator etc. and errors out accordingly. It is unlikely that you will
+     * ever need to adjust this from the default value provided by
+     * {@link #DEFAULT_POLL_TIMEOUT}.
+     * </p>
+     * <p>
+     * The {@code maxPolls} parameter controls how many poll attempts will be
+     * made by a single consumer thread within the context of a single call to
+     * {@link #hasNext()} before the iterator declares the producer to be dead
+     * and errors out accordingly. You may need to adjust this if you have a
+     * slow producer thread or many consumer threads.
+     * </p>
+     * 
+     * @param bufferSize
+     *            Buffer size
+     * @param fair
+     *            Whether the buffer should use a fair locking policy
+     * @param pollTimeout
+     *            Poll timeout in milliseconds
+     * @param maxPolls
+     *            Max poll attempts
+     */
+    public PipedRDFIterator(int bufferSize, boolean fair, int pollTimeout, int maxPolls) {
+        if (pollTimeout <= 0)
+            throw new IllegalArgumentException("Poll Timeout must be > 0");
+        if (maxPolls <= 0)
+            throw new IllegalArgumentException("Max Poll attempts must be > 0");
+        this.queue = new ArrayBlockingQueue<>(bufferSize, fair);
+        this.pollTimeout = pollTimeout;
+        this.maxPolls = maxPolls;
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (!connected)
+            throw new IllegalStateException("Pipe not connected");
+
+        if (closedByConsumer)
+            throw new RiotException("Pipe closed");
+
+        if (finished)
+            return false;
+
+        consumerThread = Thread.currentThread();
+
+        // Depending on how code and/or the JVM schedules the threads involved
+        // there is a scenario that exists where a producer can finish/die
+        // before theconsumer is started and the consumer is scheduled onto the
+        // same thread thus resulting in a deadlock on the consumer because it
+        // will never be able to detect that the producer died
+        // In this scenario we need to set a special flag to indicate the
+        // possibility
+        if (producerThread != null && producerThread == consumerThread)
+            threadReused = true;
+
+        if (slot != null)
+            return true;
+
+        int attempts = 0;
+        while (true) {
+            attempts++;
+            try {
+                slot = queue.poll(this.pollTimeout, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                throw new CancellationException();
+            }
+
+            if (null != slot)
+                break;
+
+            // If the producer thread died and did not call finish() then
+            // declare this pipe to be "broken"
+            // Since check is after the break, we will drain as much as possible
+            // out of the queue before throwing this exception
+            if (threadReused || (producerThread != null && !producerThread.isAlive() && !closedByProducer)) {
+                closedByConsumer = true;
+                throw new RiotException("Producer dead");
+            }
+
+            // Need to check this inside the loop as otherwise outside code that
+            // attempts to break the deadlock by causing close() on the iterator
+            // cannot do so
+            if (closedByConsumer)
+                throw new RiotException("Pipe closed");
+
+            // Need to check whether polling attempts have been exceeded
+            // If so declare the producer dead and exit
+            if (attempts >= this.maxPolls) {
+                closedByConsumer = true;
+                if (producerThread != null) {
+                    throw new RiotException(
+                            "Producer failed to produce any data within the specified number of polling attempts, declaring producer dead");
+                } else {
+                    throw new RiotException("Producer failed to ever call start(), declaring producer dead");
+                }
+            }
+        }
+
+        // When the end marker is seen set slot to null
+        if (slot == endMarker) {
+            finished = true;
+            slot = null;
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public T next() {
+        if (!hasNext())
+            throw new NoSuchElementException();
+        T item = slot;
+        slot = null;
+        return item;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    private void checkStateForReceive() {
+        if (closedByProducer || closedByConsumer) {
+            throw new RiotException("Pipe closed");
+        } else if (consumerThread != null && !consumerThread.isAlive()) {
+            throw new RiotException("Consumer dead");
+        }
+    }
+
+    protected void connect() {
+        this.connected = true;
+    }
+
+    protected void receive(T t) {
+        checkStateForReceive();
+        producerThread = Thread.currentThread();
+
+        try {
+            queue.put(t);
+        } catch (InterruptedException e) {
+            throw new CancellationException();
+        }
+    }
+
+    protected void base(String base) {
+        synchronized (lock) {
+            this.baseIri = base;
+        }
+    }
+
+    /**
+     * Gets the most recently seen Base IRI
+     * 
+     * @return Base IRI
+     */
+    public String getBaseIri() {
+        synchronized (lock) {
+            return baseIri;
+        }
+    }
+
+    protected void prefix(String prefix, String iri) {
+        synchronized (lock) {
+            prefixes.add(prefix, iri);
+        }
+    }
+
+    /**
+     * Gets the prefix map which contains the prefixes seen so far in the stream
+     * 
+     * @return Prefix Map
+     */
+    public PrefixMap getPrefixes() {
+        synchronized (lock) {
+            // Need to return a copy since PrefixMap is not concurrent
+            return PrefixMapFactory.create(this.prefixes);
+        }
+    }
+
+    /**
+     * Should be called by the producer when it begins writing to the iterator.
+     * If the producer fails to call this for whatever reason and never produces
+     * any output or calls {@code finish()} consumers may be blocked for a short
+     * period before they detect this state and error out.
+     */
+    protected void start() {
+        // Track the producer thread in case it never delivers us anything and
+        // dies before calling finish
+        producerThread = Thread.currentThread();
+    }
+
+    /**
+     * Should be called by the producer when it has finished writing to the
+     * iterator. If the producer fails to call this for whatever reason
+     * consumers may be blocked for a short period before they detect this state
+     * and error out.
+     */
+    protected void finish() {
+        if ( closedByProducer )
+            return ;
+        receive(endMarker);
+        closedByProducer = true;
+    }
+
+    /**
+     * May be called by the consumer when it is finished reading from the
+     * iterator, if the producer thread has not finished it will receive an
+     * error the next time it tries to write to the iterator
+     */
+    @Override
+    public void close() {
+        closedByConsumer = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/d6ae87fd/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
index 6406204..40877e4 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedRDFStream.java
@@ -1,70 +1,70 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang ;
-
-import org.apache.jena.riot.system.StreamRDF ;
-
-/**
- * Abstract implementation of a producer class that implements {@code StreamRDF};
- * use one of the concrete implementations that match the RDF primitive you are using.
- * @param <T> Type corresponding to a supported RDF primitive
- * 
- * @see PipedTriplesStream
- * @see PipedQuadsStream
- * @see PipedTuplesStream
- */
-public abstract class PipedRDFStream<T> implements StreamRDF
-{
-    private final PipedRDFIterator<T> sink ;
-
-    protected PipedRDFStream(PipedRDFIterator<T> sink)
-    {
-        this.sink = sink ;
-        this.sink.connect();
-    }
-
-    protected void receive(T t)
-    {
-        sink.receive(t) ;
-    }
-
-    @Override
-    public void base(String base)
-    {
-        sink.base(base) ;
-    }
-
-    @Override
-    public void prefix(String prefix, String iri)
-    {
-        sink.prefix(prefix, iri) ;
-    }
-
-    @Override
-    public void start()
-    {
-        sink.start() ;
-    }
-
-    @Override
-    public void finish()
-    {
-        sink.finish() ;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.riot.system.StreamRDF ;
+
+/**
+ * Abstract implementation of a producer class that implements {@code StreamRDF};
+ * use one of the concrete implementations that match the RDF primitive you are using.
+ * @param <T> Type corresponding to a supported RDF primitive
+ * 
+ * @see PipedTriplesStream
+ * @see PipedQuadsStream
+ * @see PipedTuplesStream
+ */
+public abstract class PipedRDFStream<T> implements StreamRDF
+{
+    private final PipedRDFIterator<T> sink ;
+
+    protected PipedRDFStream(PipedRDFIterator<T> sink)
+    {
+        this.sink = sink ;
+        this.sink.connect();
+    }
+
+    protected void receive(T t)
+    {
+        sink.receive(t) ;
+    }
+
+    @Override
+    public void base(String base)
+    {
+        sink.base(base) ;
+    }
+
+    @Override
+    public void prefix(String prefix, String iri)
+    {
+        sink.prefix(prefix, iri) ;
+    }
+
+    @Override
+    public void start()
+    {
+        sink.start() ;
+    }
+
+    @Override
+    public void finish()
+    {
+        sink.finish() ;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/d6ae87fd/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
index c5c2dfe..270d59e 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTriplesStream.java
@@ -1,53 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang ;
-
-import org.apache.jena.graph.Triple ;
-import org.apache.jena.riot.system.StreamRDF ;
-import org.apache.jena.sparql.core.Quad ;
-
-/**
- * Implementation of a producer class that sends Triples; must be connected to a {@code PipedRDFIterator<Triple>}. 
- */
-public class PipedTriplesStream extends PipedRDFStream<Triple> implements StreamRDF
-{
-    /**
-     * Creates a piped triples stream connected to the specified piped 
-     * RDF iterator.  Triples written to this stream will then be 
-     * available as input from <code>sink</code>.
-     *
-     * @param sink The piped RDF iterator to connect to.
-     */
-    public PipedTriplesStream(PipedRDFIterator<Triple> sink)
-    {
-        super(sink) ;
-    }
-
-    @Override
-    public void triple(Triple triple)
-    {
-        receive(triple) ;
-    }
-
-    @Override
-    public void quad(Quad quad)
-    {
-        // Quads are discarded
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.graph.Triple ;
+import org.apache.jena.riot.system.StreamRDF ;
+import org.apache.jena.sparql.core.Quad ;
+
+/**
+ * Implementation of a producer class that sends Triples; must be connected to a {@code PipedRDFIterator<Triple>}. 
+ */
+public class PipedTriplesStream extends PipedRDFStream<Triple> implements StreamRDF
+{
+    /**
+     * Creates a piped triples stream connected to the specified piped 
+     * RDF iterator.  Triples written to this stream will then be 
+     * available as input from <code>sink</code>.
+     *
+     * @param sink The piped RDF iterator to connect to.
+     */
+    public PipedTriplesStream(PipedRDFIterator<Triple> sink)
+    {
+        super(sink) ;
+    }
+
+    @Override
+    public void triple(Triple triple)
+    {
+        receive(triple) ;
+    }
+
+    @Override
+    public void quad(Quad quad)
+    {
+        // Quads are discarded
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/d6ae87fd/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
index 4bdb728..f1a63d3 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/lang/PipedTuplesStream.java
@@ -1,55 +1,55 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.lang ;
-
-import org.apache.jena.atlas.lib.tuple.Tuple ;
-import org.apache.jena.graph.Node ;
-import org.apache.jena.graph.Triple ;
-import org.apache.jena.riot.system.StreamRDF ;
-import org.apache.jena.sparql.core.Quad ;
-
-/**
- * Implementation of a producer class that sends @{code Tuple<Node>}; must be connected to a {@code PipedRDFIterator<Tuple<Node>}. 
- */
-public class PipedTuplesStream extends PipedRDFStream<Tuple<Node>> implements StreamRDF
-{
-    /**
-     * Creates a piped tuples stream connected to the specified piped 
-     * RDF iterator.  Tuples written to this stream will then be 
-     * available as input from <code>sink</code>.
-     *
-     * @param sink The piped RDF iterator to connect to.
-     */
-    public PipedTuplesStream(PipedRDFIterator<Tuple<Node>> sink)
-    {
-        super(sink) ;
-    }
-
-    @Override
-    public void triple(Triple triple)
-    {
-        // Triples are discarded
-    }
-
-    @Override
-    public void quad(Quad quad)
-    {
-        // Quads are discarded
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.lang ;
+
+import org.apache.jena.atlas.lib.tuple.Tuple ;
+import org.apache.jena.graph.Node ;
+import org.apache.jena.graph.Triple ;
+import org.apache.jena.riot.system.StreamRDF ;
+import org.apache.jena.sparql.core.Quad ;
+
+/**
+ * Implementation of a producer class that sends @{code Tuple<Node>}; must be connected to a {@code PipedRDFIterator<Tuple<Node>}. 
+ */
+public class PipedTuplesStream extends PipedRDFStream<Tuple<Node>> implements StreamRDF
+{
+    /**
+     * Creates a piped tuples stream connected to the specified piped 
+     * RDF iterator.  Tuples written to this stream will then be 
+     * available as input from <code>sink</code>.
+     *
+     * @param sink The piped RDF iterator to connect to.
+     */
+    public PipedTuplesStream(PipedRDFIterator<Tuple<Node>> sink)
+    {
+        super(sink) ;
+    }
+
+    @Override
+    public void triple(Triple triple)
+    {
+        // Triples are discarded
+    }
+
+    @Override
+    public void quad(Quad quad)
+    {
+        // Quads are discarded
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/d6ae87fd/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java b/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
index 0841c50..d27e46c 100644
--- a/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
+++ b/jena-arq/src/main/java/org/apache/jena/riot/out/SinkQuadBracedOutput.java
@@ -1,137 +1,137 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.riot.out ;
-
-import java.io.OutputStream ;
-import java.util.Objects;
-
-import org.apache.jena.atlas.io.IndentedWriter ;
-import org.apache.jena.atlas.lib.Closeable ;
-import org.apache.jena.atlas.lib.Sink ;
-import org.apache.jena.graph.Node ;
-import org.apache.jena.graph.Triple ;
-import org.apache.jena.sparql.core.Quad ;
-import org.apache.jena.sparql.serializer.SerializationContext ;
-import org.apache.jena.sparql.util.FmtUtils ;
-
-/**
- * A class that print quads, SPARQL style (maybe good for Trig too?)
- */
-public class SinkQuadBracedOutput implements Sink<Quad>, Closeable
-{
-    protected static final int           BLOCK_INDENT = 2 ;
-
-    protected final IndentedWriter       out ;
-    protected final SerializationContext sCxt ;
-    protected boolean                    opened       = false ;
-
-    protected Node                       currentGraph ;
-
-    public SinkQuadBracedOutput(OutputStream out) {
-        this(out, null) ;
-    }
-
-    public SinkQuadBracedOutput(OutputStream out, SerializationContext sCxt) {
-        this(new IndentedWriter(out), sCxt) ;
-    }
-
-    public SinkQuadBracedOutput(IndentedWriter out, SerializationContext sCxt) {
-        if ( out == null ) { throw new IllegalArgumentException("out may not be null") ; }
-
-        if ( sCxt == null ) {
-            sCxt = new SerializationContext() ;
-        }
-
-        this.out = out ;
-        this.sCxt = sCxt ;
-    }
-
-    public void open() {
-        out.println("{") ;
-        out.incIndent(BLOCK_INDENT) ;
-        opened = true ;
-    }
-
-    private void checkOpen() {
-        if ( !opened ) { throw new IllegalStateException("SinkQuadBracedOutput is not opened.  Call open() first.") ; }
-    }
-
-    @Override
-    public void send(Quad quad) {
-        send(quad.getGraph(), quad.asTriple()) ;
-    }
-
-    public void send(Node graphName, Triple triple) {
-        checkOpen() ;
-        if ( Quad.isDefaultGraph(graphName) ) {
-            graphName = null ;
-        }
-
-        if ( !Objects.equals(currentGraph, graphName) ) {
-            if ( null != currentGraph ) {
-                out.decIndent(BLOCK_INDENT) ;
-                out.println("}") ;
-            }
-
-            if ( null != graphName ) {
-                out.print("GRAPH ") ;
-                output(graphName) ;
-                out.println(" {") ;
-                out.incIndent(BLOCK_INDENT) ;
-            }
-        }
-
-        output(triple) ;
-        out.println(" .") ;
-
-        currentGraph = graphName ;
-    }
-
-    private void output(Node node) {
-        String n = FmtUtils.stringForNode(node, sCxt) ;
-        out.print(n) ;
-    }
-
-    private void output(Triple triple) {
-        String ts = FmtUtils.stringForTriple(triple, sCxt) ;
-        out.print(ts) ;
-    }
-
-    @Override
-    public void flush() {
-        out.flush() ;
-    }
-
-    @Override
-    public void close() {
-        if ( opened ) {
-            if ( null != currentGraph ) {
-                out.decIndent(BLOCK_INDENT) ;
-                out.println("}") ;
-            }
-
-            out.decIndent(BLOCK_INDENT) ;
-            out.print("}") ;
-
-            // Since we didn't create the OutputStream, we'll just flush it
-            flush() ;
-            opened = false ;
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.riot.out ;
+
+import java.io.OutputStream ;
+import java.util.Objects;
+
+import org.apache.jena.atlas.io.IndentedWriter ;
+import org.apache.jena.atlas.lib.Closeable ;
+import org.apache.jena.atlas.lib.Sink ;
+import org.apache.jena.graph.Node ;
+import org.apache.jena.graph.Triple ;
+import org.apache.jena.sparql.core.Quad ;
+import org.apache.jena.sparql.serializer.SerializationContext ;
+import org.apache.jena.sparql.util.FmtUtils ;
+
+/**
+ * A class that print quads, SPARQL style (maybe good for Trig too?)
+ */
+public class SinkQuadBracedOutput implements Sink<Quad>, Closeable
+{
+    protected static final int           BLOCK_INDENT = 2 ;
+
+    protected final IndentedWriter       out ;
+    protected final SerializationContext sCxt ;
+    protected boolean                    opened       = false ;
+
+    protected Node                       currentGraph ;
+
+    public SinkQuadBracedOutput(OutputStream out) {
+        this(out, null) ;
+    }
+
+    public SinkQuadBracedOutput(OutputStream out, SerializationContext sCxt) {
+        this(new IndentedWriter(out), sCxt) ;
+    }
+
+    public SinkQuadBracedOutput(IndentedWriter out, SerializationContext sCxt) {
+        if ( out == null ) { throw new IllegalArgumentException("out may not be null") ; }
+
+        if ( sCxt == null ) {
+            sCxt = new SerializationContext() ;
+        }
+
+        this.out = out ;
+        this.sCxt = sCxt ;
+    }
+
+    public void open() {
+        out.println("{") ;
+        out.incIndent(BLOCK_INDENT) ;
+        opened = true ;
+    }
+
+    private void checkOpen() {
+        if ( !opened ) { throw new IllegalStateException("SinkQuadBracedOutput is not opened.  Call open() first.") ; }
+    }
+
+    @Override
+    public void send(Quad quad) {
+        send(quad.getGraph(), quad.asTriple()) ;
+    }
+
+    public void send(Node graphName, Triple triple) {
+        checkOpen() ;
+        if ( Quad.isDefaultGraph(graphName) ) {
+            graphName = null ;
+        }
+
+        if ( !Objects.equals(currentGraph, graphName) ) {
+            if ( null != currentGraph ) {
+                out.decIndent(BLOCK_INDENT) ;
+                out.println("}") ;
+            }
+
+            if ( null != graphName ) {
+                out.print("GRAPH ") ;
+                output(graphName) ;
+                out.println(" {") ;
+                out.incIndent(BLOCK_INDENT) ;
+            }
+        }
+
+        output(triple) ;
+        out.println(" .") ;
+
+        currentGraph = graphName ;
+    }
+
+    private void output(Node node) {
+        String n = FmtUtils.stringForNode(node, sCxt) ;
+        out.print(n) ;
+    }
+
+    private void output(Triple triple) {
+        String ts = FmtUtils.stringForTriple(triple, sCxt) ;
+        out.print(ts) ;
+    }
+
+    @Override
+    public void flush() {
+        out.flush() ;
+    }
+
+    @Override
+    public void close() {
+        if ( opened ) {
+            if ( null != currentGraph ) {
+                out.decIndent(BLOCK_INDENT) ;
+                out.println("}") ;
+            }
+
+            out.decIndent(BLOCK_INDENT) ;
+            out.print("}") ;
+
+            // Since we didn't create the OutputStream, we'll just flush it
+            flush() ;
+            opened = false ;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/d6ae87fd/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
index 0fb4f6e..4d0d414 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/HashIndexTable.java
@@ -1,218 +1,218 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.sparql.engine.index;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.jena.graph.Node ;
-import org.apache.jena.sparql.core.Var ;
-import org.apache.jena.sparql.engine.QueryIterator ;
-import org.apache.jena.sparql.engine.binding.Binding ;
-
-/**
- * Indexes bindings so that they can be search for quickly when a binding to all the
- * variables is provided. If a binding to only some of the known variables is provided
- * then the index still works, but will search linearly.
- */
-public class HashIndexTable implements IndexTable {
-    // Contribution from P Gearon (@quoll)
-	final private Set<Key> table ;
-	private Map<Var,Integer> varColumns ;
-	private boolean missingValue ;
-
-	public HashIndexTable(Set<Var> commonVars, QueryIterator data) throws MissingBindingException
-    {
-    	initColumnMappings(commonVars) ;
-    	if ( commonVars.size() == 0 )
-    	{
-    		table = null ;
-    		return ;
-    	}
-
-    	table = new HashSet<>() ;
-    	missingValue = false ;
-
-    	while ( data.hasNext() )
-        {
-            Binding binding = data.nextBinding() ;
-            addBindingToTable(binding) ;
-        }
-    	data.close() ;
-    }
-
-    @Override
-	public boolean containsCompatibleWithSharedDomain(Binding binding)
-    {
-    	// no shared variables means no shared domain, and should be ignored
-    	if ( table == null )
-    		return false ;
-
-    	Key indexKey ;
-		indexKey = convertToKey(binding) ;
-
-		if ( table.contains(indexKey) )
-			return true ;
-		
-		if ( anyUnbound(indexKey) )
-			return exhaustiveSearch(indexKey) ;
-		return false ;
-    }
-
-    private boolean anyUnbound(Key mappedBinding)
-    {
-    	for ( Node n: mappedBinding.getNodes() )
-    	{
-    		if ( n == null )
-    			return true ;
-    	}
-    	return false ;
-    }
-
-    private void initColumnMappings(Set<Var> commonVars)
-    {
-    	varColumns = new HashMap<>() ;
-    	int c = 0 ;
-    	for ( Var var: commonVars )
-    		varColumns.put(var, c++) ;
-    }
-
-    private void addBindingToTable(Binding binding) throws MissingBindingException
-    {
-    	Key key = convertToKey(binding) ;
-		table.add(key) ;
-		if ( missingValue )
-			throw new MissingBindingException(table, varColumns) ;
-    }
-
-    private Key convertToKey(Binding binding)
-    {
-		Node[] indexKey = new Node[varColumns.size()] ;
-
-		for ( Map.Entry<Var,Integer> varCol : varColumns.entrySet() )
-		{
-			Node value = binding.get(varCol.getKey()) ;
-			if ( value == null )
-				missingValue = true ;
-			indexKey[varCol.getValue()] = value ;
-		}
-		return new Key(indexKey) ;
-    }
-
-    private boolean exhaustiveSearch(Key mappedBindingLeft)
-    {
-    	for ( Key mappedBindingRight: table )
-    	{
-    		if ( mappedBindingLeft.compatibleAndSharedDomain(mappedBindingRight) )
-    			return true ;
-    	}
-    	return false ;
-    }
-
-    static class MissingBindingException extends Exception {
-    	private final Set<Key> data ;
-    	private final Map<Var,Integer> varMappings ;
-
-    	public MissingBindingException(Set<Key> data, Map<Var,Integer> varMappings)
-    	{
-    		this.data = data ;
-    		this.varMappings = varMappings ;
-    	}
-
-    	public Set<Key> getData() { return data ; }
-    	public Map<Var,Integer> getMap() { return varMappings ; }
-    }
-    
-    static class Key
-    {
-    	final Node[] nodes;
-
-    	Key(Node[] nodes)
-    	{
-    		this.nodes = nodes ;
-    	}
-
-    	public Node[] getNodes()
-    	{
-    		return nodes;
-    	}
-
-    	@Override
-		public String toString()
-    	{
-    		return Arrays.asList(nodes).toString() ;
-    	}
-
-    	@Override
-		public int hashCode()
-    	{
-    		int result = 0 ;
-    		for ( Node n: nodes )
-    			result ^= (n == null) ? 0 : n.hashCode() ;
-    		return result ;
-    	}
-    	
-    	@Override
-		public boolean equals(Object o)
-    	{
-    		if ( ! (o instanceof Key) )
-    			return false ;
-    		Node[] other = ((Key)o).nodes ;
-
-    		for ( int i = 0 ; i < nodes.length ; i++ )
-    		{
-    			if ( nodes[i] == null)
-    			{
-    				if ( other[i] != null )
-        				return false ;
-    			}
-    			else
-    			{
-	    			if ( ! nodes[i].equals(other[i]) )
-	    				return false ;
-    			}
-    		}
-    		return true ;
-    	}
-
-        public boolean compatibleAndSharedDomain(Key mappedBindingR)
-        {
-        	Node[] nodesRight = mappedBindingR.getNodes() ;
-
-        	boolean sharedDomain = false ;
-        	for ( int c = 0 ; c < nodes.length ; c++ )
-            {
-                Node nLeft  = nodes[c] ; 
-                Node nRight = nodesRight[c] ;
-                
-                if ( nLeft != null && nRight != null )
-            	{
-            		if ( nLeft.equals(nRight) )
-            			return false ;
-            		sharedDomain = true ;
-            	}
-            }
-            return sharedDomain ;
-        }
-    }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.index;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.jena.graph.Node ;
+import org.apache.jena.sparql.core.Var ;
+import org.apache.jena.sparql.engine.QueryIterator ;
+import org.apache.jena.sparql.engine.binding.Binding ;
+
+/**
+ * Indexes bindings so that they can be search for quickly when a binding to all the
+ * variables is provided. If a binding to only some of the known variables is provided
+ * then the index still works, but will search linearly.
+ */
+public class HashIndexTable implements IndexTable {
+    // Contribution from P Gearon (@quoll)
+	final private Set<Key> table ;
+	private Map<Var,Integer> varColumns ;
+	private boolean missingValue ;
+
+	public HashIndexTable(Set<Var> commonVars, QueryIterator data) throws MissingBindingException
+    {
+    	initColumnMappings(commonVars) ;
+    	if ( commonVars.size() == 0 )
+    	{
+    		table = null ;
+    		return ;
+    	}
+
+    	table = new HashSet<>() ;
+    	missingValue = false ;
+
+    	while ( data.hasNext() )
+        {
+            Binding binding = data.nextBinding() ;
+            addBindingToTable(binding) ;
+        }
+    	data.close() ;
+    }
+
+    @Override
+	public boolean containsCompatibleWithSharedDomain(Binding binding)
+    {
+    	// no shared variables means no shared domain, and should be ignored
+    	if ( table == null )
+    		return false ;
+
+    	Key indexKey ;
+		indexKey = convertToKey(binding) ;
+
+		if ( table.contains(indexKey) )
+			return true ;
+		
+		if ( anyUnbound(indexKey) )
+			return exhaustiveSearch(indexKey) ;
+		return false ;
+    }
+
+    private boolean anyUnbound(Key mappedBinding)
+    {
+    	for ( Node n: mappedBinding.getNodes() )
+    	{
+    		if ( n == null )
+    			return true ;
+    	}
+    	return false ;
+    }
+
+    private void initColumnMappings(Set<Var> commonVars)
+    {
+    	varColumns = new HashMap<>() ;
+    	int c = 0 ;
+    	for ( Var var: commonVars )
+    		varColumns.put(var, c++) ;
+    }
+
+    private void addBindingToTable(Binding binding) throws MissingBindingException
+    {
+    	Key key = convertToKey(binding) ;
+		table.add(key) ;
+		if ( missingValue )
+			throw new MissingBindingException(table, varColumns) ;
+    }
+
+    private Key convertToKey(Binding binding)
+    {
+		Node[] indexKey = new Node[varColumns.size()] ;
+
+		for ( Map.Entry<Var,Integer> varCol : varColumns.entrySet() )
+		{
+			Node value = binding.get(varCol.getKey()) ;
+			if ( value == null )
+				missingValue = true ;
+			indexKey[varCol.getValue()] = value ;
+		}
+		return new Key(indexKey) ;
+    }
+
+    private boolean exhaustiveSearch(Key mappedBindingLeft)
+    {
+    	for ( Key mappedBindingRight: table )
+    	{
+    		if ( mappedBindingLeft.compatibleAndSharedDomain(mappedBindingRight) )
+    			return true ;
+    	}
+    	return false ;
+    }
+
+    static class MissingBindingException extends Exception {
+    	private final Set<Key> data ;
+    	private final Map<Var,Integer> varMappings ;
+
+    	public MissingBindingException(Set<Key> data, Map<Var,Integer> varMappings)
+    	{
+    		this.data = data ;
+    		this.varMappings = varMappings ;
+    	}
+
+    	public Set<Key> getData() { return data ; }
+    	public Map<Var,Integer> getMap() { return varMappings ; }
+    }
+    
+    static class Key
+    {
+    	final Node[] nodes;
+
+    	Key(Node[] nodes)
+    	{
+    		this.nodes = nodes ;
+    	}
+
+    	public Node[] getNodes()
+    	{
+    		return nodes;
+    	}
+
+    	@Override
+		public String toString()
+    	{
+    		return Arrays.asList(nodes).toString() ;
+    	}
+
+    	@Override
+		public int hashCode()
+    	{
+    		int result = 0 ;
+    		for ( Node n: nodes )
+    			result ^= (n == null) ? 0 : n.hashCode() ;
+    		return result ;
+    	}
+    	
+    	@Override
+		public boolean equals(Object o)
+    	{
+    		if ( ! (o instanceof Key) )
+    			return false ;
+    		Node[] other = ((Key)o).nodes ;
+
+    		for ( int i = 0 ; i < nodes.length ; i++ )
+    		{
+    			if ( nodes[i] == null)
+    			{
+    				if ( other[i] != null )
+        				return false ;
+    			}
+    			else
+    			{
+	    			if ( ! nodes[i].equals(other[i]) )
+	    				return false ;
+    			}
+    		}
+    		return true ;
+    	}
+
+        public boolean compatibleAndSharedDomain(Key mappedBindingR)
+        {
+        	Node[] nodesRight = mappedBindingR.getNodes() ;
+
+        	boolean sharedDomain = false ;
+        	for ( int c = 0 ; c < nodes.length ; c++ )
+            {
+                Node nLeft  = nodes[c] ; 
+                Node nRight = nodesRight[c] ;
+                
+                if ( nLeft != null && nRight != null )
+            	{
+            		if ( nLeft.equals(nRight) )
+            			return false ;
+            		sharedDomain = true ;
+            	}
+            }
+            return sharedDomain ;
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/jena/blob/d6ae87fd/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
index 2593a54..5828a6b 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexFactory.java
@@ -1,45 +1,45 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.sparql.engine.index;
-
-import java.util.Set;
-
-import org.apache.jena.sparql.core.Var ;
-import org.apache.jena.sparql.engine.QueryIterator ;
-import org.apache.jena.sparql.engine.index.HashIndexTable.MissingBindingException ;
-import org.apache.jena.sparql.engine.iterator.QueryIterMinus ;
-
-/**
- * Creates {@link IndexTable}s for use by
- * {@link QueryIterMinus}.
- */
-public class IndexFactory {
-    // Contribution from P Gearon (@quoll)
-    public static IndexTable createIndex(Set<Var> commonVars, QueryIterator data) {
-        try {
-            if (commonVars.size() == 1) {
-                return new SetIndexTable(commonVars, data);
-            } else {
-                return new HashIndexTable(commonVars, data);
-            }
-        } catch (MissingBindingException e) {
-            return new LinearIndex(commonVars, data, e.getData(), e.getMap());
-        }
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.index;
+
+import java.util.Set;
+
+import org.apache.jena.sparql.core.Var ;
+import org.apache.jena.sparql.engine.QueryIterator ;
+import org.apache.jena.sparql.engine.index.HashIndexTable.MissingBindingException ;
+import org.apache.jena.sparql.engine.iterator.QueryIterMinus ;
+
+/**
+ * Creates {@link IndexTable}s for use by
+ * {@link QueryIterMinus}.
+ */
+public class IndexFactory {
+    // Contribution from P Gearon (@quoll)
+    public static IndexTable createIndex(Set<Var> commonVars, QueryIterator data) {
+        try {
+            if (commonVars.size() == 1) {
+                return new SetIndexTable(commonVars, data);
+            } else {
+                return new HashIndexTable(commonVars, data);
+            }
+        } catch (MissingBindingException e) {
+            return new LinearIndex(commonVars, data, e.getData(), e.getMap());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/d6ae87fd/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
----------------------------------------------------------------------
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
index 9b18f4d..5aa6e8a 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/index/IndexTable.java
@@ -1,32 +1,32 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.jena.sparql.engine.index;
-
-import org.apache.jena.sparql.engine.binding.Binding ;
-
-/**
- * Interface for indexes that are used for identifying matching
- * {@link org.apache.jena.sparql.engine.binding.Binding}s when
- * {@link org.apache.jena.sparql.engine.iterator.QueryIterMinus} is trying to determine
- * which Bindings need to be removed.
- */
-public interface IndexTable {
-    // Contribution from P Gearon
-	public abstract boolean containsCompatibleWithSharedDomain(Binding binding);
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.sparql.engine.index;
+
+import org.apache.jena.sparql.engine.binding.Binding ;
+
+/**
+ * Interface for indexes that are used for identifying matching
+ * {@link org.apache.jena.sparql.engine.binding.Binding}s when
+ * {@link org.apache.jena.sparql.engine.iterator.QueryIterMinus} is trying to determine
+ * which Bindings need to be removed.
+ */
+public interface IndexTable {
+    // Contribution from P Gearon
+	public abstract boolean containsCompatibleWithSharedDomain(Binding binding);
+}


Mime
View raw message