From: Ashwin Chandra Putta
Date: Tue, 21 Feb 2017 11:50:17 -0800
Subject: Re: Occasional Out of order tuples when emitting from a thread
To: users@apex.apache.org
Cc: "dev@apex.apache.org", Munagala Ramanath, Allan De Leon, Tim Zhu

Sunil,

You can poll the queue in end window, since the process method of the
input port is not called when there is no incoming tuple. End window,
however, is called whether or not there are incoming tuples.

Regards,
Ashwin.

On Tue, Feb 21, 2017 at 11:32 AM, Sunil Parmar wrote:

> Ram,
> Thanks for the prompt response. If we use the approach you suggested,
> we're dependent on the main thread's process call, i.e. tuples in the
> thread-safe queue only get processed when the main thread is processing
> incoming tuples. How can we explicitly trigger processing from the
> polling of the delay queue?
>
> Just for reference, here's a sample code snippet for our operator.
>
> public class MyOperator extends BaseOperator implements
>     Operator.ActivationListener {
>   …..
>
>   @InputPortFieldAnnotation
>   public transient DefaultInputPort<String> kafkaStreamInput =
>       new DefaultInputPort<String>() {
>     List errors = new ArrayList();
>
>     @Override
>     public void process(String consumerRecord) {
>       // Code for normal tuple processing
>       // Code to poll the thread-safe queue
>     }
>
> ----------------------------------------------------------------------
> From: Munagala Ramanath
> To: users@apex.apache.org
> CC: "dev@apex.apache.org", Allan De Leon <adeleon@threatmetrix.com>,
>     Tim Zhu
> Subject: Re: Occasional Out of order tuples when emitting from a thread
> Date: 2017-02-21 10:08 (-0800)
> List: users@apex.apache.org
>
> Please note that tuples should not be emitted by any thread other than
> the main operator thread.
>
> A common pattern is to use a thread-safe queue and have worker threads
> enqueue tuples there; the main operator thread then pulls tuples from
> the queue and emits them.
>
> Ram
> _______________________________________________________
> Munagala V. Ramanath
> Software Engineer
> E: ram@datatorrent.com | M: (408) 331-5034 | Twitter: @UnknownRam
> www.datatorrent.com | apex.apache.org
>
> From: Sunil Parmar
> Date: Tuesday, February 21, 2017 at 10:05 AM
> To: "users@apex.apache.org", "dev@apex.apache.org"
> Cc: Allan De Leon, Tim Zhu <tzhu@threatmetrix.com>
> Subject: Occasional Out of order tuples when emitting from a thread
>
> Hi there,
> We have the following setup:
>
> - We have a generic operator that's processing tuples in its input
>   port.
> - In the input port's process method, we check for a condition, and:
>   - If the condition is met, the tuple is emitted to the next operator
>     right away (in the process method).
>   - Otherwise, if the condition is not met, we store the tuple in some
>     cache, and we use some threads that periodically check whether the
>     condition has become true. Once the condition is true, the threads
>     call the emit method on the stored tuples.
>
> With this setup, we occasionally encounter the following error:
>
> 2017-02-15 17:29:09,364 ERROR com.datatorrent.stram.engine.GenericNode:
> Catastrophic Error: Out of sequence BEGIN_WINDOW tuple 58a4046100003b7f
> on port transformedJSON while expecting 58a4046100003b7e
>
> Is there a way to make the above work correctly?
> If not, can you recommend a better way of doing this?
> How can we ensure window assignment is done synchronously before
> emitting tuples?
>
> Thanks very much in advance…
> -allan

--
Regards,
Ashwin.
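
For illustration, the pattern discussed in this thread (worker threads only enqueue; the operator thread drains the queue in process and again in end window) might look roughly like the sketch below. It is plain Java with no Apex classes, so it can run standalone: the class QueueDrainSketch and the methods offerFromWorker, drain, emit, and emitted are hypothetical names, and process and endWindow here are stand-ins that only mirror the operator callbacks mentioned above, not the real Apex signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Plain-Java stand-in for an operator; all names here are hypothetical. */
class QueueDrainSketch {
    // Filled by worker threads, drained only by the operator thread.
    private final Queue<String> ready = new ConcurrentLinkedQueue<>();
    // Stand-in for an output port: records emitted tuples in order.
    private final List<String> emitted = new ArrayList<>();

    /** Called by worker threads once a deferred tuple's condition holds. */
    public void offerFromWorker(String tuple) {
        ready.add(tuple); // enqueue only; never emit from this thread
    }

    /** Mirrors the input port's process callback (operator thread only). */
    public void process(String tuple) {
        emit(tuple);
        drain();
    }

    /** Mirrors the end-window callback, which runs even with no input. */
    public void endWindow() {
        drain();
    }

    // Emit everything the workers have queued so far.
    private void drain() {
        String t;
        while ((t = ready.poll()) != null) {
            emit(t);
        }
    }

    private void emit(String tuple) {
        emitted.add(tuple);
    }

    public List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        QueueDrainSketch op = new QueueDrainSketch();
        Thread worker = new Thread(() -> op.offerFromWorker("deferred-1"));
        worker.start();
        worker.join();
        op.endWindow(); // no incoming tuple, yet the queue is drained
        System.out.println(op.emitted()); // prints [deferred-1]
    }
}
```

Because every emit happens on the thread that calls process and endWindow, a deferred tuple always goes out inside a window that thread has already opened, which is the property the out-of-sequence BEGIN_WINDOW error suggests was being violated.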