From: Semyon Boikov
Date: Wed, 10 Jan 2018 11:02:33 +0300
Subject: Re: DiscoverySpi based on Apache ZooKeeper
To: dev@ignite.apache.org

Andrey, Vladimir,

Zookeeper does provide the required ordering guarantees, but the Zookeeper
watcher API really does not provide the functionality required for
DiscoverySpi out of the box. To detect node join/failure we need to watch
for creation/deletion of znodes, but if some client disconnects from
Zookeeper and reconnects after some time, it can miss some change events:
after reconnect it will see only the current state. For example: there are
2 znodes (related to Ignite cluster nodes): A and B. One of the Zookeeper
clients disconnects from Zookeeper and tries to reconnect; at this moment a
new cluster node joins and creates znode C, then it immediately fails and
znode C is removed. When the disconnected client restores its connection it
still sees znodes A and B and is not aware of node C.
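For illustration, this is roughly how a one-shot children watch looks with
the plain ZooKeeper client API (just a sketch of where events get lost;
"/ignite/alive" is a made-up path, this is not the actual SPI code):

import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

class AliveNodesWatcher implements Watcher {
    private final ZooKeeper zk;

    AliveNodesWatcher(ZooKeeper zk) {
        this.zk = zk;
    }

    void watchAliveNodes() throws KeeperException, InterruptedException {
        // Re-arms the one-shot watch and reads the children. Only the
        // current children are visible: a znode C created and removed
        // since the previous call is never observed, no matter whether
        // that happened between two watch triggers or while the client
        // was disconnected.
        List<String> alive = zk.getChildren("/ignite/alive", this);

        System.out.println("Alive znodes: " + alive);
    }

    @Override public void process(WatchedEvent evt) {
        if (evt.getType() == Event.EventType.NodeChildrenChanged) {
            try {
                watchAliveNodes();
            }
            catch (KeeperException | InterruptedException e) {
                // A real implementation would retry on reconnect.
            }
        }
    }
}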
This means that different clients can see different Zookeeper events. To
overcome this issue ZookeeperDiscoverySpi has a single coordinator node
which listens for Zookeeper notifications and transforms them into
DiscoverySpi events (the scenario when znode C is created and removed while
the coordinator is disconnected is still possible, but it is not an issue:
it simply means cluster node C failed before it finished joining). With
such a single-coordinator approach I don't see a scenario where different
nodes can see different events or some event can be missed.

Semyon

On Wed, Jan 10, 2018 at 10:38 AM, Vladimir Ozerov wrote:

> Hi Andrey,
>
> Could you please share the details of this API mismatch? AFAIK, the main
> guarantee we need for disco SPI is total message ordering. Zookeeper
> provides this guarantee. Moreover, Zookeeper is proven to be a correct and
> reliable coordination service by many users and Jepsen tests, as opposed
> to various in-house implementations (e.g. Zen of Elasticsearch).
>
> On Tue, 9 Jan 2018 at 21:53, Andrey Kornev wrote:
>
>> Semyon,
>>
>> Not to discourage you or anything, just an interesting fact from recent
>> history.
>>
>> I vaguely remember already trying to implement DiscoverySpi on top of
>> Zookeeper back in 2012. After a few failed attempts and a lot of help
>> from Zookeeper's original developers (Flavio Junqueira and Ben Reed) we
>> (Dmitriy S. and I) concluded that it's not possible to implement
>> DiscoverySpi on top of Zookeeper due to the strict(er) semantics of
>> DiscoverySpi. Unfortunately I do not remember the details, but
>> essentially, in some cases it was not possible to establish a total
>> ordering of watcher events, and under certain circumstances loss of such
>> events was possible.
>>
>> It's not to say that Zookeeper can't be used to implement cluster
>> membership tracking in general. The problem is rather that DiscoverySpi's
>> semantics require a different set of APIs than what Zookeeper provides.
>>
>> Regards
>> Andrey
>>
>> ________________________________
>> From: Semyon Boikov
>> Sent: Tuesday, January 9, 2018 3:39 AM
>> To: dev@ignite.apache.org
>> Subject: DiscoverySpi based on Apache ZooKeeper
>>
>> Hi all,
>>
>> Currently I'm working on an implementation of DiscoverySpi based on
>> Apache ZooKeeper (ZookeeperDiscoverySpi) and want to share the results of
>> this work.
>>
>> In very large clusters (>1000 nodes) the current default implementation
>> of DiscoverySpi - TcpDiscoverySpi - has some significant drawbacks:
>> - TcpDiscoverySpi organizes nodes in a ring, and all messages are passed
>> sequentially via the ring. The more nodes there are in the ring, the more
>> time it takes to pass a message. In very large clusters such an
>> architecture can cause a slowdown of important operations (node join,
>> node stop, cache start, etc).
>> - in TcpDiscoverySpi's protocol each node in the ring is able to fail the
>> next one in case of network issues, and it is possible that two nodes can
>> 'kill' each other (this can happen in complex scenarios when the network
>> is broken and then restored back after some time; such problems were
>> observed in real environments), and with the current TcpDiscoverySpi
>> protocol there is no easy way to completely fix such problems.
>> - when some node in the ring fails, the previous node tries to restore
>> the ring and sequentially tries to connect to the next nodes. If a large
>> part of the ring fails, it takes a long time to sequentially detect the
>> failure of all nodes.
>> - with TcpDiscoverySpi split brain is possible (one ring can split into
>> two independent parts), so a separate mechanism is needed to protect from
>> split brain when TcpDiscoverySpi is used
>>
>> Even though most probably some of these problems can somehow be fixed in
>> TcpDiscoverySpi, it seems a more robust and fast DiscoverySpi can be
>> implemented on top of some existing coordination service.
>>
>> Apache ZooKeeper is a known reliable service and it provides all the
>> mechanisms and consistency guarantees required for Ignite's DiscoverySpi.
>> Some technical details of the ZookeeperDiscoverySpi implementation can be
>> found in the description prepared by Sergey Puchnin:
>> https://cwiki.apache.org/confluence/display/IGNITE/Discovery+SPI+by+ZooKeeper
>>
>> In our preliminary tests we were able to successfully start 4000+ nodes
>> with ZookeeperDiscoverySpi. The new implementation works faster than
>> TcpDiscoverySpi and does not have the mentioned TcpDiscoverySpi
>> drawbacks:
>> - nodes' alive status is controlled by ZooKeeper, so nodes can never kill
>> each other
>> - ZooKeeper has protection from split brain
>> - with ZooKeeper it is possible to detect node joins/failures in batches,
>> so the time to detect the join/failure of 1 vs 100+ nodes is almost the
>> same
>>
>> I'm going to finalize the implementation of ZookeeperDiscoverySpi in the
>> next few days. But in Ignite there is one more issue caused by the fact
>> that DiscoverySpi and CommunicationSpi are two independent components: it
>> is possible that DiscoverySpi considers some node alive, but at the same
>> time CommunicationSpi is not able to send a message to this node (this
>> issue exists not only for ZookeeperDiscoverySpi but for TcpDiscoverySpi
>> too). Such a case is very critical since all of Ignite's internal code
>> assumes that if a node is alive then CommunicationSpi is able to
>> send/receive messages to/from this node. If that is not the case, then
>> any Ignite operation can hang (with ZooKeeper such a situation is
>> possible when, due to network failures, nodes have a connection with
>> ZooKeeper but cannot establish TCP connections to each other).
>>
>> If such a case arises, then Ignite should have some mechanism to forcibly
>> kill faulty nodes to keep the cluster operational. But note that in
>> 'split brain' scenarios each independent part of the cluster will
>> consider the others 'failed', so there should be some way to choose which
>> part should be killed. It would be good to provide a generic solution for
>> this problem as part of the work on the new DiscoverySpi.
>>
>> We discussed this with Yakov Zhdanov and suggest the following: when
>> communication fails to send a message to some node and this node is
>> considered alive, Ignite should trigger a global 'communication error
>> resolve' process (obviously, this process should use internal discovery
>> mechanisms for messaging). As part of this process CommunicationSpi on
>> each node should try to establish a connection to all other alive nodes
>> (TcpCommunicationSpi can do this efficiently using async NIO) and send
>> the results of this connection test to some coordinator node (e.g. the
>> oldest cluster node), roughly as sketched below.
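>> A minimal sketch of this per-node step (all class and method names here
>> are placeholders for illustration, not actual Ignite API):
>>
>> import java.util.BitSet;
>> import java.util.List;
>>
>> import org.apache.ignite.cluster.ClusterNode;
>>
>> class ConnectionTestTask {
>>     /** Probes every alive node; bit i is set if node i is reachable. */
>>     BitSet testConnections(List<ClusterNode> aliveNodes, ConnectionProbe probe) {
>>         BitSet reachable = new BitSet(aliveNodes.size());
>>
>>         // TcpCommunicationSpi could run these probes concurrently with
>>         // async NIO; a sequential loop keeps the sketch short.
>>         for (int i = 0; i < aliveNodes.size(); i++) {
>>             if (probe.tryConnect(aliveNodes.get(i)))
>>                 reachable.set(i);
>>         }
>>
>>         // This bitmap is what each node would send to the coordinator
>>         // for the resolve process.
>>         return reachable;
>>     }
>> }
>>
>> interface ConnectionProbe {
>>     boolean tryConnect(ClusterNode node);
>> }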
>> When the coordinator receives the connection test results from all
>> nodes, it calls a user-defined CommunicationProblemResolver to choose
>> which nodes should be killed (CommunicationProblemResolver should be set
>> in IgniteConfiguration):
>>
>> public interface CommunicationProblemResolver {
>>     public void resolve(CommunicationProblemContext ctx);
>> }
>>
>> CommunicationProblemResolver receives a CommunicationProblemContext
>> which provides the results of the CommunicationSpi connection test. It
>> can also be useful to have information about started caches and the
>> current cache data distribution to decide which part of the cluster
>> should be killed:
>>
>> public interface CommunicationProblemContext {
>>     /**
>>      * @return Current topology snapshot.
>>      */
>>     public List<ClusterNode> topologySnapshot();
>>
>>     /**
>>      * @param node1 First node.
>>      * @param node2 Second node.
>>      * @return {@code True} if {@link CommunicationSpi} is able to
>>      *     establish connection from first node to second node.
>>      */
>>     public boolean connectionAvailable(ClusterNode node1, ClusterNode node2);
>>
>>     /**
>>      * @return List of currently started caches.
>>      */
>>     public List<String> startedCaches();
>>
>>     /**
>>      * @param cacheName Cache name.
>>      * @return Cache partitions affinity assignment.
>>      */
>>     public List<List<ClusterNode>> cacheAffinity(String cacheName);
>>
>>     /**
>>      * @param cacheName Cache name.
>>      * @return Cache partitions owners.
>>      */
>>     public List<List<ClusterNode>> cachePartitionOwners(String cacheName);
>>
>>     /**
>>      * @param node Node to kill after communication error resolve.
>>      */
>>     public void killNode(ClusterNode node);
>> }
>>
>> The default implementation of CommunicationProblemResolver provided as
>> part of Ignite can keep alive the largest sub-cluster where all nodes
>> are able to connect to each other.
>>
>> In addition to CommunicationProblemResolver we can fire a new local
>> org.apache.ignite.events.Event when CommunicationSpi fails to send a
>> message to an alive node (can be useful for monitoring):
>>
>> class CommunicationProblemEvent extends EventAdapter {
>>     ClusterNode eventNode();
>>
>>     Exception connectionError();
>> }
>>
>> Since this is a pretty large change in the public API, I would be
>> grateful if you could share your thoughts about
>> CommunicationProblemResolver.
>>
>> Thank you,
>> Semyon
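P.S. To make the proposed API a bit more concrete: a trivial resolver that
keeps a large group of mutually connected nodes could look roughly like
this (an untested sketch against the interfaces quoted above; the greedy
scan is only a heuristic and does not necessarily find the largest possible
group):

import java.util.ArrayList;
import java.util.List;

import org.apache.ignite.cluster.ClusterNode;

class KeepLargestClusterResolver implements CommunicationProblemResolver {
    @Override public void resolve(CommunicationProblemContext ctx) {
        List<ClusterNode> top = ctx.topologySnapshot();

        // Greedily grow a group in which every pair of nodes can
        // establish a connection in both directions.
        List<ClusterNode> keep = new ArrayList<>();

        for (ClusterNode candidate : top) {
            boolean connectedToAll = true;

            for (ClusterNode member : keep) {
                if (!ctx.connectionAvailable(member, candidate)
                    || !ctx.connectionAvailable(candidate, member)) {
                    connectedToAll = false;

                    break;
                }
            }

            if (connectedToAll)
                keep.add(candidate);
        }

        // Kill every node outside the group we decided to keep.
        for (ClusterNode node : top) {
            if (!keep.contains(node))
                ctx.killNode(node);
        }
    }
}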