reef-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Markus Weimer (JIRA)" <>
Subject [jira] [Commented] (REEF-1453) StreamingNetworkService should create a new observer for each client
Date Tue, 14 Jun 2016 21:26:30 GMT


Markus Weimer commented on REEF-1453:

I'm with Dhruv in confusion :) Networking at this layer should allow us to target a specific,
named {{Observer}} in another Task. Hence, I'd expect some call like 

  Observable<T> GetRemoteObservable(string taskId, string observerId);

This way, the object containing this method can keep track of all the subscriptions and route
errors from a Task through all the {{Observable}} instances returned. An alternative design
could be

  void Subscribe(string taskId, string observerId, Observer<T> observer);

That way, we directly subscribe the {{Observer}}. The same reasoning applies: The object with
this method can route all Exceptions appropriately.

> StreamingNetworkService should create a new observer for each client
> --------------------------------------------------------------------
>                 Key: REEF-1453
>                 URL:
>             Project: REEF
>          Issue Type: Improvement
>          Components: REEF.NET
>            Reporter: Andrew Chung
>            Assignee: Andrew Chung
>              Labels: FT, breaking_change
> {{StreamingNetworkService}} currently only has one universal observer that handles connections
for all incoming clients. This is inconvenient because when a client fails or disconnects,
there is no easy way to propagate the failure/completion signal of the *specific* failed client
up to the universal observer.
> A proposed solution is to instead allow the injection of a {{NetworkObserverFactory}},
which creates a new {{IObserver}} for each new client that is connected to the {{StreamingTransportServer}}.
The {{NetworkObserverFactory}} itself will be wrapped in a universal observer registered of
type {{IObserver<IRemoteMessage<T>>}} such that it receives a notification upon
each new client. For each message, it will cast the {{IRemoteMessage.Identifier}} as a {{SocketRemoteIdentifier}}.
If the {{IPEndpoint}} of the message has not been seen before, it will know that the message
is from a new client and will create an {{IObserver}} for the new client and register the
new client with the {{IPEndpoint}} with the {{ObserverContainer}}.
> The change is a simple migration from the original universal observer model in that state
can still be shared between the {{IObserver}} s spun off by the {{NetworkObserverFactory}}
by passing the shared state object into the constructor of the created {{IObserver}} s.

This message was sent by Atlassian JIRA

View raw message