carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [24/24] carbondata git commit: [CARBONDATA-1173] Stream ingestion - write path framework
Date Wed, 12 Jul 2017 15:09:36 GMT
[CARBONDATA-1173] Stream ingestion - write path framework

This closes #1064


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3713ebb2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3713ebb2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3713ebb2

Branch: refs/heads/streaming_ingest
Commit: 3713ebb21e62764afa90dbd5f3a40d90845f00ca
Parents: 9e4da2a
Author: Aniket Adnaik <aniket.adnaik@gmail.com>
Authored: Thu Jun 15 11:57:43 2017 -0700
Committer: jackylk <jacky.likun@huawei.com>
Committed: Wed Jul 12 23:06:48 2017 +0800

----------------------------------------------------------------------
 .../streaming/CarbonStreamingCommitInfo.java    | 108 ++++++++++
 .../streaming/CarbonStreamingConstants.java     |  25 +++
 .../streaming/CarbonStreamingMetaStore.java     |  40 ++++
 .../streaming/CarbonStreamingMetaStoreImpl.java |  56 ++++++
 .../core/util/path/CarbonTablePath.java         |  10 +
 .../streaming/CarbonStreamingOutputFormat.java  |  66 +++++++
 .../streaming/CarbonStreamingRecordWriter.java  | 196 +++++++++++++++++++
 .../org/apache/spark/sql/CarbonSource.scala     |  53 +++--
 .../CarbonStreamingOutpurWriteFactory.scala     |  88 +++++++++
 .../streaming/CarbonStreamingOutputWriter.scala |  98 ++++++++++
 10 files changed, 726 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3713ebb2/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
