ignite-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [ignite] alex-plekhanov commented on a change in pull request #6554: IGNITE-11073: Backup page store manager, initial
Date Fri, 01 Nov 2019 14:48:50 GMT
alex-plekhanov commented on a change in pull request #6554: IGNITE-11073: Backup page store
manager, initial
URL: https://github.com/apache/ignite/pull/6554#discussion_r341553046
 
 

 ##########
 File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
 ##########
 @@ -0,0 +1,1783 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.Paths;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.zip.CRC32;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.MarshallerMappingWriter;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.communication.TransmissionHandler;
+import org.apache.ignite.internal.managers.communication.TransmissionMeta;
+import org.apache.ignite.internal.managers.communication.TransmissionPolicy;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.CheckpointFuture;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange;
+import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc;
+import org.apache.ignite.internal.processors.cacheobject.BinaryTypeWriter;
+import org.apache.ignite.internal.processors.marshaller.MappedName;
+import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric;
+import org.apache.ignite.internal.util.GridBusyLock;
+import org.apache.ignite.internal.util.GridIntIterator;
+import org.apache.ignite.internal.util.GridIntList;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+import static java.nio.file.StandardOpenOption.READ;
+import static org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT;
+import static org.apache.ignite.internal.IgniteFeatures.nodeSupports;
+import static org.apache.ignite.internal.MarshallerContextImpl.addPlatformMappings;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheDirName;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.cacheWorkDir;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileEx;
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionNameEx;
+import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getFlagByPartId;
+
+/** */
+public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter implements PartitionsExchangeAware
 {
+    /** File with delta pages suffix. */
+    public static final String DELTA_SUFFIX = ".delta";
+
+    /** File name template for partition delta pages. */
+    public static final String PART_DELTA_TEMPLATE = PART_FILE_TEMPLATE + DELTA_SUFFIX;
+
+    /** File name template for index delta pages. */
+    public static final String INDEX_DELTA_NAME = INDEX_FILE_NAME + DELTA_SUFFIX;
+
+    /** The reason of checkpoint start for the needs of a snapshot (backup). */
+    public static final String SNAPSHOT_CP_REASON = "Wakeup for checkpoint to take snapshot [name=%s]";
+
+    /** Default working directory for snapshot temporary files. */
+    public static final String DFLT_LOCAL_SNAPSHOT_DIRECTORY = "snapshots";
+
+    /** Default snapshot directory for loading remote snapshots. */
+    public static final String DFLT_SNAPSHOT_WORK_DIRECTORY = "snp";
+
+    /** Prefix for snapshot threads. */
+    private static final String SNAPSHOT_RUNNER_THREAD_PREFIX = "snapshot-runner";
+
+    /** Total number of threads to perform local snapshot. */
+    private static final int SNAPSHOT_THEEAD_POOL_SIZE = 4;
+
+    /** Default snapshot topic to receive snapshots from remote node. */
+    private static final Object DFLT_RMT_SNAPSHOT_TOPIC = GridTopic.TOPIC_RMT_SNAPSHOT.topic("0");
+
+    /** Cache group id parameter name for a file transmission. */
+    private static final String SNP_GRP_ID_PARAM = "grpId";
+
+    /** Cache partition id parameter name for a file transmission. */
+    private static final String SNP_PART_ID_PARAM = "partId";
+
+    /** Cache local node directory path name (e.g. db/IgniteNode0). */
+    private static final String SNP_DB_NODE_PATH_PARAM = "dbNodePath";
+
+    /** Cache directory parameter name for a file transmission. */
+    private static final String SNP_CACHE_DIR_NAME_PARAM = "cacheDirName";
+
+    /** Snapshot parameter name for a file transmission. */
+    private static final String SNP_NAME_PARAM = "snpName";
+
+    /** Map of registered cache snapshot processes and their corresponding contexts. */
+    private final ConcurrentMap<String, LocalSnapshotContext> localSnpCtxs = new ConcurrentHashMap<>();
+
+    /** Map of requested snapshot from remote node. */
+    private final ConcurrentMap<T2<UUID, String>, SnapshotTransmission> reqSnps
= new ConcurrentHashMap<>();
+
+    /** All registered page writers of all running snapshot processes. */
+    private final ConcurrentMap<GroupPartitionId, List<PageStoreSerialWriter>>
partWriters = new ConcurrentHashMap<>();
+
+    /** Lock to protect the resources being used. */
+    private final GridBusyLock busyLock = new GridBusyLock();
+
+    /** Main snapshot directory to store files. */
+    private File localSnpDir;
+
+    /** Working directory for loaded snapshots from remote nodes. */
+    private File snpWorkDir;
+
+    /** Factory for working with delta as file storage. */
+    private volatile FileIOFactory ioFactory = new RandomAccessFileIOFactory();
+
+    /** Factory to create page store for restore. */
+    private volatile BiFunction<Integer, Boolean, FilePageStoreFactory> storeFactory;
+
+    /** Snapshot thread pool. */
+    private IgniteThreadPoolExecutor snpRunner;
+
+    /** Checkpoint listener to handle scheduled snapshot requests. */
+    private DbCheckpointListener cpLsnr;
+
+    /** Snapshot listener on created snapshots. */
+    private volatile SnapshotListener snpLsnr;
+
+    /** Database manager for enabled persistence. */
+    private GridCacheDatabaseSharedManager dbMgr;
+
+    /** Configured data storage page size. */
+    private int pageSize;
+
+    /**
+     * Creates the snapshot manager. The constructor body is empty and does not
+     * read the passed context.
+     *
+     * @param ctx Kernal context (not used by this constructor).
+     */
+    public IgniteSnapshotManager(GridKernalContext ctx) {
+        // No-op.
+    }
+
+    /**
+     * Builds the delta file handle for a partition: the file is located in the given directory
+     * and is named according to {@link #getPartitionDeltaFileName(int)}.
+     *
+     * @param snapshotCacheDir Snapshot directory to store files.
+     * @param partId Cache partition identifier.
+     * @return A file representation.
+     */
+    public static File getPartionDeltaFile(File snapshotCacheDir, int partId) {
+        return new File(snapshotCacheDir, getPartitionDeltaFileName(partId));
+    }
+
+    /**
+     * Resolves the name of the file holding delta pages of a partition. The index partition
+     * ({@code INDEX_PARTITION}) maps to {@code INDEX_DELTA_NAME}; any other id is substituted
+     * into {@code PART_DELTA_TEMPLATE}.
+     *
+     * @param partId Partition id; asserted to be at most {@code MAX_PARTITION_ID} or equal
+     *      to {@code INDEX_PARTITION}.
+     * @return File name of delta partition pages.
+     */
+    public static String getPartitionDeltaFileName(int partId) {
+        assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION;
+
+        return partId == INDEX_PARTITION ? INDEX_DELTA_NAME : String.format(PART_DELTA_TEMPLATE,
partId);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void start0() throws IgniteCheckedException {
+        super.start0();
+
+        GridKernalContext kctx = cctx.kernalContext();
+
+        if (kctx.clientNode())
+            return;
+
+        if (!CU.isPersistenceEnabled(cctx.kernalContext().config()))
+            return;
+
+        pageSize = kctx.config()
+            .getDataStorageConfiguration()
+            .getPageSize();
+
+        assert pageSize > 0;
+
+        snpRunner = new IgniteThreadPoolExecutor(
+            SNAPSHOT_RUNNER_THREAD_PREFIX,
+            cctx.igniteInstanceName(),
+            SNAPSHOT_THEEAD_POOL_SIZE,
+            SNAPSHOT_THEEAD_POOL_SIZE,
+            30_000,
+            new LinkedBlockingQueue<>(),
+            SYSTEM_POOL,
+            (t, e) -> kctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR,
e)));
+
+        assert cctx.pageStore() instanceof FilePageStoreManager;
+
+        FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore();
+
+        PdsFolderSettings rslvDir = kctx.pdsFolderResolver().resolveFolders();
 
 Review comment:
   `rslvDir` is never used

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message