From: vozerov@apache.org
To: commits@ignite.apache.org
Date: Mon, 19 Sep 2016 10:51:13 -0000
Subject: [43/51] [partial] ignite git commit: IGNITE-3916: Created separate module.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
new file mode 100644
index 0000000..3eb819b
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolSubmitJobTask.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.processors.hadoop.HadoopDefaultJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobPhase.PHASE_CANCELLING;
+
+/**
+ * Submit job task.
+ */
+public class HadoopProtocolSubmitJobTask extends HadoopProtocolTaskAdapter<HadoopJobStatus> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** {@inheritDoc} */
+    @Override public HadoopJobStatus run(ComputeJobContext jobCtx, Hadoop hadoop,
+        HadoopProtocolTaskArguments args) throws IgniteCheckedException {
+        UUID nodeId = UUID.fromString(args.<String>get(0));
+        Integer id = args.get(1);
+        HadoopDefaultJobInfo info = args.get(2);
+
+        assert nodeId != null;
+        assert id != null;
+        assert info != null;
+
+        HadoopJobId jobId = new HadoopJobId(nodeId, id);
+
+        hadoop.submit(jobId, info);
+
+        HadoopJobStatus res = hadoop.status(jobId);
+
+        if (res == null) // Submission failed.
+            res = new HadoopJobStatus(jobId, info.jobName(), info.user(), 0, 0, 0, 0, PHASE_CANCELLING, true, 1);
+
+        return res;
+    }
+}
\ No newline at end of file
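The three positional arguments read by run() are packed by the caller into a HadoopProtocolTaskArguments instance and the task is executed over Ignite compute. A minimal client-side sketch (the invocation shape is illustrative; only the argument order comes from run() above, and nodeId/jobCnt/jobInfo are hypothetical caller variables):

    // Hypothetical caller: node ID string, job counter, job info - in the order run() reads them.
    HadoopProtocolTaskArguments args = new HadoopProtocolTaskArguments(nodeId.toString(), jobCnt, jobInfo);

    HadoopJobStatus status = ignite.compute().execute(HadoopProtocolSubmitJobTask.class, args);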
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
new file mode 100644
index 0000000..c3227ae
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskAdapter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobContext;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.JobContextResource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Hadoop protocol task adapter.
+ */
+public abstract class HadoopProtocolTaskAdapter<R> implements ComputeTask<HadoopProtocolTaskArguments, R> {
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+        @Nullable HadoopProtocolTaskArguments arg) {
+        return Collections.singletonMap(new Job(arg), subgrid.get(0));
+    }
+
+    /** {@inheritDoc} */
+    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+        return ComputeJobResultPolicy.REDUCE;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public R reduce(List<ComputeJobResult> results) {
+        if (!F.isEmpty(results)) {
+            ComputeJobResult res = results.get(0);
+
+            return res.getData();
+        }
+        else
+            return null;
+    }
+
+    /**
+     * Job wrapper.
+     */
+    private class Job implements ComputeJob {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        @SuppressWarnings("UnusedDeclaration")
+        @JobContextResource
+        private ComputeJobContext jobCtx;
+
+        /** Argument. */
+        private final HadoopProtocolTaskArguments args;
+
+        /**
+         * Constructor.
+         *
+         * @param args Job argument.
+         */
+        private Job(HadoopProtocolTaskArguments args) {
+            this.args = args;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Object execute() {
+            try {
+                return run(jobCtx, ((IgniteEx)ignite).hadoop(), args);
+            }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
+        }
+    }
+
+    /**
+     * Run the task.
+     *
+     * @param jobCtx Job context.
+     * @param hadoop Hadoop facade.
+     * @param args Arguments.
+     * @return Job result.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract R run(ComputeJobContext jobCtx, Hadoop hadoop, HadoopProtocolTaskArguments args)
+        throws IgniteCheckedException;
+}
\ No newline at end of file
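The adapter pins every protocol operation to the same shape - map to a single node, reduce to the first result - so concrete tasks only implement run(). As an illustration of the pattern, a simplified sketch of a kill-style subclass (not the exact body of the real kill task in this package; error handling omitted):

    public class HadoopProtocolKillJobTask extends HadoopProtocolTaskAdapter<Boolean> {
        @Override public Boolean run(ComputeJobContext jobCtx, Hadoop hadoop,
            HadoopProtocolTaskArguments args) throws IgniteCheckedException {
            UUID nodeId = UUID.fromString(args.<String>get(0));
            Integer id = args.get(1);

            // Delegate to the Hadoop facade, same as the submit task above.
            return hadoop.kill(new HadoopJobId(nodeId, id));
        }
    }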
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
new file mode 100644
index 0000000..e497454
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/proto/HadoopProtocolTaskArguments.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.proto;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Task arguments.
+ */
+public class HadoopProtocolTaskArguments implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Arguments. */
+    private Object[] args;
+
+    /**
+     * {@link Externalizable} support.
+     */
+    public HadoopProtocolTaskArguments() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param args Arguments.
+     */
+    public HadoopProtocolTaskArguments(Object... args) {
+        this.args = args;
+    }
+
+    /**
+     * @param idx Argument index.
+     * @return Argument.
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable public <T> T get(int idx) {
+        return (args != null && args.length > idx) ? (T)args[idx] : null;
+    }
+
+    /**
+     * @return Size.
+     */
+    public int size() {
+        return args != null ? args.length : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeArray(out, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        args = U.readArray(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopProtocolTaskArguments.class, this);
+    }
+}
\ No newline at end of file
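get() infers its return type from the call site, so unpacking is positional and unchecked; a type mismatch only surfaces as a ClassCastException where the value is used. A small illustration (values invented):

    HadoopProtocolTaskArguments args = new HadoopProtocolTaskArguments("a-uuid-string", 1);

    String s = args.get(0);       // Inferred as String from the assignment target.
    Integer i = args.get(1);      // Inferred as Integer.
    Object missing = args.get(2); // Index past the array -> null, not an exception.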
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
new file mode 100644
index 0000000..769bdc4
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.hadoop.HadoopComponent;
+import org.apache.ignite.internal.processors.hadoop.HadoopContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+
+/**
+ * Shuffle.
+ */
+public class HadoopShuffle extends HadoopComponent {
+    /** */
+    private final ConcurrentMap<HadoopJobId, HadoopShuffleJob<UUID>> jobs = new ConcurrentHashMap<>();
+
+    /** */
+    protected final GridUnsafeMemory mem = new GridUnsafeMemory(0);
+
+    /** {@inheritDoc} */
+    @Override public void start(HadoopContext ctx) throws IgniteCheckedException {
+        super.start(ctx);
+
+        ctx.kernalContext().io().addUserMessageListener(GridTopic.TOPIC_HADOOP,
+            new IgniteBiPredicate<UUID, Object>() {
+                @Override public boolean apply(UUID nodeId, Object msg) {
+                    return onMessageReceived(nodeId, (HadoopMessage)msg);
+                }
+            });
+    }
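// The listener registered above is the receive side of a small point-to-point protocol:
// peers push HadoopShuffleMessage / HadoopShuffleAck objects on the same TOPIC_HADOOP topic
// via sendUserMessage() (see send0() below), and everything received funnels through
// onMessageReceived(). A minimal mental model, using only names from this file:
//
//   sender:   ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
//   receiver: onMessageReceived(nodeId, (HadoopMessage)msg); // wired up in start() above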
+    /**
+     * Stops shuffle.
+     *
+     * @param cancel Whether to cancel all ongoing activities.
+     */
+    @Override public void stop(boolean cancel) {
+        for (HadoopShuffleJob<UUID> job : jobs.values()) {
+            try {
+                job.close();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to close job.", e);
+            }
+        }
+
+        jobs.clear();
+    }
+
+    /**
+     * Creates new shuffle job.
+     *
+     * @param jobId Job ID.
+     * @return Created shuffle job.
+     * @throws IgniteCheckedException If job creation failed.
+     */
+    private HadoopShuffleJob<UUID> newJob(HadoopJobId jobId) throws IgniteCheckedException {
+        HadoopMapReducePlan plan = ctx.jobTracker().plan(jobId);
+
+        HadoopShuffleJob<UUID> job = new HadoopShuffleJob<>(ctx.localNodeId(), log,
+            ctx.jobTracker().job(jobId, null), mem, plan.reducers(), plan.reducers(ctx.localNodeId()));
+
+        UUID[] rdcAddrs = new UUID[plan.reducers()];
+
+        for (int i = 0; i < rdcAddrs.length; i++) {
+            UUID nodeId = plan.nodeForReducer(i);
+
+            assert nodeId != null : "Plan is missing node for reducer [plan=" + plan + ", rdc=" + i + ']';
+
+            rdcAddrs[i] = nodeId;
+        }
+
+        boolean init = job.initializeReduceAddresses(rdcAddrs);
+
+        assert init;
+
+        return job;
+    }
+
+    /**
+     * @param nodeId Node ID to send message to.
+     * @param msg Message to send.
+     * @throws IgniteCheckedException If send failed.
+     */
+    private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
+        ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
+
+        ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
+    }
+
+    /**
+     * @param jobId Job ID.
+     * @return Shuffle job.
+     */
+    private HadoopShuffleJob<UUID> job(HadoopJobId jobId) throws IgniteCheckedException {
+        HadoopShuffleJob<UUID> res = jobs.get(jobId);
+
+        if (res == null) {
+            res = newJob(jobId);
+
+            HadoopShuffleJob<UUID> old = jobs.putIfAbsent(jobId, res);
+
+            if (old != null) {
+                res.close();
+
+                res = old;
+            }
+            else if (res.reducersInitialized())
+                startSending(res);
+        }
+
+        return res;
+    }
+
+    /**
+     * Starts message sending thread.
+     *
+     * @param shuffleJob Job to start sending for.
+     */
+    private void startSending(HadoopShuffleJob<UUID> shuffleJob) {
+        shuffleJob.startSending(ctx.kernalContext().gridName(),
+            new IgniteInClosure2X<UUID, HadoopShuffleMessage>() {
+                @Override public void applyx(UUID dest, HadoopShuffleMessage msg) throws IgniteCheckedException {
+                    send0(dest, msg);
+                }
+            }
+        );
+    }
+
+    /**
+     * Message received callback.
+     *
+     * @param src Sender node ID.
+     * @param msg Received message.
+     * @return {@code True}.
+     */
+    public boolean onMessageReceived(UUID src, HadoopMessage msg) {
+        if (msg instanceof HadoopShuffleMessage) {
+            HadoopShuffleMessage m = (HadoopShuffleMessage)msg;
+
+            try {
+                job(m.jobId()).onShuffleMessage(m);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Message handling failed.", e);
+            }
+
+            try {
+                // Reply with ack.
+                send0(src, new HadoopShuffleAck(m.id(), m.jobId()));
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to reply back to shuffle message sender [snd=" + src + ", msg=" + msg + ']', e);
+            }
+        }
+        else if (msg instanceof HadoopShuffleAck) {
+            HadoopShuffleAck m = (HadoopShuffleAck)msg;
+
+            try {
+                job(m.jobId()).onShuffleAck(m);
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Message handling failed.", e);
+            }
+        }
+        else
+            throw new IllegalStateException("Unknown message type received to Hadoop shuffle [src=" + src +
+                ", msg=" + msg + ']');
+
+        return true;
+    }
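// Reliability in one picture (sketch; names from this file): every HadoopShuffleMessage
// carries a unique id, the receiver always answers with an ack, and the sender completes
// the matching future (see HadoopShuffleJob#onShuffleAck), which is what flush() waits on:
//
//   A: send0(B, msg)                                   // msg.id() == 42, future registered
//   B: onShuffleMessage(msg)                           // data lands in B's multimap
//   B: send0(A, new HadoopShuffleAck(42, msg.jobId()))
//   A: onShuffleAck(ack)                               // future 42 completed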
+    /**
+     * @param taskCtx Task context.
+     * @return Output.
+     */
+    public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        return job(taskCtx.taskInfo().jobId()).output(taskCtx);
+    }
+
+    /**
+     * @param taskCtx Task context.
+     * @return Input.
+     */
+    public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        return job(taskCtx.taskInfo().jobId()).input(taskCtx);
+    }
+
+    /**
+     * @param jobId Job ID.
+     */
+    public void jobFinished(HadoopJobId jobId) {
+        HadoopShuffleJob<UUID> job = jobs.remove(jobId);
+
+        if (job != null) {
+            try {
+                job.close();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to close job: " + jobId, e);
+            }
+        }
+    }
+
+    /**
+     * Flushes all the outputs for the given job to remote nodes.
+     *
+     * @param jobId Job ID.
+     * @return Future.
+     */
+    public IgniteInternalFuture<?> flush(HadoopJobId jobId) {
+        HadoopShuffleJob<UUID> job = jobs.get(jobId);
+
+        if (job == null)
+            return new GridFinishedFuture<>();
+
+        try {
+            return job.flush();
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(e);
+        }
+    }
+
+    /**
+     * @return Memory.
+     */
+    public GridUnsafeMemory memory() {
+        return mem;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
new file mode 100644
index 0000000..6013ec6
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleAck.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; + +/** + * Acknowledgement message. + */ +public class HadoopShuffleAck implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + @GridToStringInclude + private long msgId; + + /** */ + @GridToStringInclude + private HadoopJobId jobId; + + /** + * + */ + public HadoopShuffleAck() { + // No-op. + } + + /** + * @param msgId Message ID. + */ + public HadoopShuffleAck(long msgId, HadoopJobId jobId) { + assert jobId != null; + + this.msgId = msgId; + this.jobId = jobId; + } + + /** + * @return Message ID. + */ + public long id() { + return msgId; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + out.writeLong(msgId); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new HadoopJobId(); + + jobId.readExternal(in); + msgId = in.readLong(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopShuffleAck.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java new file mode 100644 index 0000000..b940c72 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopPartitioner;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskType;
+import org.apache.ignite.internal.processors.hadoop.counter.HadoopPerformanceCounter;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimap;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.thread.IgniteThread;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
+
+/**
+ * Shuffle job.
+ */
+public class HadoopShuffleJob<T> implements AutoCloseable {
+    /** */
+    private static final int MSG_BUF_SIZE = 128 * 1024;
+
+    /** */
+    private final HadoopJob job;
+
+    /** */
+    private final GridUnsafeMemory mem;
+
+    /** */
+    private final boolean needPartitioner;
+
+    /** Collection of task contexts for each reduce task. */
+    private final Map<Integer, HadoopTaskContext> reducersCtx = new HashMap<>();
+
+    /** Reducers addresses. */
+    private T[] reduceAddrs;
+
+    /** Local reducers address. */
+    private final T locReduceAddr;
+
+    /** */
+    private final HadoopShuffleMessage[] msgs;
+
+    /** */
+    private final AtomicReferenceArray<HadoopMultimap> maps;
+
+    /** */
+    private volatile IgniteInClosure2X<T, HadoopShuffleMessage> io;
+
+    /** */
+    protected ConcurrentMap<Long, IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>> sentMsgs =
+        new ConcurrentHashMap<>();
+
+    /** */
+    private volatile GridWorker snd;
+
+    /** Latch for remote addresses waiting. */
+    private final CountDownLatch ioInitLatch = new CountDownLatch(1);
+
+    /** Finished flag. Set on flush or close. */
+    private volatile boolean flushed;
+
+    /** */
+    private final IgniteLogger log;
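// Shape of the per-job state above, for a plan with N reducers (indices 0..N-1; sketch):
//
//   maps[i]        - off-heap multimap accumulating pairs destined for reducer i;
//   msgs[i]        - partially filled HadoopShuffleMessage (MSG_BUF_SIZE) for reducer i;
//   reduceAddrs[i] - address of the node running reducer i; locReduceAddr marks data
//                    that never leaves this node.
//
// The sender worker periodically drains maps[i] into msgs[i] and ships them through io.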
+    /**
+     * @param locReduceAddr Local reducer address.
+     * @param log Logger.
+     * @param job Job.
+     * @param mem Memory.
+     * @param totalReducerCnt Total number of reducers in the job.
+     * @param locReducers Reducers that will work on the current node.
+     * @throws IgniteCheckedException If failed.
+     */
+    public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUnsafeMemory mem,
+        int totalReducerCnt, int[] locReducers) throws IgniteCheckedException {
+        this.locReduceAddr = locReduceAddr;
+        this.job = job;
+        this.mem = mem;
+        this.log = log.getLogger(HadoopShuffleJob.class);
+
+        if (!F.isEmpty(locReducers)) {
+            for (int rdc : locReducers) {
+                HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
+
+                reducersCtx.put(rdc, job.getTaskContext(taskInfo));
+            }
+        }
+
+        needPartitioner = totalReducerCnt > 1;
+
+        maps = new AtomicReferenceArray<>(totalReducerCnt);
+        msgs = new HadoopShuffleMessage[totalReducerCnt];
+    }
+
+    /**
+     * @param reduceAddrs Addresses of reducers.
+     * @return {@code True} if addresses were initialized by this call.
+     */
+    public boolean initializeReduceAddresses(T[] reduceAddrs) {
+        if (this.reduceAddrs == null) {
+            this.reduceAddrs = reduceAddrs;
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @return {@code True} if reducer addresses were initialized.
+     */
+    public boolean reducersInitialized() {
+        return reduceAddrs != null;
+    }
+
+    /**
+     * @param gridName Grid name.
+     * @param io IO Closure for sending messages.
+     */
+    @SuppressWarnings("BusyWait")
+    public void startSending(String gridName, IgniteInClosure2X<T, HadoopShuffleMessage> io) {
+        assert snd == null;
+        assert io != null;
+
+        this.io = io;
+
+        if (!flushed) {
+            snd = new GridWorker(gridName, "hadoop-shuffle-" + job.id(), log) {
+                @Override protected void body() throws InterruptedException {
+                    try {
+                        while (!isCancelled()) {
+                            Thread.sleep(5);
+
+                            collectUpdatesAndSend(false);
+                        }
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IllegalStateException(e);
+                    }
+                }
+            };
+
+            new IgniteThread(snd).start();
+        }
+
+        ioInitLatch.countDown();
+    }
+
+    /**
+     * @param maps Maps.
+     * @param idx Index.
+     * @return Map.
+     */
+    private HadoopMultimap getOrCreateMap(AtomicReferenceArray<HadoopMultimap> maps, int idx) {
+        HadoopMultimap map = maps.get(idx);
+
+        if (map == null) { // Create new map.
+            map = get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false) ?
+                new HadoopConcurrentHashMultimap(job.info(), mem, get(job.info(), PARTITION_HASHMAP_SIZE, 8 * 1024)) :
+                new HadoopSkipList(job.info(), mem);
+
+            if (!maps.compareAndSet(idx, null, map)) {
+                map.close();
+
+                return maps.get(idx);
+            }
+        }
+
+        return map;
+    }
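// Which multimap backs each reducer slot is decided per job from its properties:
// SHUFFLE_REDUCER_NO_SORTING == true selects HadoopConcurrentHashMultimap (hash order,
// no sort cost), otherwise HadoopSkipList keeps keys ordered for the reducer. A job
// could opt out of sorting roughly like this (property accessor assumed; sketch only):
//
//   conf.setBoolean(HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING.propertyName(), true);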
+    /**
+     * @param msg Message.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void onShuffleMessage(HadoopShuffleMessage msg) throws IgniteCheckedException {
+        assert msg.buffer() != null;
+        assert msg.offset() > 0;
+
+        HadoopTaskContext taskCtx = reducersCtx.get(msg.reducer());
+
+        HadoopPerformanceCounter perfCntr = HadoopPerformanceCounter.getCounter(taskCtx.counters(), null);
+
+        perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
+
+        HadoopMultimap map = getOrCreateMap(maps, msg.reducer());
+
+        // Add data from message to the map.
+        try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
+            final GridUnsafeDataInput dataInput = new GridUnsafeDataInput();
+            final UnsafeValue val = new UnsafeValue(msg.buffer());
+
+            msg.visit(new HadoopShuffleMessage.Visitor() {
+                /** */
+                private HadoopMultimap.Key key;
+
+                @Override public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException {
+                    dataInput.bytes(buf, off, off + len);
+
+                    key = adder.addKey(dataInput, key);
+                }
+
+                @Override public void onValue(byte[] buf, int off, int len) {
+                    val.off = off;
+                    val.size = len;
+
+                    key.add(val);
+                }
+            });
+        }
+    }
+
+    /**
+     * @param ack Shuffle ack.
+     */
+    @SuppressWarnings("ConstantConditions")
+    public void onShuffleAck(HadoopShuffleAck ack) {
+        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup = sentMsgs.get(ack.id());
+
+        if (tup != null)
+            tup.get2().onDone();
+        else
+            log.warning("Received shuffle ack for not registered shuffle id: " + ack);
+    }
+
+    /**
+     * Unsafe value.
+     */
+    private static class UnsafeValue implements HadoopMultimap.Value {
+        /** */
+        private final byte[] buf;
+
+        /** */
+        private int off;
+
+        /** */
+        private int size;
+
+        /**
+         * @param buf Buffer.
+         */
+        private UnsafeValue(byte[] buf) {
+            assert buf != null;
+
+            this.buf = buf;
+        }
+
+        /** */
+        @Override public int size() {
+            return size;
+        }
+
+        /** */
+        @Override public void copyTo(long ptr) {
+            GridUnsafe.copyMemory(buf, GridUnsafe.BYTE_ARR_OFF + off, null, ptr, size);
+        }
+    }
+
+    /**
+     * Sends map updates to remote reducers.
+     */
+    private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException {
+        for (int i = 0; i < maps.length(); i++) {
+            HadoopMultimap map = maps.get(i);
+
+            if (map == null || locReduceAddr.equals(reduceAddrs[i]))
+                continue; // Skip empty map and local node.
+
+            if (msgs[i] == null)
+                msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE);
+
+            final int idx = i;
+
+            map.visit(false, new HadoopMultimap.Visitor() {
+                /** */
+                private long keyPtr;
+
+                /** */
+                private int keySize;
+
+                /** */
+                private boolean keyAdded;
+
+                /** {@inheritDoc} */
+                @Override public void onKey(long keyPtr, int keySize) {
+                    this.keyPtr = keyPtr;
+                    this.keySize = keySize;
+
+                    keyAdded = false;
+                }
+
+                private boolean tryAdd(long valPtr, int valSize) {
+                    HadoopShuffleMessage msg = msgs[idx];
+
+                    if (!keyAdded) { // Add key and value.
+                        int size = keySize + valSize;
+
+                        if (!msg.available(size, false))
+                            return false;
+
+                        msg.addKey(keyPtr, keySize);
+                        msg.addValue(valPtr, valSize);
+
+                        keyAdded = true;
+
+                        return true;
+                    }
+
+                    if (!msg.available(valSize, true))
+                        return false;
+
+                    msg.addValue(valPtr, valSize);
+
+                    return true;
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onValue(long valPtr, int valSize) {
+                    if (tryAdd(valPtr, valSize))
+                        return;
+
+                    send(idx, keySize + valSize);
+
+                    keyAdded = false;
+
+                    if (!tryAdd(valPtr, valSize))
+                        throw new IllegalStateException();
+                }
+            });
+
+            if (flush && msgs[i].offset() != 0)
+                send(i, 0);
+        }
+    }
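// Wire format produced by the visitor above via HadoopShuffleMessage#addKey/addValue
// (marker byte + 4-byte length + payload; keys are not repeated per value):
//
//   [ MARKER_KEY   | size | key bytes   ]   // once per key (keyAdded == false)
//   [ MARKER_VALUE | size | value bytes ]   // for every value of that key
//
// When tryAdd() reports the buffer full, send(idx, ...) ships the message and the key is
// re-added to a fresh buffer before retrying the failed value.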
+    /**
+     * @param idx Index of message.
+     * @param newBufMinSize Min new buffer size.
+     */
+    private void send(final int idx, int newBufMinSize) {
+        final GridFutureAdapter<?> fut = new GridFutureAdapter<>();
+
+        HadoopShuffleMessage msg = msgs[idx];
+
+        final long msgId = msg.id();
+
+        IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> old = sentMsgs.putIfAbsent(msgId,
+            new IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>>(msg, fut));
+
+        assert old == null;
+
+        try {
+            io.apply(reduceAddrs[idx], msg);
+        }
+        catch (GridClosureException e) {
+            fut.onDone(U.unwrap(e));
+        }
+
+        fut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
+            @Override public void apply(IgniteInternalFuture<?> f) {
+                try {
+                    f.get();
+
+                    // Clean up the future from map only if there was no exception.
+                    // Otherwise flush() should fail.
+                    sentMsgs.remove(msgId);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Failed to send message.", e);
+                }
+            }
+        });
+
+        msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
+            Math.max(MSG_BUF_SIZE, newBufMinSize));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws IgniteCheckedException {
+        if (snd != null) {
+            snd.cancel();
+
+            try {
+                snd.join();
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedCheckedException(e);
+            }
+        }
+
+        close(maps);
+    }
+
+    /**
+     * @param maps Maps.
+     */
+    private void close(AtomicReferenceArray<HadoopMultimap> maps) {
+        for (int i = 0; i < maps.length(); i++) {
+            HadoopMultimap map = maps.get(i);
+
+            if (map != null)
+                map.close();
+        }
+    }
+
+    /**
+     * @return Future.
+     */
+    @SuppressWarnings("unchecked")
+    public IgniteInternalFuture<?> flush() throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Flushing job " + job.id() + " on address " + locReduceAddr);
+
+        flushed = true;
+
+        if (maps.length() == 0)
+            return new GridFinishedFuture<>();
+
+        U.await(ioInitLatch);
+
+        GridWorker snd0 = snd;
+
+        if (snd0 != null) {
+            if (log.isDebugEnabled())
+                log.debug("Cancelling sender thread.");
+
+            snd0.cancel();
+
+            try {
+                snd0.join();
+
+                if (log.isDebugEnabled())
+                    log.debug("Finished waiting for sending thread to complete on shuffle job flush: " + job.id());
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedCheckedException(e);
+            }
+        }
+
+        collectUpdatesAndSend(true); // With flush.
+
+        if (log.isDebugEnabled())
+            log.debug("Finished sending collected updates to remote reducers: " + job.id());
+
+        GridCompoundFuture fut = new GridCompoundFuture<>();
+
+        for (IgniteBiTuple<HadoopShuffleMessage, GridFutureAdapter<?>> tup : sentMsgs.values())
+            fut.add(tup.get2());
+
+        fut.markInitialized();
+
+        if (log.isDebugEnabled())
+            log.debug("Collected futures to compound futures for flush: " + sentMsgs.size());
+
+        return fut;
+    }
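// flush() contract, as implemented above: mark the job flushed, wait for io to be wired,
// stop the background sender, do one final synchronous collectUpdatesAndSend(true), then
// return a compound future over every message still waiting for an ack in sentMsgs.
// A caller that needs a hard barrier would typically block on it:
//
//   shuffleJob.flush().get(); // returns once all reducers acknowledged all data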
+    /**
+     * @param taskCtx Task context.
+     * @return Output.
+     * @throws IgniteCheckedException If failed.
+     */
+    public HadoopTaskOutput output(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        switch (taskCtx.taskInfo().type()) {
+            case MAP:
+                assert !job.info().hasCombiner() : "The output creation is allowed if combiner has not been defined.";
+
+            case COMBINE:
+                return new PartitionedOutput(taskCtx);
+
+            default:
+                throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
+        }
+    }
+
+    /**
+     * @param taskCtx Task context.
+     * @return Input.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        switch (taskCtx.taskInfo().type()) {
+            case REDUCE:
+                int reducer = taskCtx.taskInfo().taskNumber();
+
+                HadoopMultimap m = maps.get(reducer);
+
+                if (m != null)
+                    return m.input(taskCtx);
+
+                return new HadoopTaskInput() { // Empty input.
+                    @Override public boolean next() {
+                        return false;
+                    }
+
+                    @Override public Object key() {
+                        throw new IllegalStateException();
+                    }
+
+                    @Override public Iterator<?> values() {
+                        throw new IllegalStateException();
+                    }
+
+                    @Override public void close() {
+                        // No-op.
+                    }
+                };
+
+            default:
+                throw new IllegalStateException("Illegal type: " + taskCtx.taskInfo().type());
+        }
+    }
+
+    /**
+     * Partitioned output.
+     */
+    private class PartitionedOutput implements HadoopTaskOutput {
+        /** */
+        private final HadoopTaskOutput[] adders = new HadoopTaskOutput[maps.length()];
+
+        /** */
+        private HadoopPartitioner partitioner;
+
+        /** */
+        private final HadoopTaskContext taskCtx;
+
+        /**
+         * Constructor.
+         *
+         * @param taskCtx Task context.
+         */
+        private PartitionedOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+            this.taskCtx = taskCtx;
+
+            if (needPartitioner)
+                partitioner = taskCtx.partitioner();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Object key, Object val) throws IgniteCheckedException {
+            int part = 0;
+
+            if (partitioner != null) {
+                part = partitioner.partition(key, val, adders.length);
+
+                if (part < 0 || part >= adders.length)
+                    throw new IgniteCheckedException("Invalid partition: " + part);
+            }
+
+            HadoopTaskOutput out = adders[part];
+
+            if (out == null)
+                adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx);
+
+            out.write(key, val);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            for (HadoopTaskOutput adder : adders) {
+                if (adder != null)
+                    adder.close();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
new file mode 100644
index 0000000..69dfe64
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleMessage.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.hadoop.HadoopJobId; +import org.apache.ignite.internal.processors.hadoop.message.HadoopMessage; +import org.apache.ignite.internal.util.GridUnsafe; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Shuffle message. + */ +public class HadoopShuffleMessage implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final AtomicLong ids = new AtomicLong(); + + /** */ + private static final byte MARKER_KEY = (byte)17; + + /** */ + private static final byte MARKER_VALUE = (byte)31; + + /** */ + @GridToStringInclude + private long msgId; + + /** */ + @GridToStringInclude + private HadoopJobId jobId; + + /** */ + @GridToStringInclude + private int reducer; + + /** */ + private byte[] buf; + + /** */ + @GridToStringInclude + private int off; + + /** + * + */ + public HadoopShuffleMessage() { + // No-op. + } + + /** + * @param size Size. + */ + public HadoopShuffleMessage(HadoopJobId jobId, int reducer, int size) { + assert jobId != null; + + buf = new byte[size]; + + this.jobId = jobId; + this.reducer = reducer; + + msgId = ids.incrementAndGet(); + } + + /** + * @return Message ID. + */ + public long id() { + return msgId; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @return Reducer. + */ + public int reducer() { + return reducer; + } + + /** + * @return Buffer. + */ + public byte[] buffer() { + return buf; + } + + /** + * @return Offset. + */ + public int offset() { + return off; + } + + /** + * @param size Size. + * @param valOnly Only value wll be added. + * @return {@code true} If this message can fit additional data of this size + */ + public boolean available(int size, boolean valOnly) { + size += valOnly ? 5 : 10; + + if (off + size > buf.length) { + if (off == 0) { // Resize if requested size is too big. + buf = new byte[size]; + + return true; + } + + return false; + } + + return true; + } + + /** + * @param keyPtr Key pointer. + * @param keySize Key size. + */ + public void addKey(long keyPtr, int keySize) { + add(MARKER_KEY, keyPtr, keySize); + } + + /** + * @param valPtr Value pointer. + * @param valSize Value size. + */ + public void addValue(long valPtr, int valSize) { + add(MARKER_VALUE, valPtr, valSize); + } + + /** + * @param marker Marker. + * @param ptr Pointer. + * @param size Size. + */ + private void add(byte marker, long ptr, int size) { + buf[off++] = marker; + + GridUnsafe.putInt(buf, GridUnsafe.BYTE_ARR_OFF + off, size); + + off += 4; + + GridUnsafe.copyMemory(null, ptr, buf, GridUnsafe.BYTE_ARR_OFF + off, size); + + off += size; + } + + /** + * @param v Visitor. 
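 * <p>
 * Traversal mirrors the layout written by add(): a marker byte, a 4-byte length, then the
 * payload, repeated up to {@code off}. A minimal caller, shaped like the visitor in
 * HadoopShuffleJob#onShuffleMessage (sketch only):
 * <pre>
 * msg.visit(new HadoopShuffleMessage.Visitor() {
 *     @Override public void onKey(byte[] buf, int off, int len) {
 *         // Key bytes are buf[off .. off + len).
 *     }
 *     @Override public void onValue(byte[] buf, int off, int len) {
 *         // Value bytes are buf[off .. off + len).
 *     }
 * });
 * </pre>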
+ */ + public void visit(Visitor v) throws IgniteCheckedException { + for (int i = 0; i < off;) { + byte marker = buf[i++]; + + int size = GridUnsafe.getInt(buf, GridUnsafe.BYTE_ARR_OFF + i); + + i += 4; + + if (marker == MARKER_VALUE) + v.onValue(buf, i, size); + else if (marker == MARKER_KEY) + v.onKey(buf, i, size); + else + throw new IllegalStateException(); + + i += size; + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + out.writeLong(msgId); + out.writeInt(reducer); + out.writeInt(off); + U.writeByteArray(out, buf); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new HadoopJobId(); + + jobId.readExternal(in); + msgId = in.readLong(); + reducer = in.readInt(); + off = in.readInt(); + buf = U.readByteArray(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopShuffleMessage.class, this); + } + + /** + * Visitor. + */ + public static interface Visitor { + /** + * @param buf Buffer. + * @param off Offset. + * @param len Length. + */ + public void onKey(byte[] buf, int off, int len) throws IgniteCheckedException; + + /** + * @param buf Buffer. + * @param off Offset. + * @param len Length. + */ + public void onValue(byte[] buf, int off, int len) throws IgniteCheckedException; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java new file mode 100644 index 0000000..ffa7871 --- /dev/null +++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopConcurrentHashMultimap.java @@ -0,0 +1,616 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import java.io.DataInput;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multimap for map reduce intermediate results.
+ */
+public class HadoopConcurrentHashMultimap extends HadoopHashMultimapBase {
+    /** */
+    private final AtomicReference<State> state = new AtomicReference<>(State.READING_WRITING);
+
+    /** */
+    private volatile AtomicLongArray oldTbl;
+
+    /** */
+    private volatile AtomicLongArray newTbl;
+
+    /** */
+    private final AtomicInteger keys = new AtomicInteger();
+
+    /** */
+    private final CopyOnWriteArrayList<AdderImpl> adders = new CopyOnWriteArrayList<>();
+
+    /** */
+    private final AtomicInteger inputs = new AtomicInteger();
+
+    /**
+     * @param jobInfo Job info.
+     * @param mem Memory.
+     * @param cap Initial capacity.
+     */
+    public HadoopConcurrentHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) {
+        super(jobInfo, mem);
+
+        assert U.isPow2(cap);
+
+        newTbl = oldTbl = new AtomicLongArray(cap);
+    }
+
+    /**
+     * @return Number of keys.
+     */
+    public long keys() {
+        int res = keys.get();
+
+        for (AdderImpl adder : adders)
+            res += adder.locKeys.get();
+
+        return res;
+    }
+
+    /**
+     * @return Current table capacity.
+     */
+    @Override public int capacity() {
+        return oldTbl.length();
+    }
+
+    /**
+     * @param ctx Task context.
+     * @return Adder object.
+     */
+    @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
+        if (inputs.get() != 0)
+            throw new IllegalStateException("Active inputs.");
+
+        if (state.get() == State.CLOSING)
+            throw new IllegalStateException("Closed.");
+
+        return new AdderImpl(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        assert inputs.get() == 0 : inputs.get();
+        assert adders.isEmpty() : adders.size();
+
+        state(State.READING_WRITING, State.CLOSING);
+
+        if (keys() == 0)
+            return;
+
+        super.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long meta(int idx) {
+        return oldTbl.get(idx);
+    }
+
+    /**
+     * Incrementally visits all the keys and values in the map.
+     *
+     * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
+     * @param v Visitor.
+     * @return {@code false} If visiting was impossible due to rehashing.
+     */
+    @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
+        if (!state.compareAndSet(State.READING_WRITING, State.VISITING)) {
+            assert state.get() != State.CLOSING;
+
+            return false; // Cannot visit while rehashing is in progress.
+        }
+
+        AtomicLongArray tbl0 = oldTbl;
+
+        for (int i = 0; i < tbl0.length(); i++) {
+            long meta = tbl0.get(i);
+
+            while (meta != 0) {
+                long valPtr = value(meta);
+
+                long lastVisited = ignoreLastVisited ?
0 : lastVisitedValue(meta); + + if (valPtr != lastVisited) { + v.onKey(key(meta), keySize(meta)); + + lastVisitedValue(meta, valPtr); // Set it to the first value in chain. + + do { + v.onValue(valPtr + 12, valueSize(valPtr)); + + valPtr = nextValue(valPtr); + } + while (valPtr != lastVisited); + } + + meta = collision(meta); + } + } + + state(State.VISITING, State.READING_WRITING); + + return true; + } + + /** {@inheritDoc} */ + @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException { + inputs.incrementAndGet(); + + if (!adders.isEmpty()) + throw new IllegalStateException("Active adders."); + + State s = state.get(); + + if (s == State.CLOSING) + throw new IllegalStateException("Closed."); + + assert s != State.REHASHING; + + return new Input(taskCtx) { + @Override public void close() throws IgniteCheckedException { + if (inputs.decrementAndGet() < 0) + throw new IllegalStateException(); + + super.close(); + } + }; + } + + /** + * @param fromTbl Table. + */ + private void rehashIfNeeded(AtomicLongArray fromTbl) { + if (fromTbl.length() == Integer.MAX_VALUE) + return; + + long keys0 = keys(); + + if (keys0 < 3 * (fromTbl.length() >>> 2)) // New size has to be >= than 3/4 of capacity to rehash. + return; + + if (fromTbl != newTbl) // Check if someone else have done the job. + return; + + if (!state.compareAndSet(State.READING_WRITING, State.REHASHING)) { + assert state.get() != State.CLOSING; // Visiting is allowed, but we will not rehash. + + return; + } + + if (fromTbl != newTbl) { // Double check. + state(State.REHASHING, State.READING_WRITING); // Switch back. + + return; + } + + // Calculate new table capacity. + int newLen = fromTbl.length(); + + do { + newLen <<= 1; + } + while (newLen < keys0); + + if (keys0 >= 3 * (newLen >>> 2)) // Still more than 3/4. + newLen <<= 1; + + // This is our target table for rehashing. + AtomicLongArray toTbl = new AtomicLongArray(newLen); + + // Make the new table visible before rehashing. + newTbl = toTbl; + + // Rehash. + int newMask = newLen - 1; + + long failedMeta = 0; + + GridLongList collisions = new GridLongList(16); + + for (int i = 0; i < fromTbl.length(); i++) { // Scan source table. + long meta = fromTbl.get(i); + + assert meta != -1; + + if (meta == 0) { // No entry. + failedMeta = 0; + + if (!fromTbl.compareAndSet(i, 0, -1)) // Mark as moved. + i--; // Retry. + + continue; + } + + do { // Collect all the collisions before the last one failed to nullify or 0. + collisions.add(meta); + + meta = collision(meta); + } + while (meta != failedMeta); + + do { // Go from the last to the first to avoid 'in-flight' state for meta entries. + meta = collisions.remove(); + + int addr = keyHash(meta) & newMask; + + for (;;) { // Move meta entry to the new table. + long toCollision = toTbl.get(addr); + + collision(meta, toCollision); + + if (toTbl.compareAndSet(addr, toCollision, meta)) + break; + } + } + while (!collisions.isEmpty()); + + // Here 'meta' will be a root pointer in old table. + if (!fromTbl.compareAndSet(i, meta, -1)) { // Try to mark as moved. + failedMeta = meta; + + i--; // Retry the same address in table because new keys were added. + } + else + failedMeta = 0; + } + + // Now old and new tables will be the same again. + oldTbl = toTbl; + + state(State.REHASHING, State.READING_WRITING); + } + + /** + * Switch state. + * + * @param oldState Expected state. + * @param newState New state. 
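 * <p>
 * Transitions used by this map (from the call sites above): READING_WRITING -&gt; REHASHING -&gt;
 * READING_WRITING around a rehash, READING_WRITING -&gt; VISITING -&gt; READING_WRITING around a
 * visit, and READING_WRITING -&gt; CLOSING on close. A failed CAS in this helper is treated as
 * a programming error, while the compareAndSet call sites above back off instead.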
+ */ + private void state(State oldState, State newState) { + if (!state.compareAndSet(oldState, newState)) + throw new IllegalStateException(); + } + + /** + * @param meta Meta pointer. + * @return Value pointer. + */ + @Override protected long value(long meta) { + return mem.readLongVolatile(meta + 16); + } + + /** + * @param meta Meta pointer. + * @param oldValPtr Old value. + * @param newValPtr New value. + * @return {@code true} If succeeded. + */ + private boolean casValue(long meta, long oldValPtr, long newValPtr) { + return mem.casLong(meta + 16, oldValPtr, newValPtr); + } + + /** + * @param meta Meta pointer. + * @return Collision pointer. + */ + @Override protected long collision(long meta) { + return mem.readLongVolatile(meta + 24); + } + + /** + * @param meta Meta pointer. + * @param collision Collision pointer. + */ + @Override protected void collision(long meta, long collision) { + assert meta != collision : meta; + + mem.writeLongVolatile(meta + 24, collision); + } + + /** + * @param meta Meta pointer. + * @return Last visited value pointer. + */ + private long lastVisitedValue(long meta) { + return mem.readLong(meta + 32); + } + + /** + * @param meta Meta pointer. + * @param valPtr Last visited value pointer. + */ + private void lastVisitedValue(long meta, long valPtr) { + mem.writeLong(meta + 32, valPtr); + } + + /** + * Adder. Must not be shared between threads. + */ + private class AdderImpl extends AdderBase { + /** */ + private final Reader keyReader; + + /** */ + private final AtomicInteger locKeys = new AtomicInteger(); + + /** */ + private final Random rnd = new GridRandom(); + + /** + * @param ctx Task context. + * @throws IgniteCheckedException If failed. + */ + private AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException { + super(ctx); + + keyReader = new Reader(keySer); + + rehashIfNeeded(oldTbl); + + adders.add(this); + } + + /** + * @param in Data input. + * @param reuse Reusable key. + * @return Key. + * @throws IgniteCheckedException If failed. + */ + @Override public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException { + KeyImpl k = reuse == null ? new KeyImpl() : (KeyImpl)reuse; + + k.tmpKey = keySer.read(in, k.tmpKey); + + k.meta = add(k.tmpKey, null); + + return k; + } + + /** {@inheritDoc} */ + @Override public void write(Object key, Object val) throws IgniteCheckedException { + A.notNull(val, "val"); + + add(key, val); + } + + /** + * @param tbl Table. + */ + private void incrementKeys(AtomicLongArray tbl) { + locKeys.lazySet(locKeys.get() + 1); + + if (rnd.nextInt(tbl.length()) < 512) + rehashIfNeeded(tbl); + } + + /** + * @param keyHash Key hash. + * @param keySize Key size. + * @param keyPtr Key pointer. + * @param valPtr Value page pointer. + * @param collisionPtr Pointer to meta with hash collision. + * @param lastVisitedVal Last visited value pointer. + * @return Created meta page pointer. + */ + private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr, long lastVisitedVal) { + long meta = allocate(40); + + mem.writeInt(meta, keyHash); + mem.writeInt(meta + 4, keySize); + mem.writeLong(meta + 8, keyPtr); + mem.writeLong(meta + 16, valPtr); + mem.writeLong(meta + 24, collisionPtr); + mem.writeLong(meta + 32, lastVisitedVal); + + return meta; + } + + /** + * @param key Key. + * @param val Value. + * @return Updated or created meta page pointer. + * @throws IgniteCheckedException If failed. 
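 * <p>
 * Meta page layout allocated by createMeta() above (40 bytes, offsets from this class):
 * <pre>
 *   0: int  keyHash      16: long valPtr (head of value chain, CAS via casValue)
 *   4: int  keySize      24: long collision (next meta in the same bucket)
 *   8: long keyPtr       32: long lastVisitedValue
 * </pre>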
+        /**
+         * @param key Key.
+         * @param val Value.
+         * @return Updated or created meta page pointer.
+         * @throws IgniteCheckedException If failed.
+         */
+        private long add(Object key, @Nullable Object val) throws IgniteCheckedException {
+            AtomicLongArray tbl = oldTbl;
+
+            int keyHash = U.hash(key.hashCode());
+
+            long newMetaPtr = 0;
+
+            long valPtr = 0;
+
+            if (val != null) {
+                valPtr = write(12, val, valSer);
+                int valSize = writtenSize() - 12;
+
+                valueSize(valPtr, valSize);
+            }
+
+            for (AtomicLongArray old = null;;) {
+                int addr = keyHash & (tbl.length() - 1);
+
+                long metaPtrRoot = tbl.get(addr); // Read the root meta pointer at this address.
+
+                if (metaPtrRoot == -1) { // The cell was already moved by rehashing.
+                    AtomicLongArray n = newTbl; // Need to read newTbl first here.
+                    AtomicLongArray o = oldTbl;
+
+                    tbl = tbl == o ? n : o; // Try to get the oldest table that is still newer than ours.
+
+                    old = null;
+
+                    continue;
+                }
+
+                if (metaPtrRoot != 0) { // Not an empty slot.
+                    long metaPtr = metaPtrRoot;
+
+                    do { // Scan all the collisions.
+                        if (keyHash(metaPtr) == keyHash && key.equals(keyReader.readKey(metaPtr))) { // Found the key.
+                            if (newMetaPtr != 0) // Deallocate the new meta if one was allocated.
+                                localDeallocate(key(newMetaPtr)); // The key was allocated first, so rewind to its pointer.
+
+                            if (valPtr != 0) { // Add the value if it exists.
+                                long nextValPtr;
+
+                                // Values are linked to each other in a stack-like structure.
+                                // Replace the last value in the meta with ours and link it as next.
+                                do {
+                                    nextValPtr = value(metaPtr);
+
+                                    nextValue(valPtr, nextValPtr);
+                                }
+                                while (!casValue(metaPtr, nextValPtr, valPtr));
+                            }
+
+                            return metaPtr;
+                        }
+
+                        metaPtr = collision(metaPtr);
+                    }
+                    while (metaPtr != 0);
+
+                    // We did not find our key here; check if it was moved by rehashing to the new table.
+                    if (old == null) { // If the old table is already set, we will just try to update it.
+                        AtomicLongArray n = newTbl;
+
+                        if (n != tbl) { // Rehashing is in progress; look for the key in the new table but preserve the old one.
+                            old = tbl;
+                            tbl = n;
+
+                            continue;
+                        }
+                    }
+                }
+
+                if (old != null) { // We checked the new table but found our key neither there nor in the old one.
+                    tbl = old; // Try to add the new key to the old table.
+
+                    addr = keyHash & (tbl.length() - 1);
+
+                    old = null;
+                }
+
+                if (newMetaPtr == 0) { // Allocate a new meta page.
+                    long keyPtr = write(0, key, keySer);
+                    int keySize = writtenSize();
+
+                    if (valPtr != 0)
+                        nextValue(valPtr, 0);
+
+                    newMetaPtr = createMeta(keyHash, keySize, keyPtr, valPtr, metaPtrRoot, 0);
+                }
+                else // Update the new meta with the root pointer as its collision link.
+                    collision(newMetaPtr, metaPtrRoot);
+
+                if (tbl.compareAndSet(addr, metaPtrRoot, newMetaPtr)) { // Try to replace the root pointer with the new one.
+                    incrementKeys(tbl);
+
+                    return newMetaPtr;
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            if (!adders.remove(this))
+                throw new IllegalStateException();
+
+            keys.addAndGet(locKeys.get()); // There is a race here, so #keys() can return a stale result, but that is OK.
+
+            super.close();
+        }
+
+        /**
+         * Key.
+         */
+        private class KeyImpl implements Key {
+            /** */
+            private long meta;
+
+            /** */
+            private Object tmpKey;
+
+            /**
+             * @return Meta pointer for the key.
+             */
+            public long address() {
+                return meta;
+            }
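The inner do/while in add() above is a classic lock-free stack push: link the new value to the current chain head, then swing the head with a CAS. A minimal on-heap analogue, with invented names and object references standing in for the off-heap pointers:

    import java.util.concurrent.atomic.AtomicReference;

    public class ValueChainSketch {
        static final class Node {
            final int val;
            Node next; // Plays the role of nextValue(valPtr, nextValPtr).

            Node(int val) { this.val = val; }
        }

        // Plays the role of the meta record's value pointer (casValue above).
        private final AtomicReference<Node> head = new AtomicReference<>();

        void push(int val) {
            Node n = new Node(val);
            Node h;

            do {
                h = head.get(); // nextValPtr = value(metaPtr);
                n.next = h;     // nextValue(valPtr, nextValPtr);
            }
            while (!head.compareAndSet(h, n)); // casValue(metaPtr, nextValPtr, valPtr);
        }

        public static void main(String[] args) {
            ValueChainSketch s = new ValueChainSketch();

            s.push(1);
            s.push(2);
            s.push(3);

            for (Node n = s.head.get(); n != null; n = n.next)
                System.out.println(n.val); // Prints 3, 2, 1.
        }
    }

This is also why a key's values come back in reverse insertion order, and why visit() walks from the chain head down to lastVisited.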
+            /**
+             * @param val Value.
+             */
+            @Override public void add(Value val) {
+                int size = val.size();
+
+                long valPtr = allocate(size + 12);
+
+                val.copyTo(valPtr + 12);
+
+                valueSize(valPtr, size);
+
+                long nextVal;
+
+                do {
+                    nextVal = value(meta);
+
+                    nextValue(valPtr, nextVal);
+                }
+                while (!casValue(meta, nextVal, valPtr));
+            }
+        }
+    }
+
+    /**
+     * Current map state.
+     */
+    private enum State {
+        /** */
+        REHASHING,
+
+        /** */
+        VISITING,
+
+        /** */
+        READING_WRITING,
+
+        /** */
+        CLOSING
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
new file mode 100644
index 0000000..c32e9af
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimap.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Hash multimap.
+ */
+public class HadoopHashMultimap extends HadoopHashMultimapBase {
+    /** */
+    private long[] tbl;
+
+    /** */
+    private int keys;
+
+    /**
+     * @param jobInfo Job info.
+     * @param mem Memory.
+     * @param cap Initial capacity.
+     */
+    public HadoopHashMultimap(HadoopJobInfo jobInfo, GridUnsafeMemory mem, int cap) {
+        super(jobInfo, mem);
+
+        assert U.isPow2(cap) : cap;
+
+        tbl = new long[cap];
+    }
+
+    /** {@inheritDoc} */
+    @Override public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException {
+        return new AdderImpl(ctx);
+    }
+
+    /**
+     * Rehash.
+     */
+    private void rehash() {
+        long[] newTbl = new long[tbl.length << 1];
+
+        int newMask = newTbl.length - 1;
+
+        for (long meta : tbl) {
+            while (meta != 0) {
+                long collision = collision(meta);
+
+                int idx = keyHash(meta) & newMask;
+
+                collision(meta, newTbl[idx]);
+
+                newTbl[idx] = meta;
+
+                meta = collision;
+            }
+        }
+
+        tbl = newTbl;
+    }
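This single-threaded rehash() relies on the capacity staying a power of two (asserted in the constructor), so 'hash & (len - 1)' is the bucket index, and doubling the table re-splits each old bucket between idx and idx + oldLen. A tiny worked example, editorial and not part of the patch:

    public class BucketMaskSketch {
        public static void main(String[] args) {
            int oldLen = 8, newLen = 16;

            int hash = 0x2B; // Binary 101011.

            int oldIdx = hash & (oldLen - 1); // 011  -> 3.
            int newIdx = hash & (newLen - 1); // 1011 -> 11, i.e. 3 + 8.

            System.out.println(oldIdx + " -> " + newIdx); // Prints 3 -> 11.
        }
    }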
+    /**
+     * @return Keys count.
+     */
+    public int keys() {
+        return keys;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int capacity() {
+        return tbl.length;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long meta(int idx) {
+        return tbl[idx];
+    }
+
+    /**
+     * Adder.
+     */
+    private class AdderImpl extends AdderBase {
+        /** */
+        private final Reader keyReader;
+
+        /**
+         * @param ctx Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected AdderImpl(HadoopTaskContext ctx) throws IgniteCheckedException {
+            super(ctx);
+
+            keyReader = new Reader(keySer);
+        }
+
+        /**
+         * @param keyHash Key hash.
+         * @param keySize Key size.
+         * @param keyPtr Key pointer.
+         * @param valPtr Value page pointer.
+         * @param collisionPtr Pointer to meta with hash collision.
+         * @return Created meta page pointer.
+         */
+        private long createMeta(int keyHash, int keySize, long keyPtr, long valPtr, long collisionPtr) {
+            long meta = allocate(32);
+
+            mem.writeInt(meta, keyHash);
+            mem.writeInt(meta + 4, keySize);
+            mem.writeLong(meta + 8, keyPtr);
+            mem.writeLong(meta + 16, valPtr);
+            mem.writeLong(meta + 24, collisionPtr);
+
+            return meta;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Object key, Object val) throws IgniteCheckedException {
+            A.notNull(val, "val");
+
+            int keyHash = U.hash(key.hashCode());
+
+            // Write value.
+            long valPtr = write(12, val, valSer);
+            int valSize = writtenSize() - 12;
+
+            valueSize(valPtr, valSize);
+
+            // Find position in table.
+            int idx = keyHash & (tbl.length - 1);
+
+            long meta = tbl[idx];
+
+            // Search for our key in collisions.
+            while (meta != 0) {
+                if (keyHash(meta) == keyHash && key.equals(keyReader.readKey(meta))) { // Found key.
+                    nextValue(valPtr, value(meta));
+
+                    value(meta, valPtr);
+
+                    return;
+                }
+
+                meta = collision(meta);
+            }
+
+            // Write key.
+            long keyPtr = write(0, key, keySer);
+            int keySize = writtenSize();
+
+            nextValue(valPtr, 0);
+
+            tbl[idx] = createMeta(keyHash, keySize, keyPtr, valPtr, tbl[idx]);
+
+            if (++keys > (tbl.length >>> 2) * 3)
+                rehash();
+        }
+    }
+}
\ No newline at end of file
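A quick editorial check of the growth condition at the end of write() above: '++keys > (tbl.length >>> 2) * 3' triggers rehash() once the key count exceeds three quarters of the capacity (class and variable names invented here):

    public class LoadFactorSketch {
        public static void main(String[] args) {
            int cap = 16;

            int threshold = (cap >>> 2) * 3; // 12, i.e. 3/4 of 16.

            for (int keys = 1; keys <= 13; keys++)
                if (keys > threshold)
                    System.out.println(keys + " keys in a table of " + cap + ": rehash doubles it to " + (cap << 1));
        }
    }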
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
new file mode 100644
index 0000000..8d9b3c3
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMultimapBase.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import java.util.Iterator;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
+import org.apache.ignite.internal.processors.hadoop.HadoopSerialization;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+
+/**
+ * Base class for hash multimaps.
+ */
+public abstract class HadoopHashMultimapBase extends HadoopMultimapBase {
+    /**
+     * @param jobInfo Job info.
+     * @param mem Memory.
+     */
+    protected HadoopHashMultimapBase(HadoopJobInfo jobInfo, GridUnsafeMemory mem) {
+        super(jobInfo, mem);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException {
+        throw new UnsupportedOperationException("visit");
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopTaskInput input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+        return new Input(taskCtx);
+    }
+
+    /**
+     * @return Hash table capacity.
+     */
+    public abstract int capacity();
+
+    /**
+     * @param idx Index in hash table.
+     * @return Meta page pointer.
+     */
+    protected abstract long meta(int idx);
+
+    /**
+     * @param meta Meta pointer.
+     * @return Key hash.
+     */
+    protected int keyHash(long meta) {
+        return mem.readInt(meta);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @return Key size.
+     */
+    protected int keySize(long meta) {
+        return mem.readInt(meta + 4);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @return Key pointer.
+     */
+    protected long key(long meta) {
+        return mem.readLong(meta + 8);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @return Value pointer.
+     */
+    protected long value(long meta) {
+        return mem.readLong(meta + 16);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @param val Value pointer.
+     */
+    protected void value(long meta, long val) {
+        mem.writeLong(meta + 16, val);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @return Collision pointer.
+     */
+    protected long collision(long meta) {
+        return mem.readLong(meta + 24);
+    }
+
+    /**
+     * @param meta Meta pointer.
+     * @param collision Collision pointer.
+     */
+    protected void collision(long meta, long collision) {
+        assert meta != collision : meta;
+
+        mem.writeLong(meta + 24, collision);
+    }
+
+    /**
+     * Reader for key and value.
+     */
+    protected class Reader extends ReaderBase {
+        /**
+         * @param ser Serialization.
+         */
+        protected Reader(HadoopSerialization ser) {
+            super(ser);
+        }
+
+        /**
+         * @param meta Meta pointer.
+         * @return Key.
+         */
+        public Object readKey(long meta) {
+            assert meta > 0 : meta;
+
+            try {
+                return read(key(meta), keySize(meta));
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+
+    /**
+     * Task input.
+     */
+    protected class Input implements HadoopTaskInput {
+        /** */
+        private int idx = -1;
+
+        /** */
+        private long metaPtr;
+
+        /** */
+        private final int cap;
+
+        /** */
+        private final Reader keyReader;
+
+        /** */
+        private final Reader valReader;
+
+        /**
+         * @param taskCtx Task context.
+         * @throws IgniteCheckedException If failed.
+         */
+        public Input(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+            cap = capacity();
+
+            keyReader = new Reader(taskCtx.keySerialization());
+            valReader = new Reader(taskCtx.valueSerialization());
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean next() {
+            if (metaPtr != 0) {
+                metaPtr = collision(metaPtr);
+
+                if (metaPtr != 0)
+                    return true;
+            }
+
+            while (++idx < cap) { // Scan table.
+                metaPtr = meta(idx);
+
+                if (metaPtr != 0)
+                    return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object key() {
+            return keyReader.readKey(metaPtr);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator values() {
+            return new ValueIterator(value(metaPtr), valReader);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteCheckedException {
+            keyReader.close();
+            valReader.close();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ce9bdeb7/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
new file mode 100644
index 0000000..5b71c47
--- /dev/null
+++ b/modules/hadoop-impl/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimap.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.collections;
+
+import java.io.DataInput;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
+import org.apache.ignite.internal.processors.hadoop.HadoopTaskOutput;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Multimap for Hadoop intermediate results.
+ */
+@SuppressWarnings("PublicInnerClass")
+public interface HadoopMultimap extends AutoCloseable {
+    /**
+     * Incrementally visits all the keys and values in the map.
+     *
+     * @param ignoreLastVisited Flag indicating that visiting must be started from the beginning.
+     * @param v Visitor.
+     * @return {@code false} If visiting was impossible.
+     */
+    public boolean visit(boolean ignoreLastVisited, Visitor v) throws IgniteCheckedException;
+
+    /**
+     * @param ctx Task context.
+     * @return Adder.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Adder startAdding(HadoopTaskContext ctx) throws IgniteCheckedException;
+
+    /**
+     * @param taskCtx Task context.
+     * @return Task input.
+     * @throws IgniteCheckedException If failed.
+     */
+    public HadoopTaskInput input(HadoopTaskContext taskCtx)
+        throws IgniteCheckedException;
+
+    /** {@inheritDoc} */
+    @Override public void close();
+
+    /**
+     * Adder.
+     */
+    public interface Adder extends HadoopTaskOutput {
+        /**
+         * @param in Data input.
+         * @param reuse Reusable key.
+         * @return Key.
+         * @throws IgniteCheckedException If failed.
+         */
+        public Key addKey(DataInput in, @Nullable Key reuse) throws IgniteCheckedException;
+    }
+
+    /**
+     * Key to add values to.
+     */
+    public interface Key {
+        /**
+         * @param val Value.
+         */
+        public void add(Value val);
+    }
+
+    /**
+     * Value.
+     */
+    public interface Value {
+        /**
+         * @return Size in bytes.
+         */
+        public int size();
+
+        /**
+         * @param ptr Pointer.
+         */
+        public void copyTo(long ptr);
+    }
+
+    /**
+     * Key and values visitor.
+     */
+    public interface Visitor {
+        /**
+         * @param keyPtr Key pointer.
+         * @param keySize Key size.
+         */
+        public void onKey(long keyPtr, int keySize) throws IgniteCheckedException;
+
+        /**
+         * @param valPtr Value pointer.
+         * @param valSize Value size.
+         */
+        public void onValue(long valPtr, int valSize) throws IgniteCheckedException;
+    }
+}
\ No newline at end of file
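Taken together, these files define a write-then-read lifecycle: an Adder populates the multimap, then a HadoopTaskInput drains it. Below is a rough usage sketch, not a definitive harness; it assumes taskCtx and jobInfo come from the surrounding Hadoop processor, that GridUnsafeMemory(0) means an unbounded region, and that the task's key/value serializations accept String and Integer:

    package org.apache.ignite.internal.processors.hadoop.shuffle.collections;

    import java.util.Iterator;
    import org.apache.ignite.IgniteCheckedException;
    import org.apache.ignite.internal.processors.hadoop.HadoopJobInfo;
    import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
    import org.apache.ignite.internal.processors.hadoop.HadoopTaskInput;
    import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;

    /** Hypothetical walk-through of the HadoopMultimap contract. */
    public class MultimapUsageSketch {
        static void example(HadoopTaskContext taskCtx, HadoopJobInfo jobInfo) throws IgniteCheckedException {
            GridUnsafeMemory mem = new GridUnsafeMemory(0); // Assumption: 0 means no limit.

            HadoopMultimap m = new HadoopHashMultimap(jobInfo, mem, 1024); // Capacity must be a power of 2.

            HadoopMultimap.Adder a = m.startAdding(taskCtx);

            a.write("word", 1); // New key: a meta record is created.
            a.write("word", 2); // Same key: the value is linked into the chain.

            a.close();

            HadoopTaskInput in = m.input(taskCtx);

            while (in.next()) { // One key per next(); values() walks the value chain.
                Iterator<?> vals = in.values();

                while (vals.hasNext())
                    System.out.println(in.key() + " -> " + vals.next());
            }

            in.close();

            m.close();
        }
    }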