ignite-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [ignite] alex-plekhanov commented on a change in pull request #5619: IGNITE-10619: CommunicationSpi support channels initial commit
Date Wed, 07 Aug 2019 16:01:45 GMT
alex-plekhanov commented on a change in pull request #5619: IGNITE-10619: CommunicationSpi
support channels initial commit
URL: https://github.com/apache/ignite/pull/5619#discussion_r311623821
 
 

 ##########
 File path: modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerFileTransmissionSelfTest.java
 ##########
 @@ -0,0 +1,951 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.OpenOption;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.FILE_SUFFIX;
+import static org.apache.ignite.internal.util.IgniteUtils.fileCount;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
+
+/**
+ * Test file transmission manager operations.
+ */
+public class GridIoManagerFileTransmissionSelfTest extends GridCommonAbstractTest {
+    /** Number of cache keys to generate. */
+    private static final long CACHE_SIZE = 50_000L;
+
+    /** Network timeout in ms. */
+    private static final long NET_TIMEOUT_MS = 2000L;
+
+    /** Temporary directory to store files. */
+    private static final String TEMP_FILES_DIR = "ctmp";
+
+    /** Factory to produce IO interfaces over files to transmit. */
+    private static final FileIOFactory IO_FACTORY = new RandomAccessFileIOFactory();
+
+    /** The topic to send files to. */
+    private static Object topic;
+
+    /** File filter. */
+    private static FilenameFilter fileBinFilter;
+
+    /** Locally used fileIo to interact with output file. */
+    private final FileIO[] fileIo = new FileIO[1];
+
+    /** The temporary directory to store files. */
+    private File tempStore;
+
+    /**
+     * Called before tests started.
+     */
+    @BeforeClass
+    public static void beforeAll() {
+        topic = GridTopic.TOPIC_CACHE.topic("test", 0);
+
+        fileBinFilter = new FilenameFilter() {
+            @Override public boolean accept(File dir, String name) {
+                return name.endsWith(FILE_SUFFIX);
+            }
+        };
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Before
+    public void before() throws Exception {
+        cleanPersistenceDir();
+
+        tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(), TEMP_FILES_DIR, true);
+    }
+
+    /**
+     * Called after test run.
+     */
+    @After
+    public void after() {
+        stopAllGrids();
+        U.closeQuiet(fileIo[0]);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setPersistenceEnabled(true)
+                    .setMaxSize(500L * 1024 * 1024)))
+            .setCacheConfiguration(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME))
+            .setCommunicationSpi(new BlockingOpenChannelCommunicationSpi())
+            .setNetworkTimeout(NET_TIMEOUT_MS);
+    }
+
+    /**
+     * Transmit all cache partition to particular topic on the remote node.
+     *
+     * @throws Exception If fails.
+     */
+    @Test
+    public void testFileHandlerBase() throws Exception {
+        IgniteEx snd = startGrid(0);
+        IgniteEx rcv = startGrid(1);
+
+        snd.cluster().active(true);
+
+        addCacheData(snd, DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        Map<String, Long> fileSizes = new HashMap<>();
+        Map<String, Integer> fileCrcs = new HashMap<>();
+        Map<String, Serializable> fileParams = new HashMap<>();
+
+        assertTrue(snd.context().io().fileTransmissionSupported(rcv.localNode()));
+
+        rcv.context().io().addTransmissionHandler(topic, new TransmissionHandlerAdapter() {
+            @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) {
+                return new File(tempStore, fileMeta.name()).getAbsolutePath();
+            }
+
+            @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) {
+                return new Consumer<File>() {
+                    @Override public void accept(File file) {
+                        assertTrue(fileSizes.containsKey(file.getName()));
+                        // Save all params.
+                        fileParams.putAll(initMeta.params());
+                    }
+                };
+            }
+        });
+
+        File cacheDirIg0 = cacheWorkDir(snd, DEFAULT_CACHE_NAME);
+
+        File[] cacheParts = cacheDirIg0.listFiles(fileBinFilter);
+
+        for (File file : cacheParts) {
+            fileSizes.put(file.getName(), file.length());
+            fileCrcs.put(file.getName(), FastCrc.calcCrc(file));
+        }
+
+        try (GridIoManager.TransmissionSender sender = snd.context()
+            .io()
+            .openTransmissionSender(rcv.localNode().id(), topic)) {
+            // Iterate over cache partition cacheParts.
+            for (File file : cacheParts) {
+                Map<String, Serializable> params = new HashMap<>();
+
+                params.put(file.getName(), file.hashCode());
+
+                sender.send(file, params, TransmissionPolicy.FILE);
+            }
+        }
+
+        stopAllGrids();
+
+        assertEquals(fileSizes.size(), tempStore.listFiles(fileBinFilter).length);
+
+        for (File file : cacheParts) {
+            // Check received file lengths
 
 Review comment:
   point

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message