carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/5] carbondata git commit: [CARBONDATA-2165]Remove spark in carbon-hadoop module
Date Wed, 28 Mar 2018 03:19:42 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 2e1ddb542 -> c723947a7


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
deleted file mode 100644
index bc7b042..0000000
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.streaming
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
-import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
-import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
-import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
-import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
-import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.streaming.segment.StreamSegment
-
-/**
- * Stream sink factory
- */
-object StreamSinkFactory {
-
-  def createStreamTableSink(
-      sparkSession: SparkSession,
-      hadoopConf: Configuration,
-      carbonTable: CarbonTable,
-      parameters: Map[String, String]): Sink = {
-    validateParameters(parameters)
-
-    // build load model
-    val carbonLoadModel = buildCarbonLoadModelForStream(
-      sparkSession,
-      hadoopConf,
-      carbonTable,
-      parameters,
-      "")
-    // fire pre event before streamin is started
-    // in case of streaming options and optionsFinal can be same
-    val operationContext = new OperationContext
-    val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
-      carbonTable.getCarbonTableIdentifier,
-      carbonLoadModel,
-      carbonLoadModel.getFactFilePath,
-      false,
-      parameters.asJava,
-      parameters.asJava,
-      false
-    )
-    OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
-    // prepare the stream segment
-    val segmentId = getStreamSegmentId(carbonTable)
-    carbonLoadModel.setSegmentId(segmentId)
-
-    // start server if necessary
-    val server = startDictionaryServer(
-      sparkSession,
-      carbonTable,
-      carbonLoadModel)
-    if (server.isDefined) {
-      carbonLoadModel.setUseOnePass(true)
-    } else {
-      carbonLoadModel.setUseOnePass(false)
-    }
-    // default is carbon appended stream sink
-    val carbonAppendableStreamSink = new CarbonAppendableStreamSink(
-      sparkSession,
-      carbonTable,
-      segmentId,
-      parameters,
-      carbonLoadModel,
-      server)
-
-    // fire post event before streamin is started
-    val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
-      carbonTable.getCarbonTableIdentifier,
-      carbonLoadModel
-    )
-    OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)
-    carbonAppendableStreamSink
-  }
-
-  private def validateParameters(parameters: Map[String, String]): Unit = {
-    val segmentSize = parameters.get(CarbonCommonConstants.HANDOFF_SIZE)
-    if (segmentSize.isDefined) {
-      try {
-        val value = java.lang.Long.parseLong(segmentSize.get)
-        if (value < CarbonCommonConstants.HANDOFF_SIZE_MIN) {
-          new CarbonStreamException(CarbonCommonConstants.HANDOFF_SIZE +
-                                    "should be bigger than or equal " +
-                                    CarbonCommonConstants.HANDOFF_SIZE_MIN)
-        }
-      } catch {
-        case _: NumberFormatException =>
-          new CarbonStreamException(CarbonCommonConstants.HANDOFF_SIZE +
-                                    s" $segmentSize is an illegal number")
-      }
-    }
-  }
-
-  /**
-   * get current stream segment id
-   * @return
-   */
-  private def getStreamSegmentId(carbonTable: CarbonTable): String = {
-    val segmentId = StreamSegment.open(carbonTable)
-    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
-    val fileType = FileFactory.getFileType(segmentDir)
-    if (!FileFactory.isFileExist(segmentDir, fileType)) {
-      // Create table directory path, in case of enabling hive metastore first load may not have
-      // table folder created.
-      FileFactory.mkdirs(segmentDir, fileType)
-    }
-    if (FileFactory.isFileExist(segmentDir, fileType)) {
-      // recover fault
-      StreamSegment.recoverSegmentIfRequired(segmentDir)
-    } else {
-      FileFactory.mkdirs(segmentDir, fileType)
-    }
-    segmentId
-  }
-
-  def startDictionaryServer(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      carbonLoadModel: CarbonLoadModel): Option[DictionaryServer] = {
-    // start dictionary server when use one pass load and dimension with DICTIONARY
-    // encoding is present.
-    val allDimensions = carbonTable.getAllDimensions.asScala.toList
-    val createDictionary = allDimensions.exists {
-      carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-                         !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
-    }
-    val carbonSecureModeDictServer = CarbonProperties.getInstance.
-      getProperty(CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER,
-        CarbonCommonConstants.CARBON_SECURE_DICTIONARY_SERVER_DEFAULT)
-
-    val sparkConf = sparkSession.sqlContext.sparkContext.getConf
-    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
-      getConf.get("spark.driver.host")
-
-    val server: Option[DictionaryServer] = if (createDictionary) {
-      if (sparkConf.get("spark.authenticate", "false").equalsIgnoreCase("true") &&
-          carbonSecureModeDictServer.toBoolean) {
-        val dictionaryServer = SecureDictionaryServer.getInstance(sparkConf,
-          sparkDriverHost.toString, carbonLoadModel.getDictionaryServerPort, carbonTable)
-        carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
-        carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost)
-        carbonLoadModel.setDictionaryServerSecretKey(dictionaryServer.getSecretKey)
-        carbonLoadModel.setDictionaryEncryptServerSecure(dictionaryServer.isEncryptSecureServer)
-        carbonLoadModel.setDictionaryServiceProvider(new SecureDictionaryServiceProvider())
-        sparkSession.sparkContext.addSparkListener(new SparkListener() {
-          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-            dictionaryServer.shutdown()
-          }
-        })
-        Some(dictionaryServer)
-      } else {
-        val dictionaryServer = NonSecureDictionaryServer
-          .getInstance(carbonLoadModel.getDictionaryServerPort, carbonTable)
-        carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
-        carbonLoadModel.setDictionaryServerHost(dictionaryServer.getHost)
-        carbonLoadModel.setDictionaryEncryptServerSecure(false)
-        carbonLoadModel
-          .setDictionaryServiceProvider(new NonSecureDictionaryServiceProvider(dictionaryServer
-            .getPort))
-        sparkSession.sparkContext.addSparkListener(new SparkListener() {
-          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-            dictionaryServer.shutdown()
-          }
-        })
-        Some(dictionaryServer)
-      }
-    } else {
-      None
-    }
-    server
-  }
-
-  private def buildCarbonLoadModelForStream(
-      sparkSession: SparkSession,
-      hadoopConf: Configuration,
-      carbonTable: CarbonTable,
-      parameters: Map[String, String],
-      segmentId: String): CarbonLoadModel = {
-    val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
-    carbonProperty.addProperty("zookeeper.enable.lock", "false")
-    val optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters.asJava)
-    optionsFinal.put("sort_scope", "no_sort")
-    if (parameters.get("fileheader").isEmpty) {
-      optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
-        .asScala.map(_.getColName).mkString(","))
-    }
-    val carbonLoadModel = new CarbonLoadModel()
-    new CarbonLoadModelBuilder(carbonTable).build(
-      parameters.asJava,
-      optionsFinal,
-      carbonLoadModel,
-      hadoopConf)
-    carbonLoadModel.setSegmentId(segmentId)
-    // stream should use one pass
-    val dictionaryServerPort = parameters.getOrElse(
-      CarbonCommonConstants.DICTIONARY_SERVER_PORT,
-      carbonProperty.getProperty(
-        CarbonCommonConstants.DICTIONARY_SERVER_PORT,
-        CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT))
-    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
-      getConf.get("spark.driver.host")
-    carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
-    carbonLoadModel.setDictionaryServerPort(dictionaryServerPort.toInt)
-    carbonLoadModel
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
new file mode 100644
index 0000000..8661417
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming.parser
+
+import java.nio.charset.Charset
+import java.text.SimpleDateFormat
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+object FieldConverter {
+
+  /**
+   * Return a String representation of the input value
+   * @param value input value
+   * @param serializationNullFormat string for null value
+   * @param delimiterLevel1 level 1 delimiter for complex type
+   * @param delimiterLevel2 level 2 delimiter for complex type
+   * @param timeStampFormat timestamp format
+   * @param dateFormat date format
+   * @param level level for recursive call
+   */
+  def objectToString(
+      value: Any,
+      serializationNullFormat: String,
+      delimiterLevel1: String,
+      delimiterLevel2: String,
+      timeStampFormat: SimpleDateFormat,
+      dateFormat: SimpleDateFormat,
+      level: Int = 1): String = {
+    if (value == null) {
+      serializationNullFormat
+    } else {
+      value match {
+        case s: String => if (s.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+          throw new Exception("Dataload failed, String length cannot exceed " +
+                              CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " characters")
+        } else {
+          s
+        }
+        case d: java.math.BigDecimal => d.toPlainString
+        case i: java.lang.Integer => i.toString
+        case d: java.lang.Double => d.toString
+        case t: java.sql.Timestamp => timeStampFormat format t
+        case d: java.sql.Date => dateFormat format d
+        case b: java.lang.Boolean => b.toString
+        case s: java.lang.Short => s.toString
+        case f: java.lang.Float => f.toString
+        case bs: Array[Byte] => new String(bs,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))
+        case s: scala.collection.Seq[Any] =>
+          val delimiter = if (level == 1) {
+            delimiterLevel1
+          } else {
+            delimiterLevel2
+          }
+          val builder = new StringBuilder()
+          s.foreach { x =>
+            builder.append(objectToString(x, serializationNullFormat, delimiterLevel1,
+              delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter)
+          }
+          builder.substring(0, builder.length - delimiter.length())
+        case m: scala.collection.Map[Any, Any] =>
+          throw new Exception("Unsupported data type: Map")
+        case r: org.apache.spark.sql.Row =>
+          val delimiter = if (level == 1) {
+            delimiterLevel1
+          } else {
+            delimiterLevel2
+          }
+          val builder = new StringBuilder()
+          for (i <- 0 until r.length) {
+            builder.append(objectToString(r(i), serializationNullFormat, delimiterLevel1,
+              delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter)
+          }
+          builder.substring(0, builder.length - delimiter.length())
+        case other => other.toString
+      }
+    }
+  }
+}
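
For context, a minimal usage sketch of the new FieldConverter.objectToString API (not part of this commit; the delimiter and format values below are illustrative assumptions):

import java.text.SimpleDateFormat

import org.apache.carbondata.streaming.parser.FieldConverter

object FieldConverterUsageSketch {
  def main(args: Array[String]): Unit = {
    val timeStampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    // A nested Seq is flattened: the level-1 delimiter separates top-level elements,
    // the level-2 delimiter separates elements of inner collections.
    val value = Seq("a", Seq("b", "c"))
    val text = FieldConverter.objectToString(
      value,
      serializationNullFormat = "\\N",
      delimiterLevel1 = "$",
      delimiterLevel2 = ":",
      timeStampFormat = timeStampFormat,
      dateFormat = dateFormat)
    // prints "a$b:c"
    println(text)
  }
}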

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index 5a227cf..1696fdc 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.streaming.parser
 
+import java.nio.charset.Charset
 import java.text.SimpleDateFormat
 
 import org.apache.hadoop.conf.Configuration
@@ -27,7 +28,6 @@ import org.apache.spark.sql.types.StructType
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
  * SparkSQL Row Stream Parser.
@@ -61,12 +61,13 @@ class RowStreamParserImp extends CarbonStreamParser {
 
   override def parserRow(value: InternalRow): Array[Object] = {
     this.encoder.fromRow(value).toSeq.map { x => {
-      CarbonScalaUtil.getString(x,
-        serializationNullFormat, complexDelimiterLevel1, complexDelimiterLevel2,
+      FieldConverter.objectToString(
+        x, serializationNullFormat, complexDelimiterLevel1, complexDelimiterLevel2,
         timeStampFormat, dateFormat)
     } }.toArray
   }
 
   override def close(): Unit = {
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
deleted file mode 100644
index 6e6d092..0000000
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import java.util.Date
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.spark.TaskContext
-import org.apache.spark.internal.io.FileCommitProtocol
-import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-
-import org.apache.carbondata.common.CarbonIterator
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.stats.QueryStatistic
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
-import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.streaming.{CarbonStreamException, StreamHandoffRDD}
-import org.apache.carbondata.streaming.parser.CarbonStreamParser
-import org.apache.carbondata.streaming.segment.StreamSegment
-
-/**
- * an implement of stream sink, it persist each batch to disk by appending the batch data to
- * data files.
- */
-class CarbonAppendableStreamSink(
-    sparkSession: SparkSession,
-    val carbonTable: CarbonTable,
-    var currentSegmentId: String,
-    parameters: Map[String, String],
-    carbonLoadModel: CarbonLoadModel,
-    server: Option[DictionaryServer]) extends Sink {
-
-  private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath)
-  private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath)
-  // prepare configuration
-  private val hadoopConf = {
-    val conf = sparkSession.sessionState.newHadoopConf()
-    // put all parameters into hadoopConf
-    parameters.foreach { entry =>
-      conf.set(entry._1, entry._2)
-    }
-    // properties below will be used for default CarbonStreamParser
-    conf.set("carbon_complex_delimiter_level_1",
-      carbonLoadModel.getComplexDelimiterLevel1)
-    conf.set("carbon_complex_delimiter_level_2",
-      carbonLoadModel.getComplexDelimiterLevel2)
-    conf.set(
-      DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
-      carbonLoadModel.getSerializationNullFormat().split(",")(1))
-    conf.set(
-      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-      carbonLoadModel.getTimestampformat())
-    conf.set(
-      CarbonCommonConstants.CARBON_DATE_FORMAT,
-      carbonLoadModel.getDateFormat())
-    conf
-  }
-  // segment max size(byte)
-  private val segmentMaxSize = hadoopConf.getLong(
-    CarbonCommonConstants.HANDOFF_SIZE,
-    CarbonProperties.getInstance().getHandoffSize
-  )
-
-  // auto handoff
-  private val enableAutoHandoff = hadoopConf.getBoolean(
-    CarbonCommonConstants.ENABLE_AUTO_HANDOFF,
-    CarbonProperties.getInstance().isEnableAutoHandoff
-  )
-
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
-    if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
-      CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId")
-    } else {
-
-      val statistic = new QueryStatistic()
-
-      // fire pre event on every batch add
-      // in case of streaming options and optionsFinal can be same
-      val operationContext = new OperationContext
-      val loadTablePreExecutionEvent = new LoadTablePreExecutionEvent(
-        carbonTable.getCarbonTableIdentifier,
-        carbonLoadModel,
-        carbonLoadModel.getFactFilePath,
-        false,
-        parameters.asJava,
-        parameters.asJava,
-        false
-      )
-      OperationListenerBus.getInstance().fireEvent(loadTablePreExecutionEvent, operationContext)
-      checkOrHandOffSegment()
-
-      // committer will record how this spark job commit its output
-      val committer = FileCommitProtocol.instantiate(
-        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
-        jobId = batchId.toString,
-        outputPath = fileLogPath,
-        isAppend = false)
-
-      committer match {
-        case manifestCommitter: ManifestFileCommitProtocol =>
-          manifestCommitter.setupManifestOptions(fileLog, batchId)
-        case _ => // Do nothing
-      }
-
-      CarbonAppendableStreamSink.writeDataFileJob(
-        sparkSession,
-        carbonTable,
-        parameters,
-        batchId,
-        currentSegmentId,
-        data.queryExecution,
-        committer,
-        hadoopConf,
-        carbonLoadModel,
-        server)
-      // fire post event on every batch add
-      val loadTablePostExecutionEvent = new LoadTablePostExecutionEvent(
-        carbonTable.getCarbonTableIdentifier,
-        carbonLoadModel
-      )
-      OperationListenerBus.getInstance().fireEvent(loadTablePostExecutionEvent, operationContext)
-
-      statistic.addStatistics(s"add batch: $batchId", System.currentTimeMillis())
-      CarbonAppendableStreamSink.LOGGER.info(
-        s"${statistic.getMessage}, taken time(ms): ${statistic.getTimeTaken}")
-    }
-  }
-
-  /**
-   * if the directory size of current segment beyond the threshold, hand off new segment
-   */
-  private def checkOrHandOffSegment(): Unit = {
-    val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
-    val fileType = FileFactory.getFileType(segmentDir)
-    if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
-      val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
-      currentSegmentId = newSegmentId
-      val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId)
-      FileFactory.mkdirs(newSegmentDir, fileType)
-
-      // TODO trigger hand off operation
-      if (enableAutoHandoff) {
-        StreamHandoffRDD.startStreamingHandoffThread(
-          carbonLoadModel,
-          sparkSession,
-          false)
-      }
-    }
-  }
-}
-
-object CarbonAppendableStreamSink {
-
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * package the hadoop configuration and it will be passed to executor side from driver side
-   */
-  case class WriteDataFileJobDescription(
-      serializableHadoopConf: SerializableConfiguration,
-      batchId: Long,
-      segmentId: String)
-
-  /**
-   * Run a spark job to append the newly arrived data to the existing row format
-   * file directly.
-   * If there are failure in the task, spark will re-try the task and
-   * carbon will do recovery by HDFS file truncate. (see StreamSegment.tryRecoverFromTaskFault)
-   * If there are job level failure, every files in the stream segment will do truncate
-   * if necessary. (see StreamSegment.tryRecoverFromJobFault)
-   */
-  def writeDataFileJob(
-      sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      parameters: Map[String, String],
-      batchId: Long,
-      segmentId: String,
-      queryExecution: QueryExecution,
-      committer: FileCommitProtocol,
-      hadoopConf: Configuration,
-      carbonLoadModel: CarbonLoadModel,
-      server: Option[DictionaryServer]): Unit = {
-
-    // create job
-    val job = Job.getInstance(hadoopConf)
-    job.setOutputKeyClass(classOf[Void])
-    job.setOutputValueClass(classOf[InternalRow])
-    val jobId = CarbonInputFormatUtil.getJobId(new Date, batchId.toInt)
-    job.setJobID(jobId)
-
-    val description = WriteDataFileJobDescription(
-      serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
-      batchId,
-      segmentId
-    )
-
-    // run write data file job
-    SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
-      var result: Array[TaskCommitMessage] = null
-      try {
-        committer.setupJob(job)
-        // initialize dictionary server
-        if (server.isDefined) {
-          server.get.initializeDictionaryGenerator(carbonTable)
-        }
-
-        val rowSchema = queryExecution.analyzed.schema
-        // write data file
-        result = sparkSession.sparkContext.runJob(queryExecution.toRdd,
-          (taskContext: TaskContext, iterator: Iterator[InternalRow]) => {
-            writeDataFileTask(
-              description,
-              carbonLoadModel,
-              sparkStageId = taskContext.stageId(),
-              sparkPartitionId = taskContext.partitionId(),
-              sparkAttemptNumber = taskContext.attemptNumber(),
-              committer,
-              iterator,
-              rowSchema
-            )
-          })
-
-        // write dictionary
-        if (server.isDefined) {
-          try {
-            server.get.writeTableDictionary(carbonTable.getCarbonTableIdentifier.getTableId)
-          } catch {
-            case _: Exception =>
-              LOGGER.error(
-                s"Error while writing dictionary file for ${carbonTable.getTableUniqueName}")
-              throw new Exception(
-                "Streaming ingest failed due to error while writing dictionary file")
-          }
-        }
-
-        // update data file info in index file
-        StreamSegment.updateIndexFile(
-          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId))
-
-      } catch {
-        // catch fault of executor side
-        case t: Throwable =>
-          val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
-          StreamSegment.recoverSegmentIfRequired(segmentDir)
-          LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
-          committer.abortJob(job)
-          throw new CarbonStreamException("Job failed to write data file", t)
-      }
-      committer.commitJob(job, result)
-      LOGGER.info(s"Job ${ job.getJobID } committed.")
-    }
-  }
-
-  /**
-   * execute a task for each partition to write a data file
-   */
-  def writeDataFileTask(
-      description: WriteDataFileJobDescription,
-      carbonLoadModel: CarbonLoadModel,
-      sparkStageId: Int,
-      sparkPartitionId: Int,
-      sparkAttemptNumber: Int,
-      committer: FileCommitProtocol,
-      iterator: Iterator[InternalRow],
-      rowSchema: StructType
-  ): TaskCommitMessage = {
-
-    val jobId = CarbonInputFormatUtil.getJobId(new Date, sparkStageId)
-    val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId)
-    val taskAttemptId = new TaskAttemptID(taskId, sparkAttemptNumber)
-
-    // Set up the attempt context required to use in the output committer.
-    val taskAttemptContext: TaskAttemptContext = {
-      // Set up the configuration object
-      val hadoopConf = description.serializableHadoopConf.value
-      CarbonStreamOutputFormat.setSegmentId(hadoopConf, description.segmentId)
-      hadoopConf.set("mapred.job.id", jobId.toString)
-      hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
-      hadoopConf.set("mapred.task.id", taskAttemptId.toString)
-      hadoopConf.setBoolean("mapred.task.is.map", true)
-      hadoopConf.setInt("mapred.task.partition", 0)
-      new TaskAttemptContextImpl(hadoopConf, taskAttemptId)
-    }
-
-    committer.setupTask(taskAttemptContext)
-
-    try {
-      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-
-        val parserName = taskAttemptContext.getConfiguration.get(
-          CarbonStreamParser.CARBON_STREAM_PARSER,
-          CarbonStreamParser.CARBON_STREAM_PARSER_DEFAULT)
-
-        val streamParser =
-          Class.forName(parserName).newInstance.asInstanceOf[CarbonStreamParser]
-        streamParser.initialize(taskAttemptContext.getConfiguration, rowSchema)
-
-        StreamSegment.appendBatchData(new InputIterator(iterator, streamParser),
-          taskAttemptContext, carbonLoadModel)
-      })(catchBlock = {
-        committer.abortTask(taskAttemptContext)
-        LOGGER.error(s"Job $jobId aborted.")
-      })
-      committer.commitTask(taskAttemptContext)
-    } catch {
-      case t: Throwable =>
-        throw new CarbonStreamException("Task failed while writing rows", t)
-    }
-  }
-
-  /**
-   * convert spark iterator to carbon iterator, so that java module can use it.
-   */
-  class InputIterator(rddIter: Iterator[InternalRow], streamParser: CarbonStreamParser)
-    extends CarbonIterator[Array[Object]] {
-
-    override def hasNext: Boolean = rddIter.hasNext
-
-    override def next: Array[Object] = {
-      streamParser.parserRow(rddIter.next())
-    }
-
-    override def close(): Unit = {
-      streamParser.close()
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
deleted file mode 100644
index 2f911c5..0000000
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import java.util
-import java.util.UUID
-
-import org.apache.spark.SPARK_VERSION
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.streaming.StreamingQueryListener
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-
-class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryListener {
-
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  private val cache = new util.HashMap[UUID, ICarbonLock]()
-
-  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
-    val streamQuery = spark.streams.get(event.id)
-    val qry = if (streamQuery.isInstanceOf[StreamExecution]) {
-      // adapt spark 2.1
-      streamQuery.asInstanceOf[StreamExecution]
-    } else {
-      // adapt spark 2.2 and later version
-      val clazz = Class.forName("org.apache.spark.sql.execution.streaming.StreamingQueryWrapper")
-      val method = clazz.getMethod("streamingQuery")
-      method.invoke(streamQuery).asInstanceOf[StreamExecution]
-    }
-    if (qry.sink.isInstanceOf[CarbonAppendableStreamSink]) {
-      LOGGER.info("Carbon streaming query started: " + event.id)
-      val sink = qry.sink.asInstanceOf[CarbonAppendableStreamSink]
-      val carbonTable = sink.carbonTable
-      val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-        LockUsage.STREAMING_LOCK)
-      if (lock.lockWithRetries()) {
-        LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." +
-                    carbonTable.getTableName)
-        cache.put(event.id, lock)
-      } else {
-        LOGGER.error("Not able to acquire the lock for stream table:" +
-                     carbonTable.getDatabaseName + "." + carbonTable.getTableName)
-        throw new InterruptedException(
-          "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName
+ "." +
-          carbonTable.getTableName)
-      }
-    }
-  }
-
-  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
-  }
-
-  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
-    val lock = cache.remove(event.id)
-    if (null != lock) {
-      LOGGER.info("Carbon streaming query: " + event.id)
-      lock.unlock()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java
b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java
new file mode 100644
index 0000000..a224446
--- /dev/null
+++ b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamInputFormatTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.statusmanager.FileFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamInputFormatTest extends TestCase {
+
+  private TaskAttemptID taskAttemptId;
+  private TaskAttemptContext taskAttemptContext;
+  private Configuration hadoopConf;
+  private AbsoluteTableIdentifier identifier;
+  private String tablePath;
+
+
+  @Override protected void setUp() throws Exception {
+    tablePath = new File("target/stream_input").getCanonicalPath();
+    String dbName = "default";
+    String tableName = "stream_table_input";
+    identifier = AbsoluteTableIdentifier.from(
+        tablePath,
+        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+    JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
+    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+    taskAttemptId = new TaskAttemptID(taskId, 0);
+
+    hadoopConf = new Configuration();
+    taskAttemptContext = new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+  }
+
+  private InputSplit buildInputSplit() throws IOException {
+    CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
+    List<CarbonInputSplit> splitList = new ArrayList<>();
+    splitList.add(carbonInputSplit);
+    return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" },
+        FileFormat.ROW_V1);
+  }
+
+  @Test public void testCreateRecordReader() {
+    try {
+      InputSplit inputSplit = buildInputSplit();
+      CarbonStreamInputFormat inputFormat = new CarbonStreamInputFormat();
+      RecordReader recordReader = inputFormat.createRecordReader(inputSplit, taskAttemptContext);
+      Assert.assertNotNull("Failed to create record reader", recordReader);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue(e.getMessage(), false);
+    }
+  }
+
+  @Override protected void tearDown() throws Exception {
+    super.tearDown();
+    if (tablePath != null) {
+      FileFactory.deleteAllFilesOfDir(new File(tablePath));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c723947a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
new file mode 100644
index 0000000..af79483
--- /dev/null
+++ b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.hadoop.testutil.StoreCreator;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CarbonStreamOutputFormatTest extends TestCase {
+
+  private Configuration hadoopConf;
+  private TaskAttemptID taskAttemptId;
+  private CarbonLoadModel carbonLoadModel;
+  private String tablePath;
+
+  @Override protected void setUp() throws Exception {
+    super.setUp();
+    JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
+    TaskID taskId = new TaskID(jobId, TaskType.MAP, 0);
+    taskAttemptId = new TaskAttemptID(taskId, 0);
+
+    hadoopConf = new Configuration();
+    hadoopConf.set("mapred.job.id", jobId.toString());
+    hadoopConf.set("mapred.tip.id", taskAttemptId.getTaskID().toString());
+    hadoopConf.set("mapred.task.id", taskAttemptId.toString());
+    hadoopConf.setBoolean("mapred.task.is.map", true);
+    hadoopConf.setInt("mapred.task.partition", 0);
+
+    tablePath = new File("target/stream_output").getCanonicalPath();
+    String dbName = "default";
+    String tableName = "stream_table_output";
+    AbsoluteTableIdentifier identifier =
+        AbsoluteTableIdentifier.from(
+            tablePath,
+            new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+
+    CarbonTable table = StoreCreator.createTable(identifier);
+
+    String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
+    carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier);
+  }
+
+  @Test public void testSetCarbonLoadModel() {
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+    } catch (IOException e) {
+      Assert.assertTrue("Failed to config CarbonLoadModel for CarbonStreamOutputFromat",
false);
+    }
+  }
+
+  @Test public void testGetCarbonLoadModel() {
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+      CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
+
+      Assert.assertNotNull("Failed to get CarbonLoadModel", model);
+      Assert.assertTrue("CarbonLoadModel should be same with previous",
+          carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp());
+
+    } catch (IOException e) {
+      Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFromat", false);
+    }
+  }
+
+  @Test public void testGetRecordWriter() {
+    CarbonStreamOutputFormat outputFormat = new CarbonStreamOutputFormat();
+    try {
+      CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
+      TaskAttemptContext taskAttemptContext =
+          new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
+      RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
+      Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter);
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.assertTrue(e.getMessage(), false);
+    }
+  }
+
+  @Override protected void tearDown() throws Exception {
+    super.tearDown();
+    if (tablePath != null) {
+      FileFactory.deleteAllFilesOfDir(new File(tablePath));
+    }
+  }
+}

