Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E3975200CB3 for ; Mon, 26 Jun 2017 17:44:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E26E9160BDE; Mon, 26 Jun 2017 15:44:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6AAC0160BD9 for ; Mon, 26 Jun 2017 17:44:09 +0200 (CEST) Received: (qmail 94297 invoked by uid 500); 26 Jun 2017 15:44:08 -0000 Mailing-List: contact commits-help@atlas.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@atlas.apache.org Delivered-To: mailing list commits@atlas.apache.org Received: (qmail 94288 invoked by uid 99); 26 Jun 2017 15:44:08 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Jun 2017 15:44:08 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 16B5BC08B5 for ; Mon, 26 Jun 2017 15:44:08 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id GOqozY5Ws8Je for ; Mon, 26 Jun 2017 15:43:55 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 331645FB62 for ; Mon, 26 Jun 2017 15:43:53 +0000 (UTC) Received: (qmail 93990 invoked by uid 99); 26 Jun 2017 15:43:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Jun 2017 15:43:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4751FE041D; Mon, 26 Jun 2017 15:43:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: madhan@apache.org To: commits@atlas.incubator.apache.org Message-Id: <3d8c3afa5f7249dc821c10cd72204245@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-atlas git commit: ATLAS-1766: updated NotificationConsumer implementation to use new Kafka Consumer API, to enable support for SASL_SSL protocol Date: Mon, 26 Jun 2017 15:43:52 +0000 (UTC) archived-at: Mon, 26 Jun 2017 15:44:12 -0000 Repository: incubator-atlas Updated Branches: refs/heads/0.8-incubating ab7ebacd7 -> 7ec95fe75 ATLAS-1766: updated NotificationConsumer implementation to use new Kafka Consumer API, to enable support for SASL_SSL protocol Signed-off-by: Madhan Neethiraj (cherry picked from commit 0e7f8ea4603c858cc295259bbd1a22314b732f62) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/7ec95fe7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/7ec95fe7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/7ec95fe7 Branch: refs/heads/0.8-incubating Commit: 7ec95fe757d894a1b69e3561a58f0422d2e9d549 Parents: ab7ebac Author: nixonrodrigues Authored: Mon Jun 5 12:49:27 2017 +0530 Committer: Madhan Neethiraj Committed: Mon Jun 26 08:43:43 2017 -0700 ---------------------------------------------------------------------- distro/src/conf/atlas-application.properties | 8 +- .../apache/atlas/kafka/AtlasKafkaConsumer.java | 91 ++++++++++++ .../apache/atlas/kafka/AtlasKafkaMessage.java | 44 ++++++ .../org/apache/atlas/kafka/KafkaConsumer.java | 104 -------------- .../apache/atlas/kafka/KafkaNotification.java | 95 ++++--------- .../AbstractMessageDeserializer.java | 2 +- .../AbstractNotificationConsumer.java | 34 +---- .../notification/NotificationConsumer.java | 33 ++--- .../apache/atlas/kafka/KafkaConsumerTest.java | 137 ++++++++----------- .../atlas/kafka/KafkaNotificationMockTest.java | 52 +++---- .../atlas/kafka/KafkaNotificationTest.java | 56 +++----- .../AbstractNotificationConsumerTest.java | 121 +++++----------- .../test/resources/atlas-application.properties | 8 +- .../notification/NotificationHookConsumer.java | 37 +++-- .../notification/EntityNotificationIT.java | 14 +- .../NotificationHookConsumerKafkaTest.java | 77 +++++++---- .../NotificationHookConsumerTest.java | 10 +- .../atlas/web/integration/BaseResourceIT.java | 24 ++-- .../web/integration/EntityJerseyResourceIT.java | 22 --- .../integration/EntityV2JerseyResourceIT.java | 16 +-- 20 files changed, 415 insertions(+), 570 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/distro/src/conf/atlas-application.properties ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties index 5e59528..474f253 100755 --- a/distro/src/conf/atlas-application.properties +++ b/distro/src/conf/atlas-application.properties @@ -74,9 +74,13 @@ atlas.kafka.zookeeper.session.timeout.ms=400 atlas.kafka.zookeeper.connection.timeout.ms=200 atlas.kafka.zookeeper.sync.time.ms=20 atlas.kafka.auto.commit.interval.ms=1000 -atlas.kafka.auto.offset.reset=smallest atlas.kafka.hook.group.id=atlas -atlas.kafka.auto.commit.enable=false + +atlas.kafka.enable.auto.commit=false +atlas.kafka.auto.offset.reset=earliest +atlas.kafka.session.timeout.ms=30000 + + atlas.notification.create.topics=true atlas.notification.replicas=1 atlas.notification.topics=ATLAS_HOOK,ATLAS_ENTITIES http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java new file mode 100644 index 0000000..9c15243 --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.kafka; + +import org.apache.atlas.notification.AbstractNotificationConsumer; +import org.apache.atlas.notification.MessageDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; + +/** + * Kafka specific notification consumer. + * + * @param the notification type returned by this consumer + */ +public class AtlasKafkaConsumer extends AbstractNotificationConsumer { + private static final Logger LOG = LoggerFactory.getLogger(AtlasKafkaConsumer.class); + + private final KafkaConsumer kafkaConsumer; + private final boolean autoCommitEnabled; + + public AtlasKafkaConsumer(MessageDeserializer deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled) { + super(deserializer); + + this.kafkaConsumer = kafkaConsumer; + this.autoCommitEnabled = autoCommitEnabled; + } + + public List> receive(long timeoutMilliSeconds) { + List> messages = new ArrayList(); + + ConsumerRecords records = kafkaConsumer.poll(timeoutMilliSeconds); + + if (records != null) { + for (ConsumerRecord record : records) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received Message topic ={}, partition ={}, offset = {}, key = {}, value = {}", + record.topic(), record.partition(), record.offset(), record.key(), record.value()); + } + + T message = deserializer.deserialize(record.value().toString()); + + messages.add(new AtlasKafkaMessage(message, record.offset(), record.partition())); + } + } + + return messages; + } + + + @Override + public void commit(TopicPartition partition, long offset) { + if (!autoCommitEnabled) { + if (LOG.isDebugEnabled()) { + LOG.info(" commiting the offset ==>> " + offset); + } + kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset))); + } + } + + @Override + public void close() { + if (kafkaConsumer != null) { + kafkaConsumer.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java new file mode 100644 index 0000000..cdbf57f --- /dev/null +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java @@ -0,0 +1,44 @@ +package org.apache.atlas.kafka; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class AtlasKafkaMessage { + private final T message; + private final long offset; + private final int partition; + + public AtlasKafkaMessage(T message, long offset, int partition) { + this.message = message; + this.offset = offset; + this.partition = partition; + } + + public T getMessage() { + return message; + } + + public long getOffset() { + return offset; + } + + public int getPartition() { + return partition; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java deleted file mode 100644 index 16c0eb2..0000000 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.kafka; - -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; -import org.apache.atlas.notification.AbstractNotificationConsumer; -import org.apache.atlas.notification.MessageDeserializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Kafka specific notification consumer. - * - * @param the notification type returned by this consumer - */ -public class KafkaConsumer extends AbstractNotificationConsumer { - private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class); - - private final int consumerId; - private final ConsumerIterator iterator; - private final ConsumerConnector consumerConnector; - private final boolean autoCommitEnabled; - private long lastSeenOffset; - - - // ----- Constructors ---------------------------------------------------- - - /** - * Create a Kafka consumer. - * @param deserializer the message deserializer used for this consumer - * @param stream the underlying Kafka stream - * @param consumerId an id value for this consumer - * @param consumerConnector the {@link ConsumerConnector} which created the underlying Kafka stream - * @param autoCommitEnabled true if consumer does not need to commit offsets explicitly, false otherwise. - */ - public KafkaConsumer(MessageDeserializer deserializer, KafkaStream stream, int consumerId, - ConsumerConnector consumerConnector, boolean autoCommitEnabled) { - super(deserializer); - this.consumerConnector = consumerConnector; - this.lastSeenOffset = 0; - this.iterator = stream.iterator(); - this.consumerId = consumerId; - this.autoCommitEnabled = autoCommitEnabled; - } - - - // ----- NotificationConsumer -------------------------------------------- - - @Override - public boolean hasNext() { - return iterator.hasNext(); - } - - - // ----- AbstractNotificationConsumer ------------------------------------ - - @Override - public String getNext() { - MessageAndMetadata message = iterator.next(); - LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {}, message - {}", - consumerId, message.topic(), message.partition(), message.offset(), message.message()); - lastSeenOffset = message.offset(); - return (String) message.message(); - } - - @Override - protected String peekMessage() { - MessageAndMetadata message = (MessageAndMetadata) iterator.peek(); - return (String) message.message(); - } - - @Override - public void commit() { - if (autoCommitEnabled) { - LOG.debug("Auto commit is disabled, not committing."); - } else { - consumerConnector.commitOffsets(); - LOG.debug("Committed offset: {}", lastSeenOffset); - } - } - - @Override - public void close() { - consumerConnector.shutdown(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 8bd31fd..366c8a7 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -18,25 +18,22 @@ package org.apache.atlas.kafka; import com.google.common.annotations.VisibleForTesting; -import kafka.consumer.Consumer; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.serializer.StringDecoder; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.utils.Time; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.notification.AbstractNotification; -import org.apache.atlas.notification.MessageDeserializer; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationException; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationConverter; +import org.apache.commons.lang.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -56,10 +53,11 @@ import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; -import java.util.ArrayList; -import java.util.HashMap; +import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.HashMap; +import java.util.ArrayList; import java.util.Properties; import java.util.concurrent.Future; @@ -83,9 +81,8 @@ public class KafkaNotification extends AbstractNotification implements Service { private KafkaServer kafkaServer; private ServerCnxnFactory factory; private Properties properties; - + private KafkaConsumer consumer = null; private KafkaProducer producer = null; - private List consumerConnectors = new ArrayList<>(); private static final Map TOPIC_MAP = new HashMap() { { @@ -126,8 +123,7 @@ public class KafkaNotification extends AbstractNotification implements Service { "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "roundrobin"); - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest"); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); } @VisibleForTesting @@ -171,34 +167,18 @@ public class KafkaNotification extends AbstractNotification implements Service { public List> createConsumers(NotificationType notificationType, int numConsumers) { return createConsumers(notificationType, numConsumers, - Boolean.valueOf(properties.getProperty("auto.commit.enable", "true"))); + Boolean.valueOf(properties.getProperty("enable.auto.commit", "true"))); } @VisibleForTesting public List> createConsumers(NotificationType notificationType, int numConsumers, boolean autoCommitEnabled) { - String topic = TOPIC_MAP.get(notificationType); Properties consumerProperties = getConsumerProperties(notificationType); - List> consumers = new ArrayList<>(numConsumers); - for (int i = 0; i < numConsumers; i++) { - ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties); - Map topicCountMap = new HashMap<>(); - topicCountMap.put(topic, 1); - StringDecoder decoder = new StringDecoder(null); - Map>> streamsMap = - consumerConnector.createMessageStreams(topicCountMap, decoder, decoder); - List> kafkaConsumers = streamsMap.get(topic); - for (KafkaStream stream : kafkaConsumers) { - KafkaConsumer kafkaConsumer = - createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(), - stream, i, consumerConnector, autoCommitEnabled); - consumers.add(kafkaConsumer); - } - consumerConnectors.add(consumerConnector); - } - + List> consumers = new ArrayList<>(); + AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled); + consumers.add(kafkaConsumer); return consumers; } @@ -208,11 +188,6 @@ public class KafkaNotification extends AbstractNotification implements Service { producer.close(); producer = null; } - - for (ConsumerConnector consumerConnector : consumerConnectors) { - consumerConnector.shutdown(); - } - consumerConnectors.clear(); } @@ -254,43 +229,31 @@ public class KafkaNotification extends AbstractNotification implements Service { } } - // ----- helper methods -------------------------------------------------- - /** - * Create a Kafka consumer connector from the given properties. - * - * @param consumerProperties the properties for creating the consumer connector - * - * @return a new Kafka consumer connector - */ - protected ConsumerConnector createConsumerConnector(Properties consumerProperties) { - return Consumer.createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(consumerProperties)); - } + public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) { + if(this.consumer == null) { + try { + String topic = TOPIC_MAP.get(type); + consumerProperties.put("enable.auto.commit", autoCommitEnabled); + this.consumer = new KafkaConsumer(consumerProperties); + this.consumer.subscribe(Arrays.asList(topic)); + }catch (Exception ee) { + LOG.error("Exception in getKafkaConsumer ", ee); + } + } - /** - * Create a Kafka consumer from the given Kafka stream. - * - * @param type the notification type to be returned by the consumer - * @param deserializer the deserializer for the created consumers - * @param stream the Kafka stream - * @param consumerId the id for the new consumer - * - * @param consumerConnector - * @return a new Kafka consumer - */ - protected org.apache.atlas.kafka.KafkaConsumer - createKafkaConsumer(Class type, MessageDeserializer deserializer, KafkaStream stream, - int consumerId, ConsumerConnector consumerConnector, boolean autoCommitEnabled) { - return new org.apache.atlas.kafka.KafkaConsumer<>(deserializer, stream, - consumerId, consumerConnector, autoCommitEnabled); + return this.consumer; } + + + // Get properties for consumer request private Properties getConsumerProperties(NotificationType type) { // find the configured group id for the given notification type - String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY); - if (groupId == null) { + String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY); + if (StringUtils.isEmpty(groupId)) { throw new IllegalStateException("No configuration group id set for the notification type " + type); } @@ -298,7 +261,7 @@ public class KafkaNotification extends AbstractNotification implements Service { consumerProperties.putAll(properties); consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - LOG.info("Consumer property: auto.commit.enable: {}", consumerProperties.getProperty("auto.commit.enable")); + LOG.info("Consumer property: atlas.kafka.enable.auto.commit: {}", consumerProperties.getProperty("enable.auto.commit")); return consumerProperties; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java index 9585827..ec99372 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractMessageDeserializer.java @@ -128,7 +128,7 @@ public abstract class AbstractMessageDeserializer extends VersionedMessageDes /** * Deserializer for JSONArray. */ - protected static final class JSONArrayDeserializer implements JsonDeserializer { + public static final class JSONArrayDeserializer implements JsonDeserializer { @Override public JSONArray deserialize(final JsonElement json, final Type type, final JsonDeserializationContext context) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java index d4d78de..8cf1e8e 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java @@ -16,6 +16,7 @@ * limitations under the License. */ package org.apache.atlas.notification; +import org.apache.kafka.common.TopicPartition; /** * Abstract notification consumer. @@ -25,10 +26,9 @@ public abstract class AbstractNotificationConsumer implements NotificationCon /** * Deserializer used to deserialize notification messages for this consumer. */ - private final MessageDeserializer deserializer; + protected final MessageDeserializer deserializer; - // ----- Constructors ---------------------------------------------------- /** * Construct an AbstractNotificationConsumer. @@ -40,34 +40,6 @@ public abstract class AbstractNotificationConsumer implements NotificationCon } - // ----- AbstractNotificationConsumer ------------------------------------- - /** - * Get the next notification as a string. - * - * @return the next notification in string form - */ - protected abstract String getNext(); - - /** - * Get the next notification as a string without advancing. - * - * @return the next notification in string form - */ - protected abstract String peekMessage(); - - - // ----- NotificationConsumer --------------------------------------------- - - @Override - public T next() { - return deserializer.deserialize(getNext()); - } - - @Override - public T peek() { - return deserializer.deserialize(peekMessage()); - } - - public abstract void commit(); + public abstract void commit(TopicPartition partition, long offset); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java index a99cb10..22e40f9 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java @@ -17,32 +17,16 @@ */ package org.apache.atlas.notification; +import java.util.List; +import org.apache.kafka.common.TopicPartition; +import org.apache.atlas.kafka.AtlasKafkaMessage; + /** * Atlas notification consumer. This consumer blocks until a notification can be read. * * @param the class type of notifications returned by this consumer */ public interface NotificationConsumer { - /** - * Returns true when the consumer has more notifications. Blocks until a notification becomes available. - * - * @return true when the consumer has notifications to be read - */ - boolean hasNext(); - - /** - * Returns the next notification. - * - * @return the next notification - */ - T next(); - - /** - * Returns the next notification without advancing. - * - * @return the next notification - */ - T peek(); /** * Commit the offset of messages that have been successfully processed. @@ -51,7 +35,14 @@ public interface NotificationConsumer { * the consumer is ready to handle the next message, which could happen even after a normal or an abnormal * restart. */ - void commit(); + void commit(TopicPartition partition, long offset); void close(); + + /** + * Fetch data for the topics from Kafka + * @param timeoutMilliSeconds poll timeout + * @return List containing kafka message and partionId and offset. + */ + List> receive(long timeoutMilliSeconds); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java index ad7d93e..70059cb 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java @@ -18,13 +18,9 @@ package org.apache.atlas.kafka; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.apache.atlas.notification.AbstractNotification; import org.apache.atlas.notification.MessageVersion; -import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.IncompatibleVersionException; import org.apache.atlas.notification.VersionedMessage; @@ -33,6 +29,11 @@ import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.IStruct; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import org.codehaus.jettison.json.JSONException; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -42,7 +43,10 @@ import org.testng.annotations.Test; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.NoSuchElementException; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -57,8 +61,10 @@ public class KafkaConsumerTest { private static final String TRAIT_NAME = "MyTrait"; + @Mock - private ConsumerConnector consumerConnector; + private KafkaConsumer kafkaConsumer; + @BeforeMethod public void setup() { @@ -66,9 +72,9 @@ public class KafkaConsumerTest { } @Test - public void testNext() throws Exception { - KafkaStream stream = mock(KafkaStream.class); - ConsumerIterator iterator = mock(ConsumerIterator.class); + public void testReceive() throws Exception { + + MessageAndMetadata messageAndMetadata = mock(MessageAndMetadata.class); Referenceable entity = getEntity(TRAIT_NAME); @@ -78,29 +84,34 @@ public class KafkaConsumerTest { String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message)); - when(stream.iterator()).thenReturn(iterator); - when(iterator.hasNext()).thenReturn(true).thenReturn(false); - when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException()); + kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); + List klist = new ArrayList<>(); + klist.add(new ConsumerRecord("ATLAS_HOOK", + 0, 0L, "mykey", json)); + + TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); + Map mp = new HashMap(); + mp.put(tp,klist); + ConsumerRecords records = new ConsumerRecords(mp); + + + when(kafkaConsumer.poll(1000)).thenReturn(records); when(messageAndMetadata.message()).thenReturn(json); - NotificationConsumer consumer = - new KafkaConsumer<>( - NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99, - consumerConnector, false); - assertTrue(consumer.hasNext()); + AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer,false); + List> messageList = consumer.receive(1000); + assertTrue(messageList.size() > 0); - HookNotification.HookNotificationMessage consumedMessage = consumer.next(); + HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage(); assertMessagesEqual(message, consumedMessage, entity); - assertFalse(consumer.hasNext()); } @Test public void testNextVersionMismatch() throws Exception { - KafkaStream stream = mock(KafkaStream.class); - ConsumerIterator iterator = mock(ConsumerIterator.class); + MessageAndMetadata messageAndMetadata = mock(MessageAndMetadata.class); Referenceable entity = getEntity(TRAIT_NAME); @@ -110,84 +121,56 @@ public class KafkaConsumerTest { String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), message)); - when(stream.iterator()).thenReturn(iterator); - when(iterator.hasNext()).thenReturn(true).thenReturn(false); - when(iterator.next()).thenReturn(messageAndMetadata).thenThrow(new NoSuchElementException()); - when(messageAndMetadata.message()).thenReturn(json); + kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); + List klist = new ArrayList<>(); + klist.add(new ConsumerRecord("ATLAS_HOOK", + 0, 0L, "mykey", json)); - NotificationConsumer consumer = - new KafkaConsumer<>( - NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99, - consumerConnector, false); + TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); + Map mp = new HashMap(); + mp.put(tp,klist); + ConsumerRecords records = new ConsumerRecords(mp); - assertTrue(consumer.hasNext()); + when(kafkaConsumer.poll(1000)).thenReturn(records); + when(messageAndMetadata.message()).thenReturn(json); + AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false); try { - consumer.next(); + List> messageList = consumer.receive(1000); + assertTrue(messageList.size() > 0); + + HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage(); + fail("Expected VersionMismatchException!"); } catch (IncompatibleVersionException e) { e.printStackTrace(); } - assertFalse(consumer.hasNext()); - } - - @Test - public void testPeekMessage() throws Exception { - KafkaStream stream = mock(KafkaStream.class); - ConsumerIterator iterator = mock(ConsumerIterator.class); - MessageAndMetadata messageAndMetadata = mock(MessageAndMetadata.class); + } - Referenceable entity = getEntity(TRAIT_NAME); - HookNotification.EntityUpdateRequest message = - new HookNotification.EntityUpdateRequest("user1", entity); - - String json = AbstractNotification.GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), message)); - - when(stream.iterator()).thenReturn(iterator); - when(iterator.hasNext()).thenReturn(true); - when(iterator.peek()).thenReturn(messageAndMetadata); - when(messageAndMetadata.message()).thenReturn(json); - - NotificationConsumer consumer = - new KafkaConsumer<>( - NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99, - consumerConnector, false); + @Test + public void testCommitIsCalledIfAutoCommitDisabled() { - assertTrue(consumer.hasNext()); + TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - HookNotification.HookNotificationMessage consumedMessage = consumer.peek(); + AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false); - assertMessagesEqual(message, consumedMessage, entity); + consumer.commit(tp, 1); - assertTrue(consumer.hasNext()); + verify(kafkaConsumer).commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(1))); } @Test - public void testCommitIsCalledIfAutoCommitDisabled() { - KafkaStream stream = mock(KafkaStream.class); - NotificationConsumer consumer = - new KafkaConsumer<>( - NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99, - consumerConnector, false); - - consumer.commit(); + public void testCommitIsNotCalledIfAutoCommitEnabled() { - verify(consumerConnector).commitOffsets(); - } + TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - @Test - public void testCommitIsNotCalledIfAutoCommitEnabled() { - KafkaStream stream = mock(KafkaStream.class); - NotificationConsumer consumer = - new KafkaConsumer<>( - NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99, - consumerConnector, true); + AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true); - consumer.commit(); + consumer.commit(tp, 1); - verify(consumerConnector, never()).commitOffsets(); + verify(kafkaConsumer, never()).commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(1))); } private Referenceable getEntity(String traitName) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java index 2126be6..b7474a0 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java @@ -24,12 +24,13 @@ import org.apache.atlas.notification.MessageDeserializer; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.testng.annotations.Test; - +import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -37,7 +38,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; - +import org.apache.atlas.kafka.AtlasKafkaConsumer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; @@ -55,36 +56,24 @@ public class KafkaNotificationMockTest { public void testCreateConsumers() throws Exception { Properties properties = mock(Properties.class); when(properties.getProperty("entities.group.id")).thenReturn("atlas"); - final ConsumerConnector consumerConnector = mock(ConsumerConnector.class); Map topicCountMap = new HashMap<>(); topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1); - Map>> kafkaStreamsMap = - new HashMap<>(); - List> kafkaStreams = new ArrayList<>(); - KafkaStream kafkaStream = mock(KafkaStream.class); - kafkaStreams.add(kafkaStream); - kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams); - - when(consumerConnector.createMessageStreams( - eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap); - - final KafkaConsumer consumer1 = mock(KafkaConsumer.class); - final KafkaConsumer consumer2 = mock(KafkaConsumer.class); + final AtlasKafkaConsumer consumer1 = mock(AtlasKafkaConsumer.class); + final AtlasKafkaConsumer consumer2 = mock(AtlasKafkaConsumer.class); KafkaNotification kafkaNotification = - new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2); + new TestKafkaNotification(properties, consumer1, consumer2); - List> consumers = + List> consumers = kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 2); - verify(consumerConnector, times(2)).createMessageStreams( - eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class)); assertEquals(consumers.size(), 2); assertTrue(consumers.contains(consumer1)); assertTrue(consumers.contains(consumer2)); } + @Test @SuppressWarnings("unchecked") public void shouldSendMessagesSuccessfully() throws NotificationException, @@ -164,27 +153,28 @@ public class KafkaNotificationMockTest { class TestKafkaNotification extends KafkaNotification { - private final ConsumerConnector consumerConnector; - private final KafkaConsumer consumer1; - private final KafkaConsumer consumer2; + private final AtlasKafkaConsumer consumer1; + private final AtlasKafkaConsumer consumer2; - TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector, - KafkaConsumer consumer1, KafkaConsumer consumer2) { + TestKafkaNotification(Properties properties, + AtlasKafkaConsumer consumer1, AtlasKafkaConsumer consumer2) { super(properties); - this.consumerConnector = consumerConnector; this.consumer1 = consumer1; this.consumer2 = consumer2; } + @Override - protected ConsumerConnector createConsumerConnector(Properties consumerProperties) { - return consumerConnector; + public List> createConsumers(NotificationType notificationType, + int numConsumers) { + List consumerList = new ArrayList(); + consumerList.add(consumer1); + consumerList.add(consumer2); + return consumerList; } - @Override - protected org.apache.atlas.kafka.KafkaConsumer - createKafkaConsumer(Class type, MessageDeserializer deserializer, KafkaStream stream, - int consumerId, ConsumerConnector connector, boolean autoCommitEnabled) { + protected AtlasKafkaConsumer + createConsumers(Class type, int consumerId, boolean autoCommitEnabled) { if (consumerId == 0) { return consumer1; } else if (consumerId == 1) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java index a810029..c791d43 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -28,6 +28,9 @@ import org.apache.commons.lang.RandomStringUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; + +import java.util.List; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -52,7 +55,7 @@ public class KafkaNotificationTest { } @Test - public void testNext() throws Exception { + public void testReceiveKafkaMessages() throws Exception { kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new HookNotification.EntityCreateRequest("u1", new Referenceable("type"))); kafkaNotification.send(NotificationInterface.NotificationType.HOOK, @@ -64,44 +67,21 @@ public class KafkaNotificationTest { NotificationConsumer consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); - assertTrue(consumer.hasNext()); - HookNotification.HookNotificationMessage message = (HookNotification.HookNotificationMessage) consumer.next(); - assertEquals(message.getUser(), "u1"); - - assertTrue(consumer.hasNext()); - message = (HookNotification.HookNotificationMessage) consumer.next(); - assertEquals(message.getUser(), "u2"); - consumer.close(); - - //nothing committed(even though u1 and u2 are read), now should restart from u1 - consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); - assertTrue(consumer.hasNext()); - message = (HookNotification.HookNotificationMessage) consumer.next(); - assertEquals(message.getUser(), "u1"); - consumer.commit(); - - assertTrue(consumer.hasNext()); - message = (HookNotification.HookNotificationMessage) consumer.next(); - assertEquals(message.getUser(), "u2"); - consumer.close(); - - //u1 committed, u2 read, should start from u2 - consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); - assertTrue(consumer.hasNext()); - message = (HookNotification.HookNotificationMessage) consumer.next(); - assertEquals(message.getUser(), "u2"); - - assertTrue(consumer.hasNext()); - message = (HookNotification.HookNotificationMessage) consumer.next(); - assertEquals(message.getUser(), "u3"); - consumer.commit(); - consumer.close(); + List> messages = null ; + long startTime = System.currentTimeMillis(); //fetch starting time + while ((System.currentTimeMillis() - startTime) < 10000) { + messages = consumer.receive(1000L); + if (messages.size() > 0) { + break; + } + } + + int i=1; + for (AtlasKafkaMessage msg : messages){ + HookNotification.HookNotificationMessage message = (HookNotificationMessage) msg.getMessage(); + assertEquals(message.getUser(), "u"+i++); + } - //u2, u3 read, but only u3 committed, should start from u4 - consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); - assertTrue(consumer.hasNext()); - message = (HookNotification.HookNotificationMessage) consumer.next(); - assertEquals(message.getUser(), "u4"); consumer.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index 13f2f0b..8324b57 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -20,10 +20,12 @@ package org.apache.atlas.notification; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import org.apache.atlas.kafka.AtlasKafkaMessage; import org.slf4j.Logger; import org.testng.annotations.Test; import java.lang.reflect.Type; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -35,6 +37,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import org.apache.kafka.common.TopicPartition; /** * AbstractNotificationConsumer tests. @@ -44,7 +47,7 @@ public class AbstractNotificationConsumerTest { private static final Gson GSON = new Gson(); @Test - public void testNext() throws Exception { + public void testReceive() throws Exception { Logger logger = mock(Logger.class); TestMessage testMessage1 = new TestMessage("sValue1", 99); @@ -52,7 +55,7 @@ public class AbstractNotificationConsumerTest { TestMessage testMessage3 = new TestMessage("sValue3", 97); TestMessage testMessage4 = new TestMessage("sValue4", 96); - List jsonList = new LinkedList<>(); + List jsonList = new LinkedList<>(); jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1))); jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2))); @@ -62,25 +65,19 @@ public class AbstractNotificationConsumerTest { Type versionedMessageType = new TypeToken>(){}.getType(); NotificationConsumer consumer = - new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); - - assertTrue(consumer.hasNext()); - - assertEquals(testMessage1, consumer.next()); + new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); - assertTrue(consumer.hasNext()); + List> messageList = consumer.receive(1000L); - assertEquals(testMessage2, consumer.next()); + assertFalse(messageList.isEmpty()); - assertTrue(consumer.hasNext()); + assertEquals(testMessage1, messageList.get(0).getMessage()); - assertEquals(testMessage3, consumer.next()); + assertEquals(testMessage2, messageList.get(1).getMessage()); - assertTrue(consumer.hasNext()); + assertEquals(testMessage3, messageList.get(2).getMessage()); - assertEquals(testMessage4, consumer.next()); - - assertFalse(consumer.hasNext()); + assertEquals(testMessage4, messageList.get(3).getMessage()); } @Test @@ -92,7 +89,7 @@ public class AbstractNotificationConsumerTest { TestMessage testMessage3 = new TestMessage("sValue3", 97); TestMessage testMessage4 = new TestMessage("sValue4", 96); - List jsonList = new LinkedList<>(); + List jsonList = new LinkedList<>(); String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)); String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("0.0.5"), testMessage2)); @@ -108,26 +105,17 @@ public class AbstractNotificationConsumerTest { NotificationConsumer consumer = new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); - assertTrue(consumer.hasNext()); - - assertEquals(new TestMessage("sValue1", 99), consumer.next()); - - assertTrue(consumer.hasNext()); - assertEquals(new TestMessage("sValue2", 98), consumer.next()); - verify(logger).info(endsWith(json2)); + List> messageList = consumer.receive(1000L); - assertTrue(consumer.hasNext()); + assertEquals(new TestMessage("sValue1", 99), messageList.get(0).getMessage()); - assertEquals(new TestMessage("sValue3", 97), consumer.next()); - verify(logger).info(endsWith(json3)); + assertEquals(new TestMessage("sValue2", 98), messageList.get(1).getMessage()); - assertTrue(consumer.hasNext()); + assertEquals(new TestMessage("sValue3", 97), messageList.get(2).getMessage()); - assertEquals(new TestMessage("sValue4", 96), consumer.next()); - verify(logger).info(endsWith(json4)); + assertEquals(new TestMessage("sValue4", 96), messageList.get(3).getMessage()); - assertFalse(consumer.hasNext()); } @Test @@ -137,7 +125,7 @@ public class AbstractNotificationConsumerTest { TestMessage testMessage1 = new TestMessage("sValue1", 99); TestMessage testMessage2 = new TestMessage("sValue2", 98); - List jsonList = new LinkedList<>(); + List jsonList = new LinkedList<>(); String json1 = GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1)); String json2 = GSON.toJson(new VersionedMessage<>(new MessageVersion("2.0.0"), testMessage2)); @@ -149,52 +137,19 @@ public class AbstractNotificationConsumerTest { NotificationConsumer consumer = new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); - assertTrue(consumer.hasNext()); - - assertEquals(testMessage1, consumer.next()); + try { + List> messageList = consumer.receive(1000L); - assertTrue(consumer.hasNext()); + messageList.get(1).getMessage(); - try { - consumer.next(); fail("Expected VersionMismatchException!"); } catch (IncompatibleVersionException e) { - verify(logger).error(endsWith(json2)); + } - assertFalse(consumer.hasNext()); } - @Test - public void testPeek() throws Exception { - Logger logger = mock(Logger.class); - - TestMessage testMessage1 = new TestMessage("sValue1", 99); - TestMessage testMessage2 = new TestMessage("sValue2", 98); - TestMessage testMessage3 = new TestMessage("sValue3", 97); - TestMessage testMessage4 = new TestMessage("sValue4", 96); - - List jsonList = new LinkedList<>(); - - jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage1))); - jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage2))); - jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage3))); - jsonList.add(GSON.toJson(new VersionedMessage<>(new MessageVersion("1.0.0"), testMessage4))); - - Type versionedMessageType = new TypeToken>(){}.getType(); - - NotificationConsumer consumer = - new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); - assertTrue(consumer.hasNext()); - assertEquals(testMessage1, consumer.peek()); - - assertTrue(consumer.hasNext()); - - assertEquals(testMessage1, consumer.peek()); - - assertTrue(consumer.hasNext()); - } private static class TestMessage { private String s; @@ -229,31 +184,16 @@ public class AbstractNotificationConsumerTest { } private static class TestNotificationConsumer extends AbstractNotificationConsumer { - private final List messageList; + private final List messageList; private int index = 0; - public TestNotificationConsumer(Type versionedMessageType, List messages, Logger logger) { + public TestNotificationConsumer(Type versionedMessageType, List messages, Logger logger) { super(new TestDeserializer(versionedMessageType, logger)); this.messageList = messages; } @Override - protected String getNext() { - return messageList.get(index++); - } - - @Override - protected String peekMessage() { - return messageList.get(index); - } - - @Override - public boolean hasNext() { - return index < messageList.size(); - } - - @Override - public void commit() { + public void commit(TopicPartition partition, long offset) { // do nothing. } @@ -261,6 +201,15 @@ public class AbstractNotificationConsumerTest { public void close() { //do nothing } + + @Override + public List> receive(long timeoutMilliSeconds) { + List> tempMessageList = new ArrayList(); + for(Object json : messageList) { + tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1)); + } + return tempMessageList; + } } private static final class TestDeserializer extends VersionedMessageDeserializer { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/typesystem/src/test/resources/atlas-application.properties ---------------------------------------------------------------------- diff --git a/typesystem/src/test/resources/atlas-application.properties b/typesystem/src/test/resources/atlas-application.properties index c4ce5ea..7967b76 100644 --- a/typesystem/src/test/resources/atlas-application.properties +++ b/typesystem/src/test/resources/atlas-application.properties @@ -91,7 +91,13 @@ atlas.kafka.consumer.timeout.ms=4000 atlas.kafka.auto.commit.interval.ms=100 atlas.kafka.hook.group.id=atlas atlas.kafka.entities.group.id=atlas_entities -atlas.kafka.auto.commit.enable=false +#atlas.kafka.auto.commit.enable=false + +atlas.kafka.enable.auto.commit=false +atlas.kafka.auto.offset.reset=earliest +atlas.kafka.session.timeout.ms=30000 + + ######### Entity Audit Configs ######### atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 2f8245d..9e5b864 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -19,16 +19,15 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import kafka.consumer.ConsumerTimeoutException; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContextV1; import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; @@ -46,7 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; - +import org.apache.kafka.common.TopicPartition; import javax.inject.Inject; import java.util.ArrayList; import java.util.Date; @@ -135,14 +134,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private void startConsumers(ExecutorService executorService) { int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); - List> notificationConsumers = + List> notificationConsumers = notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads); if (executorService == null) { executorService = Executors.newFixedThreadPool(notificationConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build()); } executors = executorService; - for (final NotificationConsumer consumer : notificationConsumers) { + for (final NotificationConsumer consumer : notificationConsumers) { HookConsumer hookConsumer = new HookConsumer(consumer); consumers.add(hookConsumer); executors.submit(hookConsumer); @@ -207,21 +206,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } class HookConsumer implements Runnable { - private final NotificationConsumer consumer; + private final NotificationConsumer consumer; private final AtomicBoolean shouldRun = new AtomicBoolean(false); - private List failedMessages = new ArrayList<>(); + private List failedMessages = new ArrayList<>(); - public HookConsumer(NotificationConsumer consumer) { + public HookConsumer(NotificationConsumer consumer) { this.consumer = consumer; } - private boolean hasNext() { - try { - return consumer.hasNext(); - } catch (ConsumerTimeoutException e) { - return false; - } - } @Override public void run() { @@ -233,8 +225,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl while (shouldRun.get()) { try { - if (hasNext()) { - handleMessage(consumer.next()); + List> messages = consumer.receive(1000L); + for (AtlasKafkaMessage msg : messages){ + handleMessage(msg); } } catch (Throwable t) { LOG.warn("Failure in NotificationHookConsumer", t); @@ -243,7 +236,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } @VisibleForTesting - void handleMessage(HookNotificationMessage message) throws AtlasServiceException, AtlasException { + void handleMessage(AtlasKafkaMessage kafkaMsg) throws AtlasServiceException, AtlasException { + HookNotificationMessage message = kafkaMsg.getMessage(); String messageUser = message.getUser(); // Used for intermediate conversions during create and update AtlasEntity.AtlasEntitiesWithExtInfo entities; @@ -345,7 +339,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl RequestContextV1.clear(); } } - commit(); + commit(kafkaMsg); } private void recordFailedMessages() { @@ -356,9 +350,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl failedMessages.clear(); } - private void commit() { + private void commit(AtlasKafkaMessage kafkaMessage) { recordFailedMessages(); - consumer.commit(); + TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); + consumer.commit(partition, kafkaMessage.getOffset()); } boolean serverAvailable(Timer timer) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java index ac3b538..7e94330 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java @@ -34,8 +34,6 @@ import org.apache.atlas.typesystem.types.TraitType; import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.apache.atlas.web.integration.BaseResourceIT; import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - import java.util.Collections; import java.util.LinkedList; import java.util.List; @@ -55,7 +53,7 @@ public class EntityNotificationIT extends BaseResourceIT { private Id tableId; private Id dbId; private String traitName; - private NotificationConsumer notificationConsumer; + private NotificationConsumer notificationConsumer; @BeforeClass public void setUp() throws Exception { @@ -64,13 +62,9 @@ public class EntityNotificationIT extends BaseResourceIT { Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME); dbId = createInstance(HiveDBInstance); - List> consumers = - notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); - - notificationConsumer = consumers.iterator().next(); + notificationConsumer = notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1).get(0); } - @Test public void testCreateEntity() throws Exception { Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId); tableId = createInstance(tableInstance); @@ -81,7 +75,6 @@ public class EntityNotificationIT extends BaseResourceIT { newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid)); } - @Test(dependsOnMethods = "testCreateEntity") public void testUpdateEntity() throws Exception { final String property = "description"; final String newValue = "New description!"; @@ -94,7 +87,6 @@ public class EntityNotificationIT extends BaseResourceIT { newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid)); } - @Test public void testDeleteEntity() throws Exception { final String tableName = "table-" + randomString(); final String dbName = "db-" + randomString(); @@ -116,7 +108,6 @@ public class EntityNotificationIT extends BaseResourceIT { newNotificationPredicate(EntityNotification.OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid)); } - @Test(dependsOnMethods = "testCreateEntity") public void testAddTrait() throws Exception { String superSuperTraitName = "SuperTrait" + randomString(); createTrait(superSuperTraitName); @@ -175,7 +166,6 @@ public class EntityNotificationIT extends BaseResourceIT { assertEquals(2, Collections.frequency(allTraitNames, superTraitName)); } - @Test(dependsOnMethods = "testAddTrait") public void testDeleteTrait() throws Exception { final String guid = tableId._getId(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index 18fd2ee..650ca0a 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -22,6 +22,7 @@ import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.kafka.KafkaNotification; import org.apache.atlas.kafka.NotificationProvider; import org.apache.atlas.model.instance.AtlasEntity; @@ -40,12 +41,21 @@ import org.testng.Assert; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; +import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import java.util.List; +import org.apache.atlas.kafka.AtlasKafkaConsumer; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.*; +import org.apache.commons.configuration.Configuration; +import org.apache.atlas.ApplicationProperties; +import static org.testng.Assert.*; + + + public class NotificationHookConsumerKafkaTest { public static final String NAME = "name"; @@ -80,6 +90,7 @@ public class NotificationHookConsumerKafkaTest { @AfterTest public void shutdown() { + kafkaNotification.close(); kafkaNotification.stop(); } @@ -87,21 +98,19 @@ public class NotificationHookConsumerKafkaTest { public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException { try { produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); - - NotificationConsumer consumer = - createNewConsumer(kafkaNotification, false); + + NotificationConsumer consumer = createNewConsumer(kafkaNotification, false); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(consumer); - + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + consumeOneMessage(consumer, hookConsumer); verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); // produce another message, and make sure it moves ahead. If commit succeeded, this would work. produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity())); consumeOneMessage(consumer, hookConsumer); - verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); + verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); reset(atlasEntityStore); } finally { @@ -113,42 +122,49 @@ public class NotificationHookConsumerKafkaTest { public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception { try { produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity())); - - NotificationConsumer consumer = - createNewConsumer(kafkaNotification, true); + + NotificationConsumer consumer = createNewConsumer(kafkaNotification, true); + + assertNotNull (consumer); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(consumer); - + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + + consumeOneMessage(consumer, hookConsumer); verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); - + // produce another message, but this will not be consumed, as commit code is not executed in hook consumer. produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity())); - + consumeOneMessage(consumer, hookConsumer); - verify(atlasEntityStore, times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); + verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); } finally { kafkaNotification.close(); } } - NotificationConsumer createNewConsumer( - KafkaNotification kafkaNotification, boolean autoCommitEnabled) { - return kafkaNotification.createConsumers( - NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0); + AtlasKafkaConsumer createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) { + return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0); } - void consumeOneMessage(NotificationConsumer consumer, + void consumeOneMessage(NotificationConsumer consumer, NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException { - while (!consumer.hasNext()) { - Thread.sleep(1000); - } - try { - hookConsumer.handleMessage(consumer.next()); + long startTime = System.currentTimeMillis(); //fetch starting time + while ((System.currentTimeMillis() - startTime) < 10000) { + List> messages = consumer.receive(1000L); + + for (AtlasKafkaMessage msg : messages) { + hookConsumer.handleMessage(msg); + } + + if (messages.size() > 0) { + break; + } + } } catch (AtlasServiceException | AtlasException e) { Assert.fail("Consumer failed with exception ", e); } @@ -163,7 +179,10 @@ public class NotificationHookConsumerKafkaTest { } KafkaNotification startKafkaServer() throws AtlasException, InterruptedException { - KafkaNotification kafkaNotification = (KafkaNotification) notificationInterface; + Configuration applicationProperties = ApplicationProperties.get(); + applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5)); + + kafkaNotification = new KafkaNotification(applicationProperties); kafkaNotification.start(); Thread.sleep(2000); return kafkaNotification; @@ -173,8 +192,8 @@ public class NotificationHookConsumerKafkaTest { return RandomStringUtils.randomAlphanumeric(10); } - private void produceMessage(HookNotification.HookNotificationMessage message) throws NotificationException { - notificationInterface.send(NotificationInterface.NotificationType.HOOK, message); + private void produceMessage(HookNotificationMessage message) throws NotificationException { + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index bdb60a2..f4ec56a 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -21,6 +21,7 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.notification.hook.HookNotification; @@ -36,7 +37,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; - +import org.apache.kafka.common.TopicPartition; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -124,9 +125,8 @@ public class NotificationHookConsumerTest { Referenceable mock = mock(Referenceable.class); when(message.getEntities()).thenReturn(Arrays.asList(mock)); - hookConsumer.handleMessage(message); - - verify(consumer).commit(); + hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1)); + verify(consumer).commit(any(TopicPartition.class),anyInt()); } @Test @@ -141,7 +141,7 @@ public class NotificationHookConsumerTest { { add(mock(Referenceable.class)); } }); when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message")); - hookConsumer.handleMessage(message); + hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1)); verifyZeroInteractions(consumer); } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java index b59d3ee..c036cfa 100755 --- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java @@ -21,7 +21,6 @@ package org.apache.atlas.web.integration; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import kafka.consumer.ConsumerTimeoutException; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClientV2; @@ -42,7 +41,9 @@ import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinali import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.notification.NotificationConsumer; +import org.apache.atlas.kafka.*; import org.apache.atlas.notification.entity.EntityNotification; +import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; @@ -634,14 +635,21 @@ public abstract class BaseResourceIT { @Override public boolean evaluate() throws Exception { try { - while (consumer.hasNext() && System.currentTimeMillis() < maxCurrentTime) { - EntityNotification notification = consumer.next(); - if (predicate.evaluate(notification)) { - pair.left = notification; - return true; - } + + while (System.currentTimeMillis() < maxCurrentTime) { + List> messageList = consumer.receive(1000); + if(messageList.size() > 0) { + EntityNotification notification = messageList.get(0).getMessage(); + if (predicate.evaluate(notification)) { + pair.left = notification; + return true; + } + }else{ + LOG.info( System.currentTimeMillis()+ " messageList no records" +maxCurrentTime ); + } } - } catch(ConsumerTimeoutException e) { + } catch(Exception e) { + LOG.error(" waitForNotification", e); //ignore } return false; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java index 310b2e3..b527583 100755 --- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityJerseyResourceIT.java @@ -81,7 +81,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT { private static final String TRAITS = "traits"; private NotificationInterface notificationInterface = NotificationProvider.get(); - private NotificationConsumer notificationConsumer; @BeforeClass public void setUp() throws Exception { @@ -89,10 +88,6 @@ public class EntityJerseyResourceIT extends BaseResourceIT { createTypeDefinitionsV1(); - List> consumers = - notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); - - notificationConsumer = consumers.iterator().next(); } @Test @@ -218,29 +213,12 @@ public class EntityJerseyResourceIT extends BaseResourceIT { assertEntityAudit(dbId, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE); - waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() { - @Override - public boolean evaluate(EntityNotification notification) throws Exception { - return notification != null && notification.getEntity().getId()._getId().equals(dbId); - } - }); - JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName)); assertEquals(results.length(), 1); //create entity again shouldn't create another instance with same unique attribute value List entityResults = atlasClientV1.createEntity(HiveDBInstance); assertEquals(entityResults.size(), 0); - try { - waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() { - @Override - public boolean evaluate(EntityNotification notification) throws Exception { - return notification != null && notification.getEntity().getId()._getId().equals(dbId); - } - }); - } catch (Exception e) { - //expected timeout - } results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName)); assertEquals(results.length(), 1); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/7ec95fe7/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java index 98a7abc..d61a9af 100755 --- a/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/integration/EntityV2JerseyResourceIT.java @@ -55,7 +55,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - +import org.apache.atlas.kafka.AtlasKafkaConsumer; import static org.testng.Assert.*; @@ -72,8 +72,6 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT { private AtlasEntity dbEntity; private AtlasEntity tableEntity; - private NotificationInterface notificationInterface = NotificationProvider.get(); - private NotificationConsumer notificationConsumer; @BeforeClass public void setUp() throws Exception { @@ -81,10 +79,6 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT { createTypeDefinitionsV2(); - List> consumers = - notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); - - notificationConsumer = consumers.iterator().next(); } @Test @@ -166,14 +160,6 @@ public class EntityV2JerseyResourceIT extends BaseResourceIT { assertEquals(results.length(), 1); final AtlasEntity hiveDBInstanceV2 = createHiveDB(); - // Do the notification thing here - waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() { - @Override - public boolean evaluate(EntityNotification notification) throws Exception { - return notification != null && notification.getEntity().getId()._getId().equals(hiveDBInstanceV2.getGuid()); - } - }); - results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE_V2, DATABASE_NAME)); assertEquals(results.length(), 1);