Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9EB0A200BB1 for ; Thu, 3 Nov 2016 11:29:59 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 9CB39160B0B; Thu, 3 Nov 2016 10:29:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BE01B160AFE for ; Thu, 3 Nov 2016 11:29:58 +0100 (CET) Received: (qmail 96775 invoked by uid 500); 3 Nov 2016 10:29:57 -0000 Mailing-List: contact user-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: user@ignite.apache.org Delivered-To: mailing list user@ignite.apache.org Received: (qmail 96765 invoked by uid 99); 3 Nov 2016 10:29:57 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Nov 2016 10:29:57 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 1DE66CD2E8 for ; Thu, 3 Nov 2016 10:29:57 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.279 X-Spam-Level: * X-Spam-Status: No, score=1.279 tagged_above=-999 required=6.31 tests=[DKIM_SIGNED=0.1, DKIM_VALID=-0.1, HTML_MESSAGE=2, RCVD_IN_DNSWL_LOW=-0.7, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, SPF_PASS=-0.001] autolearn=disabled Authentication-Results: spamd1-us-west.apache.org (amavisd-new); dkim=pass (2048-bit key) header.d=gridgain-com.20150623.gappssmtp.com Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id iNU2Wzrauwoj for ; Thu, 3 Nov 2016 10:29:56 +0000 (UTC) Received: from mail-lf0-f54.google.com (mail-lf0-f54.google.com [209.85.215.54]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTPS id D06DB5F24F for ; Thu, 3 Nov 2016 10:29:55 +0000 (UTC) Received: by mail-lf0-f54.google.com with SMTP id t196so34024026lff.3 for ; Thu, 03 Nov 2016 03:29:55 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gridgain-com.20150623.gappssmtp.com; s=20150623; h=mime-version:in-reply-to:references:from:date:message-id:subject:to; bh=00f2SltO3tHWrrzUsGpsR0L/QrLDzvKmj4a9+QCjwOk=; b=gyXiVqnJc8Ow1M9f9DtRhP9/7wt4gRmFto1c8oB9mlzZ1cJ8XM8qs+3pnkQPIZgyT2 G25lGbR21YW7diS7zYYZ7OkCeHiYd2THw+TkVH3bXTRioGRpEGxtgGqAL2oFdWxHFG/a H+cB7B1SrSzFVNWcS8oJGLVt2W8X+7Xblz9gMTZMF/JnXETjUJqf+hfJCEW/5i+m55E1 3C4cRS/uby8jN/HHgCP66LFjwc59JiK1YwUPZy7VDjdV0sAj99uGm9YXns98ppORC9ja vOYVaKWtJu0L2cwEEtA2MzHSBCyIfnn83dO8eUOba0hJqV3ABrVeC7ZcrkouyIPyGL43 bXPA== X-Google-DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=1e100.net; s=20130820; h=x-gm-message-state:mime-version:in-reply-to:references:from:date :message-id:subject:to; bh=00f2SltO3tHWrrzUsGpsR0L/QrLDzvKmj4a9+QCjwOk=; b=C3tRqolxeHAJJzSTNwLQxEbOpEL1Js/evVDDq+bzuT9zP1nzc0vB9olCxElAcY7kaf 7SLBeDRghbdBqysN+9eJYzOsixpnJ1PsU11JtSKg+P/u1VlXrbks/nIZc6Dq4Y/BWxoB mFwyaeC6wx4MOtdHu/xtYMBJSCeF4ckiHXlA2f6glYzy408a/y6cYpy3tM3uOYzY0sT/ mYWUGs+OrI5z0fKM728P5wkaonHGEydq1/W6agOPwvl7Bg5UMW7hdEhvxlSh65m4s3gp r+G4Z60Tmn3pD+VU2SzLhY54/808ukNGDpzZq0A4yNoItcny9brPkfboAX9eQxABE/vm iJbw== X-Gm-Message-State: ABUngveRSCnKpwt7kbvRhzBNHNP/80AuMOiKO9eQ1LsdXnJ0oWcDqlVe74SIdZhWZUHIxumNydXMZHkus8lHYdOP X-Received: by 10.25.135.130 with SMTP id j124mr5158025lfd.88.1478168992046; Thu, 03 Nov 2016 03:29:52 -0700 (PDT) MIME-Version: 1.0 Received: by 10.114.177.68 with HTTP; Thu, 3 Nov 2016 03:29:51 -0700 (PDT) In-Reply-To: References: From: Anton Vinogradov Date: Thu, 3 Nov 2016 13:29:51 +0300 Message-ID: Subject: Re: DataStreamer is closed To: user Content-Type: multipart/alternative; boundary=001a113fbc44b08612054063097e archived-at: Thu, 03 Nov 2016 10:29:59 -0000 --001a113fbc44b08612054063097e Content-Type: text/plain; charset=UTF-8 Anil, Could you provide getStreamer() code and full logs? Possible, ignite node was disconnected and this cause DataStreamer closure. On Thu, Nov 3, 2016 at 1:17 PM, Anil wrote: > HI, > > I have created custom kafka data streamer for my use case and i see > following exception. > > java.lang.IllegalStateException: Data streamer has been closed. > at org.apache.ignite.internal.processors.datastreamer. > DataStreamerImpl.enterBusy(DataStreamerImpl.java:360) > at org.apache.ignite.internal.processors.datastreamer. > DataStreamerImpl.addData(DataStreamerImpl.java:507) > at org.apache.ignite.internal.processors.datastreamer. > DataStreamerImpl.addData(DataStreamerImpl.java:498) > at net.juniper.cs.cache.KafkaCacheDataStreamer.addMessage( > KafkaCacheDataStreamer.java:128) > at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run( > KafkaCacheDataStreamer.java:176) > at java.util.concurrent.Executors$RunnableAdapter. > call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at java.util.concurrent.ThreadPoolExecutor.runWorker( > ThreadPoolExecutor.java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run( > ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > > > addMessage method is > > @Override > protected void addMessage(T msg) { > if (getMultipleTupleExtractor() == null){ > Map.Entry e = getSingleTupleExtractor().extract(msg); > > if (e != null) > getStreamer().addData(e); > > } else { > Map m = getMultipleTupleExtractor().extract(msg); > if (m != null && !m.isEmpty()){ > getStreamer().addData(m); > } > } > } > > > Do you see any issue ? Please let me know if you need any additional > information. thanks. > > Thanks. > --001a113fbc44b08612054063097e Content-Type: text/html; charset=UTF-8 Content-Transfer-Encoding: quoted-printable
Anil,=C2=A0

Could you provide=C2=A0getStreamer() code and full logs?
Possible, ignite node was disconnected and this cause DataStrea= mer closure.

On Thu, Nov 3, 2016 at 1:17 PM, Anil <anilklce@gmail.com> wrote:
HI,<= div>
I have created custom kafka data streamer for my use cas= e and i see following exception.

java.lang.IllegalStateException: Data streamer has been closed.
=C2=A0 = =C2=A0 =C2=A0 =C2=A0 at org.apache.ignite.internal.processors.datastre= amer.DataStreamerImpl.enterBusy(DataStreamerImpl.java:360)
<= div>=C2=A0 =C2=A0 =C2=A0 =C2=A0 at org.apache.ignite.internal.processo= rs.datastreamer.DataStreamerImpl.addData(DataStreamerImpl.java:50= 7)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.addData(DataStreamerImp= l.java:498)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at net.juniper.cs.cache.<= wbr>KafkaCacheDataStreamer.addMessage(KafkaCacheDataStreamer.java= :128)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at net.juniper.cs.cache.KafkaCacheDataStreamer$1.run(KafkaCacheDataStreamer.java:176)<= /div>
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.util.concurrent.Executor= s$RunnableAdapter.call(Executors.java:511)
=C2=A0 =C2=A0 =C2= =A0 =C2=A0 at java.util.concurrent.FutureTask.run(FutureTask.java= :266)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 at java.util.concurrent.Th= readPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
=C2= =A0 =C2=A0 =C2=A0 =C2=A0 at java.util.concurrent.ThreadPoolExecutor$Wo= rker.run(ThreadPoolExecutor.java:617)
=C2=A0 =C2=A0 =C2=A0 = =C2=A0 at java.lang.Thread.run(Thread.java:745)



addMessage method is =C2=A0

=C2=A0@Override
=C2=A0 =C2=A0 protected voi= d addMessage(T msg) {
=C2=A0 =C2=A0 if (getM= ultipleTupleExtractor() =3D=3D null){
=C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 Map.Entry<K, V> e =3D getSingleTupleExtractor().e= xtract(msg);

=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 if (e !=3D null)
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0= =C2=A0 =C2=A0 getStreamer().addData(e);

=C2=A0 = =C2=A0 =C2=A0 =C2=A0 } else {
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 Map<K, V> m =3D getMultipleTupleExtractor().extract(msg);=
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 if (m !=3D null &&= amp; !m.isEmpty()){
=C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2=A0 =C2= =A0 =C2=A0 getStreamer().addData(m);
=C2=A0 =C2=A0 =C2=A0 =C2=A0 = =C2=A0 =C2=A0 }
=C2=A0 =C2=A0 =C2=A0 =C2=A0 }
=C2=A0 = =C2=A0 }


Do you see any issue= ? Please let me know if you need any additional information. thanks.
=

Thanks.

--001a113fbc44b08612054063097e--