kudu-commits mailing list archives

From jdcry...@apache.org
Subject [02/36] incubator-kudu git commit: [java-client] repackage to org.apache.kudu (Part 1)
Date Mon, 25 Jul 2016 17:15:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/HadoopTestingUtility.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/HadoopTestingUtility.java b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/HadoopTestingUtility.java
deleted file mode 100644
index 1e2cb41..0000000
--- a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/HadoopTestingUtility.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.kududb.mapreduce;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * This class is analogous to HBaseTestingUtility, except that we only need it for the MR tests.
- */
-public class HadoopTestingUtility {
-
-  private static final Log LOG = LogFactory.getLog(HadoopTestingUtility.class);
-
-  private File testDir;
-
-  private Configuration conf = new Configuration();
-
-  /**
-   * System property key to get base test directory value
-   */
-  public static final String BASE_TEST_DIRECTORY_KEY =
-      "test.build.data.basedirectory";
-
-  /**
-   * Default base directory for test output.
-   */
-  private static final String DEFAULT_BASE_TEST_DIRECTORY = "target/mr-data";
-
-  public Configuration getConfiguration() {
-    return this.conf;
-  }
-
-  /**
-   * Sets up a temporary directory for the test to run in. Call cleanup() at the end of your
-   * tests to remove it.
-   * @param testName used to build part of the directory name for the test
-   * @return the directory where the test is homed
-   */
-  public File setupAndGetTestDir(String testName, Configuration conf) {
-    if (this.testDir != null) {
-      return this.testDir;
-    }
-    Path testPath = new Path(getBaseTestDir(), testName + System.currentTimeMillis());
-    this.testDir = new File(testPath.toString()).getAbsoluteFile();
-    this.testDir.mkdirs();
-    // Set these properties so that when mapreduce jobs run, they use this as their home dir.
-    System.setProperty("test.build.dir", this.testDir.toString());
-    System.setProperty("hadoop.home.dir", this.testDir.toString());
-    conf.set("hadoop.tmp.dir", this.testDir.toString() + "/mapred");
-
-    LOG.info("Test configured to write to " + this.testDir);
-    return this.testDir;
-  }
-
-  private Path getBaseTestDir() {
-    String pathName = System.getProperty(BASE_TEST_DIRECTORY_KEY, DEFAULT_BASE_TEST_DIRECTORY);
-    return new Path(pathName);
-  }
-
-  public void cleanup() throws IOException {
-    FileSystem.closeAll();
-    if (this.testDir != null) {
-      delete(this.testDir);
-    }
-  }
-
-  private void delete(File dir) throws IOException {
-    if (dir == null || !dir.exists()) {
-      return;
-    }
-    try {
-      FileUtils.deleteDirectory(dir);
-    } catch (IOException ex) {
-      LOG.warn("Failed to delete " + dir.getAbsolutePath());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITInputFormatJob.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITInputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITInputFormatJob.java
deleted file mode 100644
index 3d04043..0000000
--- a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITInputFormatJob.java
+++ /dev/null
@@ -1,129 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.kududb.client.BaseKuduTest;
-import org.kududb.client.KuduPredicate;
-import org.kududb.client.RowResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-public class ITInputFormatJob extends BaseKuduTest {
-  private static final Logger LOG = LoggerFactory.getLogger(ITInputFormatJob.class);
-
-  private static final String TABLE_NAME =
-      ITInputFormatJob.class.getName() + "-" + System.currentTimeMillis();
-
-  private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
-
-  /** Counter enumeration to count the actual rows. */
-  private static enum Counters { ROWS }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    BaseKuduTest.setUpBeforeClass();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    try {
-      BaseKuduTest.tearDownAfterClass();
-    } finally {
-      HADOOP_UTIL.cleanup();
-    }
-  }
-
-  @Test
-  @SuppressWarnings("deprecation")
-  public void test() throws Exception {
-
-    createFourTabletsTableWithNineRows(TABLE_NAME);
-
-    Configuration conf = new Configuration();
-    HADOOP_UTIL.setupAndGetTestDir(ITInputFormatJob.class.getName(), conf).getAbsolutePath();
-
-    createAndTestJob(conf, new ArrayList<KuduPredicate>(), 9);
-
-    KuduPredicate pred1 = KuduPredicate.newComparisonPredicate(
-        basicSchema.getColumnByIndex(0), KuduPredicate.ComparisonOp.GREATER_EQUAL, 20);
-    createAndTestJob(conf, Lists.newArrayList(pred1), 6);
-
-    KuduPredicate pred2 = KuduPredicate.newComparisonPredicate(
-        basicSchema.getColumnByIndex(2), KuduPredicate.ComparisonOp.LESS_EQUAL, 1);
-    createAndTestJob(conf, Lists.newArrayList(pred1, pred2), 2);
-  }
-
-  private void createAndTestJob(Configuration conf,
-                                List<KuduPredicate> predicates, int expectedCount)
-      throws Exception {
-    String jobName = ITInputFormatJob.class.getName();
-    Job job = new Job(conf, jobName);
-
-    Class<TestMapperTableInput> mapperClass = TestMapperTableInput.class;
-    job.setJarByClass(mapperClass);
-    job.setMapperClass(mapperClass);
-    job.setNumReduceTasks(0);
-    job.setOutputFormatClass(NullOutputFormat.class);
-    KuduTableMapReduceUtil.TableInputFormatConfigurator configurator =
-        new KuduTableMapReduceUtil.TableInputFormatConfigurator(
-            job,
-            TABLE_NAME,
-            "*",
-            getMasterAddresses())
-            .operationTimeoutMs(DEFAULT_SLEEP)
-            .addDependencies(false)
-            .cacheBlocks(false);
-    for (KuduPredicate predicate : predicates) {
-      configurator.addPredicate(predicate);
-    }
-    configurator.configure();
-
-    assertTrue("Test job did not end properly", job.waitForCompletion(true));
-
-    assertEquals(expectedCount, job.getCounters().findCounter(Counters.ROWS).getValue());
-  }
-
-  /**
-   * Simple row counter and printer
-   */
-  static class TestMapperTableInput extends
-      Mapper<NullWritable, RowResult, NullWritable, NullWritable> {
-
-    @Override
-    protected void map(NullWritable key, RowResult value, Context context) throws IOException,
-        InterruptedException {
-      context.getCounter(Counters.ROWS).increment(1);
-      LOG.info(value.toStringLongFormat()); // useful for visual debugging
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITKuduTableInputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITKuduTableInputFormat.java b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITKuduTableInputFormat.java
deleted file mode 100644
index ff4d81a..0000000
--- a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITKuduTableInputFormat.java
+++ /dev/null
@@ -1,132 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.kududb.Schema;
-import org.kududb.client.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-public class ITKuduTableInputFormat extends BaseKuduTest {
-
-  private static final String TABLE_NAME =
-      ITKuduTableInputFormat.class.getName() + "-" + System.currentTimeMillis();
-
-  @Test
-  public void test() throws Exception {
-    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
-
-    KuduTable table = openTable(TABLE_NAME);
-    Schema schema = getBasicSchema();
-    Insert insert = table.newInsert();
-    PartialRow row = insert.getRow();
-    row.addInt(0, 1);
-    row.addInt(1, 2);
-    row.addInt(2, 3);
-    row.addString(3, "a string");
-    row.addBoolean(4, true);
-    AsyncKuduSession session = client.newSession();
-    session.apply(insert).join(DEFAULT_SLEEP);
-    session.close().join(DEFAULT_SLEEP);
-
-    // Test getting all the columns back
-    RecordReader<NullWritable, RowResult> reader = createRecordReader("*", null);
-    assertTrue(reader.nextKeyValue());
-    assertEquals(5, reader.getCurrentValue().getColumnProjection().getColumnCount());
-    assertFalse(reader.nextKeyValue());
-
-    // Test getting two columns back
-    reader = createRecordReader(schema.getColumnByIndex(3).getName() + "," +
-        schema.getColumnByIndex(2).getName(), null);
-    assertTrue(reader.nextKeyValue());
-    assertEquals(2, reader.getCurrentValue().getColumnProjection().getColumnCount());
-    assertEquals("a string", reader.getCurrentValue().getString(0));
-    assertEquals(3, reader.getCurrentValue().getInt(1));
-    try {
-      reader.getCurrentValue().getString(2);
-      fail("Should only be getting 2 columns back");
-    } catch (IndexOutOfBoundsException e) {
-      // expected
-    }
-
-    // Test getting one column back
-    reader = createRecordReader(schema.getColumnByIndex(1).getName(), null);
-    assertTrue(reader.nextKeyValue());
-    assertEquals(1, reader.getCurrentValue().getColumnProjection().getColumnCount());
-    assertEquals(2, reader.getCurrentValue().getInt(0));
-    try {
-      reader.getCurrentValue().getString(1);
-      fail("Should only be getting 1 column back");
-    } catch (IndexOutOfBoundsException e) {
-      // expected
-    }
-
-    // Test getting empty rows back
-    reader = createRecordReader("", null);
-    assertTrue(reader.nextKeyValue());
-    assertEquals(0, reader.getCurrentValue().getColumnProjection().getColumnCount());
-    assertFalse(reader.nextKeyValue());
-
-    // Test projecting an unknown column, which should fail
-    try {
-      createRecordReader("unknown", null);
-      fail("Should not be able to scan a column that doesn't exist");
-    } catch (IllegalArgumentException e) {
-      // expected
-    }
-
-    // Test using a predicate that filters the row out.
-    KuduPredicate pred1 = KuduPredicate.newComparisonPredicate(
-        schema.getColumnByIndex(1), KuduPredicate.ComparisonOp.GREATER_EQUAL, 3);
-    reader = createRecordReader("*", Lists.newArrayList(pred1));
-    assertFalse(reader.nextKeyValue());
-  }
-
-  private RecordReader<NullWritable, RowResult> createRecordReader(String columnProjection,
-        List<KuduPredicate> predicates) throws IOException, InterruptedException {
-    KuduTableInputFormat input = new KuduTableInputFormat();
-    Configuration conf = new Configuration();
-    conf.set(KuduTableInputFormat.MASTER_ADDRESSES_KEY, getMasterAddresses());
-    conf.set(KuduTableInputFormat.INPUT_TABLE_KEY, TABLE_NAME);
-    if (columnProjection != null) {
-      conf.set(KuduTableInputFormat.COLUMN_PROJECTION_KEY, columnProjection);
-    }
-    if (predicates != null) {
-      String encodedPredicates = KuduTableMapReduceUtil.base64EncodePredicates(predicates);
-      conf.set(KuduTableInputFormat.ENCODED_PREDICATES_KEY, encodedPredicates);
-    }
-    input.setConf(conf);
-    List<InputSplit> splits = input.getSplits(null);
-
-    // We need to re-create the input format to reconnect the client.
-    input = new KuduTableInputFormat();
-    input.setConf(conf);
-    RecordReader<NullWritable, RowResult> reader = input.createRecordReader(null, null);
-    reader.initialize(Iterables.getOnlyElement(splits), null);
-    return reader;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITKuduTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITKuduTableOutputFormat.java b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITKuduTableOutputFormat.java
deleted file mode 100644
index 86452ed..0000000
--- a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITKuduTableOutputFormat.java
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import org.kududb.client.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class ITKuduTableOutputFormat extends BaseKuduTest {
-
-  private static final String TABLE_NAME =
-      ITKuduTableOutputFormat.class.getName() + "-" + System.currentTimeMillis();
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    BaseKuduTest.setUpBeforeClass();
-  }
-
-  @Test
-  public void test() throws Exception {
-    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
-
-    KuduTableOutputFormat output = new KuduTableOutputFormat();
-    Configuration conf = new Configuration();
-    conf.set(KuduTableOutputFormat.MASTER_ADDRESSES_KEY, getMasterAddresses());
-    conf.set(KuduTableOutputFormat.OUTPUT_TABLE_KEY, TABLE_NAME);
-    output.setConf(conf);
-
-    String multitonKey = conf.get(KuduTableOutputFormat.MULTITON_KEY);
-    KuduTable table = KuduTableOutputFormat.getKuduTable(multitonKey);
-    assertNotNull(table);
-
-    Insert insert = table.newInsert();
-    PartialRow row = insert.getRow();
-    row.addInt(0, 1);
-    row.addInt(1, 2);
-    row.addInt(2, 3);
-    row.addString(3, "a string");
-    row.addBoolean(4, true);
-
-    RecordWriter<NullWritable, Operation> rw = output.getRecordWriter(null);
-    rw.write(NullWritable.get(), insert);
-    rw.close(null);
-    AsyncKuduScanner.AsyncKuduScannerBuilder builder = client.newScannerBuilder(table);
-    assertEquals(1, countRowsInScan(builder.build()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITOutputFormatJob.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITOutputFormatJob.java b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITOutputFormatJob.java
deleted file mode 100644
index dff2400..0000000
--- a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/ITOutputFormatJob.java
+++ /dev/null
@@ -1,131 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import org.kududb.client.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import static org.junit.Assert.*;
-
-public class ITOutputFormatJob extends BaseKuduTest {
-
-  private static final String TABLE_NAME =
-      ITOutputFormatJob.class.getName() + "-" + System.currentTimeMillis();
-
-  private static final HadoopTestingUtility HADOOP_UTIL = new HadoopTestingUtility();
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    BaseKuduTest.setUpBeforeClass();
-    createTable(TABLE_NAME, getBasicSchema(), getBasicCreateTableOptions());
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    try {
-      BaseKuduTest.tearDownAfterClass();
-    } finally {
-      HADOOP_UTIL.cleanup();
-    }
-  }
-
-  @Test
-  @SuppressWarnings("deprecation")
-  public void test() throws Exception {
-    Configuration conf = new Configuration();
-    String testHome =
-        HADOOP_UTIL.setupAndGetTestDir(ITOutputFormatJob.class.getName(), conf).getAbsolutePath();
-    String jobName = ITOutputFormatJob.class.getName();
-    Job job = new Job(conf, jobName);
-
-
-    // Create a 2 lines input file
-    File data = new File(testHome, "data.txt");
-    writeDataFile(data);
-    FileInputFormat.setInputPaths(job, data.toString());
-
-    // Configure the job to map the file and write to kudu, without reducers
-    Class<TestMapperTableOutput> mapperClass = TestMapperTableOutput.class;
-    job.setJarByClass(mapperClass);
-    job.setMapperClass(mapperClass);
-    job.setInputFormatClass(TextInputFormat.class);
-    job.setNumReduceTasks(0);
-    new KuduTableMapReduceUtil.TableOutputFormatConfigurator(
-        job,
-        TABLE_NAME,
-        getMasterAddresses())
-        .operationTimeoutMs(DEFAULT_SLEEP)
-        .addDependencies(false)
-        .configure();
-
-    assertTrue("Test job did not end properly", job.waitForCompletion(true));
-
-    // Make sure the data's there
-    KuduTable table = openTable(TABLE_NAME);
-    AsyncKuduScanner.AsyncKuduScannerBuilder builder =
-        client.newScannerBuilder(table);
-    assertEquals(2, countRowsInScan(builder.build()));
-  }
-
-  /**
-   * Simple Mapper that writes one row per input line; the key is the line number and the STRING
-   * column holds that line's contents
-   */
-  static class TestMapperTableOutput extends
-      Mapper<LongWritable, Text, NullWritable, Operation> {
-
-    private KuduTable table;
-    @Override
-    protected void map(LongWritable key, Text value, Context context) throws IOException,
-        InterruptedException {
-      Insert insert = table.newInsert();
-      PartialRow row = insert.getRow();
-      row.addInt(0, (int) key.get());
-      row.addInt(1, 1);
-      row.addInt(2, 2);
-      row.addString(3, value.toString());
-      row.addBoolean(4, true);
-      context.write(NullWritable.get(), insert);
-    }
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-      super.setup(context);
-      table = KuduTableMapReduceUtil.getTableFromContext(context);
-    }
-  }
-
-  private void writeDataFile(File data) throws IOException {
-    FileOutputStream fos = new FileOutputStream(data);
-    fos.write("VALUE1\nVALUE2\n".getBytes());
-    fos.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestJarFinder.java
----------------------------------------------------------------------
diff --git a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestJarFinder.java b/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestJarFinder.java
deleted file mode 100644
index 3801a0c..0000000
--- a/java/kudu-mapreduce/src/test/java/org/kududb/mapreduce/TestJarFinder.java
+++ /dev/null
@@ -1,128 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package org.kududb.mapreduce;
-
-import org.apache.commons.logging.LogFactory;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Writer;
-import java.text.MessageFormat;
-import java.util.Properties;
-import java.util.jar.JarInputStream;
-import java.util.jar.JarOutputStream;
-import java.util.jar.Manifest;
-
-/**
- * This file was forked from hbase/branches/master@4ce6f48.
- */
-public class TestJarFinder {
-
-  @Test
-  public void testJar() throws Exception {
-
-    // Picking a class that is for sure in a JAR in the classpath
-    String jar = JarFinder.getJar(LogFactory.class);
-    Assert.assertTrue(new File(jar).exists());
-  }
-
-  private static void delete(File file) throws IOException {
-    if (file.getAbsolutePath().length() < 5) {
-      throw new IllegalArgumentException(
-        MessageFormat.format("Path [{0}] is too short, not deleting",
-          file.getAbsolutePath()));
-    }
-    if (file.exists()) {
-      if (file.isDirectory()) {
-        File[] children = file.listFiles();
-        if (children != null) {
-          for (File child : children) {
-            delete(child);
-          }
-        }
-      }
-      if (!file.delete()) {
-        throw new RuntimeException(
-          MessageFormat.format("Could not delete path [{0}]",
-            file.getAbsolutePath()));
-      }
-    }
-  }
-
-  @Test
-  public void testExpandedClasspath() throws Exception {
-    // Picking a class that is for sure in a directory in the classpath
-    // In this case, the JAR is created on the fly
-    String jar = JarFinder.getJar(TestJarFinder.class);
-    Assert.assertTrue(new File(jar).exists());
-  }
-
-  @Test
-  public void testExistingManifest() throws Exception {
-    File dir = new File(System.getProperty("test.build.dir", "target/test-dir"),
-      TestJarFinder.class.getName() + "-testExistingManifest");
-    delete(dir);
-    dir.mkdirs();
-
-    File metaInfDir = new File(dir, "META-INF");
-    metaInfDir.mkdirs();
-    File manifestFile = new File(metaInfDir, "MANIFEST.MF");
-    Manifest manifest = new Manifest();
-    OutputStream os = new FileOutputStream(manifestFile);
-    manifest.write(os);
-    os.close();
-
-    File propsFile = new File(dir, "props.properties");
-    Writer writer = new FileWriter(propsFile);
-    new Properties().store(writer, "");
-    writer.close();
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    JarOutputStream zos = new JarOutputStream(baos);
-    JarFinder.jarDir(dir, "", zos);
-    JarInputStream jis =
-      new JarInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    Assert.assertNotNull(jis.getManifest());
-    jis.close();
-  }
-
-  @Test
-  public void testNoManifest() throws Exception {
-    File dir = new File(System.getProperty("test.build.dir", "target/test-dir"),
-      TestJarFinder.class.getName() + "-testNoManifest");
-    delete(dir);
-    dir.mkdirs();
-    File propsFile = new File(dir, "props.properties");
-    Writer writer = new FileWriter(propsFile);
-    new Properties().store(writer, "");
-    writer.close();
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    JarOutputStream zos = new JarOutputStream(baos);
-    JarFinder.jarDir(dir, "", zos);
-    JarInputStream jis =
-      new JarInputStream(new ByteArrayInputStream(baos.toByteArray()));
-    Assert.assertNotNull(jis.getManifest());
-    jis.close();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
new file mode 100644
index 0000000..95145d5
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kududb.spark.kudu
+
+import java.sql.Timestamp
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
+import org.kududb.Type
+import org.kududb.annotations.InterfaceStability
+import org.kududb.client._
+import org.kududb.client.KuduPredicate.ComparisonOp
+import org.kududb.client.SessionConfiguration.FlushMode
+import org.apache.spark.sql.SaveMode._
+
+import scala.collection.JavaConverters._
+
+/**
+  * DefaultSource for integration with Spark's DataFrame data sources.
+  * This class produces a RelationProvider based on the input given to it from Spark.
+  */
+@InterfaceStability.Unstable
+class DefaultSource extends RelationProvider with CreatableRelationProvider {
+
+  val TABLE_KEY = "kudu.table"
+  val KUDU_MASTER = "kudu.master"
+
+  /**
+    * Construct a BaseRelation using the provided context and parameters.
+    *
+    * @param sqlContext SparkSQL context
+    * @param parameters parameters given to us from SparkSQL
+    * @return           a BaseRelation Object
+    */
+  override def createRelation(sqlContext: SQLContext,
+                              parameters: Map[String, String]):
+  BaseRelation = {
+    val tableName = parameters.getOrElse(TABLE_KEY,
+      throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
+        s"using key '$TABLE_KEY'"))
+    val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
+
+    new KuduRelation(tableName, kuduMaster)(sqlContext)
+  }
+
+  /**
+    * Creates a relation and inserts data into the specified table.
+    *
+    * @param sqlContext SparkSQL context
+    * @param mode Append will not overwrite existing data; Overwrite will perform updates but will
+    *             not insert data. Use upsert on KuduContext if you require both.
+    * @param parameters necessary parameters for kudu.table and kudu.master
+    * @param data Dataframe to save into kudu
+    * @return returns populated base relation
+    */
+  override def createRelation(sqlContext: SQLContext, mode: SaveMode,
+                              parameters: Map[String, String], data: DataFrame): BaseRelation = {
+    val tableName = parameters.getOrElse(TABLE_KEY,
+      throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
+        s"using key '$TABLE_KEY'"))
+
+    val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
+
+    val kuduRelation = new KuduRelation(tableName, kuduMaster)(sqlContext)
+    mode match {
+      case Append | Ignore => kuduRelation.insert(data, overwrite = false)
+      case Overwrite => kuduRelation.insert(data, overwrite = true)
+      case ErrorIfExists =>
+          throw new UnsupportedOperationException(
+            "ErrorIfExists is currently not supported")
+    }
+
+    kuduRelation
+  }
+}
+
+/**
+  * Implementation of Spark BaseRelation.
+  *
+  * @param tableName Kudu table that we plan to read from
+  * @param kuduMaster Kudu master addresses
+  * @param sqlContext SparkSQL context
+  */
+@InterfaceStability.Unstable
+class KuduRelation(private val tableName: String,
+                   private val kuduMaster: String)(
+                   val sqlContext: SQLContext)
+extends BaseRelation
+with PrunedFilteredScan
+with InsertableRelation {
+
+  import KuduRelation._
+
+  private val context: KuduContext = new KuduContext(kuduMaster)
+  private val table: KuduTable = context.syncClient.openTable(tableName)
+
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
+    filters.filterNot(supportsFilter)
+
+  /**
+    * Generates a SparkSQL schema object so SparkSQL knows what is being
+    * provided by this BaseRelation.
+    *
+    * @return schema generated from the Kudu table's schema
+    */
+  override def schema: StructType = {
+    val fields: Array[StructField] =
+      table.getSchema.getColumns.asScala.map { columnSchema =>
+        val sparkType = kuduTypeToSparkType(columnSchema.getType)
+        new StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
+      }.toArray
+
+    new StructType(fields)
+  }
+
+  /**
+    * Build the RDD to scan rows.
+    *
+    * @param requiredColumns columns that are being requested by the requesting query
+    * @param filters         filters that are being applied by the requesting query
+    * @return RDD with all the results from Kudu
+    */
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    val predicates = filters.flatMap(filterToPredicate)
+    new KuduRDD(kuduMaster, 1024 * 1024 * 20, requiredColumns, predicates,
+      table, context, sqlContext.sparkContext)
+  }
+
+  /**
+    * Converts a Spark [[Filter]] to a Kudu [[KuduPredicate]].
+    *
+    * @param filter the filter to convert
+    * @return the converted filter
+    */
+  private def filterToPredicate(filter : Filter) : Array[KuduPredicate] = {
+    filter match {
+      case EqualTo(column, value) =>
+        Array(comparisonPredicate(column, ComparisonOp.EQUAL, value))
+      case GreaterThan(column, value) =>
+        Array(comparisonPredicate(column, ComparisonOp.GREATER, value))
+      case GreaterThanOrEqual(column, value) =>
+        Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, value))
+      case LessThan(column, value) =>
+        Array(comparisonPredicate(column, ComparisonOp.LESS, value))
+      case LessThanOrEqual(column, value) =>
+        Array(comparisonPredicate(column, ComparisonOp.LESS_EQUAL, value))
+      case And(left, right) => filterToPredicate(left) ++ filterToPredicate(right)
+      case _ => Array()
+    }
+  }
+
+  /**
+    * Creates a new comparison predicate for the column, comparison operator, and comparison value.
+    *
+    * @param column the column name
+    * @param operator the comparison operator
+    * @param value the comparison value
+    * @return the comparison predicate
+    */
+  private def comparisonPredicate(column: String,
+                                  operator: ComparisonOp,
+                                  value: Any): KuduPredicate = {
+    val columnSchema = table.getSchema.getColumn(column)
+    value match {
+      case value: Boolean => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Byte => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Short => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Int => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Long => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Timestamp => KuduPredicate.newComparisonPredicate(columnSchema, operator, timestampToMicros(value))
+      case value: Float => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Double => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: String => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+      case value: Array[Byte] => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
+    }
+  }
+
+  /**
+    * Inserts data into an existing Kudu table.
+    * @param data [[DataFrame]] to be inserted into Kudu
+    * @param overwrite if true, existing records are updated, but no inserts are performed
+    */
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    context.writeRows(data, tableName, overwrite)
+  }
+}
+
+private[spark] object KuduRelation {
+  /**
+    * Converts a Kudu [[Type]] to a Spark SQL [[DataType]].
+    *
+    * @param t the Kudu type
+    * @return the corresponding Spark SQL type
+    */
+  private def kuduTypeToSparkType(t: Type): DataType = t match {
+    case Type.BOOL => BooleanType
+    case Type.INT8 => ByteType
+    case Type.INT16 => ShortType
+    case Type.INT32 => IntegerType
+    case Type.INT64 => LongType
+    case Type.TIMESTAMP => TimestampType
+    case Type.FLOAT => FloatType
+    case Type.DOUBLE => DoubleType
+    case Type.STRING => StringType
+    case Type.BINARY => BinaryType
+  }
+
+  /**
+    * Returns `true` if the filter is able to be pushed down to Kudu.
+    *
+    * @param filter the filter to test
+    */
+  private def supportsFilter(filter: Filter): Boolean = filter match {
+    case EqualTo(_, _)
+       | GreaterThan(_, _)
+       | GreaterThanOrEqual(_, _)
+       | LessThan(_, _)
+       | LessThanOrEqual(_, _) => true
+    case And(left, right) => supportsFilter(left) && supportsFilter(right)
+    case _ => false
+  }
+
+  /**
+    * Converts a [[Timestamp]] to microseconds since the Unix epoch (1970-01-01T00:00:00Z).
+    *
+    * @param timestamp the timestamp to convert to microseconds
+    * @return the microseconds since the Unix epoch
+    */
+  def timestampToMicros(timestamp: Timestamp): Long = {
+    // Number of whole milliseconds since the Unix epoch, in microseconds.
+    val millis = timestamp.getTime * 1000
+    // Sub millisecond time since the Unix epoch, in microseconds.
+    val micros = (timestamp.getNanos % 1000000) / 1000
+    if (micros >= 0) {
+      millis + micros
+    } else {
+      millis + 1000000 + micros
+    }
+  }
+
+  /**
+    * Converts a microsecond offset from the Unix epoch (1970-01-01T00:00:00Z) to a [[Timestamp]].
+    *
+    * @param micros the offset in microseconds since the Unix epoch
+    * @return the corresponding timestamp
+    */
+  def microsToTimestamp(micros: Long): Timestamp = {
+    var millis = micros / 1000
+    var nanos = (micros % 1000000) * 1000
+    if (nanos < 0) {
+      millis -= 1
+      nanos += 1000000000
+    }
+
+    val timestamp = new Timestamp(millis)
+    timestamp.setNanos(nanos.asInstanceOf[Int])
+    timestamp
+  }
+}
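
For context, a minimal read/write sketch against this data source, assuming an existing SQLContext named sqlContext, a reachable master at "master.example.com:7051", and a pre-created Kudu table "metrics" (the master address and table name are hypothetical):

import org.apache.spark.sql.SaveMode

val df = sqlContext.read
  .format("org.kududb.spark.kudu")
  .option("kudu.master", "master.example.com:7051")  // KUDU_MASTER; defaults to "localhost"
  .option("kudu.table", "metrics")                   // TABLE_KEY; required
  .load()

// Append issues inserts; Overwrite issues updates of existing rows (no inserts);
// ErrorIfExists is rejected by this DefaultSource.
df.write
  .format("org.kududb.spark.kudu")
  .option("kudu.master", "master.example.com:7051")
  .option("kudu.table", "metrics")
  .mode(SaveMode.Append)
  .save()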

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
new file mode 100644
index 0000000..167ee13
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.kududb.spark.kudu
+
+import java.util
+import org.apache.hadoop.util.ShutdownHookManager
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.types.{StructType, DataType, DataTypes}
+import org.kududb.{ColumnSchema, Schema, Type}
+import org.kududb.annotations.InterfaceStability
+import org.kududb.client.SessionConfiguration.FlushMode
+import org.kududb.client._
+
+
+import scala.collection.mutable
+
+/**
+  * KuduContext is a serializable container for Kudu client connections.
+  *
+  * If a Kudu client connection is needed as part of a Spark application, a
+  * [[KuduContext]] should be used as a broadcast variable in the job in order to
+  * share connections among the tasks in a JVM.
+  */
+@InterfaceStability.Unstable
+class KuduContext(kuduMaster: String) extends Serializable {
+
+  @transient lazy val syncClient = {
+    KuduConnection.getSyncClient(kuduMaster)
+  }
+
+  @transient lazy val asyncClient = {
+    KuduConnection.getAsyncClient(kuduMaster)
+  }
+
+  /**
+    * Create an RDD from a Kudu table.
+    *
+    * @param tableName          table to read from
+    * @param columnProjection   list of columns to read. Not specifying this at all
+    *                           (i.e. setting to null) or setting to the special
+    *                           string '*' means to project all columns.
+    * @return a new RDD that maps over the given table for the selected columns
+    */
+  def kuduRDD(sc: SparkContext,
+              tableName: String,
+              columnProjection: Seq[String] = Nil): RDD[Row] = {
+    new KuduRDD(kuduMaster, 1024*1024*20, columnProjection.toArray, Array(),
+                syncClient.openTable(tableName), this, sc)
+  }
+
+  /**
+    * Check if a Kudu table already exists.
+    * @param tableName name of the table to check
+    * @return true if the table exists, false otherwise
+    */
+  def tableExists(tableName: String): Boolean = syncClient.tableExists(tableName)
+
+  /**
+    * Delete a Kudu table.
+    * @param tableName name of the table to delete
+    * @return DeleteTableResponse
+    */
+  def deleteTable(tableName: String): DeleteTableResponse = syncClient.deleteTable(tableName)
+
+  /**
+    * Creates a kudu table for the given schema. Partitioning can be specified through options.
+    * @param tableName table to create
+    * @param schema struct schema of table
+    * @param keys primary keys of the table
+    * @param options replication and partitioning options for the table
+    */
+  def createTable(tableName: String,
+                  schema: StructType,
+                  keys: Seq[String],
+                  options: CreateTableOptions): KuduTable = {
+    val kuduCols = new util.ArrayList[ColumnSchema]()
+    // add the key columns first, in the order specified
+    for (key <- keys) {
+      val f = schema.fields(schema.fieldIndex(key))
+      kuduCols.add(new ColumnSchema.ColumnSchemaBuilder(f.name, kuduType(f.dataType)).key(true).build())
+    }
+    // now add the non-key columns
+    for (f <- schema.fields.filter(field=> !keys.contains(field.name))) {
+      kuduCols.add(new ColumnSchema.ColumnSchemaBuilder(f.name, kuduType(f.dataType)).nullable(f.nullable).key(false).build())
+    }
+
+    syncClient.createTable(tableName, new Schema(kuduCols), options)
+  }
+
+  /** Map Spark SQL type to Kudu type */
+  def kuduType(dt: DataType) : Type = dt match {
+    case DataTypes.BinaryType => Type.BINARY
+    case DataTypes.BooleanType => Type.BOOL
+    case DataTypes.StringType => Type.STRING
+    case DataTypes.TimestampType => Type.TIMESTAMP
+    case DataTypes.ByteType => Type.INT8
+    case DataTypes.ShortType => Type.INT16
+    case DataTypes.IntegerType => Type.INT32
+    case DataTypes.LongType => Type.INT64
+    case DataTypes.FloatType => Type.FLOAT
+    case DataTypes.DoubleType => Type.DOUBLE
+    case _ => throw new IllegalArgumentException(s"No support for Spark SQL type $dt")
+  }
+
+  /**
+    * Inserts or updates rows in kudu from a [[DataFrame]].
+    * @param data `DataFrame` to insert/update
+    * @param tableName table to perform insertion on
+    * @param overwrite true=update, false=insert
+    */
+  def writeRows(data: DataFrame, tableName: String, overwrite: Boolean) {
+    val schema = data.schema
+    data.foreachPartition(iterator => {
+      val pendingErrors = writeRows(iterator, schema, tableName, overwrite)
+      val errorCount = pendingErrors.getRowErrors.length
+      if (errorCount > 0) {
+        val errors = pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
+        throw new RuntimeException(
+          s"failed to write $errorCount rows from DataFrame to Kudu; sample errors: $errors")
+      }
+    })
+  }
+
+  /**
+    * Saves partitions of a [[DataFrame]] into Kudu.
+    * @param rows rows to insert or update
+    * @param tableName table to insert or update on
+    */
+  def writeRows(rows: Iterator[Row],
+                schema: StructType,
+                tableName: String,
+                performAsUpdate : Boolean = false): RowErrorsAndOverflowStatus = {
+    val table: KuduTable = syncClient.openTable(tableName)
+    val kuduSchema = table.getSchema
+    val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
+      sparkIdx -> table.getSchema.getColumnIndex(field.name)
+    })
+    val session: KuduSession = syncClient.newSession
+    session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
+    session.setIgnoreAllDuplicateRows(true)
+    try {
+      for (row <- rows) {
+        val operation = if (performAsUpdate) { table.newUpdate() } else { table.newInsert() }
+        for ((sparkIdx, kuduIdx) <- indices) {
+          if (row.isNullAt(sparkIdx)) {
+            operation.getRow.setNull(kuduIdx)
+          } else schema.fields(sparkIdx).dataType match {
+            case DataTypes.StringType => operation.getRow.addString(kuduIdx, row.getString(sparkIdx))
+            case DataTypes.BinaryType => operation.getRow.addBinary(kuduIdx, row.getAs[Array[Byte]](sparkIdx))
+            case DataTypes.BooleanType => operation.getRow.addBoolean(kuduIdx, row.getBoolean(sparkIdx))
+            case DataTypes.ByteType => operation.getRow.addByte(kuduIdx, row.getByte(sparkIdx))
+            case DataTypes.ShortType => operation.getRow.addShort(kuduIdx, row.getShort(sparkIdx))
+            case DataTypes.IntegerType => operation.getRow.addInt(kuduIdx, row.getInt(sparkIdx))
+            case DataTypes.LongType => operation.getRow.addLong(kuduIdx, row.getLong(sparkIdx))
+            case DataTypes.FloatType => operation.getRow.addFloat(kuduIdx, row.getFloat(sparkIdx))
+            case DataTypes.DoubleType => operation.getRow.addDouble(kuduIdx, row.getDouble(sparkIdx))
+            case DataTypes.TimestampType => operation.getRow.addLong(kuduIdx, KuduRelation.timestampToMicros(row.getTimestamp(sparkIdx)))
+            case t => throw new IllegalArgumentException(s"No support for Spark SQL type $t")
+          }
+        }
+        session.apply(operation)
+      }
+    } finally {
+      session.close()
+    }
+    session.getPendingErrors
+  }
+
+}
+
+private object KuduConnection {
+  private val syncCache = new mutable.HashMap[String, KuduClient]()
+  private val asyncCache = new mutable.HashMap[String, AsyncKuduClient]()
+
+  /**
+    * Set to
+    * [[org.apache.spark.util.ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY]].
+    * The client instances are closed through the JVM shutdown hook
+    * mechanism in order to make sure that any unflushed writes are cleaned up
+    * properly. Spark has no shutdown notifications.
+    */
+  private val ShutdownHookPriority = 100
+
+  def getSyncClient(kuduMaster: String): KuduClient = {
+    syncCache.synchronized {
+      if (!syncCache.contains(kuduMaster)) {
+        val syncClient = new KuduClient.KuduClientBuilder(kuduMaster).build()
+        ShutdownHookManager.get().addShutdownHook(new Runnable {
+          override def run() = syncClient.close()
+        }, ShutdownHookPriority)
+        syncCache.put(kuduMaster, syncClient)
+      }
+      return syncCache(kuduMaster)
+    }
+  }
+
+  def getAsyncClient(kuduMaster: String): AsyncKuduClient = {
+    asyncCache.synchronized {
+      if (!asyncCache.contains(kuduMaster)) {
+        val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build()
+        ShutdownHookManager.get().addShutdownHook(
+          new Runnable {
+            override def run() = asyncClient.close()
+          }, ShutdownHookPriority)
+        asyncCache.put(kuduMaster, asyncClient)
+      }
+      return asyncCache(kuduMaster)
+    }
+  }
+}
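
A driver-side usage sketch of this KuduContext, assuming a DataFrame df whose schema includes a column named "key", and the same hypothetical master address and table name as above; the partitioning and replication options shown are just one possible setup:

import org.kududb.client.CreateTableOptions
import scala.collection.JavaConverters._

val kuduContext = new KuduContext("master.example.com:7051")

if (!kuduContext.tableExists("metrics")) {
  // Hash-partition on the key column and keep a single replica (a test-style setup).
  val options = new CreateTableOptions()
    .addHashPartitions(List("key").asJava, 2)
    .setNumReplicas(1)
  kuduContext.createTable("metrics", df.schema, Seq("key"), options)
}

// overwrite = false issues inserts; overwrite = true issues updates instead.
kuduContext.writeRows(df, "metrics", overwrite = false)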

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
new file mode 100644
index 0000000..5395d5a
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark.kudu
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.kududb.client._
+import org.kududb.{Type, client}
+
+import scala.collection.JavaConverters._
+
+/**
+  * A Resilient Distributed Dataset backed by a Kudu table.
+  */
+class KuduRDD(val kuduMaster: String,
+              @transient batchSize: Integer,
+              @transient projectedCols: Array[String],
+              @transient predicates: Array[client.KuduPredicate],
+              @transient table: KuduTable,
+              @transient kc: KuduContext,
+              @transient sc: SparkContext) extends RDD[Row](sc, Nil) {
+
+  /**
+    * The [[KuduContext]] for this `KuduRDD`.
+    *
+    * The `KuduContext` manages the Kudu client instances for the `KuduRDD`.
+    * When the `KuduRDD` is first constructed it uses the context passed in as
+    * `kc`. After deserialization, a new `KuduContext` is created as necessary.
+    * The `kc` field should not be used, since it will not be rehydrated after
+    * serialization.
+    */
+  @transient private lazy val kuduContext: KuduContext = {
+    if (kc != null) kc else new KuduContext(kuduMaster)
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    val builder = kuduContext.syncClient
+                         .newScanTokenBuilder(table)
+                         .batchSizeBytes(batchSize)
+                         .setProjectedColumnNames(projectedCols.toSeq.asJava)
+                         .cacheBlocks(true)
+
+    for (predicate <- predicates) {
+      builder.addPredicate(predicate)
+    }
+    val tokens = builder.build().asScala
+    tokens.zipWithIndex.map {
+      case (token, index) =>
+        new KuduPartition(index, token.serialize(),
+                          token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray)
+    }.toArray
+  }
+
+  override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
+    val client: KuduClient = kuduContext.syncClient
+    val partition: KuduPartition = part.asInstanceOf[KuduPartition]
+    val scanner = KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
+    new RowResultIteratorScala(scanner)
+  }
+
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    partition.asInstanceOf[KuduPartition].locations
+  }
+}
+
+/**
+  * A Spark SQL [[Partition]] which wraps a [[KuduScanToken]].
+  */
+private[spark] class KuduPartition(val index: Int,
+                                   val scanToken: Array[Byte],
+                                   val locations : Array[String]) extends Partition {}
+
+/**
+  * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
+  * @param scanner the wrapped scanner
+  */
+private[spark] class RowResultIteratorScala(private val scanner: KuduScanner) extends Iterator[Row] {
+
+  private var currentIterator: RowResultIterator = null
+
+  override def hasNext: Boolean = {
+    if ((currentIterator != null && !currentIterator.hasNext && scanner.hasMoreRows) ||
+      (scanner.hasMoreRows && currentIterator == null)) {
+      currentIterator = scanner.nextRows()
+    }
+    currentIterator.hasNext
+  }
+
+  override def next(): Row = new KuduRow(currentIterator.next())
+}
+
+/**
+  * A Spark SQL [[Row]] which wraps a Kudu [[RowResult]].
+  * @param rowResult the wrapped row result
+  */
+private[spark] class KuduRow(private val rowResult: RowResult) extends Row {
+  override def length: Int = rowResult.getColumnProjection.getColumnCount
+
+  override def get(i: Int): Any = {
+    if (rowResult.isNull(i)) null
+    else rowResult.getColumnType(i) match {
+      case Type.BOOL => rowResult.getBoolean(i)
+      case Type.INT8 => rowResult.getByte(i)
+      case Type.INT16 => rowResult.getShort(i)
+      case Type.INT32 => rowResult.getInt(i)
+      case Type.INT64 => rowResult.getLong(i)
+      case Type.TIMESTAMP => KuduRelation.microsToTimestamp(rowResult.getLong(i))
+      case Type.FLOAT => rowResult.getFloat(i)
+      case Type.DOUBLE => rowResult.getDouble(i)
+      case Type.STRING => rowResult.getString(i)
+      case Type.BINARY => rowResult.getBinary(i)
+    }
+  }
+
+  override def copy(): Row = Row.fromSeq(Range(0, length).map(get))
+
+  override def toString(): String = rowResult.toString
+}
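
A KuduRDD is normally obtained through KuduContext.kuduRDD rather than constructed directly. A short sketch, assuming the hypothetical "metrics" table from above and a SparkContext named sc:

// Project two columns; each element is a KuduRow wrapping a Kudu RowResult,
// so values are addressed by projection position (0 = "key", 1 = "value" here).
val rdd = kuduContext.kuduRDD(sc, "metrics", Seq("key", "value"))
val keys = rdd.map(row => row.get(0)).collect()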

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
new file mode 100755
index 0000000..4203e31
--- /dev/null
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/package.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kududb.spark
+
+import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter}
+
+package object kudu {
+
+  /**
+   * Adds a method, `kudu`, to DataFrameReader that allows you to read Kudu tables using
+   * the DataFrameReader.
+   */
+  implicit class KuduDataFrameReader(reader: DataFrameReader) {
+    def kudu: DataFrame = reader.format("org.kududb.spark.kudu").load
+  }
+
+  /**
+    * Adds a method, `kudu`, to DataFrameWriter that allows writes to Kudu using
+    * the DataFileWriter
+    */
+    implicit class KuduDataFrameWriter(writer: DataFrameWriter) {
+      def kudu = writer.format("org.kududb.spark.kudu").save
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
deleted file mode 100644
index 95145d5..0000000
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/DefaultSource.scala
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.kududb.spark.kudu
-
-import java.sql.Timestamp
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
-import org.kududb.Type
-import org.kududb.annotations.InterfaceStability
-import org.kududb.client._
-import org.kududb.client.KuduPredicate.ComparisonOp
-import org.kududb.client.SessionConfiguration.FlushMode
-import org.apache.spark.sql.SaveMode._
-
-import scala.collection.JavaConverters._
-
-/**
-  * DefaultSource for integration with Spark's dataframe datasources.
-  * This class will produce a relationProvider based on input given to it from spark.
-  */
-@InterfaceStability.Unstable
-class DefaultSource extends RelationProvider with CreatableRelationProvider {
-
-  val TABLE_KEY = "kudu.table"
-  val KUDU_MASTER = "kudu.master"
-
-  /**
-    * Construct a BaseRelation using the provided context and parameters.
-    *
-    * @param sqlContext SparkSQL context
-    * @param parameters parameters given to us from SparkSQL
-    * @return           a BaseRelation Object
-    */
-  override def createRelation(sqlContext: SQLContext,
-                              parameters: Map[String, String]):
-  BaseRelation = {
-    val tableName = parameters.getOrElse(TABLE_KEY,
-      throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
-        s"using key '$TABLE_KEY'"))
-    val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
-
-    new KuduRelation(tableName, kuduMaster)(sqlContext)
-  }
-
-  /**
-    * Creates a relation and inserts data to specified table.
-    *
-    * @param sqlContext
-    * @param mode Append will not overwrite existing data, Overwrite will perform update, but will
-    *             not insert data, use upsert on KuduContext if you require both
-    * @param parameters Nessisary parameters for kudu.table and kudu.master
-    * @param data Dataframe to save into kudu
-    * @return returns populated base relation
-    */
-  override def createRelation(sqlContext: SQLContext, mode: SaveMode,
-                              parameters: Map[String, String], data: DataFrame): BaseRelation = {
-    val tableName = parameters.getOrElse(TABLE_KEY,
-      throw new IllegalArgumentException(s"Kudu table name must be specified in create options " +
-        s"using key '$TABLE_KEY'"))
-
-    val kuduMaster = parameters.getOrElse(KUDU_MASTER, "localhost")
-
-    val kuduRelation = new KuduRelation(tableName, kuduMaster)(sqlContext)
-    mode match {
-      case Append | Ignore => kuduRelation.insert(data, overwrite = false)
-      case Overwrite => kuduRelation.insert(data, overwrite = true)
-      case ErrorIfExists =>
-          throw new UnsupportedOperationException(
-            "ErrorIfExists is currently not supported")
-    }
-
-    kuduRelation
-  }
-}
-
-/**
-  * Implementation of Spark BaseRelation.
-  *
-  * @param tableName Kudu table that we plan to read from
-  * @param kuduMaster Kudu master addresses
-  * @param sqlContext SparkSQL context
-  */
-@InterfaceStability.Unstable
-class KuduRelation(private val tableName: String,
-                   private val kuduMaster: String)(
-                   val sqlContext: SQLContext)
-extends BaseRelation
-with PrunedFilteredScan
-with InsertableRelation {
-
-  import KuduRelation._
-
-  private val context: KuduContext = new KuduContext(kuduMaster)
-  private val table: KuduTable = context.syncClient.openTable(tableName)
-
-  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
-    filters.filterNot(supportsFilter)
-
-  /**
-    * Generates a SparkSQL schema object so SparkSQL knows what is being
-    * provided by this BaseRelation.
-    *
-    * @return schema generated from the Kudu table's schema
-    */
-  override def schema: StructType = {
-    val fields: Array[StructField] =
-      table.getSchema.getColumns.asScala.map { columnSchema =>
-        val sparkType = kuduTypeToSparkType(columnSchema.getType)
-        new StructField(columnSchema.getName, sparkType, columnSchema.isNullable)
-      }.toArray
-
-    new StructType(fields)
-  }
-
-  /**
-    * Build the RDD to scan rows.
-    *
-    * @param requiredColumns columns that are being requested by the requesting query
-    * @param filters         filters that are being applied by the requesting query
-    * @return RDD will all the results from Kudu
-    */
-  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
-    val predicates = filters.flatMap(filterToPredicate)
-    new KuduRDD(kuduMaster, 1024 * 1024 * 20, requiredColumns, predicates,
-      table, context, sqlContext.sparkContext)
-  }
-
-  /**
-    * Converts a Spark [[Filter]] to a Kudu [[KuduPredicate]].
-    *
-    * @param filter the filter to convert
-    * @return the converted filter
-    */
-  private def filterToPredicate(filter : Filter) : Array[KuduPredicate] = {
-    filter match {
-      case EqualTo(column, value) =>
-        Array(comparisonPredicate(column, ComparisonOp.EQUAL, value))
-      case GreaterThan(column, value) =>
-        Array(comparisonPredicate(column, ComparisonOp.GREATER, value))
-      case GreaterThanOrEqual(column, value) =>
-        Array(comparisonPredicate(column, ComparisonOp.GREATER_EQUAL, value))
-      case LessThan(column, value) =>
-        Array(comparisonPredicate(column, ComparisonOp.LESS, value))
-      case LessThanOrEqual(column, value) =>
-        Array(comparisonPredicate(column, ComparisonOp.LESS_EQUAL, value))
-      case And(left, right) => filterToPredicate(left) ++ filterToPredicate(right)
-      case _ => Array()
-    }
-  }
-
-  /**
-    * Creates a new comparison predicate for the column, comparison operator, and comparison value.
-    *
-    * @param column the column name
-    * @param operator the comparison operator
-    * @param value the comparison value
-    * @return the comparison predicate
-    */
-  private def comparisonPredicate(column: String,
-                                  operator: ComparisonOp,
-                                  value: Any): KuduPredicate = {
-    val columnSchema = table.getSchema.getColumn(column)
-    value match {
-      case value: Boolean => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
-      case value: Byte => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
-      case value: Short => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
-      case value: Int => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
-      case value: Long => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
-      case value: Timestamp => KuduPredicate.newComparisonPredicate(columnSchema, operator, timestampToMicros(value))
-      case value: Float => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
-      case value: Double => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
-      case value: String => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
-      case value: Array[Byte] => KuduPredicate.newComparisonPredicate(columnSchema, operator, value)
-    }
-  }
-
-  /**
-    * Inserts data into an existing Kudu table.
-    * @param data [[DataFrame]] to be inserted into Kudu
-    * @param overwrite If True it will update existing records, but will not perform inserts.
-    */
-  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-    context.writeRows(data, tableName, overwrite)
-  }
-}
-
-private[spark] object KuduRelation {
-  /**
-    * Converts a Kudu [[Type]] to a Spark SQL [[DataType]].
-    *
-    * @param t the Kudu type
-    * @return the corresponding Spark SQL type
-    */
-  private def kuduTypeToSparkType(t: Type): DataType = t match {
-    case Type.BOOL => BooleanType
-    case Type.INT8 => ByteType
-    case Type.INT16 => ShortType
-    case Type.INT32 => IntegerType
-    case Type.INT64 => LongType
-    case Type.TIMESTAMP => TimestampType
-    case Type.FLOAT => FloatType
-    case Type.DOUBLE => DoubleType
-    case Type.STRING => StringType
-    case Type.BINARY => BinaryType
-  }
-
-  /**
-    * Returns `true` if the filter is able to be pushed down to Kudu.
-    *
-    * @param filter the filter to test
-    */
-  private def supportsFilter(filter: Filter): Boolean = filter match {
-    case EqualTo(_, _)
-       | GreaterThan(_, _)
-       | GreaterThanOrEqual(_, _)
-       | LessThan(_, _)
-       | LessThanOrEqual(_, _) => true
-    case And(left, right) => supportsFilter(left) && supportsFilter(right)
-    case _ => false
-  }
-
-  /**
-    * Converts a [[Timestamp]] to microseconds since the Unix epoch (1970-01-01T00:00:00Z).
-    *
-    * @param timestamp the timestamp to convert to microseconds
-    * @return the microseconds since the Unix epoch
-    */
-  def timestampToMicros(timestamp: Timestamp): Long = {
-    // Number of whole milliseconds since the Unix epoch, in microseconds.
-    val millis = timestamp.getTime * 1000
-    // Sub millisecond time since the Unix epoch, in microseconds.
-    val micros = (timestamp.getNanos % 1000000) / 1000
-    if (micros >= 0) {
-      millis + micros
-    } else {
-      millis + 1000000 + micros
-    }
-  }
-
-  /**
-    * Converts a microsecond offset from the Unix epoch (1970-01-01T00:00:00Z) to a [[Timestamp]].
-    *
-    * @param micros the offset in microseconds since the Unix epoch
-    * @return the corresponding timestamp
-    */
-  def microsToTimestamp(micros: Long): Timestamp = {
-    var millis = micros / 1000
-    var nanos = (micros % 1000000) * 1000
-    if (nanos < 0) {
-      millis -= 1
-      nanos += 1000000000
-    }
-
-    val timestamp = new Timestamp(millis)
-    timestamp.setNanos(nanos.asInstanceOf[Int])
-    timestamp
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
deleted file mode 100644
index 167ee13..0000000
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduContext.scala
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.kududb.spark.kudu
-
-import java.util
-import org.apache.hadoop.util.ShutdownHookManager
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.types.{StructType, DataType, DataTypes}
-import org.kududb.{ColumnSchema, Schema, Type}
-import org.kududb.annotations.InterfaceStability
-import org.kududb.client.SessionConfiguration.FlushMode
-import org.kududb.client._
-
-
-import scala.collection.mutable
-
-/**
-  * KuduContext is a serializable container for Kudu client connections.
-  *
-  * If a Kudu client connection is needed as part of a Spark application, a
-  * [[KuduContext]] should used as a broadcast variable in the job in order to
-  * share connections among the tasks in a JVM.
-  */
-@InterfaceStability.Unstable
-class KuduContext(kuduMaster: String) extends Serializable {
-
-  @transient lazy val syncClient = {
-    KuduConnection.getSyncClient(kuduMaster)
-  }
-
-  @transient lazy val asyncClient = {
-    KuduConnection.getAsyncClient(kuduMaster)
-  }
-
-  /**
-    * Create an RDD from a Kudu table.
-    *
-    * @param tableName          table to read from
-    * @param columnProjection   list of columns to read. Not specifying this at all
-    *                           (i.e. setting to null) or setting to the special
-    *                           string '*' means to project all columns.
-    * @return a new RDD that maps over the given table for the selected columns
-    */
-  def kuduRDD(sc: SparkContext,
-              tableName: String,
-              columnProjection: Seq[String] = Nil): RDD[Row] = {
-    new KuduRDD(kuduMaster, 1024*1024*20, columnProjection.toArray, Array(),
-                syncClient.openTable(tableName), this, sc)
-  }
-
-  /**
-    * Check if kudu table already exists
-    * @param tableName tablename to check
-    * @return true if table exists, false if table does not exist
-    */
-  def tableExists(tableName: String): Boolean = syncClient.tableExists(tableName)
-
-  /**
-    * Delete kudu table
-    * @param tableName tablename to delete
-    * @return DeleteTableResponse
-    */
-  def deleteTable(tableName: String): DeleteTableResponse = syncClient.deleteTable(tableName)
-
-  /**
-    * Creates a kudu table for the given schema. Partitioning can be specified through options.
-    * @param tableName table to create
-    * @param schema struct schema of table
-    * @param keys primary keys of the table
-    * @param options replication and partitioning options for the table
-    */
-  def createTable(tableName: String,
-                  schema: StructType,
-                  keys: Seq[String],
-                  options: CreateTableOptions): KuduTable = {
-    val kuduCols = new util.ArrayList[ColumnSchema]()
-    // add the key columns first, in the order specified
-    for (key <- keys) {
-      val f = schema.fields(schema.fieldIndex(key))
-      kuduCols.add(new ColumnSchema.ColumnSchemaBuilder(f.name, kuduType(f.dataType)).key(true).build())
-    }
-    // now add the non-key columns
-    for (f <- schema.fields.filter(field=> !keys.contains(field.name))) {
-      kuduCols.add(new ColumnSchema.ColumnSchemaBuilder(f.name, kuduType(f.dataType)).nullable(f.nullable).key(false).build())
-    }
-
-    syncClient.createTable(tableName, new Schema(kuduCols), options)
-  }
-
-  /** Map Spark SQL type to Kudu type */
-  def kuduType(dt: DataType) : Type = dt match {
-    case DataTypes.BinaryType => Type.BINARY
-    case DataTypes.BooleanType => Type.BOOL
-    case DataTypes.StringType => Type.STRING
-    case DataTypes.TimestampType => Type.TIMESTAMP
-    case DataTypes.ByteType => Type.INT8
-    case DataTypes.ShortType => Type.INT16
-    case DataTypes.IntegerType => Type.INT32
-    case DataTypes.LongType => Type.INT64
-    case DataTypes.FloatType => Type.FLOAT
-    case DataTypes.DoubleType => Type.DOUBLE
-    case _ => throw new IllegalArgumentException(s"No support for Spark SQL type $dt")
-  }
-
-  /**
-    * Inserts or updates rows in kudu from a [[DataFrame]].
-    * @param data `DataFrame` to insert/update
-    * @param tableName table to perform insertion on
-    * @param overwrite true=update, false=insert
-    */
-  def writeRows(data: DataFrame, tableName: String, overwrite: Boolean) {
-    val schema = data.schema
-    data.foreachPartition(iterator => {
-      val pendingErrors = writeRows(iterator, schema, tableName, overwrite)
-      val errorCount = pendingErrors.getRowErrors.length
-      if (errorCount > 0) {
-        val errors = pendingErrors.getRowErrors.take(5).map(_.getErrorStatus).mkString
-        throw new RuntimeException(
-          s"failed to write $errorCount rows from DataFrame to Kudu; sample errors: $errors")
-      }
-    })
-  }
-
-  /**
-    * Saves partitions of a [[DataFrame]] into Kudu.
-    * @param rows rows to insert or update
-    * @param tableName table to insert or update on
-    */
-  def writeRows(rows: Iterator[Row],
-                schema: StructType,
-                tableName: String,
-                performAsUpdate : Boolean = false): RowErrorsAndOverflowStatus = {
-    val table: KuduTable = syncClient.openTable(tableName)
-    val kuduSchema = table.getSchema
-    val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
-      sparkIdx -> table.getSchema.getColumnIndex(field.name)
-    })
-    val session: KuduSession = syncClient.newSession
-    session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
-    session.setIgnoreAllDuplicateRows(true)
-    try {
-      for (row <- rows) {
-        val operation = if (performAsUpdate) { table.newUpdate() } else { table.newInsert() }
-        for ((sparkIdx, kuduIdx) <- indices) {
-          if (row.isNullAt(sparkIdx)) {
-            operation.getRow.setNull(kuduIdx)
-          } else schema.fields(sparkIdx).dataType match {
-            case DataTypes.StringType => operation.getRow.addString(kuduIdx, row.getString(sparkIdx))
-            case DataTypes.BinaryType => operation.getRow.addBinary(kuduIdx, row.getAs[Array[Byte]](sparkIdx))
-            case DataTypes.BooleanType => operation.getRow.addBoolean(kuduIdx, row.getBoolean(sparkIdx))
-            case DataTypes.ByteType => operation.getRow.addByte(kuduIdx, row.getByte(sparkIdx))
-            case DataTypes.ShortType => operation.getRow.addShort(kuduIdx, row.getShort(sparkIdx))
-            case DataTypes.IntegerType => operation.getRow.addInt(kuduIdx, row.getInt(sparkIdx))
-            case DataTypes.LongType => operation.getRow.addLong(kuduIdx, row.getLong(sparkIdx))
-            case DataTypes.FloatType => operation.getRow.addFloat(kuduIdx, row.getFloat(sparkIdx))
-            case DataTypes.DoubleType => operation.getRow.addDouble(kuduIdx, row.getDouble(sparkIdx))
-            case DataTypes.TimestampType => operation.getRow.addLong(kuduIdx, KuduRelation.timestampToMicros(row.getTimestamp(sparkIdx)))
-            case t => throw new IllegalArgumentException(s"No support for Spark SQL type $t")
-          }
-        }
-        session.apply(operation)
-      }
-    } finally {
-      session.close()
-    }
-    session.getPendingErrors
-  }
-
-}
-
-private object KuduConnection {
-  private val syncCache = new mutable.HashMap[String, KuduClient]()
-  private val asyncCache = new mutable.HashMap[String, AsyncKuduClient]()
-
-  /**
-    * Set to
-    * [[org.apache.spark.util.ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY]].
-    * The client instances are closed through the JVM shutdown hook
-    * mechanism in order to make sure that any unflushed writes are cleaned up
-    * properly. Spark has no shutdown notifications.
-    */
-  private val ShutdownHookPriority = 100
-
-  def getSyncClient(kuduMaster: String): KuduClient = {
-    syncCache.synchronized {
-      if (!syncCache.contains(kuduMaster)) {
-        val syncClient = new KuduClient.KuduClientBuilder(kuduMaster).build()
-        ShutdownHookManager.get().addShutdownHook(new Runnable {
-          override def run() = syncClient.close()
-        }, ShutdownHookPriority)
-        syncCache.put(kuduMaster, syncClient)
-      }
-      return syncCache(kuduMaster)
-    }
-  }
-
-  def getAsyncClient(kuduMaster: String): AsyncKuduClient = {
-    asyncCache.synchronized {
-      if (!asyncCache.contains(kuduMaster)) {
-        val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build()
-        ShutdownHookManager.get().addShutdownHook(
-          new Runnable {
-            override def run() = asyncClient.close()
-          }, ShutdownHookPriority)
-        asyncCache.put(kuduMaster, asyncClient)
-      }
-      return asyncCache(kuduMaster)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala
deleted file mode 100644
index 5395d5a..0000000
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/KuduRDD.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.kududb.spark.kudu
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.kududb.client._
-import org.kududb.{Type, client}
-
-import scala.collection.JavaConverters._
-
-/**
-  * A Resilient Distributed Dataset backed by a Kudu table.
-  */
-class KuduRDD(val kuduMaster: String,
-              @transient batchSize: Integer,
-              @transient projectedCols: Array[String],
-              @transient predicates: Array[client.KuduPredicate],
-              @transient table: KuduTable,
-              @transient kc: KuduContext,
-              @transient sc: SparkContext) extends RDD[Row](sc, Nil) {
-
-  /**
-    * The [[KuduContext]] for this `KuduRDD`.
-    *
-    * The `KuduContext` manages the Kudu client instances for the `KuduRDD`.
-    * When the `KuduRDD` is first constructed it uses the context passed in as
-    * `kc`. After deserialization, a new `KuduContext` is created as necessary.
-    * The `kc` field should not be used, since it will not be rehydrated after
-    * serialization.
-    */
-  @transient private lazy val kuduContext: KuduContext = {
-    if (kc != null) kc else new KuduContext(kuduMaster)
-  }
-
-  override protected def getPartitions: Array[Partition] = {
-    val builder = kuduContext.syncClient
-                         .newScanTokenBuilder(table)
-                         .batchSizeBytes(batchSize)
-                         .setProjectedColumnNames(projectedCols.toSeq.asJava)
-                         .cacheBlocks(true)
-
-    for (predicate <- predicates) {
-      builder.addPredicate(predicate)
-    }
-    val tokens = builder.build().asScala
-    tokens.zipWithIndex.map {
-      case (token, index) =>
-        new KuduPartition(index, token.serialize(),
-                          token.getTablet.getReplicas.asScala.map(_.getRpcHost).toArray)
-    }.toArray
-  }
-
-  override def compute(part: Partition, taskContext: TaskContext): Iterator[Row] = {
-    val client: KuduClient = kuduContext.syncClient
-    val partition: KuduPartition = part.asInstanceOf[KuduPartition]
-    val scanner = KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
-    new RowResultIteratorScala(scanner)
-  }
-
-  override def getPreferredLocations(partition: Partition): Seq[String] = {
-    partition.asInstanceOf[KuduPartition].locations
-  }
-}
-
-/**
-  * A Spark SQL [[Partition]] which wraps a [[KuduScanToken]].
-  */
-private[spark] class KuduPartition(val index: Int,
-                                   val scanToken: Array[Byte],
-                                   val locations : Array[String]) extends Partition {}
-
-/**
-  * A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
-  * @param scanner the wrapped scanner
-  */
-private[spark] class RowResultIteratorScala(private val scanner: KuduScanner) extends Iterator[Row] {
-
-  private var currentIterator: RowResultIterator = null
-
-  override def hasNext: Boolean = {
-    if ((currentIterator != null && !currentIterator.hasNext && scanner.hasMoreRows) ||
-      (scanner.hasMoreRows && currentIterator == null)) {
-      currentIterator = scanner.nextRows()
-    }
-    currentIterator.hasNext
-  }
-
-  override def next(): Row = new KuduRow(currentIterator.next())
-}
-
-/**
-  * A Spark SQL [[Row]] which wraps a Kudu [[RowResult]].
-  * @param rowResult the wrapped row result
-  */
-private[spark] class KuduRow(private val rowResult: RowResult) extends Row {
-  override def length: Int = rowResult.getColumnProjection.getColumnCount
-
-  override def get(i: Int): Any = {
-    if (rowResult.isNull(i)) null
-    else rowResult.getColumnType(i) match {
-      case Type.BOOL => rowResult.getBoolean(i)
-      case Type.INT8 => rowResult.getByte(i)
-      case Type.INT16 => rowResult.getShort(i)
-      case Type.INT32 => rowResult.getInt(i)
-      case Type.INT64 => rowResult.getLong(i)
-      case Type.TIMESTAMP => KuduRelation.microsToTimestamp(rowResult.getLong(i))
-      case Type.FLOAT => rowResult.getFloat(i)
-      case Type.DOUBLE => rowResult.getDouble(i)
-      case Type.STRING => rowResult.getString(i)
-      case Type.BINARY => rowResult.getBinary(i)
-    }
-  }
-
-  override def copy(): Row = Row.fromSeq(Range(0, length).map(get))
-
-  override def toString(): String = rowResult.toString
-}

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/5c305689/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
----------------------------------------------------------------------
diff --git a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala b/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
deleted file mode 100755
index 4203e31..0000000
--- a/java/kudu-spark/src/main/scala/org/kududb/spark/kudu/package.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.kududb.spark
-
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter}
-
-package object kudu {
-
-  /**
-   * Adds a method, `kudu`, to DataFrameReader that allows you to read Kudu tables using
-   * the DataFrameReader.
-   */
-  implicit class KuduDataFrameReader(reader: DataFrameReader) {
-    def kudu: DataFrame = reader.format("org.kududb.spark.kudu").load
-  }
-
-  /**
-    * Adds a method, `kudu`, to DataFrameWriter that allows writes to Kudu using
-    * the DataFileWriter
-    */
-    implicit class KuduDataFrameWriter(writer: DataFrameWriter) {
-      def kudu = writer.format("org.kududb.spark.kudu").save
-    }
-}


Mime
View raw message