carbondata-commits mailing list archives

From manishgupt...@apache.org
Subject carbondata git commit: [CARBONDATA-2311][Streaming] Fix bug to avoid to append data to streaming finish segment
Date Thu, 12 Apr 2018 08:56:05 GMT
Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 167260da8 -> da0cb4f6a


[CARBONDATA-2311][Streaming] Fix bug to avoid to append data to streaming finish segment

At the beginning of each micro-batch, check the status of the current segment:
if the status is "streaming", continue to use this segment;
if the status is "streaming finish", open a new streaming segment to accept new streaming data.
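
For illustration, here is a minimal, self-contained sketch of this per-batch decision; the status type and parameter names are placeholders for illustration, not CarbonData APIs:

```scala
// Sketch of the per-micro-batch segment check described above.
// SegmentStatus and the helper parameters are illustrative, not CarbonData APIs.
object StreamingSegmentCheckSketch {
  sealed trait SegmentStatus
  case object Streaming extends SegmentStatus
  case object StreamingFinish extends SegmentStatus

  /** Decide which segment id the next micro-batch should append to. */
  def segmentForNextBatch(
      currentSegmentId: String,
      statusOf: String => SegmentStatus,
      openNewSegment: () => String): String = {
    statusOf(currentSegmentId) match {
      case Streaming =>
        // the segment is still running streaming ingestion, keep appending to it
        currentSegmentId
      case StreamingFinish =>
        // the segment has finished streaming ingestion, open a new one
        openNewSegment()
    }
  }
}
```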

This closes #2163


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/da0cb4f6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/da0cb4f6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/da0cb4f6

Branch: refs/heads/branch-1.3
Commit: da0cb4f6aa1e73bf0961b86d087160f55ee3d696
Parents: 167260d
Author: QiangCai <qiangcai@qq.com>
Authored: Tue Apr 3 14:32:59 2018 +0800
Committer: manishgupta88 <tomanishgupta18@gmail.com>
Committed: Thu Apr 12 14:29:55 2018 +0530

----------------------------------------------------------------------
 docs/streaming-guide.md                         |  2 +-
 .../streaming/StreamSinkFactory.scala           | 35 ++++++++++++++++++
 .../streaming/CarbonAppendableStreamSink.scala  | 38 +++++++++++++-------
 .../CarbonStreamingQueryListener.scala          | 27 ++++----------
 4 files changed, 68 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/da0cb4f6/docs/streaming-guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index aa9eaef..3ea2881 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -133,7 +133,7 @@ streaming | The segment is running streaming ingestion
 streaming finish | The segment already finished streaming ingestion, <br /> it will be handed off to a segment in the columnar format
 
 ## Change segment status
-Use below command to change the status of "streaming" segment to "streaming finish" segment.
+Use below command to change the status of "streaming" segment to "streaming finish" segment. If the streaming application is running, this command will be blocked.
 ```sql
 ALTER TABLE streaming_table FINISH STREAMING
 ```
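
As a usage note (not part of the commit), the same command can be issued programmatically from a Spark session; `streaming_table` is the placeholder table name used in the guide:

```scala
import org.apache.spark.sql.SparkSession

object FinishStreamingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("finish-streaming-example").getOrCreate()
    // Hands off the running "streaming" segment; per the guide text above, this command
    // is blocked while the streaming application is still running.
    spark.sql("ALTER TABLE streaming_table FINISH STREAMING")
  }
}
```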

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da0cb4f6/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 3366f51..aded292 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -17,6 +17,9 @@
 
 package org.apache.carbondata.streaming
 
+import java.io.IOException
+import java.util
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -24,10 +27,12 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink}
 
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
 import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
@@ -45,11 +50,41 @@ import org.apache.carbondata.streaming.segment.StreamSegment
  */
 object StreamSinkFactory {
 
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  val locks = new util.concurrent.ConcurrentHashMap[String, ICarbonLock]()
+
+  def lock(carbonTable: CarbonTable): Unit = {
+    val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.STREAMING_LOCK)
+    if (lock.lockWithRetries()) {
+      locks.put(carbonTable.getTableUniqueName, lock)
+      LOGGER.info("Acquired the streaming lock for stream table: " + carbonTable.getDatabaseName
+
+                  "." + carbonTable.getTableName)
+    } else {
+      LOGGER.error("Not able to acquire the streaming lock for stream table:" +
+        carbonTable.getDatabaseName + "." + carbonTable.getTableName)
+      throw new IOException(
+        "Not able to acquire the streaming lock for stream table: " +
+        carbonTable.getDatabaseName + "." + carbonTable.getTableName)
+    }
+  }
+
+  def unLock(tableUniqueName: String): Unit = {
+    val lock = locks.remove(tableUniqueName)
+    if (lock != null) {
+      lock.unlock()
+    }
+  }
+
   def createStreamTableSink(
       sparkSession: SparkSession,
       hadoopConf: Configuration,
       carbonTable: CarbonTable,
       parameters: Map[String, String]): Sink = {
+
+    lock(carbonTable)
+
     validateParameters(parameters)
 
     // build load model

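To show how the new lock/unLock helpers above are meant to pair up, here is an illustrative sketch, not the committed integration code: in the actual change the lock is taken in createStreamTableSink and released by the query listener further below.

```scala
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.streaming.StreamSinkFactory

// Hypothetical wrapper showing the intended acquire/release pairing of the helpers above.
object StreamingLockUsageSketch {
  def withStreamingLock(carbonTable: CarbonTable)(body: => Unit): Unit = {
    StreamSinkFactory.lock(carbonTable) // throws IOException if the lock cannot be acquired
    try {
      body                              // micro-batches append to the streaming segment here
    } finally {
      StreamSinkFactory.unLock(carbonTable.getTableUniqueName) // released on query termination
    }
  }
}
```
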
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da0cb4f6/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 312d24e..6ede186 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -166,22 +166,34 @@ class CarbonAppendableStreamSink(
    * if the directory size of current segment beyond the threshold, hand off new segment
    */
   private def checkOrHandOffSegment(): Unit = {
-    val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
-    val fileType = FileFactory.getFileType(segmentDir)
-    if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
-      val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
-      currentSegmentId = newSegmentId
-      val newSegmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
-      FileFactory.mkdirs(newSegmentDir, fileType)
 
-      // TODO trigger hand off operation
-      if (enableAutoHandoff) {
-        StreamHandoffRDD.startStreamingHandoffThread(
-          carbonLoadModel,
-          sparkSession,
-          false)
+    // get streaming segment, if not exists, create new streaming segment
+    val segmentId = StreamSegment.open(carbonTable)
+    if (segmentId.equals(currentSegmentId)) {
+      val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
+      val fileType = FileFactory.getFileType(segmentDir)
+      if (segmentMaxSize <= StreamSegment.size(segmentDir)) {
+        val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId)
+        currentSegmentId = newSegmentId
+        val newSegmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId)
+        FileFactory.mkdirs(newSegmentDir, fileType)
+
+        // trigger hand off operation
+        if (enableAutoHandoff) {
+          StreamHandoffRDD.startStreamingHandoffThread(
+            carbonLoadModel,
+            sparkSession,
+            false)
+        }
       }
+    } else {
+      currentSegmentId = segmentId
+      val newSegmentDir =
+        carbonTablePath.getSegmentDir("0", currentSegmentId)
+      val fileType = FileFactory.getFileType(newSegmentDir)
+      FileFactory.mkdirs(newSegmentDir, fileType)
     }
+
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/da0cb4f6/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
index 2f911c5..d97ad40 100644
--- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonStreamingQueryListener.scala
@@ -20,18 +20,17 @@ package org.apache.spark.sql.execution.streaming
 import java.util
 import java.util.UUID
 
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.streaming.StreamingQueryListener
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.streaming.StreamSinkFactory
 
 class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryListener {
 
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  private val cache = new util.HashMap[UUID, ICarbonLock]()
+  private val cache = new util.HashMap[UUID, String]()
 
   override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
     val streamQuery = spark.streams.get(event.id)
@@ -48,19 +47,7 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
       LOGGER.info("Carbon streaming query started: " + event.id)
       val sink = qry.sink.asInstanceOf[CarbonAppendableStreamSink]
       val carbonTable = sink.carbonTable
-      val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-        LockUsage.STREAMING_LOCK)
-      if (lock.lockWithRetries()) {
-        LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName
+ "." +
-                    carbonTable.getTableName)
-        cache.put(event.id, lock)
-      } else {
-        LOGGER.error("Not able to acquire the lock for stream table:" +
-                     carbonTable.getDatabaseName + "." + carbonTable.getTableName)
-        throw new InterruptedException(
-          "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName
+ "." +
-          carbonTable.getTableName)
-      }
+      cache.put(event.id, carbonTable.getTableUniqueName)
     }
   }
 
@@ -68,10 +55,10 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi
   }
 
   override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
-    val lock = cache.remove(event.id)
-    if (null != lock) {
-      LOGGER.info("Carbon streaming query: " + event.id)
-      lock.unlock()
+    val tableUniqueName = cache.remove(event.id)
+    if (null != tableUniqueName) {
+      LOGGER.info("Carbon streaming query End: " + event.id)
+      StreamSinkFactory.unLock(tableUniqueName)
     }
   }
 }

