atlas-dev mailing list archives

From Madhan Neethiraj <mad...@apache.org>
Subject Re: Review Request 61665: ATLAS-2047: Exception Thrown by Kafka Consumer Ends up Filling Logs Due to Incorrect Handling
Date Thu, 31 Aug 2017 00:07:08 GMT

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/61665/#review184102
-----------------------------------------------------------


Fix it, then Ship it!





webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 246 (patched)
<https://reviews.apache.org/r/61665/#comment260305>

    Instead of computing timeSinceLastWait here and passing it to setWaitDurations(), I would suggest the following:
    
    public void pause(Exception ex) {
      setWaitDuration();
      
      try {
        ...
        Thread.sleep(waitDuration);
        ...
      } catch(...) {
        ...
      }
    }
    
    private void setWaitDuration() {
      long now               = System.currentTimeMillis();
      // lastWaitAt will be 0 the first time, which will result in
      // "waitDuration = minDuration" in the following if block
      long timeSinceLastWait = now - lastWaitAt;
    
      lastWaitAt = now;
    
      if (timeSinceLastWait > resetInterval) {
        waitDuration = minDuration;
      } else if (waitDuration != maxDuration) {
        waitDuration += increment;
        
        if (waitDuration > maxDuration) {
          waitDuration = maxDuration;
        }
      }
    }
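
A self-contained sketch of the back-off suggested above, for illustration only. The class name `AdaptiveWaiterSketch`, the constructor parameters, and the `nextWaitDuration(long now)` helper are hypothetical and not taken from the patch; the helper takes the current time as an argument only so the logic can be exercised without sleeping.

```java
// Hypothetical sketch; names and constructor are assumptions, not the patch's code.
final class AdaptiveWaiterSketch {
    private final long minDuration;   // shortest wait, in milliseconds
    private final long maxDuration;   // longest wait, in milliseconds
    private final long increment;     // amount added on each consecutive wait
    private final long resetInterval; // quiet period after which the wait resets

    private long lastWaitAt;          // 0 until the first wait
    private long waitDuration;

    AdaptiveWaiterSketch(long minDuration, long maxDuration, long increment, long resetInterval) {
        this.minDuration   = minDuration;
        this.maxDuration   = maxDuration;
        this.increment     = increment;
        this.resetInterval = resetInterval;
        this.waitDuration  = minDuration;
    }

    // Computes the next wait from the time elapsed since the previous wait.
    long nextWaitDuration(long now) {
        long timeSinceLastWait = now - lastWaitAt;

        lastWaitAt = now;

        if (timeSinceLastWait > resetInterval) {
            // Long gap since the last failure: start over from the minimum.
            waitDuration = minDuration;
        } else {
            // Failures are arriving back to back: grow the wait, capped at the maximum.
            waitDuration = Math.min(waitDuration + increment, maxDuration);
        }

        return waitDuration;
    }

    // Sleeps for the computed duration; restores the interrupt flag if interrupted.
    void pause() {
        long duration = nextWaitDuration(System.currentTimeMillis());

        try {
            Thread.sleep(duration);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With this shape the caller only invokes `pause()`, and the elapsed-time bookkeeping stays inside the waiter.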



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 281 (patched)
<https://reviews.apache.org/r/61665/#comment260301>

    Use maxWaitDuration from line #127, instead of creating a new one here.



webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
Lines 283 (patched)
<https://reviews.apache.org/r/61665/#comment260302>

    500 ==> minWaitDuration


- Madhan Neethiraj


On Aug. 29, 2017, 10:37 p.m., Ashutosh Mestry wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/61665/
> -----------------------------------------------------------
> 
> (Updated Aug. 29, 2017, 10:37 p.m.)
> 
> 
> Review request for atlas, Madhan Neethiraj and Nixon Rodrigues.
> 
> 
> Bugs: ATLAS-2047
>     https://issues.apache.org/jira/browse/ATLAS-2047
> 
> 
> Repository: atlas
> 
> 
> Description
> -------
> 
> Please refer to [ATLAS-2047](https://issues.apache.org/jira/browse/ATLAS-2047) for background and analysis.
> 
> **Background**
> 
> The _IllegalStateException_ is thrown by _KafkaConsumer.acquire_. This method is called at the beginning of almost every method in this class. It checks whether the consumer is closed and, if it is, throws _IllegalStateException_.
> 
> The scenario may come about in this way:
> - Shutdown has been initiated. Close is called on the consumer.
> - However, the consumer thread is just about to enter another poll cycle.
> - Thus _acquire_ sees that the consumer is closed and throws the exception (2nd bullet above).
> 
> Please take a look at this Scala code. This is _ShutdownableThread_. The thread does the job of handling all exceptions. Upon an exception, it manages the _shutdownLatch_ (from yesterday’s bug fix) and gets out of the _isRunning_ loop.
> ```scala
>   override def run(): Unit = {
>     info("Starting ")
>     try{
>       while(isRunning.get()){
>         doWork()
>       }
>     } catch{
>       case e: Throwable =>
>         if(isRunning.get())
>           error("Error due to ", e)
>     }
>     shutdownLatch.countDown()
>     info("Stopped ")
>   }
> ```
> 
> **Implementation**
> 
> Special treatment is given to _IllegalStateException_ by implementing pause & retry logic:
> - Modified _LOG_ to _debug_, so that the logs are not filled during retries.
> - _HookConsumer_ is more resilient. It handles exceptions resulting from _Kafka_ and entity APIs.
> 
> 
> Diffs
> -----
> 
>   webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ef64c3b
>   webapp/src/test/java/org/apache/atlas/notification/AdaptiveWaiterTest.java PRE-CREATION
>   webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java a6f58e8
> 
> 
> Diff: https://reviews.apache.org/r/61665/diff/4/
> 
> 
> Testing
> -------
> 
> **Unit tests**
> Updated unit tests to reproduce the scenarios and verify the fix.
> 
> **Functional tests**
> Verified regular notification scenarios.
> 
> 
> Thanks,
> 
> Ashutosh Mestry
> 
>

