Return-Path: X-Original-To: apmail-ranger-commits-archive@www.apache.org Delivered-To: apmail-ranger-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ECF4E18B36 for ; Wed, 11 Nov 2015 20:58:59 +0000 (UTC) Received: (qmail 37923 invoked by uid 500); 11 Nov 2015 20:58:59 -0000 Delivered-To: apmail-ranger-commits-archive@ranger.apache.org Received: (qmail 37894 invoked by uid 500); 11 Nov 2015 20:58:59 -0000 Mailing-List: contact commits-help@ranger.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ranger.incubator.apache.org Delivered-To: mailing list commits@ranger.incubator.apache.org Received: (qmail 37885 invoked by uid 99); 11 Nov 2015 20:58:59 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Nov 2015 20:58:59 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 480981A5AA3 for ; Wed, 11 Nov 2015 20:58:59 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.791 X-Spam-Level: * X-Spam-Status: No, score=1.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-east.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id OE7lhqz0wKfm for ; Wed, 11 Nov 2015 20:58:48 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-east.apache.org (ASF Mail Server at mx1-us-east.apache.org) with SMTP id 9B50B43C8F for ; Wed, 11 Nov 2015 20:58:46 +0000 (UTC) Received: (qmail 37574 invoked by uid 99); 11 Nov 2015 20:58:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 11 Nov 2015 20:58:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 98E29E5708; Wed, 11 Nov 2015 20:58:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: madhan@apache.org To: commits@ranger.incubator.apache.org Date: Wed, 11 Nov 2015 20:58:51 -0000 Message-Id: <8b17e18cd0ba41d282d25f007d911c43@git.apache.org> In-Reply-To: <723bb2b937bc4c419e28820f14381dc8@git.apache.org> References: <723bb2b937bc4c419e28820f14381dc8@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/10] incubator-ranger git commit: RANGER-726: Updated tagsync for recent changes in Atlas API RANGER-726: Updated tagsync for recent changes in Atlas API Signed-off-by: Madhan Neethiraj Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/49e890e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/49e890e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/49e890e2 Branch: refs/heads/tag-policy Commit: 49e890e26360c742ccbf80d2741df7bec48c6319 Parents: 5b86864 Author: Abhay Kulkarni Authored: Mon Nov 9 18:50:52 2015 -0800 Committer: Madhan Neethiraj Committed: Wed Nov 11 12:29:47 2015 -0800 ---------------------------------------------------------------------- .../atlas-client-0.5.0.2.3.1.1-19.jar | Bin 34558 -> 0 bytes .../atlas-notification-0.5.0.2.3.1.1-19.jar | Bin 34734 -> 0 bytes .../atlas-typesystem-0.5.0.2.3.1.1-19.jar | Bin 355350 -> 0 bytes pom.xml | 4 +- src/main/assembly/tagsync.xml | 3 + .../conf/templates/installprop2xml.properties | 6 +- tagsync/pom.xml | 18 + tagsync/scripts/install.properties | 2 +- tagsync/scripts/setup.py | 5 +- .../source/atlas/AtlasNotificationMapper.java | 97 +++-- .../tagsync/source/atlas/AtlasUtility.java | 404 ------------------- .../tagsync/source/atlas/TagAtlasSource.java | 78 ++-- 12 files changed, 132 insertions(+), 485 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar ---------------------------------------------------------------------- diff --git a/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar deleted file mode 100644 index 1fb2ef7..0000000 Binary files a/local-repo/org/apache/atlas/atlas-client/0.5.0.2.3.1.1-19/atlas-client-0.5.0.2.3.1.1-19.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar ---------------------------------------------------------------------- diff --git a/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar deleted file mode 100644 index 848eeb3..0000000 Binary files a/local-repo/org/apache/atlas/atlas-notification/0.5.0.2.3.1.1-19/atlas-notification-0.5.0.2.3.1.1-19.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar ---------------------------------------------------------------------- diff --git a/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar b/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar deleted file mode 100644 index f619b6e..0000000 Binary files a/local-repo/org/apache/atlas/atlas-typesystem/0.5.0.2.3.1.1-19/atlas-typesystem-0.5.0.2.3.1.1-19.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0648d67..d60fca4 100644 --- a/pom.xml +++ b/pom.xml @@ -213,7 +213,7 @@ 2.2.2 1.9.13 1.19 - 0.5.0.2.3.1.1-19 + 0.6-incubating-SNAPSHOT apache.staging.https Apache Release Distribution Repository https://repository.apache.org/service/local/staging/deploy/maven2 @@ -264,6 +264,7 @@ false + http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/src/main/assembly/tagsync.xml ---------------------------------------------------------------------- diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml index 331dae0..8adc5cc 100644 --- a/src/main/assembly/tagsync.xml +++ b/src/main/assembly/tagsync.xml @@ -56,7 +56,10 @@ org.apache.atlas:atlas-notification org.apache.atlas:atlas-typesystem org.apache.atlas:atlas-client + org.apache.atlas:atlas-common com.google.inject:guice + com.google.inject.extensions:guice-multibindings + org.codehaus.jettison:jettison aopalliance:aopalliance javax.inject:javax.inject org.apache.kafka:kafka_2.10 http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/conf/templates/installprop2xml.properties ---------------------------------------------------------------------- diff --git a/tagsync/conf/templates/installprop2xml.properties b/tagsync/conf/templates/installprop2xml.properties index 5b63835..101a1ba 100644 --- a/tagsync/conf/templates/installprop2xml.properties +++ b/tagsync/conf/templates/installprop2xml.properties @@ -34,8 +34,8 @@ TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = ranger.tagsync.filesource.modtime.chec TAGSYNC_FILESOURCE_FILENAME = ranger.tagsync.filesource.filename -TAGSYNC_ATLAS_KAFKA_ENDPOINTS = atlas.notification.kafka.bootstrap.servers -TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = atlas.notification.kafka.zookeeper.connect -TAGSYNC_ATLAS_CONSUMER_GROUP = atlas.notification.kafka.group.id +TAGSYNC_ATLAS_KAFKA_ENDPOINTS = atlas.kafka.bootstrap.servers +TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = atlas.kafka.zookeeper.connect +TAGSYNC_ATLAS_CONSUMER_GROUP = atlas.kafka.entities.group.id TAGSYNC_ATLAS_TO_RANGER_SERVICE_MAPPING = ranger.tagsync.atlas.to.service.mapping http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/pom.xml ---------------------------------------------------------------------- diff --git a/tagsync/pom.xml b/tagsync/pom.xml index b800f61..c860c4a 100644 --- a/tagsync/pom.xml +++ b/tagsync/pom.xml @@ -30,8 +30,10 @@ 0.5.0 + UTF-8 @@ -107,6 +110,16 @@ 4.0 + com.google.inject.extensions + guice-multibindings + 4.0 + + + org.codehaus.jettison + jettison + 1.1 + + org.apache.atlas atlas-notification ${atlas.version} @@ -121,5 +134,10 @@ atlas-client ${atlas.version} + + org.apache.atlas + atlas-common + ${atlas.version} + http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/scripts/install.properties ---------------------------------------------------------------------- diff --git a/tagsync/scripts/install.properties b/tagsync/scripts/install.properties index f7de6e3..b5ad580 100644 --- a/tagsync/scripts/install.properties +++ b/tagsync/scripts/install.properties @@ -53,7 +53,7 @@ TAG_SOURCE_FILE_MOD_TIME_CHECK_INTERVAL = 60000 TAGSYNC_ATLAS_KAFKA_ENDPOINTS = localhost:6667 TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = localhost:2181 -TAGSYNC_ATLAS_CONSUMER_GROUP = entityConsumer +TAGSYNC_ATLAS_CONSUMER_GROUP = ranger_entities_consumer # Mapping from Atlas hive instance-name to Ranger service-name # this needs to be in format clusterName,componentType,serviceName;clusterName2,componentType2,serviceName2 http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/scripts/setup.py ---------------------------------------------------------------------- diff --git a/tagsync/scripts/setup.py b/tagsync/scripts/setup.py index e4b2433..f7455b8 100755 --- a/tagsync/scripts/setup.py +++ b/tagsync/scripts/setup.py @@ -317,8 +317,9 @@ def main(): atlasOutFile = file(atlasOutFn, "a") atlasOutFile.write("atlas.notification.embedded=false" + "\n") - atlasOutFile.write("atlas.notification.kafka.acks=1" + "\n") - atlasOutFile.write("atlas.notification.kafka.data=${sys:atlas.home}/data/kafka" + "\n") + atlasOutFile.write("atlas.kafka.acks=1" + "\n") + atlasOutFile.write("atlas.kafka.data=${sys:atlas.home}/data/kafka" + "\n") + atlasOutFile.write("atlas.kafka.hook.group.id=atlas" + "\n") atlasOutFile.close() http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java index 8046b68..7925b5c 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java @@ -19,9 +19,10 @@ package org.apache.ranger.tagsync.source.atlas; +import org.apache.atlas.AtlasException; import org.apache.atlas.notification.entity.EntityNotification; -import org.apache.atlas.typesystem.api.Entity; -import org.apache.atlas.typesystem.api.Trait; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.IStruct; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -58,7 +59,9 @@ class AtlasNotificationMapper { properties = props; try { - if (isEntityMappable(entityNotification.getEntity())) { + IReferenceableInstance entity = entityNotification.getEntity(); + + if (isEntityMappable(entity)) { ret = createServiceTags(entityNotification); } else { if(LOG.isDebugEnabled()) { @@ -71,7 +74,7 @@ class AtlasNotificationMapper { return ret; } - static private boolean isEntityMappable(Entity entity) { + static private boolean isEntityMappable(IReferenceableInstance entity) { boolean ret = false; String entityTypeName = entity.getTypeName(); @@ -91,44 +94,43 @@ class AtlasNotificationMapper { ServiceTags ret = null; EntityNotification.OperationType opType = entityNotification.getOperationType(); - Entity entity = entityNotification.getEntity(); - String opName = entityNotification.getOperationType().name(); switch (opType) { - case ENTITY_CREATED: { - LOG.debug("ENTITY_CREATED notification is not handled, as Ranger will get necessary information from any subsequent TRAIT_ADDED notification"); + case ENTITY_CREATE: { + LOG.debug("ENTITY_CREATE notification is not handled, as Ranger will get necessary information from any subsequent TRAIT_ADDED notification"); break; } - case ENTITY_UPDATED: { - ret = getServiceTags(entity); + case ENTITY_UPDATE: { + ret = getServiceTags(entityNotification); if (MapUtils.isEmpty(ret.getTags())) { LOG.debug("No traits associated with this entity update notification. Ignoring it altogether"); ret = null; } break; } - case TRAIT_ADDED: - case TRAIT_DELETED: { - ret = getServiceTags(entity); + case TRAIT_ADD: + case TRAIT_DELETE: { + ret = getServiceTags(entityNotification); break; } default: - LOG.error(opName + ": unknown notification received - not handled"); + LOG.error(opType + ": unknown notification received - not handled"); } return ret; } - static private ServiceTags getServiceTags(Entity entity) throws Exception { + static private ServiceTags getServiceTags(EntityNotification entityNotification) throws Exception { ServiceTags ret = null; + IReferenceableInstance entity = entityNotification.getEntity(); List serviceResources = new ArrayList(); RangerServiceResource serviceResource = getServiceResource(entity); serviceResources.add(serviceResource); - Map tags = getTags(entity); + Map tags = getTags(entityNotification); Map tagDefs = getTagDefs(tags); @@ -163,7 +165,7 @@ class AtlasNotificationMapper { } - static private RangerServiceResource getServiceResource(Entity entity) throws Exception { + static private RangerServiceResource getServiceResource(IReferenceableInstance entity) throws Exception { RangerServiceResource ret = null; @@ -224,7 +226,7 @@ class AtlasNotificationMapper { ret = new RangerServiceResource(); - ret.setGuid(entity.getId().getGuid()); + ret.setGuid(entity.getId()._getId()); ret.setId(1L); ret.setServiceName(serviceName); ret.setResourceElements(elements); @@ -232,22 +234,24 @@ class AtlasNotificationMapper { return ret; } - static private Map getTags(Entity entity) { + static private Map getTags(EntityNotification entityNotification) { Map ret = null; - Map traits = entity.getTraits(); + ret = new HashMap(); + + long index = 1; + + List traits = entityNotification.getAllTraits(); + + for (IStruct trait : traits) { - if (MapUtils.isNotEmpty(traits)) { - ret = new HashMap(); - long index = 1; + String traitName = trait.getTypeName(); - for (Map.Entry entry : traits.entrySet()) { - String traitName = entry.getKey(); - Trait trait = entry.getValue(); + Map tagAttrValues = new HashMap(); - Map attrValues = trait.getValues(); + try { - Map tagAttrValues = new HashMap(); + Map attrValues = trait.getValuesMap(); for (Map.Entry attrValueEntry : attrValues.entrySet()) { String attrName = attrValueEntry.getKey(); @@ -259,14 +263,17 @@ class AtlasNotificationMapper { LOG.error("Cannot cast attribute-value to String, skipping... attrName=" + attrName); } } + } catch (AtlasException exception) { + LOG.error("Could not get values for trait:" + traitName, exception); + } - RangerTag tag = new RangerTag(); + RangerTag tag = new RangerTag(); - tag.setType(traitName); - tag.setAttributes(tagAttrValues); + tag.setType(traitName); + tag.setAttributes(tagAttrValues); + + ret.put(index++, tag); - ret.put(index++, tag); - } } return ret; @@ -289,13 +296,13 @@ class AtlasNotificationMapper { return ret; } - static private String[] getQualifiedNameComponents(Entity entity) { + static private String[] getQualifiedNameComponents(IReferenceableInstance entity) { String ret[] = new String[5]; if (StringUtils.equals(entity.getTypeName(), ENTITY_TYPE_HIVE_DB)) { - String clusterName = getAttribute(entity.getValues(), "clusterName", String.class); - String name = getAttribute(entity.getValues(), "name", String.class); + String clusterName = getEntityAttribute(entity, "clusterName", String.class); + String name = getEntityAttribute(entity, "name", String.class); ret[1] = clusterName; ret[2] = name; @@ -303,20 +310,20 @@ class AtlasNotificationMapper { ret[0] = ret[1] + "." + ret[2]; if (LOG.isDebugEnabled()) { - LOG.debug("----- Entity-Id:" + entity.getId().getGuid()); + LOG.debug("----- Entity-Id:" + entity.getId()._getId()); LOG.debug("----- Entity-Type-Name:" + entity.getTypeName()); LOG.debug("----- Entity-Cluster-Name:" + clusterName); LOG.debug("----- Entity-Name:" + name); } } else { - String qualifiedName = getAttribute(entity.getValues(), ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); + String qualifiedName = getEntityAttribute(entity, ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class); String nameHierarchy[] = qualifiedName.split(QUALIFIED_NAME_FORMAT_DELIMITER_STRING); int hierarchyLevels = nameHierarchy.length; if (LOG.isDebugEnabled()) { - LOG.debug("----- Entity-Id:" + entity.getId().getGuid()); + LOG.debug("----- Entity-Id:" + entity.getId()._getId()); LOG.debug("----- Entity-Type-Name:" + entity.getTypeName()); LOG.debug("----- Entity-Qualified-Name:" + qualifiedName); LOG.debug("----- Entity-Qualified-Name-Components -----"); @@ -351,6 +358,18 @@ class AtlasNotificationMapper { return TagSyncConfig.getServiceName(apacheComponent, instanceName, properties); } + static private T getEntityAttribute(IReferenceableInstance entity, String name, Class type) { + T ret = null; + + try { + Map valueMap = entity.getValuesMap(); + ret = getAttribute(valueMap, name, type); + } catch (AtlasException exception) { + LOG.error("Cannot get map of values for entity: " + entity.getId()._getId(), exception); + } + + return ret; + } static private T getAttribute(Map map, String name, Class type) { return type.cast(map.get(name)); } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java deleted file mode 100644 index 2548c36..0000000 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasUtility.java +++ /dev/null @@ -1,404 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ranger.tagsync.source.atlas; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; - -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import org.apache.atlas.typesystem.EntityImpl; -import org.apache.atlas.typesystem.IdImpl; -import org.apache.atlas.typesystem.TraitImpl; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.atlas.typesystem.api.Entity; -import org.apache.atlas.typesystem.api.Trait; -import org.apache.ranger.admin.client.datatype.RESTResponse; -import org.apache.ranger.plugin.util.RangerRESTClient; -import org.apache.ranger.plugin.util.RangerRESTUtils; -import org.apache.ranger.tagsync.process.TagSyncConfig; - -import java.io.IOException; -import java.lang.reflect.Type; -import java.util.*; - - -// class AtlasUtil - -@SuppressWarnings("unchecked") -public class AtlasUtility { - - private static final Log LOG = LogFactory.getLog(AtlasUtility.class); - - // Atlas APIs - - public static final String API_ATLAS_TYPES = "api/atlas/types"; - public static final String API_ATLAS_ENTITIES = "api/atlas/entities?type="; - public static final String API_ATLAS_ENTITY = "api/atlas/entities/"; - public static final String API_ATLAS_TYPE = "api/atlas/types/"; - - public static final String RESULTS_ATTRIBUTE = "results"; - public static final String DEFINITION_ATTRIBUTE = "definition"; - public static final String VALUES_ATTRIBUTE = "values"; - public static final String TRAITS_ATTRIBUTE = "traits"; - public static final String TYPE_NAME_ATTRIBUTE = "typeName"; - public static final String TRAIT_TYPES_ATTRIBUTE = "traitTypes"; - public static final String SUPER_TYPES_ATTRIBUTE = "superTypes"; - public static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = "attributeDefinitions"; - public static final String NAME_ATTRIBUTE = "name"; - - private Type mapType = new TypeToken>() { - }.getType(); - - private RangerRESTClient restClient; - private Map entities = new LinkedHashMap<>(); - - - // ----- Constructor ------------------------------------------------------ - - public AtlasUtility(Properties properties) { - - String url = TagSyncConfig.getAtlasEndpoint(properties); - String sslConfigFileName = TagSyncConfig.getAtlasSslConfigFileName(properties); - - if (LOG.isDebugEnabled()) { - LOG.debug("Initializing RangerRestClient with (url=" + url + ", sslConfigFileName" + sslConfigFileName + ")"); - } - - restClient = new RangerRESTClient(url, sslConfigFileName); - - if (LOG.isDebugEnabled()) { - LOG.debug("Initialized RangerRestClient with (url=" + url + ", sslConfigFileName=" + sslConfigFileName + ")"); - } - } - - // update the set of entities with current from Atlas - public void refreshAllEntities() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagAtlasSource.refreshAllEntities()"); - } - - try { - entities.clear(); - entities.putAll(getAllEntities()); - } catch (IOException e) { - LOG.error("getAllEntities() failed", e); - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagAtlasSource.refreshAllEntities()"); - } - } - - // ----- AtlasUtility ------------------------------------------------------ - - public Map getAllEntities() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagAtlasSource.getAllEntities()"); - } - Map entities = new LinkedHashMap<>(); - - Map typesResponse = atlasAPI(API_ATLAS_TYPES); - - List types = getAttribute(typesResponse, RESULTS_ATTRIBUTE, List.class); - - for (String type : types) { - - Map entitiesResponse = atlasAPI(API_ATLAS_ENTITIES + type); - - List guids = getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class); - - for (String guid : guids) { - - if (StringUtils.isNotBlank(guid)) { - - Map> traitSuperTypes = new HashMap<>(); - - Map entityResponse = atlasAPI(API_ATLAS_ENTITY + guid); - - if (entityResponse.containsKey(DEFINITION_ATTRIBUTE)) { - String definitionJSON = getAttribute(entityResponse, DEFINITION_ATTRIBUTE, String.class); - - LOG.info("{"); - LOG.info(" \"entity-id\":" + guid + ","); - LOG.info(" \"entity-definition\":" + definitionJSON); - LOG.info("}"); - - Map definition = new Gson().fromJson(definitionJSON, mapType); - - Map values = getAttribute(definition, VALUES_ATTRIBUTE, Map.class); - Map traits = getAttribute(definition, TRAITS_ATTRIBUTE, Map.class); - String typeName = getAttribute(definition, TYPE_NAME_ATTRIBUTE, String.class); - - LOG.info("Received entity(typeName=" + typeName + ", id=" + guid + ")"); - - - Map traitMap = new HashMap<>(); - - if (MapUtils.isNotEmpty(traits)) { - - LOG.info("Traits for entity(typeName=" + typeName + ", id=" + guid + ") ------ "); - - for (Map.Entry entry : traits.entrySet()) { - - Map trait = (Map) entry.getValue(); - - Map traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class); - String traitTypeName = getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class); - - Map superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues); - - TraitImpl trait1 = new TraitImpl(traitTypeName, traitValues, superTypes); - - traitSuperTypes.put(trait1, superTypes); - - traitMap.put(entry.getKey(), trait1); - - - LOG.info(" Trait(typeName=" + traitTypeName + ")"); - - } - } else { - LOG.info("No traits for entity(typeName=" + typeName + ", id=" + guid + ")"); - } - EntityImpl entity = new EntityImpl(new IdImpl(guid, 0), typeName, values, traitMap); - - showEntity(entity); - - entities.put(guid, entity); - - } - } - } - } - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagAtlasSource.getAllEntities()"); - } - return entities; - } - - - // ----- helper methods ---------------------------------------------------- - - private Map getTraitType(String traitName) - throws IOException { - - Map typeResponse = atlasAPI(API_ATLAS_TYPE + traitName); - - if (typeResponse.containsKey(DEFINITION_ATTRIBUTE)) { - String definitionJSON = getAttribute(typeResponse, DEFINITION_ATTRIBUTE, String.class); - - Map definition = new Gson().fromJson(definitionJSON, mapType); - - List traitTypes = getAttribute(definition, TRAIT_TYPES_ATTRIBUTE, List.class); - - if (traitTypes.size() > 0) { - return (Map) traitTypes.get(0); - } - } - return null; - } - - private Map getTraitSuperTypes(Map traitType, Map values) - throws IOException { - - Map superTypes = new HashMap<>(); - - if (traitType != null) { - - List superTypeNames = getAttribute(traitType, SUPER_TYPES_ATTRIBUTE, List.class); - - for (String superTypeName : superTypeNames) { - - Map superTraitType = getTraitType(superTypeName); - - if (superTraitType != null) { - List> attributeDefinitions = (List) superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE); - - Map superTypeValues = new HashMap<>(); - for (Map attributeDefinition : attributeDefinitions) { - - String attributeName = attributeDefinition.get(NAME_ATTRIBUTE).toString(); - if (values.containsKey(attributeName)) { - superTypeValues.put(attributeName, values.get(attributeName)); - } - } - - superTypes.put(superTypeName, - //new TraitImpl(getTraitSuperTypes(superTraitType, superTypeValues), superTypeValues, superTypeName)); - new TraitImpl(superTypeName, superTypeValues, getTraitSuperTypes(superTraitType, superTypeValues))); - } - } - } - return superTypes; - } - - - /* - private Map atlasAPI(String endpoint) throws IOException { - InputStream in = streamProvider.readFrom(atlasEndpoint + endpoint, "GET", (String) null, Collections.emptyMap()); - return new Gson().fromJson(IOUtils.toString(in, "UTF-8"), mapType); - } - */ - - - private Map atlasAPI(String endpoint) { - if (LOG.isDebugEnabled()) { - LOG.debug("==> TagAtlasSource.atlasAPI(" + endpoint + ")"); - } - // Create a REST client and perform a get on it - Map ret = new HashMap(); - - WebResource webResource = restClient.getResource(endpoint); - - ClientResponse response = webResource.accept(RangerRESTUtils.REST_MIME_TYPE_JSON).get(ClientResponse.class); - - if (response != null && response.getStatus() == 200) { - ret = response.getEntity(ret.getClass()); - } else { - LOG.error("Atlas REST call returned with response={" + response + "}"); - - RESTResponse resp = RESTResponse.fromClientResponse(response); - LOG.error("Error getting Atlas Entity. request=" + webResource.toString() - + ", response=" + resp.toString()); - } - if (LOG.isDebugEnabled()) { - LOG.debug("<== TagAtlasSource.atlasAPI(" + endpoint + ")"); - } - return ret; - } - - private T getAttribute(Map map, String name, Class type) { - return type.cast(map.get(name)); - } - - public void showEntity(Entity entity) { - - LOG.debug("Entity-id :" + entity.getId()); - - LOG.debug("Type: " + entity.getTypeName()); - - LOG.debug("----- Values -----"); - - for (Map.Entry entry : entity.getValues().entrySet()) { - LOG.debug(" Name: " + entry.getKey() + ""); - Object value = entry.getValue(); - LOG.debug(" Value: " + getValue(value, entities.keySet())); - } - - LOG.debug("----- Traits -----"); - - for (String traitName : entity.getTraits().keySet()) { - LOG.debug(" Name:" + entity.getId() + ", trait=" + traitName + ">" + traitName); - } - - } - - public void showTrait(Entity entity, String traitId) { - - String[] traitNames = traitId.split(","); - - Trait trait = entity.getTraits().get(traitNames[0]); - - for (int i = 1; i < traitNames.length; ++i) { - trait = trait.getSuperTypes().get(traitNames[i]); - } - - String typeName = trait.getTypeName(); - - LOG.debug("Trait " + typeName + " for Entity id=" + entity.getId()); - - LOG.debug("Type: " + typeName); - - LOG.debug("----- Values ------"); - - for (Map.Entry entry : trait.getValues().entrySet()) { - LOG.debug("Name:" + entry.getKey()); - Object value = entry.getValue(); - LOG.debug("Value:" + getValue(value, entities.keySet())); - } - - LOG.debug("Super Traits"); - - - for (String traitName : trait.getSuperTypes().keySet()) { - LOG.debug("Name=" + entity.getId() + "&trait=" + traitId + "," + traitName + ">" + traitName); - } - } - - // resolve the given value if necessary - private String getValue(Object value, Set ids) { - if (value == null) { - return ""; - } - String idString = getIdValue(value, ids); - if (idString != null) { - return idString; - } - - idString = getIdListValue(value, ids); - if (idString != null) { - return idString; - } - - return value.toString(); - } - - // get an id from the given value; return null if the value is not an id type - private String getIdValue(Object value, Set ids) { - if (value instanceof Map) { - Map map = (Map) value; - if (map.size() == 3 && map.containsKey("id")) { - String id = map.get("id").toString(); - if (ids.contains(id)) { - return id; - } - } - } - return null; - } - - // get an id list from the given value; return null if the value is not an id list type - private String getIdListValue(Object value, Set ids) { - if (value instanceof List) { - List list = (List) value; - if (list.size() > 0) { - StringBuilder sb = new StringBuilder(); - for (Object o : list) { - String idString = getIdValue(o, ids); - if (idString == null) { - return value.toString(); - } - if (sb.length() > 0) { - sb.append(", "); - } - sb.append(idString); - } - return sb.toString(); - } - } - return null; - } -} - - - http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/49e890e2/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java index 2725b23..fd64d12 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/TagAtlasSource.java @@ -23,17 +23,21 @@ import com.google.gson.Gson; import com.google.inject.Guice; import com.google.inject.Injector; +import com.google.inject.Provider; +import org.apache.atlas.AtlasException; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + +import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationModule; import org.apache.atlas.notification.entity.EntityNotification; -import org.apache.atlas.notification.entity.EntityNotificationConsumer; -import org.apache.atlas.notification.entity.EntityNotificationConsumerProvider; -import org.apache.atlas.typesystem.api.Entity; -import org.apache.atlas.typesystem.api.Trait; +import org.apache.atlas.typesystem.IReferenceableInstance; +import org.apache.atlas.typesystem.IStruct; + import org.apache.ranger.tagsync.model.TagSink; import org.apache.ranger.tagsync.model.TagSource; import org.apache.ranger.plugin.util.ServiceTags; @@ -47,9 +51,9 @@ public class TagAtlasSource implements TagSource { public static final String TAGSYNC_ATLAS_PROPERTIES_FILE_NAME = "application.properties"; - public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.notification.kafka.bootstrap.servers"; - public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.notification.kafka.zookeeper.connect"; - public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.notification.kafka.group.id"; + public static final String TAGSYNC_ATLAS_KAFKA_ENDPOINTS = "atlas.kafka.bootstrap.servers"; + public static final String TAGSYNC_ATLAS_ZOOKEEPER_ENDPOINT = "atlas.kafka.zookeeper.connect"; + public static final String TAGSYNC_ATLAS_CONSUMER_GROUP = "atlas.kafka.entities.group.id"; private TagSink tagSink; private Properties properties; @@ -112,9 +116,11 @@ public class TagAtlasSource implements TagSource { Injector injector = Guice.createInjector(notificationModule); - EntityNotificationConsumerProvider consumerProvider = injector.getInstance(EntityNotificationConsumerProvider.class); + Provider consumerProvider = injector.getProvider(NotificationInterface.class); + NotificationInterface notification = consumerProvider.get(); + List> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); - consumerTask = new ConsumerRunnable(consumerProvider.get()); + consumerTask = new ConsumerRunnable(iterators.get(0)); } if (LOG.isDebugEnabled()) { @@ -158,26 +164,25 @@ public class TagAtlasSource implements TagSource { private class ConsumerRunnable implements Runnable { - private final EntityNotificationConsumer consumer; + private final Iterator consumerIterator; - private ConsumerRunnable(EntityNotificationConsumer consumer) { - this.consumer = consumer; + private ConsumerRunnable(Iterator consumerIterator) { + this.consumerIterator = consumerIterator; } - // ----- Runnable -------------------------------------------------------- @Override public void run() { - while (consumer.hasNext()) { + while (consumerIterator.hasNext()) { try { - EntityNotification notification = consumer.next(); + EntityNotification notification = consumerIterator.next(); if (notification != null) { printNotification(notification); ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification, properties); if (serviceTags == null) { if(LOG.isDebugEnabled()) { - LOG.debug("Did not create ServiceTags structure for notification type:" + notification.getOperationType().name()); + LOG.debug("Did not create ServiceTags structure for notification type:" + notification.getOperationType()); } } else { if (LOG.isDebugEnabled()) { @@ -199,33 +204,36 @@ public class TagAtlasSource implements TagSource { } public void printNotification(EntityNotification notification) { - Entity entity = notification.getEntity(); + IReferenceableInstance entity = notification.getEntity(); if (LOG.isDebugEnabled()) { - LOG.debug("Notification-Type: " + notification.getOperationType().name()); - LOG.debug("Entity-Id: " + entity.getId().getGuid()); - LOG.debug("Entity-Type: " + entity.getTypeName()); + try { + LOG.debug("Notification-Type: " + notification.getOperationType()); + LOG.debug("Entity-Id: " + entity.getId()._getId()); + LOG.debug("Entity-Type: " + entity.getTypeName()); - LOG.debug("----------- Entity Values ----------"); + LOG.debug("----------- Entity Values ----------"); - for (Map.Entry entry : entity.getValues().entrySet()) { - LOG.debug(" Name:" + entry.getKey()); - Object value = entry.getValue(); - LOG.debug(" Value:" + value); - } + for (Map.Entry entry : entity.getValuesMap().entrySet()) { + LOG.debug(" Name:" + entry.getKey()); + Object value = entry.getValue(); + LOG.debug(" Value:" + value); + } - LOG.debug("----------- Entity Traits ----------"); + LOG.debug("----------- Entity Traits ----------"); + List traits = notification.getAllTraits(); - for (Map.Entry entry : entity.getTraits().entrySet()) { - LOG.debug(" Trait-Name:" + entry.getKey()); - Trait trait = entry.getValue(); - LOG.debug(" Trait-Type:" + trait.getTypeName()); - Map traitValues = trait.getValues(); - for (Map.Entry valueEntry : traitValues.entrySet()) { - LOG.debug(" Trait-Value-Name:" + valueEntry.getKey()); - LOG.debug(" Trait-Value:" + valueEntry.getValue()); + for (IStruct trait : traits) { + LOG.debug(" Trait-Type-Name:" + trait.getTypeName()); + Map traitValues = trait.getValuesMap(); + for (Map.Entry valueEntry : traitValues.entrySet()) { + LOG.debug(" Trait-Value-Name:" + valueEntry.getKey()); + LOG.debug(" Trait-Value:" + valueEntry.getValue()); + } } + } catch (AtlasException exception) { + LOG.error("Cannot print notification - ", exception); } } }