reef-dev mailing list archives

From "Markus Weimer (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (REEF-1453) StreamingNetworkService should create a new observer for each client
Date Tue, 14 Jun 2016 21:26:30 GMT

    [ https://issues.apache.org/jira/browse/REEF-1453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15330648#comment-15330648 ]

Markus Weimer commented on REEF-1453:
-------------------------------------

I'm with Dhruv in confusion :) Networking at this layer should allow us to target a specific,
named {{Observer}} in another Task. Hence, I'd expect some call like 

{code}
  Observable<T> GetRemoteObservable(string taskId, string observerId);
{code}

This way, the object containing this method can keep track of all the subscriptions and route
errors from a Task through all the {{Observable}} instances returned. An alternative design
could be

{code}
  void Subscribe(string taskId, string observerId, Observer<T> observer);
{code}

That way, we directly subscribe the {{Observer}}. The same reasoning applies: the object with
this method can route all exceptions appropriately.
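
A minimal sketch of the second design, to make the routing argument concrete. Everything here is hypothetical and not part of REEF: the class name {{RemoteSubscriptions}}, the {{OnMessage}} and {{OnTaskFailed}} entry points, and the assumption that the transport can tag each incoming message with a task id and observer id. It only shows how one object that owns {{Subscribe}} can deliver a Task's failure to every {{Observer}} subscribed to that Task.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; none of these names exist in REEF.
public sealed class RemoteSubscriptions<T>
{
    private readonly object _lock = new object();

    // taskId -> (observerId -> local observer subscribed to that remote stream)
    private readonly Dictionary<string, Dictionary<string, IObserver<T>>> _byTask =
        new Dictionary<string, Dictionary<string, IObserver<T>>>();

    public void Subscribe(string taskId, string observerId, IObserver<T> observer)
    {
        lock (_lock)
        {
            Dictionary<string, IObserver<T>> observers;
            if (!_byTask.TryGetValue(taskId, out observers))
            {
                observers = new Dictionary<string, IObserver<T>>();
                _byTask[taskId] = observers;
            }
            observers[observerId] = observer;
        }
    }

    // Assumed to be called by the transport for each incoming message.
    public void OnMessage(string taskId, string observerId, T message)
    {
        IObserver<T> target = null;
        lock (_lock)
        {
            Dictionary<string, IObserver<T>> observers;
            if (_byTask.TryGetValue(taskId, out observers))
            {
                observers.TryGetValue(observerId, out target);
            }
        }
        if (target != null)
        {
            target.OnNext(message);
        }
    }

    // Assumed to be called when the connection to a Task fails:
    // every observer subscribed to that Task receives the error.
    public void OnTaskFailed(string taskId, Exception error)
    {
        List<IObserver<T>> targets;
        lock (_lock)
        {
            Dictionary<string, IObserver<T>> observers;
            if (!_byTask.TryGetValue(taskId, out observers))
            {
                return;
            }
            targets = new List<IObserver<T>>(observers.Values);
            _byTask.Remove(taskId);
        }
        foreach (IObserver<T> observer in targets)
        {
            observer.OnError(error);
        }
    }
}
```

Callbacks are invoked outside the lock so that an {{Observer}} which calls back into {{Subscribe}} cannot deadlock; that is a choice of this sketch, not something the proposal specifies.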



> StreamingNetworkService should create a new observer for each client
> --------------------------------------------------------------------
>
>                 Key: REEF-1453
>                 URL: https://issues.apache.org/jira/browse/REEF-1453
>             Project: REEF
>          Issue Type: Improvement
>          Components: REEF.NET
>            Reporter: Andrew Chung
>            Assignee: Andrew Chung
>              Labels: FT, breaking_change
>
> {{StreamingNetworkService}} currently has only one universal observer, which handles
> connections for all incoming clients. This is inconvenient because when a client fails or
> disconnects, there is no easy way to propagate the failure/completion signal of the
> *specific* failed client up to the universal observer.
> A proposed solution is to instead allow the injection of a {{NetworkObserverFactory}},
> which creates a new {{IObserver}} for each new client that connects to the
> {{StreamingTransportServer}}. The {{NetworkObserverFactory}} itself will be wrapped in a
> universal observer of type {{IObserver<IRemoteMessage<T>>}} so that it is notified of
> each new client. For each message, it will cast the {{IRemoteMessage.Identifier}} to a
> {{SocketRemoteIdentifier}}. If the {{IPEndpoint}} of the message has not been seen before,
> it will know that the message is from a new client, create an {{IObserver}} for that
> client, and register it under its {{IPEndpoint}} with the {{ObserverContainer}}.
> The change is a simple migration from the original universal observer model, in that state
> can still be shared between the {{IObserver}}s spun off by the {{NetworkObserverFactory}}
> by passing the shared state object into the constructor of each created {{IObserver}}.
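
The dispatch step in the quoted description can be sketched roughly as follows. This is an illustration under assumptions, not the REEF implementation: {{IClientObserverFactory}} and {{PerClientDispatcher}} are made-up names, and a {{KeyValuePair<IPEndPoint, T>}} stands in for {{IRemoteMessage<T>}} with its {{SocketRemoteIdentifier}} already resolved to a {{System.Net.IPEndPoint}}.

```csharp
using System;
using System.Collections.Generic;
using System.Net;

// Hypothetical stand-in for the factory named in the proposal.
public interface IClientObserverFactory<T>
{
    IObserver<T> Create(IPEndPoint client);
}

// The single "universal" observer: it creates one observer per client endpoint
// on first sight and forwards that client's messages to it.
public sealed class PerClientDispatcher<T> : IObserver<KeyValuePair<IPEndPoint, T>>
{
    private readonly IClientObserverFactory<T> _factory;
    private readonly Dictionary<IPEndPoint, IObserver<T>> _observers =
        new Dictionary<IPEndPoint, IObserver<T>>();

    public PerClientDispatcher(IClientObserverFactory<T> factory)
    {
        _factory = factory;
    }

    public void OnNext(KeyValuePair<IPEndPoint, T> message)
    {
        IObserver<T> observer;
        lock (_observers)
        {
            // An endpoint not seen before means a new client.
            if (!_observers.TryGetValue(message.Key, out observer))
            {
                observer = _factory.Create(message.Key);
                _observers[message.Key] = observer;
            }
        }
        observer.OnNext(message.Value);
    }

    public void OnError(Exception error)
    {
        foreach (IObserver<T> observer in Snapshot())
        {
            observer.OnError(error);
        }
    }

    public void OnCompleted()
    {
        foreach (IObserver<T> observer in Snapshot())
        {
            observer.OnCompleted();
        }
    }

    // Copy the observers under the lock so callbacks run outside it.
    private List<IObserver<T>> Snapshot()
    {
        lock (_observers)
        {
            return new List<IObserver<T>>(_observers.Values);
        }
    }
}
```

Shared state, as the description notes, would be passed by the factory implementation into the constructor of each observer it creates.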



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
