carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [17/25] carbondata git commit: [CARBONDATA-2212] Event fired while updating the status during streaming handoff
Date Sat, 03 Mar 2018 12:44:04 GMT
[CARBONDATA-2212] Event fired while updating the status during streaming handoff

This closes #2009


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cf2390a8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cf2390a8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cf2390a8

Branch: refs/heads/branch-1.3
Commit: cf2390a8c6dfa95242dacc55ca9f7a9b2020a964
Parents: bbe7376
Author: rahulforallp <rahul.kumar@knoldus.in>
Authored: Tue Feb 27 22:00:40 2018 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Sat Mar 3 18:04:11 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/streaming/StreamHandoffRDD.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cf2390a8/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 35a3513..b03ee1e 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -39,9 +39,11 @@ import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -307,7 +309,18 @@ object StreamHandoffRDD {
         SegmentStatus.INSERT_IN_PROGRESS,
         carbonLoadModel.getFactTimeStamp,
         false)
+      val operationContext = new OperationContext()
+      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+        new LoadTablePreStatusUpdateEvent(
+          carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
+          carbonLoadModel)
+      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
+      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
+      OperationListenerBus.getInstance()
+        .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
       // convert a streaming segment to columnar segment
       val status = new StreamHandoffRDD(
         sparkSession.sparkContext,


Mime
View raw message