Date: Wed, 16 Sep 2015 13:44:26 -0700
Subject: Supporting iterations in Apex
From: David Yan
To: dev@apex.incubator.apache.org

Hi all,

One current disadvantage of Apex is that it cannot run iterative computations, including many machine learning algorithms, because we don't allow loops in the application DAG (hence the name DAG). I propose that we allow loops in the DAG, provided the loop advances the window ID by a configured amount.

A JIRA ticket has been created: https://malhar.atlassian.net/browse/APEX-60

I have started this work in my fork: https://github.com/davidyan74/incubator-apex-core/tree/APEX-60

So far, a simple test case works. Major work still needs to be done on recovery and partitioning.

ITERATION_WINDOW_COUNT is an attribute on an input port of an operator. If the value of the attribute is greater than or equal to 1, any tuples sent to that input port are treated as being ITERATION_WINDOW_COUNT windows ahead of the window they were actually emitted in.
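
To make the window arithmetic concrete, here is a minimal Java sketch of how the offset could be applied. The class and method names (IterationWindowSketch, effectiveWindowId) are hypothetical and not part of the Apex API. The sketch also assumes that window IDs are plain consecutive integers and that a value below 1 means the port is not on a loop, so no shift is applied.

```java
// Hypothetical sketch: none of these names exist in Apex core.
final class IterationWindowSketch {

    /**
     * Returns the window a tuple is treated as belonging to when it arrives
     * on an input port configured with the given ITERATION_WINDOW_COUNT.
     * Assumption: window IDs are plain consecutive integers, and a count
     * below 1 means the port is not part of a loop, so nothing is shifted.
     */
    static long effectiveWindowId(long emittedWindowId, int iterationWindowCount) {
        if (iterationWindowCount >= 1) {
            return emittedWindowId + iterationWindowCount;
        }
        return emittedWindowId;
    }

    public static void main(String[] args) {
        int iterationWindowCount = 2;
        // Print the shifted window for tuples emitted in windows 13 and 14.
        for (long emitted = 13; emitted <= 14; emitted++) {
            System.out.println("emitted in window " + emitted
                + " -> treated as window "
                + effectiveWindowId(emitted, iterationWindowCount));
        }
    }
}
```

With a count of 2, a tuple emitted in window 13 is treated as belonging to window 15, and one emitted in window 14 as belonging to window 16.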

For recovery, we will need to checkpoint all the tuples sent between ports on a loop (that is, to input ports with the ITERATION_WINDOW_COUNT attribute set) so that the looped tuples can be replayed. During recovery, if an operator that has an input port with ITERATION_WINDOW_COUNT=2 is recovering from checkpoint window 14, the tuples sent to that input port in window 13 and window 14 need to be replayed and treated as window 15 and window 16 respectively (13+2 and 14+2).

In other words, we need to store all the tuples starting from the window with ID committedWindowId minus ITERATION_WINDOW_COUNT for recovery, and we can purge the tuples from windows earlier than that. We can optimize this by storing only the tuples for the ITERATION_WINDOW_COUNT windows prior to any checkpoint.

For that, we need a storage mechanism for the tuples. Chandni already has something in Apex Malhar that fits this use case: the class IdempotentStorageManager. To use it in Apex Core, we need to deprecate the class in Apex Malhar and move it to Apex Core. A JIRA ticket has been created for this particular work: https://malhar.atlassian.net/browse/APEX-128

Some of the above has been discussed among Thomas, Chetan, Chandni, and myself.

For partitioning, we have not started any discussion or brainstorming. We appreciate any feedback on this and on any other aspect of supporting iterations in general.

Thanks!

David