ignite-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [ignite] alex-plekhanov commented on a change in pull request #5619: IGNITE-10619: CommunicationSpi support channels initial commit
Date Wed, 07 Aug 2019 16:01:46 GMT
alex-plekhanov commented on a change in pull request #5619: IGNITE-10619: CommunicationSpi
support channels initial commit
URL: https://github.com/apache/ignite/pull/5619#discussion_r311625145
 
 

 ##########
 File path: modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
 ##########
 @@ -0,0 +1,951 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.OpenOption;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static org.apache.ignite.internal.util.IgniteUtils.fileCount;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+
+/**
+ * Test file transmission mamanger operations.
+ */
+public class GridIoManagerFileTransmissionSelfTest extends GridCommonAbstractTest {
+    /** Number of cache keys to generate. */
+    private static final long CACHE_SIZE = 50_000L;
+
+    /** Network timeout in ms. */
+    private static final long NET_TIMEOUT_MS = 2000L;
+
+    /** Temporary directory to store files. */
+    private static final String TEMP_FILES_DIR = "ctmp";
+
+    /** Factory to produce IO interfaces over files to transmit. */
+    private static final FileIOFactory IO_FACTORY = new RandomAccessFileIOFactory();
+
+    /** The topic to send files to. */
+    private static Object topic;
+
+    /** File filter. */
+    private static FilenameFilter fileBinFilter;
+
+    /** Locally used fileIo to interact with output file. */
+    private final FileIO[] fileIo = new FileIO[1];
+
+    /** The temporary directory to store files. */
+    private File tempStore;
+
+    /**
+     * Called before tests started.
+     */
+    @BeforeClass
+    public static void beforeAll() {
+        topic = GridTopic.TOPIC_CACHE.topic("test", 0);
+
+        fileBinFilter = new FilenameFilter() {
+            @Override public boolean accept(File dir, String name) {
+                return name.endsWith(FILE_SUFFIX);
+            }
+        };
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Before
+    public void before() throws Exception {
+        cleanPersistenceDir();
+
+        tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(), TEMP_FILES_DIR, true);
+    }
+
+    /**
+     * Called after test run.
+     */
+    @After
+    public void after() {
+        stopAllGrids();
+        U.closeQuiet(fileIo[0]);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws
Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setPersistenceEnabled(true)
+                    .setMaxSize(500L * 1024 * 1024)))
+            .setCacheConfiguration(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME))
+            .setCommunicationSpi(new BlockingOpenChannelCommunicationSpi())
+            .setNetworkTimeout(NET_TIMEOUT_MS);
+    }
+
+    /**
+     * Transmit all cache partition to particular topic on the remote node.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testFileHandlerBase() throws Exception {
+        IgniteEx snd = startGrid(0);
+        IgniteEx rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        addCacheData(snd, DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        Map<String, Long> fileSizes = new HashMap<>();
+        Map<String, Integer> fileCrcs = new HashMap<>();
+        Map<String, Serializable> fileParams = new HashMap<>();
+
+        assertTrue(snd.context().io().fileTransmissionSupported(rcv.localNode()));
+
+        rcv.context().io().addTransmissionHandler(topic, new TransmissionHandlerAdapter()
{
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                return new File(tempStore, fileMeta.name()).getAbsolutePath();
+            }
+
+            @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta
initMeta) {
+                return new Consumer<File>() {
+                    @Override public void accept(File file) {
+                        assertTrue(fileSizes.containsKey(file.getName()));
+                        // Save all params.
+                        fileParams.putAll(initMeta.params());
+                    }
+                };
+            }
+        });
+
+        File cacheDirIg0 = cacheWorkDir(snd, DEFAULT_CACHE_NAME);
+
+        File[] cacheParts = cacheDirIg0.listFiles(fileBinFilter);
+
+        for (File file : cacheParts) {
+            fileSizes.put(file.getName(), file.length());
+            fileCrcs.put(file.getName(), FastCrc.calcCrc(file));
+        }
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            // Iterate over cache partition cacheParts.
+            for (File file : cacheParts) {
+                Map<String, Serializable> params = new HashMap<>();
+
+                params.put(file.getName(), file.hashCode());
+
+                sender.send(file, params, TransmissionPolicy.FILE);
+            }
+        }
+
+        stopAllGrids();
+
+        assertEquals(fileSizes.size(), tempStore.listFiles(fileBinFilter).length);
+
+        for (File file : cacheParts) {
+            // Check received file lenghs
+            assertEquals("Received the file length is incorrect: " + file.getName(),
+                fileSizes.get(file.getName()), new Long(file.length()));
+
+            // Check received params
+            assertEquals("File additional parameters are not fully transmitted",
+                fileParams.get(file.getName()), file.hashCode());
+        }
+
+        // Check received file CRCs.
+        for (File file : tempStore.listFiles(fileBinFilter)) {
+            assertEquals("Received file CRC-32 checksum is incorrect: " + file.getName(),
+                fileCrcs.get(file.getName()), new Integer(FastCrc.calcCrc(file)));
+        }
+    }
+
+    /**
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testFileHandlerFilePathThrowsEx() throws Exception {
+        final String exTestMessage = "Test exception. Handler initialization failed at onBegin.";
+
+        IgniteEx snd = startGrid(0);
+        IgniteEx rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("1Mb", 1024 * 1024);
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv,
fileToSend, tempStore) {
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                throw new IgniteException(exTestMessage);
+            }
+
+            @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta
initMeta) {
+                fail("fileHandler must never be called");
+
+                return super.fileHandler(nodeId, initMeta);
+            }
+
+            @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta
initMeta) {
+                fail("chunkHandler must never be called");
+
+                return super.chunkHandler(nodeId, initMeta);
+            }
+
+            @Override public void onException(UUID nodeId, Throwable err) {
+                assertEquals(exTestMessage, err.getMessage());
+            }
+        });
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+    }
+
+    /**
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testFileHandlerOnReceiverLeft() throws Exception {
+        final int fileSizeBytes = 5 * 1024 * 1024;
+        final AtomicInteger chunksCnt = new AtomicInteger();
+
+        IgniteEx snd = startGrid(0);
+        IgniteEx rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("testFile", fileSizeBytes);
+
+        transmissionFileIoFactory(snd, new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) throws IOException
{
+                FileIO fileIo = IO_FACTORY.create(file, modes);
+
+                // Blocking writer and stopping node FileIo.
+                return new FileIODecorator(fileIo) {
+                    /** {@inheritDoc} */
+                    @Override public long transferTo(long position, long count, WritableByteChannel
target)
+                        throws IOException {
+                        // Send 5 chunks than stop the rcv.
+                        if (chunksCnt.incrementAndGet() == 5)
+                            stopGrid(rcv.name(), true);
+
+                        return super.transferTo(position, count, target);
+                    }
+                };
+            }
+        });
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv,
fileToSend, tempStore));
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+    }
+
+    /**
+     * @throws Exception If fails.
+     */
+    @Test
+    public void tesFileHandlerReconnectTimeouted() throws Exception {
+        IgniteEx rcv = startGrid(1);
+        IgniteEx snd = startGrid(0);
+
+        final AtomicInteger chunksCnt = new AtomicInteger();
+        final CountDownLatch sndLatch = ((BlockingOpenChannelCommunicationSpi)snd.context()
+            .config()
+            .getCommunicationSpi()).latch;
+        final AtomicReference<Throwable> refErr = new AtomicReference<>();
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("testFile", 5 * 1024 * 1024);
+
+        transmissionFileIoFactory(snd, new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) throws IOException
{
+                FileIO fileIo = IO_FACTORY.create(file, modes);
+
+                return new FileIODecorator(fileIo) {
+                    /** {@inheritDoc} */
+                    @Override public long transferTo(long position, long count, WritableByteChannel
target)
+                        throws IOException {
+                        if (chunksCnt.incrementAndGet() == 10) {
+                            target.close();
+
+                            ((BlockingOpenChannelCommunicationSpi)snd.context()
+                                .config()
+                                .getCommunicationSpi()).block = true;
+                        }
+
+                        return super.transferTo(position, count, target);
+                    }
+                };
+            }
+        });
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv,
fileToSend, tempStore) {
+            @Override public void onException(UUID nodeId, Throwable err) {
+                refErr.compareAndSet(null, err);
+
+                sndLatch.countDown();
+            }
+        });
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+        catch (IgniteCheckedException | IOException | InterruptedException e) {
+            // Ignore err
+            U.warn(log, e);
+        }
+
+        assertNotNull("Timeout exception not occurred", refErr.get());
+        assertEquals("Type of timeout excpetion incorrect: " + refErr.get(),
+            IgniteCheckedException.class,
+            refErr.get().getClass());
+    }
+
+    /**
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testFileHandlerCleanedUpIfSenderLeft() throws Exception {
+        IgniteEx snd = startGrid(0);
+        IgniteEx rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("tempFile15Mb", 15 * 1024 * 1024);
+        File downloadTo = U.resolveWorkDirectory(tempStore.getAbsolutePath(), "download",
true);
+
+        transmissionFileIoFactory(snd, new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) throws IOException
{
+                FileIO fileIo = IO_FACTORY.create(file, modes);
+
+                return new FileIODecorator(fileIo) {
+                    /** {@inheritDoc} */
+                    @Override public long transferTo(long position, long count, WritableByteChannel
target)
+                        throws IOException {
+
+                        long transferred = super.transferTo(position, count, target);
+
+                        stopGrid(snd.name(), true);
+
+                        return transferred;
+                    }
+                };
+            }
+        });
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv,
fileToSend, tempStore){
+            /** {@inheritDoc} */
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                return new File(downloadTo, fileMeta.name()).getAbsolutePath();
+            }
+        });
+
+        Exception err = null;
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+        catch (Exception e) {
+            // Ignore node stopping exception.
+            err = e;
+        }
+
+        assertEquals(NodeStoppingException.class, err.getClass());
+        assertEquals("Uncomplete resources must be cleaned up on sender left",
+            0,
+            fileCount(downloadTo.toPath()));
+    }
+
+    /**
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testFileHandlerReconnectOnReadFail() throws Exception {
+        final String chunkDownloadExMsg = "Test exception. Chunk processing error.";
+
+        IgniteEx snd = startGrid(0);
+        IgniteEx rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("testFile", 5 * 1024 * 1024);
+        final AtomicInteger readedChunks = new AtomicInteger();
+
+        transmissionFileIoFactory(rcv, new FileIOFactory() {
+            @Override public FileIO create(File file, OpenOption... modes) throws IOException
{
+                fileIo[0] = IO_FACTORY.create(file, modes);
+
+                // Blocking writer and stopping node FileIo.
+                return new FileIODecorator(fileIo[0]) {
+                    @Override public long transferFrom(ReadableByteChannel src, long position,
long count)
+                        throws IOException {
+                        // Read 4 chunks than throw an exception to emulate error processing.
+                        if (readedChunks.incrementAndGet() == 4)
+                            throw new IgniteException(chunkDownloadExMsg);
+
+                        return super.transferFrom(src, position, count);
+                    }
+                };
+            }
+        });
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv,
fileToSend, tempStore) {
+            @Override public void onException(UUID nodeId, Throwable err) {
+                assertEquals(chunkDownloadExMsg, err.getMessage());
+            }
+        });
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+    }
+
+    /**
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteCheckedException.class)
+    public void testFileHandlerSenderStoppedIfReceiverInitFail() throws Exception {
+        final int fileSizeBytes = 5 * 1024 * 1024;
+        final AtomicBoolean throwFirstTime = new AtomicBoolean();
+
+        IgniteEx snd = startGrid(0);
+        IgniteEx rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("testFile", fileSizeBytes);
+        File rcvFile = new File(tempStore, "testFile" + "_" + rcv.localNode().id());
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv,
fileToSend, tempStore) {
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                if (throwFirstTime.compareAndSet(false, true))
+                    throw new IgniteException("Test exception. Initialization fail.");
+
+                return rcvFile.getAbsolutePath();
+            }
+        });
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+
+        assertEquals(fileToSend.length(), rcvFile.length());
+        assertCrcEquals(fileToSend, rcvFile);
+    }
+
+    /**
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testFileHandlerNextWriterOpened() throws Exception {
+        final int fileSizeBytes = 5 * 1024 * 1024;
+        final AtomicBoolean networkExThrown = new AtomicBoolean();
+
+        IgniteEx snd = startGrid(0);
+        IgniteEx rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        File fileToSend = createFileRandomData("File_5MB", fileSizeBytes);
+        File rcvFile = new File(tempStore, "File_5MB" + "_" + rcv.localNode().id());
+
+        rcv.context().io().addTransmissionHandler(topic, new DefaultTransmissionHandler(rcv,
fileToSend, tempStore) {
+            @Override public void onException(UUID nodeId, Throwable err) {
+                assertEquals("Previous session is not closed properly", IgniteCheckedException.class,
err.getClass());
+            }
+
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                if (networkExThrown.compareAndSet(false, true))
+                    return null;
+
+                return rcvFile.getAbsolutePath();
+            }
+        });
+
+        Exception expectedErr = null;
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+        catch (IgniteCheckedException e) {
+            // Expected exception.
+            expectedErr = e;
+        }
+
+        assertNotNull("Transmission must ends with an exception", expectedErr);
+
+        //Open next session and complete successfull.
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            sender.send(fileToSend, TransmissionPolicy.FILE);
+        }
+
+        assertEquals(fileToSend.length(), rcvFile.length());
+        assertCrcEquals(fileToSend, rcvFile);
+    }
+
+    /**
+     * @throws Exception If fails.
+     */
+    @Test(expected = IgniteException.class)
+    public void testFileHandlerSendToNullTopic() throws Exception {
+        IgniteEx snd = startGrid(0);
+        IgniteEx rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        // Ensure topic handler is empty
 
 Review comment:
   point

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message