Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7B9EB200BCF for ; Mon, 5 Dec 2016 15:09:39 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 7A4D2160B17; Mon, 5 Dec 2016 14:09:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9ED39160AF9 for ; Mon, 5 Dec 2016 15:09:38 +0100 (CET) Received: (qmail 62930 invoked by uid 500); 5 Dec 2016 14:09:37 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 62921 invoked by uid 99); 5 Dec 2016 14:09:37 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Dec 2016 14:09:37 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BA66CE0159; Mon, 5 Dec 2016 14:09:37 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: IGNITE-4281: Hadoop: decoupled remote and local maps to simplify further optimizations. This closes #1264. This closes #1315. Date: Mon, 5 Dec 2016 14:09:37 +0000 (UTC) archived-at: Mon, 05 Dec 2016 14:09:39 -0000 Repository: ignite Updated Branches: refs/heads/master acbb8aea8 -> 9e9e37146 IGNITE-4281: Hadoop: decoupled remote and local maps to simplify further optimizations. This closes #1264. This closes #1315. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9e9e3714 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9e9e3714 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9e9e3714 Branch: refs/heads/master Commit: 9e9e371468baef4e2bc7b9fc4c3089e6d073c014 Parents: acbb8ae Author: devozerov Authored: Mon Dec 5 17:09:28 2016 +0300 Committer: devozerov Committed: Mon Dec 5 17:09:28 2016 +0300 ---------------------------------------------------------------------- .../hadoop/shuffle/HadoopShuffleJob.java | 85 ++++++++++++++------ 1 file changed, 61 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9e9e3714/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index aca5fdf..3afb55a 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -17,9 +17,7 @@ package org.apache.ignite.internal.processors.hadoop.shuffle; -import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -56,8 +54,8 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.thread.IgniteThread; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE; -import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; @@ -77,20 +75,26 @@ public class HadoopShuffleJob implements AutoCloseable { /** */ private final boolean needPartitioner; - /** Collection of task contexts for each reduce task. */ - private final Map reducersCtx = new HashMap<>(); + /** Task contexts for each reduce task. */ + private final AtomicReferenceArray locReducersCtx; /** Reducers addresses. */ private T[] reduceAddrs; + /** Total reducer count. */ + private final int totalReducerCnt; + /** Local reducers address. */ private final T locReduceAddr; /** */ private final HadoopShuffleMessage[] msgs; - /** */ - private final AtomicReferenceArray maps; + /** Maps for local reducers. */ + private final AtomicReferenceArray locMaps; + + /** Maps for remote reducers. */ + private final AtomicReferenceArray rmtMaps; /** */ private volatile IgniteInClosure2X io; @@ -129,23 +133,27 @@ public class HadoopShuffleJob implements AutoCloseable { public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, int totalReducerCnt, int[] locReducers) throws IgniteCheckedException { this.locReduceAddr = locReduceAddr; + this.totalReducerCnt = totalReducerCnt; this.job = job; this.mem = mem; this.log = log.getLogger(HadoopShuffleJob.class); msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE); + locReducersCtx = new AtomicReferenceArray<>(totalReducerCnt); + if (!F.isEmpty(locReducers)) { for (int rdc : locReducers) { HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null); - reducersCtx.put(rdc, new LocalTaskContextProxy(taskInfo)); + locReducersCtx.set(rdc, new LocalTaskContextProxy(taskInfo)); } } needPartitioner = totalReducerCnt > 1; - maps = new AtomicReferenceArray<>(totalReducerCnt); + locMaps = new AtomicReferenceArray<>(totalReducerCnt); + rmtMaps = new AtomicReferenceArray<>(totalReducerCnt); msgs = new HadoopShuffleMessage[totalReducerCnt]; throttle = get(job.info(), SHUFFLE_JOB_THROTTLE, 0); @@ -237,13 +245,13 @@ public class HadoopShuffleJob implements AutoCloseable { assert msg.buffer() != null; assert msg.offset() > 0; - HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer()).get(); + HadoopTaskContext taskCtx = locReducersCtx.get(msg.reducer()).get(); HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null); perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis()); - HadoopMultimap map = getOrCreateMap(maps, msg.reducer()); + HadoopMultimap map = getOrCreateMap(locMaps, msg.reducer()); // Add data from message to the map. try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) { @@ -320,10 +328,10 @@ public class HadoopShuffleJob implements AutoCloseable { * Sends map updates to remote reducers. */ private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException { - for (int i = 0; i < maps.length(); i++) { - HadoopMultimap map = maps.get(i); + for (int i = 0; i < rmtMaps.length(); i++) { + HadoopMultimap map = rmtMaps.get(i); - if (map == null || locReduceAddr.equals(reduceAddrs[i])) + if (map == null) continue; // Skip empty map and local node. if (msgs[i] == null) @@ -448,7 +456,8 @@ public class HadoopShuffleJob implements AutoCloseable { } } - close(maps); + close(locMaps); + close(rmtMaps); } /** @@ -473,7 +482,7 @@ public class HadoopShuffleJob implements AutoCloseable { flushed = true; - if (maps.length() == 0) + if (totalReducerCnt == 0) return new GridFinishedFuture<>(); U.await(ioInitLatch); @@ -544,7 +553,7 @@ public class HadoopShuffleJob implements AutoCloseable { case REDUCE: int reducer = taskCtx.taskInfo().taskNumber(); - HadoopMultimap m = maps.get(reducer); + HadoopMultimap m = locMaps.get(reducer); if (m != null) return m.input(taskCtx); @@ -573,11 +582,24 @@ public class HadoopShuffleJob implements AutoCloseable { } /** + * Check if certain partition (reducer) is local. + * + * @param part Partition. + * @return {@code True} if local. + */ + private boolean isLocalPartition(int part) { + return locReducersCtx.get(part) != null; + } + + /** * Partitioned output. */ private class PartitionedOutput implements HadoopTaskOutput { /** */ - private final HadoopTaskOutput[] adders = new HadoopTaskOutput[maps.length()]; + private final HadoopTaskOutput[] locAdders = new HadoopTaskOutput[locMaps.length()]; + + /** */ + private final HadoopTaskOutput[] rmtAdders = new HadoopTaskOutput[rmtMaps.length()]; /** */ private HadoopPartitioner partitioner; @@ -601,23 +623,38 @@ public class HadoopShuffleJob implements AutoCloseable { int part = 0; if (partitioner != null) { - part = partitioner.partition(key, val, adders.length); + part = partitioner.partition(key, val, totalReducerCnt); - if (part < 0 || part >= adders.length) + if (part < 0 || part >= totalReducerCnt) throw new IgniteCheckedException("Invalid partition: " + part); } - HadoopTaskOutput out = adders[part]; + HadoopTaskOutput out; - if (out == null) - adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx); + if (isLocalPartition(part)) { + out = locAdders[part]; + + if (out == null) + locAdders[part] = out = getOrCreateMap(locMaps, part).startAdding(taskCtx); + } + else { + out = rmtAdders[part]; + + if (out == null) + rmtAdders[part] = out = getOrCreateMap(rmtMaps, part).startAdding(taskCtx); + } out.write(key, val); } /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - for (HadoopTaskOutput adder : adders) { + for (HadoopTaskOutput adder : locAdders) { + if (adder != null) + adder.close(); + } + + for (HadoopTaskOutput adder : rmtAdders) { if (adder != null) adder.close(); }