apex-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2120) Fix bugs on KafkaInputOperatorTest and AbstractKafkaInputOperator
Date Fri, 24 Jun 2016 16:03:16 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2120?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15348478#comment-15348478 ]

ASF GitHub Bot commented on APEXMALHAR-2120:
--------------------------------------------

Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/321#discussion_r68420530
  
    --- Diff: kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java ---
    @@ -264,49 +336,57 @@ public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exc
         collector.isIdempotentTest = idempotent;
     
         // Connect ports
    -    dag.addStream("Kafka message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
    -
    +    dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
     
         if (hasFailure) {
           setupHasFailureTest(node, dag);
         }
     
         // Create local cluster
    -    final LocalMode.Controller lc = lma.getController();
    +    LocalMode.Controller lc = lma.getController();
         lc.setHeartbeatMonitoringEnabled(false);
     
    -    lc.runAsync();
    -
    -    // Wait 30s for consumer finish consuming all the messages
    -    boolean notTimeout = latch.await(40000, TimeUnit.MILLISECONDS);
    -    Collections.sort(tupleCollection, new Comparator<String>()
    -    {
    -      @Override
    -      public int compare(String o1, String o2)
    -      {
    -        return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
    -      }
    -    });
    -    Assert.assertTrue("TIMEOUT: 40s Collected " + tupleCollection, notTimeout);
    +    //Run the Controller in a separate thread. This is almost the same as calling Controller.runAsync(),
    +    //but Controller.runAsync() doesn't expose the thread that runs it, so we can't tell when that thread terminates.
    +    //Create the thread here and call join() on it to make sure the Controller shuts down completely.
    +    monitorThread = new Thread((StramLocalCluster)lc, "master");
    +    monitorThread.start();
    +
    +    boolean notTimeout = true;
    +    try {
    +      // Wait up to waitTime ms for the consumer to finish consuming all the messages
    +      notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
    +      lc.shutdown();
    +
    +      //wait until the controller thread has finished.
    +      monitorThread.join();
    +    } catch (Exception e) {
    +      logger.warn(e.getMessage());
    +    }
    +    
    +    t.join();
    +    
    +    logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(), expectedReceiveCount, testName, tupleCollection);
    --- End diff --
    
    Log this only when something goes wrong
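The reviewer's suggestion can be sketched as below. This is a minimal stand-alone sketch, not the actual test code: `tupleCollection` and `expectedReceiveCount` are stand-ins for the test's fields, and the log call is replaced by a print.

```java
// Sketch: dump the collected tuples only when the received count does not match
// the expected count, instead of logging them unconditionally on every run.
import java.util.Arrays;
import java.util.List;

public class LogOnFailureSketch {
  public static void main(String[] args) {
    List<String> tupleCollection = Arrays.asList("c1_1", "c1_2"); // stand-in data
    int expectedReceiveCount = 3;
    if (tupleCollection.size() != expectedReceiveCount) {
      // only reached when something went wrong
      System.out.println("Number of received/expected tuples: "
          + tupleCollection.size() + "/" + expectedReceiveCount
          + ", tuples: " + tupleCollection);
    }
  }
}
```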

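The run-in-our-own-thread-then-join pattern from the diff above can be illustrated without Apex. This is a sketch under the assumption that the controller is a `Runnable` (as `StramLocalCluster` is in the diff); `ClusterController` here is a hypothetical stand-in, not the real class.

```java
// Sketch: run a Runnable "controller" in a thread we own, so that after
// shutdown() we can join() and know deterministically that it has terminated --
// which Controller.runAsync() does not allow, since it hides its thread.
import java.util.concurrent.CountDownLatch;

public class ControllerThreadSketch {
  // hypothetical stand-in for StramLocalCluster / LocalMode.Controller
  static class ClusterController implements Runnable {
    final CountDownLatch stop = new CountDownLatch(1);
    volatile boolean finished = false;

    public void run() {
      try {
        stop.await(); // stands in for the cluster's main loop
      } catch (InterruptedException ignored) {
      }
      finished = true;
    }

    void shutdown() {
      stop.countDown();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ClusterController lc = new ClusterController();
    Thread monitorThread = new Thread(lc, "master");
    monitorThread.start();
    // ... the test body would await its latch here ...
    lc.shutdown();
    monitorThread.join(); // guarantees the controller thread has terminated
    System.out.println(lc.finished);
  }
}
```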

> Fix bugs on KafkaInputOperatorTest and AbstractKafkaInputOperator
> -----------------------------------------------------------------
>
>                 Key: APEXMALHAR-2120
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2120
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>    Affects Versions: 3.4.0
>            Reporter: bright chen
>            Assignee: bright chen
>             Fix For: 3.5.0
>
>
> problems in unit test class KafkaInputOperatorTest:
> - 'k' is not initialized for each test case
> - The assertion was not correct
> - The test case assumes the END_TUPLE will be received after all the normal tuples, but in fact the tuples can arrive out of order when multiple clusters or partitions are used. Here is one example: 2016-06-22 08:54:12,827 [main] INFO  kafka.KafkaInputOperatorTest testInputOperator - Number of received/expected tuples: 22/22, testName: testtopic16, tuples: 
> [c1_2, c1_3, c1_6, c1_9, c1_10, c1_13, c1_14, c1_19, c1_20, END_TUPLE, END_TUPLE, c1_1, c1_4, c1_5, c1_7, c1_8, c1_11, c1_12, c1_15, c1_16, c1_17, c1_18]
> - The operator AbstractKafkaInputOperator is implemented as "at least once", but the test case assumes "exactly once"
> - RuntimeException: Couldn't replay the offset: see the following log.
> - RuntimeException in OneToOnePartitioner.assign(OneToOnePartitioner.java:52) or OneToManyPartitioner.assign(OneToManyPartitioner.java:57), which is likely caused by a NullPointerException. See the following log.
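The "at least once" vs. "exactly once" point above suggests a fix for the test: with at-least-once delivery, duplicates and reordering are legal, so the check should compare the set of distinct payload tuples against the expected set rather than asserting an exact ordered sequence. A minimal sketch of that check (stand-in data, not the actual test code):

```java
// Sketch: verify an "at least once" result by collapsing duplicates and
// ignoring arrival order; END_TUPLE markers are filtered out since they can be
// interleaved with normal tuples across partitions.
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class AtLeastOnceCheckSketch {
  public static void main(String[] args) {
    List<String> received = Arrays.asList(
        "c1_1", "c1_2", "c1_2", "END_TUPLE", "c1_3", "END_TUPLE");
    Set<String> distinct = new LinkedHashSet<>();
    for (String t : received) {
      if (!"END_TUPLE".equals(t)) {
        distinct.add(t); // duplicates collapse; order of arrival does not matter
      }
    }
    Set<String> expected = new LinkedHashSet<>(Arrays.asList("c1_1", "c1_2", "c1_3"));
    System.out.println(distinct.equals(expected));
  }
}
```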
> ====================================================================================================
> problem of AbstractKafkaInputOperator:
> --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> - RuntimeException: Couldn't replay the offset:
> For test case KafkaInputOperatorTest.testIdempotentInputOperatorWithFailure() with scenario {true, true, "one_to_many"} (multi-cluster: true, multi-partition: true, partition: "one_to_many"), the following exception is thrown and the Collector module doesn't collect any data.
> 2016-06-15 10:43:56,358 [1/Kafka inputtesttopic0:KafkaSinglePortInputOperator] INFO stram.StramLocalCluster log - container-6 msg: Stopped running due to an exception. java.lang.RuntimeException: Couldn't replay the offset
> at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.emitImmediately(KafkaConsumerWrapper.java:146)
> at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.replay(AbstractKafkaInputOperator.java:261)
> at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.beginWindow(AbstractKafkaInputOperator.java:250)
> at com.datatorrent.stram.engine.InputNode.run(InputNode.java:122)
> at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1407)
> Caused by: org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition: testtopic0-1
> at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> at org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
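The `NoOffsetForPartitionException` above is what the Kafka consumer throws when a partition has no committed offset and the `auto.offset.reset` policy is `none`. Assuming the test's consumer properties are adjustable, setting an explicit reset policy avoids it; sketched with plain `Properties` since the Kafka client itself isn't needed to show the config:

```java
// Sketch: configure a reset policy so the consumer has a defined starting
// position ("earliest" or "latest") instead of "none", which makes it throw
// NoOffsetForPartitionException when no committed offset exists.
import java.util.Properties;

public class OffsetResetSketch {
  public static void main(String[] args) {
    Properties consumerProps = new Properties();
    consumerProps.setProperty("auto.offset.reset", "earliest"); // instead of "none"
    System.out.println(consumerProps.getProperty("auto.offset.reset"));
  }
}
```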
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
> - ConcurrentModificationException
> 2016-06-16 10:14:32,400 [1/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=1,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
> 	at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
> 	at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
> 	at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
> 	at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
> 	at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
> 	at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
> 2016-06-16 10:14:32,400 [2/Kafka inputtesttopic4:KafkaSinglePortInputOperator] ERROR engine.StreamingContainer run - Shutdown of operator OperatorDeployInfo[id=2,name=Kafka inputtesttopic4,type=INPUT,checkpoint={ffffffffffffffff, 0, 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=outputPort,streamId=Kafka messagetesttopic4,bufferServer=MacBook-Pro-2.local]]] failed due to an exception.
> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1324)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1255)
> 	at org.apache.apex.malhar.kafka.KafkaConsumerWrapper.stop(KafkaConsumerWrapper.java:340)
> 	at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.deactivate(AbstractKafkaInputOperator.java:184)
> 	at com.datatorrent.stram.engine.Node.deactivate(Node.java:646)
> 	at com.datatorrent.stram.engine.StreamingContainer.teardownNode(StreamingContainer.java:1347)
> 	at com.datatorrent.stram.engine.StreamingContainer.access$500(StreamingContainer.java:130)
> 	at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1438)
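The `ConcurrentModificationException` above happens because `close()` was called from a different thread while `poll()` was running; `KafkaConsumer` allows only one thread at a time. The documented remedy is to signal the polling thread (via `KafkaConsumer.wakeup()`, which is the one thread-safe method) and let that thread close the consumer itself. A sketch of the confinement pattern, with a flag standing in for the real consumer:

```java
// Sketch: confine all consumer access to a single polling thread. Other threads
// only set a signal (the role KafkaConsumer.wakeup() plays); the polling thread
// notices it, leaves its poll loop, and closes the consumer itself.
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleThreadCloseSketch {
  static final AtomicBoolean wakeup = new AtomicBoolean(false);
  static volatile boolean closedByPollingThread = false;

  public static void main(String[] args) throws InterruptedException {
    Thread poller = new Thread(() -> {
      while (!wakeup.get()) {
        Thread.yield(); // stand-in for consumer.poll(...)
      }
      // consumer.close() would happen here, on the same thread as poll()
      closedByPollingThread = true;
    }, "consumer-poller");
    poller.start();

    wakeup.set(true); // other threads only signal; they never touch the consumer
    poller.join();
    System.out.println(closedByPollingThread);
  }
}
```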
> ---------------------------------------------------------------------------------------------------------
> Tests run: 24, Failures: 0, Errors: 2, Skipped: 0, Time elapsed: 154.711 sec <<< FAILURE!
> testInputOperatorWithFailure[multi-cluster: true, multi-partition: true, partition: one_to_one](org.apache.apex.malhar.kafka.KafkaInputOperatorTest)  Time elapsed: 0.457 sec  <<< ERROR!
> java.lang.RuntimeException: Error creating local cluster
> 	at org.apache.apex.malhar.kafka.OneToOnePartitioner.assign(OneToOnePartitioner.java:52)
> 	at org.apache.apex.malhar.kafka.AbstractKafkaPartitioner.definePartitions(AbstractKafkaPartitioner.java:110)
> 	at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:356)
> 	at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
> 	at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
> 	at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
> 	at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
> 	at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:406)
> 	at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:299)
> 	at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
> 	at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperator(KafkaInputOperatorTest.java:338)
> 	at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperatorWithFailure(KafkaInputOperatorTest.java:285)
> testInputOperatorWithFailure[multi-cluster: false, multi-partition: false, partition: one_to_many](org.apache.apex.malhar.kafka.KafkaInputOperatorTest)  Time elapsed: 0.333 sec  <<< ERROR!
> java.lang.RuntimeException: Error creating local cluster
> 	at org.apache.apex.malhar.kafka.OneToManyPartitioner.assign(OneToManyPartitioner.java:57)
> 	at org.apache.apex.malhar.kafka.AbstractKafkaPartitioner.definePartitions(AbstractKafkaPartitioner.java:110)
> 	at org.apache.apex.malhar.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:356)
> 	at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
> 	at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
> 	at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
> 	at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
> 	at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:406)
> 	at com.datatorrent.stram.StramLocalCluster.<init>(StramLocalCluster.java:299)
> 	at com.datatorrent.stram.LocalModeImpl.getController(LocalModeImpl.java:76)
> 	at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperator(KafkaInputOperatorTest.java:338)
> 	at org.apache.apex.malhar.kafka.KafkaInputOperatorTest.testInputOperatorWithFailure(KafkaInputOperatorTest.java:285)
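The issue description suspects the `assign()` failures in the traces above come from a `NullPointerException`, which would typically mean partition metadata was missing for a cluster. A defensive guard, sketched with hypothetical names (`assign`, `metadataPerCluster` are illustrative stand-ins, not the real Malhar signatures), would at least surface a clear error instead of an NPE deep inside the partitioner:

```java
// Sketch: guard against missing per-cluster partition metadata before using it,
// so the failure is reported explicitly rather than as a NullPointerException.
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class AssignGuardSketch {
  // hypothetical stand-in for the metadata a partitioner's assign() consumes
  static String assign(Map<String, List<String>> metadataPerCluster, String cluster) {
    List<String> partitions = metadataPerCluster.get(cluster);
    if (partitions == null) {
      return "no metadata for cluster " + cluster; // explicit, diagnosable failure
    }
    return "partitions: " + partitions;
  }

  public static void main(String[] args) {
    System.out.println(assign(Collections.emptyMap(), "cluster1"));
  }
}
```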



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
