apex-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2120) Fix bugs on KafkaInputOperatorTest AbstractKafkaInputOperator
Date Mon, 20 Jun 2016 22:21:57 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15340545#comment-15340545
] 

ASF GitHub Bot commented on APEXMALHAR-2120:
--------------------------------------------

Github user brightchen commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/321#discussion_r67777812
  
    --- Diff: kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
---
    @@ -68,26 +70,53 @@
     
       public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName()
+ File.separator;
     
    +  public class KafkaTestInfo extends TestWatcher
    +  {
    +    public org.junit.runner.Description desc;
    +
    +    public String getDir()
    +    {
    +      String methodName = desc.getMethodName();
    +      String className = desc.getClassName();
    +      return "target/" + className + "/" + methodName + "/" + testName;
    +    }
    +
    +    @Override
    +    protected void starting(org.junit.runner.Description description)
    +    {
    +      this.desc = description;
    +    }
    +  }
    +  
    +  @Rule
    +  public final KafkaTestInfo testInfo = new KafkaTestInfo();
    +  
    +  
       @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition:
{2}")
       public static Collection<Object[]> testScenario()
       {
    -    return Arrays.asList(new Object[][]{{true, false, "one_to_one"},// multi cluster
with single partition
    +    return Arrays.asList(new Object[][]{
    +      {true, false, "one_to_one"},// multi cluster with single partition
           {true, false, "one_to_many"},
           {true, true, "one_to_one"},// multi cluster with multi partitions
    -      {true, true, "one_to_many"},
    +      {true, true, "one_to_many"},   //test failed, no data received.
    --- End diff --
    
    Yes, I'll remove the comments


> Fix bugs on KafkaInputOperatorTest AbstractKafkaInputOperator
> -------------------------------------------------------------
>
>                 Key: APEXMALHAR-2120
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2120
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>    Affects Versions: 3.4.0
>            Reporter: bright chen
>            Assignee: bright chen
>             Fix For: 3.5.0
>
>
> problems in Unit Test class: KafkaInputOperatorTest
> - 'k' not initialized for each test case
> - The assert was not correct
> - The test case assume the END_TUPLE will be received at the end of normal tuples, but
in fact the tuples could be out of order where support multiple cluster or partition
> - The operator AbstractKafkaInputOperator implemented as "at least once", but the test
case assume "exactly once"
> ====================================================================================================
> problem of AbstractKafkaInputOperator:
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> - RuntimeException: Couldn't replay the offset:
> For test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure() with senario
> {true, true, "one_to_many"}
> ("multi-cluster: true, multi-partition: true, partition: "one_to_many") throws following
exception and the Collector Module didn't collect any data.
> 2016-06-15 10:43:56,358 [1/Kafka inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster
log - container-6 msg: Stopped running due to an exception. java.lang.RuntimeException: Couldn't
replay the offset
> at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
> at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
> at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
> at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
> at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
> Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined
offset with no reset policy for partition: testtopic0-1
> at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> - ConcurrentModificationException
> 2016-06-16 10:14:32,400 [1/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR
engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=1,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff,
0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka
messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded
access
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
> 	at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
> 	at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
> 	at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
> 	at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
> 	at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
> 	at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
> 2016-06-16 10:14:32,400 [2/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR
engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=2,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff,
0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka
messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded
access
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
> 	at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
> 	at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
> 	at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
> 	at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
> 	at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
> 	at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message