apex-dev mailing list archives

From Francis Fernandes <fran...@datatorrent.com>
Subject Changes for calling committed on operators when stream locality is THREAD_LOCAL
Date Thu, 29 Dec 2016 09:34:43 GMT
Hi all,

I need your input on the following:

Issue - I have an application with two operators. The second operator is
AbstractFileOutputOperator
<https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java>,
which implements the committed method of the (deprecated)
Operator.CheckpointListener interface. When the stream connecting the two
operators has Locality.THREAD_LOCAL, the committed method is never called.

RCA -
Inside processHeartbeatResponse
<https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L763>,
we check that thread != null. With THREAD_LOCAL, thread is null, so the
committed method is not called.
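To make the skip concrete, here is a small, self-contained Java model of that loop. NodeContext, HeartbeatSketch and nodesNotifiedOfCommit are hypothetical names for illustration, not the actual StreamingContainer code; only the null/isAlive check mirrors the engine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a node as seen by the heartbeat loop.
class NodeContext {
  final String name;
  final Thread thread; // null for THREAD_LOCAL nodes in this model

  NodeContext(String name, Thread thread) {
    this.name = name;
    this.thread = thread;
  }
}

// Hypothetical model of the part of processHeartbeatResponse that
// decides which nodes get committed(windowId).
class HeartbeatSketch {
  static List<String> nodesNotifiedOfCommit(Map<Integer, NodeContext> nodes) {
    List<String> notified = new ArrayList<>();
    for (NodeContext ctx : nodes.values()) {
      Thread thread = ctx.thread;
      if (thread == null || !thread.isAlive()) {
        // THREAD_LOCAL nodes land here too, so committed() is never delivered.
        continue;
      }
      notified.add(ctx.name);
    }
    return notified;
  }
}
```

With an upstream node that owns a live thread and a THREAD_LOCAL file output node whose thread is null, only the upstream node is reported as notified.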

Approach 1 - Change in apex-core/engine
For THREAD_LOCAL nodes
<https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1372>,
during activate we do not set the thread
<https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1462>
in the node's context. Because the thread is not set, processHeartbeatResponse
<https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L782>
skips this operator, and committed is never called:

if (thread == null || !thread.isAlive()) {
  continue;
}

We need this condition to skip invalid operators for the other localities.
Should we persist the OperatorDeployInfo for each node so that it can be
used later to identify the locality? This might be an intrusive change.
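A rough sketch of what Approach 1 could look like, assuming the container keeps each node's stream locality (standing in for a persisted OperatorDeployInfo) plus the id of the node whose thread actually runs it. StreamLocality, DeployedNode, hostNodeId and LocalityAwareHeartbeat are all hypothetical names. The idea shown is only that a THREAD_LOCAL node borrows the liveness of its host thread, while the existing null/isAlive check still filters out invalid operators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the locality recorded at deploy time.
enum StreamLocality { THREAD_LOCAL, CONTAINER_LOCAL }

// Hypothetical node record; fields are assumptions, not Apex fields.
class DeployedNode {
  final String name;
  final StreamLocality locality; // null: node owns its own thread in this model
  final Integer hostNodeId;      // THREAD_LOCAL only: node whose thread runs this one
  final Thread thread;           // null for THREAD_LOCAL nodes, as in the engine

  DeployedNode(String name, StreamLocality locality, Integer hostNodeId, Thread thread) {
    this.name = name;
    this.locality = locality;
    this.hostNodeId = hostNodeId;
    this.thread = thread;
  }
}

class LocalityAwareHeartbeat {
  // Follows THREAD_LOCAL links back to the node that owns the thread.
  static Thread effectiveThread(Map<Integer, DeployedNode> nodes, DeployedNode node) {
    if (node.locality == StreamLocality.THREAD_LOCAL && node.hostNodeId != null) {
      DeployedNode host = nodes.get(node.hostNodeId);
      return host == null ? null : effectiveThread(nodes, host);
    }
    return node.thread;
  }

  static List<String> nodesNotifiedOfCommit(Map<Integer, DeployedNode> nodes) {
    List<String> notified = new ArrayList<>();
    for (DeployedNode node : nodes.values()) {
      Thread thread = effectiveThread(nodes, node);
      if (thread == null || !thread.isAlive()) {
        continue; // still skips genuinely invalid (dead or missing) operators
      }
      notified.add(node.name);
    }
    return notified;
  }
}
```

In this model a THREAD_LOCAL node is notified exactly when its host thread is alive, and a node whose own thread was never started is still skipped.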

Approach 2 - Change in malhar/AbstractFileOutputOperator
Move the code in the committed
<https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1260>
method to beforeCheckpoint
<https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1238>.
This requires no change in the core platform, but it fixes only this
particular operator. New operators should not run into this problem,
because CheckpointListener is now deprecated in favor of
CheckpointNotificationListener.
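A toy illustration of Approach 2, assuming a writer that parks rolled files per window and finalizes them later. CheckpointCallbacks and ToyFileWriter are hypothetical stand-ins, not the Apex or Malhar types, and file handling is reduced to lists of names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the checkpoint callbacks; not the Apex interfaces.
interface CheckpointCallbacks {
  void beforeCheckpoint(long windowId);
  void checkpointed(long windowId);
  void committed(long windowId);
}

// Toy writer: files rolled in a window wait in 'pending' until finalized.
class ToyFileWriter implements CheckpointCallbacks {
  final TreeMap<Long, List<String>> pending = new TreeMap<>();
  final List<String> finalized = new ArrayList<>();

  void rollFile(long windowId, String file) {
    pending.computeIfAbsent(windowId, k -> new ArrayList<>()).add(file);
  }

  // Finalize every file rolled in a window <= windowId.
  void finalizeUpTo(long windowId) {
    Map<Long, List<String>> done = pending.headMap(windowId, true);
    for (List<String> files : done.values()) {
      finalized.addAll(files);
    }
    done.clear(); // headMap is a view, so this also removes the entries from 'pending'
  }

  @Override
  public void beforeCheckpoint(long windowId) {
    // Approach 2: the logic that used to live in committed() runs here, so it
    // no longer depends on the engine delivering committed() to this operator.
    finalizeUpTo(windowId);
  }

  @Override
  public void checkpointed(long windowId) {
  }

  @Override
  public void committed(long windowId) {
    // Intentionally empty after the refactor.
  }
}
```

In this model files are finalized when the checkpoint is taken rather than when the window is committed; that timing difference is what the operator-only change exchanges for leaving the engine untouched.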


Please let me know which approach you think is the right way to solve
this.

Thanks,
Francis
