jena-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rve...@apache.org
Subject [2/2] jena git commit: Support RDF Thrift as an input format
Date Tue, 11 Nov 2014 12:59:50 GMT
Support RDF Thrift as an input format

Supported for both triples and quads.

Refactors input format tests to prepare test data using OutputStream
instead of Writer.


Project: http://git-wip-us.apache.org/repos/asf/jena/repo
Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/f08fff2a
Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/f08fff2a
Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/f08fff2a

Branch: refs/heads/hadoop-rdf
Commit: f08fff2a98db7f97d05b0ad1181afe1851848dac
Parents: 3ccab77
Author: Rob Vesse <rvesse@apache.org>
Authored: Tue Nov 11 12:59:00 2014 +0000
Committer: Rob Vesse <rvesse@apache.org>
Committed: Tue Nov 11 12:59:00 2014 +0000

----------------------------------------------------------------------
 .../input/readers/thrift/ThriftQuadReader.java  |   32 +
 .../readers/thrift/ThriftTripleReader.java      |   30 +
 .../io/input/thrift/ThriftQuadInputFormat.java  |   39 +
 .../input/thrift/ThriftTripleInputFormat.java   |   39 +
 .../AbstractNodeTupleInputFormatTests.java      | 1174 +++++++++---------
 .../io/input/AbstractQuadsInputFormatTests.java |   33 +-
 .../input/AbstractTriplesInputFormatTests.java  |   34 +-
 .../AbstractWholeFileQuadInputFormatTests.java  |   44 +-
 ...AbstractWholeFileTripleInputFormatTests.java |   46 +-
 ...ractCompressedNodeTupleInputFormatTests.java |   12 +-
 ...AbstractCompressedQuadsInputFormatTests.java |   33 +-
 ...stractCompressedTriplesInputFormatTests.java |   33 +-
 ...CompressedWholeFileQuadInputFormatTests.java |   53 +-
 ...mpressedWholeFileTripleInputFormatTests.java |  245 ++--
 .../io/input/thrift/ThriftQuadInputTest.java    |   51 +
 .../io/input/thrift/ThriftTripleInputTest.java  |   51 +
 16 files changed, 1108 insertions(+), 841 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
