ranger-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ab...@apache.org
Subject ranger git commit: RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated entity does not have associated traits
Date Thu, 17 May 2018 17:03:54 GMT
Repository: ranger
Updated Branches:
  refs/heads/master ab6cb3935 -> 081af4819


RANGER-2104: Ranger tagsync should ignore ENTITY_UPDATE events if the updated entity does not have associated traits


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/081af481
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/081af481
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/081af481

Branch: refs/heads/master
Commit: 081af481923207ee93a3d4a7cc29901b4c972a44
Parents: ab6cb39
Author: Abhay Kulkarni <akulkarni@hortonworks.com>
Authored: Thu May 17 09:45:06 2018 -0700
Committer: Abhay Kulkarni <akulkarni@hortonworks.com>
Committed: Thu May 17 09:45:06 2018 -0700

----------------------------------------------------------------------
 pom.xml                                         |  1 +
 src/main/assembly/tagsync.xml                   |  1 +
 .../source/atlas/AtlasNotificationMapper.java   | 12 ++++
 .../tagsync/source/atlas/AtlasTagSource.java    | 62 +++++++++++++++++---
 4 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 878f5f0..756eccb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@
         <atlas.gson.version>2.5</atlas.gson.version>
         <atlas.jackson.version>2.9.2</atlas.jackson.version>
         <atlas.jettison.version>1.3.7</atlas.jettison.version>
+        <atlas.commons.compress.version>1.4.1</atlas.commons.compress.version>
         <atlas.commons.logging.version>1.1.3</atlas.commons.logging.version>
         <bouncycastle.version>1.55</bouncycastle.version>
         <c3p0.version>0.9.1.2</c3p0.version>

http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 0788ac1..bc6e28b 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -45,6 +45,7 @@
 					<include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include>
 					<include>org.apache.atlas:atlas-client-common:jar:${atlas.version}</include>
 					<include>org.apache.atlas:atlas-common:jar:${atlas.version}</include>
+					<include>org.apache.commons:commons-compress:jar:${atlas.commons.compress.version}</include>
 					<include>org.apache.hadoop:hadoop-auth</include>
 					<include>org.apache.hadoop:hadoop-common</include>
 					<include>org.apache.kafka:kafka_${scala.binary.version}:jar:${kafka.version}</include>

http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index 85c7c20..a4cab28 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -118,8 +118,20 @@ public class AtlasNotificationMapper {
             switch (opType) {
                 case ENTITY_CREATE:
                     ret = ! entityNotification.getIsEmptyClassifications();
+                    if (!ret) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("ENTITY_CREATE notification is ignored, as there are
no traits associated with the entity. Ranger will get necessary information from any subsequent
TRAIT_ADDED notification");
+                        }
+                    }
                     break;
                 case ENTITY_UPDATE:
+                    ret = ! entityNotification.getIsEmptyClassifications();
+                    if (!ret) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("ENTITY_UPDATE notification is ignored, as there are
no traits associated with the entity.");
+                        }
+                    }
+                    break;
                 case ENTITY_DELETE:
                 case CLASSIFICATION_ADD:
                 case CLASSIFICATION_UPDATE:

http://git-wip-us.apache.org/repos/asf/ranger/blob/081af481/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index ea4c20c..21a22cd 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -104,7 +104,6 @@ public class AtlasTagSource extends AbstractTagSource {
 			List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES,
1);
 
 			consumerTask = new ConsumerRunnable(iterators.get(0));
-
 		}
 
 		if (LOG.isDebugEnabled()) {
@@ -163,11 +162,45 @@ public class AtlasTagSource extends AbstractTagSource {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug("==> ConsumerRunnable.run()");
 			}
+
+			boolean seenCommitException = false;
+			long offsetOfLastMessageDeliveredToRanger = -1L;
+
 			while (true) {
 				try {
 					List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L);
 
-					for (AtlasKafkaMessage<EntityNotification> message :  messages) {
+					int index = 0;
+
+					if (messages.size() > 0 && seenCommitException) {
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("seenCommitException=[true], offsetOfLastMessageDeliveredToRanger=[" + offsetOfLastMessageDeliveredToRanger
+ "]");
+						}
+						for (; index < messages.size(); index++) {
+							AtlasKafkaMessage<EntityNotification> message = messages.get(index);
+							if (message.getOffset() <= offsetOfLastMessageDeliveredToRanger) {
+								// Already delivered to Ranger
+								TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+								try {
+									if (LOG.isDebugEnabled()) {
+										LOG.debug("Committing previously commit-failed message with offset:[" + message.getOffset()
+ "]");
+									}
+									consumer.commit(partition, message.getOffset());
+								} catch (Exception commitException) {
+									LOG.warn("Ranger tagsync already processed message at offset " + message.getOffset()
+ ". Ignoring failure in committing this message and continuing to process next message",
commitException);
+									LOG.warn("This will cause Kafka to deliver this message:[" + message.getOffset()
+ "] repeatedly!! This may be unrecoverable error!!");
+								}
+							} else {
+								break;
+							}
+						}
+					}
+
+					seenCommitException = false;
+					offsetOfLastMessageDeliveredToRanger = -1L;
+
+					for (; index < messages.size(); index++) {
+						AtlasKafkaMessage<EntityNotification> message = messages.get(index);
 						EntityNotification notification = message != null ? message.getMessage() : null;
 
 						if (notification != null) {
@@ -179,16 +212,24 @@ public class AtlasTagSource extends AbstractTagSource {
 							}
 							if (notificationWrapper != null) {
 								if (LOG.isDebugEnabled()) {
-									LOG.debug("Notification=" + getPrintableEntityNotification(notificationWrapper));
+									LOG.debug("Message-offset=" + message.getOffset() + ", Notification=" + getPrintableEntityNotification(notificationWrapper));
 								}
 
 								ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notificationWrapper);
 								if (serviceTags != null) {
 									updateSink(serviceTags);
 								}
-
-								TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
-								consumer.commit(partition, message.getOffset());
+								offsetOfLastMessageDeliveredToRanger = message.getOffset();
+
+								if (!seenCommitException) {
+									TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+									try {
+										consumer.commit(partition, message.getOffset());
+									} catch (Exception commitException) {
+										seenCommitException = true;
+										LOG.warn("Ranger tagsync processed message at offset " + message.getOffset() +
". Ignoring failure in committing this message and continuing to process next message", commitException);
+									}
+								}
 							}
 						} else {
 							LOG.error("Null entityNotification received from Kafka!! Ignoring..");
@@ -196,7 +237,14 @@ public class AtlasTagSource extends AbstractTagSource {
 					}
 				} catch (Exception exception) {
 					LOG.error("Caught exception..: ", exception);
-					return;
+					// If transient error, retry after short interval
+					try {
+						Thread.sleep(100);
+					} catch (InterruptedException interrupted) {
+						LOG.error("Interrupted: ", interrupted);
+						LOG.error("Returning from thread. May cause process to be up but not processing events!!");
+						return;
+					}
 				}
 			}
 		}


Mime
View raw message