From commits-return-14304-archive-asf-public=cust-asf.ponee.io@carbondata.apache.org Fri Nov 9 01:50:13 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 0124518076D for ; Fri, 9 Nov 2018 01:50:10 +0100 (CET) Received: (qmail 67775 invoked by uid 500); 9 Nov 2018 00:50:10 -0000 Mailing-List: contact commits-help@carbondata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.apache.org Delivered-To: mailing list commits@carbondata.apache.org Received: (qmail 67705 invoked by uid 99); 9 Nov 2018 00:50:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Nov 2018 00:50:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 23587E1251; Fri, 9 Nov 2018 00:50:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: xuchuanyin@apache.org To: commits@carbondata.apache.org Date: Fri, 09 Nov 2018 00:50:13 -0000 Message-Id: In-Reply-To: <82ae6345caa840da9c17a99fcd7ade34@git.apache.org> References: <82ae6345caa840da9c17a99fcd7ade34@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/6] carbondata git commit: [CARBONDATA-3064] Support separate audit log http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala index c8c9a47..35b73d6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} @@ -39,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableMergeIndexEvent, Event, OperationContext, OperationEventListener} import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent import org.apache.carbondata.processing.merger.CarbonDataMergerUtil -import org.apache.carbondata.spark.util.CommonUtil class MergeIndexEventListener extends OperationEventListener with Logging { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -47,7 +45,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { override def onEvent(event: Event, operationContext: OperationContext): Unit = { event match { case preStatusUpdateEvent: LoadTablePostExecutionEvent => - Audit.log(LOGGER, "Load post status event-listener called for merge index") + LOGGER.info("Load post status event-listener called for merge index") val loadModel = preStatusUpdateEvent.getCarbonLoadModel val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable val compactedSegments = loadModel.getMergedSegmentIds @@ -73,7 +71,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging { } } case alterTableCompactionPostEvent: AlterTableCompactionPostEvent => - Audit.log(LOGGER, "Merge index for compaction called") + LOGGER.info("Merge index for compaction called") val carbonTable = alterTableCompactionPostEvent.carbonTable val mergedLoads = alterTableCompactionPostEvent.compactedLoads val sparkSession = alterTableCompactionPostEvent.sparkSession @@ -84,8 +82,6 @@ class MergeIndexEventListener extends OperationEventListener with Logging { val carbonMainTable = alterTableMergeIndexEvent.carbonTable val sparkSession = alterTableMergeIndexEvent.sparkSession if (!carbonMainTable.isStreamingSink) { - Audit.log(LOGGER, s"Compaction request received for table " + - s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }") LOGGER.info(s"Merge Index request received for table " + s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }") val lock = CarbonLockFactory.getCarbonLockObj( @@ -130,16 +126,11 @@ class MergeIndexEventListener extends OperationEventListener with Logging { clearBlockDataMapCache(carbonMainTable, validSegmentIds) val requestMessage = "Compaction request completed for table " + s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }" - Audit.log(LOGGER, requestMessage) LOGGER.info(requestMessage) } else { val lockMessage = "Not able to acquire the compaction lock for table " + - s"${ carbonMainTable.getDatabaseName }.${ - carbonMainTable - .getTableName - }" - - Audit.log(LOGGER, lockMessage) + s"${ carbonMainTable.getDatabaseName }." + + s"${ carbonMainTable.getTableName}" LOGGER.error(lockMessage) CarbonException.analysisException( "Table is already locked for compaction. Please try after some time.") http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala index 081482c..271a19b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala @@ -22,10 +22,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ -import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager} import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.metadata.ColumnarFormatVersion @@ -63,6 +61,11 @@ case class CarbonCreateDataMapCommand( case _ => null } + if (mainTable != null) { + setAuditTable(mainTable) + } + setAuditInfo(Map("provider" -> dmProviderName, "dmName" -> dataMapName) ++ dmProperties) + if (mainTable != null && !mainTable.getTableInfo.isTransactionalTable) { throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") } @@ -75,7 +78,7 @@ case class CarbonCreateDataMapCommand( } } - if (mainTable !=null && CarbonUtil.getFormatVersion(mainTable) != ColumnarFormatVersion.V3) { + if (mainTable != null && CarbonUtil.getFormatVersion(mainTable) != ColumnarFormatVersion.V3) { throw new MalformedCarbonCommandException(s"Unsupported operation on table with " + s"V1 or V2 format data") } @@ -153,7 +156,6 @@ case class CarbonCreateDataMapCommand( systemFolderLocation, tableIdentifier, dmProviderName) OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent, operationContext) - Audit.log(LOGGER, s"DataMap $dataMapName successfully added") Seq.empty } @@ -192,5 +194,7 @@ case class CarbonCreateDataMapCommand( } Seq.empty } + + override protected def opName: String = "CREATE DATAMAP" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala index be62ce4..267fedd 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala @@ -65,6 +65,8 @@ case class CarbonDataMapRebuildCommand( )(sparkSession) } + setAuditTable(table) + val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession) provider.rebuild() @@ -87,4 +89,5 @@ case class CarbonDataMapRebuildCommand( Seq.empty } + override protected def opName: String = "REBUILD DATAMAP" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala index ae33aa8..3cee810 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala @@ -58,6 +58,7 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier]) tableIdentifier match { case Some(table) => val carbonTable = CarbonEnv.getCarbonTable(table)(sparkSession) + setAuditTable(carbonTable) Checker.validateTableExists(table.database, table.table, sparkSession) if (carbonTable.hasDataMapSchema) { dataMapSchemaList.addAll(carbonTable.getTableInfo.getDataMapSchemaList) @@ -97,4 +98,6 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier]) Seq.empty } } + + override protected def opName: String = "SHOW DATAMAP" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index 67e2dee..38ec07d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager} import org.apache.carbondata.core.datamap.status.DataMapStatusManager import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} @@ -59,6 +58,7 @@ case class CarbonDropDataMapCommand( var dataMapSchema: DataMapSchema = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + setAuditInfo(Map("dmName" -> dataMapName)) if (table.isDefined) { val databaseNameOp = table.get.database val tableName = table.get.table @@ -78,6 +78,7 @@ case class CarbonDropDataMapCommand( null } } + setAuditTable(mainTable) val tableIdentifier = AbsoluteTableIdentifier .from(tablePath, @@ -112,8 +113,6 @@ case class CarbonDropDataMapCommand( locksToBeAcquired foreach { lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock) } - Audit.log(LOGGER, s"Deleting datamap [$dataMapName] under table [$tableName]") - // drop index,mv datamap on the main table. if (mainTable != null && DataMapStoreManager.getInstance().getAllDataMap(mainTable).size() > 0) { @@ -242,4 +241,5 @@ case class CarbonDropDataMapCommand( Seq.empty } + override protected def opName: String = "DROP DATAMAP" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 8e338db..1b1d708 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -33,7 +33,6 @@ import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.datastore.impl.FileFactory @@ -67,6 +66,10 @@ case class CarbonAlterTableCompactionCommand( val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val tableName = alterTableModel.tableName.toLowerCase val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase) + setAuditTable(dbName, tableName) + if (alterTableModel.customSegmentIds.nonEmpty) { + setAuditInfo(Map("segmentIds" -> alterTableModel.customSegmentIds.get.mkString(", "))) + } table = if (tableInfoOp.isDefined) { CarbonTable.buildFromTableInfo(tableInfoOp.get) } else { @@ -217,8 +220,6 @@ case class CarbonAlterTableCompactionCommand( } } - Audit.log(LOGGER, s"Compaction request received for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable if (null == carbonLoadModel.getLoadMetadataDetails) { @@ -314,8 +315,6 @@ case class CarbonAlterTableCompactionCommand( throw e } } else { - Audit.log(LOGGER, "Not able to acquire the compaction lock for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") LOGGER.error(s"Not able to acquire the compaction lock for table" + s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") CarbonException.analysisException( @@ -379,4 +378,8 @@ case class CarbonAlterTableCompactionCommand( } } } + + override protected def opName: String = { + s"ALTER TABLE COMPACTION ${alterTableModel.compactionType.toUpperCase}" + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala index ba20773..8df0217 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala @@ -33,6 +33,7 @@ case class CarbonAlterTableFinishStreaming( extends MetadataCommand { override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val carbonTable = CarbonEnv.getCarbonTable(dbName, tableName)(sparkSession) + setAuditTable(carbonTable) val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val streamingLock = CarbonLockFactory.getCarbonLockObj( carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(), @@ -58,4 +59,6 @@ case class CarbonAlterTableFinishStreaming( } Seq.empty } + + override protected def opName: String = "ALTER TABLE FINISH STREAMING" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index a390191..4b5b4b6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -57,7 +57,10 @@ case class CarbonCleanFilesCommand( override def processMetadata(sparkSession: SparkSession): Seq[Row] = { carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession) - + setAuditTable(carbonTable) + setAuditInfo(Map( + "force" -> forceTableClean.toString, + "internal" -> isInternalCleanCall.toString)) if (carbonTable.hasAggregationDataMap) { cleanFileCommands = carbonTable.getTableInfo.getDataMapSchemaList.asScala.map { dataMapSchema => @@ -150,4 +153,6 @@ case class CarbonCleanFilesCommand( .error("Failed to clean in progress segments", e) } } + + override protected def opName: String = "CLEAN FILES" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala index bf5adc3..d1a54d0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala @@ -47,6 +47,8 @@ case class CarbonCliCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + setAuditTable(carbonTable) + setAuditInfo(Map("options" -> commandOptions)) val commandArgs: Seq[String] = commandOptions.split("\\s+") val finalCommands = commandArgs.collect { case a if a.trim.equalsIgnoreCase("summary") || a.trim.equalsIgnoreCase("benchmark") => @@ -59,4 +61,6 @@ case class CarbonCliCommand( Row(x) ) } + + override protected def opName: String = "CLI" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala index 165a032..275a0fd 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala @@ -35,7 +35,8 @@ case class CarbonDeleteLoadByIdCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) - + setAuditTable(carbonTable) + setAuditInfo(Map("segmentIds" -> loadIds.mkString(", "))) if (!carbonTable.getTableInfo.isTransactionalTable) { throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") } @@ -66,4 +67,6 @@ case class CarbonDeleteLoadByIdCommand( OperationListenerBus.getInstance.fireEvent(deleteSegmentPostEvent, operationContext) Seq.empty } + + override protected def opName: String = "DELETE SEGMENT BY ID" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala index 19f5100..db1b7b3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala @@ -36,7 +36,8 @@ case class CarbonDeleteLoadByLoadDateCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) - + setAuditTable(carbonTable) + setAuditInfo(Map("date" -> dateField)) if (!carbonTable.getTableInfo.isTransactionalTable) { throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") } @@ -66,4 +67,6 @@ case class CarbonDeleteLoadByLoadDateCommand( Seq.empty } + + override protected def opName: String = "DELETE SEGMENT BY DATE" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala index ee0f5ab..2a64b19 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala @@ -36,6 +36,7 @@ case class CarbonInsertIntoCommand( var loadCommand: CarbonLoadDataCommand = _ override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName) val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) def containsLimit(plan: LogicalPlan): Boolean = { plan find { @@ -82,9 +83,19 @@ case class CarbonInsertIntoCommand( } override def processData(sparkSession: SparkSession): Seq[Row] = { if (null != loadCommand) { - loadCommand.processData(sparkSession) + val rows = loadCommand.processData(sparkSession) + setAuditInfo(loadCommand.auditInfo) + rows } else { Seq.empty } } + + override protected def opName: String = { + if (overwrite) { + "INSERT OVERWRITE" + } else { + "INSERT INTO" + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 5fbe82e..bba8af7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -48,9 +48,8 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils} -import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.Strings import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.converter.SparkDataTypeConverterImpl import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} import org.apache.carbondata.core.datamap.DataMapStoreManager @@ -65,12 +64,11 @@ import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} -import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util._ import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.events.exception.PreEventException -import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.exception.NoRetryException @@ -113,6 +111,7 @@ case class CarbonLoadDataCommand( override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) + setAuditTable(dbName, tableName) table = if (tableInfoOp.isDefined) { CarbonTable.buildFromTableInfo(tableInfoOp.get) } else { @@ -123,7 +122,6 @@ case class CarbonLoadDataCommand( } if (null == relation.carbonTable) { LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName") - Audit.log(LOGGER, s"Data loading failed. table not found: $dbName.$tableName") throw new NoSuchTableException(dbName, tableName) } relation.carbonTable @@ -189,13 +187,12 @@ case class CarbonLoadDataCommand( val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) val hadoopConf = sparkSession.sessionState.newHadoopConf() val carbonLoadModel = new CarbonLoadModel() - try { - val tableProperties = table.getTableInfo.getFactTable.getTableProperties - val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava) - optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope", - carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, - carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, - CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))) + val tableProperties = table.getTableInfo.getFactTable.getTableProperties + val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava) + optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope", + carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, + carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)))) optionsFinal .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table)) @@ -216,176 +213,158 @@ case class CarbonLoadDataCommand( CompressorFactory.getInstance().getCompressor.getName) carbonLoadModel.setColumnCompressor(columnCompressor) - val javaPartition = mutable.Map[String, String]() - partition.foreach { case (k, v) => - if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get + val javaPartition = mutable.Map[String, String]() + partition.foreach { case (k, v) => + if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get + } + + new CarbonLoadModelBuilder(table).build( + options.asJava, + optionsFinal, + carbonLoadModel, + hadoopConf, + javaPartition.asJava, + dataFrame.isDefined) + // Delete stale segment folders that are not in table status but are physically present in + // the Fact folder + LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName") + TableProcessingOperations.deletePartialLoadDataIfExist(table, false) + var isUpdateTableStatusRequired = false + // if the table is child then extract the uuid from the operation context and the parent would + // already generated UUID. + // if parent table then generate a new UUID else use empty. + val uuid = if (table.isChildDataMap) { + Option(operationContext.getProperty("uuid")).getOrElse("").toString + } else if (table.hasAggregationDataMap) { + UUID.randomUUID().toString + } else { + "" + } + try { + operationContext.setProperty("uuid", uuid) + val loadTablePreExecutionEvent: LoadTablePreExecutionEvent = + new LoadTablePreExecutionEvent( + table.getCarbonTableIdentifier, + carbonLoadModel) + operationContext.setProperty("isOverwrite", isOverwriteTable) + OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext) + // Add pre event listener for index datamap + val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(table) + val dataMapOperationContext = new OperationContext() + if (tableDataMaps.size() > 0) { + val dataMapNames: mutable.Buffer[String] = + tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName) + val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent = + new BuildDataMapPreExecutionEvent(sparkSession, + table.getAbsoluteTableIdentifier, dataMapNames) + OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent, + dataMapOperationContext) + } + // First system has to partition the data first and then call the load data + LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") + concurrentLoadLock = acquireConcurrentLoadLock() + // Clean up the old invalid segment data before creating a new entry for new load. + SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions) + // add the start entry for the new load in the table status file + if (updateModel.isEmpty && !table.isHivePartitionTable) { + CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta( + carbonLoadModel, + isOverwriteTable) + isUpdateTableStatusRequired = true + } + if (isOverwriteTable) { + LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress") + } + // if table is an aggregate table then disable single pass. + if (carbonLoadModel.isAggLoadRequest) { + carbonLoadModel.setUseOnePass(false) } - new CarbonLoadModelBuilder(table).build( - options.asJava, - optionsFinal, - carbonLoadModel, - hadoopConf, - javaPartition.asJava, - dataFrame.isDefined) - // Delete stale segment folders that are not in table status but are physically present in - // the Fact folder - LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName") - TableProcessingOperations.deletePartialLoadDataIfExist(table, false) - var isUpdateTableStatusRequired = false - // if the table is child then extract the uuid from the operation context and the parent would - // already generated UUID. - // if parent table then generate a new UUID else use empty. - val uuid = if (table.isChildDataMap) { - Option(operationContext.getProperty("uuid")).getOrElse("").toString - } else if (table.hasAggregationDataMap) { - UUID.randomUUID().toString - } else { - "" + // start dictionary server when use one pass load and dimension with DICTIONARY + // encoding is present. + val allDimensions = + carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList + val createDictionary = allDimensions.exists { + carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) && + !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) } - try { - operationContext.setProperty("uuid", uuid) - val loadTablePreExecutionEvent: LoadTablePreExecutionEvent = - new LoadTablePreExecutionEvent( - table.getCarbonTableIdentifier, - carbonLoadModel) - operationContext.setProperty("isOverwrite", isOverwriteTable) - OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext) - // Add pre event listener for index datamap - val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(table) - val dataMapOperationContext = new OperationContext() - if (tableDataMaps.size() > 0) { - val dataMapNames: mutable.Buffer[String] = - tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName) - val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent = - new BuildDataMapPreExecutionEvent(sparkSession, - table.getAbsoluteTableIdentifier, dataMapNames) - OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent, - dataMapOperationContext) - } - // First system has to partition the data first and then call the load data - LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") - concurrentLoadLock = acquireConcurrentLoadLock() - // Clean up the old invalid segment data before creating a new entry for new load. - SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions) - // add the start entry for the new load in the table status file - if (updateModel.isEmpty && !table.isHivePartitionTable) { - CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta( - carbonLoadModel, - isOverwriteTable) - isUpdateTableStatusRequired = true - } - if (isOverwriteTable) { - LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress") - } - // if table is an aggregate table then disable single pass. - if (carbonLoadModel.isAggLoadRequest) { - carbonLoadModel.setUseOnePass(false) + if (!createDictionary) { + carbonLoadModel.setUseOnePass(false) + } + // Create table and metadata folders if not exist + if (carbonLoadModel.isCarbonTransactionalTable) { + val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) + val fileType = FileFactory.getFileType(metadataDirectoryPath) + if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { + FileFactory.mkdirs(metadataDirectoryPath, fileType) } + } else { + carbonLoadModel.setSegmentId(System.currentTimeMillis().toString) + } + val partitionStatus = SegmentStatus.SUCCESS + val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean + if (carbonLoadModel.getUseOnePass) { + loadDataUsingOnePass( + sparkSession, + carbonProperty, + carbonLoadModel, + columnar, + partitionStatus, + hadoopConf, + operationContext, + LOGGER) + } else { + loadData( + sparkSession, + carbonLoadModel, + columnar, + partitionStatus, + hadoopConf, + operationContext, + LOGGER) + } + val loadTablePostExecutionEvent: LoadTablePostExecutionEvent = + new LoadTablePostExecutionEvent( + table.getCarbonTableIdentifier, + carbonLoadModel) + OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext) + if (tableDataMaps.size() > 0) { + val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession, + table.getAbsoluteTableIdentifier, null, Seq(carbonLoadModel.getSegmentId), false) + OperationListenerBus.getInstance() + .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext) + } - // start dictionary server when use one pass load and dimension with DICTIONARY - // encoding is present. - val allDimensions = - carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList - val createDictionary = allDimensions.exists { - carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) && - !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY) - } - if (!createDictionary) { - carbonLoadModel.setUseOnePass(false) - } - // Create table and metadata folders if not exist - if (carbonLoadModel.isCarbonTransactionalTable) { - val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath) - val fileType = FileFactory.getFileType(metadataDirectoryPath) - if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) { - FileFactory.mkdirs(metadataDirectoryPath, fileType) - } - } else { - carbonLoadModel.setSegmentId(System.currentTimeMillis().toString) - } - val partitionStatus = SegmentStatus.SUCCESS - val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean - if (carbonLoadModel.getUseOnePass) { - loadDataUsingOnePass( - sparkSession, - carbonProperty, - carbonLoadModel, - columnar, - partitionStatus, - hadoopConf, - operationContext, - LOGGER) - } else { - loadData( - sparkSession, - carbonLoadModel, - columnar, - partitionStatus, - hadoopConf, - operationContext, - LOGGER) - } - val loadTablePostExecutionEvent: LoadTablePostExecutionEvent = - new LoadTablePostExecutionEvent( - table.getCarbonTableIdentifier, - carbonLoadModel) - OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext) - if (tableDataMaps.size() > 0) { - val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession, - table.getAbsoluteTableIdentifier, null, Seq(carbonLoadModel.getSegmentId), false) - OperationListenerBus.getInstance() - .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext) + } catch { + case CausedBy(ex: NoRetryException) => + // update the load entry in table status file for changing the status to marked for delete + if (isUpdateTableStatusRequired) { + CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid) } - - } catch { - case CausedBy(ex: NoRetryException) => - // update the load entry in table status file for changing the status to marked for delete - if (isUpdateTableStatusRequired) { - CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid) - } - LOGGER.error(s"Dataload failure for $dbName.$tableName", ex) - throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}") - // In case of event related exception - case preEventEx: PreEventException => - LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx) - throw new AnalysisException(preEventEx.getMessage) - case ex: Exception => - LOGGER.error(ex) - // update the load entry in table status file for changing the status to marked for delete - if (isUpdateTableStatusRequired) { - CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid) - } - Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. Please check the logs") - throw ex - } finally { - releaseConcurrentLoadLock(concurrentLoadLock, LOGGER) - // Once the data load is successful delete the unwanted partition files - try { - val partitionLocation = CarbonProperties.getStorePath + "/partition/" + - table.getDatabaseName + "/" + - table.getTableName + "/" - val fileType = FileFactory.getFileType(partitionLocation) - if (FileFactory.isFileExist(partitionLocation, fileType)) { - val file = FileFactory.getCarbonFile(partitionLocation, fileType) - CarbonUtil.deleteFoldersAndFiles(file) - } - } catch { - case ex: Exception => - LOGGER.error(ex) - Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. " + - "Problem deleting the partition folder") - throw ex + LOGGER.error(s"Dataload failure for $dbName.$tableName", ex) + throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}") + // In case of event related exception + case preEventEx: PreEventException => + LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx) + throw new AnalysisException(preEventEx.getMessage) + case ex: Exception => + LOGGER.error(ex) + // update the load entry in table status file for changing the status to marked for delete + if (isUpdateTableStatusRequired) { + CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid) } - + throw ex + } finally { + releaseConcurrentLoadLock(concurrentLoadLock, LOGGER) + // Once the data load is successful delete the unwanted partition files + val partitionLocation = CarbonProperties.getStorePath + "/partition/" + + table.getDatabaseName + "/" + + table.getTableName + "/" + val fileType = FileFactory.getFileType(partitionLocation) + if (FileFactory.isFileExist(partitionLocation, fileType)) { + val file = FileFactory.getCarbonFile(partitionLocation, fileType) + CarbonUtil.deleteFoldersAndFiles(file) } - } catch { - case dle: DataLoadingException => - Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + dle.getMessage) - throw dle - case mce: MalformedCarbonCommandException => - Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + mce.getMessage) - throw mce } Seq.empty } @@ -543,7 +522,7 @@ case class CarbonLoadDataCommand( } } } else { - CarbonDataRDDFactory.loadCarbonData( + val loadResult = CarbonDataRDDFactory.loadCarbonData( sparkSession.sqlContext, carbonLoadModel, columnar, @@ -554,10 +533,25 @@ case class CarbonLoadDataCommand( loadDataFrame, updateModel, operationContext) + if (loadResult != null) { + val info = makeAuditInfo(loadResult) + setAuditInfo(info) + } } rows } + private def makeAuditInfo(loadResult: LoadMetadataDetails): Map[String, String] = { + if (loadResult != null) { + Map( + "SegmentId" -> loadResult.getLoadName, + "DataSize" -> Strings.formatSize(java.lang.Long.parseLong(loadResult.getDataSize)), + "IndexSize" -> Strings.formatSize(java.lang.Long.parseLong(loadResult.getIndexSize))) + } else { + Map() + } + } + private def loadData( sparkSession: SparkSession, carbonLoadModel: CarbonLoadModel, @@ -593,7 +587,7 @@ case class CarbonLoadDataCommand( loadDataFrame, operationContext, LOGGER) } else { - CarbonDataRDDFactory.loadCarbonData( + val loadResult = CarbonDataRDDFactory.loadCarbonData( sparkSession.sqlContext, carbonLoadModel, columnar, @@ -604,6 +598,8 @@ case class CarbonLoadDataCommand( loadDataFrame, updateModel, operationContext) + val info = makeAuditInfo(loadResult) + setAuditInfo(info) } rows } @@ -1132,4 +1128,11 @@ case class CarbonLoadDataCommand( (dataFrameWithTupleId) } + override protected def opName: String = { + if (isOverwriteTable) { + "LOAD DATA OVERWRITE" + } else { + "LOAD DATA" + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala index 3f68cc4..4a35e6e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala @@ -59,6 +59,7 @@ case class CarbonShowLoadsCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { Checker.validateTableExists(databaseNameOp, tableName, sparkSession) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + setAuditTable(carbonTable) if (!carbonTable.getTableInfo.isTransactionalTable) { throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") } @@ -68,4 +69,6 @@ case class CarbonShowLoadsCommand( showHistory ) } + + override protected def opName: String = "SHOW SEGMENTS" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala index 50b88c8..b35c285 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, Me import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.indexstore.PartitionSpec @@ -54,6 +53,7 @@ case class RefreshCarbonTableCommand( override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession) + setAuditTable(databaseName, tableName) // Steps // 1. get table path // 2. perform the below steps @@ -92,7 +92,6 @@ case class RefreshCarbonTableCommand( val msg = s"Table registration with Database name [$databaseName] and Table name " + s"[$tableName] failed. All the aggregate Tables for table [$tableName] is" + s" not copied under database [$databaseName]" - Audit.log(LOGGER, msg) throwMetadataException(databaseName, tableName, msg) } // 2.2.1 Register the aggregate tables to hive @@ -104,18 +103,7 @@ case class RefreshCarbonTableCommand( tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) { registerAllPartitionsToHive(identifier, sparkSession) } - } else { - Audit.log(LOGGER, - s"Table registration with Database name [$databaseName] and Table name [$tableName] " + - s"failed." + - s"Table [$tableName] either non carbon table or stale carbon table under database " + - s"[$databaseName]") } - } else { - Audit.log(LOGGER, - s"Table registration with Database name [$databaseName] and Table name [$tableName] " + - s"failed." + - s"Table [$tableName] either already exists or registered under database [$databaseName]") } // update the schema modified time metaStore.updateAndTouchSchemasUpdatedTime() @@ -185,8 +173,6 @@ case class RefreshCarbonTableCommand( OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext) CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false, tableLocation = Some(tablePath)) .run(sparkSession) - Audit.log(LOGGER, s"Table registration with Database name [$dbName] and Table name " + - s"[$tableName] is successful.") } catch { case e: AnalysisException => throw e case e: Exception => @@ -288,4 +274,6 @@ case class RefreshCarbonTableCommand( AlterTableAddPartitionCommand(identifier, specs, true).run(sparkSession) } } + + override protected def opName: String = "REFRESH TABLE" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala index 053937b..70a4350 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala @@ -21,14 +21,11 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command._ -import org.apache.carbondata.api.CarbonStore.LOGGER import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.features.TableOperation import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} -import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus} @@ -48,6 +45,8 @@ private[sql] case class CarbonProjectForDeleteCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + setAuditTable(carbonTable) + setAuditInfo(Map("plan" -> plan.simpleString)) if (!carbonTable.getTableInfo.isTransactionalTable) { throw new MalformedCarbonCommandException("Unsupported operation on non transactional table") } @@ -81,8 +80,6 @@ private[sql] case class CarbonProjectForDeleteCommand( var lockStatus = false try { lockStatus = metadataLock.lockWithRetries() - Audit.log(LOGGER, s" Delete data request has been received " + - s"for ${carbonTable.getDatabaseName}.${carbonTable.getTableName}.") if (lockStatus) { LOGGER.info("Successfully able to get the table metadata file lock") } else { @@ -140,4 +137,6 @@ private[sql] case class CarbonProjectForDeleteCommand( } Seq.empty } + + override protected def opName: String = "DELETE DATA" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 31e1779..0f23081 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -60,6 +60,8 @@ private[sql] case class CarbonProjectForUpdateCommand( return Seq.empty } val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + setAuditTable(carbonTable) + setAuditInfo(Map("plan" -> plan.simpleString)) columns.foreach { col => val dataType = carbonTable.getColumnByName(tableName, col).getColumnSchema.getDataType if (dataType.isComplexType) { @@ -276,4 +278,6 @@ private[sql] case class CarbonProjectForUpdateCommand( Seq.empty } + + override protected def opName: String = "UPDATE DATA" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala index d118539..0f68004 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala @@ -32,8 +32,6 @@ import org.apache.spark.sql.execution.command.ExecutionErrors import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.util.SparkSQLUtil -import org.apache.carbondata.api.CarbonStore.LOGGER -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment @@ -166,7 +164,6 @@ object DeleteExecution { } else { // In case of failure , clean all related delete delta files CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) - Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }") val errorMsg = "Delete data operation is failed due to failure in creating delete delta file for " + "segment : " + resultOfBlock._2._1.getSegmentName + " block : " + @@ -201,19 +198,14 @@ object DeleteExecution { listOfSegmentToBeMarkedDeleted) ) { LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }") - Audit.log(LOGGER, s"Delete data operation is successful for ${ database }.${ tableName }") - } - else { + } else { // In case of failure , clean all related delete delta files CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp) - val errorMessage = "Delete data operation is failed due to failure " + "in table status updation." - Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }") LOGGER.error("Delete data operation is failed due to failure in table status updation.") executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE executorErrors.errorMsg = errorMessage - // throw new Exception(errorMessage) } } @@ -290,12 +282,10 @@ object DeleteExecution { deleteStatus = SegmentStatus.SUCCESS } catch { case e : MultipleMatchingException => - Audit.log(LOGGER, e.getMessage) LOGGER.error(e.getMessage) // dont throw exception here. case e: Exception => val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }." - Audit.log(LOGGER, errorMsg) LOGGER.error(errorMsg + e.getMessage) throw e } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala index 3472d8a..6224d0d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala @@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompact import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -130,7 +129,6 @@ object HorizontalCompaction { } LOG.info(s"Horizontal Update Compaction operation started for [$db.$table].") - Audit.log(LOG, s"Horizontal Update Compaction operation started for [$db.$table].") try { // Update Compaction. @@ -154,7 +152,6 @@ object HorizontalCompaction { s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp) } LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].") - Audit.log(LOG, s"Horizontal Update Compaction operation completed for [${ db }.${ table }].") } /** @@ -180,7 +177,6 @@ object HorizontalCompaction { } LOG.info(s"Horizontal Delete Compaction operation started for [$db.$table].") - Audit.log(LOG, s"Horizontal Delete Compaction operation started for [$db.$table].") try { @@ -225,7 +221,6 @@ object HorizontalCompaction { timestamp.toString, segmentUpdateStatusManager) if (updateStatus == false) { - Audit.log(LOG, s"Delete Compaction data operation is failed for [$db.$table].") LOG.error("Delete Compaction data operation is failed.") throw new HorizontalCompactionException( s"Horizontal Delete Compaction Failed for [$db.$table] ." + @@ -233,7 +228,6 @@ object HorizontalCompaction { } else { LOG.info(s"Horizontal Delete Compaction operation completed for [$db.$table].") - Audit.log(LOG, s"Horizontal Delete Compaction operation completed for [$db.$table].") } } catch { http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala index 4224efa..f7f76b9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.command +import scala.collection.JavaConverters._ import scala.language.implicitConversions import org.apache.spark.sql._ @@ -24,6 +25,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.processing.util.Auditor import org.apache.carbondata.spark.exception.ProcessMetaDataException object Checker { @@ -61,20 +64,71 @@ trait DataProcessOperation { } /** + * An utility that run the command with audit log + */ +trait Auditable { + // operation id that will be written in audit log + private val operationId: String = String.valueOf(System.nanoTime()) + + // extra info to be written in audit log, set by subclass of AtomicRunnableCommand + var auditInfo: Map[String, String] = _ + + // holds the dbName and tableName for which this command is executed for + // used for audit log, set by subclass of AtomicRunnableCommand + private var table: String = _ + + // implement by subclass, return the operation name that record in audit log + protected def opName: String + + protected def opTime(startTime: Long) = s"${System.currentTimeMillis() - startTime} ms" + + protected def setAuditTable(dbName: String, tableName: String): Unit = + table = s"$dbName.$tableName" + + protected def setAuditTable(carbonTable: CarbonTable): Unit = + table = s"${carbonTable.getDatabaseName}.${carbonTable.getTableName}" + + protected def setAuditInfo(map: Map[String, String]): Unit = auditInfo = map + + /** + * Run the passed command and record the audit log. + * Two audit log will be output, one for operation start another for operation success/failure + * @param runCmd command to run + * @param spark session + * @return command result + */ + protected def runWithAudit(runCmd: (SparkSession => Seq[Row]), spark: SparkSession): Seq[Row] = { + val start = System.currentTimeMillis() + Auditor.logOperationStart(opName, operationId) + val rows = try { + runCmd(spark) + } catch { + case e: Throwable => + val map = Map("Exception" -> e.getClass.getName, "Message" -> e.getMessage) + Auditor.logOperationEnd(opName, operationId, false, table, opTime(start), map.asJava) + throw e + } + Auditor.logOperationEnd(opName, operationId, true, table, opTime(start), + if (auditInfo != null) auditInfo.asJava else new java.util.HashMap[String, String]()) + rows + } +} + +/** * Command that modifies metadata(schema, table_status, etc) only without processing data */ -abstract class MetadataCommand extends RunnableCommand with MetadataProcessOpeation { +abstract class MetadataCommand extends RunnableCommand with MetadataProcessOpeation with Auditable { override def run(sparkSession: SparkSession): Seq[Row] = { - processMetadata(sparkSession) + runWithAudit(processMetadata, sparkSession) } } /** * Command that process data only without modifying metadata */ -abstract class DataCommand extends RunnableCommand with DataProcessOperation { +abstract class DataCommand extends RunnableCommand with DataProcessOperation with Auditable { override def run(sparkSession: SparkSession): Seq[Row] = { - processData(sparkSession) + runWithAudit(processData, sparkSession) } } @@ -84,17 +138,19 @@ abstract class DataCommand extends RunnableCommand with DataProcessOperation { * if process data failed. */ abstract class AtomicRunnableCommand - extends RunnableCommand with MetadataProcessOpeation with DataProcessOperation { + extends RunnableCommand with MetadataProcessOpeation with DataProcessOperation with Auditable { override def run(sparkSession: SparkSession): Seq[Row] = { - processMetadata(sparkSession) - try { - processData(sparkSession) - } catch { - case e: Exception => - undoMetadata(sparkSession, e) - throw e - } + runWithAudit(spark => { + processMetadata(spark) + try { + processData(spark) + } catch { + case e: Exception => + undoMetadata(spark, e) + throw e + } + }, sparkSession) } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala index 6c8b0b0..dcaac98 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala @@ -55,6 +55,8 @@ case class CarbonAlterTableAddHivePartitionCommand( override def processMetadata(sparkSession: SparkSession): Seq[Row] = { table = CarbonEnv.getCarbonTable(tableName)(sparkSession) + setAuditTable(table) + setAuditInfo(Map("partition" -> partitionSpecsAndLocs.mkString(", "))) if (table.isHivePartitionTable) { if (table.isChildDataMap) { throw new UnsupportedOperationException("Cannot add partition directly on aggregate tables") @@ -156,4 +158,5 @@ case class CarbonAlterTableAddHivePartitionCommand( Seq.empty[Row] } + override protected def opName: String = "ADD HIVE PARTITION" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala index 1e987b0..a4629f8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala @@ -68,6 +68,8 @@ case class CarbonAlterTableDropHivePartitionCommand( override def processMetadata(sparkSession: SparkSession): Seq[Row] = { table = CarbonEnv.getCarbonTable(tableName)(sparkSession) + setAuditTable(table) + setAuditInfo(Map("partition" -> specs.mkString(","))) if (table.isHivePartitionTable) { var locks = List.empty[ICarbonLock] try { @@ -204,4 +206,5 @@ case class CarbonAlterTableDropHivePartitionCommand( Seq.empty[Row] } + override protected def opName: String = "DROP HIVE PARTITION" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala index c230322..d4a2d7f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.cache.CacheProvider import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment} @@ -58,6 +57,8 @@ case class CarbonAlterTableDropPartitionCommand( } val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase) val tableName = model.tableName + setAuditTable(dbName, tableName) + setAuditInfo(Map("partition" -> model.partitionId)) val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession) .asInstanceOf[CarbonRelation] @@ -169,7 +170,6 @@ case class CarbonAlterTableDropPartitionCommand( LOGGER.info("Locks released after alter table drop partition action.") } LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName") - Audit.log(LOGGER, s"Alter table drop partition is successful for table $dbName.$tableName") Seq.empty } @@ -178,8 +178,6 @@ case class CarbonAlterTableDropPartitionCommand( carbonLoadModel: CarbonLoadModel, dropWithData: Boolean, oldPartitionIds: List[Int]): Unit = { - Audit.log(LOGGER, s"Drop partition request received for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") try { startDropThreads( sqlContext, @@ -237,6 +235,8 @@ case class CarbonAlterTableDropPartitionCommand( } } } + + override protected def opName: String = "DROP CUSTOM PARTITION" } case class dropPartitionThread(sqlContext: SQLContext, http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala index 8b337c6..18c47a9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala @@ -30,7 +30,6 @@ import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.util.{AlterTableUtil, PartitionUtils} import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.cache.CacheProvider import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager @@ -62,6 +61,8 @@ case class CarbonAlterTableSplitPartitionCommand( val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase) val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore val tableName = splitPartitionModel.tableName + setAuditTable(dbName, tableName) + setAuditInfo(Map("partition" -> splitPartitionModel.partitionId)) val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession) .asInstanceOf[CarbonRelation] val tablePath = relation.carbonTable.getTablePath @@ -187,8 +188,6 @@ case class CarbonAlterTableSplitPartitionCommand( LOGGER.info("Locks released after alter table add/split partition action.") if (success) { LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName") - Audit.log(LOGGER, - s"Alter table add/split partition is successful for table $dbName.$tableName") } } Seq.empty @@ -200,8 +199,6 @@ case class CarbonAlterTableSplitPartitionCommand( carbonLoadModel: CarbonLoadModel, oldPartitionIdList: List[Int] ): Unit = { - Audit.log(LOGGER, s"Add partition request received for table " + - s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") try { startSplitThreads(sqlContext, carbonLoadModel, @@ -257,6 +254,8 @@ case class CarbonAlterTableSplitPartitionCommand( } } } + + override protected def opName: String = "SPLIT CUSTOM PARTITION" } case class SplitThread(sqlContext: SQLContext, http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala index 9419b00..2915981 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala @@ -39,6 +39,7 @@ private[sql] case class CarbonShowCarbonPartitionsCommand( val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation] val carbonTable = relation.carbonTable + setAuditTable(carbonTable) val partitionInfo = carbonTable.getPartitionInfo( carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName) if (partitionInfo == null) { @@ -51,4 +52,6 @@ private[sql] case class CarbonShowCarbonPartitionsCommand( LOGGER.info("partition column name:" + columnName) CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo) } + + override protected def opName: String = "SHOW CUSTOM PARTITION" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala index 1f1e7bd..719ed4a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala @@ -26,7 +26,6 @@ import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.features.TableOperation import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl @@ -44,7 +43,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand( val tableName = alterTableAddColumnsModel.tableName val dbName = alterTableAddColumnsModel.databaseName .getOrElse(sparkSession.catalog.currentDatabase) - Audit.log(LOGGER, s"Alter table add columns request has been received for $dbName.$tableName") + setAuditTable(dbName, tableName) val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) var locks = List.empty[ICarbonLock] var timeStamp = 0L @@ -82,6 +81,8 @@ private[sql] case class CarbonAlterTableAddColumnCommand( wrapperTableInfo, carbonTable.getAbsoluteTableIdentifier, sparkSession.sparkContext).process + setAuditInfo(Map( + "newColumn" -> newCols.map(x => s"${x.getColumnName}:${x.getDataType}").mkString(","))) // generate dictionary files for the newly added columns new AlterTableAddColumnRDD(sparkSession, newCols, @@ -105,10 +106,8 @@ private[sql] case class CarbonAlterTableAddColumnCommand( carbonTable, alterTableAddColumnsModel) OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext) LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName") - Audit.log(LOGGER, s"Alter table for add columns is successful for table $dbName.$tableName") } catch { case e: Exception => - LOGGER.error("Alter table add columns failed", e) if (newCols.nonEmpty) { LOGGER.info("Cleaning up the dictionary files as alter table add operation failed") new AlterTableDropColumnRDD(sparkSession, @@ -124,4 +123,6 @@ private[sql] case class CarbonAlterTableAddColumnCommand( } Seq.empty } + + override protected def opName: String = "ALTER TABLE ADD COLUMN" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala index 716b9c9..2bcd3aa 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala @@ -26,7 +26,6 @@ import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.features.TableOperation import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl @@ -45,8 +44,10 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand( val tableName = alterTableDataTypeChangeModel.tableName val dbName = alterTableDataTypeChangeModel.databaseName .getOrElse(sparkSession.catalog.currentDatabase) - Audit.log(LOGGER, - s"Alter table change data type request has been received for $dbName.$tableName") + setAuditTable(dbName, tableName) + setAuditInfo(Map( + "column" -> alterTableDataTypeChangeModel.columnName, + "newType" -> alterTableDataTypeChangeModel.dataTypeInfo.dataType)) val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) var locks = List.empty[ICarbonLock] // get the latest carbon table and check for column existence @@ -70,16 +71,12 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand( val columnName = alterTableDataTypeChangeModel.columnName val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible) if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) { - Audit.log(LOGGER, s"Alter table change data type request has failed. " + - s"Column $columnName does not exist") throwMetadataException(dbName, tableName, s"Column does not exist: $columnName") } val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName)) if (carbonColumn.size == 1) { validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head) } else { - Audit.log(LOGGER, s"Alter table change data type request has failed. " + - s"Column $columnName is invalid") throwMetadataException(dbName, tableName, s"Invalid Column: $columnName") } // read the latest schema file @@ -118,11 +115,8 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand( alterTableDataTypeChangeModel) OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext) LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName") - Audit.log(LOGGER, - s"Alter table for data type change is successful for table $dbName.$tableName") } catch { case e: Exception => - LOGGER.error("Alter table change datatype failed : " + e.getMessage) if (carbonTable != null) { AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession) } @@ -181,4 +175,6 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand( s"Only Int and Decimal data types are allowed for modification") } } + + override protected def opName: String = "ALTER TABLE CHANGE DATA TYPE" } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala index d601ed6..ccf9e54 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala @@ -27,7 +27,6 @@ import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.common.logging.impl.Audit import org.apache.carbondata.core.features.TableOperation import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl @@ -46,7 +45,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand( val tableName = alterTableDropColumnModel.tableName val dbName = alterTableDropColumnModel.databaseName .getOrElse(sparkSession.catalog.currentDatabase) - Audit.log(LOGGER, s"Alter table drop columns request has been received for $dbName.$tableName") + setAuditTable(dbName, tableName) + setAuditInfo(Map("column" -> alterTableDropColumnModel.columns.mkString(", "))) var locks = List.empty[ICarbonLock] var timeStamp = 0L val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) @@ -161,10 +161,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand( OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPostEvent, operationContext) LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName") - Audit.log(LOGGER, s"Alter table for drop columns is successful for table $dbName.$tableName") } catch { case e: Exception => - LOGGER.error("Alter table drop columns failed : " + e.getMessage) if (carbonTable != null) { AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession) } @@ -176,4 +174,6 @@ private[sql] case class CarbonAlterTableDropColumnCommand( } Seq.empty } + + override protected def opName: String = "ALTER TABLE DROP COLUMN" }