Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6D1B5200C8E for ; Thu, 8 Jun 2017 13:51:05 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 6BB96160BD5; Thu, 8 Jun 2017 11:51:05 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 69410160BCA for ; Thu, 8 Jun 2017 13:51:03 +0200 (CEST) Received: (qmail 23402 invoked by uid 500); 8 Jun 2017 11:51:02 -0000 Mailing-List: contact commits-help@ambari.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: ambari-dev@ambari.apache.org Delivered-To: mailing list commits@ambari.apache.org Received: (qmail 23393 invoked by uid 99); 8 Jun 2017 11:51:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 08 Jun 2017 11:51:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3ED87DFE5C; Thu, 8 Jun 2017 11:51:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: stoader@apache.org To: commits@ambari.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ambari git commit: AMBARI-20687. Perf: Refactor ambari db-cleanup to include all big tables.(vbrodetskyi) Date: Thu, 8 Jun 2017 11:51:00 +0000 (UTC) archived-at: Thu, 08 Jun 2017 11:51:05 -0000 Repository: ambari Updated Branches: refs/heads/branch-2.5 24dcb1c85 -> c23ef5095 AMBARI-20687. Perf: Refactor ambari db-cleanup to include all big tables.(vbrodetskyi) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c23ef509 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c23ef509 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c23ef509 Branch: refs/heads/branch-2.5 Commit: c23ef5095bbfffe910115b1cab4017f21f377e0a Parents: 24dcb1c Author: Vitaly Brodetskyi Authored: Wed Apr 12 16:30:34 2017 +0300 Committer: Toader, Sebastian Committed: Thu Jun 8 11:51:51 2017 +0200 ---------------------------------------------------------------------- .../checks/DatabaseConsistencyCheckHelper.java | 111 ++++++++ .../apache/ambari/server/orm/DBAccessor.java | 6 + .../ambari/server/orm/DBAccessorImpl.java | 5 + .../server/orm/dao/HostRoleCommandDAO.java | 16 ++ .../ambari/server/orm/dao/RequestDAO.java | 260 ++++++++++++++++++- .../server/orm/dao/TopologyHostTaskDAO.java | 11 + .../orm/dao/TopologyLogicalRequestDAO.java | 12 + .../server/orm/dao/TopologyLogicalTaskDAO.java | 12 + .../orm/entities/ExecutionCommandEntity.java | 15 +- .../orm/entities/HostRoleCommandEntity.java | 10 +- .../server/orm/entities/RequestEntity.java | 6 + .../entities/RequestOperationLevelEntity.java | 4 +- .../entities/RequestResourceFilterEntity.java | 5 + .../orm/entities/RoleSuccessCriteriaEntity.java | 5 + .../ambari/server/orm/entities/StageEntity.java | 6 +- .../orm/entities/TopologyHostRequestEntity.java | 5 + .../orm/entities/TopologyHostTaskEntity.java | 15 +- .../entities/TopologyLogicalRequestEntity.java | 5 + .../orm/entities/TopologyLogicalTaskEntity.java | 30 ++- .../server/orm/entities/UpgradeEntity.java | 2 + .../server/orm/entities/UpgradeItemEntity.java | 5 + ambari-server/src/main/python/ambari-server.py | 10 +- .../src/main/python/ambari_server/dbCleanup.py | 37 +-- .../DatabaseConsistencyCheckHelperTest.java | 66 +++++ 24 files changed, 623 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java index 4513fac..70c3661 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelper.java @@ -24,6 +24,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; @@ -177,6 +178,7 @@ public class DatabaseConsistencyCheckHelper { checkHostComponentStatesCountEqualsHostComponentsDesiredStates(); checkServiceConfigs(); checkTopologyTables(); + checkForLargeTables(); LOG.info("******************************* Check database completed *******************************"); return checkResult; } @@ -269,6 +271,115 @@ public class DatabaseConsistencyCheckHelper { } /** + * This method checks if ambari database has tables with too big size (according to limit). + * First of all we are trying to get table size from schema information, but if it's not possible, + * we will get tables rows count and compare it with row count limit. + */ + static void checkForLargeTables() { + LOG.info("Checking for tables with large physical size"); + + ensureConnection(); + + DBAccessor.DbType dbType = dbAccessor.getDbType(); + String schemaName = dbAccessor.getDbSchema(); + + String GET_TABLE_SIZE_IN_BYTES_POSTGRESQL = "SELECT pg_total_relation_size('%s') \"Table Size\""; + String GET_TABLE_SIZE_IN_BYTES_MYSQL = "SELECT (data_length + index_length) \"Table Size\" FROM information_schema.TABLES WHERE table_schema = \"" + schemaName + "\" AND table_name =\"%s\""; + String GET_TABLE_SIZE_IN_BYTES_ORACLE = "SELECT bytes \"Table Size\" FROM user_segments WHERE segment_type='TABLE' AND segment_name='%s'"; + String GET_ROW_COUNT_QUERY = "SELECT COUNT(*) FROM %s"; + + Map tableSizeQueryMap = new HashMap<>(); + tableSizeQueryMap.put(DBAccessor.DbType.POSTGRES, GET_TABLE_SIZE_IN_BYTES_POSTGRESQL); + tableSizeQueryMap.put(DBAccessor.DbType.MYSQL, GET_TABLE_SIZE_IN_BYTES_MYSQL); + tableSizeQueryMap.put(DBAccessor.DbType.ORACLE, GET_TABLE_SIZE_IN_BYTES_ORACLE); + + List tablesToCheck = Arrays.asList("host_role_command", "execution_command", "stage", "request", "alert_history"); + + final double TABLE_SIZE_LIMIT_MB = 3000.0; + final int TABLE_ROW_COUNT_LIMIT = 3000000; + + String findTableSizeQuery = tableSizeQueryMap.get(dbType); + + if (dbType == DBAccessor.DbType.ORACLE) { + for (int i = 0;i < tablesToCheck.size(); i++) { + tablesToCheck.set(i, tablesToCheck.get(i).toUpperCase()); + } + } + + for (String tableName : tablesToCheck) { + + ResultSet rs = null; + Statement statement = null; + Double tableSizeInMB = null; + Long tableSizeInBytes = null; + int tableRowCount = -1; + + try { + statement = connection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE); + rs = statement.executeQuery(String.format(findTableSizeQuery, tableName)); + if (rs != null) { + while (rs.next()) { + tableSizeInBytes = rs.getLong(1); + if (tableSizeInBytes != null) { + tableSizeInMB = tableSizeInBytes / 1024.0 / 1024.0; + } + } + } + + if (tableSizeInMB != null && tableSizeInMB > TABLE_SIZE_LIMIT_MB) { + warning("The database table {} is currently {} MB (limit is {}) and may impact performance. It is recommended " + + "that you reduce its size by executing \"ambari-server db-cleanup\".", + tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB); + } else if (tableSizeInMB != null && tableSizeInMB < TABLE_SIZE_LIMIT_MB) { + LOG.info(String.format("The database table %s is currently %.3f MB and is within normal limits (%.3f)", + tableName, tableSizeInMB, TABLE_SIZE_LIMIT_MB)); + } else { + throw new Exception(); + } + } catch (Exception e) { + LOG.error(String.format("Failed to get %s table size from database, will check row count: ", tableName), e); + try { + rs = statement.executeQuery(String.format(GET_ROW_COUNT_QUERY, tableName)); + if (rs != null) { + while (rs.next()) { + tableRowCount = rs.getInt(1); + } + } + + if (tableRowCount > TABLE_ROW_COUNT_LIMIT) { + warning("The database table {} currently has {} rows (limit is {}) and may impact performance. It is " + + "recommended that you reduce its size by executing \"ambari-server db-cleanup\".", + tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT); + } else if (tableRowCount != -1 && tableRowCount < TABLE_ROW_COUNT_LIMIT) { + LOG.info(String.format("The database table %s currently has %d rows and is within normal limits (%d)", tableName, tableRowCount, TABLE_ROW_COUNT_LIMIT)); + } else { + throw new SQLException(); + } + } catch (SQLException ex) { + LOG.error(String.format("Failed to get %s row count: ", tableName), e); + } + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + LOG.error("Exception occurred during result set closing procedure: ", e); + } + } + + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + LOG.error("Exception occurred during statement closing procedure: ", e); + } + } + } + } + + } + + /** * This method checks if any config type in clusterconfigmapping table, has * more than one versions selected. If config version is selected(in selected column = 1), * it means that this version of config is actual. So, if any config type has more http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java index fac524c..c637c05 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessor.java @@ -663,6 +663,12 @@ public interface DBAccessor { DbType getDbType(); /** + * Get database schema name + * @return @dbSchema + */ + String getDbSchema(); + + /** * Capture column type */ class DBColumnInfo { http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java index c5b116c..0e2237c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/DBAccessorImpl.java @@ -233,6 +233,11 @@ public class DBAccessorImpl implements DBAccessor { } @Override + public String getDbSchema() { + return dbSchema; + } + + @Override public boolean tableHasData(String tableName) throws SQLException { String query = "SELECT count(*) from " + tableName; Statement statement = getConnection().createStatement(); http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java index 7582957..b9e1fab 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/HostRoleCommandDAO.java @@ -932,4 +932,20 @@ public class HostRoleCommandDAO { return HostRoleCommandEntity_.getPredicateMapping().get(propertyId); } } + + public List findTaskIdsByRequestStageIds(List requestStageIds) { + EntityManager entityManager = entityManagerProvider.get(); + List taskIds = new ArrayList(); + for (RequestDAO.StageEntityPK requestIds : requestStageIds) { + TypedQuery hostRoleCommandQuery = + entityManager.createNamedQuery("HostRoleCommandEntity.findTaskIdsByRequestStageIds", Long.class); + + hostRoleCommandQuery.setParameter("requestId", requestIds.getRequestId()); + hostRoleCommandQuery.setParameter("stageId", requestIds.getStageId()); + + taskIds.addAll(daoUtils.selectList(hostRoleCommandQuery)); + } + + return taskIds; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java index 1c4d0a3..38c0977 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/RequestDAO.java @@ -19,27 +19,54 @@ package org.apache.ambari.server.orm.dao; import java.text.MessageFormat; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; +import java.util.Set; import javax.persistence.EntityManager; import javax.persistence.TypedQuery; +import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.actionmanager.HostRoleStatus; +import org.apache.ambari.server.cleanup.TimeBasedCleanupPolicy; import org.apache.ambari.server.orm.RequiresSession; +import org.apache.ambari.server.orm.entities.ExecutionCommandEntity; +import org.apache.ambari.server.orm.entities.HostRoleCommandEntity; import org.apache.ambari.server.orm.entities.RequestEntity; +import org.apache.ambari.server.orm.entities.RequestOperationLevelEntity; import org.apache.ambari.server.orm.entities.RequestResourceFilterEntity; +import org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntity; +import org.apache.ambari.server.orm.entities.StageEntity; +import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity; +import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity; +import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity; +import org.apache.ambari.server.state.Clusters; import org.eclipse.persistence.config.HintValues; import org.eclipse.persistence.config.QueryHints; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; import com.google.inject.persist.Transactional; @Singleton -public class RequestDAO { +public class RequestDAO implements Cleanable { + + private static final Logger LOG = LoggerFactory.getLogger(RequestDAO.class); + + + private static final int BATCH_SIZE = 999; + /** * SQL template to retrieve all request IDs, sorted by the ID. */ @@ -64,6 +91,27 @@ public class RequestDAO { @Inject DaoUtils daoUtils; + @Inject + private Provider m_clusters; + + @Inject + private HostRoleCommandDAO hostRoleCommandDAO; + + @Inject + private StageDAO stageDAO; + + @Inject + private TopologyLogicalTaskDAO topologyLogicalTaskDAO; + + @Inject + private TopologyHostTaskDAO topologyHostTaskDAO; + + @Inject + private TopologyLogicalRequestDAO topologyLogicalRequestDAO; + + @Inject + private TopologyRequestDAO topologyRequestDAO; + @RequiresSession public RequestEntity findByPK(Long requestId) { return entityManagerProvider.get().find(RequestEntity.class, requestId); @@ -189,4 +237,214 @@ public class RequestDAO { return daoUtils.selectList(query); } + + public static final class StageEntityPK { + private Long requestId; + private Long stageId; + + public StageEntityPK(Long requestId, Long stageId) { + this.requestId = requestId; + this.stageId = stageId; + } + + public Long getStageId() { + return stageId; + } + + public void setStageId(Long stageId) { + this.stageId = stageId; + } + + public Long getRequestId() { + return requestId; + } + + public void setRequestId(Long requestId) { + this.requestId = requestId; + } + } + + /** + * Search for all request ids in Upgrade table + * @return the list of request ids + */ + private List findAllRequestIdsFromUpgrade() { + EntityManager entityManager = entityManagerProvider.get(); + TypedQuery upgradeQuery = + entityManager.createNamedQuery("UpgradeEntity.findAllRequestIds", Long.class); + + return daoUtils.selectList(upgradeQuery); + } + + /** + * Search for all request and stage ids in Request and Stage tables + * @return the list of request/stage ids + */ + public List findRequestAndStageIdsInClusterBeforeDate(Long clusterId, long beforeDateMillis) { + EntityManager entityManager = entityManagerProvider.get(); + TypedQuery requestQuery = + entityManager.createNamedQuery("RequestEntity.findRequestStageIdsInClusterBeforeDate", StageEntityPK.class); + + requestQuery.setParameter("clusterId", clusterId); + requestQuery.setParameter("beforeDate", beforeDateMillis); + + return daoUtils.selectList(requestQuery); + } + + /** + * In this method we are removing entities using passed ids, + * To prevent issues we are using batch request to remove limited + * count of entities. + * @param ids list of ids that we are using to remove rows from table + * @param paramName name of parameter that we are using in sql query (taskIds, stageIds) + * @param entityName name of entity which we will remove + * @param beforeDateMillis timestamp which was set by user (remove all entities that were created before), + * we are using it only for logging + * @param entityQuery name of NamedQuery which we will use to remove needed entities + * @param type type of entity class which we will use for casting query result + * @return rows count that were removed + */ + @Transactional + protected int cleanTableByIds(Set ids, String paramName, String entityName, Long beforeDateMillis, + String entityQuery, Class type) { + LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis))); + EntityManager entityManager = entityManagerProvider.get(); + int affectedRows = 0; + // Batch delete + TypedQuery query = entityManager.createNamedQuery(entityQuery, type); + if (ids != null && !ids.isEmpty()) { + for (int i = 0; i < ids.size(); i += BATCH_SIZE) { + int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE); + List idsSubList = new ArrayList<>(ids).subList(i, endRow); + LOG.info("Deleting " + entityName + " entity batch with task ids: " + + idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 1)); + query.setParameter(paramName, idsSubList); + affectedRows += query.executeUpdate(); + } + } + + return affectedRows; + } + + /** + * In this method we are removing entities using passed few ids, + * To prevent issues we are using batch request to remove limited + * count of entities. + * @param ids list of ids pairs that we are using to remove rows from table + * @param paramNames list of two names of parameters that we are using in sql query (taskIds, stageIds) + * @param entityName name of entity which we will remove + * @param beforeDateMillis timestamp which was set by user (remove all entities that were created before), + * we are using it only for logging + * @param entityQuery name of NamedQuery which we will use to remove needed entities + * @param type type of entity class which we will use for casting query result + * @return rows count that were removed + */ + @Transactional + protected int cleanTableByStageEntityPK(List ids, LinkedList paramNames, String entityName, Long beforeDateMillis, + String entityQuery, Class type) { + LOG.info(String.format("Deleting %s entities before date %s", entityName, new Date(beforeDateMillis))); + EntityManager entityManager = entityManagerProvider.get(); + int affectedRows = 0; + // Batch delete + TypedQuery query = entityManager.createNamedQuery(entityQuery, type); + if (ids != null && !ids.isEmpty()) { + for (int i = 0; i < ids.size(); i += BATCH_SIZE) { + int endRow = (i + BATCH_SIZE) > ids.size() ? ids.size() : (i + BATCH_SIZE); + List idsSubList = new ArrayList<>(ids).subList(i, endRow); + LOG.info("Deleting " + entityName + " entity batch with task ids: " + + idsSubList.get(0) + " - " + idsSubList.get(idsSubList.size() - 1)); + for (StageEntityPK requestIds : idsSubList) { + query.setParameter(paramNames.get(0), requestIds.getStageId()); + query.setParameter(paramNames.get(1), requestIds.getRequestId()); + affectedRows += query.executeUpdate(); + } + } + } + + return affectedRows; + } + + @Transactional + @Override + public long cleanup(TimeBasedCleanupPolicy policy) { + long affectedRows = 0; + Long clusterId = null; + try { + clusterId = m_clusters.get().getCluster(policy.getClusterName()).getClusterId(); + // find request and stage ids that were created before date populated by user. + List requestStageIds = findRequestAndStageIdsInClusterBeforeDate(clusterId, policy.getToDateInMillis()); + + // find request ids from Upgrade table and exclude these ids from + // request ids set that we already have. We don't want to make any changes for upgrade + Set requestIdsFromUpgrade = Sets.newHashSet(findAllRequestIdsFromUpgrade()); + Iterator requestStageIdsIterator = requestStageIds.iterator(); + while (requestStageIdsIterator.hasNext()) { + StageEntityPK nextRequestStageIds = requestStageIdsIterator.next(); + if (requestIdsFromUpgrade.contains(nextRequestStageIds.getRequestId())) { + requestStageIdsIterator.remove(); + } + } + + + Set requestIds = new HashSet<>(); + for (StageEntityPK ids : requestStageIds) { + requestIds.add(ids.getRequestId()); + } + + // find task ids using request stage ids + Set taskIds = Sets.newHashSet(hostRoleCommandDAO.findTaskIdsByRequestStageIds(requestStageIds)); + LinkedList params = new LinkedList<>(); + params.add("stageId"); + params.add("requestId"); + + // find host task ids, to find related host requests and also to remove needed host tasks + List hostTaskIds = new ArrayList<>(); + if (taskIds != null && !taskIds.isEmpty()) { + hostTaskIds = topologyLogicalTaskDAO.findHostTaskIdsByPhysicalTaskIds(Lists.newArrayList(taskIds)); + } + + // find host request ids by host task ids to remove later needed host requests + List hostRequestIds = new ArrayList<>(); + if (!hostTaskIds.isEmpty()) { + hostRequestIds = topologyHostTaskDAO.findHostRequestIdsByHostTaskIds(hostTaskIds); + } + + List topologyRequestIds = new ArrayList<>(); + if (!hostRequestIds.isEmpty()) { + topologyRequestIds = topologyLogicalRequestDAO.findRequestIdsByIds(hostRequestIds); + } + + + //removing all entities one by one according to their relations using stage, task and request ids + affectedRows += cleanTableByIds(taskIds, "taskIds", "ExecutionCommand", policy.getToDateInMillis(), + "ExecutionCommandEntity.removeByTaskIds", ExecutionCommandEntity.class); + affectedRows += cleanTableByIds(taskIds, "taskIds", "TopologyLogicalTask", policy.getToDateInMillis(), + "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", TopologyLogicalTaskEntity.class); + affectedRows += cleanTableByIds(Sets.newHashSet(hostTaskIds), "hostTaskIds", "TopologyHostTask", policy.getToDateInMillis(), + "TopologyHostTaskEntity.removeByTaskIds", TopologyHostTaskEntity.class); + affectedRows += cleanTableByIds(Sets.newHashSet(hostRequestIds), "hostRequestIds", "TopologyHostRequest", policy.getToDateInMillis(), + "TopologyHostRequestEntity.removeByIds", TopologyHostRequestEntity.class); + for (Long topologyRequestId : topologyRequestIds) { + topologyRequestDAO.removeByPK(topologyRequestId); + } + affectedRows += cleanTableByIds(taskIds, "taskIds", "HostRoleCommand", policy.getToDateInMillis(), + "HostRoleCommandEntity.removeByTaskIds", HostRoleCommandEntity.class); + affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "RoleSuccessCriteria", policy.getToDateInMillis(), + "RoleSuccessCriteriaEntity.removeByRequestStageIds", RoleSuccessCriteriaEntity.class); + affectedRows += cleanTableByStageEntityPK(requestStageIds, params, "Stage", policy.getToDateInMillis(), + "StageEntity.removeByRequestStageIds", StageEntity.class); + affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestResourceFilter", policy.getToDateInMillis(), + "RequestResourceFilterEntity.removeByRequestIds", RequestResourceFilterEntity.class); + affectedRows += cleanTableByIds(requestIds, "requestIds", "RequestOperationLevel", policy.getToDateInMillis(), + "RequestOperationLevelEntity.removeByRequestIds", RequestOperationLevelEntity.class); + affectedRows += cleanTableByIds(requestIds, "requestIds", "Request", policy.getToDateInMillis(), + "RequestEntity.removeByRequestIds", RequestEntity.class); + + } catch (AmbariException e) { + LOG.error("Error while looking up cluster with name: {}", policy.getClusterName(), e); + throw new IllegalStateException(e); + } + + return affectedRows; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java index 85a4f5f..eea8032 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyHostTaskDAO.java @@ -52,6 +52,17 @@ public class TopologyHostTaskDAO { } @RequiresSession + public List findHostRequestIdsByHostTaskIds(List hostTaskIds) { + EntityManager entityManager = entityManagerProvider.get(); + TypedQuery topologyHostTaskQuery = + entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", Long.class); + + topologyHostTaskQuery.setParameter("hostTaskIds", hostTaskIds); + + return daoUtils.selectList(topologyHostTaskQuery); + } + + @RequiresSession public List findAll() { return daoUtils.selectAll(entityManagerProvider.get(), TopologyHostTaskEntity.class); } http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java index e6dcb69..32a38da 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalRequestDAO.java @@ -24,6 +24,7 @@ import com.google.inject.persist.Transactional; import org.apache.ambari.server.orm.RequiresSession; import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity; import javax.persistence.EntityManager; +import javax.persistence.TypedQuery; import java.util.List; @Singleton @@ -58,4 +59,15 @@ public class TopologyLogicalRequestDAO { public void remove(TopologyLogicalRequestEntity requestEntity) { entityManagerProvider.get().remove(requestEntity); } + + @RequiresSession + public List findRequestIdsByIds(List ids) { + EntityManager entityManager = entityManagerProvider.get(); + TypedQuery topologyLogicalRequestQuery = + entityManager.createNamedQuery("TopologyLogicalRequestEntity.findRequestIds", Long.class); + + topologyLogicalRequestQuery.setParameter("ids", ids); + + return daoUtils.selectList(topologyLogicalRequestQuery); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java index f0331cc..3a72aed 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/TopologyLogicalTaskDAO.java @@ -25,6 +25,7 @@ import org.apache.ambari.server.orm.RequiresSession; import org.apache.ambari.server.orm.entities.TopologyLogicalTaskEntity; import javax.persistence.EntityManager; +import javax.persistence.TypedQuery; import java.util.List; @Singleton @@ -41,6 +42,17 @@ public class TopologyLogicalTaskDAO { } @RequiresSession + public List findHostTaskIdsByPhysicalTaskIds(List physicalTaskIds) { + EntityManager entityManager = entityManagerProvider.get(); + TypedQuery topologyHostTaskQuery = + entityManager.createNamedQuery("TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", Long.class); + + topologyHostTaskQuery.setParameter("physicalTaskIds", physicalTaskIds); + + return daoUtils.selectList(topologyHostTaskQuery); + } + + @RequiresSession public List findAll() { return daoUtils.selectAll(entityManagerProvider.get(), TopologyLogicalTaskEntity.class); } http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java index 25d830b..7015709 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/ExecutionCommandEntity.java @@ -18,11 +18,24 @@ package org.apache.ambari.server.orm.entities; -import javax.persistence.*; import java.util.Arrays; +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.JoinColumn; +import javax.persistence.Lob; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.OneToOne; +import javax.persistence.Table; + @Table(name = "execution_command") @Entity +@NamedQueries({ + @NamedQuery(name = "ExecutionCommandEntity.removeByTaskIds", query = "DELETE FROM ExecutionCommandEntity command WHERE command.taskId IN :taskIds") +}) public class ExecutionCommandEntity { @Id http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java index 3d946f5..7ac60a9 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/HostRoleCommandEntity.java @@ -60,6 +60,7 @@ import org.apache.commons.lang.ArrayUtils; , initialValue = 1 ) @NamedQueries({ + @NamedQuery(name = "HostRoleCommandEntity.findTaskIdsByRequestStageIds", query = "SELECT command.taskId FROM HostRoleCommandEntity command WHERE command.stageId = :stageId AND command.requestId = :requestId"), @NamedQuery(name = "HostRoleCommandEntity.findCountByCommandStatuses", query = "SELECT COUNT(command.taskId) FROM HostRoleCommandEntity command WHERE command.status IN :statuses"), @NamedQuery(name = "HostRoleCommandEntity.findByRequestIdAndStatuses", query="SELECT task FROM HostRoleCommandEntity task WHERE task.requestId=:requestId AND task.status IN :statuses ORDER BY task.taskId ASC"), @NamedQuery(name = "HostRoleCommandEntity.findTasksByStatusesOrderByIdDesc", query = "SELECT task FROM HostRoleCommandEntity task WHERE task.requestId = :requestId AND task.status IN :statuses ORDER BY task.taskId DESC"), @@ -71,12 +72,9 @@ import org.apache.commons.lang.ArrayUtils; @NamedQuery(name = "HostRoleCommandEntity.findByStatusBetweenStages", query = "SELECT command FROM HostRoleCommandEntity command WHERE command.requestId = :requestId AND command.stageId >= :minStageId AND command.stageId <= :maxStageId AND command.status = :status"), @NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipExcludeRoleCommand", query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = :autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand <> :roleCommand"), @NamedQuery(name = "HostRoleCommandEntity.updateAutoSkipForRoleCommand", query = "UPDATE HostRoleCommandEntity command SET command.autoSkipOnFailure = :autoSkipOnFailure WHERE command.requestId = :requestId AND command.roleCommand = :roleCommand"), - @NamedQuery( - name = "HostRoleCommandEntity.findHostsByCommandStatus", - query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE (command.requestId >= :iLowestRequestIdInProgress AND command.requestId <= :iHighestRequestIdInProgress) AND command.status IN :statuses AND command.hostId = host.hostId AND host.hostName IS NOT NULL"), - @NamedQuery( - name = "HostRoleCommandEntity.getBlockingHostsForRequest", - query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND command.requestId < :upperRequestIdExclusive AND command.status IN :statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND host.hostName IS NOT NULL") + @NamedQuery(name = "HostRoleCommandEntity.removeByTaskIds", query = "DELETE FROM HostRoleCommandEntity command WHERE command.taskId IN :taskIds"), + @NamedQuery(name = "HostRoleCommandEntity.findHostsByCommandStatus", query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE (command.requestId >= :iLowestRequestIdInProgress AND command.requestId <= :iHighestRequestIdInProgress) AND command.status IN :statuses AND command.hostId = host.hostId AND host.hostName IS NOT NULL"), + @NamedQuery(name = "HostRoleCommandEntity.getBlockingHostsForRequest", query = "SELECT DISTINCT(host.hostName) FROM HostRoleCommandEntity command, HostEntity host WHERE command.requestId >= :lowerRequestIdInclusive AND command.requestId < :upperRequestIdExclusive AND command.status IN :statuses AND command.isBackgroundCommand=0 AND command.hostId = host.hostId AND host.hostName IS NOT NULL") }) public class HostRoleCommandEntity { http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java index e46bb51..45fb631 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestEntity.java @@ -32,6 +32,8 @@ import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.Lob; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.OneToMany; import javax.persistence.OneToOne; import javax.persistence.Table; @@ -39,6 +41,10 @@ import java.util.Collection; @Table(name = "request") @Entity +@NamedQueries({ + @NamedQuery(name = "RequestEntity.findRequestStageIdsInClusterBeforeDate", query = "SELECT NEW org.apache.ambari.server.orm.dao.RequestDAO.StageEntityPK(request.requestId, stage.stageId) FROM RequestEntity request JOIN StageEntity stage ON request.requestId = stage.requestId WHERE request.clusterId = :clusterId AND request.createTime <= :beforeDate"), + @NamedQuery(name = "RequestEntity.removeByRequestIds", query = "DELETE FROM RequestEntity request WHERE request.requestId IN :requestIds") +}) public class RequestEntity { @Column(name = "request_id") http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java index c03816e..64af92a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestOperationLevelEntity.java @@ -42,7 +42,9 @@ import javax.persistence.TableGenerator; @NamedQueries({ @NamedQuery(name = "requestOperationLevelByHostId", query = "SELECT requestOperationLevel FROM RequestOperationLevelEntity requestOperationLevel " + - "WHERE requestOperationLevel.hostId=:hostId") + "WHERE requestOperationLevel.hostId=:hostId"), + @NamedQuery(name = "RequestOperationLevelEntity.removeByRequestIds", + query = "DELETE FROM RequestOperationLevelEntity requestOperationLevel WHERE requestOperationLevel.requestId IN :requestIds") }) public class RequestOperationLevelEntity { http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java index 8ee41d2..9597db1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RequestResourceFilterEntity.java @@ -26,6 +26,8 @@ import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.Lob; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.Table; import javax.persistence.TableGenerator; @@ -36,6 +38,9 @@ import javax.persistence.TableGenerator; , pkColumnValue = "resourcefilter_id_seq" , initialValue = 1 ) +@NamedQueries({ + @NamedQuery(name = "RequestResourceFilterEntity.removeByRequestIds", query = "DELETE FROM RequestResourceFilterEntity filter WHERE filter.requestId IN :requestIds") +}) public class RequestResourceFilterEntity { @Id http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java index 3386c24..66e7fd8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/RoleSuccessCriteriaEntity.java @@ -26,6 +26,8 @@ import javax.persistence.IdClass; import javax.persistence.JoinColumn; import javax.persistence.JoinColumns; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.Table; import org.apache.ambari.server.Role; @@ -33,6 +35,9 @@ import org.apache.ambari.server.Role; @IdClass(org.apache.ambari.server.orm.entities.RoleSuccessCriteriaEntityPK.class) @Table(name = "role_success_criteria") @Entity +@NamedQueries({ + @NamedQuery(name = "RoleSuccessCriteriaEntity.removeByRequestStageIds", query = "DELETE FROM RoleSuccessCriteriaEntity criteria WHERE criteria.stageId = :stageId AND criteria.requestId = :requestId") +}) public class RoleSuccessCriteriaEntity { @Id http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java index aeafda0..3b755f6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/StageEntity.java @@ -49,7 +49,11 @@ import org.apache.ambari.server.actionmanager.CommandExecutionType; query = "SELECT stage.requestId, MIN(stage.stageId) from StageEntity stage, HostRoleCommandEntity hrc WHERE hrc.status IN :statuses AND hrc.stageId = stage.stageId AND hrc.requestId = stage.requestId GROUP by stage.requestId ORDER BY stage.requestId"), @NamedQuery( name = "StageEntity.findByRequestIdAndCommandStatuses", - query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.requestId = :requestId AND roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.stageId") }) + query = "SELECT stage from StageEntity stage WHERE stage.stageId IN (SELECT roleCommand.stageId from HostRoleCommandEntity roleCommand WHERE roleCommand.requestId = :requestId AND roleCommand.status IN :statuses AND roleCommand.stageId = stage.stageId AND roleCommand.requestId = stage.requestId ) ORDER BY stage.stageId"), + @NamedQuery( + name = "StageEntity.removeByRequestStageIds", + query = "DELETE FROM StageEntity stage WHERE stage.stageId = :stageId AND stage.requestId = :requestId") +}) public class StageEntity { @Basic http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java index 4e05ea1..7abbd51 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostRequestEntity.java @@ -25,6 +25,8 @@ import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.OneToMany; import javax.persistence.Table; import javax.persistence.TableGenerator; @@ -32,6 +34,9 @@ import java.util.Collection; @Entity @Table(name = "topology_host_request") +@NamedQueries({ + @NamedQuery(name = "TopologyHostRequestEntity.removeByIds", query = "DELETE FROM TopologyHostRequestEntity topologyHostRequest WHERE topologyHostRequest.id IN :hostRequestIds") +}) public class TopologyHostRequestEntity { @Id // @GeneratedValue(strategy = GenerationType.TABLE, generator = "topology_host_request_id_generator") http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java index 49d3a97..37830b7 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyHostTaskEntity.java @@ -40,7 +40,11 @@ import java.util.Collection; pkColumnValue = "topology_host_task_id_seq", initialValue = 0) @NamedQueries({ @NamedQuery(name = "TopologyHostTaskEntity.findByHostRequest", - query = "SELECT req FROM TopologyHostTaskEntity req WHERE req.topologyHostRequestEntity.id = :hostRequestId") + query = "SELECT req FROM TopologyHostTaskEntity req WHERE req.topologyHostRequestEntity.id = :hostRequestId"), + @NamedQuery(name = "TopologyLogicalTaskEntity.findHostRequestIdsByHostTaskIds", + query = "SELECT tht.hostRequestId from TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds"), + @NamedQuery(name = "TopologyHostTaskEntity.removeByTaskIds", + query = "DELETE FROM TopologyHostTaskEntity tht WHERE tht.id IN :hostTaskIds") }) public class TopologyHostTaskEntity { @Id @@ -51,6 +55,9 @@ public class TopologyHostTaskEntity { @Column(name = "type", length = 255, nullable = false) private String type; + @Column(name = "host_request_id", nullable = false, insertable = false, updatable = false) + private Long hostRequestId; + @ManyToOne @JoinColumn(name = "host_request_id", referencedColumnName = "id", nullable = false) private TopologyHostRequestEntity topologyHostRequestEntity; @@ -67,7 +74,11 @@ public class TopologyHostTaskEntity { } public Long getHostRequestId() { - return topologyHostRequestEntity != null ? topologyHostRequestEntity.getId() : null; + return hostRequestId; + } + + public void setHostRequestId(Long hostRequestId) { + this.hostRequestId = hostRequestId; } public String getType() { http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java index 4d255b2..1536b80 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalRequestEntity.java @@ -25,6 +25,8 @@ import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.OneToMany; import javax.persistence.OneToOne; import javax.persistence.Table; @@ -33,6 +35,9 @@ import java.util.Collection; @Entity @Table(name = "topology_logical_request") +@NamedQueries({ + @NamedQuery(name = "TopologyLogicalRequestEntity.findRequestIds", query = "SELECT logicalrequest.topologyRequestId from TopologyLogicalRequestEntity logicalrequest WHERE logicalrequest.id IN :ids") +}) public class TopologyLogicalRequestEntity { @Id @Column(name = "id", nullable = false, updatable = false) http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java index c71d4e4..2954863 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/TopologyLogicalTaskEntity.java @@ -24,6 +24,8 @@ import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.OneToOne; import javax.persistence.Table; import javax.persistence.TableGenerator; @@ -33,6 +35,10 @@ import javax.persistence.TableGenerator; @TableGenerator(name = "topology_logical_task_id_generator", table = "ambari_sequences", pkColumnName = "sequence_name", valueColumnName = "sequence_value", pkColumnValue = "topology_logical_task_id_seq", initialValue = 0) +@NamedQueries({ + @NamedQuery(name = "TopologyLogicalTaskEntity.findHostTaskIdsByPhysicalTaskIds", query = "SELECT logicaltask.hostTaskId from TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :physicalTaskIds"), + @NamedQuery(name = "TopologyLogicalTaskEntity.removeByPhysicalTaskIds", query = "DELETE FROM TopologyLogicalTaskEntity logicaltask WHERE logicaltask.physicalTaskId IN :taskIds") +}) public class TopologyLogicalTaskEntity { @Id @GeneratedValue(strategy = GenerationType.TABLE, generator = "topology_logical_task_id_generator") @@ -42,12 +48,18 @@ public class TopologyLogicalTaskEntity { @Column(name = "component", length = 255) private String componentName; + @Column(name = "host_task_id", nullable = false, insertable = false, updatable = false) + private Long hostTaskId; + + @Column(name = "physical_task_id", nullable = false, insertable = false, updatable = false) + private Long physicalTaskId; + @ManyToOne @JoinColumn(name = "host_task_id", referencedColumnName = "id", nullable = false) private TopologyHostTaskEntity topologyHostTaskEntity; @OneToOne - @JoinColumn(name = "physical_task_id", referencedColumnName = "task_id") + @JoinColumn(name = "physical_task_id", referencedColumnName = "task_id", nullable = false) private HostRoleCommandEntity hostRoleCommandEntity; public Long getId() { @@ -58,14 +70,22 @@ public class TopologyLogicalTaskEntity { this.id = id; } - public Long getHostTaskId() { - return topologyHostTaskEntity != null ? topologyHostTaskEntity.getId() : null; - } - public Long getPhysicalTaskId() { return hostRoleCommandEntity != null ? hostRoleCommandEntity.getTaskId() : null; } + public void setPhysicalTaskId(Long physicalTaskId) { + this.physicalTaskId = physicalTaskId; + } + + public void setHostTaskId(Long hostTaskId) { + this.hostTaskId = hostTaskId; + } + + public Long getHostTaskId() { + return hostTaskId; + } + public String getComponentName() { return componentName; } http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java index 0b27e3b..152fde1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeEntity.java @@ -65,6 +65,8 @@ import org.apache.ambari.server.state.stack.upgrade.UpgradeType; query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId AND u.direction = :direction ORDER BY r.startTime DESC, u.upgradeId DESC"), @NamedQuery(name = "UpgradeEntity.findLatestForCluster", query = "SELECT u FROM UpgradeEntity u JOIN RequestEntity r ON u.requestId = r.requestId WHERE u.clusterId = :clusterId ORDER BY r.startTime DESC"), + @NamedQuery(name = "UpgradeEntity.findAllRequestIds", + query = "SELECT upgrade.requestId FROM UpgradeEntity upgrade") }) public class UpgradeEntity { http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java index 560970a..35ea769 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/UpgradeItemEntity.java @@ -27,6 +27,8 @@ import javax.persistence.GenerationType; import javax.persistence.Id; import javax.persistence.JoinColumn; import javax.persistence.ManyToOne; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; import javax.persistence.Table; import javax.persistence.TableGenerator; @@ -48,6 +50,9 @@ import org.apache.ambari.server.state.UpgradeState; pkColumnValue = "upgrade_item_id_seq", initialValue = 0, allocationSize = 1000) +@NamedQueries({ + @NamedQuery(name = "UpgradeItemEntity.findAllStageIds", query = "SELECT upgradeItem.stageId FROM UpgradeItemEntity upgradeItem") +}) public class UpgradeItemEntity { @Id http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/python/ambari-server.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/ambari-server.py b/ambari-server/src/main/python/ambari-server.py index 47256ab..235ef95 100755 --- a/ambari-server/src/main/python/ambari-server.py +++ b/ambari-server/src/main/python/ambari-server.py @@ -199,6 +199,12 @@ def restart(args): start(args) +@OsFamilyFuncImpl(OsFamilyImpl.DEFAULT) +def database_cleanup(args): + logger.info("Database cleanup.") + if args.silent: + stop(args) + db_cleanup(args) # # The Ambari Server status. @@ -466,7 +472,7 @@ def init_parser_options(parser): help="Print verbose status messages") parser.add_option("-s", "--silent", action="store_true", dest="silent", default=False, - help="Silently accepts default prompt values") + help="Silently accepts default prompt values. For db-cleanup command, silent mode will stop ambari server.") parser.add_option('-g', '--debug', action="store_true", dest='debug', default=False, help="Start ambari-server in debug mode") parser.add_option('-y', '--suspend-start', action="store_true", dest='suspend_start', default=False, @@ -756,7 +762,7 @@ def create_user_action_map(args, options): CHECK_DATABASE_ACTION: UserAction(check_database, options), ENABLE_STACK_ACTION: UserAction(enable_stack, options, args), SETUP_SSO_ACTION: UserActionRestart(setup_sso, options), - DB_CLEANUP_ACTION: UserAction(db_cleanup, options), + DB_CLEANUP_ACTION: UserAction(database_cleanup, options), INSTALL_MPACK_ACTION: UserAction(install_mpack, options), UNINSTALL_MPACK_ACTION: UserAction(uninstall_mpack, options), UPGRADE_MPACK_ACTION: UserAction(upgrade_mpack, options), http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/main/python/ambari_server/dbCleanup.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/python/ambari_server/dbCleanup.py b/ambari-server/src/main/python/ambari_server/dbCleanup.py index abc8267..6e16bc5 100644 --- a/ambari-server/src/main/python/ambari_server/dbCleanup.py +++ b/ambari-server/src/main/python/ambari_server/dbCleanup.py @@ -42,25 +42,29 @@ def run_db_cleanup(options): if validate_args(options): return 1 - db_title = get_db_type(get_ambari_properties()).title + status, stateDesc = is_server_runing() - confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format( - db_title), True) - if not confirmBackup: - print_info_msg("Ambari Server Database cleanup aborted") - return 0 + if not options.silent: + db_title = get_db_type(get_ambari_properties()).title + + confirmBackup = get_YN_input("Ambari Server configured for {0}. Confirm you have made a backup of the Ambari Server database [y/n]".format( + db_title), True) + if not confirmBackup: + print_info_msg("Ambari Server Database cleanup aborted") + return 0 + + if status: + print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.") + return 1 + + confirm = get_YN_input( + "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format( + db_title, options.cleanup_from_date), True) + if not confirm: + print_info_msg("Ambari Server Database cleanup aborted") + return 0 - status, stateDesc = is_server_runing() - if status: - print_error_msg("The database cleanup cannot proceed while Ambari Server is running. Please shut down Ambari first.") - return 1 - confirm = get_YN_input( - "Ambari server is using db type {0}. Cleanable database entries older than {1} will be cleaned up. Proceed [y/n]".format( - db_title, options.cleanup_from_date), True) - if not confirm: - print_info_msg("Ambari Server Database cleanup aborted") - return 0 jdk_path = get_java_exe_path() if jdk_path is None: @@ -101,7 +105,6 @@ def run_db_cleanup(options): # Database cleanup # def db_cleanup(options): - logger.info("Database cleanup.") return run_db_cleanup(options) http://git-wip-us.apache.org/repos/asf/ambari/blob/c23ef509/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java index 5869630..9c8eb74 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/checks/DatabaseConsistencyCheckHelperTest.java @@ -580,4 +580,70 @@ public class DatabaseConsistencyCheckHelperTest { } + @Test + public void testCheckForLargeTables() throws Exception { + EasyMockSupport easyMockSupport = new EasyMockSupport(); + final AmbariMetaInfo mockAmbariMetainfo = easyMockSupport.createNiceMock(AmbariMetaInfo.class); + final DBAccessor mockDBDbAccessor = easyMockSupport.createNiceMock(DBAccessor.class); + final Connection mockConnection = easyMockSupport.createNiceMock(Connection.class); + final Statement mockStatement = easyMockSupport.createNiceMock(Statement.class); + final EntityManager mockEntityManager = easyMockSupport.createNiceMock(EntityManager.class); + final Clusters mockClusters = easyMockSupport.createNiceMock(Clusters.class); + final OsFamily mockOSFamily = easyMockSupport.createNiceMock(OsFamily.class); + final StackManagerFactory mockStackManagerFactory = easyMockSupport.createNiceMock(StackManagerFactory.class); + + final ResultSet hostRoleCommandResultSet = easyMockSupport.createNiceMock(ResultSet.class); + final ResultSet executionCommandResultSet = easyMockSupport.createNiceMock(ResultSet.class); + final ResultSet stageResultSet = easyMockSupport.createNiceMock(ResultSet.class); + final ResultSet requestResultSet = easyMockSupport.createNiceMock(ResultSet.class); + final ResultSet alertHistoryResultSet = easyMockSupport.createNiceMock(ResultSet.class); + + final Injector mockInjector = Guice.createInjector(new AbstractModule() { + @Override + protected void configure() { + bind(AmbariMetaInfo.class).toInstance(mockAmbariMetainfo); + bind(StackManagerFactory.class).toInstance(mockStackManagerFactory); + bind(EntityManager.class).toInstance(mockEntityManager); + bind(DBAccessor.class).toInstance(mockDBDbAccessor); + bind(Clusters.class).toInstance(mockClusters); + bind(OsFamily.class).toInstance(mockOSFamily); + } + }); + + expect(hostRoleCommandResultSet.next()).andReturn(true).once(); + expect(executionCommandResultSet.next()).andReturn(true).once(); + expect(stageResultSet.next()).andReturn(true).once(); + expect(requestResultSet.next()).andReturn(true).once(); + expect(alertHistoryResultSet.next()).andReturn(true).once(); + expect(hostRoleCommandResultSet.getLong(1)).andReturn(2345L).atLeastOnce(); + expect(executionCommandResultSet.getLong(1)).andReturn(12345L).atLeastOnce(); + expect(stageResultSet.getLong(1)).andReturn(2321L).atLeastOnce(); + expect(requestResultSet.getLong(1)).andReturn(1111L).atLeastOnce(); + expect(alertHistoryResultSet.getLong(1)).andReturn(2223L).atLeastOnce(); + expect(mockDBDbAccessor.getConnection()).andReturn(mockConnection); + expect(mockDBDbAccessor.getDbType()).andReturn(DBAccessor.DbType.MYSQL); + expect(mockDBDbAccessor.getDbSchema()).andReturn("test_schema"); + expect(mockConnection.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE)).andReturn(mockStatement).anyTimes(); + expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " + + "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"host_role_command\"")).andReturn(hostRoleCommandResultSet); + expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " + + "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"execution_command\"")).andReturn(executionCommandResultSet); + expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " + + "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"stage\"")).andReturn(stageResultSet); + expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " + + "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"request\"")).andReturn(requestResultSet); + expect(mockStatement.executeQuery("SELECT (data_length + index_length) \"Table Size\" " + + "FROM information_schema.TABLES WHERE table_schema = \"test_schema\" AND table_name =\"alert_history\"")).andReturn(alertHistoryResultSet); + + DatabaseConsistencyCheckHelper.setInjector(mockInjector); + + easyMockSupport.replayAll(); + + mockAmbariMetainfo.init(); + + DatabaseConsistencyCheckHelper.resetCheckResult(); + DatabaseConsistencyCheckHelper.checkForLargeTables(); + + easyMockSupport.verifyAll(); + } }