apex-dev mailing list archives

From "Munagala V. Ramanath (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (APEXMALHAR-2419) KafkaSinglePortExactlyOnceOutputOperator fails on recovery
Date Tue, 21 Feb 2017 14:46:44 GMT

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876082#comment-15876082 ]

Munagala V. Ramanath commented on APEXMALHAR-2419:
--------------------------------------------------

The expectation seems to be that `windowDataManager.getLargestCompletedWindow()` will return the
last window for which `windowDataManager.save()` was called in the previous (failed) run, which
in turn means that `windowId > windowDataManager.getLargestCompletedWindow()` will be true
only once replay is complete. However, as the following log messages show, `windowDataManager.getLargestCompletedWindow()`
always returns the same value during a run; this may indicate a bug in FSWindowDataManager.
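The expectation described above can be modeled with a small in-memory stand-in for the window data manager. This is a hypothetical sketch, not the Malhar `FSWindowDataManager` API: the class `ExpectedWindowStore`, its methods, and the persistence model (a set of saved window ids that survives a restart) are assumptions made for illustration. It computes the value the operator appears to expect after a restart, namely the largest window saved in the failed run, using the window ids from the logs below.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

/**
 * Hypothetical stand-in for a window data manager (NOT the Malhar API).
 * Models the expectation that, after a restart, the "largest completed
 * window" is the last window saved in the previous (failed) run.
 */
public class ExpectedWindowStore {
  // Window ids for which save() was called; assumed to survive a restart.
  private final NavigableSet<Long> savedWindows = new TreeSet<>();
  // Value frozen when a run starts, mirroring the observation that the
  // real getter returns the same value throughout a run.
  private long largestAtStartup = -1;

  public void save(long windowId) {
    savedWindows.add(windowId);
  }

  /** Simulates a redeploy: freeze the largest saved window as the replay boundary. */
  public void restart() {
    largestAtStartup = savedWindows.isEmpty() ? -1 : savedWindows.last();
  }

  public long getLargestCompletedWindow() {
    return largestAtStartup;
  }

  public static void main(String[] args) {
    ExpectedWindowStore store = new ExpectedWindowStore();
    // Initial deploy: windows ...345 through ...349 were saved before the undeploy.
    for (long w = 6389565783323705345L; w <= 6389565783323705349L; w++) {
      store.save(w);
      // During the first run the frozen value stays -1, as in the logs.
      System.out.println("after save " + w + ": " + store.getLargestCompletedWindow());
    }
    store.restart();
    // Expected after redeploy: ...349 (the redeploy log reports ...345 instead).
    System.out.println("after restart: " + store.getLargestCompletedWindow());
  }
}
```

Running `main` prints -1 after every save and 6389565783323705349 after the simulated restart, whereas the redeploy log reports 6389565783323705345.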

Notice that in the initial deploy, largestCompletedWindow is always -1, even after a save()
call in endWindow:

Initial deploy:
{quote}
        2017-02-21 06:18:55,196 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705345, largestCompletedWindow = -1
        2017-02-21 06:18:55,240 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705345, partialWindowTuples.size = 0, largestCompletedWindow = -1
        2017-02-21 06:18:55,747 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705345, largestCompletedWindow = -1
        2017-02-21 06:18:55,749 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705346, largestCompletedWindow = -1
        2017-02-21 06:18:55,749 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705346, partialWindowTuples.size = 0, largestCompletedWindow = -1
        2017-02-21 06:18:55,806 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705346, largestCompletedWindow = -1
        2017-02-21 06:18:55,806 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705347, largestCompletedWindow = -1
        2017-02-21 06:18:55,806 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705347, partialWindowTuples.size = 0, largestCompletedWindow = -1
        2017-02-21 06:18:55,906 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705347, largestCompletedWindow = -1
        2017-02-21 06:18:55,906 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705348, largestCompletedWindow = -1
        2017-02-21 06:18:55,906 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705348, partialWindowTuples.size = 0, largestCompletedWindow = -1
        2017-02-21 06:18:55,964 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705348, largestCompletedWindow = -1
        2017-02-21 06:18:55,964 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705349, largestCompletedWindow = -1
        2017-02-21 06:18:55,965 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705349, partialWindowTuples.size = 0, largestCompletedWindow = -1
        2017-02-21 06:18:56,022 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705349, largestCompletedWindow = -1
        2017-02-21 06:18:56,023 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705350, largestCompletedWindow = -1
        2017-02-21 06:19:03,570 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [3]
{quote}

Notice here that when we reach the second replayed window (6389565783323705346), it is not
recognized as a replay, so the check in endWindow throws the exception.

After redeploy:

{quote}
        2017-02-21 06:19:11,899 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705345, largestCompletedWindow = 6389565783323705345
        2017-02-21 06:19:11,900 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: Rebuild the partial window after 6389565783323705345
        2017-02-21 06:19:13,151 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705345, partialWindowTuples.size = 4, largestCompletedWindow = 6389565783323705345
        2017-02-21 06:19:13,766 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705345, largestCompletedWindow = 6389565783323705345
        2017-02-21 06:19:13,766 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705346, largestCompletedWindow = 6389565783323705345
        2017-02-21 06:19:13,766 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705346, partialWindowTuples.size = 3, largestCompletedWindow = 6389565783323705345
        2017-02-21 06:19:13,773 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=kafkaExactlyOnceOutputOperator,type=GENERIC,checkpoint={ffffffffffffffff, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=linesToKafka,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
        java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
{quote}
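To make the failure concrete, the following hypothetical harness re-evaluates the `endWindow()` condition quoted in the issue below against the values from the redeploy log. It is not operator code: `ReplayCheck` and `violatesExactlyOnce` are illustrative names, and only the two windows whose `partialWindowTuples.size` appears in the log are checked. The "expected" boundary (6389565783323705349, the last window saved before the undeploy) is an assumption based on the expectation stated at the top of this comment.

```java
/**
 * Hypothetical harness that re-evaluates the endWindow() check from
 * APEXMALHAR-2419 using values copied from the redeploy log.
 */
public class ReplayCheck {
  /** Mirrors: !partialWindowTuples.isEmpty() && windowId > largestCompletedWindow */
  static boolean violatesExactlyOnce(int partialWindowTuples, long windowId,
      long largestCompletedWindow) {
    return partialWindowTuples > 0 && windowId > largestCompletedWindow;
  }

  public static void main(String[] args) {
    long[] windowIds = {6389565783323705345L, 6389565783323705346L};
    int[] partialSizes = {4, 3};          // partialWindowTuples.size from the redeploy log
    long logged = 6389565783323705345L;   // largestCompletedWindow reported after redeploy
    long expected = 6389565783323705349L; // assumed: last window saved before the undeploy

    for (int i = 0; i < windowIds.length; i++) {
      System.out.printf("window %d: logged boundary -> %s, expected boundary -> %s%n",
          windowIds[i],
          violatesExactlyOnce(partialSizes[i], windowIds[i], logged) ? "throws" : "ok",
          violatesExactlyOnce(partialSizes[i], windowIds[i], expected) ? "throws" : "ok");
    }
  }
}
```

Under these assumptions, window 6389565783323705346 trips the check with the logged boundary but is treated as a replay with the expected one, matching the ERROR entry above.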

> KafkaSinglePortExactlyOnceOutputOperator fails on recovery
> ----------------------------------------------------------
>
>                 Key: APEXMALHAR-2419
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2419
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Munagala V. Ramanath
>
> The KafkaSinglePortExactlyOnceOutputOperator fails on recovery with the message: "Violates Exactly once. Not all the tuples received after operator reset."
> This is because of this check in endWindow():
> {code}
>     if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestCompletedWindow()) {
>       throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
