atlas-dev mailing list archives

From Nixon Rodrigues <nixon.rodrig...@freestoneinfotech.com>
Subject Re: Review Request 61274: ATLAS-1944 - Fix for ConcurrentModificationException Exception in HookConsumer (KafkaConsumer) while stopping Atlas server
Date Wed, 02 Aug 2017 08:48:33 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61274/
-----------------------------------------------------------

(Updated Aug. 2, 2017, 8:48 a.m.)


Review request for atlas, Apoorv Naik, Ashutosh Mestry, Madhan Neethiraj, and Sarath Subramanian.


Changes
-------

This patch handles review comments from Apoorv and Ashutosh.

After extending HookConsumer from ShutdownableThread and calling consumer.shutdown(), the
KafkaConsumer closed cleanly before the consumer thread's timeout expired, so this patch
adds back the ShutdownableThread implementation.


Bugs: ATLAS-1944
    https://issues.apache.org/jira/browse/ATLAS-1944


Repository: atlas


Description
-------

Background:- NotificationHookConsumer was throwing ConcurrentModificationException
while shutting down the consumer thread. The thread's stop method called kafkaconsumer.stop
while the run method of HookConsumer was still busy consuming Kafka messages by
polling the Kafka server. This simultaneous stopping and polling causes the ConcurrentModificationException.

Fix:- 

* Added a try/finally block in the run method that wraps the while loop. When the loop ends,
the finally block calls the consumer's close method, which ensures that close is called only
after consuming has ended.

* Replaced ShutdownableThread, which was delaying the shutdown of the consumer,
with the Runnable interface used earlier.
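
The first point above can be illustrated with a minimal, self-contained sketch. It is not the
code from this patch: FakeConsumer is a hypothetical stand-in that only mimics the
single-thread ownership check of a Kafka consumer, and the names HookConsumerSketch,
shouldRun and runDemo are assumptions made for illustration. The stop method only flips a
flag, and close is called from the finally block on the polling thread itself, so no second
thread ever touches the consumer.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class HookConsumerSketch {

    /** Hypothetical stand-in for a consumer that rejects concurrent access. */
    static class FakeConsumer {
        private final AtomicReference<Thread> owner = new AtomicReference<>();
        volatile boolean closed = false;
        volatile Thread closedBy = null;

        // Fails if another thread is currently inside poll() or close().
        private void acquire() {
            if (!owner.compareAndSet(null, Thread.currentThread())) {
                throw new ConcurrentModificationException(
                        "consumer is not safe for multi-threaded access");
            }
        }

        private void release() {
            owner.set(null);
        }

        void poll(long timeoutMs) throws InterruptedException {
            acquire();
            try {
                Thread.sleep(timeoutMs); // pretend to wait for messages
            } finally {
                release();
            }
        }

        void close() {
            acquire();
            try {
                closed = true;
                closedBy = Thread.currentThread();
            } finally {
                release();
            }
        }
    }

    /** Polls until asked to stop, then closes the consumer on its own thread. */
    static class HookConsumer implements Runnable {
        private final FakeConsumer consumer;
        private final AtomicBoolean shouldRun = new AtomicBoolean(true);

        HookConsumer(FakeConsumer consumer) {
            this.consumer = consumer;
        }

        @Override
        public void run() {
            try {
                while (shouldRun.get()) {
                    consumer.poll(50);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // Runs only after the loop has ended, so it cannot overlap with poll().
                consumer.close();
            }
        }

        // Called from another thread: only sets the flag, never touches the consumer.
        void stop() {
            shouldRun.set(false);
        }
    }

    /** Starts the consumer, stops it, and reports whether its own thread closed it. */
    static boolean runDemo() {
        FakeConsumer consumer = new FakeConsumer();
        HookConsumer hook = new HookConsumer(consumer);
        Thread worker = new Thread(hook, "hook-consumer");
        worker.start();
        try {
            Thread.sleep(120);
            hook.stop();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return consumer.closed && consumer.closedBy == worker;
    }

    public static void main(String[] args) {
        System.out.println("closed on consumer thread: " + runDemo());
    }
}
```

In this sketch the shutdown latency is bounded by the poll timeout. With the real
KafkaConsumer, wakeup() is the one method documented as safe to call from another thread to
interrupt a blocked poll; whether the patch uses it is not stated here.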


Reference : - https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


Diffs (updated)
-----

  notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java d431176
  notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java 0bd75e1
  notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java bcee00c
  webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java 51276d3



Diff: https://reviews.apache.org/r/61274/diff/2/

Changes: https://reviews.apache.org/r/61274/diff/1-2/


Testing
-------

Tested the Atlas kafkaconsumer with messages from the hive hook, then shut down the Atlas
server to check for any issue during the shutdown process.
Unit tests and integration tests are passing.


Thanks,

Nixon Rodrigues