new file mode 100644
index 0000000..084b1ec
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftQuadReader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.thrift;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileQuadReader;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+public class ThriftQuadReader extends AbstractWholeFileQuadReader {
+
+    @Override
+    protected Lang getRdfLanguage() {
+        return RDFLanguages.THRIFT;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
new file mode 100644
index 0000000..713bfa7
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/readers/thrift/ThriftTripleReader.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.readers.thrift;
+
+import org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileTripleReader;
+import org.apache.jena.riot.Lang;
+import org.apache.jena.riot.RDFLanguages;
+
+public class ThriftTripleReader extends AbstractWholeFileTripleReader {
+    @Override
+    protected Lang getRdfLanguage() {
+        return RDFLanguages.THRIFT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
new file mode 100644
index 0000000..f75542a
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftQuadInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.thrift.ThriftQuadReader;
+import org.apache.jena.hadoop.rdf.types.QuadWritable;
+
+public class ThriftQuadInputFormat extends AbstractWholeFileInputFormat<LongWritable, QuadWritable> {
+
+    @Override
+    public RecordReader<LongWritable, QuadWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new ThriftQuadReader();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
new file mode 100644
index 0000000..b60380d
--- /dev/null
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/main/java/org/apache/jena/hadoop/rdf/io/input/thrift/ThriftTripleInputFormat.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *     
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jena.hadoop.rdf.io.input.thrift;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.jena.hadoop.rdf.io.input.AbstractWholeFileInputFormat;
+import org.apache.jena.hadoop.rdf.io.input.readers.thrift.ThriftTripleReader;
+import org.apache.jena.hadoop.rdf.types.TripleWritable;
+
+public class ThriftTripleInputFormat extends AbstractWholeFileInputFormat<LongWritable, TripleWritable> {
+
+    @Override
+    public RecordReader<LongWritable, TripleWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
+            throws IOException, InterruptedException {
+        return new ThriftTripleReader();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
index ef1b8d3..e22650f 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractNodeTupleInputFormatTests.java
@@ -16,591 +16,597 @@
  * limitations under the License.
  */
 
-package org.apache.jena.hadoop.rdf.io.input;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.Writer;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+package org.apache.jena.hadoop.rdf.io.input;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.jena.hadoop.rdf.io.HadoopIOConstants;
 import org.apache.jena.hadoop.rdf.io.RdfIOConstants;
 import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Abstract node tuple input format tests
- * 
- * 
- * 
- * @param <TValue>
- * @param <T>
- */
-public abstract class AbstractNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleInputFormatTests.class);
-
-    protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, BAD_SIZE = 100, MIXED_SIZE = 100;
-    protected static final String EMPTY = "empty";
-    protected static final String SMALL = "small";
-    protected static final String LARGE = "large";
-    protected static final String BAD = "bad";
-    protected static final String MIXED = "mixed";
-
-    /**
-     * Temporary folder for the tests
-     */
-    @Rule
-    public TemporaryFolder folder = new TemporaryFolder();
-
-    protected File empty, small, large, bad, mixed;
-
-    /**
-     * Prepares the inputs for the tests
-     * 
-     * @throws IOException
-     */
-    @Before
-    public void beforeTest() throws IOException {
-        this.prepareInputs();
-    }
-
-    /**
-     * Cleans up the inputs after each test
-     */
-    @After
-    public void afterTest() {
-        // Should be unnecessary since JUnit will clean up the temporary folder
-        // anyway but best to do this regardless
-        empty.delete();
-        small.delete();
-        large.delete();
-        bad.delete();
-        mixed.delete();
-    }
-
-    /**
-     * Prepares a fresh configuration
-     * 
-     * @return Configuration
-     */
-    protected Configuration prepareConfiguration() {
-        Configuration config = new Configuration(true);
-        // Nothing else to do
-        return config;
-    }
-
-    /**
-     * Prepares the inputs
-     * 
-     * @throws IOException
-     */
-    protected void prepareInputs() throws IOException {
-        String ext = this.getFileExtension();
-        empty = folder.newFile(EMPTY + ext);
-        this.generateTuples(empty, EMPTY_SIZE);
-        small = folder.newFile(SMALL + ext);
-        this.generateTuples(small, SMALL_SIZE);
-        large = folder.newFile(LARGE + ext);
-        this.generateTuples(large, LARGE_SIZE);
-        bad = folder.newFile(BAD + ext);
-        this.generateBadTuples(bad, BAD_SIZE);
-        mixed = folder.newFile(MIXED + ext);
-        this.generateMixedTuples(mixed, MIXED_SIZE);
-    }
-
-    /**
-     * Gets the extra file extension to add to the filenames
-     * 
-     * @return File extension
-     */
-    protected abstract String getFileExtension();
-
-    /**
-     * Generates tuples used for tests
-     * 
-     * @param f
-     *            File
-     * @param num
-     *            Number of tuples to generate
-     * @throws IOException
-     */
-    protected final void generateTuples(File f, int num) throws IOException {
-        this.generateTuples(this.getWriter(f), num);
-    }
-
-    /**
-     * Gets the writer to use for generating tuples
-     * 
-     * @param f
-     *            File
-     * @return Writer
-     * @throws IOException
-     */
-    protected Writer getWriter(File f) throws IOException {
-        return new FileWriter(f, false);
-    }
-
-    /**
-     * Generates tuples used for tests
-     * 
-     * @param writer
-     *            Writer to write to
-     * @param num
-     *            Number of tuples to generate
-     * @throws IOException
-     */
-    protected abstract void generateTuples(Writer writer, int num) throws IOException;
-
-    /**
-     * Generates bad tuples used for tests
-     * 
-     * @param f
-     *            File
-     * @param num
-     *            Number of bad tuples to generate
-     * @throws IOException
-     */
-    protected final void generateBadTuples(File f, int num) throws IOException {
-        this.generateBadTuples(this.getWriter(f), num);
-    }
-
-    /**
-     * Generates bad tuples used for tests
-     * 
-     * @param writer
-     *            Writer to write to
-     * @param num
-     *            Number of bad tuples to generate
-     * @throws IOException
-     */
-    protected abstract void generateBadTuples(Writer writer, int num) throws IOException;
-
-    /**
-     * Generates a mixture of good and bad tuples used for tests
-     * 
-     * @param f
-     *            File
-     * @param num
-     *            Number of tuples to generate, they should be a 50/50 mix of
-     *            good and bad tuples
-     * @throws IOException
-     */
-    protected final void generateMixedTuples(File f, int num) throws IOException {
-        this.generateMixedTuples(this.getWriter(f), num);
-    }
-
-    /**
-     * Generates a mixture of good and bad tuples used for tests
-     * 
-     * @param writer
-     *            Writer to write to
-     * @param num
-     *            Number of tuples to generate, they should be a 50/50 mix of
-     *            good and bad tuples
-     * @throws IOException
-     */
-    protected abstract void generateMixedTuples(Writer write, int num) throws IOException;
-
-    /**
-     * Adds an input path to the job configuration
-     * 
-     * @param f
-     *            File
-     * @param config
-     *            Configuration
-     * @param job
-     *            Job
-     * @throws IOException
-     */
-    protected void addInputPath(File f, Configuration config, Job job) throws IOException {
-        FileSystem fs = FileSystem.getLocal(config);
-        Path inputPath = fs.makeQualified(new Path(f.getAbsolutePath()));
-        FileInputFormat.addInputPath(job, inputPath);
-    }
-
-    protected final int countTuples(RecordReader<LongWritable, T> reader) throws IOException, InterruptedException {
-        int count = 0;
-
-        // Check initial progress
-        LOG.info(String.format("Initial Reported Progress %f", reader.getProgress()));
-        float progress = reader.getProgress();
-        if (Float.compare(0.0f, progress) == 0) {
-            Assert.assertEquals(0.0d, reader.getProgress(), 0.0d);
-        } else if (Float.compare(1.0f, progress) == 0) {
-            // If reader is reported 1.0 straight away then we expect there to
-            // be no key values
-            Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
-            Assert.assertFalse(reader.nextKeyValue());
-        } else {
-            Assert.fail(String.format(
-                    "Expected progress of 0.0 or 1.0 before reader has been accessed for first time but got %f", progress));
-        }
-
-        // Count tuples
-        boolean debug = LOG.isDebugEnabled();
-        while (reader.nextKeyValue()) {
-            count++;
-            progress = reader.getProgress();
-            if (debug)
-                LOG.debug(String.format("Current Reported Progress %f", progress));
-            Assert.assertTrue(String.format("Progress should be in the range 0.0 < p <= 1.0 but got %f", progress),
-                    progress > 0.0f && progress <= 1.0f);
-        }
-        reader.close();
-        LOG.info(String.format("Got %d tuples from this record reader", count));
-
-        // Check final progress
-        LOG.info(String.format("Final Reported Progress %f", reader.getProgress()));
-        Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
-
-        return count;
-    }
-
-    protected final void checkTuples(RecordReader<LongWritable, T> reader, int expected) throws IOException, InterruptedException {
-        Assert.assertEquals(expected, this.countTuples(reader));
-    }
-
-    /**
-     * Runs a test with a single input
-     * 
-     * @param input
-     *            Input
-     * @param expectedTuples
-     *            Expected tuples
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    protected final void testSingleInput(File input, int expectedSplits, int expectedTuples) throws IOException,
-            InterruptedException {
-        // Prepare configuration
-        Configuration config = this.prepareConfiguration();
-        this.testSingleInput(config, input, expectedSplits, expectedTuples);
-    }
-
-    /**
-     * Runs a test with a single input
-     * 
-     * @param config
-     *            Configuration
-     * @param input
-     *            Input
-     * @param expectedTuples
-     *            Expected tuples
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    protected final void testSingleInput(Configuration config, File input, int expectedSplits, int expectedTuples)
-            throws IOException, InterruptedException {
-        // Set up fake job
-        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
-        Job job = Job.getInstance(config);
-        job.setInputFormatClass(inputFormat.getClass());
-        this.addInputPath(input, job.getConfiguration(), job);
-        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
-        Assert.assertEquals(1, FileInputFormat.getInputPaths(context).length);
-        NLineInputFormat.setNumLinesPerSplit(job, LARGE_SIZE);
-
-        // Check splits
-        List<InputSplit> splits = inputFormat.getSplits(context);
-        Assert.assertEquals(expectedSplits, splits.size());
-
-        // Check tuples
-        for (InputSplit split : splits) {
-            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
-            reader.initialize(split, taskContext);
-            this.checkTuples(reader, expectedTuples);
-        }
-    }
-
-    protected abstract InputFormat<LongWritable, T> getInputFormat();
-
-    /**
-     * Basic tuples input test
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void single_input_01() throws IOException, InterruptedException, ClassNotFoundException {
-        testSingleInput(empty, this.canSplitInputs() ? 0 : 1, EMPTY_SIZE);
-    }
-
-    /**
-     * Basic tuples input test
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void single_input_02() throws IOException, InterruptedException, ClassNotFoundException {
-        testSingleInput(small, 1, SMALL_SIZE);
-    }
-
-    /**
-     * Basic tuples input test
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void single_input_03() throws IOException, InterruptedException, ClassNotFoundException {
-        testSingleInput(large, 1, LARGE_SIZE);
-    }
-
-    /**
-     * Basic tuples input test
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void single_input_04() throws IOException, InterruptedException, ClassNotFoundException {
-        testSingleInput(bad, 1, 0);
-    }
-
-    /**
-     * Basic tuples input test
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void single_input_05() throws IOException, InterruptedException, ClassNotFoundException {
-        testSingleInput(mixed, 1, MIXED_SIZE / 2);
-    }
-
-    /**
-     * Tests behaviour when ignoring bad tuples is disabled
-     * 
-     * @throws InterruptedException
-     * @throws IOException
-     */
-    @Test(expected = IOException.class)
-    public final void fail_on_bad_input_01() throws IOException, InterruptedException {
-        Configuration config = this.prepareConfiguration();
-        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
-        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
-        testSingleInput(config, bad, 1, 0);
-    }
-
-    /**
-     * Tests behaviour when ignoring bad tuples is disabled
-     * 
-     * @throws InterruptedException
-     * @throws IOException
-     */
-    @Test(expected = IOException.class)
-    public final void fail_on_bad_input_02() throws IOException, InterruptedException {
-        Configuration config = this.prepareConfiguration();
-        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
-        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
-        testSingleInput(config, mixed, 1, MIXED_SIZE / 2);
-    }
-
-    /**
-     * Runs a multiple input test
-     * 
-     * @param inputs
-     *            Inputs
-     * @param expectedSplits
-     *            Number of splits expected
-     * @param expectedTuples
-     *            Number of tuples expected
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    protected final void testMultipleInputs(File[] inputs, int expectedSplits, int expectedTuples) throws IOException,
-            InterruptedException {
-        // Prepare configuration and inputs
-        Configuration config = this.prepareConfiguration();
-
-        // Set up fake job
-        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
-        Job job = Job.getInstance(config);
-        job.setInputFormatClass(inputFormat.getClass());
-        for (File input : inputs) {
-            this.addInputPath(input, job.getConfiguration(), job);
-        }
-        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
-        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
-        NLineInputFormat.setNumLinesPerSplit(job, expectedTuples);
-
-        // Check splits
-        List<InputSplit> splits = inputFormat.getSplits(context);
-        Assert.assertEquals(expectedSplits, splits.size());
-
-        // Check tuples
-        int count = 0;
-        for (InputSplit split : splits) {
-            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
-            reader.initialize(split, taskContext);
-            count += this.countTuples(reader);
-        }
-        Assert.assertEquals(expectedTuples, count);
-    }
-
-    /**
-     * tuples test with multiple inputs
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void multiple_inputs_01() throws IOException, InterruptedException, ClassNotFoundException {
-        testMultipleInputs(new File[] { empty, small, large }, this.canSplitInputs() ? 2 : 3, EMPTY_SIZE + SMALL_SIZE
-                + LARGE_SIZE);
-    }
-
-    /**
-     * tuples test with multiple inputs
-     * 
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    @Test
-    public final void multiple_inputs_02() throws IOException, InterruptedException, ClassNotFoundException {
-        testMultipleInputs(new File[] { folder.getRoot() }, this.canSplitInputs() ? 4 : 5, EMPTY_SIZE + SMALL_SIZE + LARGE_SIZE
-                + (MIXED_SIZE / 2));
-    }
-
-    protected final void testSplitInputs(Configuration config, File[] inputs, int expectedSplits, int expectedTuples)
-            throws IOException, InterruptedException {
-        // Set up fake job
-        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
-        Job job = Job.getInstance(config);
-        job.setInputFormatClass(inputFormat.getClass());
-        for (File input : inputs) {
-            this.addInputPath(input, job.getConfiguration(), job);
-        }
-        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
-        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
-
-        // Check splits
-        List<InputSplit> splits = inputFormat.getSplits(context);
-        Assert.assertEquals(expectedSplits, splits.size());
-
-        // Check tuples
-        int count = 0;
-        for (InputSplit split : splits) {
-            // Validate split
-            Assert.assertTrue(this.isValidSplit(split, config));
-
-            // Read split
-            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
-            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
-            reader.initialize(split, taskContext);
-            count += this.countTuples(reader);
-        }
-        Assert.assertEquals(expectedTuples, count);
-    }
-
-    /**
-     * Determines whether an input split is valid
-     * 
-     * @param split
-     *            Input split
-     * @return True if a valid split, false otherwise
-     * @throws IOException 
-     */
-    protected boolean isValidSplit(InputSplit split, Configuration config) throws IOException {
-        return split instanceof FileSplit;
-    }
-
-    /**
-     * Indicates whether inputs can be split, defaults to true
-     * 
-     * @return Whether inputs can be split
-     */
-    protected boolean canSplitInputs() {
-        return true;
-    }
-
-    /**
-     * Tests for input splitting
-     * 
-     * @throws IOException
-     * @throws InterruptedException
-     * @throws ClassNotFoundException
-     */
-    @Test
-    public final void split_input_01() throws IOException, InterruptedException, ClassNotFoundException {
-        Assume.assumeTrue(this.canSplitInputs());
-
-        Configuration config = this.prepareConfiguration();
-        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
-        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
-        this.testSplitInputs(config, new File[] { small }, 100, SMALL_SIZE);
-    }
-
-    /**
-     * Tests for input splitting
-     * 
-     * @throws IOException
-     * @throws InterruptedException
-     * @throws ClassNotFoundException
-     */
-    @Test
-    public final void split_input_02() throws IOException, InterruptedException, ClassNotFoundException {
-        Assume.assumeTrue(this.canSplitInputs());
-
-        Configuration config = this.prepareConfiguration();
-        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
-        config.setLong(NLineInputFormat.LINES_PER_MAP, 10);
-        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
-        this.testSplitInputs(config, new File[] { small }, 10, SMALL_SIZE);
-    }
-
-    /**
-     * Tests for input splitting
-     * 
-     * @throws IOException
-     * @throws InterruptedException
-     * @throws ClassNotFoundException
-     */
-    @Test
-    public final void split_input_03() throws IOException, InterruptedException, ClassNotFoundException {
-        Assume.assumeTrue(this.canSplitInputs());
-
-        Configuration config = this.prepareConfiguration();
-        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
-        config.setLong(NLineInputFormat.LINES_PER_MAP, 100);
-        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
-        this.testSplitInputs(config, new File[] { large }, 100, LARGE_SIZE);
-    }
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract node tuple input format tests
+ * 
+ * 
+ * 
+ * @param <TValue>
+ * @param <T>
+ */
+public abstract class AbstractNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractNodeTupleInputFormatTests.class);
+
+    protected static final int EMPTY_SIZE = 0, SMALL_SIZE = 100, LARGE_SIZE = 10000, BAD_SIZE = 100, MIXED_SIZE = 100;
+    protected static final String EMPTY = "empty";
+    protected static final String SMALL = "small";
+    protected static final String LARGE = "large";
+    protected static final String BAD = "bad";
+    protected static final String MIXED = "mixed";
+
+    /**
+     * Temporary folder for the tests
+     */
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
+    protected File empty, small, large, bad, mixed;
+
+    /**
+     * Prepares the inputs for the tests
+     * 
+     * @throws IOException
+     */
+    @Before
+    public void beforeTest() throws IOException {
+        this.prepareInputs();
+    }
+
+    /**
+     * Cleans up the inputs after each test
+     */
+    @After
+    public void afterTest() {
+        // Should be unnecessary since JUnit will clean up the temporary folder
+        // anyway but best to do this regardless
+        if (empty != null)
+            empty.delete();
+        if (small != null)
+            small.delete();
+        if (large != null)
+            large.delete();
+        if (bad != null)
+            bad.delete();
+        if (mixed != null)
+            mixed.delete();
+    }
+
+    /**
+     * Prepares a fresh configuration
+     * 
+     * @return Configuration
+     */
+    protected Configuration prepareConfiguration() {
+        Configuration config = new Configuration(true);
+        // Nothing else to do
+        return config;
+    }
+
+    /**
+     * Prepares the inputs
+     * 
+     * @throws IOException
+     */
+    protected void prepareInputs() throws IOException {
+        String ext = this.getFileExtension();
+        empty = folder.newFile(EMPTY + ext);
+        this.generateTuples(empty, EMPTY_SIZE);
+        small = folder.newFile(SMALL + ext);
+        this.generateTuples(small, SMALL_SIZE);
+        large = folder.newFile(LARGE + ext);
+        this.generateTuples(large, LARGE_SIZE);
+        bad = folder.newFile(BAD + ext);
+        this.generateBadTuples(bad, BAD_SIZE);
+        mixed = folder.newFile(MIXED + ext);
+        this.generateMixedTuples(mixed, MIXED_SIZE);
+    }
+
+    /**
+     * Gets the extra file extension to add to the filenames
+     * 
+     * @return File extension
+     */
+    protected abstract String getFileExtension();
+
+    /**
+     * Generates tuples used for tests
+     * 
+     * @param f
+     *            File
+     * @param num
+     *            Number of tuples to generate
+     * @throws IOException
+     */
+    protected final void generateTuples(File f, int num) throws IOException {
+        this.generateTuples(this.getOutputStream(f), num);
+    }
+
+    /**
+     * Gets the output stream to use for generating tuples
+     * 
+     * @param f
+     *            File
+     * @return Output Stream
+     * @throws IOException
+     */
+    protected OutputStream getOutputStream(File f) throws IOException {
+        return new FileOutputStream(f, false);
+    }
+
+    /**
+     * Generates tuples used for tests
+     * 
+     * @param output
+     *            Output Stream to write to
+     * @param num
+     *            Number of tuples to generate
+     * @throws IOException
+     */
+    protected abstract void generateTuples(OutputStream output, int num) throws IOException;
+
+    /**
+     * Generates bad tuples used for tests
+     * 
+     * @param f
+     *            File
+     * @param num
+     *            Number of bad tuples to generate
+     * @throws IOException
+     */
+    protected final void generateBadTuples(File f, int num) throws IOException {
+        this.generateBadTuples(this.getOutputStream(f), num);
+    }
+
+    /**
+     * Generates bad tuples used for tests
+     * 
+     * @param output
+     *            Output Stream to write to
+     * @param num
+     *            Number of bad tuples to generate
+     * @throws IOException
+     */
+    protected abstract void generateBadTuples(OutputStream output, int num) throws IOException;
+
+    /**
+     * Generates a mixture of good and bad tuples used for tests
+     * 
+     * @param f
+     *            File
+     * @param num
+     *            Number of tuples to generate, they should be a 50/50 mix of
+     *            good and bad tuples
+     * @throws IOException
+     */
+    protected final void generateMixedTuples(File f, int num) throws IOException {
+        this.generateMixedTuples(this.getOutputStream(f), num);
+    }
+
+    /**
+     * Generates a mixture of good and bad tuples used for tests
+     * 
+     * @param output
+     *            Output Stream to write to
+     * @param num
+     *            Number of tuples to generate, they should be a 50/50 mix of
+     *            good and bad tuples
+     * @throws IOException
+     */
+    protected abstract void generateMixedTuples(OutputStream output, int num) throws IOException;
+
+    /**
+     * Adds an input path to the job configuration
+     * 
+     * @param f
+     *            File
+     * @param config
+     *            Configuration
+     * @param job
+     *            Job
+     * @throws IOException
+     */
+    protected void addInputPath(File f, Configuration config, Job job) throws IOException {
+        FileSystem fs = FileSystem.getLocal(config);
+        Path inputPath = fs.makeQualified(new Path(f.getAbsolutePath()));
+        FileInputFormat.addInputPath(job, inputPath);
+    }
+
+    protected final int countTuples(RecordReader<LongWritable, T> reader) throws IOException, InterruptedException {
+        int count = 0;
+
+        // Check initial progress
+        LOG.info(String.format("Initial Reported Progress %f", reader.getProgress()));
+        float progress = reader.getProgress();
+        if (Float.compare(0.0f, progress) == 0) {
+            Assert.assertEquals(0.0d, reader.getProgress(), 0.0d);
+        } else if (Float.compare(1.0f, progress) == 0) {
+            // If reader is reported 1.0 straight away then we expect there to
+            // be no key values
+            Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
+            Assert.assertFalse(reader.nextKeyValue());
+        } else {
+            Assert.fail(String.format(
+                    "Expected progress of 0.0 or 1.0 before reader has been accessed for first time but got %f",
+                    progress));
+        }
+
+        // Count tuples
+        boolean debug = LOG.isDebugEnabled();
+        while (reader.nextKeyValue()) {
+            count++;
+            progress = reader.getProgress();
+            if (debug)
+                LOG.debug(String.format("Current Reported Progress %f", progress));
+            Assert.assertTrue(String.format("Progress should be in the range 0.0 < p <= 1.0 but got %f", progress),
+                    progress > 0.0f && progress <= 1.0f);
+        }
+        reader.close();
+        LOG.info(String.format("Got %d tuples from this record reader", count));
+
+        // Check final progress
+        LOG.info(String.format("Final Reported Progress %f", reader.getProgress()));
+        Assert.assertEquals(1.0d, reader.getProgress(), 0.0d);
+
+        return count;
+    }
+
+    protected final void checkTuples(RecordReader<LongWritable, T> reader, int expected) throws IOException,
+            InterruptedException {
+        Assert.assertEquals(expected, this.countTuples(reader));
+    }
+
+    /**
+     * Runs a test with a single input
+     * 
+     * @param input
+     *            Input
+     * @param expectedTuples
+     *            Expected tuples
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final void testSingleInput(File input, int expectedSplits, int expectedTuples) throws IOException,
+            InterruptedException {
+        // Prepare configuration
+        Configuration config = this.prepareConfiguration();
+        this.testSingleInput(config, input, expectedSplits, expectedTuples);
+    }
+
+    /**
+     * Runs a test with a single input
+     * 
+     * @param config
+     *            Configuration
+     * @param input
+     *            Input
+     * @param expectedTuples
+     *            Expected tuples
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final void testSingleInput(Configuration config, File input, int expectedSplits, int expectedTuples)
+            throws IOException, InterruptedException {
+        // Set up fake job
+        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+        Job job = Job.getInstance(config);
+        job.setInputFormatClass(inputFormat.getClass());
+        this.addInputPath(input, job.getConfiguration(), job);
+        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+        Assert.assertEquals(1, FileInputFormat.getInputPaths(context).length);
+        NLineInputFormat.setNumLinesPerSplit(job, LARGE_SIZE);
+
+        // Check splits
+        List<InputSplit> splits = inputFormat.getSplits(context);
+        Assert.assertEquals(expectedSplits, splits.size());
+
+        // Check tuples
+        for (InputSplit split : splits) {
+            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+            reader.initialize(split, taskContext);
+            this.checkTuples(reader, expectedTuples);
+        }
+    }
+
+    protected abstract InputFormat<LongWritable, T> getInputFormat();
+
+    /**
+     * Basic tuples input test
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_01() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(empty, this.canSplitInputs() ? 0 : 1, EMPTY_SIZE);
+    }
+
+    /**
+     * Basic tuples input test
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_02() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(small, 1, SMALL_SIZE);
+    }
+
+    /**
+     * Basic tuples input test
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_03() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(large, 1, LARGE_SIZE);
+    }
+
+    /**
+     * Basic tuples input test
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_04() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(bad, 1, 0);
+    }
+
+    /**
+     * Basic tuples input test
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void single_input_05() throws IOException, InterruptedException, ClassNotFoundException {
+        testSingleInput(mixed, 1, MIXED_SIZE / 2);
+    }
+
+    /**
+     * Tests behaviour when ignoring bad tuples is disabled
+     * 
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    @Test(expected = IOException.class)
+    public final void fail_on_bad_input_01() throws IOException, InterruptedException {
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
+        testSingleInput(config, bad, 1, 0);
+    }
+
+    /**
+     * Tests behaviour when ignoring bad tuples is disabled
+     * 
+     * @throws InterruptedException
+     * @throws IOException
+     */
+    @Test(expected = IOException.class)
+    public final void fail_on_bad_input_02() throws IOException, InterruptedException {
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        Assert.assertFalse(config.getBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, true));
+        testSingleInput(config, mixed, 1, MIXED_SIZE / 2);
+    }
+
+    /**
+     * Runs a multiple input test
+     * 
+     * @param inputs
+     *            Inputs
+     * @param expectedSplits
+     *            Number of splits expected
+     * @param expectedTuples
+     *            Number of tuples expected
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected final void testMultipleInputs(File[] inputs, int expectedSplits, int expectedTuples) throws IOException,
+            InterruptedException {
+        // Prepare configuration and inputs
+        Configuration config = this.prepareConfiguration();
+
+        // Set up fake job
+        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+        Job job = Job.getInstance(config);
+        job.setInputFormatClass(inputFormat.getClass());
+        for (File input : inputs) {
+            this.addInputPath(input, job.getConfiguration(), job);
+        }
+        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
+        NLineInputFormat.setNumLinesPerSplit(job, expectedTuples);
+
+        // Check splits
+        List<InputSplit> splits = inputFormat.getSplits(context);
+        Assert.assertEquals(expectedSplits, splits.size());
+
+        // Check tuples
+        int count = 0;
+        for (InputSplit split : splits) {
+            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+            reader.initialize(split, taskContext);
+            count += this.countTuples(reader);
+        }
+        Assert.assertEquals(expectedTuples, count);
+    }
+
+    /**
+     * tuples test with multiple inputs
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void multiple_inputs_01() throws IOException, InterruptedException, ClassNotFoundException {
+        testMultipleInputs(new File[] { empty, small, large }, this.canSplitInputs() ? 2 : 3, EMPTY_SIZE + SMALL_SIZE
+                + LARGE_SIZE);
+    }
+
+    /**
+     * tuples test with multiple inputs
+     * 
+     * @throws IOException
+     * @throws ClassNotFoundException
+     * @throws InterruptedException
+     */
+    @Test
+    public final void multiple_inputs_02() throws IOException, InterruptedException, ClassNotFoundException {
+        testMultipleInputs(new File[] { folder.getRoot() }, this.canSplitInputs() ? 4 : 5, EMPTY_SIZE + SMALL_SIZE
+                + LARGE_SIZE + (MIXED_SIZE / 2));
+    }
+
+    protected final void testSplitInputs(Configuration config, File[] inputs, int expectedSplits, int expectedTuples)
+            throws IOException, InterruptedException {
+        // Set up fake job
+        InputFormat<LongWritable, T> inputFormat = this.getInputFormat();
+        Job job = Job.getInstance(config);
+        job.setInputFormatClass(inputFormat.getClass());
+        for (File input : inputs) {
+            this.addInputPath(input, job.getConfiguration(), job);
+        }
+        JobContext context = new JobContextImpl(job.getConfiguration(), job.getJobID());
+        Assert.assertEquals(inputs.length, FileInputFormat.getInputPaths(context).length);
+
+        // Check splits
+        List<InputSplit> splits = inputFormat.getSplits(context);
+        Assert.assertEquals(expectedSplits, splits.size());
+
+        // Check tuples
+        int count = 0;
+        for (InputSplit split : splits) {
+            // Validate split
+            Assert.assertTrue(this.isValidSplit(split, config));
+
+            // Read split
+            TaskAttemptContext taskContext = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+            RecordReader<LongWritable, T> reader = inputFormat.createRecordReader(split, taskContext);
+            reader.initialize(split, taskContext);
+            count += this.countTuples(reader);
+        }
+        Assert.assertEquals(expectedTuples, count);
+    }
+
+    /**
+     * Determines whether an input split is valid
+     * 
+     * @param split
+     *            Input split
+     * @return True if a valid split, false otherwise
+     * @throws IOException
+     */
+    protected boolean isValidSplit(InputSplit split, Configuration config) throws IOException {
+        return split instanceof FileSplit;
+    }
+
+    /**
+     * Indicates whether inputs can be split, defaults to true
+     * 
+     * @return Whether inputs can be split
+     */
+    protected boolean canSplitInputs() {
+        return true;
+    }
+
+    /**
+     * Tests for input splitting
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public final void split_input_01() throws IOException, InterruptedException, ClassNotFoundException {
+        Assume.assumeTrue(this.canSplitInputs());
+
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+        this.testSplitInputs(config, new File[] { small }, 100, SMALL_SIZE);
+    }
+
+    /**
+     * Tests for input splitting
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public final void split_input_02() throws IOException, InterruptedException, ClassNotFoundException {
+        Assume.assumeTrue(this.canSplitInputs());
+
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        config.setLong(NLineInputFormat.LINES_PER_MAP, 10);
+        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+        this.testSplitInputs(config, new File[] { small }, 10, SMALL_SIZE);
+    }
+
+    /**
+     * Tests for input splitting
+     * 
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ClassNotFoundException
+     */
+    @Test
+    public final void split_input_03() throws IOException, InterruptedException, ClassNotFoundException {
+        Assume.assumeTrue(this.canSplitInputs());
+
+        Configuration config = this.prepareConfiguration();
+        config.setBoolean(RdfIOConstants.INPUT_IGNORE_BAD_TUPLES, false);
+        config.setLong(NLineInputFormat.LINES_PER_MAP, 100);
+        Assert.assertEquals(Integer.MAX_VALUE, config.getInt(HadoopIOConstants.MAX_LINE_LENGTH, Integer.MAX_VALUE));
+        this.testSplitInputs(config, new File[] { large }, 100, LARGE_SIZE);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
index 4dd396b..78d7f33 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractQuadsInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.QuadWritable;
 
@@ -30,38 +31,40 @@ import com.hp.hpl.jena.sparql.core.Quad;
  * 
  *
  */
-public abstract class AbstractQuadsInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+public abstract class AbstractQuadsInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
-    protected void generateTuples(Writer writer, int num) throws IOException {
+    protected void generateTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+            output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
     
     @Override
-    protected void generateBadTuples(Writer writer, int num) throws IOException {
+    protected void generateBadTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://broken\n");
+            output.write("<http://broken\n".getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected void generateMixedTuples(OutputStream output, int num) throws IOException {
         boolean bad = false;
         for (int i = 0; i < num; i++, bad = !bad) {
             if (bad) {
-                writer.write("<http://broken\n");
+                output.write("<http://broken\n".getBytes(utf8));
             } else {
-                writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+                output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
             }
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
index 04572d3..65a9889 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractTriplesInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.TripleWritable;
 
@@ -31,38 +32,41 @@ import com.hp.hpl.jena.graph.Triple;
  * 
  * 
  */
-public abstract class AbstractTriplesInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+public abstract class AbstractTriplesInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
-    protected void generateTuples(Writer writer, int num) throws IOException {
+    protected void generateTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+            output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateBadTuples(Writer writer, int num) throws IOException {
+    protected void generateBadTuples(OutputStream output, int num) throws IOException {
+        byte[] junk = "<http://broken\n".getBytes(utf8);
         for (int i = 0; i < num; i++) {
-            writer.write("<http://broken\n");
+            output.write(junk);
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected void generateMixedTuples(OutputStream output, int num) throws IOException {
         boolean bad = false;
         for (int i = 0; i < num; i++, bad = !bad) {
             if (bad) {
-                writer.write("<http://broken\n");
+                output.write("<http://broken\n".getBytes(utf8));
             } else {
-                writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+                output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
             }
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
index 3f2c8d2..0b6cfde 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileQuadInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.QuadWritable;
 import org.apache.jena.riot.Lang;
@@ -40,16 +41,17 @@ import com.hp.hpl.jena.sparql.core.Quad;
  * 
  * 
  */
-public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
     protected boolean canSplitInputs() {
         return false;
     }
 
-    @SuppressWarnings("deprecation")
-    private void writeTuples(Dataset ds, Writer writer) {
-        RDFDataMgr.write(writer, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
+    private void writeTuples(Dataset ds, OutputStream output) {
+        RDFDataMgr.write(output, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
     }
 
     /**
@@ -59,7 +61,7 @@ public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNode
      */
     protected abstract Lang getRdfLanguage();
 
-    private void writeGoodTuples(Writer writer, int num) throws IOException {
+    private void writeGoodTuples(OutputStream output, int num) throws IOException {
         Dataset ds = DatasetFactory.createMem();
         Model m = ModelFactory.createDefaultModel();
         Resource currSubj = m.createResource("http://example.org/subjects/0");
@@ -77,35 +79,37 @@ public abstract class AbstractWholeFileQuadInputFormatTests extends AbstractNode
         if (!m.isEmpty()) {
             ds.addNamedModel("http://example.org/graphs/extra", m);
         }
-        this.writeTuples(ds, writer);
+        this.writeTuples(ds, output);
     }
 
     @Override
-    protected final void generateTuples(Writer writer, int num) throws IOException {
-        this.writeGoodTuples(writer, num);
-        writer.close();
+    protected final void generateTuples(OutputStream output, int num) throws IOException {
+        this.writeGoodTuples(output, num);
+        output.close();
     }
 
     @Override
-    protected final void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
         // Write good data
-        this.writeGoodTuples(writer, num / 2);
+        this.writeGoodTuples(output, num / 2);
 
-        // Write junk data
+        // Write junk data
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num / 2; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
 
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected final void generateBadTuples(Writer writer, int num) throws IOException {
+    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
index bacd7ba..b68d662 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/AbstractWholeFileTripleInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.TripleWritable;
 import org.apache.jena.riot.Lang;
@@ -37,18 +38,19 @@ import com.hp.hpl.jena.rdf.model.Resource;
  * 
  * 
  */
-public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNodeTupleInputFormatTests<Triple, TripleWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
     protected boolean canSplitInputs() {
         return false;
     }
     
-    @SuppressWarnings("deprecation")
-    private void writeTuples(Model m, Writer writer) {
-        RDFDataMgr.write(writer, m, this.getRdfLanguage());
-    }
-    
+    private void writeTuples(Model m, OutputStream output) {
+        RDFDataMgr.write(output, m, this.getRdfLanguage());
+    }
+        
     /**
      * Gets the RDF language to write out generate tuples in
      * @return RDF language
@@ -56,7 +58,7 @@ public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNo
     protected abstract Lang getRdfLanguage();
     
     @Override
-    protected final void generateTuples(Writer writer, int num) throws IOException {
+    protected final void generateTuples(OutputStream output, int num) throws IOException {
         Model m = ModelFactory.createDefaultModel();
         Resource currSubj = m.createResource("http://example.org/subjects/0");
         Property predicate = m.createProperty("http://example.org/predicate");
@@ -66,12 +68,12 @@ public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNo
             }
             m.add(currSubj, predicate, m.createTypedLiteral(i));
         }
-        this.writeTuples(m, writer);
-        writer.close();
-    }
+        this.writeTuples(m, output);
+        output.close();
+    }
     
     @Override
-    protected final void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
         // Write good data
         Model m = ModelFactory.createDefaultModel();
         Resource currSubj = m.createResource("http://example.org/subjects/0");
@@ -82,23 +84,25 @@ public abstract class AbstractWholeFileTripleInputFormatTests extends AbstractNo
             }
             m.add(currSubj, predicate, m.createTypedLiteral(i));
         }
-        this.writeTuples(m, writer);
+        this.writeTuples(m, output);
         
-        // Write junk data
+        // Write junk data
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num / 2; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
         
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected final void generateBadTuples(Writer writer, int num) throws IOException {
+    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
index 5725cf7..1f18a95 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedNodeTupleInputFormatTests.java
@@ -22,9 +22,6 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -40,8 +37,8 @@ import org.apache.jena.hadoop.rdf.types.AbstractNodeTupleWritable;
  * @param <T>
  */
 public abstract class AbstractCompressedNodeTupleInputFormatTests<TValue, T extends AbstractNodeTupleWritable<TValue>> extends
-        AbstractNodeTupleInputFormatTests<TValue, T> {
-
+        AbstractNodeTupleInputFormatTests<TValue, T> {
+    
     @Override
     protected Configuration prepareConfiguration() {
         Configuration config = super.prepareConfiguration();
@@ -50,14 +47,13 @@ public abstract class AbstractCompressedNodeTupleInputFormatTests<TValue, T exte
     }
 
     @Override
-    protected Writer getWriter(File f) throws IOException {
+    protected OutputStream getOutputStream(File f) throws IOException {
         CompressionCodec codec = this.getCompressionCodec();
         if (codec instanceof Configurable) {
             ((Configurable) codec).setConf(this.prepareConfiguration());
         }
         FileOutputStream fileOutput = new FileOutputStream(f, false);
-        OutputStream output = codec.createOutputStream(fileOutput);
-        return new OutputStreamWriter(output);
+        return codec.createOutputStream(fileOutput);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
index 386e772..312aae7 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedQuadsInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input.compressed;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.QuadWritable;
 
@@ -32,37 +33,39 @@ import com.hp.hpl.jena.sparql.core.Quad;
  * 
  */
 public abstract class AbstractCompressedQuadsInputFormatTests extends
-        AbstractCompressedNodeTupleInputFormatTests<Quad, QuadWritable> {
+        AbstractCompressedNodeTupleInputFormatTests<Quad, QuadWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
-    protected void generateTuples(Writer writer, int num) throws IOException {
+    protected void generateTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+            output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateBadTuples(Writer writer, int num) throws IOException {
+    protected void generateBadTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://broken\n");
+            output.write("<http://broken\n".getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected void generateMixedTuples(OutputStream output, int num) throws IOException {
         boolean bad = false;
         for (int i = 0; i < num; i++, bad = !bad) {
             if (bad) {
-                writer.write("<http://broken\n");
+                output.write("<http://broken\n".getBytes(utf8));
             } else {
-                writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n");
+                output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" <http://graphs/" + i + "> .\n").getBytes(utf8));
             }
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
index 5312b9e..f0f0caf 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedTriplesInputFormatTests.java
@@ -19,7 +19,8 @@
 package org.apache.jena.hadoop.rdf.io.input.compressed;
 
 import java.io.IOException;
-import java.io.Writer;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.jena.hadoop.rdf.types.TripleWritable;
 
@@ -32,37 +33,39 @@ import com.hp.hpl.jena.graph.Triple;
  * 
  */
 public abstract class AbstractCompressedTriplesInputFormatTests extends
-        AbstractCompressedNodeTupleInputFormatTests<Triple, TripleWritable> {
+        AbstractCompressedNodeTupleInputFormatTests<Triple, TripleWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
-    protected void generateTuples(Writer writer, int num) throws IOException {
+    protected void generateTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+            output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateBadTuples(Writer writer, int num) throws IOException {
+    protected void generateBadTuples(OutputStream output, int num) throws IOException {
         for (int i = 0; i < num; i++) {
-            writer.write("<http://broken\n");
+            output.write("<http://broken\n".getBytes(utf8));
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected void generateMixedTuples(OutputStream output, int num) throws IOException {
         boolean bad = false;
         for (int i = 0; i < num; i++, bad = !bad) {
             if (bad) {
-                writer.write("<http://broken\n");
+                output.write("<http://broken\n".getBytes(utf8));
             } else {
-                writer.write("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n");
+                output.write(("<http://subjects/" + i + "> <http://predicate> \"" + i + "\" .\n").getBytes(utf8));
             }
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/jena/blob/f08fff2a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
----------------------------------------------------------------------
diff --git a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
index 3dd0bd0..be2b1d7 100644
--- a/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
+++ b/jena-hadoop-rdf/hadoop-rdf-io/src/test/java/org/apache/jena/hadoop/rdf/io/input/compressed/AbstractCompressedWholeFileQuadInputFormatTests.java
@@ -22,8 +22,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
+import java.nio.charset.Charset;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -33,8 +32,8 @@ import org.apache.jena.hadoop.rdf.io.input.AbstractNodeTupleInputFormatTests;
 import org.apache.jena.hadoop.rdf.types.QuadWritable;
 import org.apache.jena.riot.Lang;
 import org.apache.jena.riot.RDFDataMgr;
-import org.apache.jena.riot.RDFWriterRegistry;
-
+import org.apache.jena.riot.RDFWriterRegistry;
+
 import com.hp.hpl.jena.query.Dataset;
 import com.hp.hpl.jena.query.DatasetFactory;
 import com.hp.hpl.jena.rdf.model.Model;
@@ -49,7 +48,9 @@ import com.hp.hpl.jena.sparql.core.Quad;
  * 
  */
 public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
-        AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+        AbstractNodeTupleInputFormatTests<Quad, QuadWritable> {
+    
+    private static final Charset utf8 = Charset.forName("utf-8");
 
     @Override
     protected Configuration prepareConfiguration() {
@@ -59,14 +60,13 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
     }
 
     @Override
-    protected Writer getWriter(File f) throws IOException {
+    protected OutputStream getOutputStream(File f) throws IOException {
         CompressionCodec codec = this.getCompressionCodec();
         if (codec instanceof Configurable) {
             ((Configurable) codec).setConf(this.prepareConfiguration());
         }
         FileOutputStream fileOutput = new FileOutputStream(f, false);
-        OutputStream output = codec.createOutputStream(fileOutput);
-        return new OutputStreamWriter(output);
+        return codec.createOutputStream(fileOutput);
     }
 
     /**
@@ -85,9 +85,8 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
         return false;
     }
 
-    @SuppressWarnings("deprecation")
-    private void writeTuples(Dataset ds, Writer writer) {
-        RDFDataMgr.write(writer, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
+    private void writeTuples(Dataset ds, OutputStream output) {
+        RDFDataMgr.write(output, ds, RDFWriterRegistry.defaultSerialization(this.getRdfLanguage()));
     }
 
     /**
@@ -97,7 +96,7 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
      */
     protected abstract Lang getRdfLanguage();
 
-    private void writeGoodTuples(Writer writer, int num) throws IOException {
+    private void writeGoodTuples(OutputStream output, int num) throws IOException {
         Dataset ds = DatasetFactory.createMem();
         Model m = ModelFactory.createDefaultModel();
         Resource currSubj = m.createResource("http://example.org/subjects/0");
@@ -115,35 +114,37 @@ public abstract class AbstractCompressedWholeFileQuadInputFormatTests extends
         if (!m.isEmpty()) {
             ds.addNamedModel("http://example.org/graphs/extra", m);
         }
-        this.writeTuples(ds, writer);
+        this.writeTuples(ds, output);
     }
 
     @Override
-    protected final void generateTuples(Writer writer, int num) throws IOException {
-        this.writeGoodTuples(writer, num);
-        writer.close();
+    protected final void generateTuples(OutputStream output, int num) throws IOException {
+        this.writeGoodTuples(output, num);
+        output.close();
     }
 
     @Override
-    protected final void generateMixedTuples(Writer writer, int num) throws IOException {
+    protected final void generateMixedTuples(OutputStream output, int num) throws IOException {
         // Write good data
-        this.writeGoodTuples(writer, num / 2);
+        this.writeGoodTuples(output, num / 2);
 
-        // Write junk data
+        // Write junk data
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num / 2; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
 
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 
     @Override
-    protected final void generateBadTuples(Writer writer, int num) throws IOException {
+    protected final void generateBadTuples(OutputStream output, int num) throws IOException {
+        byte[] junk = "junk data\n".getBytes(utf8);
         for (int i = 0; i < num; i++) {
-            writer.write("junk data\n");
+            output.write(junk);
         }
-        writer.flush();
-        writer.close();
+        output.flush();
+        output.close();
     }
 }


Mime
View raw message