new file mode 100644
index 0000000..6cf303a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingCommitInfo.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+/**
+ * Commit info for streaming writes
+ * The commit info can be used to recover valid offset in the file
+ * in the case of write failure.
+ */
+public class CarbonStreamingCommitInfo {
+
+  private String dataBase;
+
+  private String table;
+
+  private long commitTime;
+
+  private long segmentID;
+
+  private String partitionID;
+
+  private long batchID;
+
+  private String fileOffset;
+
+  private long transactionID;     // future use
+
+  public  CarbonStreamingCommitInfo(
+
+      String dataBase,
+
+      String table,
+
+      long commitTime,
+
+      long segmentID,
+
+      String partitionID,
+
+      long batchID) {
+
+    this.dataBase = dataBase;
+
+    this.table = table;
+
+    this.commitTime = commitTime;
+
+    this.segmentID = segmentID;
+
+    this.partitionID = partitionID;
+
+    this.batchID = batchID;
+
+    this.transactionID = -1;
+  }
+
+  public String getDataBase() {
+    return dataBase;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public long getCommitTime() {
+    return commitTime;
+  }
+
+  public long getSegmentID() {
+    return segmentID;
+  }
+
+  public String getPartitionID() {
+    return partitionID;
+  }
+
+  public long getBatchID() {
+    return batchID;
+  }
+
+  public String getFileOffset() {
+    return fileOffset;
+  }
+
+  public long getTransactionID() {
+    return transactionID;
+  }
+
+  @Override
+  public String toString() {
+    return dataBase + "." + table + "." + segmentID + "$" + partitionID;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3713ebb2/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
new file mode 100644
index 0000000..db7186f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingConstants.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+public class CarbonStreamingConstants {
+
+  public static final long DEFAULT_CARBON_STREAM_FILE_BLOCK_SIZE = 1024 * 1024 * 1024; //
1GB
+
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3713ebb2/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
new file mode 100644
index 0000000..fa3746c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStore.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+
+import java.io.IOException;
+
+/**
+ * Generic interface for storing commit info for streaming ingest
+ */
+public interface CarbonStreamingMetaStore {
+
+  public CarbonStreamingCommitInfo getStreamingCommitInfo(
+          String dataBase,
+          String table,
+          long segmentID,
+          String partitionID) throws IOException;
+
+  public void updateStreamingCommitInfo(
+          CarbonStreamingMetaStore commitInfo) throws IOException;
+
+  public void recoverStreamingData(
+          CarbonStreamingCommitInfo commitInfo) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3713ebb2/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
new file mode 100644
index 0000000..0afe962
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/streaming/CarbonStreamingMetaStoreImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.streaming;
+
+import java.io.IOException;
+
+/**
+ *  JSON format can be used to store the metadata
+ */
+public class CarbonStreamingMetaStoreImpl implements CarbonStreamingMetaStore {
+
+  /**
+   * get commit info from metastore
+   */
+  public CarbonStreamingCommitInfo getStreamingCommitInfo(
+          String dataBase,
+          String table,
+          long segmentID,
+          String partitionID) throws IOException {
+
+    return null;
+
+  }
+
+  /**
+   * Update commit info in metastore
+   */
+  public void updateStreamingCommitInfo(
+          CarbonStreamingMetaStore commitInfo) throws IOException {
+
+  }
+
+  /**
+   * Recover streaming data using valid offset in commit info
+   */
+  public void recoverStreamingData(
+          CarbonStreamingCommitInfo commitInfo) throws IOException {
+
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3713ebb2/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 101419d..9ce7d3a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -51,6 +51,16 @@ public class CarbonTablePath extends Path {
   protected static final String INDEX_FILE_EXT = ".carbonindex";
   protected static final String DELETE_DELTA_FILE_EXT = ".deletedelta";
 
+  /**
+   * Streaming ingest related paths
+   */
+  protected static final String STREAM_PREFIX = "Streaming";
+  protected static final String STREAM_FILE_NAME_EXT = ".carbondata.stream";
+  protected static final String STREAM_FILE_BEING_WRITTEN = "in-progress.carbondata.stream";
+  protected static final String STREAM_FILE_BEING_WRITTEN_META = "in-progress.meta";
+  protected static final String STREAM_COMPACTION_STATUS = "streaming_compaction_status";
+  protected static final String STREAM_FILE_LOCK = "streaming_in_use.lock";
+
   protected String tablePath;
   protected CarbonTableIdentifier carbonTableIdentifier;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3713ebb2/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
new file mode 100644
index 0000000..fc6f455
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingOutputFormat.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.streaming.CarbonStreamingConstants;
+import org.apache.carbondata.processing.csvload.CSVInputFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+
+/**
+ * Output format to write streaming data to carbondata file
+ *
+ * @param <V> - type of record
+ */
+public class CarbonStreamingOutputFormat<K, V> extends FileOutputFormat<K, V>
{
+
+  public static long getBlockSize(Configuration conf) {
+    return conf.getLong("dfs.block.size",
+            CarbonStreamingConstants.DEFAULT_CARBON_STREAM_FILE_BLOCK_SIZE);
+  }
+
+  public static void setBlockSize(Configuration conf, long blockSize) {
+    conf.setLong("dfs.block.size", blockSize);
+  }
+
+  /**
+   * When getRecordWriter may need to override
+   * to provide correct path including streaming segment name
+   */
+  @Override
+  public CarbonStreamingRecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
+          throws IOException, InterruptedException {
+
+    Configuration conf = job.getConfiguration();
+
+    String keyValueSeparator = conf.get(
+            CSVInputFormat.DELIMITER,
+            CSVInputFormat.DELIMITER_DEFAULT);
+
+    return new CarbonStreamingRecordWriter<K, V>(
+            conf,
+            getDefaultWorkFile(job, null),
+            keyValueSeparator);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3713ebb2/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
new file mode 100644
index 0000000..9d1951f
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamingRecordWriter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.streaming;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+
+public class CarbonStreamingRecordWriter<K,V> extends RecordWriter<K, V> {
+
+  private static final String utf8 = "UTF-8";
+
+  private static final byte[] newline;
+
+  static {
+
+    try {
+
+      newline = "\n".getBytes(utf8);
+
+    } catch (UnsupportedEncodingException uee) {
+
+      throw new IllegalArgumentException("Can't find " + utf8 + " encoding");
+    }
+  }
+
+  private FSDataOutputStream outputStream;
+
+  private FileSystem fs;
+
+  private Path file;
+
+  private volatile boolean isClosed;
+
+  private final byte[] keyValueSeparator;
+
+  public void initOut() throws IOException {
+
+    outputStream = fs.create(file, false);
+
+    isClosed = false;
+  }
+
+  public CarbonStreamingRecordWriter(
+          Configuration conf,
+          Path file,
+          String keyValueSeparator) throws IOException {
+
+    this.file = file;
+
+    fs = FileSystem.get(conf);
+
+    outputStream = fs.create(file, false);
+
+    isClosed = false;
+
+    try {
+
+      this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
+
+    } catch (UnsupportedEncodingException uee) {
+
+      throw new IllegalArgumentException("Can't find " + utf8 + "encoding");
+
+    }
+
+  }
+
+  public CarbonStreamingRecordWriter(
+          Configuration conf,
+          Path file) throws IOException {
+
+    this(conf, file, ",");
+
+  }
+
+  /**
+   *  Write Object to byte stream.
+   */
+
+  private void writeObject(Object o) throws IOException {
+
+    if (o instanceof Text) {
+      Text to = (Text)o;
+
+      outputStream.write(to.getBytes(), 0, to.getLength());
+
+    } else {
+
+      outputStream.write(o.toString().getBytes(utf8));
+
+    }
+  }
+
+  /**
+   * Write streaming data as text file (temporary)
+   */
+
+  @Override
+  public synchronized void write(K key, V value) throws IOException {
+
+    boolean isNULLKey = key == null || key instanceof NullWritable;
+
+    boolean isNULLValue = value == null || value instanceof NullWritable;
+
+    if (isNULLKey && isNULLValue) {
+
+      return;
+    }
+
+    if (!isNULLKey) {
+
+      writeObject(key);
+    }
+
+    if (!isNULLKey || !isNULLValue) {
+
+      outputStream.write(keyValueSeparator);
+    }
+
+    if (!isNULLValue) {
+
+      writeObject(value);
+    }
+
+    outputStream.write(newline);
+  }
+
+  private void closeInternal() throws IOException {
+
+    if (!isClosed) {
+
+      outputStream.close();
+
+      isClosed = true;
+    }
+
+  }
+
+  public void flush() throws IOException {
+
+    outputStream.hflush();
+  }
+
+  public long getOffset() throws IOException {
+
+    return outputStream.getPos();
+  }
+
+  public void commit(boolean finalCommit) throws IOException {
+
+    closeInternal();
+
+    Path commitFile = new Path(file.getParent(),
+            CarbonTablePath.getCarbonDataPrefix() + System.currentTimeMillis());
+
+    fs.rename(file, commitFile);
+
+    if (!finalCommit) {
+      initOut();
+    }
+  }
+
+  @Override
+  public void close(TaskAttemptContext context) throws IOException, InterruptedException
{
+
+    closeInternal();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3713ebb2/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 1c16143..eaaca6b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -20,26 +20,30 @@ package org.apache.spark.sql
 import scala.language.implicitConversions
 
 import org.apache.commons.lang.StringUtils
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.CarbonLateDecodeStrategy
-import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field}
+import org.apache.spark.sql.execution.command.CreateTable
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory}
 import org.apache.spark.sql.optimizer.CarbonLateDecodeRule
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DecimalType, StructType}
+import org.apache.spark.sql.streaming.CarbonStreamingOutputWriterFactory
+import org.apache.spark.sql.types.{StringType, StructType}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
+
 /**
  * Carbon relation provider compliant to data source api.
  * Creates carbon relations
  */
 class CarbonSource extends CreatableRelationProvider with RelationProvider
-  with SchemaRelationProvider with DataSourceRegister {
+  with SchemaRelationProvider with DataSourceRegister with FileFormat  {
 
   override def shortName(): String = "carbondata"
 
@@ -47,7 +51,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
   override def createRelation(sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
-    // if path is provided we can directly create Hadoop relation. \
+    // if path is provided we can directly create Hadoop relation.
     // Otherwise create datasource relation
     parameters.get("tablePath") match {
       case Some(path) => CarbonDatasourceHadoopRelation(sqlContext.sparkSession,
@@ -169,13 +173,13 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
     }
   }
 
-  /**
-   * Returns the path of the table
-   * @param sparkSession
-   * @param dbName
-   * @param tableName
-   * @return
-   */
+/**
+ * Returns the path of the table
+ * @param sparkSession
+ * @param dbName
+ * @param tableName
+ * @return
+ */
   private def getPathForTable(sparkSession: SparkSession, dbName: String,
       tableName : String): String = {
 
@@ -190,9 +194,30 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         .lookupRelation(Option(dbName), tableName)(sparkSession)
       CarbonEnv.getInstance(sparkSession).carbonMetastore.storePath + s"/$dbName/$tableName"
     } catch {
-      case ex: Exception =>
-        throw new Exception(s"Do not have $dbName and $tableName", ex)
+        case ex: Exception =>
+          throw new Exception(s"Do not have $dbName and $tableName", ex)
     }
   }
 
+/**
+ * Prepares a write job and returns an [[OutputWriterFactory]].  Client side job preparation
can
+ * be put here.  For example, user defined output committer can be configured here
+ * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
+ */
+  def prepareWrite(
+    sparkSession: SparkSession,
+    job: Job,
+    options: Map[String, String],
+    dataSchema: StructType): OutputWriterFactory = new CarbonStreamingOutputWriterFactory()
+
+/**
+ * When possible, this method should return the schema of the given `files`.  When the format
+ * does not support inference, or no valid files are given should return None.  In these
cases
+ * Spark will require that user specify the schema manually.
+ */
+  def inferSchema(
+    sparkSession: SparkSession,
+    options: Map[String, String],
+    files: Seq[FileStatus]): Option[StructType] = Some(new StructType().add("value", StringType))
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3713ebb2/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
new file mode 100644
index 0000000..be69885
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutpurWriteFactory.scala
@@ -0,0 +1,88 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.streaming
+
+
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.execution.datasources.OutputWriterFactory
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+
+class CarbonStreamingOutputWriterFactory extends OutputWriterFactory {
+
+ /**
+  * When writing to a [[org.apache.spark.sql.execution.datasources.HadoopFsRelation]],
+  * this method gets called by each task on executor side
+  * to instantiate new [[org.apache.spark.sql.execution.datasources.OutputWriter]]s.
+  *
+  * @param path Path to write the file.
+  * @param dataSchema Schema of the rows to be written. Partition columns are not
+  *                   included in the schema if the relation being written is
+  *                   partitioned.
+  * @param context The Hadoop MapReduce task context.
+  */
+
+  override def newInstance(
+    path: String,
+
+    dataSchema: StructType,
+
+    context: TaskAttemptContext) : CarbonStreamingOutputWriter = {
+
+        new CarbonStreamingOutputWriter(path, context)
+  }
+
+  override def getFileExtension(context: TaskAttemptContext): String = {
+
+    CarbonTablePath.STREAM_FILE_NAME_EXT
+  }
+
+}
+
+object CarbonStreamingOutpurWriterFactory {
+
+  private[this] val writers = new ConcurrentHashMap[String, CarbonStreamingOutputWriter]()
+
+  def addWriter(path: String, writer: CarbonStreamingOutputWriter): Unit = {
+
+    if (writers.contains(path)) {
+      throw new IllegalArgumentException(path + "writer already exists")
+    }
+
+    writers.put(path, writer)
+  }
+
+  def getWriter(path: String): CarbonStreamingOutputWriter = {
+
+    writers.get(path)
+  }
+
+  def containsWriter(path: String): Boolean = {
+
+    writers.containsKey(path)
+  }
+
+  def removeWriter(path: String): Unit = {
+
+    writers.remove(path)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3713ebb2/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
new file mode 100644
index 0000000..dfc8ff3
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/streaming/CarbonStreamingOutputWriter.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{NullWritable, Text}
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.Row
+
+import org.apache.carbondata.hadoop.streaming.{CarbonStreamingOutputFormat, CarbonStreamingRecordWriter}
+
+class CarbonStreamingOutputWriter (
+    path: String,
+    context: TaskAttemptContext)
+    extends OutputWriter {
+
+  private[this] val buffer = new Text()
+
+  private val recordWriter: CarbonStreamingRecordWriter[NullWritable, Text] = {
+
+    val outputFormat = new CarbonStreamingOutputFormat[NullWritable, Text] () {
+
+      override def getDefaultWorkFile(context: TaskAttemptContext, extension: String) : Path
= {
+        new Path(path)
+      }
+
+    /*
+     May need to override
+     def getOutputCommiter(c: TaskAttemptContext): OutputCommitter = {
+      null
+    }
+    */
+
+    }
+
+    outputFormat.
+      getRecordWriter(context).asInstanceOf[CarbonStreamingRecordWriter[NullWritable, Text]]
+  }
+
+  override def write(row: Row): Unit = {
+
+    throw new UnsupportedOperationException("call writeInternal")
+
+  }
+
+  override protected [sql] def writeInternal(row: InternalRow): Unit = {
+
+    val utf8string = row.getUTF8String(0)
+
+    buffer.set(utf8string.getBytes)
+
+    recordWriter.write(NullWritable.get(), buffer)
+
+  }
+
+  def getpath: String = path
+
+  override def close(): Unit = {
+
+    recordWriter.close(context)
+
+  }
+
+  def flush(): Unit = {
+
+    recordWriter.flush()
+
+  }
+
+  def getPos(): Long = {
+
+    recordWriter.getOffset()
+
+  }
+
+  def commit(finalCommit: Boolean): Unit = {
+
+    recordWriter.commit(finalCommit)
+
+  }
+}


Mime
View raw message