Date: Fri, 17 Jun 2016 06:40:05 +0000 (UTC)
From: "Julia (JIRA)"
To: dev@reef.apache.org
Subject: [jira] [Commented] (REEF-1453) StreamingNetworkService should create a new observer for each client

[ https://issues.apache.org/jira/browse/REEF-1453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15335540#comment-15335540 ]

Julia commented on REEF-1453:
-----------------------------

Shravan and I had some discussions tonight and have reached a common understanding regarding how to
handle errors/exceptions in GC (Group Communication) for fault tolerance.

1. When an error occurs in one of the nodes/tasks, all the other tasks should be stopped, not just one of them. In other words, an exception from one node should be propagated to all the other nodes, not just to the individual observer that listens to that particular client. Java REEF also uses a universal observer, so this is not a problem.

2. The best way to control this is through the Driver, not through the exception raised on the message ReadAsync thread, because the Driver knows all the nodes/tasks in the topology. When a task fails, the Driver receives an IFailedTask and sends a close event to all the remaining tasks. This is what Java does, and fortunately it is also what we have implemented for IMRU fault tolerance.

3. When a task receives the close event, Java adds a special message to the message queue; when the task reads that message, it returns. We can do the same thing in .NET, or use a cancellation token on the blocking queue. As a fallback, if the task still doesn't return, we can force the close, as we have already implemented.

4. When a node/task fails in GC, the ReadAsync thread on the other side cannot get messages and throws exceptions. That is fine: it is a loop that keeps reading and throwing. The important thing is to ensure that when a task is closed, all resources are disposed and the thread running ReadAsync is stopped. We have fixed a few places where resources were not disposed. If we find that the ReadAsync thread is still looping after the task is closed, we just need to find and fix more bugs.
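The Driver-initiated close described in items 2 and 3 can be sketched in Java, the language of the Java REEF implementation mentioned above. This is a minimal, self-contained sketch, not REEF's API: `SketchDriver`, `SketchTask`, `deliver`, `onClose` and `onFailedTask` are hypothetical names, a `LinkedBlockingQueue` stands in for the task's message queue, and a private sentinel object plays the role of the special close message. The driver, which knows every task, reacts to one task's failure by sending a close event to all remaining tasks, and each task returns as soon as it reads the sentinel.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CloseOnFailureSketch {

    /** Hypothetical task: blocks on its message queue until it reads the close sentinel. */
    static final class SketchTask implements Runnable {
        // Sentinel "special message" enqueued when a close event arrives.
        private static final Object CLOSE = new Object();

        private final String id;
        private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
        private final List<Object> received = new ArrayList<>();

        SketchTask(String id) { this.id = id; }

        /** Delivery of an ordinary group-communication message (hypothetical). */
        void deliver(Object message) { queue.add(message); }

        /** Close-event handler: enqueue the sentinel rather than killing the thread. */
        void onClose() { queue.add(CLOSE); }

        @Override
        public void run() {
            try {
                while (true) {
                    Object message = queue.take(); // blocks, like a GC receive
                    if (message == CLOSE) {
                        return; // the task returns once it reads the special message
                    }
                    received.add(message);
                }
            } catch (InterruptedException e) {
                // Fallback path: a forced close interrupts the blocked thread.
                Thread.currentThread().interrupt();
            }
        }

        List<Object> received() { return received; }
    }

    /** Hypothetical driver: it knows every task in the topology. */
    static final class SketchDriver {
        private final Map<String, SketchTask> running = new LinkedHashMap<>();

        void register(SketchTask task) { running.put(task.id, task); }

        /** Analogous to receiving an IFailedTask: send a close event to all remaining tasks. */
        void onFailedTask(String failedId) {
            running.remove(failedId);
            for (SketchTask task : running.values()) {
                task.onClose();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SketchDriver driver = new SketchDriver();
        SketchTask a = new SketchTask("task-a");
        SketchTask b = new SketchTask("task-b");
        SketchTask c = new SketchTask("task-c");
        driver.register(a);
        driver.register(b);
        driver.register(c);

        Thread ta = new Thread(a);
        Thread tb = new Thread(b);
        ta.start();
        tb.start();

        a.deliver("partial-result");
        driver.onFailedTask("task-c"); // task-c fails; task-a and task-b are told to close

        ta.join(5000);
        tb.join(5000);
        System.out.println(ta.isAlive() + " " + tb.isAlive() + " " + a.received());
        // prints "false false [partial-result]"
    }
}
```

Enqueuing a sentinel lets a task finish the messages that arrived before the close. The cancellation-token alternative from item 3 corresponds, in Java, to interrupting the blocked `take()`; the `catch` branch shows where such a forced close would land.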
> StreamingNetworkService should create a new observer for each client
> --------------------------------------------------------------------
>
> Key: REEF-1453
> URL: https://issues.apache.org/jira/browse/REEF-1453
> Project: REEF
> Issue Type: Improvement
> Components: REEF.NET
> Reporter: Andrew Chung
> Assignee: Andrew Chung
> Labels: FT, breaking_change
>
> {{StreamingNetworkService}} currently only has one universal observer that handles connections for all incoming clients. This is inconvenient because when a client fails or disconnects, there is no easy way to propagate the failure/completion signal of the *specific* failed client up to the universal observer.
>
> A proposed solution is to instead allow the injection of a {{NetworkObserverFactory}}, which creates a new {{IObserver}} for each new client that is connected to the {{StreamingTransportServer}}. The {{NetworkObserverFactory}} itself will be wrapped in a universal observer registered of type {{IObserver>}} such that it receives a notification upon each new client. For each message, it will cast the {{IRemoteMessage.Identifier}} as a {{SocketRemoteIdentifier}}. If the {{IPEndpoint}} of the message has not been seen before, it will know that the message is from a new client and will create an {{IObserver}} for the new client and register the new client with the {{IPEndpoint}} with the {{ObserverContainer}}.
>
> The change is a simple migration from the original universal observer model in that state can still be shared between the {{IObserver}}s spun off by the {{NetworkObserverFactory}} by passing the shared state object into the constructor of the created {{IObserver}}s.
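The per-client dispatch proposed in the issue can be sketched in Java as follows. All names are hypothetical (`Observer`, `ObserverFactory`, `DispatchingObserver`, `LoggingObserver`), a `java.net.InetSocketAddress` stands in for the `IPEndpoint` taken from a `SocketRemoteIdentifier`, and a `HashMap` stands in for the `ObserverContainer`. The universal observer creates a new per-client observer the first time it sees an endpoint, and shared state reaches each created observer through its constructor.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerClientObserverSketch {

    /** Minimal observer contract, loosely modeled on .NET's IObserver<T> (hypothetical). */
    interface Observer<T> {
        void onNext(T value);
        void onError(Exception error);
    }

    /** Hypothetical factory: creates one observer for each newly connected client. */
    interface ObserverFactory<T> {
        Observer<T> create(InetSocketAddress client);
    }

    /** Universal observer that keys per-client observers by the sender's endpoint. */
    static final class DispatchingObserver<T> {
        private final ObserverFactory<T> factory;
        private final Map<InetSocketAddress, Observer<T>> observers = new HashMap<>();

        DispatchingObserver(ObserverFactory<T> factory) { this.factory = factory; }

        /** Called for every incoming message; creates an observer the first time an endpoint is seen. */
        synchronized void onMessage(InetSocketAddress sender, T message) {
            observers.computeIfAbsent(sender, factory::create).onNext(message);
        }

        /** A failure on one connection is routed only to that client's observer, which is then dropped. */
        synchronized void onClientError(InetSocketAddress sender, Exception error) {
            Observer<T> observer = observers.remove(sender);
            if (observer != null) {
                observer.onError(error);
            }
        }

        synchronized int clientCount() { return observers.size(); }
    }

    /** Example per-client observer; shared state is passed in through the constructor. */
    static final class LoggingObserver implements Observer<String> {
        private final InetSocketAddress client;
        private final List<String> sharedLog; // state shared by all spun-off observers

        LoggingObserver(InetSocketAddress client, List<String> sharedLog) {
            this.client = client;
            this.sharedLog = sharedLog;
        }

        @Override
        public void onNext(String value) { sharedLog.add(client.getPort() + ":" + value); }

        @Override
        public void onError(Exception error) {
            sharedLog.add(client.getPort() + ":error:" + error.getMessage());
        }
    }

    public static void main(String[] args) {
        List<String> sharedLog = new ArrayList<>();
        DispatchingObserver<String> universal =
            new DispatchingObserver<>(client -> new LoggingObserver(client, sharedLog));

        InetSocketAddress c1 = InetSocketAddress.createUnresolved("10.0.0.1", 5001);
        InetSocketAddress c2 = InetSocketAddress.createUnresolved("10.0.0.2", 5002);

        universal.onMessage(c1, "hello");
        universal.onMessage(c2, "hi");
        universal.onMessage(c1, "again");
        universal.onClientError(c2, new Exception("disconnected"));

        System.out.println(universal.clientCount() + " " + sharedLog);
        // prints "1 [5001:hello, 5002:hi, 5001:again, 5002:error:disconnected]"
    }
}
```

Because observers are keyed by endpoint, a disconnect can be signaled to just the affected client's observer, which is the gap the issue describes. Removing that observer on error is a choice made for this sketch, not part of the proposal.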