Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BA08C200B35 for ; Tue, 21 Jun 2016 07:41:54 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B88E6160A65; Tue, 21 Jun 2016 05:41:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 92216160A55 for ; Tue, 21 Jun 2016 07:41:52 +0200 (CEST) Received: (qmail 81651 invoked by uid 500); 21 Jun 2016 05:41:51 -0000 Mailing-List: contact commits-help@atlas.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.incubator.apache.org Delivered-To: mailing list commits@atlas.incubator.apache.org Received: (qmail 81642 invoked by uid 99); 21 Jun 2016 05:41:51 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Jun 2016 05:41:51 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id E4886C0E26 for ; Tue, 21 Jun 2016 05:41:50 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 5YgG09TMETzN for ; Tue, 21 Jun 2016 05:41:39 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id 961B95F239 for ; Tue, 21 Jun 2016 05:41:37 +0000 (UTC) Received: (qmail 81337 invoked by uid 99); 21 Jun 2016 05:41:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Jun 2016 05:41:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C5ED7E01BD; Tue, 21 Jun 2016 05:41:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shwethags@apache.org To: commits@atlas.incubator.apache.org Date: Tue, 21 Jun 2016 05:41:38 -0000 Message-Id: <560bf11d12d64b0fbd37621e13603d81@git.apache.org> In-Reply-To: <0848f07801ec45b0b8746fdaf7173eb2@git.apache.org> References: <0848f07801ec45b0b8746fdaf7173eb2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/5] incubator-atlas git commit: ATLAS-819 All user defined types should have a set of common attributes (shwethags) archived-at: Tue, 21 Jun 2016 05:41:54 -0000 ATLAS-819 All user defined types should have a set of common attributes (shwethags) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/d838cf38 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/d838cf38 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/d838cf38 Branch: refs/heads/0.7-incubating Commit: d838cf3852c1229dc4f70ef41588e8387fbf780a Parents: f4670dd Author: Shwetha GS Authored: Tue Jun 21 10:10:20 2016 +0530 Committer: Shwetha GS Committed: Tue Jun 21 10:13:49 2016 +0530 ---------------------------------------------------------------------- .../org/apache/atlas/falcon/Util/EventUtil.java | 17 ++- .../atlas/falcon/bridge/FalconBridge.java | 108 +++++++++---------- .../apache/atlas/falcon/event/FalconEvent.java | 9 +- .../apache/atlas/falcon/hook/FalconHook.java | 15 ++- .../falcon/model/FalconDataModelGenerator.java | 46 ++------ .../atlas/falcon/service/AtlasService.java | 6 +- .../apache/atlas/falcon/hook/FalconHookIT.java | 10 ++ .../org/apache/atlas/fs/model/FSDataModel.scala | 5 +- .../atlas/hive/bridge/HiveMetaStoreBridge.java | 27 ++--- .../org/apache/atlas/hive/hook/HiveHook.java | 6 +- .../hive/model/HiveDataModelGenerator.java | 18 +--- .../org/apache/atlas/hive/hook/HiveHookIT.java | 13 ++- .../org/apache/atlas/sqoop/hook/SqoopHook.java | 10 +- .../sqoop/model/SqoopDataModelGenerator.java | 7 +- .../apache/atlas/storm/hook/StormAtlasHook.java | 18 ++-- .../atlas/storm/model/StormDataModel.scala | 19 +--- .../main/java/org/apache/atlas/AtlasClient.java | 11 +- distro/src/conf/atlas-log4j.xml | 2 +- release-log.txt | 1 + .../graph/GraphBackedMetadataRepository.java | 18 ++-- .../graph/GraphBackedSearchIndexer.java | 2 +- .../atlas/services/DefaultMetadataService.java | 32 +++--- .../typesystem/types/StructTypeDefinition.java | 9 +- .../atlas/web/resources/EntityResource.java | 26 +++-- .../atlas/web/resources/TypesResource.java | 4 +- 25 files changed, 200 insertions(+), 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java index c1ccd05..ef56340 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java @@ -21,9 +21,7 @@ package org.apache.atlas.falcon.Util; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.security.CurrentUser; -import org.apache.hadoop.security.UserGroupInformation; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -45,20 +43,19 @@ public final class EventUtil { String[] tags = keyValueString.split(","); for (String tag : tags) { int index = tag.indexOf("="); - String tagKey = tag.substring(0, index); - String tagValue = tag.substring(index + 1, tag.length()); + String tagKey = tag.substring(0, index).trim(); + String tagValue = tag.substring(index + 1, tag.length()).trim(); keyValueMap.put(tagKey, tagValue); } return keyValueMap; } - public static UserGroupInformation getUgi() throws FalconException { - UserGroupInformation ugi; + public static String getUser() throws FalconException { try { - ugi = CurrentUser.getAuthenticatedUGI(); - } catch (IOException ioe) { - throw new FalconException(ioe); + return CurrentUser.getAuthenticatedUGI().getShortUserName(); + } catch (Exception ioe) { + //Ignore is failed to get user, uses login user } - return ugi; + return null; } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java index 1621d95..1ed9619 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java @@ -20,6 +20,7 @@ package org.apache.atlas.falcon.bridge; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasConstants; +import org.apache.atlas.falcon.Util.EventUtil; import org.apache.atlas.falcon.model.FalconDataModelGenerator; import org.apache.atlas.falcon.model.FalconDataTypes; import org.apache.atlas.fs.model.FSDataTypes; @@ -28,7 +29,6 @@ import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.typesystem.Referenceable; import org.apache.commons.lang3.StringUtils; -import org.apache.atlas.falcon.Util.EventUtil; import org.apache.falcon.entity.CatalogStorage; import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.FileSystemStorage; @@ -49,7 +49,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -66,21 +65,21 @@ public class FalconBridge { * @param cluster ClusterEntity * @return cluster instance reference */ - public static Referenceable createClusterEntity(final org.apache.falcon.entity.v0.cluster.Cluster cluster, - final String user, - final Date timestamp) throws Exception { + public static Referenceable createClusterEntity(final org.apache.falcon.entity.v0.cluster.Cluster cluster) + throws Exception { LOG.info("Creating cluster Entity : {}", cluster.getName()); Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName()); - clusterRef.set(FalconDataModelGenerator.NAME, cluster.getName()); - clusterRef.set("description", cluster.getDescription()); + clusterRef.set(AtlasClient.NAME, cluster.getName()); + clusterRef.set(AtlasClient.DESCRIPTION, cluster.getDescription()); clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, cluster.getName()); - clusterRef.set(FalconDataModelGenerator.TIMESTAMP, timestamp); clusterRef.set(FalconDataModelGenerator.COLO, cluster.getColo()); - clusterRef.set(FalconDataModelGenerator.USER, user); + if (cluster.getACL() != null) { + clusterRef.set(AtlasClient.OWNER, cluster.getACL().getGroup()); + } if (StringUtils.isNotEmpty(cluster.getTags())) { clusterRef.set(FalconDataModelGenerator.TAGS, @@ -90,34 +89,34 @@ public class FalconBridge { return clusterRef; } - private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable, - String user, Date timestamp) throws Exception { + private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable) throws Exception { LOG.info("Creating feed dataset: {}", feed.getName()); - Referenceable datasetReferenceable = new Referenceable(FalconDataTypes.FALCON_FEED.getName()); - datasetReferenceable.set(FalconDataModelGenerator.NAME, feed.getName()); + Referenceable feedEntity = new Referenceable(FalconDataTypes.FALCON_FEED.getName()); + feedEntity.set(AtlasClient.NAME, feed.getName()); + feedEntity.set(AtlasClient.DESCRIPTION, feed.getDescription()); String feedQualifiedName = - getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(FalconDataModelGenerator.NAME)); - datasetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName); - datasetReferenceable.set(FalconDataModelGenerator.TIMESTAMP, timestamp); - - datasetReferenceable.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable); - datasetReferenceable.set(FalconDataModelGenerator.USER, user); + getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(AtlasClient.NAME)); + feedEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName); + feedEntity.set(FalconDataModelGenerator.FREQUENCY, feed.getFrequency().toString()); + feedEntity.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable); + if (feed.getACL() != null) { + feedEntity.set(AtlasClient.OWNER, feed.getACL().getOwner()); + } if (StringUtils.isNotEmpty(feed.getTags())) { - datasetReferenceable.set(FalconDataModelGenerator.TAGS, + feedEntity.set(FalconDataModelGenerator.TAGS, EventUtil.convertKeyValueStringToMap(feed.getTags())); } if (feed.getGroups() != null) { - datasetReferenceable.set(FalconDataModelGenerator.GROUPS, feed.getGroups()); + feedEntity.set(FalconDataModelGenerator.GROUPS, feed.getGroups()); } - return datasetReferenceable; + return feedEntity; } - public static List createFeedCreationEntity(Feed feed, ConfigurationStore falconStore, String user, - Date timestamp) throws Exception { + public static List createFeedCreationEntity(Feed feed, ConfigurationStore falconStore) throws Exception { LOG.info("Creating feed : {}", feed.getName()); List entities = new ArrayList<>(); @@ -143,7 +142,7 @@ public class FalconBridge { } List outputs = new ArrayList<>(); - Referenceable feedEntity = createFeedEntity(feed, clusterReferenceable, user, timestamp); + Referenceable feedEntity = createFeedEntity(feed, clusterReferenceable); if (feedEntity != null) { entities.add(feedEntity); outputs.add(feedEntity); @@ -153,19 +152,18 @@ public class FalconBridge { Referenceable feedCreateEntity = new Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName()); String feedQualifiedName = getFeedQualifiedName(feed.getName(), cluster.getName()); - feedCreateEntity.set(FalconDataModelGenerator.NAME, feed.getName()); + feedCreateEntity.set(AtlasClient.NAME, feed.getName()); + feedCreateEntity.set(AtlasClient.DESCRIPTION, "Feed creation - " + feed.getName()); feedCreateEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName); - feedCreateEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp); if (!inputs.isEmpty()) { - feedCreateEntity.set(FalconDataModelGenerator.INPUTS, inputs); + feedCreateEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputs); } if (!outputs.isEmpty()) { - feedCreateEntity.set(FalconDataModelGenerator.OUTPUTS, outputs); + feedCreateEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputs); } feedCreateEntity.set(FalconDataModelGenerator.STOREDIN, clusterReferenceable); - feedCreateEntity.set(FalconDataModelGenerator.USER, user); entities.add(feedCreateEntity); } @@ -180,13 +178,11 @@ public class FalconBridge { Referenceable feedReplicationEntity = new Referenceable(FalconDataTypes .FALCON_FEED_REPLICATION.getName()); - feedReplicationEntity.set(FalconDataModelGenerator.NAME, feed.getName()); + feedReplicationEntity.set(AtlasClient.NAME, feed.getName()); feedReplicationEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feed.getName()); - feedReplicationEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp); - feedReplicationEntity.set(FalconDataModelGenerator.INPUTS, replicationInputs); - feedReplicationEntity.set(FalconDataModelGenerator.OUTPUTS, replicationOutputs); - feedReplicationEntity.set(FalconDataModelGenerator.USER, user); + feedReplicationEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, replicationInputs); + feedReplicationEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, replicationOutputs); entities.add(feedReplicationEntity); } @@ -205,8 +201,7 @@ public class FalconBridge { * + */ public static List createProcessEntity(org.apache.falcon.entity.v0.process.Process process, - ConfigurationStore falconStore, String user, - Date timestamp) throws Exception { + ConfigurationStore falconStore) throws Exception { LOG.info("Creating process Entity : {}", process.getName()); // The requirement is for each cluster, create a process entity with name @@ -224,8 +219,8 @@ public class FalconBridge { List inputs = new ArrayList<>(); if (process.getInputs() != null) { for (Input input : process.getInputs().getInputs()) { - Referenceable inputReferenceable = getFeedDataSetReference(getFeedQualifiedName(input.getFeed(), - cluster.getName()), clusterReferenceable); + Feed feed = falconStore.get(EntityType.FEED, input.getFeed()); + Referenceable inputReferenceable = getFeedDataSetReference(feed, clusterReferenceable); entities.add(inputReferenceable); inputs.add(inputReferenceable); } @@ -234,8 +229,8 @@ public class FalconBridge { List outputs = new ArrayList<>(); if (process.getOutputs() != null) { for (Output output : process.getOutputs().getOutputs()) { - Referenceable outputReferenceable = getFeedDataSetReference(getFeedQualifiedName(output.getFeed(), - cluster.getName()), clusterReferenceable); + Feed feed = falconStore.get(EntityType.FEED, output.getFeed()); + Referenceable outputReferenceable = getFeedDataSetReference(feed, clusterReferenceable); entities.add(outputReferenceable); outputs.add(outputReferenceable); } @@ -244,23 +239,25 @@ public class FalconBridge { if (!inputs.isEmpty() || !outputs.isEmpty()) { Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS.getName()); - processEntity.set(FalconDataModelGenerator.NAME, process.getName()); + processEntity.set(AtlasClient.NAME, process.getName()); processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(process.getName(), cluster.getName())); - processEntity.set(FalconDataModelGenerator.TIMESTAMP, timestamp); + processEntity.set(FalconDataModelGenerator.FREQUENCY, process.getFrequency().toString()); if (!inputs.isEmpty()) { - processEntity.set(FalconDataModelGenerator.INPUTS, inputs); + processEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputs); } if (!outputs.isEmpty()) { - processEntity.set(FalconDataModelGenerator.OUTPUTS, outputs); + processEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputs); } // set cluster processEntity.set(FalconDataModelGenerator.RUNSON, clusterReferenceable); // Set user - processEntity.set(FalconDataModelGenerator.USER, user); + if (process.getACL() != null) { + processEntity.set(AtlasClient.OWNER, process.getACL().getOwner()); + } if (StringUtils.isNotEmpty(process.getTags())) { processEntity.set(FalconDataModelGenerator.TAGS, @@ -317,7 +314,7 @@ public class FalconBridge { // Path path = new Path(pathUri); // ref.set("name", path.getName()); //TODO - Fix after ATLAS-542 to shorter Name - ref.set(FalconDataModelGenerator.NAME, pathUri); + ref.set(AtlasClient.NAME, pathUri); ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri); ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); entities.add(ref); @@ -328,7 +325,7 @@ public class FalconBridge { throws Exception { Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); - dbRef.set(HiveDataModelGenerator.NAME, dbName); + dbRef.set(AtlasClient.NAME, dbName); dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName)); return dbRef; @@ -343,7 +340,7 @@ public class FalconBridge { Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName)); - tableRef.set(HiveDataModelGenerator.NAME, tableName.toLowerCase()); + tableRef.set(AtlasClient.NAME, tableName.toLowerCase()); tableRef.set(HiveDataModelGenerator.DB, dbRef); entities.add(tableRef); @@ -354,20 +351,21 @@ public class FalconBridge { final String colo) { LOG.info("Getting reference for entity {}", clusterName); Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName()); - clusterRef.set(FalconDataModelGenerator.NAME, String.format("%s", clusterName)); + clusterRef.set(AtlasClient.NAME, String.format("%s", clusterName)); clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, clusterName); clusterRef.set(FalconDataModelGenerator.COLO, colo); return clusterRef; } - private static Referenceable getFeedDataSetReference(final String feedDatasetName, - Referenceable clusterReference) { - LOG.info("Getting reference for entity {}", feedDatasetName); + private static Referenceable getFeedDataSetReference(Feed feed, Referenceable clusterReference) { + LOG.info("Getting reference for entity {}", feed.getName()); Referenceable feedDatasetRef = new Referenceable(FalconDataTypes.FALCON_FEED.getName()); - feedDatasetRef.set(FalconDataModelGenerator.NAME, feedDatasetName); - feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedDatasetName); + feedDatasetRef.set(AtlasClient.NAME, feed.getName()); + feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getFeedQualifiedName(feed.getName(), + (String) clusterReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME))); feedDatasetRef.set(FalconDataModelGenerator.STOREDIN, clusterReference); + feedDatasetRef.set(FalconDataModelGenerator.FREQUENCY, feed.getFrequency()); return feedDatasetRef; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java index 37df6da..e6203ed 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java @@ -19,7 +19,6 @@ package org.apache.atlas.falcon.event; import org.apache.falcon.entity.v0.Entity; -import org.apache.hadoop.security.UserGroupInformation; import java.util.Date; @@ -28,14 +27,12 @@ import java.util.Date; */ public class FalconEvent { protected String user; - protected UserGroupInformation ugi; protected OPERATION operation; protected long timestamp; protected Entity entity; - public FalconEvent(String doAsUser, UserGroupInformation ugi, OPERATION falconOperation, long timestamp, Entity entity) { + public FalconEvent(String doAsUser, OPERATION falconOperation, long timestamp, Entity entity) { this.user = doAsUser; - this.ugi = ugi; this.operation = falconOperation; this.timestamp = timestamp; this.entity = entity; @@ -54,10 +51,6 @@ public class FalconEvent { return user; } - public UserGroupInformation getUgi() { - return ugi; - } - public OPERATION getOperation() { return operation; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java index 95f255e..0acd964 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -22,13 +22,13 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Guice; import com.google.inject.Injector; import org.apache.atlas.falcon.bridge.FalconBridge; +import org.apache.atlas.falcon.event.FalconEvent; +import org.apache.atlas.falcon.publisher.FalconEventPublisher; import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationModule; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.falcon.event.FalconEvent; -import org.apache.atlas.falcon.publisher.FalconEventPublisher; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; @@ -151,7 +151,7 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { Operation op = getOperation(event.getOperation()); String user = getUser(event.getUser()); - LOG.info("fireAndForget user:{}, ugi: {}", user, event.getUgi()); + LOG.info("fireAndForget user:{}", user); switch (op) { case ADD: messages.add(new HookNotification.EntityCreateRequest(user, createEntities(event, user))); @@ -167,18 +167,15 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { switch (event.getOperation()) { case ADD_CLUSTER: entities.add(FalconBridge - .createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity(), user, - event.getTimestamp())); + .createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity())); break; case ADD_PROCESS: - entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE, - user, event.getTimestamp())); + entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE)); break; case ADD_FEED: - entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), STORE, - user, event.getTimestamp())); + entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), STORE)); break; case UPDATE_CLUSTER: http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java index 81cd5e0..3250a23 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataModelGenerator.java @@ -50,13 +50,11 @@ import java.util.Map; public class FalconDataModelGenerator { private static final Logger LOG = LoggerFactory.getLogger(FalconDataModelGenerator.class); + public static final String FREQUENCY = "frequency"; private final Map> classTypeDefinitions; - public static final String NAME = "name"; - public static final String TIMESTAMP = "timestamp"; public static final String COLO = "colo"; - public static final String USER = "owner"; public static final String TAGS = "tags"; public static final String GROUPS = "groups"; public static final String PIPELINES = "pipelines"; @@ -64,10 +62,6 @@ public class FalconDataModelGenerator { public static final String RUNSON = "runs-on"; public static final String STOREDIN = "stored-in"; - // multiple inputs and outputs for process - public static final String INPUTS = "inputs"; - public static final String OUTPUTS = "outputs"; - public FalconDataModelGenerator() { classTypeDefinitions = new HashMap<>(); } @@ -78,8 +72,8 @@ public class FalconDataModelGenerator { // classes createClusterEntityClass(); createProcessEntityClass(); + createFeedCreationEntityClass(); createFeedEntityClass(); - createFeedDatasetClass(); createReplicationFeedEntityClass(); } @@ -102,12 +96,8 @@ public class FalconDataModelGenerator { private void createClusterEntityClass() throws AtlasException { AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false, - null), new AttributeDefinition(COLO, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), - new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, - null), // map of tags new AttributeDefinition(TAGS, DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), @@ -120,14 +110,11 @@ public class FalconDataModelGenerator { LOG.debug("Created definition for {}", FalconDataTypes.FALCON_CLUSTER.getName()); } - private void createFeedEntityClass() throws AtlasException { + private void createFeedCreationEntityClass() throws AtlasException { AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, - null), new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED, - false, null), - new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, - null)}; + false, null) + }; HierarchicalTypeDefinition definition = new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED_CREATION.getName(), null, @@ -136,19 +123,17 @@ public class FalconDataModelGenerator { LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_CREATION.getName()); } - private void createFeedDatasetClass() throws AtlasException { + private void createFeedEntityClass() throws AtlasException { AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false, - null), + TypesUtil.createRequiredAttrDef(FREQUENCY, DataTypes.STRING_TYPE), new AttributeDefinition(STOREDIN, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED, false, null), - new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, - null), new AttributeDefinition(GROUPS, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), // map of tags new AttributeDefinition(TAGS, DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), - Multiplicity.OPTIONAL, false, null),}; + Multiplicity.OPTIONAL, false, null) + }; HierarchicalTypeDefinition definition = new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED.getName(), null, @@ -159,28 +144,19 @@ public class FalconDataModelGenerator { private void createReplicationFeedEntityClass() throws AtlasException { - AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, - null), - new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, - null)}; - HierarchicalTypeDefinition definition = new HierarchicalTypeDefinition<>(ClassType.class, FalconDataTypes.FALCON_FEED_REPLICATION.getName(), null, - ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), attributeDefinitions); + ImmutableSet.of(AtlasClient.PROCESS_SUPER_TYPE), null); classTypeDefinitions.put(FalconDataTypes.FALCON_FEED_REPLICATION.getName(), definition); LOG.debug("Created definition for {}", FalconDataTypes.FALCON_FEED_REPLICATION.getName()); } private void createProcessEntityClass() throws AtlasException { AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(TIMESTAMP, DataTypes.DATE_TYPE.getName(), Multiplicity.REQUIRED, false, - null), + TypesUtil.createRequiredAttrDef(FREQUENCY, DataTypes.STRING_TYPE), new AttributeDefinition(RUNSON, FalconDataTypes.FALCON_CLUSTER.getName(), Multiplicity.REQUIRED, false, null), - new AttributeDefinition(USER, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, - null), // map of tags new AttributeDefinition(TAGS, DataTypes.mapTypeName(DataTypes.STRING_TYPE.getName(), DataTypes.STRING_TYPE.getName()), http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java index c92bd43..889317e 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java @@ -28,7 +28,6 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.service.ConfigurationChangeListener; import org.apache.falcon.service.FalconService; -import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -127,15 +126,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener LOG.info("Adding {} entity to Atlas: {}", entity.getEntityType().name(), entity.getName()); try { - String user = entity.getACL() != null ? entity.getACL().getOwner() : - UserGroupInformation.getLoginUser().getShortUserName(); FalconEvent event = - new FalconEvent(user, EventUtil.getUgi(), operation, System.currentTimeMillis(), entity); + new FalconEvent(EventUtil.getUser(), operation, System.currentTimeMillis(), entity); FalconEventPublisher.Data data = new FalconEventPublisher.Data(event); publisher.publish(data); } catch (Exception ex) { throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex); } } - } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java index 8a5736a..0139bf5 100644 --- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java +++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java @@ -181,6 +181,7 @@ public class FalconHookIT { feedCluster.setName(clusterName); STORE.publish(EntityType.FEED, feed); String feedId = assertFeedIsRegistered(feed, clusterName); + assertFeedAttributes(feedId); String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, @@ -223,15 +224,24 @@ public class FalconHookIT { STORE.publish(EntityType.FEED, feed); String feedId = assertFeedIsRegistered(feed, clusterName); + assertFeedAttributes(feedId); verifyFeedLineage(feed.getName(), clusterName, feedId, dbName, tableName); if (secondClusterName != null) { String feedId2 = assertFeedIsRegistered(feed, secondClusterName); + assertFeedAttributes(feedId2); verifyFeedLineage(feed.getName(), secondClusterName, feedId2, dbName2, tableName2); } return feed; } + private void assertFeedAttributes(String feedId) throws Exception { + Referenceable feedEntity = atlasClient.getEntity(feedId); + assertEquals(feedEntity.get(AtlasClient.OWNER), "testuser"); + assertEquals(feedEntity.get(FalconDataModelGenerator.FREQUENCY), "hours(1)"); + assertEquals(feedEntity.get(AtlasClient.DESCRIPTION), "test input"); + } + private void verifyFeedLineage(String feedName, String clusterName, String feedId, String dbName, String tableName) throws Exception{ //verify that lineage from hive table to falcon feed is created http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala ---------------------------------------------------------------------- diff --git a/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala b/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala index cf81406..4941a5f 100644 --- a/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala +++ b/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala @@ -37,7 +37,7 @@ object FSDataModel extends App { val typesDef : TypesDef = types { // FS DataSet - _class(FSDataTypes.FS_PATH.toString) { + _class(FSDataTypes.FS_PATH.toString, List(AtlasClient.DATA_SET_SUPER_TYPE)) { //fully qualified path/URI to the filesystem path is stored in 'qualifiedName' and 'path'. "path" ~ (string, required, indexed) "createTime" ~ (date, optional, indexed) @@ -48,7 +48,6 @@ object FSDataModel extends App { "isSymlink" ~ (boolean, optional, indexed) //Optional and may not be set for a directory "fileSize" ~ (long, optional, indexed) - "owner" ~ (string, optional, indexed) "group" ~ (string, optional, indexed) "posixPermissions" ~ (FSDataTypes.FS_PERMISSIONS.toString, optional, indexed) } @@ -63,7 +62,7 @@ object FSDataModel extends App { } //HDFS DataSet - _class(FSDataTypes.HDFS_PATH.toString, List("DataSet", FSDataTypes.FS_PATH.toString)) { + _class(FSDataTypes.HDFS_PATH.toString, List(FSDataTypes.FS_PATH.toString)) { //Making cluster optional since path is already unique containing the namenode URI AtlasConstants.CLUSTER_NAME_ATTRIBUTE ~ (string, optional, indexed) "numberOfReplicas" ~ (int, optional, indexed) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index c956a32..4d009e8 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -27,7 +27,6 @@ import org.apache.atlas.fs.model.FSDataModel; import org.apache.atlas.fs.model.FSDataTypes; import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; -import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.json.InstanceSerialization; @@ -54,6 +53,7 @@ import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -163,12 +163,12 @@ public class HiveMetaStoreBridge { } String dbName = hiveDB.getName().toLowerCase(); dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getDBQualifiedName(clusterName, dbName)); - dbRef.set(HiveDataModelGenerator.NAME, dbName); + dbRef.set(AtlasClient.NAME, dbName); dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); dbRef.set(DESCRIPTION_ATTR, hiveDB.getDescription()); dbRef.set(HiveDataModelGenerator.LOCATION, hiveDB.getLocationUri()); dbRef.set(HiveDataModelGenerator.PARAMETERS, hiveDB.getParameters()); - dbRef.set(HiveDataModelGenerator.OWNER, hiveDB.getOwnerName()); + dbRef.set(AtlasClient.OWNER, hiveDB.getOwnerName()); if (hiveDB.getOwnerType() != null) { dbRef.set("ownerType", hiveDB.getOwnerType().getValue()); } @@ -209,7 +209,7 @@ public class HiveMetaStoreBridge { } static String getDatabaseDSLQuery(String clusterName, String databaseName, String typeName) { - return String.format("%s where %s = '%s' and %s = '%s'", typeName, HiveDataModelGenerator.NAME, + return String.format("%s where %s = '%s' and %s = '%s'", typeName, AtlasClient.NAME, databaseName.toLowerCase(), AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); } @@ -398,8 +398,8 @@ public class HiveMetaStoreBridge { String tableQualifiedName = getTableQualifiedName(clusterName, hiveTable); tableReference.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName); - tableReference.set(HiveDataModelGenerator.NAME, hiveTable.getTableName().toLowerCase()); - tableReference.set(HiveDataModelGenerator.OWNER, hiveTable.getOwner()); + tableReference.set(AtlasClient.NAME, hiveTable.getTableName().toLowerCase()); + tableReference.set(AtlasClient.OWNER, hiveTable.getOwner()); Date createDate = new Date(); if (hiveTable.getTTable() != null){ @@ -442,10 +442,10 @@ public class HiveMetaStoreBridge { tableReference.set("temporary", hiveTable.isTemporary()); // add reference to the Partition Keys - List partKeys = getColumns(hiveTable.getPartitionKeys(), tableQualifiedName, tableReference.getId()); + List partKeys = getColumns(hiveTable.getPartitionKeys(), tableReference); tableReference.set("partitionKeys", partKeys); - tableReference.set(HiveDataModelGenerator.COLUMNS, getColumns(hiveTable.getCols(), tableQualifiedName, tableReference.getId())); + tableReference.set(HiveDataModelGenerator.COLUMNS, getColumns(hiveTable.getCols(), tableReference)); return tableReference; } @@ -507,7 +507,7 @@ public class HiveMetaStoreBridge { String serdeInfoName = HiveDataTypes.HIVE_SERDE.getName(); Struct serdeInfoStruct = new Struct(serdeInfoName); - serdeInfoStruct.set(HiveDataModelGenerator.NAME, serdeInfo.getName()); + serdeInfoStruct.set(AtlasClient.NAME, serdeInfo.getName()); serdeInfoStruct.set("serializationLib", serdeInfo.getSerializationLib()); serdeInfoStruct.set(HiveDataModelGenerator.PARAMETERS, serdeInfo.getParameters()); @@ -561,18 +561,19 @@ public class HiveMetaStoreBridge { return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName); } - public List getColumns(List schemaList, String tableQualifiedName, Id tableReference) throws Exception { + public List getColumns(List schemaList, Referenceable tableReference) throws Exception { List colList = new ArrayList<>(); for (FieldSchema fs : schemaList) { LOG.debug("Processing field " + fs); Referenceable colReferenceable = new Referenceable(HiveDataTypes.HIVE_COLUMN.getName()); colReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - getColumnQualifiedName(tableQualifiedName, fs.getName())); - colReferenceable.set(HiveDataModelGenerator.NAME, fs.getName()); + getColumnQualifiedName((String) tableReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), fs.getName())); + colReferenceable.set(AtlasClient.NAME, fs.getName()); + colReferenceable.set(AtlasClient.OWNER, tableReference.get(AtlasClient.OWNER)); colReferenceable.set("type", fs.getType()); colReferenceable.set(HiveDataModelGenerator.COMMENT, fs.getComment()); - colReferenceable.set(HiveDataModelGenerator.TABLE, tableReference); + colReferenceable.set(HiveDataModelGenerator.TABLE, tableReference.getId()); colList.add(colReferenceable); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 5d9950f..664ef62 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -401,12 +401,12 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } private Referenceable replaceTableQFName(HiveEventContext event, Table oldTable, Table newTable, final Referenceable tableEntity, final String oldTableQFName, final String newTableQFName) throws HiveException { - tableEntity.set(HiveDataModelGenerator.NAME, oldTable.getTableName().toLowerCase()); + tableEntity.set(AtlasClient.NAME, oldTable.getTableName().toLowerCase()); tableEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, oldTableQFName); //Replace table entity with new name final Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); - newEntity.set(HiveDataModelGenerator.NAME, newTable.getTableName().toLowerCase()); + newEntity.set(AtlasClient.NAME, newTable.getTableName().toLowerCase()); newEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, newTableQFName); ArrayList alias_list = new ArrayList<>(); @@ -422,7 +422,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private List replaceColumnQFName(final HiveEventContext event, final List cols, final String oldTableQFName, final String newTableQFName) { List newColEntities = new ArrayList<>(); for (Referenceable col : cols) { - final String colName = (String) col.get(HiveDataModelGenerator.NAME); + final String colName = (String) col.get(AtlasClient.NAME); String oldColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(oldTableQFName, colName); String newColumnQFName = HiveMetaStoreBridge.getColumnQualifiedName(newTableQFName, colName); col.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, oldColumnQFName); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java index 3686fa8..a3d97eb 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/model/HiveDataModelGenerator.java @@ -70,15 +70,12 @@ public class HiveDataModelGenerator { public static final String STORAGE_NUM_BUCKETS = "numBuckets"; public static final String STORAGE_IS_STORED_AS_SUB_DIRS = "storedAsSubDirectories"; - public static final String NAME = "name"; - public static final String TABLE_NAME = "tableName"; public static final String TABLE = "table"; public static final String DB = "db"; public static final String STORAGE_DESC = "sd"; public static final String STORAGE_DESC_INPUT_FMT = "inputFormat"; public static final String STORAGE_DESC_OUTPUT_FMT = "outputFormat"; - public static final String OWNER = "owner"; public static final String LOCATION = "location"; public static final String TABLE_TYPE_ATTR = "tableType"; @@ -147,7 +144,7 @@ public class HiveDataModelGenerator { private void createSerDeStruct() throws AtlasException { AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), + new AttributeDefinition(AtlasClient.NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), new AttributeDefinition("serializationLib", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), new AttributeDefinition(HiveDataModelGenerator.PARAMETERS, STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null),}; @@ -206,29 +203,23 @@ public class HiveDataModelGenerator { private void createDBClass() throws AtlasException { AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), new AttributeDefinition(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), - new AttributeDefinition("description", DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, - null), new AttributeDefinition(LOCATION, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), new AttributeDefinition(HiveDataModelGenerator.PARAMETERS, STRING_MAP_TYPE.getName(), Multiplicity.OPTIONAL, false, null), - new AttributeDefinition(OWNER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, - null), new AttributeDefinition("ownerType", HiveDataTypes.HIVE_PRINCIPAL_TYPE.getName(), Multiplicity.OPTIONAL, false, null),}; HierarchicalTypeDefinition definition = new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_DB.getName(), null, - ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE), attributeDefinitions); + ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE), attributeDefinitions); classTypeDefinitions.put(HiveDataTypes.HIVE_DB.getName(), definition); LOG.debug("Created definition for " + HiveDataTypes.HIVE_DB.getName()); } private void createColumnClass() throws AtlasException { AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ - new AttributeDefinition(NAME, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), new AttributeDefinition("type", DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), new AttributeDefinition(COMMENT, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), //Making this optional since this is an incompatible change @@ -237,7 +228,7 @@ public class HiveDataModelGenerator { HierarchicalTypeDefinition definition = new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_COLUMN.getName(), null, - ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE), attributeDefinitions); + ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE), attributeDefinitions); classTypeDefinitions.put(HiveDataTypes.HIVE_COLUMN.getName(), definition); LOG.debug("Created definition for " + HiveDataTypes.HIVE_COLUMN.getName()); } @@ -245,7 +236,6 @@ public class HiveDataModelGenerator { private void createTableClass() throws AtlasException { AttributeDefinition[] attributeDefinitions = new AttributeDefinition[]{ new AttributeDefinition(DB, HiveDataTypes.HIVE_DB.getName(), Multiplicity.REQUIRED, false, null), - new AttributeDefinition(OWNER, DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), new AttributeDefinition(CREATE_TIME, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false, null), new AttributeDefinition(LAST_ACCESS_TIME, DataTypes.DATE_TYPE.getName(), Multiplicity.OPTIONAL, false, @@ -271,7 +261,7 @@ public class HiveDataModelGenerator { null),}; HierarchicalTypeDefinition definition = new HierarchicalTypeDefinition<>(ClassType.class, HiveDataTypes.HIVE_TABLE.getName(), null, - ImmutableSet.of("DataSet"), attributeDefinitions); + ImmutableSet.of(AtlasClient.DATA_SET_SUPER_TYPE), attributeDefinitions); classTypeDefinitions.put(HiveDataTypes.HIVE_TABLE.getName(), definition); LOG.debug("Created definition for " + HiveDataTypes.HIVE_TABLE.getName()); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java index 5a175e7..995562e 100755 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java @@ -63,9 +63,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.atlas.AtlasClient.NAME; import static org.apache.atlas.hive.hook.HiveHook.lower; import static org.apache.atlas.hive.hook.HiveHook.normalize; -import static org.apache.atlas.hive.model.HiveDataModelGenerator.NAME; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -192,13 +192,18 @@ public class HiveHookIT { Assert.assertNotNull(colEntity.get(HiveDataModelGenerator.TABLE)); Assert.assertEquals(((Id) colEntity.get(HiveDataModelGenerator.TABLE))._getId(), tableId); + //assert that column.owner = table.owner + Referenceable tableRef = atlasClient.getEntity(tableId); + assertEquals(tableRef.get(AtlasClient.OWNER), colEntity.get(AtlasClient.OWNER)); + + //create table where db is not registered tableName = createTable(); tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - Referenceable tableRef = atlasClient.getEntity(tableId); + tableRef = atlasClient.getEntity(tableId); Assert.assertEquals(tableRef.get(HiveDataModelGenerator.TABLE_TYPE_ATTR), TableType.MANAGED_TABLE.name()); Assert.assertEquals(tableRef.get(HiveDataModelGenerator.COMMENT), "table comment"); String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName); - Assert.assertEquals(tableRef.get(HiveDataModelGenerator.NAME), tableName.toLowerCase()); + Assert.assertEquals(tableRef.get(AtlasClient.NAME), tableName.toLowerCase()); Assert.assertEquals(tableRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), entityName); Table t = hiveMetaStoreBridge.hiveClient.getTable(DEFAULT_DB, tableName); @@ -1351,7 +1356,7 @@ public class HiveHookIT { assertDatabaseIsRegistered(dbName, new AssertPredicate() { @Override public void assertOnEntity(Referenceable entity) { - assertEquals(entity.get(HiveDataModelGenerator.OWNER), owner); + assertEquals(entity.get(AtlasClient.OWNER), owner); } }); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java index 9db8180..c412658 100644 --- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java @@ -63,7 +63,7 @@ public class SqoopHook extends SqoopJobDataPublisher { throws Exception { Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); - dbRef.set(HiveDataModelGenerator.NAME, dbName); + dbRef.set(AtlasClient.NAME, dbName); dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName)); return dbRef; @@ -74,7 +74,7 @@ public class SqoopHook extends SqoopJobDataPublisher { Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName)); - tableRef.set(HiveDataModelGenerator.NAME, tableName.toLowerCase()); + tableRef.set(AtlasClient.NAME, tableName.toLowerCase()); tableRef.set(HiveDataModelGenerator.DB, dbRef); return tableRef; } @@ -92,14 +92,14 @@ public class SqoopHook extends SqoopJobDataPublisher { String usage = table != null ? "TABLE" : "QUERY"; String source = table != null ? table : query; String name = getSqoopDBStoreName(data); - storeRef.set(SqoopDataModelGenerator.NAME, name); + storeRef.set(AtlasClient.NAME, name); storeRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); storeRef.set(SqoopDataModelGenerator.DB_STORE_TYPE, data.getStoreType()); storeRef.set(SqoopDataModelGenerator.DB_STORE_USAGE, usage); storeRef.set(SqoopDataModelGenerator.STORE_URI, data.getUrl()); storeRef.set(SqoopDataModelGenerator.SOURCE, source); storeRef.set(SqoopDataModelGenerator.DESCRIPTION, ""); - storeRef.set(SqoopDataModelGenerator.OWNER, data.getUser()); + storeRef.set(AtlasClient.OWNER, data.getUser()); return storeRef; } @@ -107,7 +107,7 @@ public class SqoopHook extends SqoopJobDataPublisher { SqoopJobDataPublisher.Data data, String clusterName) { Referenceable procRef = new Referenceable(SqoopDataTypes.SQOOP_PROCESS.getName()); final String sqoopProcessName = getSqoopProcessName(data, clusterName); - procRef.set(SqoopDataModelGenerator.NAME, sqoopProcessName); + procRef.set(AtlasClient.NAME, sqoopProcessName); procRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sqoopProcessName); procRef.set(SqoopDataModelGenerator.OPERATION, data.getOperation()); if (isImportOperation(data)) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataModelGenerator.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataModelGenerator.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataModelGenerator.java index c3bdbfa..0ea4d7f 100644 --- a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataModelGenerator.java +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/model/SqoopDataModelGenerator.java @@ -57,8 +57,6 @@ public class SqoopDataModelGenerator { private static final DataTypes.MapType STRING_MAP_TYPE = new DataTypes.MapType(DataTypes.STRING_TYPE, DataTypes.STRING_TYPE); - public static final String NAME = "name"; - public static final String OWNER = "ownerName"; public static final String USER = "userName"; public static final String DB_STORE_TYPE = "dbStoreType"; public static final String DB_STORE_USAGE = "storeUse"; @@ -127,9 +125,8 @@ public class SqoopDataModelGenerator { new AttributeDefinition(STORE_URI, DataTypes.STRING_TYPE.getName(), Multiplicity.REQUIRED, false, null), new AttributeDefinition(SOURCE, - DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null), - new AttributeDefinition(OWNER, - DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null),}; + DataTypes.STRING_TYPE.getName(), Multiplicity.OPTIONAL, false, null) + }; HierarchicalTypeDefinition definition = new HierarchicalTypeDefinition<>(ClassType.class, SqoopDataTypes.SQOOP_DBDATASTORE.getName(), null, http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index b2171c2..5bd5397 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -116,7 +116,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { if (StringUtils.isEmpty(owner)) { owner = ANONYMOUS_OWNER; } - topologyReferenceable.set("owner", owner); + topologyReferenceable.set(AtlasClient.OWNER, owner); topologyReferenceable.set("startTime", System.currentTimeMillis()); topologyReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); @@ -194,7 +194,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { if (StringUtils.isEmpty(topologyOwner)) { topologyOwner = ANONYMOUS_OWNER; } - dataSetReferenceable.set("owner", topologyOwner); + dataSetReferenceable.set(AtlasClient.OWNER, topologyOwner); dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getKafkaTopicQualifiedName(getClusterName(stormConf), topicName)); dataSetReferenceable.set(AtlasClient.NAME, topicName); break; @@ -204,7 +204,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { final String hbaseTableName = config.get("HBaseBolt.tableName"); dataSetReferenceable.set("uri", stormConf.get("hbase.rootdir")); dataSetReferenceable.set(AtlasClient.NAME, hbaseTableName); - dataSetReferenceable.set("owner", stormConf.get("storm.kerberos.principal")); + dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("storm.kerberos.principal")); clusterName = extractComponentClusterName(HBaseConfiguration.create(), stormConf); //TODO - Hbase Namespace is hardcoded to 'default'. need to check how to get this or is it already part of tableName dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getHbaseTableQualifiedName(clusterName, HBASE_NAMESPACE_DEFAULT, @@ -220,7 +220,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { dataSetReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, hdfsPathStr); dataSetReferenceable.set("path", hdfsPathStr); - dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal")); + dataSetReferenceable.set(AtlasClient.OWNER, stormConf.get("hdfs.kerberos.principal")); final Path hdfsPath = new Path(hdfsPathStr); dataSetReferenceable.set(AtlasClient.NAME, hdfsPath.getName()); break; @@ -229,7 +229,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { // todo: verify if hive table has everything needed to retrieve existing table Referenceable dbReferenceable = new Referenceable("hive_db"); String databaseName = config.get("HiveBolt.options.databaseName"); - dbReferenceable.set(HiveDataModelGenerator.NAME, databaseName); + dbReferenceable.set(AtlasClient.NAME, databaseName); dbReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(getClusterName(stormConf), databaseName)); dbReferenceable.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, getClusterName(stormConf)); @@ -239,7 +239,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { dataSetReferenceable = new Referenceable("hive_table"); final String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(clusterName, databaseName, hiveTableName); - dataSetReferenceable.set(HiveDataModelGenerator.NAME, hiveTableName); + dataSetReferenceable.set(AtlasClient.NAME, hiveTableName); dataSetReferenceable.set(HiveDataModelGenerator.DB, dbReferenceable); dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName); break; @@ -291,8 +291,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { private Referenceable createSpoutInstance(String spoutName, SpoutSpec stormSpout) throws IllegalAccessException { - Referenceable spoutReferenceable = new Referenceable( - StormDataTypes.STORM_SPOUT.getName(), "DataProducer"); + Referenceable spoutReferenceable = new Referenceable(StormDataTypes.STORM_SPOUT.getName()); spoutReferenceable.set(AtlasClient.NAME, spoutName); Serializable instance = Utils.javaDeserialize( @@ -315,8 +314,7 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { private Referenceable createBoltInstance(String boltName, Bolt stormBolt) throws IllegalAccessException { - Referenceable boltReferenceable = new Referenceable( - StormDataTypes.STORM_BOLT.getName(), "DataProcessor"); + Referenceable boltReferenceable = new Referenceable(StormDataTypes.STORM_BOLT.getName()); boltReferenceable.set(AtlasClient.NAME, boltName); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala b/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala index bae4c1a..005f441 100644 --- a/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala +++ b/addons/storm-bridge/src/main/scala/org/apache/atlas/storm/model/StormDataModel.scala @@ -45,8 +45,6 @@ object StormDataModel extends App { */ _class(StormDataTypes.STORM_TOPOLOGY.getName, List(AtlasClient.PROCESS_SUPER_TYPE)) { "id" ~ (string, required, indexed, unique) - "description" ~ (string, optional, indexed) - "owner" ~ (string, required, indexed) "startTime" ~ long "endTime" ~ long "conf" ~ (map(string, string), optional) @@ -81,31 +79,20 @@ object StormDataModel extends App { } // Kafka Data Set - _class(StormDataTypes.KAFKA_TOPIC.getName, List("DataSet")) { + _class(StormDataTypes.KAFKA_TOPIC.getName, List(AtlasClient.DATA_SET_SUPER_TYPE)) { "topic" ~ (string, required, unique, indexed) "uri" ~ (string, required) - "owner" ~ (string, required, indexed) } // JMS Data Set - _class(StormDataTypes.JMS_TOPIC.getName, List("DataSet")) { + _class(StormDataTypes.JMS_TOPIC.getName, List(AtlasClient.DATA_SET_SUPER_TYPE)) { "topic" ~ (string, required, unique, indexed) "uri" ~ (string, required) - "owner" ~ (string, required, indexed) } // HBase Data Set - _class(StormDataTypes.HBASE_TABLE.getName, List("DataSet")) { + _class(StormDataTypes.HBASE_TABLE.getName, List(AtlasClient.DATA_SET_SUPER_TYPE)) { "uri" ~ (string, required) - "owner" ~ (string, required, indexed) - } - - _trait("DataProcessor") { - - } - - _trait("DataProducer") { - } // Hive table data set already exists in atlas. } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index 4c88da9..d3af6ad 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -70,7 +70,6 @@ import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED; public class AtlasClient { private static final Logger LOG = LoggerFactory.getLogger(AtlasClient.class); - public static final String NAME = "name"; public static final String TYPE = "type"; public static final String TYPENAME = "typeName"; public static final String GUID = "GUID"; @@ -106,16 +105,20 @@ public class AtlasClient { public static final String ATTRIBUTE_NAME = "property"; public static final String ATTRIBUTE_VALUE = "value"; + public static final String ASSET_TYPE = "Asset"; + public static final String NAME = "name"; + public static final String DESCRIPTION = "description"; + public static final String OWNER = "owner"; public static final String INFRASTRUCTURE_SUPER_TYPE = "Infrastructure"; public static final String DATA_SET_SUPER_TYPE = "DataSet"; public static final String PROCESS_SUPER_TYPE = "Process"; - public static final String REFERENCEABLE_SUPER_TYPE = "Referenceable"; - public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName"; - public static final String PROCESS_ATTRIBUTE_INPUTS = "inputs"; public static final String PROCESS_ATTRIBUTE_OUTPUTS = "outputs"; + public static final String REFERENCEABLE_SUPER_TYPE = "Referenceable"; + public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName"; + public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8"; public static final String UNKNOWN_STATUS = "Unknown status"; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/distro/src/conf/atlas-log4j.xml ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml index e14afa3..30e88f8 100755 --- a/distro/src/conf/atlas-log4j.xml +++ b/distro/src/conf/atlas-log4j.xml @@ -49,7 +49,7 @@ - + http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 61c74e4..457b4f0 100644 --- a/release-log.txt +++ b/release-log.txt @@ -3,6 +3,7 @@ Apache Atlas Release Notes --trunk - unreleased INCOMPATIBLE CHANGES: +ATLAS-819 All user defined types should have a set of common attributes (shwethags) ATLAS-915 Fix docs for import-hive changes (svimal2106 via sumasai) ATLAS-688 import-hive should depend on Hive CLASSPATH jars instead of packaging everything (svimal2106 via sumasai) ATLAS-835 Falcon Integration with Atlas (sowmyaramesh via shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java index dd7dd74..a4bdef7 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedMetadataRepository.java @@ -126,7 +126,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @GraphTransaction public List createEntities(ITypedReferenceableInstance... entities) throws RepositoryException, EntityExistsException { - LOG.info("adding entities={}", entities); + LOG.debug("adding entities={}", entities); try { TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler); instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.CREATE, entities); @@ -141,7 +141,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction public ITypedReferenceableInstance getEntityDefinition(String guid) throws RepositoryException, EntityNotFoundException { - LOG.info("Retrieving entity with guid={}", guid); + LOG.debug("Retrieving entity with guid={}", guid); Vertex instanceVertex = graphHelper.getVertexForGUID(guid); @@ -156,7 +156,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @GraphTransaction public ITypedReferenceableInstance getEntityDefinition(String entityType, String attribute, Object value) throws AtlasException { - LOG.info("Retrieving entity with type={} and {}={}", entityType, attribute, value); + LOG.debug("Retrieving entity with type={} and {}={}", entityType, attribute, value); IDataType type = typeSystem.getDataType(IDataType.class, entityType); String propertyKey = getFieldNameInVertex(type, attribute); Vertex instanceVertex = graphHelper.findVertex(propertyKey, value, @@ -170,7 +170,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction public List getEntityList(String entityType) throws RepositoryException { - LOG.info("Retrieving entity list for type={}", entityType); + LOG.debug("Retrieving entity list for type={}", entityType); GraphQuery query = titanGraph.query().has(Constants.ENTITY_TYPE_PROPERTY_KEY, entityType); Iterator results = query.vertices().iterator(); if (!results.hasNext()) { @@ -196,7 +196,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction public List getTraitNames(String guid) throws AtlasException { - LOG.info("Retrieving trait names for entity={}", guid); + LOG.debug("Retrieving trait names for entity={}", guid); Vertex instanceVertex = graphHelper.getVertexForGUID(guid); return GraphHelper.getTraitNames(instanceVertex); } @@ -214,7 +214,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { public void addTrait(String guid, ITypedStruct traitInstance) throws RepositoryException { Preconditions.checkNotNull(traitInstance, "Trait instance cannot be null"); final String traitName = traitInstance.getTypeName(); - LOG.info("Adding a new trait={} for entity={}", traitName, guid); + LOG.debug("Adding a new trait={} for entity={}", traitName, guid); try { Vertex instanceVertex = graphHelper.getVertexForGUID(guid); @@ -249,7 +249,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction public void deleteTrait(String guid, String traitNameToBeDeleted) throws TraitNotFoundException, EntityNotFoundException, RepositoryException { - LOG.info("Deleting trait={} from entity={}", traitNameToBeDeleted, guid); + LOG.debug("Deleting trait={} from entity={}", traitNameToBeDeleted, guid); Vertex instanceVertex = graphHelper.getVertexForGUID(guid); @@ -289,7 +289,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction public AtlasClient.EntityResult updateEntities(ITypedReferenceableInstance... entitiesUpdated) throws RepositoryException { - LOG.info("updating entity {}", entitiesUpdated); + LOG.debug("updating entity {}", entitiesUpdated); try { TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler); instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_FULL, @@ -305,7 +305,7 @@ public class GraphBackedMetadataRepository implements MetadataRepository { @Override @GraphTransaction public AtlasClient.EntityResult updatePartial(ITypedReferenceableInstance entity) throws RepositoryException { - LOG.info("updating entity {}", entity); + LOG.debug("updating entity {}", entity); try { TypedInstanceToGraphMapper instanceToGraphMapper = new TypedInstanceToGraphMapper(graphToInstanceMapper, deleteHandler); instanceToGraphMapper.mapTypedInstanceToGraph(TypedInstanceToGraphMapper.Operation.UPDATE_PARTIAL, entity); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index c385df3..1a35d50 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -169,7 +169,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang LOG.info("Creating indexes for type name={}, definition={}", dataType.getName(), dataType.getClass()); try { addIndexForType(management, dataType); - LOG.info("Index creation for type {} complete", dataType.getName()); + LOG.debug("Index creation for type {} complete", dataType.getName()); } catch (Throwable throwable) { LOG.error("Error creating index for type {}", dataType, throwable); //Rollback indexes if any failure http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java index d9e6cb9..37e7b66 100755 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java @@ -77,6 +77,9 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_INPUTS; +import static org.apache.atlas.AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS; + /** * Simple wrapper over TypeSystem and MetadataRepository services with hooks * for listening to changes to the repository. @@ -164,11 +167,6 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang typeSystem.commitTypes(typesAdded); } - private static final AttributeDefinition NAME_ATTRIBUTE = - TypesUtil.createRequiredAttrDef(AtlasClient.NAME, DataTypes.STRING_TYPE); - private static final AttributeDefinition DESCRIPTION_ATTRIBUTE = - TypesUtil.createOptionalAttrDef("description", DataTypes.STRING_TYPE); - @InterfaceAudience.Private private void createSuperTypes() throws AtlasException { HierarchicalTypeDefinition referenceableType = TypesUtil @@ -177,23 +175,29 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang DataTypes.STRING_TYPE)); createType(referenceableType); + HierarchicalTypeDefinition assetType = TypesUtil + .createClassTypeDef(AtlasClient.ASSET_TYPE, ImmutableSet.of(), + TypesUtil.createRequiredAttrDef(AtlasClient.NAME, DataTypes.STRING_TYPE), + TypesUtil.createOptionalAttrDef(AtlasClient.DESCRIPTION, DataTypes.STRING_TYPE), + TypesUtil.createOptionalAttrDef(AtlasClient.OWNER, DataTypes.STRING_TYPE)); + createType(assetType); + HierarchicalTypeDefinition infraType = TypesUtil - .createClassTypeDef(AtlasClient.INFRASTRUCTURE_SUPER_TYPE, ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE), NAME_ATTRIBUTE, - DESCRIPTION_ATTRIBUTE); + .createClassTypeDef(AtlasClient.INFRASTRUCTURE_SUPER_TYPE, + ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE)); createType(infraType); HierarchicalTypeDefinition datasetType = TypesUtil - .createClassTypeDef(AtlasClient.DATA_SET_SUPER_TYPE, ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE), NAME_ATTRIBUTE, - DESCRIPTION_ATTRIBUTE); + .createClassTypeDef(AtlasClient.DATA_SET_SUPER_TYPE, + ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE)); createType(datasetType); HierarchicalTypeDefinition processType = TypesUtil - .createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE, ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE), - TypesUtil.createRequiredAttrDef(AtlasClient.NAME, DataTypes.STRING_TYPE), - DESCRIPTION_ATTRIBUTE, - new AttributeDefinition("inputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE), + .createClassTypeDef(AtlasClient.PROCESS_SUPER_TYPE, + ImmutableSet.of(AtlasClient.REFERENCEABLE_SUPER_TYPE, AtlasClient.ASSET_TYPE), + new AttributeDefinition(PROCESS_ATTRIBUTE_INPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE), Multiplicity.OPTIONAL, false, null), - new AttributeDefinition("outputs", DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE), + new AttributeDefinition(PROCESS_ATTRIBUTE_OUTPUTS, DataTypes.arrayTypeName(AtlasClient.DATA_SET_SUPER_TYPE), Multiplicity.OPTIONAL, false, null)); createType(processType); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d838cf38/typesystem/src/main/java/org/apache/atlas/typesystem/types/StructTypeDefinition.java ---------------------------------------------------------------------- diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/StructTypeDefinition.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/StructTypeDefinition.java index 9dc93ae..f1ce1b7 100755 --- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/StructTypeDefinition.java +++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/StructTypeDefinition.java @@ -28,14 +28,11 @@ public class StructTypeDefinition { public final String typeDescription;//optional field public final AttributeDefinition[] attributeDefinitions; - protected StructTypeDefinition(String typeName, boolean validate, AttributeDefinition... attributeDefinitions) { - this(typeName, null, validate, attributeDefinitions); - } - - protected StructTypeDefinition(String typeName, String typeDescription, boolean validate, AttributeDefinition... attributeDefinitions) { + protected StructTypeDefinition(String typeName, String typeDescription, boolean validate, + AttributeDefinition... attributeDefinitions) { this.typeName = ParamChecker.notEmpty(typeName, "Struct type name"); this.typeDescription = typeDescription; - if (attributeDefinitions != null && attributeDefinitions.length != 0) { + if (validate) { ParamChecker.notNullElements(attributeDefinitions, "Attribute definitions"); } this.attributeDefinitions = attributeDefinitions;