Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9FD19106A6 for ; Thu, 5 Mar 2015 09:07:43 +0000 (UTC) Received: (qmail 57023 invoked by uid 500); 5 Mar 2015 09:07:40 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 56991 invoked by uid 500); 5 Mar 2015 09:07:40 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 56982 invoked by uid 99); 5 Mar 2015 09:07:40 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Mar 2015 09:07:40 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 05 Mar 2015 09:05:05 +0000 Received: (qmail 42396 invoked by uid 99); 5 Mar 2015 09:05:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 Mar 2015 09:05:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 51B76E0FCA; Thu, 5 Mar 2015 09:05:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: yzhdanov@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 05 Mar 2015 09:05:01 -0000 Message-Id: <56d156eb01174f0b88f954783fd4b143@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/58] [abbrv] incubator-ignite git commit: IGNITE-386: Squashed changes. 
X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-ignite Updated Branches: refs/heads/ignite-313 bc18635a9 -> 401835cc6 http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java new file mode 100644 index 0000000..5b1b6a8 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopHashMapSelfTest.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import com.google.common.collect.*; +import org.apache.hadoop.io.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.util.*; +import java.util.concurrent.*; + +/** + * + */ +public class HadoopHashMapSelfTest extends HadoopAbstractMapTest { + + public void _testAllocation() throws Exception { + final GridUnsafeMemory mem = new GridUnsafeMemory(0); + + long size = 3L * 1024 * 1024 * 1024; + + final long chunk = 16;// * 1024; + + final int page = 4 * 1024; + + final int writes = chunk < page ? 1 : (int)(chunk / page); + + final long cnt = size / chunk; + + assert cnt < Integer.MAX_VALUE; + + final int threads = 4; + + long start = System.currentTimeMillis(); + + multithreaded(new Callable() { + @Override public Object call() throws Exception { + int cnt0 = (int)(cnt / threads); + + for (int i = 0; i < cnt0; i++) { + long ptr = mem.allocate(chunk); + + for (int j = 0; j < writes; j++) + mem.writeInt(ptr + j * page, 100500); + } + + return null; + } + }, threads); + + X.println("End: " + (System.currentTimeMillis() - start) + " mem: " + mem.allocatedSize() + " cnt: " + cnt); + + Thread.sleep(30000); + } + + + /** */ + public void testMapSimple() throws Exception { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + +// mem.listen(new GridOffHeapEventListener() { +// @Override public void onEvent(GridOffHeapEvent evt) { +// if (evt == GridOffHeapEvent.ALLOCATE) +// U.dumpStack(); +// } +// }); + + Random rnd = new Random(); + + int mapSize = 16 << rnd.nextInt(3); + + HadoopTaskContext taskCtx = new TaskContext(); + + final HadoopHashMultimap m = new HadoopHashMultimap(new JobInfo(), mem, mapSize); + + HadoopMultimap.Adder a = m.startAdding(taskCtx); + + Multimap mm = ArrayListMultimap.create(); + + for (int i = 0, 
vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) { + int key = rnd.nextInt(mapSize); + int val = rnd.nextInt(); + + a.write(new IntWritable(key), new IntWritable(val)); + mm.put(key, val); + + X.println("k: " + key + " v: " + val); + + a.close(); + + check(m, mm, taskCtx); + + a = m.startAdding(taskCtx); + } + +// a.add(new IntWritable(10), new IntWritable(2)); +// mm.put(10, 2); +// check(m, mm); + + a.close(); + + X.println("Alloc: " + mem.allocatedSize()); + + m.close(); + + assertEquals(0, mem.allocatedSize()); + } + + private void check(HadoopHashMultimap m, Multimap mm, HadoopTaskContext taskCtx) throws Exception { + final HadoopTaskInput in = m.input(taskCtx); + + Map> mmm = mm.asMap(); + + int keys = 0; + + while (in.next()) { + keys++; + + IntWritable k = (IntWritable)in.key(); + + assertNotNull(k); + + ArrayList vs = new ArrayList<>(); + + Iterator it = in.values(); + + while (it.hasNext()) + vs.add(((IntWritable) it.next()).get()); + + Collection exp = mmm.get(k.get()); + + assertEquals(sorted(exp), sorted(vs)); + } + + X.println("keys: " + keys + " cap: " + m.capacity()); + + assertEquals(mmm.size(), keys); + + assertEquals(m.keys(), keys); + + in.close(); + } + + private GridLongList sorted(Collection col) { + GridLongList lst = new GridLongList(col.size()); + + for (Integer i : col) + lst.add(i); + + return lst.sort(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java new file mode 100644 index 0000000..8a046e0 --- /dev/null +++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopSkipListSelfTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.collections; + +import com.google.common.collect.*; +import org.apache.hadoop.io.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.io.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +import static java.lang.Math.*; +import static org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory.*; + +/** + * Skip list tests. 
+ */ +public class HadoopSkipListSelfTest extends HadoopAbstractMapTest { + /** + * + */ + public void testLevel() { + Random rnd = new GridRandom(); + + int[] levelsCnts = new int[32]; + + int all = 10000; + + for (int i = 0; i < all; i++) { + int level = HadoopSkipList.randomLevel(rnd); + + levelsCnts[level]++; + } + + X.println("Distribution: " + Arrays.toString(levelsCnts)); + + for (int level = 0; level < levelsCnts.length; level++) { + int exp = (level + 1) == levelsCnts.length ? 0 : all >>> (level + 1); + + double precission = 0.72 / Math.max(32 >>> level, 1); + + int sigma = max((int)ceil(precission * exp), 5); + + X.println("Level: " + level + " exp: " + exp + " act: " + levelsCnts[level] + " precission: " + precission + + " sigma: " + sigma); + + assertTrue(abs(exp - levelsCnts[level]) <= sigma); + } + } + + public void testMapSimple() throws Exception { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + +// mem.listen(new GridOffHeapEventListener() { +// @Override public void onEvent(GridOffHeapEvent evt) { +// if (evt == GridOffHeapEvent.ALLOCATE) +// U.dumpStack(); +// } +// }); + + Random rnd = new Random(); + + int mapSize = 16 << rnd.nextInt(6); + + HadoopJobInfo job = new JobInfo(); + + HadoopTaskContext taskCtx = new TaskContext(); + + HadoopMultimap m = new HadoopSkipList(job, mem); + + HadoopConcurrentHashMultimap.Adder a = m.startAdding(taskCtx); + + Multimap mm = ArrayListMultimap.create(); + Multimap vis = ArrayListMultimap.create(); + + for (int i = 0, vals = 4 * mapSize + rnd.nextInt(25); i < vals; i++) { + int key = rnd.nextInt(mapSize); + int val = rnd.nextInt(); + + a.write(new IntWritable(key), new IntWritable(val)); + mm.put(key, val); + + X.println("k: " + key + " v: " + val); + + a.close(); + + check(m, mm, vis, taskCtx); + + a = m.startAdding(taskCtx); + } + +// a.add(new IntWritable(10), new IntWritable(2)); +// mm.put(10, 2); +// check(m, mm); + + a.close(); + + X.println("Alloc: " + mem.allocatedSize()); + + m.close(); + + 
assertEquals(0, mem.allocatedSize()); + } + + private void check(HadoopMultimap m, Multimap mm, final Multimap vis, HadoopTaskContext taskCtx) + throws Exception { + final HadoopTaskInput in = m.input(taskCtx); + + Map> mmm = mm.asMap(); + + int keys = 0; + + int prevKey = Integer.MIN_VALUE; + + while (in.next()) { + keys++; + + IntWritable k = (IntWritable)in.key(); + + assertNotNull(k); + + assertTrue(k.get() > prevKey); + + prevKey = k.get(); + + Deque vs = new LinkedList<>(); + + Iterator it = in.values(); + + while (it.hasNext()) + vs.addFirst(((IntWritable) it.next()).get()); + + Collection exp = mmm.get(k.get()); + + assertEquals(exp, vs); + } + + assertEquals(mmm.size(), keys); + +//! assertEquals(m.keys(), keys); + + // Check visitor. + + final byte[] buf = new byte[4]; + + final GridDataInput dataInput = new GridUnsafeDataInput(); + + m.visit(false, new HadoopConcurrentHashMultimap.Visitor() { + /** */ + IntWritable key = new IntWritable(); + + /** */ + IntWritable val = new IntWritable(); + + @Override public void onKey(long keyPtr, int keySize) { + read(keyPtr, keySize, key); + } + + @Override public void onValue(long valPtr, int valSize) { + read(valPtr, valSize, val); + + vis.put(key.get(), val.get()); + } + + private void read(long ptr, int size, Writable w) { + assert size == 4 : size; + + UNSAFE.copyMemory(null, ptr, buf, BYTE_ARR_OFF, size); + + dataInput.bytes(buf, size); + + try { + w.readFields(dataInput); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + +// X.println("vis: " + vis); + + assertEquals(mm, vis); + + in.close(); + } + + /** + * @throws Exception if failed. 
+ */ + public void testMultiThreaded() throws Exception { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + + X.println("___ Started"); + + Random rnd = new GridRandom(); + + for (int i = 0; i < 20; i++) { + HadoopJobInfo job = new JobInfo(); + + final HadoopTaskContext taskCtx = new TaskContext(); + + final HadoopMultimap m = new HadoopSkipList(job, mem); + + final ConcurrentMap> mm = new ConcurrentHashMap<>(); + + X.println("___ MT"); + + multithreaded(new Callable() { + @Override public Object call() throws Exception { + X.println("___ TH in"); + + Random rnd = new GridRandom(); + + IntWritable key = new IntWritable(); + IntWritable val = new IntWritable(); + + HadoopMultimap.Adder a = m.startAdding(taskCtx); + + for (int i = 0; i < 50000; i++) { + int k = rnd.nextInt(32000); + int v = rnd.nextInt(); + + key.set(k); + val.set(v); + + a.write(key, val); + + Collection list = mm.get(k); + + if (list == null) { + list = new ConcurrentLinkedQueue<>(); + + Collection old = mm.putIfAbsent(k, list); + + if (old != null) + list = old; + } + + list.add(v); + } + + a.close(); + + X.println("___ TH out"); + + return null; + } + }, 3 + rnd.nextInt(27)); + + HadoopTaskInput in = m.input(taskCtx); + + int prevKey = Integer.MIN_VALUE; + + while (in.next()) { + IntWritable key = (IntWritable)in.key(); + + assertTrue(key.get() > prevKey); + + prevKey = key.get(); + + Iterator valsIter = in.values(); + + Collection vals = mm.remove(key.get()); + + assertNotNull(vals); + + while (valsIter.hasNext()) { + IntWritable val = (IntWritable) valsIter.next(); + + assertTrue(vals.remove(val.get())); + } + + assertTrue(vals.isEmpty()); + } + + in.close(); + m.close(); + + assertEquals(0, mem.allocatedSize()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java ---------------------------------------------------------------------- diff 
--git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java deleted file mode 100644 index 39a537b..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/GridHadoopDataStreamSelfTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.shuffle.streams; - -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.util.*; - -/** - * - */ -public class GridHadoopDataStreamSelfTest extends GridCommonAbstractTest { - - public void testStreams() throws IOException { - GridUnsafeMemory mem = new GridUnsafeMemory(0); - - GridHadoopDataOutStream out = new GridHadoopDataOutStream(mem); - - int size = 4 * 1024; - - final long ptr = mem.allocate(size); - - out.buffer().set(ptr, size); - - out.writeBoolean(false); - out.writeBoolean(true); - out.writeBoolean(false); - out.write(17); - out.write(121); - out.write(0xfafa); - out.writeByte(17); - out.writeByte(121); - out.writeByte(0xfafa); - out.writeChar('z'); - out.writeChar('o'); - out.writeChar('r'); - out.writeShort(100); - out.writeShort(Short.MIN_VALUE); - out.writeShort(Short.MAX_VALUE); - out.writeShort(65535); - out.writeShort(65536); // 0 - out.writeInt(Integer.MAX_VALUE); - out.writeInt(Integer.MIN_VALUE); - out.writeInt(-1); - out.writeInt(0); - out.writeInt(1); - out.writeFloat(0.33f); - out.writeFloat(0.5f); - out.writeFloat(-0.7f); - out.writeFloat(Float.MAX_VALUE); - out.writeFloat(Float.MIN_VALUE); - out.writeFloat(Float.MIN_NORMAL); - out.writeFloat(Float.POSITIVE_INFINITY); - out.writeFloat(Float.NEGATIVE_INFINITY); - out.writeFloat(Float.NaN); - out.writeDouble(-12312312.3333333336666779); - out.writeDouble(123123.234); - out.writeDouble(Double.MAX_VALUE); - out.writeDouble(Double.MIN_VALUE); - out.writeDouble(Double.MIN_NORMAL); - out.writeDouble(Double.NEGATIVE_INFINITY); - out.writeDouble(Double.POSITIVE_INFINITY); - out.writeDouble(Double.NaN); - out.writeLong(Long.MAX_VALUE); - out.writeLong(Long.MIN_VALUE); - out.writeLong(0); - out.writeLong(-1L); - out.write(new byte[]{1,2,3}); - out.write(new byte[]{0,1,2,3}, 1, 2); - out.writeUTF("mom washes rum"); - - GridHadoopDataInStream in = 
new GridHadoopDataInStream(mem); - - in.buffer().set(ptr, out.buffer().pointer()); - - assertEquals(false, in.readBoolean()); - assertEquals(true, in.readBoolean()); - assertEquals(false, in.readBoolean()); - assertEquals(17, in.read()); - assertEquals(121, in.read()); - assertEquals(0xfa, in.read()); - assertEquals(17, in.readByte()); - assertEquals(121, in.readByte()); - assertEquals((byte)0xfa, in.readByte()); - assertEquals('z', in.readChar()); - assertEquals('o', in.readChar()); - assertEquals('r', in.readChar()); - assertEquals(100, in.readShort()); - assertEquals(Short.MIN_VALUE, in.readShort()); - assertEquals(Short.MAX_VALUE, in.readShort()); - assertEquals(-1, in.readShort()); - assertEquals(0, in.readShort()); - assertEquals(Integer.MAX_VALUE, in.readInt()); - assertEquals(Integer.MIN_VALUE, in.readInt()); - assertEquals(-1, in.readInt()); - assertEquals(0, in.readInt()); - assertEquals(1, in.readInt()); - assertEquals(0.33f, in.readFloat()); - assertEquals(0.5f, in.readFloat()); - assertEquals(-0.7f, in.readFloat()); - assertEquals(Float.MAX_VALUE, in.readFloat()); - assertEquals(Float.MIN_VALUE, in.readFloat()); - assertEquals(Float.MIN_NORMAL, in.readFloat()); - assertEquals(Float.POSITIVE_INFINITY, in.readFloat()); - assertEquals(Float.NEGATIVE_INFINITY, in.readFloat()); - assertEquals(Float.NaN, in.readFloat()); - assertEquals(-12312312.3333333336666779, in.readDouble()); - assertEquals(123123.234, in.readDouble()); - assertEquals(Double.MAX_VALUE, in.readDouble()); - assertEquals(Double.MIN_VALUE, in.readDouble()); - assertEquals(Double.MIN_NORMAL, in.readDouble()); - assertEquals(Double.NEGATIVE_INFINITY, in.readDouble()); - assertEquals(Double.POSITIVE_INFINITY, in.readDouble()); - assertEquals(Double.NaN, in.readDouble()); - assertEquals(Long.MAX_VALUE, in.readLong()); - assertEquals(Long.MIN_VALUE, in.readLong()); - assertEquals(0, in.readLong()); - assertEquals(-1, in.readLong()); - - byte[] b = new byte[3]; - - in.read(b); - - 
assertTrue(Arrays.equals(new byte[]{1,2,3}, b)); - - b = new byte[4]; - - in.read(b, 1, 2); - - assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b)); - - assertEquals("mom washes rum", in.readUTF()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java new file mode 100644 index 0000000..48b99ab --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/shuffle/streams/HadoopDataStreamSelfTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.shuffle.streams; + +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; + +/** + * + */ +public class HadoopDataStreamSelfTest extends GridCommonAbstractTest { + + public void testStreams() throws IOException { + GridUnsafeMemory mem = new GridUnsafeMemory(0); + + HadoopDataOutStream out = new HadoopDataOutStream(mem); + + int size = 4 * 1024; + + final long ptr = mem.allocate(size); + + out.buffer().set(ptr, size); + + out.writeBoolean(false); + out.writeBoolean(true); + out.writeBoolean(false); + out.write(17); + out.write(121); + out.write(0xfafa); + out.writeByte(17); + out.writeByte(121); + out.writeByte(0xfafa); + out.writeChar('z'); + out.writeChar('o'); + out.writeChar('r'); + out.writeShort(100); + out.writeShort(Short.MIN_VALUE); + out.writeShort(Short.MAX_VALUE); + out.writeShort(65535); + out.writeShort(65536); // 0 + out.writeInt(Integer.MAX_VALUE); + out.writeInt(Integer.MIN_VALUE); + out.writeInt(-1); + out.writeInt(0); + out.writeInt(1); + out.writeFloat(0.33f); + out.writeFloat(0.5f); + out.writeFloat(-0.7f); + out.writeFloat(Float.MAX_VALUE); + out.writeFloat(Float.MIN_VALUE); + out.writeFloat(Float.MIN_NORMAL); + out.writeFloat(Float.POSITIVE_INFINITY); + out.writeFloat(Float.NEGATIVE_INFINITY); + out.writeFloat(Float.NaN); + out.writeDouble(-12312312.3333333336666779); + out.writeDouble(123123.234); + out.writeDouble(Double.MAX_VALUE); + out.writeDouble(Double.MIN_VALUE); + out.writeDouble(Double.MIN_NORMAL); + out.writeDouble(Double.NEGATIVE_INFINITY); + out.writeDouble(Double.POSITIVE_INFINITY); + out.writeDouble(Double.NaN); + out.writeLong(Long.MAX_VALUE); + out.writeLong(Long.MIN_VALUE); + out.writeLong(0); + out.writeLong(-1L); + out.write(new byte[]{1,2,3}); + out.write(new byte[]{0,1,2,3}, 1, 2); + out.writeUTF("mom washes rum"); + + HadoopDataInStream in = new 
HadoopDataInStream(mem); + + in.buffer().set(ptr, out.buffer().pointer()); + + assertEquals(false, in.readBoolean()); + assertEquals(true, in.readBoolean()); + assertEquals(false, in.readBoolean()); + assertEquals(17, in.read()); + assertEquals(121, in.read()); + assertEquals(0xfa, in.read()); + assertEquals(17, in.readByte()); + assertEquals(121, in.readByte()); + assertEquals((byte)0xfa, in.readByte()); + assertEquals('z', in.readChar()); + assertEquals('o', in.readChar()); + assertEquals('r', in.readChar()); + assertEquals(100, in.readShort()); + assertEquals(Short.MIN_VALUE, in.readShort()); + assertEquals(Short.MAX_VALUE, in.readShort()); + assertEquals(-1, in.readShort()); + assertEquals(0, in.readShort()); + assertEquals(Integer.MAX_VALUE, in.readInt()); + assertEquals(Integer.MIN_VALUE, in.readInt()); + assertEquals(-1, in.readInt()); + assertEquals(0, in.readInt()); + assertEquals(1, in.readInt()); + assertEquals(0.33f, in.readFloat()); + assertEquals(0.5f, in.readFloat()); + assertEquals(-0.7f, in.readFloat()); + assertEquals(Float.MAX_VALUE, in.readFloat()); + assertEquals(Float.MIN_VALUE, in.readFloat()); + assertEquals(Float.MIN_NORMAL, in.readFloat()); + assertEquals(Float.POSITIVE_INFINITY, in.readFloat()); + assertEquals(Float.NEGATIVE_INFINITY, in.readFloat()); + assertEquals(Float.NaN, in.readFloat()); + assertEquals(-12312312.3333333336666779, in.readDouble()); + assertEquals(123123.234, in.readDouble()); + assertEquals(Double.MAX_VALUE, in.readDouble()); + assertEquals(Double.MIN_VALUE, in.readDouble()); + assertEquals(Double.MIN_NORMAL, in.readDouble()); + assertEquals(Double.NEGATIVE_INFINITY, in.readDouble()); + assertEquals(Double.POSITIVE_INFINITY, in.readDouble()); + assertEquals(Double.NaN, in.readDouble()); + assertEquals(Long.MAX_VALUE, in.readLong()); + assertEquals(Long.MIN_VALUE, in.readLong()); + assertEquals(0, in.readLong()); + assertEquals(-1, in.readLong()); + + byte[] b = new byte[3]; + + in.read(b); + + 
assertTrue(Arrays.equals(new byte[]{1,2,3}, b)); + + b = new byte[4]; + + in.read(b, 1, 2); + + assertTrue(Arrays.equals(new byte[]{0, 1, 2, 0}, b)); + + assertEquals("mom washes rum", in.readUTF()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorServiceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorServiceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorServiceTest.java deleted file mode 100644 index c97b6ab..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopExecutorServiceTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.testframework.junits.common.*; -import org.jdk8.backport.*; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -/** - * - */ -public class GridHadoopExecutorServiceTest extends GridCommonAbstractTest { - /** - * @throws Exception If failed. - */ - public void testExecutesAll() throws Exception { - final GridHadoopExecutorService exec = new GridHadoopExecutorService(log, "_GRID_NAME_", 10, 5); - - for (int i = 0; i < 5; i++) { - final int loops = 5000; - int threads = 17; - - final LongAdder sum = new LongAdder(); - - multithreaded(new Callable() { - @Override public Object call() throws Exception { - for (int i = 0; i < loops; i++) { - exec.submit(new Callable() { - @Override - public Void call() throws Exception { - sum.increment(); - - return null; - } - }); - } - - return null; - } - }, threads); - - while (exec.active() != 0) { - X.println("__ active: " + exec.active()); - - Thread.sleep(200); - } - - assertEquals(threads * loops, sum.sum()); - - X.println("_ ok"); - } - - assertTrue(exec.shutdown(0)); - } - - /** - * @throws Exception If failed. 
- */ - public void testShutdown() throws Exception { - for (int i = 0; i < 5; i++) { - final GridHadoopExecutorService exec = new GridHadoopExecutorService(log, "_GRID_NAME_", 10, 5); - - final LongAdder sum = new LongAdder(); - - final AtomicBoolean finish = new AtomicBoolean(); - - IgniteInternalFuture fut = multithreadedAsync(new Callable() { - @Override public Object call() throws Exception { - while (!finish.get()) { - exec.submit(new Callable() { - @Override public Void call() throws Exception { - sum.increment(); - - return null; - } - }); - } - - return null; - } - }, 19); - - Thread.sleep(200); - - assertTrue(exec.shutdown(50)); - - long res = sum.sum(); - - assertTrue(res > 0); - - finish.set(true); - - fut.get(); - - assertEquals(res, sum.sum()); // Nothing was executed after shutdown. - - X.println("_ ok"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java new file mode 100644 index 0000000..aa50fa9 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorServiceTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.testframework.junits.common.*; +import org.jdk8.backport.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class HadoopExecutorServiceTest extends GridCommonAbstractTest { + /** + * @throws Exception If failed. + */ + public void testExecutesAll() throws Exception { + final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5); + + for (int i = 0; i < 5; i++) { + final int loops = 5000; + int threads = 17; + + final LongAdder sum = new LongAdder(); + + multithreaded(new Callable() { + @Override public Object call() throws Exception { + for (int i = 0; i < loops; i++) { + exec.submit(new Callable() { + @Override + public Void call() throws Exception { + sum.increment(); + + return null; + } + }); + } + + return null; + } + }, threads); + + while (exec.active() != 0) { + X.println("__ active: " + exec.active()); + + Thread.sleep(200); + } + + assertEquals(threads * loops, sum.sum()); + + X.println("_ ok"); + } + + assertTrue(exec.shutdown(0)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testShutdown() throws Exception { + for (int i = 0; i < 5; i++) { + final HadoopExecutorService exec = new HadoopExecutorService(log, "_GRID_NAME_", 10, 5); + + final LongAdder sum = new LongAdder(); + + final AtomicBoolean finish = new AtomicBoolean(); + + IgniteInternalFuture fut = multithreadedAsync(new Callable() { + @Override public Object call() throws Exception { + while (!finish.get()) { + exec.submit(new Callable() { + @Override public Void call() throws Exception { + sum.increment(); + + return null; + } + }); + } + + return null; + } + }, 19); + + Thread.sleep(200); + + assertTrue(exec.shutdown(50)); + + long res = sum.sum(); + + assertTrue(res > 0); + + finish.set(true); + + fut.get(); + + assertEquals(res, sum.sum()); // Nothing was executed after shutdown. + + X.println("_ ok"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutionSelfTest.java deleted file mode 100644 index 52bfa98..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/GridHadoopExternalTaskExecutionSelfTest.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.io.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.mapreduce.lib.input.*; -import org.apache.hadoop.mapreduce.lib.output.*; -import org.apache.ignite.*; -import org.apache.ignite.igfs.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.typedef.*; - -import java.io.*; -import java.util.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopUtils.*; - -/** - * Job tracker self test. - */ -public class GridHadoopExternalTaskExecutionSelfTest extends GridHadoopAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected boolean igfsEnabled() { - return true; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - startGrids(gridCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override public GridHadoopConfiguration hadoopConfiguration(String gridName) { - GridHadoopConfiguration cfg = super.hadoopConfiguration(gridName); - - cfg.setExternalExecution(true); - - return cfg; - } - - /** - * @throws Exception If failed. 
- */ - public void testSimpleTaskSubmit() throws Exception { - String testInputFile = "/test"; - - prepareTestFile(testInputFile); - - Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - Job job = Job.getInstance(cfg); - - job.setMapperClass(TestMapper.class); - job.setCombinerClass(TestReducer.class); - job.setReducerClass(TestReducer.class); - - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(IntWritable.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setNumReduceTasks(1); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); - - job.setJarByClass(getClass()); - - IgniteInternalFuture fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); - - fut.get(); - } - - /** - * @throws Exception If failed. 
- */ - public void testMapperException() throws Exception { - String testInputFile = "/test"; - - prepareTestFile(testInputFile); - - Configuration cfg = new Configuration(); - - setupFileSystems(cfg); - - Job job = Job.getInstance(cfg); - - job.setMapperClass(TestFailingMapper.class); - job.setCombinerClass(TestReducer.class); - job.setReducerClass(TestReducer.class); - - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(IntWritable.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - - job.setNumReduceTasks(1); - - FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); - FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); - - job.setJarByClass(getClass()); - - IgniteInternalFuture fut = grid(0).hadoop().submit(new GridHadoopJobId(UUID.randomUUID(), 1), - createJobInfo(job.getConfiguration())); - - try { - fut.get(); - } - catch (IgniteCheckedException e) { - IOException exp = X.cause(e, IOException.class); - - assertNotNull(exp); - assertEquals("Test failure", exp.getMessage()); - } - } - - /** - * @param filePath File path to prepare. - * @throws Exception If failed. - */ - private void prepareTestFile(String filePath) throws Exception { - IgniteFs igfs = grid(0).fileSystem(igfsName); - - try (IgfsOutputStream out = igfs.create(new IgfsPath(filePath), true)) { - PrintWriter wr = new PrintWriter(new OutputStreamWriter(out)); - - for (int i = 0; i < 1000; i++) - wr.println("Hello, world: " + i); - - wr.flush(); - } - } - - /** - * - */ - private static class TestMapper extends Mapper { - /** One constant. */ - private IntWritable one = new IntWritable(1); - - /** Line constant. */ - private Text line = new Text("line"); - - @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { - ctx.write(line, one); - } - } - - /** - * Failing mapper. 
- */ - private static class TestFailingMapper extends Mapper { - @Override protected void map(Object key, Text val, Context c) throws IOException, InterruptedException { - throw new IOException("Test failure"); - } - } - - /** - * - */ - private static class TestReducer extends Reducer { - /** Line constant. */ - private Text line = new Text("line"); - - @Override protected void setup(Context ctx) throws IOException, InterruptedException { - super.setup(ctx); - } - - /** {@inheritDoc} */ - @Override protected void reduce(Text key, Iterable values, Context ctx) - throws IOException, InterruptedException { - int s = 0; - - for (IntWritable val : values) - s += val.get(); - - System.out.println(">>>> Reduced: " + s); - - ctx.write(line, new IntWritable(s)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java new file mode 100644 index 0000000..59ac445 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutionSelfTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.util.typedef.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; + +/** + * Job tracker self test. + */ +public class HadoopExternalTaskExecutionSelfTest extends HadoopAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected boolean igfsEnabled() { + return true; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(gridCount()); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override public HadoopConfiguration hadoopConfiguration(String gridName) { + HadoopConfiguration cfg = super.hadoopConfiguration(gridName); + + cfg.setExternalExecution(true); + + return cfg; + } + + /** + * @throws Exception If failed. 
+ */ + public void testSimpleTaskSubmit() throws Exception { + String testInputFile = "/test"; + + prepareTestFile(testInputFile); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + Job job = Job.getInstance(cfg); + + job.setMapperClass(TestMapper.class); + job.setCombinerClass(TestReducer.class); + job.setReducerClass(TestReducer.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setNumReduceTasks(1); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); + + job.setJarByClass(getClass()); + + IgniteInternalFuture fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + fut.get(); + } + + /** + * @throws Exception If failed. 
+ */ + public void testMapperException() throws Exception { + String testInputFile = "/test"; + + prepareTestFile(testInputFile); + + Configuration cfg = new Configuration(); + + setupFileSystems(cfg); + + Job job = Job.getInstance(cfg); + + job.setMapperClass(TestFailingMapper.class); + job.setCombinerClass(TestReducer.class); + job.setReducerClass(TestReducer.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(IntWritable.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + + job.setNumReduceTasks(1); + + FileInputFormat.setInputPaths(job, new Path("igfs://:" + getTestGridName(0) + "@/" + testInputFile)); + FileOutputFormat.setOutputPath(job, new Path("igfs://:" + getTestGridName(0) + "@/output")); + + job.setJarByClass(getClass()); + + IgniteInternalFuture fut = grid(0).hadoop().submit(new HadoopJobId(UUID.randomUUID(), 1), + createJobInfo(job.getConfiguration())); + + try { + fut.get(); + } + catch (IgniteCheckedException e) { + IOException exp = X.cause(e, IOException.class); + + assertNotNull(exp); + assertEquals("Test failure", exp.getMessage()); + } + } + + /** + * @param filePath File path to prepare. + * @throws Exception If failed. + */ + private void prepareTestFile(String filePath) throws Exception { + IgniteFileSystem igfs = grid(0).fileSystem(igfsName); + + try (IgfsOutputStream out = igfs.create(new IgfsPath(filePath), true)) { + PrintWriter wr = new PrintWriter(new OutputStreamWriter(out)); + + for (int i = 0; i < 1000; i++) + wr.println("Hello, world: " + i); + + wr.flush(); + } + } + + /** + * + */ + private static class TestMapper extends Mapper { + /** One constant. */ + private IntWritable one = new IntWritable(1); + + /** Line constant. */ + private Text line = new Text("line"); + + @Override protected void map(Object key, Text val, Context ctx) throws IOException, InterruptedException { + ctx.write(line, one); + } + } + + /** + * Failing mapper. 
+ */ + private static class TestFailingMapper extends Mapper { + @Override protected void map(Object key, Text val, Context c) throws IOException, InterruptedException { + throw new IOException("Test failure"); + } + } + + /** + * + */ + private static class TestReducer extends Reducer { + /** Line constant. */ + private Text line = new Text("line"); + + @Override protected void setup(Context ctx) throws IOException, InterruptedException { + super.setup(ctx); + } + + /** {@inheritDoc} */ + @Override protected void reduce(Text key, Iterable values, Context ctx) + throws IOException, InterruptedException { + int s = 0; + + for (IntWritable val : values) + s += val.get(); + + System.out.println(">>>> Reduced: " + s); + + ctx.write(line, new IntWritable(s)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java deleted file mode 100644 index a725ddc..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/GridHadoopExternalCommunicationSelfTest.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.optimized.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Tests Hadoop external communication component. - */ -public class GridHadoopExternalCommunicationSelfTest extends GridCommonAbstractTest { - /** - * @throws Exception If failed. - */ - public void testSimpleMessageSendingTcp() throws Exception { - checkSimpleMessageSending(false); - } - - /** - * @throws Exception If failed. - */ - public void testSimpleMessageSendingShmem() throws Exception { - checkSimpleMessageSending(true); - } - - /** - * @throws Exception If failed. 
- */ - private void checkSimpleMessageSending(boolean useShmem) throws Exception { - UUID parentNodeId = UUID.randomUUID(); - - Marshaller marsh = new OptimizedMarshaller(); - - IgniteLogger log = log(); - - GridHadoopExternalCommunication[] comms = new GridHadoopExternalCommunication[4]; - - try { - String name = "grid"; - - TestHadoopListener[] lsnrs = new TestHadoopListener[4]; - - int msgs = 10; - - for (int i = 0; i < comms.length; i++) { - comms[i] = new GridHadoopExternalCommunication(parentNodeId, UUID.randomUUID(), marsh, log, - Executors.newFixedThreadPool(1), name + i); - - if (useShmem) - comms[i].setSharedMemoryPort(14000); - - lsnrs[i] = new TestHadoopListener(msgs); - - comms[i].setListener(lsnrs[i]); - - comms[i].start(); - } - - for (int r = 0; r < msgs; r++) { - for (int from = 0; from < comms.length; from++) { - for (int to = 0; to < comms.length; to++) { - if (from == to) - continue; - - comms[from].sendMessage(comms[to].localProcessDescriptor(), new TestMessage(from, to)); - } - } - } - - U.sleep(1000); - - for (TestHadoopListener lsnr : lsnrs) { - lsnr.await(3_000); - - assertEquals(String.valueOf(lsnr.messages()), msgs * (comms.length - 1), lsnr.messages().size()); - } - } - finally { - for (GridHadoopExternalCommunication comm : comms) { - if (comm != null) - comm.stop(); - } - } - } - - /** - * - */ - private static class TestHadoopListener implements GridHadoopMessageListener { - /** Received messages (array list is safe because executor has one thread). */ - private Collection msgs = new ArrayList<>(); - - /** Await latch. */ - private CountDownLatch receiveLatch; - - /** - * @param msgs Number of messages to await. 
- */ - private TestHadoopListener(int msgs) { - receiveLatch = new CountDownLatch(msgs); - } - - /** {@inheritDoc} */ - @Override public void onMessageReceived(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) { - assert msg instanceof TestMessage; - - msgs.add((TestMessage)msg); - - receiveLatch.countDown(); - } - - /** {@inheritDoc} */ - @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) { - // No-op. - } - - /** - * @return Received messages. - */ - public Collection messages() { - return msgs; - } - - /** - * @param millis Time to await. - * @throws InterruptedException If wait interrupted. - */ - public void await(int millis) throws InterruptedException { - receiveLatch.await(millis, TimeUnit.MILLISECONDS); - } - } - - /** - * - */ - private static class TestMessage implements GridHadoopMessage { - /** From index. */ - private int from; - - /** To index. */ - private int to; - - /** - * @param from From index. - * @param to To index. - */ - private TestMessage(int from, int to) { - this.from = from; - this.to = to; - } - - /** - * Required by {@link Externalizable}. - */ - public TestMessage() { - // No-op. - } - - /** - * @return From index. - */ - public int from() { - return from; - } - - /** - * @return To index. 
- */ - public int to() { - return to; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeInt(from); - out.writeInt(to); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - from = in.readInt(); - to = in.readInt(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java new file mode 100644 index 0000000..a21633d --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunicationSelfTest.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.optimized.*; +import org.apache.ignite.testframework.junits.common.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Tests Hadoop external communication component. + */ +public class HadoopExternalCommunicationSelfTest extends GridCommonAbstractTest { + /** + * @throws Exception If failed. + */ + public void testSimpleMessageSendingTcp() throws Exception { + checkSimpleMessageSending(false); + } + + /** + * @throws Exception If failed. + */ + public void testSimpleMessageSendingShmem() throws Exception { + checkSimpleMessageSending(true); + } + + /** + * @throws Exception If failed. 
+ */ + private void checkSimpleMessageSending(boolean useShmem) throws Exception { + UUID parentNodeId = UUID.randomUUID(); + + Marshaller marsh = new OptimizedMarshaller(); + + IgniteLogger log = log(); + + HadoopExternalCommunication[] comms = new HadoopExternalCommunication[4]; + + try { + String name = "grid"; + + TestHadoopListener[] lsnrs = new TestHadoopListener[4]; + + int msgs = 10; + + for (int i = 0; i < comms.length; i++) { + comms[i] = new HadoopExternalCommunication(parentNodeId, UUID.randomUUID(), marsh, log, + Executors.newFixedThreadPool(1), name + i); + + if (useShmem) + comms[i].setSharedMemoryPort(14000); + + lsnrs[i] = new TestHadoopListener(msgs); + + comms[i].setListener(lsnrs[i]); + + comms[i].start(); + } + + for (int r = 0; r < msgs; r++) { + for (int from = 0; from < comms.length; from++) { + for (int to = 0; to < comms.length; to++) { + if (from == to) + continue; + + comms[from].sendMessage(comms[to].localProcessDescriptor(), new TestMessage(from, to)); + } + } + } + + U.sleep(1000); + + for (TestHadoopListener lsnr : lsnrs) { + lsnr.await(3_000); + + assertEquals(String.valueOf(lsnr.messages()), msgs * (comms.length - 1), lsnr.messages().size()); + } + } + finally { + for (HadoopExternalCommunication comm : comms) { + if (comm != null) + comm.stop(); + } + } + } + + /** + * + */ + private static class TestHadoopListener implements HadoopMessageListener { + /** Received messages (array list is safe because executor has one thread). */ + private Collection msgs = new ArrayList<>(); + + /** Await latch. */ + private CountDownLatch receiveLatch; + + /** + * @param msgs Number of messages to await. 
+ */ + private TestHadoopListener(int msgs) { + receiveLatch = new CountDownLatch(msgs); + } + + /** {@inheritDoc} */ + @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { + assert msg instanceof TestMessage; + + msgs.add((TestMessage)msg); + + receiveLatch.countDown(); + } + + /** {@inheritDoc} */ + @Override public void onConnectionLost(HadoopProcessDescriptor desc) { + // No-op. + } + + /** + * @return Received messages. + */ + public Collection messages() { + return msgs; + } + + /** + * @param millis Time to await. + * @throws InterruptedException If wait interrupted. + */ + public void await(int millis) throws InterruptedException { + receiveLatch.await(millis, TimeUnit.MILLISECONDS); + } + } + + /** + * + */ + private static class TestMessage implements HadoopMessage { + /** From index. */ + private int from; + + /** To index. */ + private int to; + + /** + * @param from From index. + * @param to To index. + */ + private TestMessage(int from, int to) { + this.from = from; + this.to = to; + } + + /** + * Required by {@link Externalizable}. + */ + public TestMessage() { + // No-op. + } + + /** + * @return From index. + */ + public int from() { + return from; + } + + /** + * @return To index. 
+ */ + public int to() { + return to; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeInt(from); + out.writeInt(to); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + from = in.readInt(); + to = in.readInt(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java index 836cdaa..4790e63 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java @@ -50,75 +50,75 @@ public class IgniteHadoopTestSuite extends TestSuite { downloadHadoop(); downloadHive(); - GridHadoopClassLoader ldr = new GridHadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null); TestSuite suite = new TestSuite("Ignite Hadoop MR Test Suite"); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackExternalPrimarySelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackExternalSecondarySelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackExternalDualSyncSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackExternalDualAsyncSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackEmbeddedPrimarySelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackEmbeddedSecondarySelfTest.class.getName()))); - 
suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalPrimarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalDualSyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackExternalDualAsyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackEmbeddedPrimarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackEmbeddedSecondarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackEmbeddedDualSyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoopbackEmbeddedDualAsyncSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemSecondaryModeSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemSecondaryModeSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemClientSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemClientSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoggerStateSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemLoggerSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoggerStateSelfTest.class.getName()))); + suite.addTest(new 
TestSuite(ldr.loadClass(IgniteHadoopFileSystemLoggerSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemHandshakeSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemHandshakeSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoop20FileSystemLoopbackPrimarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfs20FileSystemLoopbackPrimarySelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopDualSyncSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopDualAsyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfsDualSyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfsDualAsyncSelfTest.class.getName()))); suite.addTest(IgfsEventsTestSuite.suiteNoarchOnly()); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopFileSystemsTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopFileSystemsTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopValidationSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopValidationSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopDefaultMapReducePlannerSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopJobTrackerSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopDefaultMapReducePlannerSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopJobTrackerSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopHashMapSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopDataStreamSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopConcurrentHashMultimapSelftest.class.getName()))); + suite.addTest(new 
TestSuite(ldr.loadClass(HadoopHashMapSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopDataStreamSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopConcurrentHashMultimapSelftest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSkipListSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSkipListSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTaskExecutionSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopTaskExecutionSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopV2JobSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopV2JobSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSerializationWrapperSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSplitWrapperSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSerializationWrapperSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSplitWrapperSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTasksV1Test.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopTasksV2Test.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopTasksV1Test.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopTasksV2Test.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopMapReduceTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopMapReduceEmbeddedSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopMapReduceEmbeddedSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopExternalTaskExecutionSelfTest.class.getName()))); - 
suite.addTest(new TestSuite(ldr.loadClass(GridHadoopExternalCommunicationSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalTaskExecutionSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopExternalCommunicationSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSortingTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSortingTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopSortingExternalTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopSortingExternalTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopGroupingTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopGroupingTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopClientProtocolSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopClientProtocolEmbeddedSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopClientProtocolSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopClientProtocolEmbeddedSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(GridHadoopCommandLineTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopCommandLineTest.class.getName()))); suite.addTest(new TestSuite(ldr.loadClass(HadoopSecondaryFileSystemConfigurationTest.class.getName()))); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java index 6055db9..87233fc 100644 --- 
a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteIgfsLinuxAndMacOSTestSuite.java @@ -36,25 +36,25 @@ public class IgniteIgfsLinuxAndMacOSTestSuite extends TestSuite { public static TestSuite suite() throws Exception { downloadHadoop(); - GridHadoopClassLoader ldr = new GridHadoopClassLoader(null); + HadoopClassLoader ldr = new HadoopClassLoader(null); TestSuite suite = new TestSuite("Ignite IGFS Test Suite For Linux And Mac OS"); suite.addTest(new TestSuite(ldr.loadClass(IgfsServerManagerIpcEndpointRegistrationOnLinuxAndMacSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemExternalPrimarySelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemExternalSecondarySelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemExternalDualSyncSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemExternalDualAsyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemExternalPrimarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemExternalSecondarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemExternalDualSyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemExternalDualAsyncSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemEmbeddedPrimarySelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemEmbeddedSecondarySelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemEmbeddedDualSyncSelfTest.class.getName()))); - suite.addTest(new 
TestSuite(ldr.loadClass(IgfsHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemEmbeddedPrimarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemEmbeddedSecondarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemEmbeddedDualSyncSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemShmemEmbeddedDualAsyncSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoopFileSystemIpcCacheSelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(IgniteHadoopFileSystemIpcCacheSelfTest.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(IgfsHadoop20FileSystemShmemPrimarySelfTest.class.getName()))); + suite.addTest(new TestSuite(ldr.loadClass(HadoopIgfs20FileSystemShmemPrimarySelfTest.class.getName()))); suite.addTest(IgfsEventsTestSuite.suite()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java ---------------------------------------------------------------------- diff --git a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java index 31aa9e5..0f0a93f 100644 --- a/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java +++ b/modules/spring/src/main/java/org/apache/ignite/IgniteSpringBean.java @@ -254,14 +254,14 @@ public class IgniteSpringBean implements Ignite, DisposableBean, InitializingBea } /** {@inheritDoc} */ - @Override public IgniteFs fileSystem(String name) { + @Override public IgniteFileSystem fileSystem(String name) { assert g != null; return g.fileSystem(name); } /** {@inheritDoc} */ - @Override public Collection<IgniteFs> fileSystems() { + @Override public Collection<IgniteFileSystem> fileSystems() { assert g != null; return
g.fileSystems(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala ---------------------------------------------------------------------- diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala index 964de1d..f0e5eba 100644 --- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala +++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/visor.scala @@ -1520,8 +1520,8 @@ object visor extends VisorTag { val cfgs = try // Cache, IGFS, streamer and DR configurations should be excluded from daemon node config. - spring.loadConfigurations(url, "cacheConfiguration", "igfsConfiguration", "streamerConfiguration", - "drSenderHubConfiguration", "drReceiverHubConfiguration").get1() + spring.loadConfigurations(url, "cacheConfiguration", "fileSystemConfiguration", + "streamerConfiguration", "drSenderHubConfiguration", "drReceiverHubConfiguration").get1() finally { if (log4jTup != null) U.removeLog4jNoOpLogger(log4jTup) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index b561aab..9c6ef75 100644 --- a/pom.xml +++ b/pom.xml @@ -731,12 +731,12 @@ org.apache.ignite.services - Ignite File System - org.apache.ignite.igfs + File System APIs + org.apache.ignite.igfs:org.apache.ignite.igfs.mapreduce:org.apache.ignite.igfs.mapreduce.records:org.apache.ignite.igfs.secondary Hadoop Accelerator APIs - org.apache.ignite.igfs.hadoop:org.apache.ignite.igfs.hadoop.v1:org.apache.ignite.igfs.hadoop.v2:org.apache.ignite.igfs.mapreduce:org.apache.ignite.igfs.mapreduce.records:org.apache.ignite.hadoop + 
org.apache.ignite.hadoop:org.apache.ignite.hadoop.fs:org.apache.ignite.hadoop.fs.v1:org.apache.ignite.hadoop.fs.v2:org.apache.ignite.hadoop.mapreduce Streaming APIs @@ -926,12 +926,12 @@ org.apache.ignite.services - Ignite File System - org.apache.ignite.igfs + File System APIs + org.apache.ignite.igfs:org.apache.ignite.igfs.mapreduce:org.apache.ignite.igfs.mapreduce.records:org.apache.ignite.igfs.secondary Hadoop Accelerator APIs - org.apache.ignite.igfs.hadoop:org.apache.ignite.igfs.hadoop.v1:org.apache.ignite.igfs.hadoop.v2:org.apache.ignite.igfs.mapreduce:org.apache.ignite.igfs.mapreduce.records:org.apache.ignite.hadoop + org.apache.ignite.hadoop:org.apache.ignite.hadoop.fs:org.apache.ignite.hadoop.fs.v1:org.apache.ignite.hadoop.fs.v2:org.apache.ignite.hadoop.mapreduce Streaming APIs