Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 21D39200CB7 for ; Fri, 30 Jun 2017 11:00:20 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 20D24160BDD; Fri, 30 Jun 2017 09:00:20 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BBC8F160BEB for ; Fri, 30 Jun 2017 11:00:17 +0200 (CEST) Received: (qmail 54978 invoked by uid 500); 30 Jun 2017 09:00:16 -0000 Mailing-List: contact commits-help@atlas.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.apache.org Delivered-To: mailing list commits@atlas.apache.org Received: (qmail 54969 invoked by uid 99); 30 Jun 2017 09:00:16 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Jun 2017 09:00:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 680E31AF989 for ; Fri, 30 Jun 2017 09:00:16 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id F_PixO_BiwBn for ; Fri, 30 Jun 2017 09:00:04 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at 
mx1-lw-eu.apache.org) with SMTP id C38655FC3A for ; Fri, 30 Jun 2017 09:00:01 +0000 (UTC) Received: (qmail 52676 invoked by uid 99); 30 Jun 2017 09:00:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 30 Jun 2017 09:00:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 98CDCE037F; Fri, 30 Jun 2017 09:00:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: madhan@apache.org To: commits@atlas.incubator.apache.org Date: Fri, 30 Jun 2017 09:00:00 -0000 Message-Id: <05f7b3615eb44299b4a11f5258fbbeab@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/5] incubator-atlas git commit: ATLAS-1880: search API with support for entity/tag attribute filters archived-at: Fri, 30 Jun 2017 09:00:20 -0000 Repository: incubator-atlas Updated Branches: refs/heads/master 8101883cc -> 9a4ed469c http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java b/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java new file mode 100644 index 0000000..5565781 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/GremlinStep.java @@ -0,0 +1,388 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.discovery; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.SearchParameters; +import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria; +import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition; +import org.apache.atlas.model.discovery.SearchParameters.Operator; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasGraphQuery; +import org.apache.atlas.repository.graphdb.AtlasIndexQuery; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.atlas.discovery.SearchPipeline.IndexResultType; +import static org.apache.atlas.discovery.SearchPipeline.PipelineContext; +import static 
org.apache.atlas.discovery.SearchPipeline.PipelineStep; +import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.ComparisionOperator; +import static org.apache.atlas.repository.graphdb.AtlasGraphQuery.MatchingOperator; + +@Component +public class GremlinStep implements PipelineStep { + private static final Logger LOG = LoggerFactory.getLogger(GremlinStep.class); + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("GremlinSearchStep"); + + private final AtlasGraph graph; + private final AtlasTypeRegistry typeRegistry; + + enum GremlinFilterQueryType { TAG, ENTITY } + + @Inject + public GremlinStep(AtlasGraph graph, AtlasTypeRegistry typeRegistry) { + this.graph = graph; + this.typeRegistry = typeRegistry; + } + + @Override + public void execute(PipelineContext context) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> GremlinStep.execute({})", context); + } + + if (context == null) { + throw new AtlasBaseException("Can't start search without any context"); + } + + AtlasPerfTracer perf = null; + + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "GremlinSearchStep.execute(" + context + ")"); + } + + final Iterator result; + + if (context.hasIndexResults()) { + // We have some results from the indexed step, let's proceed accordingly + if (context.getIndexResultType() == IndexResultType.TAG) { + // Index search was done on tag and filters + if (context.isTagProcessingComplete()) { + LOG.debug("GremlinStep.execute(): index has completely processed tag, further TAG filtering not needed"); + + Set taggedVertexGUIDs = new HashSet<>(); + + Iterator tagVertexIterator = context.getIndexResultsIterator(); + + while (tagVertexIterator.hasNext()) { + // Find out which Vertex has this outgoing edge + AtlasVertex vertex = tagVertexIterator.next().getVertex(); + Iterable edges = vertex.getEdges(AtlasEdgeDirection.IN); + + for (AtlasEdge edge : edges) { + String guid = 
AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex()); + + taggedVertexGUIDs.add(guid); + } + } + + // No entities are tagged (actually this check is already done) + if (!taggedVertexGUIDs.isEmpty()) { + result = processEntity(taggedVertexGUIDs, context); + } else { + result = null; + } + } else { + result = processTagAndEntity(Collections.emptySet(), context); + } + } else if (context.getIndexResultType() == IndexResultType.TEXT) { + // Index step processed full-text; + Set entityIDs = getVertexIDs(context.getIndexResultsIterator()); + + result = processTagAndEntity(entityIDs, context); + } else if (context.getIndexResultType() == IndexResultType.ENTITY) { + // Index step processed entity and it's filters; tag filter wouldn't be set + Set entityIDs = getVertexIDs(context.getIndexResultsIterator()); + + result = processEntity(entityIDs, context); + } else { + result = null; + } + } else { + // No index results, need full processing in Gremlin + if (context.getClassificationType() != null) { + // Process tag and filters first, then entity filters + result = processTagAndEntity(Collections.emptySet(), context); + } else { + result = processEntity(Collections.emptySet(), context); + } + } + + context.setGremlinResultIterator(result); + + AtlasPerfTracer.log(perf); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== GremlinStep.execute({})", context); + } + } + + private Iterator processEntity(Set entityGUIDs, PipelineContext context) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> GremlinStep.processEntity(entityGUIDs={})", entityGUIDs); + } + + final Iterator ret; + + SearchParameters searchParameters = context.getSearchParameters(); + AtlasEntityType entityType = context.getEntityType(); + + if (entityType != null) { + AtlasGraphQuery entityFilterQuery = context.getGraphQuery("ENTITY_FILTER"); + + if (entityFilterQuery == null) { + entityFilterQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, entityType.getTypeAndAllSubTypes()); + + if 
(searchParameters.getEntityFilters() != null) { + toGremlinFilterQuery(GremlinFilterQueryType.ENTITY, entityType, searchParameters.getEntityFilters(), entityFilterQuery, context); + } + + if (searchParameters.getExcludeDeletedEntities()) { + entityFilterQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE"); + } + + context.cacheGraphQuery("ENTITY_FILTER", entityFilterQuery); + } + + // Now get all vertices + if (CollectionUtils.isEmpty(entityGUIDs)) { + ret = entityFilterQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator(); + } else { + AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs); + + if (entityFilterQuery != null) { + guidQuery.addConditionsFrom(entityFilterQuery); + } else if (searchParameters.getExcludeDeletedEntities()) { + guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE"); + } + + ret = guidQuery.vertices(context.getMaxLimit()).iterator(); + } + } else if (CollectionUtils.isNotEmpty(entityGUIDs)) { + AtlasGraphQuery guidQuery = graph.query().in(Constants.GUID_PROPERTY_KEY, entityGUIDs); + + if (searchParameters.getExcludeDeletedEntities()) { + guidQuery.has(Constants.STATE_PROPERTY_KEY, "ACTIVE"); + } + + Iterable vertices = guidQuery.vertices(context.getMaxLimit()); + + ret = vertices.iterator(); + } else { + ret = null; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== GremlinStep.processEntity(entityGUIDs={})", entityGUIDs); + } + + return ret; + } + + private Iterator processTagAndEntity(Set entityGUIDs, PipelineContext context) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs); + } + + final Iterator ret; + + AtlasClassificationType classificationType = context.getClassificationType(); + + if (classificationType != null) { + AtlasGraphQuery tagVertexQuery = context.getGraphQuery("TAG_VERTEX"); + + if (tagVertexQuery == null) { + tagVertexQuery = graph.query().in(Constants.TYPE_NAME_PROPERTY_KEY, 
classificationType.getTypeAndAllSubTypes()); + + SearchParameters searchParameters = context.getSearchParameters(); + + // Do tag filtering first as it'll return a smaller subset of vertices + if (searchParameters.getTagFilters() != null) { + toGremlinFilterQuery(GremlinFilterQueryType.TAG, classificationType, searchParameters.getTagFilters(), tagVertexQuery, context); + } + + context.cacheGraphQuery("TAG_VERTEX", tagVertexQuery); + } + + if (tagVertexQuery != null) { + Set taggedVertexGuids = new HashSet<>(); + // Now get all vertices after adjusting offset for each iteration + LOG.debug("Firing TAG query"); + + Iterator tagVertexIterator = tagVertexQuery.vertices(context.getCurrentOffset(), context.getMaxLimit()).iterator(); + + while (tagVertexIterator.hasNext()) { + // Find out which Vertex has this outgoing edge + Iterable edges = tagVertexIterator.next().getEdges(AtlasEdgeDirection.IN); + for (AtlasEdge edge : edges) { + String guid = AtlasGraphUtilsV1.getIdFromVertex(edge.getOutVertex()); + taggedVertexGuids.add(guid); + } + } + + entityGUIDs = taggedVertexGuids; + } + } + + if (!entityGUIDs.isEmpty()) { + ret = processEntity(entityGUIDs, context); + } else { + ret = null; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== GremlinStep.processTagAndEntity(entityGUIDs={})", entityGUIDs); + } + + return ret; + } + + private Set getVertexIDs(Iterator idxResultsIterator) { + Set guids = new HashSet<>(); + while (idxResultsIterator.hasNext()) { + AtlasVertex vertex = idxResultsIterator.next().getVertex(); + String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex); + guids.add(guid); + } + return guids; + } + + private Set getVertexIDs(Iterable vertices) { + Set guids = new HashSet<>(); + for (AtlasVertex vertex : vertices) { + String guid = AtlasGraphUtilsV1.getIdFromVertex(vertex); + guids.add(guid); + } + return guids; + } + + private AtlasGraphQuery toGremlinFilterQuery(GremlinFilterQueryType queryType, AtlasStructType type, FilterCriteria criteria, + 
AtlasGraphQuery query, PipelineContext context) { + if (criteria.getCondition() != null) { + if (criteria.getCondition() == Condition.AND) { + for (FilterCriteria filterCriteria : criteria.getCriterion()) { + AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context); + query.addConditionsFrom(nestedQuery); + } + } else { + List orConditions = new LinkedList<>(); + + for (FilterCriteria filterCriteria : criteria.getCriterion()) { + AtlasGraphQuery nestedQuery = toGremlinFilterQuery(queryType, type, filterCriteria, graph.query(), context); + // FIXME: Something might not be right here as the queries are getting overwritten sometimes + orConditions.add(graph.query().createChildQuery().addConditionsFrom(nestedQuery)); + } + + if (!orConditions.isEmpty()) { + query.or(orConditions); + } + } + } else { + String attrName = criteria.getAttributeName(); + String attrValue = criteria.getAttributeValue(); + Operator operator = criteria.getOperator(); + + try { + // If attribute belongs to supertype then adjust the name accordingly + final String qualifiedAttributeName; + final boolean attrProcessed; + + if (queryType == GremlinFilterQueryType.TAG) { + qualifiedAttributeName = type.getQualifiedAttributeName(attrName); + attrProcessed = context.hasProcessedTagAttribute(qualifiedAttributeName); + } else { + qualifiedAttributeName = type.getQualifiedAttributeName(attrName); + attrProcessed = context.hasProcessedEntityAttribute(qualifiedAttributeName); + } + + // Check if the qualifiedAttribute has been processed + if (!attrProcessed) { + switch (operator) { + case LT: + query.has(qualifiedAttributeName, ComparisionOperator.LESS_THAN, attrValue); + break; + case LTE: + query.has(qualifiedAttributeName, ComparisionOperator.LESS_THAN_EQUAL, attrValue); + break; + case GT: + query.has(qualifiedAttributeName, ComparisionOperator.GREATER_THAN, attrValue); + break; + case GTE: + query.has(qualifiedAttributeName, 
ComparisionOperator.GREATER_THAN_EQUAL, attrValue); + break; + case EQ: + query.has(qualifiedAttributeName, ComparisionOperator.EQUAL, attrValue); + break; + case NEQ: + query.has(qualifiedAttributeName, ComparisionOperator.NOT_EQUAL, attrValue); + break; + case LIKE: + // TODO: Maybe we need to validate pattern + query.has(qualifiedAttributeName, MatchingOperator.REGEX, getLikeRegex(attrValue)); + break; + case CONTAINS: + query.has(qualifiedAttributeName, MatchingOperator.REGEX, getContainsRegex(attrValue)); + break; + case STARTS_WITH: + query.has(qualifiedAttributeName, MatchingOperator.PREFIX, attrValue); + break; + case ENDS_WITH: + query.has(qualifiedAttributeName, MatchingOperator.REGEX, getSuffixRegex(attrValue)); + case IN: + LOG.warn("{}: unsupported operator. Ignored", operator); + break; + } + } + } catch (AtlasBaseException e) { + LOG.error("toGremlinFilterQuery(): failed for attrName=" + attrName + "; operator=" + operator + "; attrValue=" + attrValue, e); + } + } + + return query; + } + + private String getContainsRegex(String attributeValue) { + return ".*" + attributeValue + ".*"; + } + + private String getSuffixRegex(String attributeValue) { + return ".*" + attributeValue; + } + + private String getLikeRegex(String attributeValue) { return ".*" + attributeValue + ".*"; } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/repository/src/main/java/org/apache/atlas/discovery/SearchPipeline.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/SearchPipeline.java b/repository/src/main/java/org/apache/atlas/discovery/SearchPipeline.java new file mode 100644 index 0000000..0f91b2d --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/SearchPipeline.java @@ -0,0 +1,611 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.discovery; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.SearchParameters; +import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria; +import org.apache.atlas.model.discovery.SearchParameters.FilterCriteria.Condition; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; +import org.apache.atlas.repository.graphdb.*; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.util.SearchTracker; +import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.builder.ToStringBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Component +public class SearchPipeline { + private static final Logger LOG = LoggerFactory.getLogger(SearchPipeline.class); + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("SearchPipeline"); + + enum ExecutionMode { SOLR, GREMLIN, MIXED } + + enum IndexResultType { TAG, ENTITY, TEXT } + + private final 
SolrStep solrStep; + private final GremlinStep gremlinStep; + private final SearchTracker searchTracker; + private final AtlasTypeRegistry typeRegistry; + private final Configuration atlasConfiguration; + private final GraphBackedSearchIndexer indexer; + + @Inject + public SearchPipeline(SolrStep solrStep, GremlinStep gremlinStep, SearchTracker searchTracker, AtlasTypeRegistry typeRegistry, Configuration atlasConfiguration, GraphBackedSearchIndexer indexer) { + this.solrStep = solrStep; + this.gremlinStep = gremlinStep; + this.searchTracker = searchTracker; + this.typeRegistry = typeRegistry; + this.atlasConfiguration = atlasConfiguration; + this.indexer = indexer; + } + + public List run(SearchParameters searchParameters) throws AtlasBaseException { + final List ret; + + AtlasPerfTracer perf = null; + + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "SearchPipeline.run("+ searchParameters +")"); + } + + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(searchParameters.getTypeName()); + AtlasClassificationType classiType = typeRegistry.getClassificationTypeByName(searchParameters.getClassification()); + PipelineContext context = new PipelineContext(searchParameters, entityType, classiType, indexer.getVertexIndexKeys()); + String searchId = searchTracker.add(context); // For future cancellation + + try { + ExecutionMode mode = determineExecutionMode(context); + + if (LOG.isDebugEnabled()) { + LOG.debug("Execution mode {}", mode); + } + + switch (mode) { + case SOLR: + ret = runOnlySolr(context); + break; + + case GREMLIN: + ret = runOnlyGremlin(context); + break; + + case MIXED: + ret = runMixed(context); + break; + + default: + ret = Collections.emptyList(); + } + } finally { + searchTracker.remove(searchId); + + AtlasPerfTracer.log(perf); + } + + return ret; + } + + private List runOnlySolr(PipelineContext context) throws AtlasBaseException { + // Only when there's no tag and query + List results = 
new ArrayList<>(); + + while (results.size() < context.getSearchParameters().getLimit()) { + if (context.getForceTerminate()) { + LOG.debug("search has been terminated"); + + break; + } + + // Execute solr search only + solrStep.execute(context); + + List stepResults = getIndexResults(context); + + context.incrementSearchRound(); + + addToResult(results, stepResults, context.getSearchParameters().getLimit()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), stepResults.size(), results.size()); + } + + if (CollectionUtils.isEmpty(stepResults)) { + // If no result is found any subsequent iteration then just stop querying the index + break; + } + } + + if (context.getIndexResultType() == IndexResultType.TAG) { + List entityVertices = new ArrayList<>(results.size()); + + for (AtlasVertex tagVertex : results) { + Iterable edges = tagVertex.getEdges(AtlasEdgeDirection.IN); + + for (AtlasEdge edge : edges) { + AtlasVertex entityVertex = edge.getOutVertex(); + + entityVertices.add(entityVertex); + } + } + + results = entityVertices; + } + + return results; + } + + private List runOnlyGremlin(PipelineContext context) throws AtlasBaseException { + List results = new ArrayList<>(); + + while (results.size() < context.getSearchParameters().getLimit()) { + if (context.getForceTerminate()) { + LOG.debug("search has been terminated"); + + break; + } + + gremlinStep.execute(context); + + List stepResults = getGremlinResults(context); + + context.incrementSearchRound(); + + addToResult(results, stepResults, context.getSearchParameters().getLimit()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), stepResults.size(), results.size()); + } + + if (CollectionUtils.isEmpty(stepResults)) { + // If no result is found any subsequent iteration then just stop querying the index + break; + } + } + + return results; + } + + /* + 1. 
Index processes few attributes and then gremlin processes rest + 1.1 Iterate for gremlin till the index results are non null + 2. Index processes all attributes, gremlin has nothing to do + + Sometimes the result set might be less than the max limit and we need to iterate until the result set is full + or the iteration doesn't return any results + + */ + private List runMixed(PipelineContext context) throws AtlasBaseException { + List results = new ArrayList<>(); + + while (results.size() < context.getSearchParameters().getLimit()) { + if (context.getForceTerminate()) { + LOG.debug("search has been terminated"); + + break; + } + + // Execute Solr search and then pass it to the Gremlin step (if needed) + solrStep.execute(context); + + if (!context.hasIndexResults()) { + if (LOG.isDebugEnabled()) { + LOG.debug("No index results in iteration {}", context.getIterationCount()); + } + + // If no result is found any subsequent iteration then just stop querying the index + break; + } + + // Attributes partially processed by Solr, use gremlin to process remaining attribute(s) + gremlinStep.execute(context); + + context.incrementSearchRound(); + + List stepResults = getGremlinResults(context); + + addToResult(results, stepResults, context.getSearchParameters().getLimit()); + + if (LOG.isDebugEnabled()) { + LOG.debug("Pipeline iteration {}: stepResults={}; totalResult={}", context.getIterationCount(), stepResults.size(), results.size()); + } + } + + return results; + } + + private void addToResult(List result, List stepResult, int maxLimit) { + if (result != null && stepResult != null && result.size() < maxLimit) { + for (AtlasVertex vertex : stepResult) { + result.add(vertex); + + if (result.size() >= maxLimit) { + break; + } + } + } + } + + private List getIndexResults(PipelineContext pipelineContext) { + List ret = new ArrayList<>(); + + if (pipelineContext.hasIndexResults()) { + Iterator iter = pipelineContext.getIndexResultsIterator(); + + while(iter.hasNext()) { + 
ret.add(iter.next().getVertex()); + } + } + + return ret; + } + + private List getGremlinResults(PipelineContext pipelineContext) { + List ret = new ArrayList<>(); + + if (pipelineContext.hasGremlinResults()) { + Iterator iter = pipelineContext.getGremlinResultIterator(); + + while (iter.hasNext()) { + ret.add(iter.next()); + } + } + + return ret; + } + + private ExecutionMode determineExecutionMode(PipelineContext context) { + SearchParameters searchParameters = context.getSearchParameters(); + AtlasClassificationType classificationType = context.getClassificationType(); + AtlasEntityType entityType = context.getEntityType(); + int solrCount = 0; + int gremlinCount = 0; + + if (StringUtils.isNotEmpty(searchParameters.getQuery())) { + solrCount++; + + // __state index only exists in vertex_index + if (searchParameters.getExcludeDeletedEntities()) { + gremlinCount++; + } + } + + if (classificationType != null) { + Set typeAndAllSubTypes = classificationType.getTypeAndAllSubTypes(); + + if (typeAndAllSubTypes.size() > atlasConfiguration.getInt(Constants.INDEX_SEARCH_MAX_TAGS_COUNT, 10)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Classification type {} has too many subTypes ({}) to use in Solr search. Gremlin will be used to execute the search", + classificationType.getTypeName(), typeAndAllSubTypes.size()); + } + + gremlinCount++; + } else { + if (hasNonIndexedAttrViolation(classificationType, context.getIndexedKeys(), searchParameters.getTagFilters())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Tag filters not suitable for Solr search. 
Gremlin will be used to execute the search"); + } + + gremlinCount++; + } else { + solrCount++; + + // __state index only exist in vertex_index + if (searchParameters.getExcludeDeletedEntities()) { + gremlinCount++; + } + } + } + } + + if (entityType != null) { + Set typeAndAllSubTypes = entityType.getTypeAndAllSubTypes(); + + if (typeAndAllSubTypes.size() > atlasConfiguration.getInt(Constants.INDEX_SEARCH_MAX_TYPES_COUNT, 10)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Entity type {} has too many subTypes ({}) to use in Solr search. Gremlin will be used to execute the search", + entityType.getTypeName(), typeAndAllSubTypes.size()); + } + + gremlinCount++; + } else { + if (hasNonIndexedAttrViolation(entityType, context.getIndexedKeys(), searchParameters.getEntityFilters())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Entity filters not suitable for Solr search. Gremlin will be used to execute the search"); + } + + gremlinCount++; + } else { + solrCount++; + } + } + } + + ExecutionMode mode = ExecutionMode.MIXED; + + if (solrCount == 1 && gremlinCount == 0) { + mode = ExecutionMode.SOLR; + } else if (gremlinCount == 1 && solrCount == 0) { + mode = ExecutionMode.GREMLIN; + } + + return mode; + } + + // If Index can't process all attributes and any of the non-indexed attribute is present in OR nested within AND + // then the only way is Gremlin + // A violation (here) is defined as presence of non-indexed attribute within any OR clause nested under an AND clause + // the reason being that the index would not be able to process the nested OR attribute which might result in + // exclusion of valid result (vertex) + private boolean hasNonIndexedAttrViolation(AtlasStructType structType, Set indexKeys, FilterCriteria filterCriteria) { + return hasNonIndexedAttrViolation(structType, indexKeys, filterCriteria, false); + } + + private boolean hasNonIndexedAttrViolation(AtlasStructType structType, Set indexKeys, FilterCriteria filterCriteria, boolean enclosedInOrCondition) 
{ + if (filterCriteria == null) { + return false; + } + + boolean ret = false; + Condition filterCondition = filterCriteria.getCondition(); + List criterion = filterCriteria.getCriterion(); + + if (filterCondition != null && CollectionUtils.isNotEmpty(criterion)) { + if (!enclosedInOrCondition) { + enclosedInOrCondition = filterCondition == Condition.OR; + } + + // If we have nested criterion let's find any nested ORs with non-indexed attr + for (FilterCriteria criteria : criterion) { + ret |= hasNonIndexedAttrViolation(structType, indexKeys, criteria, enclosedInOrCondition); + + if (ret) { + break; + } + } + } else if (StringUtils.isNotEmpty(filterCriteria.getAttributeName())) { + // If attribute qualified name doesn't exist in the vertex index we potentially might have a problem + try { + String qualifiedAttributeName = structType.getQualifiedAttributeName(filterCriteria.getAttributeName()); + + ret = CollectionUtils.isEmpty(indexKeys) || !indexKeys.contains(qualifiedAttributeName); + + if (ret) { + LOG.warn("search includes non-indexed attribute '{}'; might cause poor performance", qualifiedAttributeName); + } + } catch (AtlasBaseException e) { + LOG.warn(e.getMessage()); + + ret = true; + } + } + + // return ret && enclosedInOrCondition; + + return ret; + } + + public interface PipelineStep { + void execute(PipelineContext context) throws AtlasBaseException; + } + + public static class PipelineContext { + // TODO: See if anything can be cached in the context + + private final SearchParameters searchParameters; + private final AtlasEntityType entityType; + private final AtlasClassificationType classificationType; + private final Set indexedKeys; + + private int iterationCount; + private boolean forceTerminate; + private int currentOffset; + private int maxLimit; + + // Continuous processing stuff + private Set tagSearchAttributes = new HashSet<>(); + private Set entitySearchAttributes = new HashSet<>(); + private Set tagAttrProcessedBySolr = new HashSet<>(); + 
private Set entityAttrProcessedBySolr = new HashSet<>(); + + // Results related stuff + private IndexResultType indexResultType; + private Iterator indexResultsIterator; + private Iterator gremlinResultIterator; + + private Map cachedIndexQueries = new HashMap<>(); + private Map cachedGraphQueries = new HashMap<>(); + + public PipelineContext(SearchParameters searchParameters, AtlasEntityType entityType, AtlasClassificationType classificationType, Set indexedKeys) { + this.searchParameters = searchParameters; + this.entityType = entityType; + this.classificationType = classificationType; + this.indexedKeys = indexedKeys; + + currentOffset = searchParameters.getOffset(); + maxLimit = searchParameters.getLimit(); + } + + public SearchParameters getSearchParameters() { + return searchParameters; + } + + public AtlasEntityType getEntityType() { + return entityType; + } + + public AtlasClassificationType getClassificationType() { + return classificationType; + } + + public Set getIndexedKeys() { return indexedKeys; } + + public int getIterationCount() { + return iterationCount; + } + + public boolean getForceTerminate() { + return forceTerminate; + } + + public void setForceTerminate(boolean forceTerminate) { + this.forceTerminate = forceTerminate; + } + + public boolean hasProcessedTagAttribute(String attributeName) { + return tagAttrProcessedBySolr.contains(attributeName); + } + + public boolean hasProcessedEntityAttribute(String attributeName) { + return entityAttrProcessedBySolr.contains(attributeName); + } + + public Iterator getIndexResultsIterator() { + return indexResultsIterator; + } + + public void setIndexResultsIterator(Iterator indexResultsIterator) { + this.indexResultsIterator = indexResultsIterator; + } + + public Iterator getGremlinResultIterator() { + return gremlinResultIterator; + } + + public void setGremlinResultIterator(Iterator gremlinResultIterator) { + this.gremlinResultIterator = gremlinResultIterator; + } + + public boolean hasIndexResults() 
{ + return null != indexResultsIterator && indexResultsIterator.hasNext(); + } + + public boolean hasGremlinResults() { + return null != gremlinResultIterator && gremlinResultIterator.hasNext(); + } + + + public boolean isTagProcessingComplete() { + return CollectionUtils.isEmpty(tagSearchAttributes) || + CollectionUtils.isEqualCollection(tagSearchAttributes, tagAttrProcessedBySolr); + } + + public boolean isEntityProcessingComplete() { + return CollectionUtils.isEmpty(entitySearchAttributes) || + CollectionUtils.isEqualCollection(entitySearchAttributes, entityAttrProcessedBySolr); + } + + public boolean isProcessingComplete() { + return isTagProcessingComplete() && isEntityProcessingComplete(); + } + + public void incrementOffset(int increment) { + currentOffset += increment; + } + + public void incrementSearchRound() { + iterationCount ++; + incrementOffset(searchParameters.getLimit()); + } + + public int getCurrentOffset() { + return currentOffset; + } + + public boolean addTagSearchAttribute(String attribute) { + return tagSearchAttributes.add(attribute); + } + + public boolean addProcessedTagAttribute(String attribute) { + return tagAttrProcessedBySolr.add(attribute); + } + + /** + * Records an attribute referenced by the entity filters. + * It must be added to entitySearchAttributes: isEntityProcessingComplete() compares that set with + * entityAttrProcessedBySolr, whereas adding it to tagSearchAttributes would leave the entity set empty + * and skew the comparison done by isTagProcessingComplete(). + * + * @param attribute attribute key used by an entity filter + * @return true if the attribute was not already recorded + */ + public boolean addEntitySearchAttribute(String attribute) { + return entitySearchAttributes.add(attribute); + } + + public boolean addProcessedEntityAttribute(String attribute) { + return entityAttrProcessedBySolr.add(attribute); + } + + public void cacheGraphQuery(String name, AtlasGraphQuery graphQuery) { + cachedGraphQueries.put(name, graphQuery); + } + + public void cacheIndexQuery(String name, AtlasIndexQuery indexQuery) { + cachedIndexQueries.put(name, indexQuery); + } + + public AtlasIndexQuery getIndexQuery(String name){ + return cachedIndexQueries.get(name); + } + + public AtlasGraphQuery getGraphQuery(String name) { + return cachedGraphQueries.get(name); + } + + public IndexResultType getIndexResultType() { + return indexResultType; + } + + public void 
setIndexResultType(IndexResultType indexResultType) { + this.indexResultType = indexResultType; + } + + public int getMaxLimit() { + return maxLimit; + } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("iterationCount", iterationCount) + .append("forceTerminate", forceTerminate) + .append("currentOffset", currentOffset) + .append("maxLimit", maxLimit) + .append("searchParameters", searchParameters) + .append("tagSearchAttributes", tagSearchAttributes) + .append("entitySearchAttributes", entitySearchAttributes) + .append("tagAttrProcessedBySolr", tagAttrProcessedBySolr) + .append("entityAttrProcessedBySolr", entityAttrProcessedBySolr) + .append("indexResultType", indexResultType) + .append("cachedIndexQueries", cachedIndexQueries) + .append("cachedGraphQueries", cachedGraphQueries) + .toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/repository/src/main/java/org/apache/atlas/discovery/SolrStep.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/SolrStep.java b/repository/src/main/java/org/apache/atlas/discovery/SolrStep.java new file mode 100644 index 0000000..6a5dd5a --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/discovery/SolrStep.java @@ -0,0 +1,288 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.discovery; + +import org.apache.atlas.discovery.SearchPipeline.IndexResultType; +import org.apache.atlas.discovery.SearchPipeline.PipelineContext; +import org.apache.atlas.discovery.SearchPipeline.PipelineStep; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.discovery.SearchParameters; +import org.apache.atlas.model.discovery.SearchParameters.Operator; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasIndexQuery; +import org.apache.atlas.type.*; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import static org.apache.atlas.model.discovery.SearchParameters.*; + +@Component +public class SolrStep implements PipelineStep { + private static final Logger LOG = LoggerFactory.getLogger(SolrStep.class); + + private static final Pattern STRAY_AND_PATTERN = Pattern.compile("(AND\\s+)+\\)"); + private static final Pattern STRAY_OR_PATTERN = Pattern.compile("(OR\\s+)+\\)"); + private static final Pattern STRAY_ELIPSIS_PATTERN = Pattern.compile("(\\(\\s*)\\)"); + private static final String AND_STR = " AND "; + private static final String EMPTY_STRING = ""; + private static final String SPACE_STRING = " "; + private static final String BRACE_OPEN_STR = "( "; + 
private static final String BRACE_CLOSE_STR = " )"; + + private static final Map operatorMap = new HashMap<>(); + + static + { + operatorMap.put(Operator.LT,"v.\"%s\": [* TO %s}"); + operatorMap.put(Operator.GT,"v.\"%s\": {%s TO *]"); + operatorMap.put(Operator.LTE,"v.\"%s\": [* TO %s]"); + operatorMap.put(Operator.GTE,"v.\"%s\": [%s TO *]"); + operatorMap.put(Operator.EQ,"v.\"%s\": %s"); + operatorMap.put(Operator.NEQ,"v.\"%s\": (NOT %s)"); + operatorMap.put(Operator.IN, "v.\"%s\": (%s)"); + operatorMap.put(Operator.LIKE, "v.\"%s\": (%s)"); + operatorMap.put(Operator.STARTS_WITH, "v.\"%s\": (%s*)"); + operatorMap.put(Operator.ENDS_WITH, "v.\"%s\": (*%s)"); + operatorMap.put(Operator.CONTAINS, "v.\"%s\": (*%s*)"); + } + + private final AtlasGraph graph; + + @Inject + public SolrStep(AtlasGraph graph) { + this.graph = graph; + } + + @Override + public void execute(PipelineContext context) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("==> SolrStep.execute({})", context); + } + + if (context == null) { + throw new AtlasBaseException("Can't start search without any context"); + } + + SearchParameters searchParameters = context.getSearchParameters(); + + final Iterator result; + + if (StringUtils.isNotEmpty(searchParameters.getQuery())) { + result = executeAgainstFulltextIndex(context); + } else { + result = executeAgainstVertexIndex(context); + } + + context.setIndexResultsIterator(result); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== SolrStep.execute({})", context); + } + } + + private Iterator executeAgainstFulltextIndex(PipelineContext context) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> SolrStep.executeAgainstFulltextIndex()"); + } + + final Iterator ret; + + AtlasIndexQuery query = context.getIndexQuery("FULLTEXT"); + + if (query == null) { + // Compute only once + SearchParameters searchParameters = context.getSearchParameters(); + String indexQuery = String.format("v.\"%s\":(%s)", Constants.ENTITY_TEXT_PROPERTY_KEY, 
searchParameters.getQuery()); + + query = graph.indexQuery(Constants.FULLTEXT_INDEX, indexQuery); + + context.cacheIndexQuery("FULLTEXT", query); + } + + context.setIndexResultType(IndexResultType.TEXT); + + ret = query.vertices(context.getCurrentOffset(), context.getMaxLimit()); + + if (LOG.isDebugEnabled()) { + LOG.debug("<== SolrStep.executeAgainstFulltextIndex()"); + } + + return ret; + } + + private Iterator executeAgainstVertexIndex(PipelineContext context) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> SolrStep.executeAgainstVertexIndex()"); + } + + final Iterator ret; + + SearchParameters searchParameters = context.getSearchParameters(); + AtlasIndexQuery query = context.getIndexQuery("VERTEX_INDEX"); + + if (query == null) { + StringBuilder solrQuery = new StringBuilder(); + + // If tag is specified then let's start processing using tag and it's attributes, entity filters will + // be pushed to Gremlin + if (context.getClassificationType() != null) { + context.setIndexResultType(IndexResultType.TAG); + + constructTypeTestQuery(solrQuery, context.getClassificationType().getTypeAndAllSubTypes()); + constructFilterQuery(solrQuery, context.getClassificationType(), searchParameters.getTagFilters(), context); + } else if (context.getEntityType() != null) { + context.setIndexResultType(IndexResultType.ENTITY); + + constructTypeTestQuery(solrQuery, context.getEntityType().getTypeAndAllSubTypes()); + constructFilterQuery(solrQuery, context.getEntityType(), searchParameters.getEntityFilters(), context); + + // Set the status flag + if (searchParameters.getExcludeDeletedEntities()) { + if (solrQuery.length() > 0) { + solrQuery.append(" AND "); + } + + solrQuery.append("v.\"__state\":").append("ACTIVE"); + } + } + + // No query was formed, doesn't make sense to do anything beyond this point + if (solrQuery.length() > 0) { + String validSolrQuery = STRAY_AND_PATTERN.matcher(solrQuery).replaceAll(")"); + validSolrQuery = 
STRAY_OR_PATTERN.matcher(validSolrQuery).replaceAll(")"); + validSolrQuery = STRAY_ELIPSIS_PATTERN.matcher(validSolrQuery).replaceAll(EMPTY_STRING); + + query = graph.indexQuery(Constants.VERTEX_INDEX, validSolrQuery); + context.cacheIndexQuery("VERTEX_INDEX", query); + } + } + + // Execute solr query and return the index results in the context + if (query != null) { + ret = query.vertices(context.getCurrentOffset(), context.getMaxLimit()); + } else { + ret = null; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== SolrStep.executeAgainstVertexIndex()"); + } + + return ret; + } + + private void constructTypeTestQuery(StringBuilder solrQuery, Set typeAndAllSubTypes) { + String typeAndSubtypesString = StringUtils.join(typeAndAllSubTypes, SPACE_STRING); + + solrQuery.append("v.\"__typeName\": (") + .append(typeAndSubtypesString) + .append(")"); + } + + private void constructFilterQuery(StringBuilder solrQuery, AtlasStructType type, FilterCriteria filterCriteria, PipelineContext context) { + if (filterCriteria != null) { + LOG.debug("Processing Filters"); + + String filterQuery = toSolrQuery(type, filterCriteria, context); + + if (StringUtils.isNotEmpty(filterQuery)) { + solrQuery.append(AND_STR).append(filterQuery); + } + } + } + + private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, PipelineContext context) { + return toSolrQuery(type, criteria, context, new StringBuilder()); + } + + private String toSolrQuery(AtlasStructType type, FilterCriteria criteria, PipelineContext context, StringBuilder sb) { + if (criteria.getCondition() != null && CollectionUtils.isNotEmpty(criteria.getCriterion())) { + StringBuilder nestedExpression = new StringBuilder(); + + for (FilterCriteria filterCriteria : criteria.getCriterion()) { + String nestedQuery = toSolrQuery(type, filterCriteria, context); + + if (StringUtils.isNotEmpty(nestedQuery)) { + if (nestedExpression.length() > 0) { + 
nestedExpression.append(SPACE_STRING).append(criteria.getCondition()).append(SPACE_STRING); + } + + nestedExpression.append(nestedQuery); + } + } + + return nestedExpression.length() > 0 ? sb.append(BRACE_OPEN_STR).append(nestedExpression.toString()).append(BRACE_CLOSE_STR).toString() : EMPTY_STRING; + } else { + return toSolrExpression(type, criteria.getAttributeName(), criteria.getOperator(), criteria.getAttributeValue(), context); + } + } + + private String toSolrExpression(AtlasStructType type, String attrName, Operator op, String attrVal, PipelineContext context) { + String ret = EMPTY_STRING; + + try { + String indexKey = type.getQualifiedAttributeName(attrName); + AtlasType attributeType = type.getAttributeType(attrName); + + switch (context.getIndexResultType()) { + case TAG: + context.addTagSearchAttribute(indexKey); + break; + + case ENTITY: + context.addEntitySearchAttribute(indexKey); + break; + + default: + // Do nothing + } + + if (attributeType != null && AtlasTypeUtil.isBuiltInType(attributeType.getTypeName()) && context.getIndexedKeys().contains(indexKey)) { + if (operatorMap.get(op) != null) { + // If there's a chance of multi-value then we need some additional processing here + switch (context.getIndexResultType()) { + case TAG: + context.addProcessedTagAttribute(indexKey); + break; + + case ENTITY: + context.addProcessedEntityAttribute(indexKey); + break; + } + + ret = String.format(operatorMap.get(op), indexKey, attrVal); + } + } + } catch (AtlasBaseException ex) { + LOG.warn(ex.getMessage()); + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index 
35dbf6c..94b6092 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -68,8 +68,10 @@ import java.math.BigInteger; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.*; @@ -96,7 +98,10 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang //allows injection of a dummy graph for testing private IAtlasGraphProvider provider; - + + private boolean recomputeIndexedKeys = true; + private Set vertexIndexKeys = new HashSet<>(); + @Inject public GraphBackedSearchIndexer(AtlasTypeRegistry typeRegistry) throws AtlasException { this(new AtlasGraphProvider(), ApplicationProperties.get(), typeRegistry); @@ -130,6 +135,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang if (management.containsPropertyKey(Constants.VERTEX_TYPE_PROPERTY_KEY)) { LOG.info("Global indexes already exist for graph"); management.commit(); + return; } @@ -192,7 +198,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang throw new RepositoryException(t); } } - private void createFullTextIndex(AtlasGraphManagement management) { AtlasPropertyKey fullText = @@ -247,6 +252,34 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang onAdd(dataTypes); } + public Set getVertexIndexKeys() { + if (recomputeIndexedKeys) { + AtlasGraphManagement management = null; + + try { + management = provider.get().getManagementSystem(); + } catch (RepositoryException excp) { + LOG.error("failed to get indexedKeys from graph", excp); + } + + if (management != null) { + recomputeIndexedKeys = false; + + AtlasGraphIndex vertexIndex = 
management.getGraphIndex(Constants.VERTEX_INDEX); + + Set indexKeys = new HashSet<>(); + + for (AtlasPropertyKey fieldKey : vertexIndex.getFieldKeys()) { + indexKeys.add(fieldKey.getName()); + } + + vertexIndexKeys = indexKeys; + } + } + + return vertexIndexKeys; + } + private void addIndexForType(AtlasGraphManagement management, AtlasBaseTypeDef typeDef) { if (typeDef instanceof AtlasEnumDef) { // Only handle complex types like Struct, Classification and Entity @@ -577,6 +610,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang private void commit(AtlasGraphManagement management) throws IndexException { try { management.commit(); + + recomputeIndexedKeys = true; } catch (Exception e) { LOG.error("Index commit failed", e); throw new IndexException("Index commit failed ", e); @@ -586,6 +621,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang private void rollback(AtlasGraphManagement management) throws IndexException { try { management.rollback(); + + recomputeIndexedKeys = true; } catch (Exception e) { LOG.error("Index rollback failed ", e); throw new IndexException("Index rollback failed ", e); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 3411f8d..9221717 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -30,12 +30,14 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import 
javax.inject.Inject; import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileNotFoundException; - +@Component public class ImportService { private static final Logger LOG = LoggerFactory.getLogger(ImportService.class); @@ -46,6 +48,7 @@ public class ImportService { private long startTimestamp; private long endTimestamp; + @Inject public ImportService(final AtlasTypeDefStore typeDefStore, final AtlasEntityStore entityStore, AtlasTypeRegistry typeRegistry) { this.typeDefStore = typeDefStore; this.entityStore = entityStore; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java index 42bd58f..7b3f1e6 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasGraphUtilsV1.java @@ -39,7 +39,10 @@ import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.Map; /** * Utility methods for Graph. 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java index 9a8695a..a5b5730 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/EntityGraphRetriever.java @@ -46,10 +46,12 @@ import org.slf4j.LoggerFactory; import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BIGDECIMAL; import static org.apache.atlas.model.typedef.AtlasBaseTypeDef.ATLAS_TYPE_BIGINTEGER; @@ -123,7 +125,7 @@ public final class EntityGraphRetriever { } public AtlasEntityHeader toAtlasEntityHeader(AtlasVertex entityVertex) throws AtlasBaseException { - return entityVertex != null ? 
mapVertexToAtlasEntityHeader(entityVertex) : null; + return toAtlasEntityHeader(entityVertex, Collections.emptySet()); } private AtlasVertex getEntityVertex(String guid) throws AtlasBaseException { @@ -185,6 +187,10 @@ public final class EntityGraphRetriever { } private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex) throws AtlasBaseException { + return mapVertexToAtlasEntityHeader(entityVertex, Collections.emptySet()); + } + + private AtlasEntityHeader mapVertexToAtlasEntityHeader(AtlasVertex entityVertex, Set attributes) throws AtlasBaseException { AtlasEntityHeader ret = new AtlasEntityHeader(); String typeName = entityVertex.getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); @@ -218,6 +224,20 @@ public final class EntityGraphRetriever { if (displayText != null) { ret.setDisplayText(displayText.toString()); } + + if (CollectionUtils.isNotEmpty(attributes)) { + for (String attrName : attributes) { + if (ret.hasAttribute(attrName)) { + continue; + } + + Object attrValue = getVertexAttribute(entityVertex, entityType.getAttribute(attrName)); + + if (attrValue != null) { + ret.setAttribute(attrName, attrValue); + } + } + } } return ret; @@ -556,4 +576,8 @@ public final class EntityGraphRetriever { private Object getVertexAttribute(AtlasVertex vertex, AtlasAttribute attribute) throws AtlasBaseException { return vertex != null && attribute != null ? mapVertexToAttribute(vertex, attribute, null) : null; } + + public AtlasEntityHeader toAtlasEntityHeader(AtlasVertex atlasVertex, Set attributes) throws AtlasBaseException { + return atlasVertex != null ? 
mapVertexToAtlasEntityHeader(atlasVertex, attributes) : null; + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/repository/src/main/java/org/apache/atlas/util/SearchTracker.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/util/SearchTracker.java b/repository/src/main/java/org/apache/atlas/util/SearchTracker.java new file mode 100644 index 0000000..15a8c20 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/util/SearchTracker.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.util; + +import org.apache.atlas.annotation.AtlasService; +import org.apache.atlas.discovery.SearchPipeline.PipelineContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Registry of active searches, keyed by the name of the thread that called add(). + * The map is read and modified by more than one thread (a search registers itself, while the admin + * endpoints list the ids and presumably terminate them from other request threads), so a concurrent map is used. + */ +@AtlasService +public class SearchTracker { + private final Map<String, PipelineContext> activeSearches = new ConcurrentHashMap<>(); + + /** + * Registers a search under the current thread's name. + * + * @param context context of the search to track; must not be null + * @return the id under which the search was registered + */ + public String add(PipelineContext context) { + String searchId = Thread.currentThread().getName(); + + activeSearches.put(searchId, context); + + return searchId; + } + + /** + * Removes the search with the given id and flags its context for forced termination. + * The lookup and removal happen in a single remove() call so two concurrent callers cannot both terminate the same entry. + * + * @param searchId id returned by add() + * @return the terminated context, or null if no search is registered under that id + */ + public PipelineContext terminate(String searchId) { + PipelineContext ret = remove(searchId); + + if (ret != null) { + ret.setForceTerminate(true); + } + + return ret; + } + + /** + * Stops tracking the search with the given id without terminating it. + * + * @param id id returned by add(); a null id matches nothing + * @return the removed context, or null if none was registered + */ + public PipelineContext remove(String id) { + return id != null ? activeSearches.remove(id) : null; + } + + /** + * + * @return live, weakly consistent view of the ids of the currently tracked searches + */ + public Set<String> getActiveSearches() { + return activeSearches.keySet(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/repository/src/test/java/org/apache/atlas/TestModules.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/TestModules.java b/repository/src/test/java/org/apache/atlas/TestModules.java index fa2ac0d..d0da030 100644 --- a/repository/src/test/java/org/apache/atlas/TestModules.java +++ b/repository/src/test/java/org/apache/atlas/TestModules.java @@ -17,19 +17,14 @@ */ package org.apache.atlas; +import com.google.inject.AbstractModule; import 
com.google.inject.Binder; import com.google.inject.Provider; import com.google.inject.Singleton; import com.google.inject.matcher.Matchers; import com.google.inject.multibindings.Multibinder; import org.apache.atlas.annotation.GraphTransaction; -import org.apache.atlas.discovery.AtlasDiscoveryService; -import org.apache.atlas.discovery.AtlasLineageService; -import org.apache.atlas.discovery.DataSetLineageService; -import org.apache.atlas.discovery.DiscoveryService; -import org.apache.atlas.discovery.EntityDiscoveryService; -import org.apache.atlas.discovery.EntityLineageService; -import org.apache.atlas.discovery.LineageService; +import org.apache.atlas.discovery.*; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; import org.apache.atlas.graph.GraphSandboxUtil; import org.apache.atlas.listener.EntityChangeListener; @@ -61,6 +56,7 @@ import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.cache.TypeCache; import org.apache.atlas.util.AtlasRepositoryConfiguration; +import org.apache.atlas.util.SearchTracker; import org.apache.commons.configuration.Configuration; import org.mockito.Mockito; import org.slf4j.Logger; @@ -76,7 +72,7 @@ public class TestModules { } // Test only DI modules - public static class TestOnlyModule extends com.google.inject.AbstractModule { + public static class TestOnlyModule extends AbstractModule { private static final Logger LOG = LoggerFactory.getLogger(TestOnlyModule.class); @@ -147,6 +143,11 @@ public class TestModules { typeDefChangeListenerMultibinder.addBinding().to(DefaultMetadataService.class); typeDefChangeListenerMultibinder.addBinding().to(GraphBackedSearchIndexer.class).asEagerSingleton(); + bind(SearchPipeline.class).asEagerSingleton(); + bind(SearchTracker.class).asEagerSingleton(); + bind(SolrStep.class).asEagerSingleton(); + bind(GremlinStep.class).asEagerSingleton(); + 
bind(AtlasEntityStore.class).to(AtlasEntityStoreV1.class); bind(AtlasRelationshipStore.class).to(AtlasRelationshipStoreV1.class); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/repository/src/test/java/org/apache/atlas/services/EntityDiscoveryServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/services/EntityDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/services/EntityDiscoveryServiceTest.java index 5d5b043..dfb2ee2 100644 --- a/repository/src/test/java/org/apache/atlas/services/EntityDiscoveryServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/services/EntityDiscoveryServiceTest.java @@ -17,6 +17,7 @@ */ package org.apache.atlas.services; +import org.apache.atlas.TestModules; import org.apache.atlas.discovery.EntityDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.typedef.AtlasEntityDef; @@ -24,12 +25,16 @@ import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.lang.StringUtils; import org.powermock.reflect.Whitebox; import org.testng.annotations.BeforeClass; +import org.testng.annotations.Guice; import org.testng.annotations.Test; +import javax.inject.Inject; + import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +@Guice(modules = TestModules.TestOnlyModule.class) public class EntityDiscoveryServiceTest { private final String TEST_TYPE = "test"; @@ -47,6 +52,9 @@ public class EntityDiscoveryServiceTest { private final int maxTypesCountInIdxQuery = 10; + @Inject + EntityDiscoveryService discoveryService; + @BeforeClass public void init() throws AtlasBaseException { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/webapp/src/main/java/org/apache/atlas/web/filters/StaleTransactionCleanupFilter.java 
---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/StaleTransactionCleanupFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/StaleTransactionCleanupFilter.java index cdb9064..8e74d39 100644 --- a/webapp/src/main/java/org/apache/atlas/web/filters/StaleTransactionCleanupFilter.java +++ b/webapp/src/main/java/org/apache/atlas/web/filters/StaleTransactionCleanupFilter.java @@ -50,7 +50,7 @@ public class StaleTransactionCleanupFilter implements Filter { @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain filterChain) throws IOException, ServletException { - LOG.info("Cleaning stale transactions"); + LOG.debug("Cleaning stale transactions"); AtlasGraphProvider.getGraphInstance().rollback(); filterChain.doFilter(request, response); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index 8c5623f..1a9f57a 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -25,6 +25,7 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.authorize.AtlasActionTypes; import org.apache.atlas.authorize.AtlasResourceTypes; import org.apache.atlas.authorize.simple.AtlasAuthorizationUtils; +import org.apache.atlas.discovery.SearchPipeline; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasExportResult; @@ -35,11 +36,9 @@ import org.apache.atlas.repository.impexp.ExportService; import 
org.apache.atlas.repository.impexp.ImportService; import org.apache.atlas.repository.impexp.ZipSink; import org.apache.atlas.repository.impexp.ZipSource; -import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.services.MetricsService; -import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasType; -import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.util.SearchTracker; import org.apache.atlas.web.filters.AtlasCSRFPreventionFilter; import org.apache.atlas.web.service.ServiceState; import org.apache.atlas.web.util.Servlets; @@ -51,7 +50,6 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.context.ApplicationContext; import org.springframework.security.core.Authentication; import org.springframework.security.core.GrantedAuthority; import org.springframework.security.core.context.SecurityContextHolder; @@ -62,9 +60,11 @@ import javax.inject.Singleton; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.WebApplicationException; @@ -109,14 +109,10 @@ public class AdminResource { private final ServiceState serviceState; private final MetricsService metricsService; - private final AtlasTypeRegistry typeRegistry; - private final AtlasTypeDefStore typesDefStore; - private final AtlasEntityStore entityStore; private static Configuration atlasProperties; private final ExportService exportService; - - @Inject - ApplicationContext applicationContext; + private final ImportService importService; + private final SearchTracker activeSearches; static { try { @@ -128,15 +124,13 @@ public class 
AdminResource { @Inject public AdminResource(ServiceState serviceState, MetricsService metricsService, - AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore, - AtlasEntityStore entityStore, ExportService exportService) { + ExportService exportService, ImportService importService, SearchTracker activeSearches) { this.serviceState = serviceState; this.metricsService = metricsService; - this.typeRegistry = typeRegistry; - this.typesDefStore = typeDefStore; - this.entityStore = entityStore; this.exportService = exportService; - this.importExportOperationLock = new ReentrantLock(); + this.importService = importService; + this.activeSearches = activeSearches; + importExportOperationLock = new ReentrantLock(); } /** @@ -377,7 +371,6 @@ public class AdminResource { try { AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class); - ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry); ZipSource zipSource = new ZipSource(inputStream); result = importService.run(zipSource, request, Servlets.getUserName(httpServletRequest), @@ -412,7 +405,6 @@ public class AdminResource { try { AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class); - ImportService importService = new ImportService(this.typesDefStore, this.entityStore, this.typeRegistry); result = importService.run(request, Servlets.getUserName(httpServletRequest), Servlets.getHostName(httpServletRequest), AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest)); @@ -431,6 +423,21 @@ public class AdminResource { return result; } + @GET + @Path("activeSearches") + @Produces(Servlets.JSON_MEDIA_TYPE) + public Set getActiveSearches() { + return activeSearches.getActiveSearches(); + } + + @DELETE + @Path("activeSearches/{id}") + @Produces(Servlets.JSON_MEDIA_TYPE) + public boolean terminateActiveSearch(@PathParam("id") String searchId) { + SearchPipeline.PipelineContext terminate = 
activeSearches.terminate(searchId); + return null != terminate; + } + private String getEditableEntityTypes(Configuration config) { String ret = DEFAULT_EDITABLE_ENTITY_TYPES; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java index ea55021..dde300e 100644 --- a/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java +++ b/webapp/src/main/java/org/apache/atlas/web/rest/DiscoveryREST.java @@ -21,8 +21,10 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.discovery.AtlasDiscoveryService; import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.discovery.SearchParameters; import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.web.util.Servlets; +import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Service; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -31,6 +33,7 @@ import javax.inject.Inject; import javax.inject.Singleton; import javax.ws.rs.Consumes; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; @@ -213,6 +216,50 @@ public class DiscoveryREST { } } + /** + * Attribute based search for entities satisfying the search parameters + * @param parameters Search parameters + * @return Atlas search result + * @throws AtlasBaseException + * + * @HTTP 200 On successful search + * @HTTP 400 Tag/Entity doesn't exist or Tag/entity filter is present without tag/type name + */ + @Path("basic") + @POST + @Consumes(Servlets.JSON_MEDIA_TYPE) + @Produces(Servlets.JSON_MEDIA_TYPE) + public AtlasSearchResult 
searchWithParameters(SearchParameters parameters) throws AtlasBaseException { + AtlasPerfTracer perf = null; + + try { + if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { + perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DiscoveryREST.searchWithParameters("+ parameters + ")"); + } + + if (parameters.getLimit() < 0 || parameters.getOffset() < 0) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "Limit/offset should be non-negative"); + } + + if (StringUtils.isEmpty(parameters.getTypeName()) && !isEmpty(parameters.getEntityFilters())) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "EntityFilters specified without Type name"); + } + + if (StringUtils.isEmpty(parameters.getClassification()) && !isEmpty(parameters.getTagFilters())) { + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST, "TagFilters specified without tag name"); + } + + return atlasDiscoveryService.searchUsingBasicQuery(parameters); + } finally { + AtlasPerfTracer.log(perf); + } + } + + private boolean isEmpty(SearchParameters.FilterCriteria filterCriteria) { + return filterCriteria == null || + (StringUtils.isEmpty(filterCriteria.getAttributeName()) && CollectionUtils.isEmpty(filterCriteria.getCriterion())); + } + private String escapeTypeName(String typeName) { String ret; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7c262b40/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java index 1fe3119..c0bbf09 100644 --- a/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java +++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminResourceTest.java @@ -48,7 +48,7 @@ public class AdminResourceTest { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); - 
AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null); Response response = adminResource.getStatus(); assertEquals(response.getStatus(), HttpServletResponse.SC_OK); JSONObject entity = (JSONObject) response.getEntity(); @@ -59,7 +59,7 @@ public class AdminResourceTest { public void testResourceGetsValueFromServiceState() throws JSONException { when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); - AdminResource adminResource = new AdminResource(serviceState, null, null, null, null, null); + AdminResource adminResource = new AdminResource(serviceState, null, null, null, null); Response response = adminResource.getStatus(); verify(serviceState).getState();