This is an automated email from the ASF dual-hosted git repository.
amishra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sentry.git
The following commit(s) were added to refs/heads/master by this push:
new 677656f SENTRY-2301: Log where sentry stands in the snapshot fetching process, periodically
(Arjun Mishra reviewed by Na Li, Kalyan Kumar Kalvagadda)
677656f is described below
commit 677656fddacb628315f0c6aa8bbf1442be246917
Author: amishra <amishra@cloudera.com>
AuthorDate: Wed Feb 6 09:49:01 2019 -0600
SENTRY-2301: Log where sentry stands in the snapshot fetching process, periodically (Arjun
Mishra reviewed by Na Li, Kalyan Kumar Kalvagadda)
---
.../org/apache/sentry/hdfs/ServiceConstants.java | 3 +
.../sentry/api/service/thrift/SentryMetrics.java | 16 ++++
.../service/thrift/FullUpdateInitializer.java | 102 ++++++++++++++++-----
.../sentry/service/thrift/SentryHMSClient.java | 9 +-
4 files changed, 100 insertions(+), 30 deletions(-)
diff --git a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
index 2d21411..c98caea 100644
--- a/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
+++ b/sentry-hdfs/sentry-hdfs-common/src/main/java/org/apache/sentry/hdfs/ServiceConstants.java
@@ -39,6 +39,9 @@ public class ServiceConstants {
public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS
= "sentry.hdfs.sync.metastore.cache.retry.wait.duration.millis";
public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT
= 1000;
+ public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_PRINT_SNAPSHOT_FETCH_INTERVAL_IN_MILLIS
= "sentry.hdfs.sync.metastore.cache.print-snapshot-fetch-interval.millis";
+ public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_PRINT_SNAPSHOT_FETCH_INTERVAL_IN_MILLIS_DEFAULT
= 300000;
+
public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-partitions-per-rpc";
public static final int SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_PART_PER_RPC_DEFAULT = 100;
public static final String SENTRY_HDFS_SYNC_METASTORE_CACHE_MAX_TABLES_PER_RPC = "sentry.hdfs.sync.metastore.cache.max-tables-per-rpc";
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryMetrics.java
b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryMetrics.java
index 534bb51..405629b 100644
--- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryMetrics.java
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/api/service/thrift/SentryMetrics.java
@@ -38,6 +38,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.sentry.provider.db.service.persistent.SentryStore;
import org.apache.sentry.provider.db.service.persistent.SentryStoreInterface;
+import org.apache.sentry.service.thrift.FullUpdateInitializer;
import org.apache.sentry.service.thrift.SentryService;
import org.apache.sentry.api.common.SentryServiceUtil;
import org.slf4j.Logger;
@@ -122,6 +123,21 @@ public final class SentryMetrics {
final Timer notificationProcessTimer = METRIC_REGISTRY.timer(
name(SentryPolicyStoreProcessor.class, "process-hsm-notification"));
+ public final Timer getFullHMSSnapshotTimer = METRIC_REGISTRY.timer(
+ name(FullUpdateInitializer.class, "fetch-full-snapshot"));
+
+ /** Total number of database objects */
+ public final Counter databaseCount = METRIC_REGISTRY.counter(
+ name(FullUpdateInitializer.class, "total", "db"));
+
+ /** Total number of table objects */
+ public final Counter tableCount = METRIC_REGISTRY.counter(
+ name(FullUpdateInitializer.class, "total", "tables"));
+
+ /** Total number of partition objects */
+ public final Counter partitionCount = METRIC_REGISTRY.counter(
+ name(FullUpdateInitializer.class, "total", "partitions"));
+
/**
* Return a Timer with name.
*/
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
index 4ff3dc9..d4bca42 100644
--- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/FullUpdateInitializer.java
@@ -17,7 +17,6 @@
*/
package org.apache.sentry.service.thrift;
-import com.codahale.metrics.Counter;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
@@ -50,8 +49,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import static com.codahale.metrics.MetricRegistry.name;
-
/**
* Manage fetching full snapshot from HMS.
* Snapshot is represented as a map from the hive object name to
@@ -104,6 +101,12 @@ public final class FullUpdateInitializer implements AutoCloseable {
private final Deque<Future<CallResult>> results = new ConcurrentLinkedDeque<>();
private final int maxRetries;
private final int waitDurationMillis;
+ private final long printSnapshotFetchTimeInterval;
+
+ //Objects count
+ private int totalNumberOfDatabasesFetched;
+ private int totalNumberOfTablesFetched;
+ private int totalNumberOfPartitionsFetched;
private static final Logger LOGGER = LoggerFactory.getLogger(FullUpdateInitializer.class);
@@ -111,18 +114,6 @@ public final class FullUpdateInitializer implements AutoCloseable {
new ObjectMapping(Collections.<String, Set<String>>emptyMap());
private final HiveConnectionFactory clientFactory;
- /** Total number of database objects */
- private final Counter databaseCount = SentryMetrics.getInstance()
- .getCounter(name(FullUpdateInitializer.class, "total", "db"));
-
- /** Total number of table objects */
- private final Counter tableCount = SentryMetrics.getInstance()
- .getCounter(name(FullUpdateInitializer.class, "total", "tables"));
-
- /** Total number of partition objects */
- private final Counter partitionCount = SentryMetrics.getInstance()
- .getCounter(name(FullUpdateInitializer.class, "total", "partitions"));
-
/**
* Extract path (not starting with "/") from the full URI
* @param uri - resource URI (usually with scheme)
@@ -277,7 +268,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
private final List<String> partNames;
PartitionTask(String dbName, String tblName, String authName,
- List<String> partNames) {
+ List<String> partNames) {
this.dbName = safeIntern(dbName);
this.tblName = safeIntern(tblName);
this.authName = safeIntern(authName);
@@ -286,10 +277,14 @@ public final class FullUpdateInitializer implements AutoCloseable {
@Override
ObjectMapping doTask() throws Exception {
+
+ long startTime = System.currentTimeMillis();
List<Partition> tblParts;
HMSClient c = null;
+
try (HMSClient client = clientFactory.connect()) {
c = client;
+ LOGGER.debug("Fetching partition objects for db = {} table = {}", dbName, tblName);
tblParts = client.getClient().getPartitionsByNames(dbName, tblName, partNames);
} catch (Exception e) {
if (c != null) {
@@ -298,8 +293,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
throw e;
}
- LOGGER.debug("Fetched partitions for db = {}, table = {}",
- dbName, tblName);
+ totalNumberOfPartitionsFetched += tblParts.size();
Collection<String> partitionNames = new ArrayList<>(tblParts.size());
for (Partition part : tblParts) {
@@ -312,6 +306,9 @@ public final class FullUpdateInitializer implements AutoCloseable {
LOGGER.info("Partition or its storage descriptor is null while fetching partitions
for db = {} table = {}", dbName, tblName);
}
}
+
+
+ LOGGER.debug("Completed partition task for db = {} table = {}. Current task size =
{}. Time Taken {} ms", dbName, tblName, results.size(), System.currentTimeMillis() - startTime);
return new ObjectMapping(authName, partitionNames);
}
}
@@ -328,12 +325,17 @@ public final class FullUpdateInitializer implements AutoCloseable {
@Override
@SuppressWarnings({"squid:S2629", "squid:S135"})
ObjectMapping doTask() throws Exception {
+
+ long startTime = System.currentTimeMillis();
HMSClient c = null;
+
try (HMSClient client = clientFactory.connect()) {
c = client;
- List<Table> tables = client.getClient().getTableObjectsByName(dbName, tableNames);
- LOGGER.debug("Fetching tables for db = {}, tables = {}", dbName, tableNames);
+ LOGGER.debug("Fetching table objects for db = {} tables count = {} tables = {}",
+ dbName, tableNames.size(), tableNames);
+ List<Table> tables = client.getClient().getTableObjectsByName(dbName, tableNames);
+ totalNumberOfTablesFetched += tables.size();
Map<String, Set<String>> objectMapping = new HashMap<>(tables.size());
for (Table tbl : tables) {
@@ -347,10 +349,15 @@ public final class FullUpdateInitializer implements AutoCloseable {
String tableName = safeIntern(tbl.getTableName().toLowerCase());
String authzObject = (dbName + "." + tableName).intern();
+
+ LOGGER.debug("Fetch all partition names for db = {} table = {}", dbName, tableName);
List<String> tblPartNames =
client.getClient().listPartitionNames(dbName, tableName, (short) -1);
+ LOGGER.info("For db = {} table = {} total number of partitions = {}",
+ dbName, tableName, tblPartNames.size());
+
// Count total number of partitions
- partitionCount.inc(tblPartNames.size());
+ SentryMetrics.getInstance().partitionCount.inc(tblPartNames.size());
for (int i = 0; i < tblPartNames.size(); i += maxPartitionsPerCall) {
List<String> partsToFetch = tblPartNames.subList(i,
Math.min(i + maxPartitionsPerCall, tblPartNames.size()));
@@ -358,6 +365,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
tableName, authzObject, partsToFetch);
results.add(threadPool.submit(partTask));
}
+
String tblPath = safeIntern(pathFromURI(tbl.getSd().getLocation()));
if (tblPath == null) {
continue;
@@ -369,6 +377,10 @@ public final class FullUpdateInitializer implements AutoCloseable {
}
paths.add(tblPath);
}
+
+ LOGGER.debug("Completed table task for db = {} tables = {}. Current task size = {}.
Time Taken = {} ms",
+ dbName, tableNames, results.size(), System.currentTimeMillis() - startTime);
+
return new ObjectMapping(Collections.unmodifiableMap(objectMapping));
} catch (Exception e) {
if (c != null) {
@@ -386,29 +398,45 @@ public final class FullUpdateInitializer implements AutoCloseable {
DbTask(String dbName) {
//Database names are case insensitive
this.dbName = safeIntern(dbName.toLowerCase());
- databaseCount.inc();
}
@Override
ObjectMapping doTask() throws Exception {
+
+ long startTime = System.currentTimeMillis();
HMSClient c = null;
+
try (HMSClient client = clientFactory.connect()) {
c = client;
+
+ LOGGER.debug("Fetching database object for db = {}", dbName);
Database db = client.getClient().getDatabase(dbName);
+
+ totalNumberOfDatabasesFetched++;
+
if (!dbName.equalsIgnoreCase(db.getName())) {
LOGGER.warn("Database name {} does not match {}", db.getName(), dbName);
return emptyObjectMapping;
}
+
+ LOGGER.debug("Fetch all table names for db = {}", dbName);
List<String> allTblStr = client.getClient().getAllTables(dbName);
+ LOGGER.info("For db = {} total number of table names fetched = {}", dbName, allTblStr.size());
+
// Count total number of tables
- tableCount.inc(allTblStr.size());
+ SentryMetrics.getInstance().tableCount.inc(allTblStr.size());
for (int i = 0; i < allTblStr.size(); i += maxTablesPerCall) {
List<String> tablesToFetch = allTblStr.subList(i,
Math.min(i + maxTablesPerCall, allTblStr.size()));
Callable<CallResult> tableTask = new TableTask(db, tablesToFetch);
results.add(threadPool.submit(tableTask));
}
+
String dbPath = safeIntern(pathFromURI(db.getLocationUri()));
+
+ LOGGER.debug("Completed database task for db = {}. Current task size = {}. Time Taken
= {} ms",
+ dbName, results.size(), System.currentTimeMillis() - startTime);
+
return (dbPath != null) ? new ObjectMapping(dbName, dbPath) :
emptyObjectMapping;
} catch (Exception e) {
@@ -434,6 +462,9 @@ public final class FullUpdateInitializer implements AutoCloseable {
waitDurationMillis = conf.getInt(
ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS,
ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_RETRY_WAIT_DURAION_IN_MILLIS_DEFAULT);
+ printSnapshotFetchTimeInterval = conf.getInt(
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_PRINT_SNAPSHOT_FETCH_INTERVAL_IN_MILLIS,
+ ServerConfig.SENTRY_HDFS_SYNC_METASTORE_CACHE_PRINT_SNAPSHOT_FETCH_INTERVAL_IN_MILLIS_DEFAULT);
ThreadFactory fullUpdateInitThreadFactory = new ThreadFactoryBuilder()
.setNameFormat(FULL_UPDATE_INITIALIZER_THREAD_NAME)
@@ -459,7 +490,12 @@ public final class FullUpdateInitializer implements AutoCloseable {
HMSClient c = null;
try (HMSClient client = clientFactory.connect()) {
c = client;
+
+ LOGGER.debug("Fetch all db names");
allDbStr = client.getClient().getAllDatabases();
+ SentryMetrics.getInstance().databaseCount.inc(allDbStr.size());
+ LOGGER.info("Total number of db names fetched = {}", allDbStr.size());
+
} catch (Exception e) {
if (c != null) {
c.invalidate();
@@ -476,6 +512,7 @@ public final class FullUpdateInitializer implements AutoCloseable {
// Resulting full snapshot
Map<String, Collection<String>> fullSnapshot = new HashMap<>();
+ long printMessageTime = System.currentTimeMillis();
// As async tasks complete, merge their results into full snapshot.
while (!results.isEmpty()) {
// This is the only thread that takes elements off the results list - all other threads
@@ -502,6 +539,33 @@ public final class FullUpdateInitializer implements AutoCloseable {
}
existingSet.addAll(val);
}
+
+ if (System.currentTimeMillis() - printMessageTime > printSnapshotFetchTimeInterval) {
+ // NOTE(review): the totalNumberOf*Fetched fields are incremented from thread-pool tasks
+ // without synchronization, so the "fetched" numbers below are approximate.
+ // NOTE(review): the totals come from SentryMetrics registry counters, which do not appear
+ // to be reset between snapshots - confirm before relying on the percentages.
+ long totalNumberOfDatabases = SentryMetrics.getInstance().databaseCount.getCount();
+ long totalNumberOfTables = SentryMetrics.getInstance().tableCount.getCount();
+ long totalNumberOfPartitions = SentryMetrics.getInstance().partitionCount.getCount();
+ // Progress is fetched/total computed as a double: "%.2f" throws
+ // IllegalFormatConversionException for integral arguments, and fetched % total is a
+ // remainder (0 when complete), not a percentage.
+ double percentageDatabasesFetched = totalNumberOfDatabases > 0
+ ? 100.0 * totalNumberOfDatabasesFetched / totalNumberOfDatabases : 0.0;
+ double percentageTablesFetched = totalNumberOfTables > 0
+ ? 100.0 * totalNumberOfTablesFetched / totalNumberOfTables : 0.0;
+ double percentagePartitionsFetched = totalNumberOfPartitions > 0
+ ? 100.0 * totalNumberOfPartitionsFetched / totalNumberOfPartitions : 0.0;
+
+ String snapshotFetchStatusString = String.format("Fetching full hms snapshot: databases fetched=%d (%.2f%%); "
+ + "tables fetched=%d (%.2f%%); partitions fetched=%d (%.2f%%); total number of databases=%d; "
+ + "total number of tables=%d total number of partitions=%d", totalNumberOfDatabasesFetched, percentageDatabasesFetched,
+ totalNumberOfTablesFetched, percentageTablesFetched, totalNumberOfPartitionsFetched, percentagePartitionsFetched,
+ totalNumberOfDatabases, totalNumberOfTables, totalNumberOfPartitions);
+
+ LOGGER.info(snapshotFetchStatusString);
+ printMessageTime = System.currentTimeMillis();
+ }
}
return fullSnapshot;
}
diff --git a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
index 4baeb67..5e222d9 100644
--- a/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
+++ b/sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryHMSClient.java
@@ -19,7 +19,6 @@
package org.apache.sentry.service.thrift;
import com.codahale.metrics.Counter;
-import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.exception.ExceptionUtils;
@@ -60,11 +59,7 @@ public class SentryHMSClient implements AutoCloseable {
private HiveMetaStoreClient client = null;
private HiveConnectionFactory hiveConnectionFactory;
- private static final String SNAPSHOT = "snapshot";
- /** Measures time to get full snapshot. */
- private final Timer updateTimer = SentryMetrics.getInstance()
- .getTimer(name(FullUpdateInitializer.class, SNAPSHOT));
- /** Number of times update failed. */
+ /** Number of times update failed. */
private final Counter failedSnapshotsCount = SentryMetrics.getInstance()
.getCounter(name(FullUpdateInitializer.class, "failed"));
@@ -249,7 +244,7 @@ public class SentryHMSClient implements AutoCloseable {
try (FullUpdateInitializer updateInitializer =
new FullUpdateInitializer(hiveConnectionFactory, conf);
- Context context = updateTimer.time()) {
+ Context context = SentryMetrics.getInstance().getFullHMSSnapshotTimer.time()) {
SentryStateBank.enableState(FullUpdateInitializerState.COMPONENT,FullUpdateInitializerState.FULL_SNAPSHOT_INPROGRESS);
Map<String, Collection<String>> pathsUpdate = updateInitializer.getFullHMSSnapshot();
logMessage = "Obtained full HMS snapshot";
|