From: Zoran Regvart
Date: Sun, 20 Aug 2017 00:52:25 +0200
Subject: Re: Race Condition in Aggregation using HazelcastAggregationRepository in a cluster
To: users@camel.apache.org

Hi Michael,

it's a bit hard to follow, so I could be misunderstanding your issue: is it
that there is a race condition between the aggregator that expects the reply
on node A and another aggregator on node B that is not aware of the initial
request?

If you're doing only request-reply correlation, perhaps take a look at the
InOut message exchange pattern with a correlation property[1], with the
replying application setting the ReplyToQMgr to the requester's queue
manager. Or, place the reply in a Hazelcast queue regardless of the queue
manager the reply landed on, and process the reply from there.

Also, I think it would be better to set up the reply coordination expectation
(with timeouts and without transactions -- those would block) before sending
the message.

2c

[1] https://camel.apache.org/correlation-identifier.html
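
For illustration, a minimal sketch of the InOut request-reply idea, assuming
plain camel-jms request-reply correlation (matching on JMSCorrelationID) is
enough for this case. The "wmq" component name, the queue names and the
timeout are placeholders, not taken from the setup described below:

    import org.apache.camel.ExchangePattern;
    import org.apache.camel.builder.RouteBuilder;

    public class RequestReplySketch extends RouteBuilder {
        @Override
        public void configure() {
            from("direct:sendRequest")
                // With InOut the JMS producer itself waits for the correlated
                // reply, so no separate reply-consuming route and no shared
                // aggregation state are needed. Component name, queue names
                // and timeout here are illustrative placeholders.
                .to(ExchangePattern.InOut,
                    "wmq:queue:REQUEST.QUEUE?replyTo=RESPONSE.QUEUE&requestTimeout=30000")
                .log("got reply: ${body}");
        }
    }

With a reply-to queue reachable from the sending node this keeps request and
reply together, at the cost of the sending route waiting for each outstanding
reply, which may or may not be acceptable here.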
On Wed, Aug 16, 2017 at 5:10 PM, Michael Lück wrote:
> Hi there,
>
> we just hit a problem in one of our systems, and it looks like there is an
> issue with locking in the AggregateProcessor in a distributed environment.
>
> I'll try to explain it:
>
> ==================================================
> Scenario
> ==================================================
>
> We use camel-core and camel-hazelcast 2.16.5 and hazelcast 3.5.2.
>
> We have a route which sends a message to a WebSphere MQ queue (via the JMS
> component), and after that we put the message into an aggregator which uses
> the JMSCorrelationID to correlate the request and the response:
>
> from(epAggregation)
>     .aggregate(header("JMSCorrelationID"), new CustomAggregationStrategy())
>         .completionTimeout(Integer.parseInt(
>             getContext().resolvePropertyPlaceholders("{{timeout}}")))
>         .completionSize(2)
>         .aggregationRepository(aggrRepo)
>
> The aggregation repository aggrRepo is created like this:
>
> HazelcastAggregationRepository aggrRepo =
>     new HazelcastAggregationRepository("aggrRepoDrsBatch", hcInst);
>
> where hcInst is an instance of com.hazelcast.core.HazelcastInstance.
>
> We also have another route which reads the response from the response queue
> and forwards it to the aggregator.
>
> The environment consists of two nodes on which the same code is running (so
> essentially the send and response routes and the aggregation).
>
> The problem arises when the response is returned very fast and is consumed
> on the node that didn't send the request.
>
> ==================================================
> Analysis
> ==================================================
>
> I dug around a bit in the Camel code, and it seems to me that the problem is
> the lock in the AggregateProcessor, as it is local to the JVM in which the
> code runs. I'll walk through an example to make this clearer:
>
> - Node A sends an MQ message and after that puts the message into the
>   aggregator. The AggregateProcessor runs and checks the lock before going
>   into doAggregation().
> - In doAggregation() it tries to get the Exchange from the repository and
>   doesn't find any, so it continues to aggregate the first message and
>   writes it into the repository.
> - At about the same time, between reading the exchange from the repository
>   and writing the "aggregated" first message back, Node B fetches the reply
>   from the response queue and sends it to the aggregator. As on node A the
>   lock is checked, and because the code runs in another JVM the lock is
>   free, so the AggregateProcessor can go into doAggregation().
> - In doAggregation() node B tries to get the Exchange from the repository
>   before the other node has written it, and like node A it proceeds with
>   creating the first Exchange for the aggregation and writes it into the
>   repository.
>
> The result is that one of the nodes overwrites the Exchange the other one
> created before, and the aggregation never completes properly (actually it
> does complete, but only because of the timeout).
>
> ==================================================
> Ideas to solve the problem
> ==================================================
>
> - Probably optimistic locking is an option here, as
>   HazelcastAggregationRepository supports this by implementing
>   OptimisticLockingAggregationRepository.
>   => I'd like to hear your thoughts on this (a sketch of this option is
>   appended after this message).
> - Currently we can stop the route consuming from the response queue on one
>   node to eliminate the error, but this is not a long-term option because
>   we lose the ability to fail over.
> - Perhaps the AggregateProcessor could get the lock object from the
>   repository. For example, the HazelcastAggregationRepository could return
>   the lock object for the Hazelcast map, which would lock it for the whole
>   cluster.
> - I thought about resending the MQ message in case of a timeout, but as the
>   request has side effects on the system that processes the message, this
>   is not really an option.
>
> I hope I have made myself clear. If you have any questions that would help
> you to help me, I'd be happy to answer them.
>
> Regards,
> Michael

-- 
Zoran Regvart
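
A minimal sketch of the optimistic-locking idea from the first bullet under
"Ideas to solve the problem" above, assuming camel-hazelcast 2.16.x where
HazelcastAggregationRepository implements OptimisticLockingAggregationRepository.
The endpoint URIs, the timeout value and UseLatestAggregationStrategy
(standing in for the CustomAggregationStrategy from the original mail) are
placeholders; whether this alone removes the race in 2.16.5 is not verified
here:

    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
    import org.apache.camel.processor.aggregate.hazelcast.HazelcastAggregationRepository;

    import com.hazelcast.core.Hazelcast;
    import com.hazelcast.core.HazelcastInstance;

    public class OptimisticAggregationSketch extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            // Cluster-wide repository; in the real setup hcInst would be the
            // application's existing HazelcastInstance rather than a new one.
            HazelcastInstance hcInst = Hazelcast.newHazelcastInstance();
            HazelcastAggregationRepository aggrRepo =
                new HazelcastAggregationRepository("aggrRepoDrsBatch", hcInst);

            from("direct:aggregate")
                .aggregate(header("JMSCorrelationID"), new UseLatestAggregationStrategy())
                    .aggregationRepository(aggrRepo)
                    // optimisticLocking() makes the AggregateProcessor use the
                    // OptimisticLockingAggregationRepository contract, so a
                    // concurrent add() from the other node surfaces as a
                    // conflict to be retried instead of silently overwriting
                    // the stored Exchange.
                    .optimisticLocking()
                    .completionSize(2)
                    .completionTimeout(30000)
                .to("direct:aggregated");
        }
    }

The DSL change is just the .optimisticLocking() flag; the open question for
2.16.5 is whether the repository's add() actually detects the concurrent
write described in the analysis above, which would need to be verified.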