spark-commits mailing list archives

From r...@apache.org
Subject [1/3] spark git commit: [SPARK-8906][SQL] Move all internal data source classes into execution.datasources.
Date Tue, 21 Jul 2015 18:56:42 GMT
Repository: spark
Updated Branches:
  refs/heads/master 9ba7c64de -> 60c0ce134


http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
deleted file mode 100644
index 5c6ef2d..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ /dev/null
@@ -1,581 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import java.util.{Date, UUID}
-
-import scala.collection.JavaConversions.asScalaIterator
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
-
-import org.apache.spark._
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.SerializableConfiguration
-
-private[sql] case class InsertIntoDataSource(
-    logicalRelation: LogicalRelation,
-    query: LogicalPlan,
-    overwrite: Boolean)
-  extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
-    val data = DataFrame(sqlContext, query)
-    // Apply the schema of the existing table to the new data.
-    val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite)
-
-    // Invalidate the cache.
-    sqlContext.cacheManager.invalidateCache(logicalRelation)
-
-    Seq.empty[Row]
-  }
-}
-
-/**
- * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
- * Writing to dynamic partitions is also supported.  Each [[InsertIntoHadoopFsRelation]] issues a
- * single write job, and owns a UUID that identifies this job.  Each concrete implementation of
- * [[HadoopFsRelation]] should use this UUID together with the task id to generate a unique file
- * path for each task output file.  This UUID is passed to the executor side via a property named
- * `spark.sql.sources.writeJobUUID`.
- *
- * Different writer containers, [[DefaultWriterContainer]] and [[DynamicPartitionWriterContainer]],
- * are used to write to normal tables and tables with dynamic partitions, respectively.
- *
- * The basic workflow of this command is:
- *
- *   1. Driver side setup, including output committer initialization and data source specific
- *      preparation work for the write job to be issued.
- *   2. Issues a write job consisting of one or more executor-side tasks, each of which writes all
- *      rows within an RDD partition.
- *   3. If no exception is thrown in a task, commits that task; otherwise aborts it.  If any
- *      exception is thrown while committing the task, also aborts that task.
- *   4. If all tasks are committed, commits the job; otherwise aborts it.  If any exception is
- *      thrown while committing the job, also aborts the job.
- */
-private[sql] case class InsertIntoHadoopFsRelation(
-    @transient relation: HadoopFsRelation,
-    @transient query: LogicalPlan,
-    mode: SaveMode)
-  extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[Row] = {
-    require(
-      relation.paths.length == 1,
-      s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
-
-    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
-    val outputPath = new Path(relation.paths.head)
-    val fs = outputPath.getFileSystem(hadoopConf)
-    val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-
-    val pathExists = fs.exists(qualifiedOutputPath)
-    val doInsertion = (mode, pathExists) match {
-      case (SaveMode.ErrorIfExists, true) =>
-        sys.error(s"path $qualifiedOutputPath already exists.")
-      case (SaveMode.Overwrite, true) =>
-        fs.delete(qualifiedOutputPath, true)
-        true
-      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
-        true
-      case (SaveMode.Ignore, exists) =>
-        !exists
-    }
-    // If we are appending data to an existing dir.
-    val isAppend = pathExists && (mode == SaveMode.Append)
-
-    if (doInsertion) {
-      val job = new Job(hadoopConf)
-      job.setOutputKeyClass(classOf[Void])
-      job.setOutputValueClass(classOf[InternalRow])
-      FileOutputFormat.setOutputPath(job, qualifiedOutputPath)
-
-      // We create a DataFrame by applying the schema of the relation to the data to make sure
-      // we are writing data based on the expected schema.
-      val df = {
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column). We
-        // need a Project to adjust the ordering, so that inside InsertIntoHadoopFsRelation, we can
-        // safely apply the schema of r.schema to the data.
-        val project = Project(
-          relation.schema.map(field => new UnresolvedAttribute(Seq(field.name))), query)
-
-        sqlContext.internalCreateDataFrame(
-          DataFrame(sqlContext, project).queryExecution.toRdd, relation.schema)
-      }
-
-      val partitionColumns = relation.partitionColumns.fieldNames
-      if (partitionColumns.isEmpty) {
-        insert(new DefaultWriterContainer(relation, job, isAppend), df)
-      } else {
-        val writerContainer = new DynamicPartitionWriterContainer(
-          relation, job, partitionColumns, PartitioningUtils.DEFAULT_PARTITION_NAME, isAppend)
-        insertWithDynamicPartitions(sqlContext, writerContainer, df, partitionColumns)
-      }
-    }
-
-    Seq.empty[Row]
-  }
-
-  /**
-   * Inserts the content of the [[DataFrame]] into a table without any partitioning columns.
-   */
-  private def insert(writerContainer: BaseWriterContainer, df: DataFrame): Unit = {
-    // Uses local vals for serialization
-    val needsConversion = relation.needConversion
-    val dataSchema = relation.dataSchema
-
-    // This call shouldn't be put into the `try` block below because it only initializes and
-    // prepares the job; any exception thrown from here shouldn't cause abortJob() to be called.
-    writerContainer.driverSideSetup()
-
-    try {
-      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
-      writerContainer.commitJob()
-      relation.refresh()
-    } catch { case cause: Throwable =>
-      logError("Aborting job.", cause)
-      writerContainer.abortJob()
-      throw new SparkException("Job aborted.", cause)
-    }
-
-    def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      // If anything below fails, we should abort the task.
-      try {
-        writerContainer.executorSideSetup(taskContext)
-
-        val converter: InternalRow => Row = if (needsConversion) {
-          CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
-        } else {
-          r: InternalRow => r.asInstanceOf[Row]
-        }
-        while (iterator.hasNext) {
-          val internalRow = iterator.next()
-          writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
-        }
-
-        writerContainer.commitTask()
-      } catch { case cause: Throwable =>
-        logError("Aborting task.", cause)
-        writerContainer.abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
-      }
-    }
-  }
-
-  /**
-   * Inserts the content of the [[DataFrame]] into a table with partitioning columns.
-   */
-  private def insertWithDynamicPartitions(
-      sqlContext: SQLContext,
-      writerContainer: BaseWriterContainer,
-      df: DataFrame,
-      partitionColumns: Array[String]): Unit = {
-    // Uses a local val for serialization
-    val needsConversion = relation.needConversion
-    val dataSchema = relation.dataSchema
-
-    require(
-      df.schema == relation.schema,
-      s"""DataFrame must have the same schema as the relation to which is inserted.
-         |DataFrame schema: ${df.schema}
-         |Relation schema: ${relation.schema}
-       """.stripMargin)
-
-    val partitionColumnsInSpec = relation.partitionColumns.fieldNames
-    require(
-      partitionColumnsInSpec.sameElements(partitionColumns),
-      s"""Partition columns mismatch.
-         |Expected: ${partitionColumnsInSpec.mkString(", ")}
-         |Actual: ${partitionColumns.mkString(", ")}
-       """.stripMargin)
-
-    val output = df.queryExecution.executedPlan.output
-    val (partitionOutput, dataOutput) = output.partition(a => partitionColumns.contains(a.name))
-    val codegenEnabled = df.sqlContext.conf.codegenEnabled
-
-    // This call shouldn't be put into the `try` block below because it only initializes and
-    // prepares the job; any exception thrown from here shouldn't cause abortJob() to be called.
-    writerContainer.driverSideSetup()
-
-    try {
-      df.sqlContext.sparkContext.runJob(df.queryExecution.toRdd, writeRows _)
-      writerContainer.commitJob()
-      relation.refresh()
-    } catch { case cause: Throwable =>
-      logError("Aborting job.", cause)
-      writerContainer.abortJob()
-      throw new SparkException("Job aborted.", cause)
-    }
-
-    def writeRows(taskContext: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      // If anything below fails, we should abort the task.
-      try {
-        writerContainer.executorSideSetup(taskContext)
-
-        // Projects all partition columns and casts them to strings to build partition directories.
-        val partitionCasts = partitionOutput.map(Cast(_, StringType))
-        val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
-        val dataProj = newProjection(codegenEnabled, dataOutput, output)
-
-        val dataConverter: InternalRow => Row = if (needsConversion) {
-          CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
-        } else {
-          r: InternalRow => r.asInstanceOf[Row]
-        }
-
-        while (iterator.hasNext) {
-          val internalRow = iterator.next()
-          val partitionPart = partitionProj(internalRow)
-          val dataPart = dataConverter(dataProj(internalRow))
-          writerContainer.outputWriterForRow(partitionPart).write(dataPart)
-        }
-
-        writerContainer.commitTask()
-      } catch { case cause: Throwable =>
-        logError("Aborting task.", cause)
-        writerContainer.abortTask()
-        throw new SparkException("Task failed while writing rows.", cause)
-      }
-    }
-  }
-
-  // This is copied from SparkPlan, probably should move this to a more general place.
-  private def newProjection(
-      codegenEnabled: Boolean,
-      expressions: Seq[Expression],
-      inputSchema: Seq[Attribute]): Projection = {
-    log.debug(
-      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
-    if (codegenEnabled) {
-
-      try {
-        GenerateProjection.generate(expressions, inputSchema)
-      } catch {
-        case e: Exception =>
-          if (sys.props.contains("spark.testing")) {
-            throw e
-          } else {
-            log.error("failed to generate projection, fallback to interpreted", e)
-            new InterpretedProjection(expressions, inputSchema)
-          }
-      }
-    } else {
-      new InterpretedProjection(expressions, inputSchema)
-    }
-  }
-}
-
-private[sql] abstract class BaseWriterContainer(
-    @transient val relation: HadoopFsRelation,
-    @transient job: Job,
-    isAppend: Boolean)
-  extends SparkHadoopMapReduceUtil
-  with Logging
-  with Serializable {
-
-  protected val serializableConf = new SerializableConfiguration(job.getConfiguration)
-
-  // This UUID is used to avoid output file name collision between different appending write jobs.
-  // These jobs may belong to different SparkContext instances. Concrete data source implementations
-  // may use this UUID to generate unique file names (e.g., `part-r-<task-id>-<job-uuid>.parquet`).
-  // The reason why this ID is used to identify a job rather than a single task output file is
-  // that speculative tasks must generate the same output file name as the original task.
-  private val uniqueWriteJobId = UUID.randomUUID()
-
-  // This is only used on driver side.
-  @transient private val jobContext: JobContext = job
-
-  // The following fields are initialized and used on both driver and executor side.
-  @transient protected var outputCommitter: OutputCommitter = _
-  @transient private var jobId: JobID = _
-  @transient private var taskId: TaskID = _
-  @transient private var taskAttemptId: TaskAttemptID = _
-  @transient protected var taskAttemptContext: TaskAttemptContext = _
-
-  protected val outputPath: String = {
-    assert(
-      relation.paths.length == 1,
-      s"Cannot write to multiple destinations: ${relation.paths.mkString(",")}")
-    relation.paths.head
-  }
-
-  protected val dataSchema = relation.dataSchema
-
-  protected var outputWriterFactory: OutputWriterFactory = _
-
-  private var outputFormatClass: Class[_ <: OutputFormat[_, _]] = _
-
-  def driverSideSetup(): Unit = {
-    setupIDs(0, 0, 0)
-    setupConf()
-
-    // This UUID is sent to executor side together with the serialized `Configuration` object within
-    // the `Job` instance.  `OutputWriters` on the executor side should use this UUID to generate
-    // unique task output files.
-    job.getConfiguration.set("spark.sql.sources.writeJobUUID", uniqueWriteJobId.toString)
-
-    // Order of the following two lines is important.  For Hadoop 1, TaskAttemptContext constructor
-    // clones the Configuration object passed in.  If we initialize the TaskAttemptContext first,
-    // configurations made in prepareJobForWrite(job) are not populated into the TaskAttemptContext.
-    //
-    // Also, the `prepareJobForWrite` call must happen before initializing the output format and
-    // output committer, since their initialization involves the job configuration, which can be
-    // decorated in `prepareJobForWrite`.
-    outputWriterFactory = relation.prepareJobForWrite(job)
-    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-
-    outputFormatClass = job.getOutputFormatClass
-    outputCommitter = newOutputCommitter(taskAttemptContext)
-    outputCommitter.setupJob(jobContext)
-  }
-
-  def executorSideSetup(taskContext: TaskContext): Unit = {
-    setupIDs(taskContext.stageId(), taskContext.partitionId(), taskContext.attemptNumber())
-    setupConf()
-    taskAttemptContext = newTaskAttemptContext(serializableConf.value, taskAttemptId)
-    outputCommitter = newOutputCommitter(taskAttemptContext)
-    outputCommitter.setupTask(taskAttemptContext)
-    initWriters()
-  }
-
-  protected def getWorkPath: String = {
-    outputCommitter match {
-      // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
-      case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
-      case _ => outputPath
-    }
-  }
-
-  private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
-    val defaultOutputCommitter = outputFormatClass.newInstance().getOutputCommitter(context)
-
-    if (isAppend) {
-      // If we are appending data to an existing dir, we will only use the output committer
-      // associated with the file output format since it is not safe to use a custom
-      // committer for appending. For example, in S3, a direct Parquet output committer may
-      // leave partial data in the destination dir when the appending job fails.
-      logInfo(
-        s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName} " +
-        "for appending.")
-      defaultOutputCommitter
-    } else {
-      val committerClass = context.getConfiguration.getClass(
-        SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
-      Option(committerClass).map { clazz =>
-        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output committer,
-        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs to set the
-        // output committer in prepareForWrite method.
-        if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-          ctor.newInstance(new Path(outputPath), context)
-        } else {
-          // The specified output committer is just an OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          ctor.newInstance()
-        }
-      }.getOrElse {
-        // If output committer class is not set, we will use the one associated with the
-        // file output format.
-        logInfo(
-          s"Using output committer class ${defaultOutputCommitter.getClass.getCanonicalName}")
-        defaultOutputCommitter
-      }
-    }
-  }
-
-  private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
-    this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
-    this.taskId = new TaskID(this.jobId, true, splitId)
-    this.taskAttemptId = new TaskAttemptID(taskId, attemptId)
-  }
-
-  private def setupConf(): Unit = {
-    serializableConf.value.set("mapred.job.id", jobId.toString)
-    serializableConf.value.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
-    serializableConf.value.set("mapred.task.id", taskAttemptId.toString)
-    serializableConf.value.setBoolean("mapred.task.is.map", true)
-    serializableConf.value.setInt("mapred.task.partition", 0)
-  }
-
-  // Called on executor side when writing rows
-  def outputWriterForRow(row: InternalRow): OutputWriter
-
-  protected def initWriters(): Unit
-
-  def commitTask(): Unit = {
-    SparkHadoopMapRedUtil.commitTask(
-      outputCommitter, taskAttemptContext, jobId.getId, taskId.getId, taskAttemptId.getId)
-  }
-
-  def abortTask(): Unit = {
-    if (outputCommitter != null) {
-      outputCommitter.abortTask(taskAttemptContext)
-    }
-    logError(s"Task attempt $taskAttemptId aborted.")
-  }
-
-  def commitJob(): Unit = {
-    outputCommitter.commitJob(jobContext)
-    logInfo(s"Job $jobId committed.")
-  }
-
-  def abortJob(): Unit = {
-    if (outputCommitter != null) {
-      outputCommitter.abortJob(jobContext, JobStatus.State.FAILED)
-    }
-    logError(s"Job $jobId aborted.")
-  }
-}
-
-private[sql] class DefaultWriterContainer(
-    @transient relation: HadoopFsRelation,
-    @transient job: Job,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
-
-  @transient private var writer: OutputWriter = _
-
-  override protected def initWriters(): Unit = {
-    taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path", outputPath)
-    writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
-  }
-
-  override def outputWriterForRow(row: InternalRow): OutputWriter = writer
-
-  override def commitTask(): Unit = {
-    try {
-      assert(writer != null, "OutputWriter instance should have been initialized")
-      writer.close()
-      super.commitTask()
-    } catch { case cause: Throwable =>
-      // This exception will be handled in `InsertIntoHadoopFsRelation.insert$writeRows`, and will
-      // cause `abortTask()` to be invoked.
-      throw new RuntimeException("Failed to commit task", cause)
-    }
-  }
-
-  override def abortTask(): Unit = {
-    try {
-      // It's possible that the task fails before `writer` gets initialized
-      if (writer != null) {
-        writer.close()
-      }
-    } finally {
-      super.abortTask()
-    }
-  }
-}
-
-private[sql] class DynamicPartitionWriterContainer(
-    @transient relation: HadoopFsRelation,
-    @transient job: Job,
-    partitionColumns: Array[String],
-    defaultPartitionName: String,
-    isAppend: Boolean)
-  extends BaseWriterContainer(relation, job, isAppend) {
-
-  // All output writers are created on executor side.
-  @transient protected var outputWriters: java.util.HashMap[String, OutputWriter] = _
-
-  override protected def initWriters(): Unit = {
-    outputWriters = new java.util.HashMap[String, OutputWriter]
-  }
-
-  // The `row` argument is supposed to contain only partition column values which have been cast
-  // to strings.
-  override def outputWriterForRow(row: InternalRow): OutputWriter = {
-    val partitionPath = {
-      val partitionPathBuilder = new StringBuilder
-      var i = 0
-
-      while (i < partitionColumns.length) {
-        val col = partitionColumns(i)
-        val partitionValueString = {
-          val string = row.getString(i)
-          if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string)
-        }
-
-        if (i > 0) {
-          partitionPathBuilder.append(Path.SEPARATOR_CHAR)
-        }
-
-        partitionPathBuilder.append(s"$col=$partitionValueString")
-        i += 1
-      }
-
-      partitionPathBuilder.toString()
-    }
-
-    val writer = outputWriters.get(partitionPath)
-    if (writer.eq(null)) {
-      val path = new Path(getWorkPath, partitionPath)
-      taskAttemptContext.getConfiguration.set(
-        "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
-      val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
-      outputWriters.put(partitionPath, newWriter)
-      newWriter
-    } else {
-      writer
-    }
-  }
-
-  private def clearOutputWriters(): Unit = {
-    if (!outputWriters.isEmpty) {
-      asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
-      outputWriters.clear()
-    }
-  }
-
-  override def commitTask(): Unit = {
-    try {
-      clearOutputWriters()
-      super.commitTask()
-    } catch { case cause: Throwable =>
-      throw new RuntimeException("Failed to commit task", cause)
-    }
-  }
-
-  override def abortTask(): Unit = {
-    try {
-      clearOutputWriters()
-    } finally {
-      super.abortTask()
-    }
-  }
-}
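
The run method of the InsertIntoHadoopFsRelation command above decides whether the write job should run at all from the combination of the SaveMode and whether the output path already exists. Below is a minimal, self-contained Scala sketch of that decision matrix; the object and type names are illustrative stand-ins, not Spark API.

object SaveModeMatrixSketch {
  // Simplified stand-in for org.apache.spark.sql.SaveMode, to keep the sketch self-contained.
  sealed trait Mode
  case object Append extends Mode
  case object Overwrite extends Mode
  case object ErrorIfExists extends Mode
  case object Ignore extends Mode

  // Mirrors the (mode, pathExists) match in InsertIntoHadoopFsRelation.run:
  // returns true when the write job should be issued.
  def shouldInsert(mode: Mode, pathExists: Boolean): Boolean = (mode, pathExists) match {
    case (ErrorIfExists, true) => sys.error("path already exists")  // refuse to write
    case (Overwrite, true) => true     // the caller deletes the existing path, then writes
    case (Append, _) | (Overwrite, false) | (ErrorIfExists, false) => true
    case (Ignore, exists) => !exists   // silently skip the write if the path exists
  }

  def main(args: Array[String]): Unit = {
    println(shouldInsert(Ignore, pathExists = true))   // false
    println(shouldInsert(Append, pathExists = true))   // true
  }
}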

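The DynamicPartitionWriterContainer above builds one output directory per distinct combination of partition values, as a '/'-separated chain of column=value segments, substituting a default name for nulls and escaping unsafe characters. A minimal, dependency-free sketch of that path construction follows; the names and the simplified escape function are illustrative only (Spark's real logic lives in PartitioningUtils).

object PartitionPathSketch {
  // Illustrative default for null partition values; mirrors the defaultPartitionName
  // parameter passed to DynamicPartitionWriterContainer.
  private val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  // Simplified stand-in for PartitioningUtils.escapePathName: percent-encode a few
  // characters that are unsafe in directory names.
  private def escape(s: String): String =
    s.flatMap(c => if ("/\\=:%".contains(c)) f"%%${c.toInt}%02X" else c.toString)

  // One `col=value` segment per partition column, joined with '/', as in outputWriterForRow.
  def partitionPath(partitionColumns: Seq[String], values: Seq[String]): String =
    partitionColumns.zip(values).map { case (col, v) =>
      val value = if (v == null) defaultPartitionName else escape(v)
      s"$col=$value"
    }.mkString("/")

  def main(args: Array[String]): Unit = {
    println(partitionPath(Seq("year", "country"), Seq("2015", null)))
    // year=2015/country=__HIVE_DEFAULT_PARTITION__
  }
}
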
http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
deleted file mode 100644
index 5a8c97c..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ /dev/null
@@ -1,493 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import scala.language.{existentials, implicitConversions}
-import scala.util.matching.Regex
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SaveMode}
-import org.apache.spark.util.Utils
-
-/**
- * A parser for foreign DDL commands.
- */
-private[sql] class DDLParser(
-    parseQuery: String => LogicalPlan)
-  extends AbstractSparkSQLParser with DataTypeParser with Logging {
-
-  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
-    try {
-      parse(input)
-    } catch {
-      case ddlException: DDLException => throw ddlException
-      case _ if !exceptionOnError => parseQuery(input)
-      case x: Throwable => throw x
-    }
-  }
-
-  // Keyword is a convention with AbstractSparkSQLParser, which scans all of the `Keyword`
-  // properties of the class via reflection at runtime to construct the SqlLexical object.
-  protected val CREATE = Keyword("CREATE")
-  protected val TEMPORARY = Keyword("TEMPORARY")
-  protected val TABLE = Keyword("TABLE")
-  protected val IF = Keyword("IF")
-  protected val NOT = Keyword("NOT")
-  protected val EXISTS = Keyword("EXISTS")
-  protected val USING = Keyword("USING")
-  protected val OPTIONS = Keyword("OPTIONS")
-  protected val DESCRIBE = Keyword("DESCRIBE")
-  protected val EXTENDED = Keyword("EXTENDED")
-  protected val AS = Keyword("AS")
-  protected val COMMENT = Keyword("COMMENT")
-  protected val REFRESH = Keyword("REFRESH")
-
-  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
-
-  protected def start: Parser[LogicalPlan] = ddl
-
-  /**
-   * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
-   * or
-   * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
-   * or
-   * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
-   * AS SELECT ...
-   */
-  protected lazy val createTable: Parser[LogicalPlan] =
-    // TODO: Support database.table.
-    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
-      tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
-      case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
-        if (temp.isDefined && allowExisting.isDefined) {
-          throw new DDLException(
-            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
-        }
-
-        val options = opts.getOrElse(Map.empty[String, String])
-        if (query.isDefined) {
-          if (columns.isDefined) {
-            throw new DDLException(
-              "a CREATE TABLE AS SELECT statement does not allow column definitions.")
-          }
-          // When the IF NOT EXISTS clause appears in the query, the save mode will be Ignore.
-          val mode = if (allowExisting.isDefined) {
-            SaveMode.Ignore
-          } else if (temp.isDefined) {
-            SaveMode.Overwrite
-          } else {
-            SaveMode.ErrorIfExists
-          }
-
-          val queryPlan = parseQuery(query.get)
-          CreateTableUsingAsSelect(tableName,
-            provider,
-            temp.isDefined,
-            Array.empty[String],
-            mode,
-            options,
-            queryPlan)
-        } else {
-          val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
-          CreateTableUsing(
-            tableName,
-            userSpecifiedSchema,
-            provider,
-            temp.isDefined,
-            options,
-            allowExisting.isDefined,
-            managedIfNoPath = false)
-        }
-    }
-
-  protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
-
-  /*
-   * describe [extended] table avroTable
-   * This will display all columns of table `avroTable`, including column_name, column_type, and comment.
-   */
-  protected lazy val describeTable: Parser[LogicalPlan] =
-    (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident  ^^ {
-      case e ~ db ~ tbl =>
-        val tblIdentifier = db match {
-          case Some(dbName) =>
-            Seq(dbName, tbl)
-          case None =>
-            Seq(tbl)
-        }
-        DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
-   }
-
-  protected lazy val refreshTable: Parser[LogicalPlan] =
-    REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
-      case maybeDatabaseName ~ tableName =>
-        RefreshTable(maybeDatabaseName.getOrElse("default"), tableName)
-    }
-
-  protected lazy val options: Parser[Map[String, String]] =
-    "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
-
-  protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
-
-  override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
-    s"identifier matching regex $regex", {
-      case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
-      case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
-    }
-  )
-
-  protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
-    case name => name
-  }
-
-  protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
-    case parts => parts.mkString(".")
-  }
-
-  protected lazy val pair: Parser[(String, String)] =
-    optionName ~ stringLit ^^ { case k ~ v => (k, v) }
-
-  protected lazy val column: Parser[StructField] =
-    ident ~ dataType ~ (COMMENT ~> stringLit).?  ^^ { case columnName ~ typ ~ cm =>
-      val meta = cm match {
-        case Some(comment) =>
-          new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
-        case None => Metadata.empty
-      }
-
-      StructField(columnName, typ, nullable = true, meta)
-    }
-}
-
-private[sql] object ResolvedDataSource {
-
-  private val builtinSources = Map(
-    "jdbc" -> "org.apache.spark.sql.jdbc.DefaultSource",
-    "json" -> "org.apache.spark.sql.json.DefaultSource",
-    "parquet" -> "org.apache.spark.sql.parquet.DefaultSource",
-    "orc" -> "org.apache.spark.sql.hive.orc.DefaultSource"
-  )
-
-  /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-    val loader = Utils.getContextOrSparkClassLoader
-
-    if (builtinSources.contains(provider)) {
-      return loader.loadClass(builtinSources(provider))
-    }
-
-    try {
-      loader.loadClass(provider)
-    } catch {
-      case cnf: java.lang.ClassNotFoundException =>
-        try {
-          loader.loadClass(provider + ".DefaultSource")
-        } catch {
-          case cnf: java.lang.ClassNotFoundException =>
-            if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-              sys.error("The ORC data source must be used with Hive support enabled.")
-            } else {
-              sys.error(s"Failed to load class for data source: $provider")
-            }
-        }
-    }
-  }
-
-  /** Create a [[ResolvedDataSource]] for reading data in. */
-  def apply(
-      sqlContext: SQLContext,
-      userSpecifiedSchema: Option[StructType],
-      partitionColumns: Array[String],
-      provider: String,
-      options: Map[String, String]): ResolvedDataSource = {
-    val clazz: Class[_] = lookupDataSource(provider)
-    def className: String = clazz.getCanonicalName
-    val relation = userSpecifiedSchema match {
-      case Some(schema: StructType) => clazz.newInstance() match {
-        case dataSource: SchemaRelationProvider =>
-          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
-        case dataSource: HadoopFsRelationProvider =>
-          val maybePartitionsSchema = if (partitionColumns.isEmpty) {
-            None
-          } else {
-            Some(partitionColumnsSchema(schema, partitionColumns))
-          }
-
-          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-          val paths = {
-            val patternPath = new Path(caseInsensitiveOptions("path"))
-            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
-          }
-
-          val dataSchema =
-            StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
-
-          dataSource.createRelation(
-            sqlContext,
-            paths,
-            Some(dataSchema),
-            maybePartitionsSchema,
-            caseInsensitiveOptions)
-        case dataSource: org.apache.spark.sql.sources.RelationProvider =>
-          throw new AnalysisException(s"$className does not allow user-specified schemas.")
-        case _ =>
-          throw new AnalysisException(s"$className is not a RelationProvider.")
-      }
-
-      case None => clazz.newInstance() match {
-        case dataSource: RelationProvider =>
-          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
-        case dataSource: HadoopFsRelationProvider =>
-          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-          val paths = {
-            val patternPath = new Path(caseInsensitiveOptions("path"))
-            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
-          }
-          dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
-        case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
-          throw new AnalysisException(
-            s"A schema needs to be specified when using $className.")
-        case _ =>
-          throw new AnalysisException(
-            s"$className is neither a RelationProvider nor a FSBasedRelationProvider.")
-      }
-    }
-    new ResolvedDataSource(clazz, relation)
-  }
-
-  private def partitionColumnsSchema(
-      schema: StructType,
-      partitionColumns: Array[String]): StructType = {
-    StructType(partitionColumns.map { col =>
-      schema.find(_.name == col).getOrElse {
-        throw new RuntimeException(s"Partition column $col not found in schema $schema")
-      }
-    }).asNullable
-  }
-
-  /** Create a [[ResolvedDataSource]] for saving the content of the given [[DataFrame]]. */
-  def apply(
-      sqlContext: SQLContext,
-      provider: String,
-      partitionColumns: Array[String],
-      mode: SaveMode,
-      options: Map[String, String],
-      data: DataFrame): ResolvedDataSource = {
-    if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) {
-      throw new AnalysisException("Cannot save interval data type into external storage.")
-    }
-    val clazz: Class[_] = lookupDataSource(provider)
-    val relation = clazz.newInstance() match {
-      case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sqlContext, mode, options, data)
-      case dataSource: HadoopFsRelationProvider =>
-        // Don't glob path for the write path.  The contracts here are:
-        //  1. Only one output path can be specified on the write path;
-        //  2. Output path must be a legal HDFS style file system path;
-        //  3. It's OK that the output path doesn't exist yet;
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-        val outputPath = {
-          val path = new Path(caseInsensitiveOptions("path"))
-          val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        }
-        val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
-        val r = dataSource.createRelation(
-          sqlContext,
-          Array(outputPath.toString),
-          Some(dataSchema.asNullable),
-          Some(partitionColumnsSchema(data.schema, partitionColumns)),
-          caseInsensitiveOptions)
-
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column).  This
-        // will be adjusted within InsertIntoHadoopFsRelation.
-        sqlContext.executePlan(
-          InsertIntoHadoopFsRelation(
-            r,
-            data.logicalPlan,
-            mode)).toRdd
-        r
-      case _ =>
-        sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
-    }
-    new ResolvedDataSource(clazz, relation)
-  }
-}
-
-private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
-
-/**
- * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
- * @param table The table to be described.
- * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
- *                   It is effective only when the table is a Hive table.
- */
-private[sql] case class DescribeCommand(
-    table: LogicalPlan,
-    isExtended: Boolean) extends LogicalPlan with Command {
-
-  override def children: Seq[LogicalPlan] = Seq.empty
-  override val output: Seq[Attribute] = Seq(
-    // Column names are based on Hive.
-    AttributeReference("col_name", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "name of the column").build())(),
-    AttributeReference("data_type", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "data type of the column").build())(),
-    AttributeReference("comment", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "comment of the column").build())())
-}
-
-/**
-  * Used to represent the operation of create table using a data source.
-  * @param allowExisting If it is true, we will do nothing when the table already exists.
-  *                      If it is false, an exception will be thrown
-  */
-private[sql] case class CreateTableUsing(
-    tableName: String,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    temporary: Boolean,
-    options: Map[String, String],
-    allowExisting: Boolean,
-    managedIfNoPath: Boolean) extends LogicalPlan with Command {
-
-  override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
-}
-
-/**
- * A node used to support CTAS statements and saveAsTable for the data source API.
- * This node is a [[UnaryNode]] instead of a [[Command]] because we want the analyzer to be able
- * to analyze the logical plan that will be used to populate the table.
- * So, [[PreWriteCheck]] can detect cases that are not allowed.
- */
-private[sql] case class CreateTableUsingAsSelect(
-    tableName: String,
-    provider: String,
-    temporary: Boolean,
-    partitionColumns: Array[String],
-    mode: SaveMode,
-    options: Map[String, String],
-    child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = Seq.empty[Attribute]
-  // TODO: Override resolved after we support databaseName.
-  // override lazy val resolved = databaseName != None && childrenResolved
-}
-
-private[sql] case class CreateTempTableUsing(
-    tableName: String,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    options: Map[String, String]) extends RunnableCommand {
-
-  def run(sqlContext: SQLContext): Seq[InternalRow] = {
-    val resolved = ResolvedDataSource(
-      sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
-    sqlContext.registerDataFrameAsTable(
-      DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
-    Seq.empty
-  }
-}
-
-private[sql] case class CreateTempTableUsingAsSelect(
-    tableName: String,
-    provider: String,
-    partitionColumns: Array[String],
-    mode: SaveMode,
-    options: Map[String, String],
-    query: LogicalPlan) extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[InternalRow] = {
-    val df = DataFrame(sqlContext, query)
-    val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df)
-    sqlContext.registerDataFrameAsTable(
-      DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
-
-    Seq.empty
-  }
-}
-
-private[sql] case class RefreshTable(databaseName: String, tableName: String)
-  extends RunnableCommand {
-
-  override def run(sqlContext: SQLContext): Seq[InternalRow] = {
-    // Refresh the given table's metadata first.
-    sqlContext.catalog.refreshTable(databaseName, tableName)
-
-    // If this table is cached as an InMemoryColumnarRelation, drop the original
-    // cached version and make the new version cached lazily.
-    val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))
-    // Use lookupCachedData directly since RefreshTable also takes databaseName.
-    val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
-    if (isCached) {
-      // Create a data frame to represent the table.
-      // TODO: Use uncacheTable once it supports database name.
-      val df = DataFrame(sqlContext, logicalPlan)
-      // Uncache the logicalPlan.
-      sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
-      // Cache it again.
-      sqlContext.cacheManager.cacheQuery(df, Some(tableName))
-    }
-
-    Seq.empty[InternalRow]
-  }
-}
-
-/**
- * Builds a map in which keys are case insensitive
- */
-protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
-  with Serializable {
-
-  val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
-
-  override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
-
-  override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
-    baseMap + kv.copy(_1 = kv._1.toLowerCase)
-
-  override def iterator: Iterator[(String, String)] = baseMap.iterator
-
-  override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
-}
-
-/**
- * The exception thrown from the DDL parser.
- */
-protected[sql] class DDLException(message: String) extends Exception(message)
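
The CaseInsensitiveMap above normalizes option keys so that lookups such as "path" and "PATH" resolve to the same entry. A minimal standalone sketch of the same idea, under illustrative names (this is not the class being moved):

class CaseInsensitiveOptions(map: Map[String, String]) extends Serializable {
  // Keys are lower-cased once on construction...
  private val baseMap = map.map { case (k, v) => k.toLowerCase -> v }

  // ...and again on lookup, so the map behaves case-insensitively.
  def get(key: String): Option[String] = baseMap.get(key.toLowerCase)

  def apply(key: String): String =
    get(key).getOrElse(throw new NoSuchElementException(key))
}

object CaseInsensitiveOptionsDemo {
  def main(args: Array[String]): Unit = {
    val opts = new CaseInsensitiveOptions(Map("Path" -> "/data/events.json"))
    println(opts.get("PATH")) // Some(/data/events.json)
    println(opts("path"))     // /data/events.json
  }
}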

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
index 24e86ca..4d942e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala
@@ -17,6 +17,10 @@
 
 package org.apache.spark.sql.sources
 
+////////////////////////////////////////////////////////////////////////////////////////////////////
+// This file defines all the filters that we can push down to the data sources.
+////////////////////////////////////////////////////////////////////////////////////////////////////
+
 /**
  * A filter predicate for data sources.
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2cd8b35..7cd005b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.sources
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
@@ -33,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
 import org.apache.spark.sql.execution.RDDConversions
+import org.apache.spark.sql.execution.datasources.{PartitioningUtils, PartitionSpec, Partition}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql._
 import org.apache.spark.util.SerializableConfiguration
@@ -523,7 +523,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
     })
   }
 
-  private[sources] final def buildScan(
+  private[sql] final def buildScan(
       requiredColumns: Array[String],
       filters: Array[Filter],
       inputPaths: Array[String],

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
deleted file mode 100644
index 40ee048..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources
-
-import org.apache.spark.sql.{SaveMode, AnalysisException}
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, Catalog}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Alias}
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.types.DataType
-
-/**
- * A rule to do pre-insert data type casting and field renaming. Before we insert into
- * an [[InsertableRelation]], we will use this rule to make sure that
- * the columns to be inserted have the correct data type and fields have the correct names.
- */
-private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-      // Wait until children are resolved.
-      case p: LogicalPlan if !p.childrenResolved => p
-
-      // We are inserting into an InsertableRelation or HadoopFsRelation.
-      case i @ InsertIntoTable(
-      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => {
-        // First, make sure the data to be inserted has the same number of fields as the
-        // schema of the relation.
-        if (l.output.size != child.output.size) {
-          sys.error(
-            s"$l requires that the query in the SELECT clause of the INSERT INTO/OVERWRITE " +
-              s"statement generates the same number of columns as its schema.")
-        }
-        castAndRenameChildOutput(i, l.output, child)
-      }
-  }
-
-  /** If necessary, cast data types and rename fields to the expected types and names. */
-  def castAndRenameChildOutput(
-      insertInto: InsertIntoTable,
-      expectedOutput: Seq[Attribute],
-      child: LogicalPlan): InsertIntoTable = {
-    val newChildOutput = expectedOutput.zip(child.output).map {
-      case (expected, actual) =>
-        val needCast = !expected.dataType.sameType(actual.dataType)
-        // We want to make sure the field names in the data to be inserted exactly match the
-        // names in the schema.
-        val needRename = expected.name != actual.name
-        (needCast, needRename) match {
-          case (true, _) => Alias(Cast(actual, expected.dataType), expected.name)()
-          case (false, true) => Alias(actual, expected.name)()
-          case (_, _) => actual
-        }
-    }
-
-    if (newChildOutput == child.output) {
-      insertInto
-    } else {
-      insertInto.copy(child = Project(newChildOutput, child))
-    }
-  }
-}
-
-/**
- * A rule to do various checks before inserting into or writing to a data source table.
- */
-private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) {
-  def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
-
-  def apply(plan: LogicalPlan): Unit = {
-    plan.foreach {
-      case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite, ifNotExists) =>
-        // Right now, we do not support insert into a data source table with partition specs.
-        if (partition.nonEmpty) {
-          failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
-        } else {
-          // Get all input data source relations of the query.
-          val srcRelations = query.collect {
-            case LogicalRelation(src: BaseRelation) => src
-          }
-          if (srcRelations.contains(t)) {
-            failAnalysis(
-              "Cannot insert overwrite into table that is also being read from.")
-          } else {
-            // OK
-          }
-        }
-
-      case logical.InsertIntoTable(LogicalRelation(r: HadoopFsRelation), part, _, _, _) =>
-        // We need to make sure the partition columns specified by users match the partition
-        // columns of the relation.
-        val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
-        val specifiedPartitionColumns = part.keySet
-        if (existingPartitionColumns != specifiedPartitionColumns) {
-          failAnalysis(s"Specified partition columns " +
-            s"(${specifiedPartitionColumns.mkString(", ")}) " +
-            s"do not match the partition columns of the table. Please use " +
-            s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
-        } else {
-          // OK
-        }
-
-      case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
-        // The relation in l is not an InsertableRelation.
-        failAnalysis(s"$l does not allow insertion.")
-
-      case logical.InsertIntoTable(t, _, _, _, _) =>
-        if (!t.isInstanceOf[LeafNode] || t == OneRowRelation || t.isInstanceOf[LocalRelation]) {
-          failAnalysis(s"Inserting into an RDD-based table is not allowed.")
-        } else {
-          // OK
-        }
-
-      case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
-        // When the SaveMode is Overwrite, we need to check if the table is an input table of
-        // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (catalog.tableExists(Seq(tableName))) {
-          // Need to remove SubQuery operator.
-          EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
-            // Only do the check if the table is a data source table
-            // (the relation is a BaseRelation).
-            case l @ LogicalRelation(dest: BaseRelation) =>
-              // Get all input data source relations of the query.
-              val srcRelations = query.collect {
-                case LogicalRelation(src: BaseRelation) => src
-              }
-              if (srcRelations.contains(dest)) {
-                failAnalysis(
-                  s"Cannot overwrite table $tableName that is also being read from.")
-              } else {
-                // OK
-              }
-
-            case _ => // OK
-          }
-        } else {
-          // OK
-        }
-
-      case _ => // OK
-    }
-  }
-}
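
The PreInsertCastAndRename rule above aligns a query's output with the target relation's schema: each column is cast when its data type differs from the expected one, aliased when only its name differs, and left untouched otherwise. A minimal sketch of that per-column decision, using simplified stand-in types rather than Catalyst expressions (all names here are illustrative):

object CastAndRenameSketch {
  // Simplified stand-in for a Catalyst attribute: just a name and a type tag.
  final case class Col(name: String, dataType: String)

  // What the rule would wrap around each child output column.
  sealed trait Adjustment
  case class Keep(col: Col) extends Adjustment
  case class Rename(col: Col, to: String) extends Adjustment
  case class CastThenRename(col: Col, toType: String, to: String) extends Adjustment

  def adjust(expected: Seq[Col], actual: Seq[Col]): Seq[Adjustment] =
    expected.zip(actual).map {
      case (e, a) if e.dataType != a.dataType => CastThenRename(a, e.dataType, e.name)
      case (e, a) if e.name != a.name         => Rename(a, e.name)
      case (_, a)                             => Keep(a)
    }

  def main(args: Array[String]): Unit = {
    val tableSchema = Seq(Col("id", "long"), Col("name", "string"))
    val queryOutput = Seq(Col("id", "int"), Col("n", "string"))
    adjust(tableSchema, queryOutput).foreach(println)
    // CastThenRename(Col(id,int),long,id)
    // Rename(Col(n,string),name)
  }
}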

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 3475f9d..1d04513 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -26,8 +26,8 @@ import org.scalactic.Tolerance._
 import org.apache.spark.sql.{QueryTest, Row, SQLConf}
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.json.InferSchema.compatibleType
-import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index a2763c7..23df102 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 37b0a9f..4f98776 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -28,11 +28,11 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.sources.PartitioningUtils._
-import org.apache.spark.sql.sources.{LogicalRelation, Partition, PartitionSpec}
+import org.apache.spark.sql.execution.datasources.{LogicalRelation, PartitionSpec, Partition, PartitioningUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql._
 import org.apache.spark.unsafe.types.UTF8String
+import PartitioningUtils._
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index a710884..1907e64 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -22,6 +22,7 @@ import java.io.{File, IOException}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.datasources.DDLException
 import org.apache.spark.util.Utils
 
 class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 296b0d6..3cbf546 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.datasources.ResolvedDataSource
 
 class ResolvedDataSourceSuite extends SparkFunSuite {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 4684d48..cec7685 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -44,9 +44,9 @@ import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUDFs, SetCommand}
+import org.apache.spark.sql.execution.datasources.{PreWriteCheck, PreInsertCastAndRename, DataSourceStrategy}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
-import org.apache.spark.sql.sources.DataSourceStrategy
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -384,11 +384,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
         catalog.PreInsertionCasts ::
         ExtractPythonUDFs ::
         ResolveHiveWindowFunction ::
-        sources.PreInsertCastAndRename ::
+        PreInsertCastAndRename ::
         Nil
 
       override val extendedCheckRules = Seq(
-        sources.PreWriteCheck(catalog)
+        PreWriteCheck(catalog)
       )
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b15261b..0a2121c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import scala.collection.JavaConversions._
+
 import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 
@@ -28,6 +30,7 @@ import org.apache.hadoop.hive.ql.metadata._
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -35,14 +38,12 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.execution.datasources
+import org.apache.spark.sql.execution.datasources.{Partition => ParquetPartition, PartitionSpec, CreateTableUsingAsSelect, ResolvedDataSource, LogicalRelation}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.parquet.ParquetRelation2
-import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode, sources}
 
-/* Implicit conversions */
-import scala.collection.JavaConversions._
 
 private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
   extends Catalog with Logging {
@@ -278,7 +279,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
             parquetRelation.paths.toSet == pathsInMetastore.toSet &&
             logical.schema.sameType(metastoreSchema) &&
             parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse {
-              PartitionSpec(StructType(Nil), Array.empty[sources.Partition])
+              PartitionSpec(StructType(Nil), Array.empty[datasources.Partition])
             }
 
           if (useCached) {

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 7fc517b..f557450 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.execution.ExplainCommand
-import org.apache.spark.sql.sources.DescribeCommand
+import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DropTable, AnalyzeTable, HiveScriptIOSchema}

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9638a82..a22c329 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,9 +30,9 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand, _}
+import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{CreateTableUsing, CreateTableUsingAsSelect, DescribeCommand}
 import org.apache.spark.sql.types.StringType
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 71fa3e9..a47f9a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 48d35a6..de63ee5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -37,6 +37,7 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.datasources.PartitionSpec
 import org.apache.spark.sql.hive.{HiveContext, HiveInspectors, HiveMetastoreTypes, HiveShim}
 import org.apache.spark.sql.sources.{Filter, _}
 import org.apache.spark.sql.types.StructType

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index d910af2..e403f32 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -28,12 +28,12 @@ import org.apache.hadoop.mapred.InvalidInputException
 
 import org.apache.spark.Logging
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.parquet.ParquetRelation2
-import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index c9dd4c0..efb04bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -22,11 +22,11 @@ import java.io._
 import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}
 
 import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.sources.DescribeCommand
-import org.apache.spark.sql.execution.{SetCommand, ExplainCommand}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.{SetCommand, ExplainCommand}
+import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.test.TestHive
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 05a1f00..0342826 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -23,12 +23,12 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.DefaultParserDialect
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.errors.DialectException
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation}
 import org.apache.spark.sql.parquet.ParquetRelation2
-import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
 
 case class Nested1(f1: Nested2)

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 9d79a4b..82a8daf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -23,12 +23,12 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
+import org.apache.spark.sql.execution.datasources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
-import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/60c0ce13/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index afecf96..1cef83f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.sources
 
-import scala.collection.JavaConversions._
-
 import java.io.File
 
+import scala.collection.JavaConversions._
+
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -31,10 +31,12 @@ import org.apache.parquet.hadoop.ParquetOutputCommitter
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
 
+
 abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
   override lazy val sqlContext: SQLContext = TestHive
 


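Taken together, these hunks show the mechanical part of the move: callers switch their imports from org.apache.spark.sql.sources to org.apache.spark.sql.execution.datasources for the relocated internal classes (LogicalRelation, DescribeCommand, ResolvedDataSource, PartitionSpec, PreWriteCheck, PreInsertCastAndRename, DataSourceStrategy, and the CreateTableUsing* / InsertInto* commands), while public interfaces such as BaseRelation stay behind in org.apache.spark.sql.sources, as the remaining sources._ imports above indicate. A minimal sketch of code that matches on LogicalRelation after the move (the helper name is hypothetical, and since these classes are private[sql] it is assumed to live inside the Spark SQL project, like the suites above):

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation // was org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.sources.BaseRelation                  // public API, location unchanged

object RelationUtils {
  // Collect every data source relation referenced by a plan, mirroring the pattern
  // the deleted PreWriteCheck code uses to guard against overwriting an input table.
  def baseRelations(plan: LogicalPlan): Seq[BaseRelation] = plan.collect {
    case LogicalRelation(relation: BaseRelation) => relation
  }
}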