ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [23/65] [abbrv] incubator-ignite git commit: IGNITE-386: Squashed changes.
Date Thu, 05 Mar 2015 19:08:35 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java
deleted file mode 100644
index 8b4f0c4..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataInStream.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
-
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-
-import java.io.*;
-import java.nio.charset.*;
-
-/**
- * Data input stream.
- */
-public class GridHadoopDataInStream extends InputStream implements DataInput {
-    /** */
-    private final GridHadoopOffheapBuffer buf = new GridHadoopOffheapBuffer(0, 0);
-
-    /** */
-    private final GridUnsafeMemory mem;
-
-    /**
-     * @param mem Memory.
-     */
-    public GridHadoopDataInStream(GridUnsafeMemory mem) {
-        assert mem != null;
-
-        this.mem = mem;
-    }
-
-    /**
-     * @return Buffer.
-     */
-    public GridHadoopOffheapBuffer buffer() {
-        return buf;
-    }
-
-    /**
-     * @param size Size.
-     * @return Old pointer.
-     */
-    protected long move(long size) throws IOException {
-        long ptr = buf.move(size);
-
-        assert ptr != 0;
-
-        return ptr;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int read() throws IOException {
-        return readUnsignedByte();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int read(byte[] b, int off, int len) throws IOException {
-        readFully(b, off, len);
-
-        return len;
-    }
-
-    /** {@inheritDoc} */
-    @Override public long skip(long n) throws IOException {
-        move(n);
-
-        return n;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readFully(byte[] b) throws IOException {
-        readFully(b, 0, b.length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readFully(byte[] b, int off, int len) throws IOException {
-        mem.readBytes(move(len), b, off, len);
-    }
-
-    /** {@inheritDoc} */
-    @Override public int skipBytes(int n) throws IOException {
-        move(n);
-
-        return n;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readBoolean() throws IOException {
-        byte res = readByte();
-
-        if (res == 1)
-            return true;
-
-        assert res == 0 : res;
-
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte readByte() throws IOException {
-        return mem.readByte(move(1));
-    }
-
-    /** {@inheritDoc} */
-    @Override public int readUnsignedByte() throws IOException {
-        return readByte() & 0xff;
-    }
-
-    /** {@inheritDoc} */
-    @Override public short readShort() throws IOException {
-        return mem.readShort(move(2));
-    }
-
-    /** {@inheritDoc} */
-    @Override public int readUnsignedShort() throws IOException {
-        return readShort() & 0xffff;
-    }
-
-    /** {@inheritDoc} */
-    @Override public char readChar() throws IOException {
-        return (char)readShort();
-    }
-
-    /** {@inheritDoc} */
-    @Override public int readInt() throws IOException {
-        return mem.readInt(move(4));
-    }
-
-    /** {@inheritDoc} */
-    @Override public long readLong() throws IOException {
-        return mem.readLong(move(8));
-    }
-
-    /** {@inheritDoc} */
-    @Override public float readFloat() throws IOException {
-        return mem.readFloat(move(4));
-    }
-
-    /** {@inheritDoc} */
-    @Override public double readDouble() throws IOException {
-        return mem.readDouble(move(8));
-    }
-
-    /** {@inheritDoc} */
-    @Override public String readLine() throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String readUTF() throws IOException {
-        byte[] bytes = new byte[readInt()];
-
-        if (bytes.length != 0)
-            readFully(bytes);
-
-        return new String(bytes, StandardCharsets.UTF_8);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java
deleted file mode 100644
index 8b837c8..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataOutStream.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
-
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-
-import java.io.*;
-import java.nio.charset.*;
-
-import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
-
-/**
- * Data output stream.
- */
-public class GridHadoopDataOutStream extends OutputStream implements DataOutput {
-    /** */
-    private final GridHadoopOffheapBuffer buf = new GridHadoopOffheapBuffer(0, 0);
-
-    /** */
-    private final GridUnsafeMemory mem;
-
-    /**
-     * @param mem Memory.
-     */
-    public GridHadoopDataOutStream(GridUnsafeMemory mem) {
-        this.mem = mem;
-    }
-
-    /**
-     * @return Buffer.
-     */
-    public GridHadoopOffheapBuffer buffer() {
-        return buf;
-    }
-
-    /**
-     * @param size Size.
-     * @return Old pointer or {@code 0} if move was impossible.
-     */
-    public long move(long size) {
-        return buf.move(size);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(int b) {
-        writeByte(b);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(byte[] b) {
-        write(b, 0, b.length);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void write(byte[] b, int off, int len) {
-        UNSAFE.copyMemory(b, BYTE_ARR_OFF + off, null, move(len), len);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeBoolean(boolean v) {
-        writeByte(v ? 1 : 0);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeByte(int v) {
-        mem.writeByte(move(1), (byte)v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeShort(int v) {
-        mem.writeShort(move(2), (short)v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeChar(int v) {
-        writeShort(v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeInt(int v) {
-        mem.writeInt(move(4), v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeLong(long v) {
-        mem.writeLong(move(8), v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeFloat(float v) {
-        mem.writeFloat(move(4), v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeDouble(double v) {
-        mem.writeDouble(move(8), v);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeBytes(String s) {
-        writeUTF(s);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeChars(String s) {
-        writeUTF(s);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeUTF(String s) {
-        byte[] b = s.getBytes(StandardCharsets.UTF_8);
-
-        writeInt(b.length);
-        write(b);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java
deleted file mode 100644
index f9f0e1d..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopOffheapBuffer.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
-
-/**
- * Offheap buffer.
- */
-public class GridHadoopOffheapBuffer {
-    /** Buffer begin address. */
-    private long bufPtr;
-
-    /** The first address we do not own. */
-    private long bufEnd;
-
-    /** Current read or write pointer. */
-    private long posPtr;
-
-    /**
-     * @param bufPtr Pointer to buffer begin.
-     * @param bufSize Size of the buffer.
-     */
-    public GridHadoopOffheapBuffer(long bufPtr, long bufSize) {
-        set(bufPtr, bufSize);
-    }
-
-    /**
-     * @param bufPtr Pointer to buffer begin.
-     * @param bufSize Size of the buffer.
-     */
-    public void set(long bufPtr, long bufSize) {
-        this.bufPtr = bufPtr;
-
-        posPtr = bufPtr;
-        bufEnd = bufPtr + bufSize;
-    }
-
-    /**
-     * @return Pointer to internal buffer begin.
-     */
-    public long begin() {
-        return bufPtr;
-    }
-
-    /**
-     * @return Buffer capacity.
-     */
-    public long capacity() {
-        return bufEnd - bufPtr;
-    }
-
-    /**
-     * @return Remaining capacity.
-     */
-    public long remaining() {
-        return bufEnd - posPtr;
-    }
-
-    /**
-     * @return Absolute pointer to the current position inside of the buffer.
-     */
-    public long pointer() {
-        return posPtr;
-    }
-
-    /**
-     * @param ptr Absolute pointer to the current position inside of the buffer.
-     */
-    public void pointer(long ptr) {
-        assert ptr >= bufPtr : bufPtr + " <= " + ptr;
-        assert ptr <= bufEnd : bufEnd + " <= " + bufPtr;
-
-        posPtr = ptr;
-    }
-
-    /**
-     * @param size Size move on.
-     * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer.
-     */
-    public long move(long size) {
-        assert size > 0 : size;
-
-        long oldPos = posPtr;
-        long newPos = oldPos + size;
-
-        if (newPos > bufEnd)
-            return 0;
-
-        posPtr = newPos;
-
-        return oldPos;
-    }
-
-    /**
-     * @param ptr Pointer.
-     * @return {@code true} If the given pointer is inside of this buffer.
-     */
-    public boolean isInside(long ptr) {
-        return ptr >= bufPtr && ptr <= bufEnd;
-    }
-
-    /**
-     * Resets position to the beginning of buffer.
-     */
-    public void reset() {
-        posPtr = bufPtr;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
new file mode 100644
index 0000000..8a1ee70
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataInStream.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+
+import java.io.*;
+import java.nio.charset.*;
+
+/**
+ * Data input stream.
+ */
+public class HadoopDataInStream extends InputStream implements DataInput {
+    /** */
+    private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0);
+
+    /** */
+    private final GridUnsafeMemory mem;
+
+    /**
+     * @param mem Memory.
+     */
+    public HadoopDataInStream(GridUnsafeMemory mem) {
+        assert mem != null;
+
+        this.mem = mem;
+    }
+
+    /**
+     * @return Buffer.
+     */
+    public HadoopOffheapBuffer buffer() {
+        return buf;
+    }
+
+    /**
+     * @param size Size.
+     * @return Old pointer.
+     */
+    protected long move(long size) throws IOException {
+        long ptr = buf.move(size);
+
+        assert ptr != 0;
+
+        return ptr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read() throws IOException {
+        return readUnsignedByte();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(byte[] b, int off, int len) throws IOException {
+        readFully(b, off, len);
+
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long skip(long n) throws IOException {
+        move(n);
+
+        return n;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFully(byte[] b) throws IOException {
+        readFully(b, 0, b.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readFully(byte[] b, int off, int len) throws IOException {
+        mem.readBytes(move(len), b, off, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int skipBytes(int n) throws IOException {
+        move(n);
+
+        return n;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readBoolean() throws IOException {
+        byte res = readByte();
+
+        if (res == 1)
+            return true;
+
+        assert res == 0 : res;
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte readByte() throws IOException {
+        return mem.readByte(move(1));
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readUnsignedByte() throws IOException {
+        return readByte() & 0xff;
+    }
+
+    /** {@inheritDoc} */
+    @Override public short readShort() throws IOException {
+        return mem.readShort(move(2));
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readUnsignedShort() throws IOException {
+        return readShort() & 0xffff;
+    }
+
+    /** {@inheritDoc} */
+    @Override public char readChar() throws IOException {
+        return (char)readShort();
+    }
+
+    /** {@inheritDoc} */
+    @Override public int readInt() throws IOException {
+        return mem.readInt(move(4));
+    }
+
+    /** {@inheritDoc} */
+    @Override public long readLong() throws IOException {
+        return mem.readLong(move(8));
+    }
+
+    /** {@inheritDoc} */
+    @Override public float readFloat() throws IOException {
+        return mem.readFloat(move(4));
+    }
+
+    /** {@inheritDoc} */
+    @Override public double readDouble() throws IOException {
+        return mem.readDouble(move(8));
+    }
+
+    /** {@inheritDoc} */
+    @Override public String readLine() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String readUTF() throws IOException {
+        byte[] bytes = new byte[readInt()];
+
+        if (bytes.length != 0)
+            readFully(bytes);
+
+        return new String(bytes, StandardCharsets.UTF_8);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java
new file mode 100644
index 0000000..51bddf9
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataOutStream.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+
+import java.io.*;
+import java.nio.charset.*;
+
+import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*;
+
+/**
+ * Data output stream.
+ */
+public class HadoopDataOutStream extends OutputStream implements DataOutput {
+    /** */
+    private final HadoopOffheapBuffer buf = new HadoopOffheapBuffer(0, 0);
+
+    /** */
+    private final GridUnsafeMemory mem;
+
+    /**
+     * @param mem Memory.
+     */
+    public HadoopDataOutStream(GridUnsafeMemory mem) {
+        this.mem = mem;
+    }
+
+    /**
+     * @return Buffer.
+     */
+    public HadoopOffheapBuffer buffer() {
+        return buf;
+    }
+
+    /**
+     * @param size Size.
+     * @return Old pointer or {@code 0} if move was impossible.
+     */
+    public long move(long size) {
+        return buf.move(size);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(int b) {
+        writeByte(b);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] b) {
+        write(b, 0, b.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(byte[] b, int off, int len) {
+        UNSAFE.copyMemory(b, BYTE_ARR_OFF + off, null, move(len), len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBoolean(boolean v) {
+        writeByte(v ? 1 : 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeByte(int v) {
+        mem.writeByte(move(1), (byte)v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeShort(int v) {
+        mem.writeShort(move(2), (short)v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChar(int v) {
+        writeShort(v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeInt(int v) {
+        mem.writeInt(move(4), v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeLong(long v) {
+        mem.writeLong(move(8), v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeFloat(float v) {
+        mem.writeFloat(move(4), v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeDouble(double v) {
+        mem.writeDouble(move(8), v);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBytes(String s) {
+        writeUTF(s);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeChars(String s) {
+        writeUTF(s);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeUTF(String s) {
+        byte[] b = s.getBytes(StandardCharsets.UTF_8);
+
+        writeInt(b.length);
+        write(b);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
new file mode 100644
index 0000000..a8e7a33
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopOffheapBuffer.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.shuffle.streams;
+
+/**
+ * Offheap buffer.
+ */
+public class HadoopOffheapBuffer {
+    /** Buffer begin address. */
+    private long bufPtr;
+
+    /** The first address we do not own. */
+    private long bufEnd;
+
+    /** Current read or write pointer. */
+    private long posPtr;
+
+    /**
+     * @param bufPtr Pointer to buffer begin.
+     * @param bufSize Size of the buffer.
+     */
+    public HadoopOffheapBuffer(long bufPtr, long bufSize) {
+        set(bufPtr, bufSize);
+    }
+
+    /**
+     * @param bufPtr Pointer to buffer begin.
+     * @param bufSize Size of the buffer.
+     */
+    public void set(long bufPtr, long bufSize) {
+        this.bufPtr = bufPtr;
+
+        posPtr = bufPtr;
+        bufEnd = bufPtr + bufSize;
+    }
+
+    /**
+     * @return Pointer to internal buffer begin.
+     */
+    public long begin() {
+        return bufPtr;
+    }
+
+    /**
+     * @return Buffer capacity.
+     */
+    public long capacity() {
+        return bufEnd - bufPtr;
+    }
+
+    /**
+     * @return Remaining capacity.
+     */
+    public long remaining() {
+        return bufEnd - posPtr;
+    }
+
+    /**
+     * @return Absolute pointer to the current position inside of the buffer.
+     */
+    public long pointer() {
+        return posPtr;
+    }
+
+    /**
+     * @param ptr Absolute pointer to the current position inside of the buffer.
+     */
+    public void pointer(long ptr) {
+        assert ptr >= bufPtr : bufPtr + " <= " + ptr;
+        assert ptr <= bufEnd : bufEnd + " <= " + bufPtr;
+
+        posPtr = ptr;
+    }
+
+    /**
+     * @param size Size move on.
+     * @return Old position pointer or {@code 0} if move goes beyond the end of the buffer.
+     */
+    public long move(long size) {
+        assert size > 0 : size;
+
+        long oldPos = posPtr;
+        long newPos = oldPos + size;
+
+        if (newPos > bufEnd)
+            return 0;
+
+        posPtr = newPos;
+
+        return oldPos;
+    }
+
+    /**
+     * @param ptr Pointer.
+     * @return {@code true} If the given pointer is inside of this buffer.
+     */
+    public boolean isInside(long ptr) {
+        return ptr >= bufPtr && ptr <= bufEnd;
+    }
+
+    /**
+     * Resets position to the beginning of buffer.
+     */
+    public void reset() {
+        posPtr = bufPtr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java
deleted file mode 100644
index fde5400..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopEmbeddedTaskExecutor.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-
-/**
- * Task executor.
- */
-public class GridHadoopEmbeddedTaskExecutor extends GridHadoopTaskExecutorAdapter {
-    /** Job tracker. */
-    private GridHadoopJobTracker jobTracker;
-
-    /** */
-    private final ConcurrentMap<GridHadoopJobId, Collection<GridHadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
-
-    /** Executor service to run tasks. */
-    private GridHadoopExecutorService exec;
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        super.onKernalStart();
-
-        jobTracker = ctx.jobTracker();
-
-        exec = new GridHadoopExecutorService(log, ctx.kernalContext().gridName(),
-            ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        if (exec != null) {
-            exec.shutdown(3000);
-
-            if (cancel) {
-                for (GridHadoopJobId jobId : jobs.keySet())
-                    cancelTasks(jobId);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) {
-        if (exec != null && !exec.shutdown(30000))
-            U.warn(log, "Failed to finish running tasks in 30 sec.");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException {
-        if (log.isDebugEnabled())
-            log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
-                ", tasksCnt=" + tasks.size() + ']');
-
-        Collection<GridHadoopRunnableTask> executedTasks = jobs.get(job.id());
-
-        if (executedTasks == null) {
-            executedTasks = new GridConcurrentHashSet<>();
-
-            Collection<GridHadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks);
-
-            assert extractedCol == null;
-        }
-
-        final Collection<GridHadoopRunnableTask> finalExecutedTasks = executedTasks;
-
-        for (final GridHadoopTaskInfo info : tasks) {
-            assert info != null;
-
-            GridHadoopRunnableTask task = new GridHadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
-                ctx.localNodeId()) {
-                @Override protected void onTaskFinished(GridHadoopTaskStatus status) {
-                    if (log.isDebugEnabled())
-                        log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " +
-                            "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']');
-
-                    finalExecutedTasks.remove(this);
-
-                    jobTracker.onTaskFinished(info, status);
-                }
-
-                @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-                    return ctx.shuffle().input(taskCtx);
-                }
-
-                @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext taskCtx) throws IgniteCheckedException {
-                    return ctx.shuffle().output(taskCtx);
-                }
-            };
-
-            executedTasks.add(task);
-
-            exec.submit(task);
-        }
-    }
-
-    /**
-     * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
-     * for this job ID.
-     * <p>
-     * It is guaranteed that this method will not be called concurrently with
-     * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via
-     * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called.
-     *
-     * @param jobId Job ID to cancel.
-     */
-    @Override public void cancelTasks(GridHadoopJobId jobId) {
-        Collection<GridHadoopRunnableTask> executedTasks = jobs.get(jobId);
-
-        if (executedTasks != null) {
-            for (GridHadoopRunnableTask task : executedTasks)
-                task.cancel();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException {
-        if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) {
-            Collection<GridHadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
-
-            assert executedTasks == null || executedTasks.isEmpty();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java
deleted file mode 100644
index 9ec637b..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorService.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.thread.*;
-import org.jdk8.backport.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static java.util.Collections.*;
-
-/**
- * Executor service without thread pooling.
- */
-public class GridHadoopExecutorService {
-    /** */
-    private final LinkedBlockingQueue<Callable<?>> queue;
-
-    /** */
-    private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>());
-
-    /** */
-    private final AtomicInteger active = new AtomicInteger();
-
-    /** */
-    private final int maxTasks;
-
-    /** */
-    private final String gridName;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private volatile boolean shutdown;
-
-    /** */
-    private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() {
-            @Override public void onStopped(GridWorker w) {
-                workers.remove(w);
-
-                if (shutdown) {
-                    active.decrementAndGet();
-
-                    return;
-                }
-
-                Callable<?> task = queue.poll();
-
-                if (task != null)
-                    startThread(task);
-                else {
-                    active.decrementAndGet();
-
-                    if (!queue.isEmpty())
-                        startFromQueue();
-                }
-            }
-        };
-
-    /**
-     * @param log Logger.
-     * @param gridName Grid name.
-     * @param maxTasks Max number of tasks.
-     * @param maxQueue Max queue length.
-     */
-    public GridHadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) {
-        assert maxTasks > 0 : maxTasks;
-        assert maxQueue > 0 : maxQueue;
-
-        this.maxTasks = maxTasks;
-        this.queue = new LinkedBlockingQueue<>(maxQueue);
-        this.gridName = gridName;
-        this.log = log.getLogger(GridHadoopExecutorService.class);
-    }
-
-    /**
-     * @return Number of active workers.
-     */
-    public int active() {
-        return workers.size();
-    }
-
-    /**
-     * Submit task.
-     *
-     * @param task Task.
-     */
-    public void submit(Callable<?> task) {
-        while (queue.isEmpty()) {
-            int active0 = active.get();
-
-            if (active0 == maxTasks)
-                break;
-
-            if (active.compareAndSet(active0, active0 + 1)) {
-                startThread(task);
-
-                return; // Started in new thread bypassing queue.
-            }
-        }
-
-        try {
-            while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) {
-                if (shutdown)
-                    return; // Rejected due to shutdown.
-            }
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            return;
-        }
-
-        startFromQueue();
-    }
-
-    /**
-     * Attempts to start task from queue.
-     */
-    private void startFromQueue() {
-        do {
-            int active0 = active.get();
-
-            if (active0 == maxTasks)
-                break;
-
-            if (active.compareAndSet(active0, active0 + 1)) {
-                Callable<?> task = queue.poll();
-
-                if (task == null) {
-                    int res = active.decrementAndGet();
-
-                    assert res >= 0 : res;
-
-                    break;
-                }
-
-                startThread(task);
-            }
-        }
-        while (!queue.isEmpty());
-    }
-
-    /**
-     * @param task Task.
-     */
-    private void startThread(final Callable<?> task) {
-        String workerName;
-
-        if (task instanceof GridHadoopRunnableTask) {
-            final GridHadoopTaskInfo i = ((GridHadoopRunnableTask)task).taskInfo();
-
-            workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt();
-        }
-        else
-            workerName = task.toString();
-
-        GridWorker w = new GridWorker(gridName, workerName, log, lsnr) {
-            @Override protected void body() {
-                try {
-                    task.call();
-                }
-                catch (Exception e) {
-                    log.error("Failed to execute task: " + task, e);
-                }
-            }
-        };
-
-        workers.add(w);
-
-        if (shutdown)
-            w.cancel();
-
-        new IgniteThread(w).start();
-    }
-
-    /**
-     * Shuts down this executor service.
-     *
-     * @param awaitTimeMillis Time in milliseconds to wait for tasks completion.
-     * @return {@code true} If all tasks completed.
-     */
-    public boolean shutdown(long awaitTimeMillis) {
-        shutdown = true;
-
-        for (GridWorker w : workers)
-            w.cancel();
-
-        while (awaitTimeMillis > 0 && !workers.isEmpty()) {
-            try {
-                Thread.sleep(100);
-
-                awaitTimeMillis -= 100;
-            }
-            catch (InterruptedException e) {
-                break;
-            }
-        }
-
-        return workers.isEmpty();
-    }
-
-    /**
-     * @return {@code true} If method {@linkplain #shutdown(long)} was already called.
-     */
-    public boolean isShutdown() {
-        return shutdown;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java
deleted file mode 100644
index fd4a030..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.counter.*;
-import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*;
-import org.apache.ignite.internal.util.offheap.unsafe.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*;
-import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*;
-
-/**
- * Runnable task.
- */
-public abstract class GridHadoopRunnableTask implements Callable<Void> {
-    /** */
-    private final GridUnsafeMemory mem;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** */
-    private final GridHadoopJob job;
-
-    /** Task to run. */
-    private final GridHadoopTaskInfo info;
-
-    /** Submit time. */
-    private final long submitTs = U.currentTimeMillis();
-
-    /** Execution start timestamp. */
-    private long execStartTs;
-
-    /** Execution end timestamp. */
-    private long execEndTs;
-
-    /** */
-    private GridHadoopMultimap combinerInput;
-
-    /** */
-    private volatile GridHadoopTaskContext ctx;
-
-    /** Set if task is to cancelling. */
-    private volatile boolean cancelled;
-
-    /** Node id. */
-    private UUID nodeId;
-
-    /**
-     * @param log Log.
-     * @param job Job.
-     * @param mem Memory.
-     * @param info Task info.
-     * @param nodeId Node id.
-     */
-    protected GridHadoopRunnableTask(IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, GridHadoopTaskInfo info,
-        UUID nodeId) {
-        this.nodeId = nodeId;
-        this.log = log.getLogger(GridHadoopRunnableTask.class);
-        this.job = job;
-        this.mem = mem;
-        this.info = info;
-    }
-
-    /**
-     * @return Wait time.
-     */
-    public long waitTime() {
-        return execStartTs - submitTs;
-    }
-
-    /**
-     * @return Execution time.
-     */
-    public long executionTime() {
-        return execEndTs - execStartTs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Void call() throws IgniteCheckedException {
-        execStartTs = U.currentTimeMillis();
-
-        Throwable err = null;
-
-        GridHadoopTaskState state = GridHadoopTaskState.COMPLETED;
-
-        GridHadoopPerformanceCounter perfCntr = null;
-
-        try {
-            ctx = job.getTaskContext(info);
-
-            perfCntr = GridHadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
-
-            perfCntr.onTaskSubmit(info, submitTs);
-            perfCntr.onTaskPrepare(info, execStartTs);
-
-            ctx.prepareTaskEnvironment();
-
-            runTask(perfCntr);
-
-            if (info.type() == MAP && job.info().hasCombiner()) {
-                ctx.taskInfo(new GridHadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
-
-                try {
-                    runTask(perfCntr);
-                }
-                finally {
-                    ctx.taskInfo(info);
-                }
-            }
-        }
-        catch (GridHadoopTaskCancelledException ignored) {
-            state = GridHadoopTaskState.CANCELED;
-        }
-        catch (Throwable e) {
-            state = GridHadoopTaskState.FAILED;
-            err = e;
-
-            U.error(log, "Task execution failed.", e);
-        }
-        finally {
-            execEndTs = U.currentTimeMillis();
-
-            if (perfCntr != null)
-                perfCntr.onTaskFinish(info, execEndTs);
-
-            onTaskFinished(new GridHadoopTaskStatus(state, err, ctx==null ? null : ctx.counters()));
-
-            if (combinerInput != null)
-                combinerInput.close();
-
-            if (ctx != null)
-                ctx.cleanupTaskEnvironment();
-        }
-
-        return null;
-    }
-
-    /**
-     * @param perfCntr Performance counter.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void runTask(GridHadoopPerformanceCounter perfCntr) throws IgniteCheckedException {
-        if (cancelled)
-            throw new GridHadoopTaskCancelledException("Task cancelled.");
-
-        try (GridHadoopTaskOutput out = createOutputInternal(ctx);
-             GridHadoopTaskInput in = createInputInternal(ctx)) {
-
-            ctx.input(in);
-            ctx.output(out);
-
-            perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis());
-
-            ctx.run();
-        }
-    }
-
-    /**
-     * Cancel the executed task.
-     */
-    public void cancel() {
-        cancelled = true;
-
-        if (ctx != null)
-            ctx.cancel();
-    }
-
-    /**
-     * @param status Task status.
-     */
-    protected abstract void onTaskFinished(GridHadoopTaskStatus status);
-
-    /**
-     * @param ctx Task context.
-     * @return Task input.
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings("unchecked")
-    private GridHadoopTaskInput createInputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException {
-        switch (ctx.taskInfo().type()) {
-            case SETUP:
-            case MAP:
-            case COMMIT:
-            case ABORT:
-                return null;
-
-            case COMBINE:
-                assert combinerInput != null;
-
-                return combinerInput.input(ctx);
-
-            default:
-                return createInput(ctx);
-        }
-    }
-
-    /**
-     * @param ctx Task context.
-     * @return Input.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected abstract GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) throws IgniteCheckedException;
-
-    /**
-     * @param ctx Task info.
-     * @return Output.
-     * @throws IgniteCheckedException If failed.
-     */
-    protected abstract GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) throws IgniteCheckedException;
-
-    /**
-     * @param ctx Task info.
-     * @return Task output.
-     * @throws IgniteCheckedException If failed.
-     */
-    private GridHadoopTaskOutput createOutputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException {
-        switch (ctx.taskInfo().type()) {
-            case SETUP:
-            case REDUCE:
-            case COMMIT:
-            case ABORT:
-                return null;
-
-            case MAP:
-                if (job.info().hasCombiner()) {
-                    assert combinerInput == null;
-
-                    combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ?
-                        new GridHadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)):
-                        new GridHadoopSkipList(job.info(), mem); // TODO replace with red-black tree
-
-                    return combinerInput.startAdding(ctx);
-                }
-
-            default:
-                return createOutput(ctx);
-        }
-    }
-
-    /**
-     * @return Task info.
-     */
-    public GridHadoopTaskInfo taskInfo() {
-        return info;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java
deleted file mode 100644
index 8f66190..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskExecutorAdapter.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
-
-import java.util.*;
-
-/**
- * Common superclass for task executor.
- */
-public abstract class GridHadoopTaskExecutorAdapter extends GridHadoopComponent {
-    /**
-     * Runs tasks.
-     *
-     * @param job Job.
-     * @param tasks Tasks.
-     * @throws IgniteCheckedException If failed.
-     */
-    public abstract void run(final GridHadoopJob job, Collection<GridHadoopTaskInfo> tasks) throws IgniteCheckedException;
-
-    /**
-     * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
-     * for this job ID.
-     * <p>
-     * It is guaranteed that this method will not be called concurrently with
-     * {@link #run(GridHadoopJob, Collection)} method. No more job submissions will be performed via
-     * {@link #run(GridHadoopJob, Collection)} method for given job ID after this method is called.
-     *
-     * @param jobId Job ID to cancel.
-     */
-    public abstract void cancelTasks(GridHadoopJobId jobId) throws IgniteCheckedException;
-
-    /**
-     * On job state change callback;
-     *
-     * @param meta Job metadata.
-     */
-    public abstract void onJobStateChanged(GridHadoopJobMetadata meta) throws IgniteCheckedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java
deleted file mode 100644
index d1eaa66..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskState.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-/**
-* State of the task.
-*/
-public enum GridHadoopTaskState {
-    /** Running task. */
-    RUNNING,
-
-    /** Completed task. */
-    COMPLETED,
-
-    /** Failed task. */
-    FAILED,
-
-    /** Canceled task. */
-    CANCELED,
-
-    /** Process crashed. */
-    CRASHED
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java
deleted file mode 100644
index 89ef8c1..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopTaskStatus.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor;
-
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-
-/**
- * Task status.
- */
-public class GridHadoopTaskStatus implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private GridHadoopTaskState state;
-
-    /** */
-    private Throwable failCause;
-
-    /** */
-    private GridHadoopCounters cntrs;
-
-    /**
-     * Default constructor required by {@link Externalizable}.
-     */
-    public GridHadoopTaskStatus() {
-        // No-op.
-    }
-
-    /**
-     * Creates new instance.
-     *
-     * @param state Task state.
-     * @param failCause Failure cause (if any).
-     */
-    public GridHadoopTaskStatus(GridHadoopTaskState state, @Nullable Throwable failCause) {
-        this(state, failCause, null);
-    }
-
-    /**
-     * Creates new instance.
-     *
-     * @param state Task state.
-     * @param failCause Failure cause (if any).
-     * @param cntrs Task counters.
-     */
-    public GridHadoopTaskStatus(GridHadoopTaskState state, @Nullable Throwable failCause,
-        @Nullable GridHadoopCounters cntrs) {
-        assert state != null;
-
-        this.state = state;
-        this.failCause = failCause;
-        this.cntrs = cntrs;
-    }
-
-    /**
-     * @return State.
-     */
-    public GridHadoopTaskState state() {
-        return state;
-    }
-
-    /**
-     * @return Fail cause.
-     */
-    @Nullable public Throwable failCause() {
-        return failCause;
-    }
-
-    /**
-     * @return Counters.
-     */
-    @Nullable public GridHadoopCounters counters() {
-        return cntrs;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridHadoopTaskStatus.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(state);
-        out.writeObject(failCause);
-        out.writeObject(cntrs);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        state = (GridHadoopTaskState)in.readObject();
-        failCause = (Throwable)in.readObject();
-        cntrs = (GridHadoopCounters)in.readObject();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
new file mode 100644
index 0000000..a3c20d8
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+
+/**
+ * Task executor.
+ */
+public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter {
+    /** Job tracker. */
+    private HadoopJobTracker jobTracker;
+
+    /** */
+    private final ConcurrentMap<HadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>();
+
+    /** Executor service to run tasks. */
+    private HadoopExecutorService exec;
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        super.onKernalStart();
+
+        jobTracker = ctx.jobTracker();
+
+        exec = new HadoopExecutorService(log, ctx.kernalContext().gridName(),
+            ctx.configuration().getMaxParallelTasks(), ctx.configuration().getMaxTaskQueueSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        if (exec != null) {
+            exec.shutdown(3000);
+
+            if (cancel) {
+                for (HadoopJobId jobId : jobs.keySet())
+                    cancelTasks(jobId);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        if (exec != null && !exec.shutdown(30000))
+            U.warn(log, "Failed to finish running tasks in 30 sec.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException {
+        if (log.isDebugEnabled())
+            log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() +
+                ", tasksCnt=" + tasks.size() + ']');
+
+        Collection<HadoopRunnableTask> executedTasks = jobs.get(job.id());
+
+        if (executedTasks == null) {
+            executedTasks = new GridConcurrentHashSet<>();
+
+            Collection<HadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks);
+
+            assert extractedCol == null;
+        }
+
+        final Collection<HadoopRunnableTask> finalExecutedTasks = executedTasks;
+
+        for (final HadoopTaskInfo info : tasks) {
+            assert info != null;
+
+            HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info,
+                ctx.localNodeId()) {
+                @Override protected void onTaskFinished(HadoopTaskStatus status) {
+                    if (log.isDebugEnabled())
+                        log.debug("Finished task execution [jobId=" + job.id() + ", taskInfo=" + info + ", " +
+                            "waitTime=" + waitTime() + ", execTime=" + executionTime() + ']');
+
+                    finalExecutedTasks.remove(this);
+
+                    jobTracker.onTaskFinished(info, status);
+                }
+
+                @Override protected HadoopTaskInput createInput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+                    return ctx.shuffle().input(taskCtx);
+                }
+
+                @Override protected HadoopTaskOutput createOutput(HadoopTaskContext taskCtx) throws IgniteCheckedException {
+                    return ctx.shuffle().output(taskCtx);
+                }
+            };
+
+            executedTasks.add(task);
+
+            exec.submit(task);
+        }
+    }
+
+    /**
+     * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
+     * for this job ID.
+     * <p>
+     * It is guaranteed that this method will not be called concurrently with
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called.
+     *
+     * @param jobId Job ID to cancel.
+     */
+    @Override public void cancelTasks(HadoopJobId jobId) {
+        Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId);
+
+        if (executedTasks != null) {
+            for (HadoopRunnableTask task : executedTasks)
+                task.cancel();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException {
+        if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) {
+            Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId());
+
+            assert executedTasks == null || executedTasks.isEmpty();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
new file mode 100644
index 0000000..1c318e9
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.thread.*;
+import org.jdk8.backport.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static java.util.Collections.*;
+
+/**
+ * Executor service without thread pooling.
+ */
+public class HadoopExecutorService {
+    /** */
+    private final LinkedBlockingQueue<Callable<?>> queue;
+
+    /** */
+    private final Collection<GridWorker> workers = newSetFromMap(new ConcurrentHashMap8<GridWorker, Boolean>());
+
+    /** */
+    private final AtomicInteger active = new AtomicInteger();
+
+    /** */
+    private final int maxTasks;
+
+    /** */
+    private final String gridName;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private volatile boolean shutdown;
+
+    /** */
+    private final GridWorkerListener lsnr = new GridWorkerListenerAdapter() {
+            @Override public void onStopped(GridWorker w) {
+                workers.remove(w);
+
+                if (shutdown) {
+                    active.decrementAndGet();
+
+                    return;
+                }
+
+                Callable<?> task = queue.poll();
+
+                if (task != null)
+                    startThread(task);
+                else {
+                    active.decrementAndGet();
+
+                    if (!queue.isEmpty())
+                        startFromQueue();
+                }
+            }
+        };
+
+    /**
+     * @param log Logger.
+     * @param gridName Grid name.
+     * @param maxTasks Max number of tasks.
+     * @param maxQueue Max queue length.
+     */
+    public HadoopExecutorService(IgniteLogger log, String gridName, int maxTasks, int maxQueue) {
+        assert maxTasks > 0 : maxTasks;
+        assert maxQueue > 0 : maxQueue;
+
+        this.maxTasks = maxTasks;
+        this.queue = new LinkedBlockingQueue<>(maxQueue);
+        this.gridName = gridName;
+        this.log = log.getLogger(HadoopExecutorService.class);
+    }
+
+    /**
+     * @return Number of active workers.
+     */
+    public int active() {
+        return workers.size();
+    }
+
+    /**
+     * Submit task.
+     *
+     * @param task Task.
+     */
+    public void submit(Callable<?> task) {
+        while (queue.isEmpty()) {
+            int active0 = active.get();
+
+            if (active0 == maxTasks)
+                break;
+
+            if (active.compareAndSet(active0, active0 + 1)) {
+                startThread(task);
+
+                return; // Started in new thread bypassing queue.
+            }
+        }
+
+        try {
+            while (!queue.offer(task, 100, TimeUnit.MILLISECONDS)) {
+                if (shutdown)
+                    return; // Rejected due to shutdown.
+            }
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            return;
+        }
+
+        startFromQueue();
+    }
+
+    /**
+     * Attempts to start task from queue.
+     */
+    private void startFromQueue() {
+        do {
+            int active0 = active.get();
+
+            if (active0 == maxTasks)
+                break;
+
+            if (active.compareAndSet(active0, active0 + 1)) {
+                Callable<?> task = queue.poll();
+
+                if (task == null) {
+                    int res = active.decrementAndGet();
+
+                    assert res >= 0 : res;
+
+                    break;
+                }
+
+                startThread(task);
+            }
+        }
+        while (!queue.isEmpty());
+    }
+
+    /**
+     * @param task Task.
+     */
+    private void startThread(final Callable<?> task) {
+        String workerName;
+
+        if (task instanceof HadoopRunnableTask) {
+            final HadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo();
+
+            workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt();
+        }
+        else
+            workerName = task.toString();
+
+        GridWorker w = new GridWorker(gridName, workerName, log, lsnr) {
+            @Override protected void body() {
+                try {
+                    task.call();
+                }
+                catch (Exception e) {
+                    log.error("Failed to execute task: " + task, e);
+                }
+            }
+        };
+
+        workers.add(w);
+
+        if (shutdown)
+            w.cancel();
+
+        new IgniteThread(w).start();
+    }
+
+    /**
+     * Shuts down this executor service.
+     *
+     * @param awaitTimeMillis Time in milliseconds to wait for tasks completion.
+     * @return {@code true} If all tasks completed.
+     */
+    public boolean shutdown(long awaitTimeMillis) {
+        shutdown = true;
+
+        for (GridWorker w : workers)
+            w.cancel();
+
+        while (awaitTimeMillis > 0 && !workers.isEmpty()) {
+            try {
+                Thread.sleep(100);
+
+                awaitTimeMillis -= 100;
+            }
+            catch (InterruptedException e) {
+                break;
+            }
+        }
+
+        return workers.isEmpty();
+    }
+
+    /**
+     * @return {@code true} If method {@linkplain #shutdown(long)} was already called.
+     */
+    public boolean isShutdown() {
+        return shutdown;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
new file mode 100644
index 0000000..2b36267
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*;
+import org.apache.ignite.internal.util.offheap.unsafe.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.*;
+import static org.apache.ignite.internal.processors.hadoop.HadoopTaskType.*;
+
+/**
+ * Runnable task.
+ */
+public abstract class HadoopRunnableTask implements Callable<Void> {
+    /** */
+    private final GridUnsafeMemory mem;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final HadoopJob job;
+
+    /** Task to run. */
+    private final HadoopTaskInfo info;
+
+    /** Submit time. */
+    private final long submitTs = U.currentTimeMillis();
+
+    /** Execution start timestamp. */
+    private long execStartTs;
+
+    /** Execution end timestamp. */
+    private long execEndTs;
+
+    /** */
+    private HadoopMultimap combinerInput;
+
+    /** */
+    private volatile HadoopTaskContext ctx;
+
+    /** Set if task is to cancelling. */
+    private volatile boolean cancelled;
+
+    /** Node id. */
+    private UUID nodeId;
+
+    /**
+     * @param log Log.
+     * @param job Job.
+     * @param mem Memory.
+     * @param info Task info.
+     * @param nodeId Node id.
+     */
+    protected HadoopRunnableTask(IgniteLogger log, HadoopJob job, GridUnsafeMemory mem, HadoopTaskInfo info,
+        UUID nodeId) {
+        this.nodeId = nodeId;
+        this.log = log.getLogger(HadoopRunnableTask.class);
+        this.job = job;
+        this.mem = mem;
+        this.info = info;
+    }
+
+    /**
+     * @return Wait time.
+     */
+    public long waitTime() {
+        return execStartTs - submitTs;
+    }
+
+    /**
+     * @return Execution time.
+     */
+    public long executionTime() {
+        return execEndTs - execStartTs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Void call() throws IgniteCheckedException {
+        execStartTs = U.currentTimeMillis();
+
+        Throwable err = null;
+
+        HadoopTaskState state = HadoopTaskState.COMPLETED;
+
+        HadoopPerformanceCounter perfCntr = null;
+
+        try {
+            ctx = job.getTaskContext(info);
+
+            perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId);
+
+            perfCntr.onTaskSubmit(info, submitTs);
+            perfCntr.onTaskPrepare(info, execStartTs);
+
+            ctx.prepareTaskEnvironment();
+
+            runTask(perfCntr);
+
+            if (info.type() == MAP && job.info().hasCombiner()) {
+                ctx.taskInfo(new HadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null));
+
+                try {
+                    runTask(perfCntr);
+                }
+                finally {
+                    ctx.taskInfo(info);
+                }
+            }
+        }
+        catch (HadoopTaskCancelledException ignored) {
+            state = HadoopTaskState.CANCELED;
+        }
+        catch (Throwable e) {
+            state = HadoopTaskState.FAILED;
+            err = e;
+
+            U.error(log, "Task execution failed.", e);
+        }
+        finally {
+            execEndTs = U.currentTimeMillis();
+
+            if (perfCntr != null)
+                perfCntr.onTaskFinish(info, execEndTs);
+
+            onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : ctx.counters()));
+
+            if (combinerInput != null)
+                combinerInput.close();
+
+            if (ctx != null)
+                ctx.cleanupTaskEnvironment();
+        }
+
+        return null;
+    }
+
+    /**
+     * @param perfCntr Performance counter.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException {
+        if (cancelled)
+            throw new HadoopTaskCancelledException("Task cancelled.");
+
+        try (HadoopTaskOutput out = createOutputInternal(ctx);
+             HadoopTaskInput in = createInputInternal(ctx)) {
+
+            ctx.input(in);
+            ctx.output(out);
+
+            perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis());
+
+            ctx.run();
+        }
+    }
+
+    /**
+     * Cancel the executed task.
+     */
+    public void cancel() {
+        cancelled = true;
+
+        if (ctx != null)
+            ctx.cancel();
+    }
+
+    /**
+     * @param status Task status.
+     */
+    protected abstract void onTaskFinished(HadoopTaskStatus status);
+
+    /**
+     * @param ctx Task context.
+     * @return Task input.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private HadoopTaskInput createInputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
+        switch (ctx.taskInfo().type()) {
+            case SETUP:
+            case MAP:
+            case COMMIT:
+            case ABORT:
+                return null;
+
+            case COMBINE:
+                assert combinerInput != null;
+
+                return combinerInput.input(ctx);
+
+            default:
+                return createInput(ctx);
+        }
+    }
+
+    /**
+     * @param ctx Task context.
+     * @return Input.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract HadoopTaskInput createInput(HadoopTaskContext ctx) throws IgniteCheckedException;
+
+    /**
+     * @param ctx Task info.
+     * @return Output.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected abstract HadoopTaskOutput createOutput(HadoopTaskContext ctx) throws IgniteCheckedException;
+
+    /**
+     * @param ctx Task info.
+     * @return Task output.
+     * @throws IgniteCheckedException If failed.
+     */
+    private HadoopTaskOutput createOutputInternal(HadoopTaskContext ctx) throws IgniteCheckedException {
+        switch (ctx.taskInfo().type()) {
+            case SETUP:
+            case REDUCE:
+            case COMMIT:
+            case ABORT:
+                return null;
+
+            case MAP:
+                if (job.info().hasCombiner()) {
+                    assert combinerInput == null;
+
+                    combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ?
+                        new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)):
+                        new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree
+
+                    return combinerInput.startAdding(ctx);
+                }
+
+            default:
+                return createOutput(ctx);
+        }
+    }
+
+    /**
+     * @return Task info.
+     */
+    public HadoopTaskInfo taskInfo() {
+        return info;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
new file mode 100644
index 0000000..39b4935
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskExecutorAdapter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.hadoop.*;
+import org.apache.ignite.internal.processors.hadoop.jobtracker.*;
+
+import java.util.*;
+
+/**
+ * Common superclass for task executor.
+ */
+public abstract class HadoopTaskExecutorAdapter extends HadoopComponent {
+    /**
+     * Runs tasks.
+     *
+     * @param job Job.
+     * @param tasks Tasks.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract void run(final HadoopJob job, Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException;
+
+    /**
+     * Cancels all currently running tasks for given job ID and cancels scheduled execution of tasks
+     * for this job ID.
+     * <p>
+     * It is guaranteed that this method will not be called concurrently with
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method. No more job submissions will be performed via
+     * {@link #run(org.apache.ignite.internal.processors.hadoop.HadoopJob, Collection)} method for given job ID after this method is called.
+     *
+     * @param jobId Job ID to cancel.
+     */
+    public abstract void cancelTasks(HadoopJobId jobId) throws IgniteCheckedException;
+
+    /**
+     * On job state change callback;
+     *
+     * @param meta Job metadata.
+     */
+    public abstract void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
new file mode 100644
index 0000000..cf2a28e
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskState.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+/**
+* State of the task.
+*/
+public enum HadoopTaskState {
+    /** Running task. */
+    RUNNING,
+
+    /** Completed task. */
+    COMPLETED,
+
+    /** Failed task. */
+    FAILED,
+
+    /** Canceled task. */
+    CANCELED,
+
+    /** Process crashed. */
+    CRASHED
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
new file mode 100644
index 0000000..c5ee16c
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopTaskStatus.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.taskexecutor;
+
+import org.apache.ignite.internal.processors.hadoop.counter.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Task status.
+ */
+public class HadoopTaskStatus implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private HadoopTaskState state;
+
+    /** */
+    private Throwable failCause;
+
+    /** */
+    private HadoopCounters cntrs;
+
+    /**
+     * Default constructor required by {@link Externalizable}.
+     */
+    public HadoopTaskStatus() {
+        // No-op.
+    }
+
+    /**
+     * Creates new instance.
+     *
+     * @param state Task state.
+     * @param failCause Failure cause (if any).
+     */
+    public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause) {
+        this(state, failCause, null);
+    }
+
+    /**
+     * Creates new instance.
+     *
+     * @param state Task state.
+     * @param failCause Failure cause (if any).
+     * @param cntrs Task counters.
+     */
+    public HadoopTaskStatus(HadoopTaskState state, @Nullable Throwable failCause,
+        @Nullable HadoopCounters cntrs) {
+        assert state != null;
+
+        this.state = state;
+        this.failCause = failCause;
+        this.cntrs = cntrs;
+    }
+
+    /**
+     * @return State.
+     */
+    public HadoopTaskState state() {
+        return state;
+    }
+
+    /**
+     * @return Fail cause.
+     */
+    @Nullable public Throwable failCause() {
+        return failCause;
+    }
+
+    /**
+     * @return Counters.
+     */
+    @Nullable public HadoopCounters counters() {
+        return cntrs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopTaskStatus.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(state);
+        out.writeObject(failCause);
+        out.writeObject(cntrs);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        state = (HadoopTaskState)in.readObject();
+        failCause = (Throwable)in.readObject();
+        cntrs = (HadoopCounters)in.readObject();
+    }
+}


Mime
View raw message