Subject: Hadoop execution improvement in a heterogeneous environment
From: roman kolcun <roman.wsmo@gmail.com>
To: common-dev@hadoop.apache.org
Date: Wed, 15 Jul 2009 03:36:36 +0100

Hello everyone,

I've got an idea for improving the execution time of the map phase in a heterogeneous environment, i.e. when other processes may be running on some of the machines, making them slower than the rest.

Currently the map phase processes data in fairly large chunks (usually 64MB). Towards the end of the map phase, data assigned to slower nodes (stragglers) is speculatively reassigned to faster nodes, and the task finishes as soon as either of the two attempts completes; the other attempt is killed.

Let's assume there are two TaskTrackers, each processing 128MB, and one of them is significantly slower. By the time the first node has processed all of its 128MB (2 x 64MB), the second node has processed, say, only 80MB (1 x 64MB + 16MB). Currently the first node will be assigned the second node's second chunk (64MB) and will probably finish it before the second node does, so the second node's task will be killed. By the time the first node finishes the reassigned chunk, the second node will have processed approximately 100MB, which means about 36MB were processed redundantly.
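To put the same numbers side by side:

  node 1: 128MB done (its two 64MB chunks)
  node 2: 80MB done (first chunk + 16MB of the second) when node 1 becomes idle
  node 1: re-runs node 2's second chunk from the start (all 64MB)
  node 2: has reached ~100MB, i.e. 36MB into its second chunk, when its attempt is killed
  => those 36MB are processed twice, and node 2's copy is thrown away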
My idea is that each node would run a special DataAssign thread responsible for handing out data to the local map threads. This thread would report its progress to the JobTracker, split the node's input into smaller chunks (e.g. 32MB), and assign these smaller chunks to a map thread on request. In the scenario above, the JobTracker would realise that the second node is slower and notify its DataAssign thread not to assign the last 32MB chunk locally; that last smaller chunk would instead be assigned to the first node. As a result no data would be processed twice, and the overall execution time should be shorter.

In addition, I would like to change the JobTracker implementation so that it assigns the whole input file(s) to all nodes at the beginning, taking data locality into account. The assignment could then be rebalanced based on the progress information reported by the DataAssign threads.

Do you think this is a good idea? Do you think it could work? Has anyone been working (or is anyone working) on this or a similar feature?

Thank you for all replies and suggestions.

Yours Sincerely,
Roman
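P.S. To make the DataAssign idea a bit more concrete, here is a very rough sketch. None of the class or method names below exist in Hadoop today; it only illustrates "hand out small sub-chunks locally, and let the JobTracker steal the unprocessed tail of the queue for a faster node".

import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Sketch only -- all names are made up, nothing here is an existing Hadoop API.
 * The node-local DataAssign thread owns a queue of small (e.g. 32MB) sub-chunks
 * and hands them to local map threads one at a time, so the JobTracker can take
 * the unprocessed tail of the queue and give it to a faster node instead.
 */
public class DataAssignSketch {

    static final long SUB_CHUNK = 32L * 1024 * 1024;   // 32MB instead of 64MB

    /** Queue of sub-chunk offsets still to be mapped on this node. */
    static class DataAssignQueue {
        private final Deque<Long> pending = new ArrayDeque<Long>();

        DataAssignQueue(long start, long length) {
            for (long off = start; off < start + length; off += SUB_CHUNK) {
                pending.addLast(off);
            }
        }

        /** Called by a local map thread when it is ready for more work. */
        synchronized Long nextSubChunk() {
            return pending.pollFirst();
        }

        /** Called (conceptually) on behalf of the JobTracker: remove the tail
         *  of the queue so a faster node can process that sub-chunk instead. */
        synchronized Long stealTail() {
            return pending.pollLast();
        }

        /** Size of the backlog the DataAssign thread would report to the JobTracker. */
        synchronized int remaining() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        // The slower node owns 128MB of input, split into 32MB sub-chunks.
        DataAssignQueue slowNode = new DataAssignQueue(0, 128L * 1024 * 1024);

        // Local map threads pull sub-chunks as they become free ...
        Long first = slowNode.nextSubChunk();
        System.out.println("slow node maps sub-chunk at offset " + first);

        // ... and when the JobTracker sees this node lagging, it takes the last
        // sub-chunk and hands it to the faster node, so nothing is processed twice.
        Long stolen = slowNode.stealTail();
        System.out.println("fast node takes over sub-chunk at offset " + stolen
            + "; slow node still has " + slowNode.remaining() + " sub-chunks queued");
    }
}

A real implementation would of course have to hook into the TaskTracker/JobTracker heartbeat and the InputSplit handling, but the core of the proposal is essentially this small per-node queue.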