apex-dev mailing list archives

From Tushar Gosavi <tus...@datatorrent.com>
Subject Re: Changes for calling committed on operators when stream locality is THREAD_LOCAL
Date Fri, 30 Dec 2016 12:16:45 GMT
I would suggest including the fix in the platform. Operators should work
correctly regardless of the stream locality used for the connection.

-Tushar
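To make the platform-side option (Approach 1 in the quoted message below) more concrete, here is a minimal sketch. It is a simplified model, not Apex code: `Locality`, `NodeEntry`, `RecordingOperator`, `executingThread` and `dispatchCommitted` are hypothetical stand-ins. Each node keeps its deploy-time locality, and a THREAD_LOCAL node, whose own thread is null, is resolved to the thread of the upstream node that runs it. The `thread == null || !thread.isAlive()` check then no longer skips it, while nodes whose thread has finished are still skipped.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model (not Apex code) of choosing which nodes receive committed(). */
public class CommittedDispatchSketch {

  /** Hypothetical stand-in for a stream locality. */
  enum Locality { THREAD_LOCAL, CONTAINER_LOCAL, NODE_LOCAL }

  /** Hypothetical stand-in for an operator implementing committed(long). */
  static final class RecordingOperator {
    long lastCommitted = -1;

    void committed(long windowId) {
      lastCommitted = windowId;
    }
  }

  /** Hypothetical per-node record that retains the deploy-time locality. */
  static final class NodeEntry {
    final Thread thread;        // null when the node runs in its upstream's thread
    final Locality locality;    // kept from deployment, as Approach 1 suggests
    final NodeEntry upstream;   // node whose thread runs a THREAD_LOCAL node
    final RecordingOperator operator = new RecordingOperator();

    NodeEntry(Thread thread, Locality locality, NodeEntry upstream) {
      this.thread = thread;
      this.locality = locality;
      this.upstream = upstream;
    }

    /** Follows THREAD_LOCAL links upstream to the thread that actually runs this node. */
    Thread executingThread() {
      NodeEntry e = this;
      while (e.thread == null && e.locality == Locality.THREAD_LOCAL && e.upstream != null) {
        e = e.upstream;
      }
      return e.thread;
    }
  }

  /** Notifies every node whose executing thread is alive; returns how many were notified. */
  static int dispatchCommitted(List<NodeEntry> nodes, long committedWindowId) {
    int notified = 0;
    for (NodeEntry node : nodes) {
      Thread t = node.executingThread();
      if (t == null || !t.isAlive()) {
        continue; // still skip nodes that are genuinely gone
      }
      // Called inline for brevity; this sketch only models which nodes are notified.
      node.operator.committed(committedWindowId);
      notified++;
    }
    return notified;
  }

  public static void main(String[] args) throws InterruptedException {
    Thread finished = new Thread(() -> { });
    finished.start();
    finished.join(); // finished.isAlive() is now false

    NodeEntry upstream = new NodeEntry(Thread.currentThread(), Locality.CONTAINER_LOCAL, null);
    NodeEntry threadLocal = new NodeEntry(null, Locality.THREAD_LOCAL, upstream);
    NodeEntry dead = new NodeEntry(finished, Locality.CONTAINER_LOCAL, null);

    int n = dispatchCommitted(Arrays.asList(upstream, threadLocal, dead), 42L);
    System.out.println(n + " " + threadLocal.operator.lastCommitted + " " + dead.operator.lastCommitted);
  }
}
```

Running `main` prints `2 42 -1`: the THREAD_LOCAL node now receives committed, and the node with a finished thread is still skipped. The sketch does not model which thread should execute the callback.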

On Thu, Dec 29, 2016, 15:42 Priyanka Gugale <priyag@apache.org> wrote:

> CheckpointNotificationListener extends CheckpointListener, and hence the
> "committed" method is still there. So I think any other operator that
> tries to use "committed" in combination with THREAD_LOCAL in the future
> will also suffer from this behavior.
>
> I would suggest we fix it in the platform. We should think more about
> possible solutions at the platform level.
>
> -Priyanka
>
> On Thu, Dec 29, 2016 at 3:04 PM, Francis Fernandes <francis@datatorrent.com> wrote:
>
> > Hi all,
> >
> > I need your input on the following:
> >
> > Issue - I have an application with 2 operators. The second operator is
> > AbstractFileOutputOperator
> > <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java>,
> > which implements the committed method of the (deprecated)
> > Operator.CheckpointListener. When the locality of the stream connecting
> > the two operators is Locality.THREAD_LOCAL, the committed method is not
> > called.
> >
> > RCA -
> > Inside processHeartbeatResponse
> > <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L763>,
> > we check that thread != null. In the THREAD_LOCAL case the thread is
> > null, so the committed method isn't called.
> >
> > Approach 1 - Change in apex-core/engine
> > For THREAD_LOCAL
> > <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1372>,
> > during activate we do not set the thread
> > <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L1462>
> > in the node's context. Because the thread is not set, we skip this
> > operator in processHeartBeatResponse
> > <https://github.com/apache/apex-core/blob/master/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java#L782>,
> > and committed is not called:
> >
> > if (thread == null || !thread.isAlive()) {
> >   continue;
> > }
> >
> > We need this if condition to skip invalid operators in the case of other
> > localities. Should we persist the OperatorDeployInfo for each node so
> > that it can be used later to identify the locality? This might be an
> > intrusive change.
> >
> > Approach 2 - Change in malhar/AbstractFileOutputOperator
> > Refactor the code from the committed
> > <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1260>
> > method into beforeCheckpoint
> > <https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java#L1238>.
> > This requires no change in the core platform, but it fixes only this
> > particular operator. New operators should not run into this problem
> > because CheckpointListener is now deprecated and
> > CheckpointNotificationListener should be used instead.
> >
> >
> > Please let me know your thoughts on which approach would be the right way
> > to solve this.
> >
> > Thanks,
> > Francis
> >
>
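Priyanka's objection to relying on the deprecation, together with the shape of Approach 2, can be illustrated with a small sketch. `CheckpointListener` and `CheckpointNotificationListener` below are local stand-ins, not the Apex interfaces: `committed` and `beforeCheckpoint` come from the thread, while `checkpointed` is assumed from the Apex listener API. `RefactoredWriter` and `finalizeUpTo` are hypothetical. An operator written against the newer interface still inherits `committed`, and after the Approach 2 refactoring its finalization no longer depends on `committed` being delivered.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerHierarchySketch {

  /** Stand-in for the older, deprecated listener (method set assumed). */
  interface CheckpointListener {
    void checkpointed(long windowId);
    void committed(long windowId);
  }

  /** Stand-in for the newer listener: it extends the old one and adds beforeCheckpoint. */
  interface CheckpointNotificationListener extends CheckpointListener {
    void beforeCheckpoint(long windowId);
  }

  /**
   * Hypothetical operator following Approach 2: the work formerly done in committed()
   * lives in a helper invoked from beforeCheckpoint().
   */
  static final class RefactoredWriter implements CheckpointNotificationListener {
    final List<Long> finalizedWindows = new ArrayList<>();

    @Override
    public void beforeCheckpoint(long windowId) {
      finalizeUpTo(windowId); // moved here from committed()
    }

    @Override
    public void checkpointed(long windowId) {
      // nothing to do in this sketch
    }

    @Override
    public void committed(long windowId) {
      // Still inherited from CheckpointListener, so every implementer must provide it;
      // left empty after the refactoring.
    }

    /** Hypothetical helper standing in for the refactored finalization logic. */
    private void finalizeUpTo(long windowId) {
      finalizedWindows.add(windowId);
    }
  }

  public static void main(String[] args) {
    RefactoredWriter op = new RefactoredWriter();
    // The newer interface does not escape the old contract.
    System.out.println(op instanceof CheckpointListener); // true
    // Simulate a THREAD_LOCAL deployment in which committed() is never delivered.
    op.beforeCheckpoint(10L);
    System.out.println(op.finalizedWindows); // [10]
  }
}
```

Whether moving the logic is safe for a given operator depends on what it relied on `committed` for, since `beforeCheckpoint` runs at a different point in the checkpoint cycle.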
