ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [08/13] ignite git commit: IGNITE-5558 - Added ability to read WAL in standalone mode - Fixes #2174.
Date Wed, 05 Jul 2017 04:23:06 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
new file mode 100644
index 0000000..df932e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridKernalGateway;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
+import org.apache.ignite.internal.managers.collision.GridCollisionManager;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
+import org.apache.ignite.internal.managers.failover.GridFailoverManager;
+import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
+import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
+import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
+import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
+import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
+import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
+import org.apache.ignite.internal.processors.igfs.IgfsHelper;
+import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
+import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
+import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor;
+import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
+import org.apache.ignite.internal.processors.port.GridPortProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
+import org.apache.ignite.internal.processors.rest.GridRestProcessor;
+import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter;
+import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
+import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
+import org.apache.ignite.internal.processors.service.GridServiceProcessor;
+import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
+import org.apache.ignite.internal.processors.task.GridTaskProcessor;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
+import org.apache.ignite.internal.util.IgniteExceptionRegistry;
+import org.apache.ignite.internal.util.StripedExecutor;
+import org.apache.ignite.plugin.PluginNotFoundException;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy grid kernal context
+ */
+public class StandaloneGridKernalContext implements GridKernalContext {
+    private IgniteLogger log;
+
+    /**
+     * @param log Logger.
+     */
+    StandaloneGridKernalContext(IgniteLogger log) {
+        this.log = log;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<GridComponent> components() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID localNodeId() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String igniteInstanceName() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteLogger log(String ctgr) {
+        return log;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteLogger log(Class<?> cls) {
+        return log;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStopping() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridKernalGateway gateway() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteEx grid() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteConfiguration config() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridTaskProcessor task() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridAffinityProcessor affinity() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridJobProcessor job() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridTimeoutProcessor timeout() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridResourceProcessor resource() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridJobMetricsProcessor jobMetric() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheProcessor cache() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridClusterStateProcessor state() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridTaskSessionProcessor session() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridClosureProcessor closure() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridServiceProcessor service() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridPortProcessor ports() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteScheduleProcessorAdapter schedule() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridRestProcessor rest() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridSegmentationProcessor segmentation() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K, V> DataStreamProcessor<K, V> dataStream() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsProcessorAdapter igfs() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHelper igfsHelper() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridContinuousProcessor continuous() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopProcessorAdapter hadoop() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public PoolProcessor pools() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridMarshallerMappingProcessor mapping() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopHelper hadoopHelper() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService utilityCachePool() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteStripedThreadPoolExecutor asyncCallbackPool() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCacheObjectProcessor cacheObjects() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridQueryProcessor query() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SqlListenerProcessor sqlListener() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgnitePluginProcessor plugins() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDeploymentManager deploy() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridIoManager io() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDiscoveryManager discovery() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCheckpointManager checkpoint() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridEventStorageManager event() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridFailoverManager failover() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCollisionManager collision() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridSecurityProcessor security() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridLoadBalancerManager loadBalancing() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridIndexingManager indexing() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DataStructuresProcessor dataStructures() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void markSegmented() { }
+
+    /** {@inheritDoc} */
+    @Override public boolean segmented() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats() { }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDaemon() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridPerformanceSuggestions performance() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String userVersion(ClassLoader ldr) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public PluginProvider pluginProvider(String name) throws PluginNotFoundException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <T> T createComponent(Class<T> cls) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService getExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService getServiceExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService getSystemExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public StripedExecutor getStripedExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService getManagementExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService getPeerClassLoadingExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService getIgfsExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService getDataStreamerExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService getRestExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService getAffinityExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public ExecutorService getIndexingExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService getQueryExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Map<String, ? extends ExecutorService> customExecutors() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExecutorService getSchemaExecutorService() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteExceptionRegistry exceptionRegistry() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object nodeAttribute(String key) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNodeAttribute(String key) {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Object addNodeAttribute(String key, Object val) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Object> nodeAttributes() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterProcessor cluster() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public MarshallerContextImpl marshallerContext() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean clientNode() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean clientDisconnected() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public PlatformProcessor platform() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @NotNull @Override public Iterator<GridComponent> iterator() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java
new file mode 100644
index 0000000..85a8724
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+
+/**
+ * Fake {@link IgniteCacheDatabaseSharedManager} implementation for creation in the standalone WAL reader tool.
+ * It exists to publish the page size setter: {@link #setPageSize(int)} is re-declared here as {@code public}
+ * and delegates to the parent implementation unchanged.
+ * NOTE(review): presumably the parent setter is not public - confirm against the parent class.
+ */
+class StandaloneIgniteCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager {
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Overridden only to widen visibility; no additional logic is applied.
+     */
+    @Override public void setPageSize(int pageSize) {
+        super.setPageSize(pageSize);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
new file mode 100644
index 0000000..f17c112
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+
+/**
+ * WAL records iterator for the standalone WAL reader tool.
+ * <p>
+ * Works in one of two modes:
+ * <ul>
+ *     <li>directory scan mode: the next segment file is looked up by index inside one directory;</li>
+ *     <li>file-by-file mode: an explicit set of segment files is converted to descriptors up front
+ *     and consumed one by one.</li>
+ * </ul>
+ * Does not provide start and end boundaries.
+ */
+class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Record buffer size */
+    private static final int BUF_SIZE = 2 * 1024 * 1024;
+
+    /**
+     * WAL files directory. Should already contain 'consistent ID' as subfolder.
+     * <code>null</code> value means file-by-file iteration mode
+     */
+    @Nullable
+    private File walFilesDir;
+
+    /**
+     * File descriptors remained to scan.
+     * <code>null</code> value means directory scan mode
+     */
+    @Nullable
+    private List<FileWriteAheadLogManager.FileDescriptor> walFileDescriptors;
+
+    /**
+     * True if this iterator is used for the work dir, false for the archive.
+     * In work dir mode exceptions coming from record reading are ignored (file may be not completed).
+     * Index of file is taken from file itself, not from file name
+     */
+    private boolean workDir;
+
+    /**
+     * Creates iterator in directory scan mode
+     *
+     * @param walFilesDir Wal files directory. Should already contain node consistent ID as subfolder
+     * @param log Logger.
+     * @param sharedCtx Shared context.
+     * @throws IgniteCheckedException If initialization or reading of the first record failed.
+     */
+    StandaloneWalRecordsIterator(
+        @NotNull final File walFilesDir,
+        @NotNull final IgniteLogger log,
+        @NotNull final GridCacheSharedContext sharedCtx) throws IgniteCheckedException {
+        super(log,
+            sharedCtx,
+            new RecordV1Serializer(sharedCtx),
+            BUF_SIZE);
+        init(walFilesDir, false, null);
+        advance();
+    }
+
+    /**
+     * Creates iterator in file-by-file iteration mode. No directory is scanned: only the provided files are read.
+     *
+     * @param log Logger.
+     * @param sharedCtx Shared context.
+     * @param workDir Work directory is scanned, false - archive
+     * @param walFiles Wal files.
+     * @throws IgniteCheckedException If initialization or reading of the first record failed.
+     */
+    StandaloneWalRecordsIterator(
+        @NotNull final IgniteLogger log,
+        @NotNull final GridCacheSharedContext sharedCtx,
+        final boolean workDir,
+        @NotNull final File... walFiles) throws IgniteCheckedException {
+        super(log,
+            sharedCtx,
+            new RecordV1Serializer(sharedCtx),
+            BUF_SIZE);
+        // The work dir flag is stored by init(), no separate assignment is needed here.
+        init(null, workDir, walFiles);
+        advance();
+    }
+
+    /**
+     * For directory mode sets oldest file as initial segment,
+     * for file by file mode, converts all files to descriptors and gets oldest as initial.
+     * The current segment index is left one position before the initial segment, because
+     * {@link #advanceSegment} increments it before opening a file.
+     *
+     * @param walFilesDir directory for directory scan mode
+     * @param workDir work directory, only for file-by-file mode
+     * @param walFiles files for file-by-file iteration mode
+     * @throws IgniteCheckedException if file headers could not be read
+     */
+    private void init(
+        @Nullable final File walFilesDir,
+        final boolean workDir,
+        @Nullable final File[] walFiles) throws IgniteCheckedException {
+        if (walFilesDir != null) {
+            FileWriteAheadLogManager.FileDescriptor[] descs = loadFileDescriptors(walFilesDir);
+            curWalSegmIdx = !F.isEmpty(descs) ? descs[0].getIdx() : 0;
+            this.walFilesDir = walFilesDir;
+            this.workDir = false;
+        }
+        else {
+            this.workDir = workDir;
+            if (workDir)
+                walFileDescriptors = scanIndexesFromFileHeaders(walFiles);
+            else
+                walFileDescriptors = new ArrayList<>(Arrays.asList(FileWriteAheadLogManager.scan(walFiles)));
+            curWalSegmIdx = !walFileDescriptors.isEmpty() ? walFileDescriptors.get(0).getIdx() : 0;
+        }
+        curWalSegmIdx--;
+
+        if (log.isDebugEnabled())
+            log.debug("Initialized WAL cursor [curWalSegmIdx=" + curWalSegmIdx + ']');
+    }
+
+    /**
+     * This method checks all provided files to be correct WAL segments.
+     * Header record and its position is checked. WAL position is used to determine real index.
+     * File index from file name is ignored.
+     * <p>
+     * Files shorter than a header record, and files whose first byte is the stop-iteration marker
+     * (logical end of segment, nothing to read), are skipped and do not abort the scan of the other files.
+     *
+     * @param allFiles files to scan
+     * @return list of file descriptors with checked header records, file index is set; sorted
+     * @throws IgniteCheckedException if IO error occurs
+     */
+    private List<FileWriteAheadLogManager.FileDescriptor> scanIndexesFromFileHeaders(
+        @Nullable final File[] allFiles) throws IgniteCheckedException {
+        if (allFiles == null || allFiles.length == 0)
+            return Collections.emptyList();
+
+        final List<FileWriteAheadLogManager.FileDescriptor> resultingDescs = new ArrayList<>();
+
+        for (File file : allFiles) {
+            if (file.length() < HEADER_RECORD_SIZE)
+                continue;
+
+            FileWALPointer ptr;
+
+            try (RandomAccessFile rf = new RandomAccessFile(file, "r")) {
+                final FileChannel ch = rf.getChannel();
+                final ByteBuffer buf = ByteBuffer.allocate(HEADER_RECORD_SIZE);
+
+                buf.order(ByteOrder.nativeOrder());
+
+                final DataInput in = new FileInput(ch, buf);
+                // Header record must be agnostic to the serializer version.
+                final int type = in.readUnsignedByte();
+
+                if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) {
+                    // Such file has no records; skipping it keeps the remaining files readable.
+                    if (log.isInfoEnabled())
+                        log.info("Reached logical end of the segment for file [" + file + "]");
+
+                    continue;
+                }
+
+                ptr = RecordV1Serializer.readPosition(in);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException("Failed to scan index from file [" + file + "]", e);
+            }
+
+            resultingDescs.add(new FileWriteAheadLogManager.FileDescriptor(file, ptr.index()));
+        }
+        Collections.sort(resultingDescs);
+        return resultingDescs;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Closes the current segment (if any) and opens the next one: by index within the directory in
+     * directory scan mode, or the head of the remaining descriptors list in file-by-file mode.
+     *
+     * @return handle for the next segment, or {@code null} if there are no more files or the file is missing
+     */
+    @Override protected FileWriteAheadLogManager.ReadFileHandle advanceSegment(
+        @Nullable final FileWriteAheadLogManager.ReadFileHandle curWalSegment) throws IgniteCheckedException {
+
+        if (curWalSegment != null)
+            curWalSegment.close();
+
+        curWalSegmIdx++;
+        // curHandle.workDir is false
+        final FileWriteAheadLogManager.FileDescriptor fd;
+
+        if (walFilesDir != null) {
+            fd = new FileWriteAheadLogManager.FileDescriptor(
+                new File(walFilesDir,
+                    FileWriteAheadLogManager.FileDescriptor.fileName(curWalSegmIdx)));
+        }
+        else {
+            if (walFileDescriptors.isEmpty())
+                return null; //no files to read, stop iteration
+
+            fd = walFileDescriptors.remove(0);
+        }
+
+        // Checked before the first dereference below.
+        assert fd != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.getAbsolutePath() + ']');
+
+        curRec = null;
+        try {
+            return initReadHandle(fd, null);
+        }
+        catch (FileNotFoundException e) {
+            if (log.isInfoEnabled())
+                log.info("Missing WAL segment in the archive: " + e.getMessage());
+
+            return null;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * In work dir mode the problem is only logged (the file may be not completed).
+     * In archive mode it is rethrown as a {@link RuntimeException} wrapping the original exception.
+     *
+     * @throws RuntimeException if a record could not be read and this iterator is not in work dir mode
+     */
+    @Override protected void handleRecordException(
+        @NotNull final Exception e,
+        @Nullable final FileWALPointer ptr) {
+        super.handleRecordException(e, ptr);
+
+        final String msg = "Record reading problem occurred at file pointer [" + ptr + "]:" + e.getMessage();
+
+        if (!workDir)
+            throw new RuntimeException(msg, e);
+
+        // Report through the iterator's logger instead of printing the stack trace to stderr.
+        log.warning(msg, e);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void onClose() throws IgniteCheckedException {
+        super.onClose();
+        curRec = null;
+
+        closeCurrentWalSegment();
+
+        curWalSegmIdx = Integer.MAX_VALUE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index 0ccd3a0..0a7b3dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -103,21 +103,37 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
 
 /**
  * Record V1 serializer.
+ * Stores records in the following format:
+ * <ul>
+ *     <li>Record type from {@link RecordType#ordinal()} incremented by 1</li>
+ *     <li>WAL pointer to double check consistency</li>
+ *     <li>Data</li>
+ *     <li>CRC or zero padding</li>
+ * </ul>
  */
 public class RecordV1Serializer implements RecordSerializer {
-    /** */
-    public static final int HEADER_RECORD_SIZE = /*Type*/1 + /*Pointer */12 + /*Magic*/8 + /*Version*/4 + /*CRC*/4;
+    /** Length of Type */
+    public static final int REC_TYPE_SIZE = 1;
+
+    /** Length of WAL Pointer */
+    public static final int FILE_WAL_POINTER_SIZE = 12;
+
+    /** Length of CRC value */
+    private static final int CRC_SIZE = 4;
 
     /** */
+    public static final int HEADER_RECORD_SIZE = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + /*Magic*/8 + /*Version*/4 + CRC_SIZE;
+
+    /** Cache shared context */
     private GridCacheSharedContext cctx;
 
-    /** */
+    /** Size of page used for PageMemory regions */
     private int pageSize;
 
-    /** */
+    /** Cache object processor for reading {@link DataEntry DataEntries} */
     private IgniteCacheObjectProcessor co;
 
-    /** */
+    /** Skip CRC calculation/check flag */
     private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
 
     /**
@@ -658,7 +674,7 @@ public class RecordV1Serializer implements RecordSerializer {
 
             assert res != null;
 
-            res.size((int)(in0.position() - startPos + 4)); // Account for CRC which will be read afterwards.
+            res.size((int)(in0.position() - startPos + CRC_SIZE)); // Account for CRC which will be read afterwards.
 
             return res;
         }
@@ -671,12 +687,16 @@ public class RecordV1Serializer implements RecordSerializer {
     }
 
     /**
-     * @param in In.
+     * Loads record from input, does not read CRC value
+     *
+     * @param in Input to read record from
+     * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file
+     * @throws SegmentEofException if end of WAL segment reached
      */
     private WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException {
         int type = in.readUnsignedByte();
 
-        if (type == 0)
+        if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
             throw new SegmentEofException("Reached logical end of the segment", null);
 
         FileWALPointer ptr = readPosition(in);
@@ -1212,7 +1232,7 @@ public class RecordV1Serializer implements RecordSerializer {
     /** {@inheritDoc} */
     @SuppressWarnings("CastConflictsWithInstanceof")
     @Override public int size(WALRecord record) throws IgniteCheckedException {
-        int commonFields = /* Type */1 + /* Pointer */12 + /*CRC*/4;
+        int commonFields = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE;
 
         switch (record.type()) {
             case PAGE_RECORD:
@@ -1371,7 +1391,7 @@ public class RecordV1Serializer implements RecordSerializer {
                 return commonFields + /*cacheId*/ 4 + /*pageId*/ 8;
 
             case SWITCH_SEGMENT_RECORD:
-                return commonFields;
+                return commonFields - CRC_SIZE; //CRC is not loaded for switch segment, exception is thrown instead
 
             default:
                 throw new UnsupportedOperationException("Type: " + record.type());
@@ -1379,10 +1399,11 @@ public class RecordV1Serializer implements RecordSerializer {
     }
 
     /**
+     * Saves position, WAL pointer (requires {@link #FILE_WAL_POINTER_SIZE} bytes)
      * @param buf Byte buffer to serialize version to.
      * @param ptr File WAL pointer to write.
      */
-    private void putPosition(ByteBuffer buf, FileWALPointer ptr) {
+    public static void putPosition(ByteBuffer buf, FileWALPointer ptr) {
         buf.putLong(ptr.index());
         buf.putInt(ptr.fileOffset());
     }
@@ -1392,7 +1413,7 @@ public class RecordV1Serializer implements RecordSerializer {
      * @return Read file WAL pointer.
      * @throws IOException If failed to write.
      */
-    private FileWALPointer readPosition(DataInput in) throws IOException {
+    public static FileWALPointer readPosition(DataInput in) throws IOException {
         long idx = in.readLong();
         int fileOffset = in.readInt();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java
index 5561d95..fed8766 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java
@@ -181,6 +181,8 @@ public class IgnitePersistentStoreDataStructuresTest extends GridCommonAbstractT
         for (int i = 0; i < 100; i++)
             set.add(i);
 
+        assertEquals(100, set.size());
+
         stopAllGrids();
 
         ignite = startGrids(4);

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
index 793806e..48d8c21 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
@@ -297,7 +297,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
         final int entryCnt = 10_000;
         final int initGridCnt = 4;
 
-        final IgniteEx ig0 = (IgniteEx)startGrids(initGridCnt);
+        final Ignite ig0 = startGrids(initGridCnt);
 
         ig0.active(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
new file mode 100644
index 0000000..06bcf08
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal.reader;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.events.WalSegmentArchivedEvent;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
+
+/**
+ * Test suite for WAL segments reader and event generator.
+ */
+public class IgniteWalReaderTest extends GridCommonAbstractTest {
+    /** Wal segments count */
+    private static final int WAL_SEGMENTS = 10;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache0";
+
+    /** Fill wal with some data before iterating. Should be true for non local run */
+    private static final boolean fillWalBeforeTest = true;
+
+    /** Delete DB dir before test. */
+    private static final boolean deleteBefore = true;
+
+    /** Delete DB dir after test. */
+    private static final boolean deleteAfter = true;
+
+    /** Dump records to logger. Should be false for non local run */
+    private static final boolean dumpRecords = false;
+
+    /** Page size to set */
+    public static final int PAGE_SIZE = 4 * 1024;
+
+    /**
+     * Field for transferring the setting from a test to the {@link #getConfiguration} method:
+     * archive an incomplete segment after this many milliseconds of inactivity (applied only when positive).
+     */
+    private int archiveIncompleteSegmentAfterInactivityMs = 0;
+
+    /** {@inheritDoc} Adds the test cache, the WAL-segment-archived event type, memory and persistent store settings. */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        final CacheConfiguration<Integer, IgniteWalReaderTest.IndexedObject> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+        ccfg.setIndexedTypes(Integer.class, IgniteWalReaderTest.IndexedObject.class);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        cfg.setIncludeEventTypes(EventType.EVT_WAL_SEGMENT_ARCHIVED); // the archive tests of this class listen for this event type
+
+        final MemoryConfiguration dbCfg = new MemoryConfiguration();
+
+        dbCfg.setPageSize(PAGE_SIZE);
+
+        final MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration();
+
+        memPlcCfg.setName("dfltMemPlc");
+        memPlcCfg.setInitialSize(1024 * 1024 * 1024);
+        memPlcCfg.setMaxSize(1024 * 1024 * 1024);
+
+        dbCfg.setMemoryPolicies(memPlcCfg);
+        dbCfg.setDefaultMemoryPolicyName("dfltMemPlc"); // same name as the policy registered just above
+
+        cfg.setMemoryConfiguration(dbCfg);
+
+        final PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration();
+        pCfg.setWalHistorySize(1);
+        pCfg.setWalSegmentSize(1024 * 1024);
+        pCfg.setWalSegments(WAL_SEGMENTS);
+        pCfg.setWalMode(WALMode.BACKGROUND);
+
+        if (archiveIncompleteSegmentAfterInactivityMs > 0) // field stays 0 unless a test assigns it before starting the grid
+            pCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs);
+
+        cfg.setPersistentStoreConfiguration(pCfg);
+
+        final BinaryConfiguration binCfg = new BinaryConfiguration();
+
+        binCfg.setCompactFooter(false);
+
+        cfg.setBinaryConfiguration(binCfg);
+        return cfg;
+    }
+
+    /** {@inheritDoc} Stops all grids, then removes the work files when {@link #deleteBefore} is set. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        stopAllGrids();
+
+        if (deleteBefore)
+            deleteWorkFiles();
+    }
+
+    /** {@inheritDoc} Stops all grids, then removes the work files when {@link #deleteAfter} is set. */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        if (deleteAfter)
+            deleteWorkFiles();
+    }
+
+    /** Recursively deletes the "db" directory resolved under the default work directory.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void deleteWorkFiles() throws IgniteCheckedException {
+        if (fillWalBeforeTest) // presumably keeps a pre-existing WAL intact when the test does not fill it itself
+            deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /** Fills the WAL when {@code fillWalBeforeTest} is set, then compares record counts of the mock-based and standalone iterators.
+     * @throws Exception if failed.
+     */
+    public void testFillWalAndReadRecords() throws Exception {
+        final int cacheObjectsToWrite = 10000;
+
+        if (fillWalBeforeTest) {
+            final Ignite ignite0 = startGrid("node0");
+
+            ignite0.active(true);
+
+            putDummyRecords(ignite0, cacheObjectsToWrite);
+
+            stopGrid("node0");
+        }
+
+        final File db = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false);
+        final File wal = new File(db, "wal");
+        final File walArchive = new File(wal, "archive");
+        final String consistentId = "127_0_0_1_47500"; // NOTE(review): hard-coded; presumably the id node0 gets on a local run - confirm
+        final MockWalIteratorFactory mockItFactory = new MockWalIteratorFactory(log, PAGE_SIZE, consistentId, WAL_SEGMENTS);
+        final WALIterator it = mockItFactory.iterator(wal, walArchive);
+        final int cntUsingMockIter = iterateAndCount(it);
+
+        log.info("Total records loaded " + cntUsingMockIter);
+        assert cntUsingMockIter > 0;
+        assert cntUsingMockIter > cacheObjectsToWrite;
+
+        final File walArchiveDirWithConsistentId = new File(walArchive, consistentId);
+        final File walWorkDirWithConsistentId = new File(wal, consistentId);
+
+        final IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log, PAGE_SIZE);
+        final int cntArchiveDir = iterateAndCount(factory.iteratorArchiveDirectory(walArchiveDirWithConsistentId));
+
+        log.info("Total records loaded using directory : " + cntArchiveDir);
+
+        final int cntArchiveFileByFile = iterateAndCount(
+            factory.iteratorArchiveFiles(
+                walArchiveDirWithConsistentId.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER)));
+
+        log.info("Total records loaded using archive directory (file-by-file): " + cntArchiveFileByFile);
+
+        assert cntArchiveFileByFile > cacheObjectsToWrite;
+        assert cntArchiveDir > cacheObjectsToWrite;
+        assert cntArchiveDir == cntArchiveFileByFile;
+        // cntArchiveDir may really be less than the mock count: correct loading of the work dir is not supported yet
+        assert cntUsingMockIter >= cntArchiveDir
+            : "Mock based reader loaded " + cntUsingMockIter + " records but standalone has loaded only " + cntArchiveDir;
+
+
+        final File[] workFiles = walWorkDirWithConsistentId.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER);
+        int cntWork = 0;
+
+        try (WALIterator stIt = factory.iteratorWorkFiles(workFiles)) {
+            while (stIt.hasNextX()) {
+                IgniteBiTuple<WALPointer, WALRecord> next = stIt.nextX();
+                if (dumpRecords)
+                    log.info("Work. Record: " + next.get2());
+                cntWork++;
+            }
+        }
+        log.info("Total records loaded from work: " + cntWork);
+
+        assert cntWork + cntArchiveFileByFile == cntUsingMockIter
+            : "Work iterator loaded [" + cntWork + "] " +
+            "Archive iterator loaded [" + cntArchiveFileByFile + "]; " +
+            "mock iterator [" + cntUsingMockIter + "]";
+
+    }
+
+    /**
+     * @param walIter iterator to count; it is closed by this method through try-with-resources
+     * @return number of records the iterator produced
+     * @throws IgniteCheckedException if failed to iterate
+     */
+    private int iterateAndCount(WALIterator walIter) throws IgniteCheckedException {
+        int cntUsingMockIter = 0; // plain record counter; despite the name it is incremented for whatever iterator is passed in
+
+        try(WALIterator it = walIter) {
+            while (it.hasNextX()) {
+                IgniteBiTuple<WALPointer, WALRecord> next = it.nextX();
+                if (dumpRecords)
+                    log.info("Record: " + next.get2());
+                cntUsingMockIter++;
+            }
+        }
+        return cntUsingMockIter;
+    }
+
+    /**
+     * Tests that an {@code EVT_WAL_SEGMENT_ARCHIVED} event reaches a local listener after records are put into the cache
+     *
+     * @throws Exception if failed
+     */
+    public void testArchiveCompletedEventFired() throws Exception {
+        final AtomicBoolean evtRecorded = new AtomicBoolean();
+
+        final Ignite ignite = startGrid("node0");
+
+        ignite.active(true);
+
+        final IgniteEvents evts = ignite.events();
+
+        if (!evts.isEnabled(EVT_WAL_SEGMENT_ARCHIVED))
+            return; // nothing to test: the event type is not enabled on this node
+
+        evts.localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event e) {
+                WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;
+                long idx = archComplEvt.getAbsWalSegmentIdx();
+                log.info("Finished archive for segment [" + idx + ", " +
+                    archComplEvt.getArchiveFile() + "]: [" + e + "]");
+
+                evtRecorded.set(true);
+                return true; // presumably keeps the listener subscribed - confirm against IgniteEvents.localListen
+            }
+        }, EVT_WAL_SEGMENT_ARCHIVED);
+
+        putDummyRecords(ignite, 150);
+
+        stopGrid("node0");
+        assert evtRecorded.get(); // checked only after the node is stopped
+    }
+
+    /**
+     * Puts the given number of {@link IndexedObject} values, keyed by their loop index, into the test cache to fill WAL
+     *
+     * @param ignite ignite instance whose cache named {@link #CACHE_NAME} is written to
+     * @param recordsToWrite number of entries to put; keys run from 0 to recordsToWrite - 1
+     */
+    private void putDummyRecords(Ignite ignite, int recordsToWrite) {
+        IgniteCache<Object, Object> cache0 = ignite.cache(CACHE_NAME);
+
+        for (int i = 0; i < recordsToWrite; i++)
+            cache0.put(i, new IndexedObject(i));
+    }
+
+    /**
+     * Tests timeout based WAL segment archiving: an archive event must arrive after the puts have finished
+     *
+     * @throws Exception if failure occurs
+     */
+    public void testArchiveIncompleteSegmentAfterInactivity() throws Exception {
+        final AtomicBoolean waitingForEvt = new AtomicBoolean();
+        final CountDownLatch archiveSegmentForInactivity = new CountDownLatch(1);
+
+        archiveIncompleteSegmentAfterInactivityMs = 1000; // read by getConfiguration() when the grid is started below
+
+        final Ignite ignite = startGrid("node0");
+
+        ignite.active(true);
+
+        final IgniteEvents evts = ignite.events();
+
+        evts.localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event e) {
+                WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;
+                long idx = archComplEvt.getAbsWalSegmentIdx();
+                log.info("Finished archive for segment [" + idx + ", " +
+                    archComplEvt.getArchiveFile() + "]: [" + e + "]");
+
+                if (waitingForEvt.get()) // events seen before the flag is raised are logged but not counted
+                    archiveSegmentForInactivity.countDown();
+                return true;
+            }
+        }, EVT_WAL_SEGMENT_ARCHIVED);
+
+        putDummyRecords(ignite, 100);
+        waitingForEvt.set(true); // flag for skipping events caused by regular log() and rollOver() during the puts above
+
+        log.info("Wait for archiving segment for inactive grid started");
+
+        boolean recordedAfterSleep =
+            archiveSegmentForInactivity.await(archiveIncompleteSegmentAfterInactivityMs + 1001, TimeUnit.MILLISECONDS); // inactivity timeout plus 1001 ms
+
+        stopGrid("node0");
+        assert recordedAfterSleep;
+    }
+
+    /** Test object placed into the grid by this test: an indexed integer plus a fixed-size byte payload */
+    private static class IndexedObject {
+        /** Integer value; annotated as an indexed SQL query field. */
+        @QuerySqlField(index = true)
+        private int iVal;
+
+        /** Data filled with a recognizable pattern: the bytes 'A'..'J' repeated */
+        private byte[] data;
+
+        /**
+         * @param iVal Integer value.
+         */
+        private IndexedObject(int iVal) {
+            this.iVal = iVal;
+            int sz = 40000; // payload length in bytes, the same for every instance
+            data = new byte[sz];
+            for (int i = 0; i < sz; i++)
+                data[i] = (byte)('A' + (i % 10));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            IndexedObject obj = (IndexedObject)o;
+
+            if (iVal != obj.iVal)
+                return false;
+            return Arrays.equals(data, obj.data); // content comparison, matching Arrays.hashCode in hashCode()
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = iVal;
+            res = 31 * res + Arrays.hashCode(data);
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(IgniteWalReaderTest.IndexedObject.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
new file mode 100644
index 0000000..95079a0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal.reader;
+
+import java.io.File;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.jetbrains.annotations.Nullable;
+import org.mockito.Mockito;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Mockito based WAL iterator provider: starts a real {@link FileWriteAheadLogManager} on top of mocked contexts
+ */
+public class MockWalIteratorFactory {
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Page size. */
+    private final int pageSize;
+
+    /** Consistent node id. */
+    private final String consistentId;
+
+    /** Segments count in work dir. */
+    private int segments; // NOTE(review): assigned only in the constructor; could be final like the fields above
+
+    /**
+     * Creates factory
+     * @param log Logger; when {@code null} a Mockito mock of {@link IgniteLogger} is used instead.
+     * @param pageSize Page size returned by the mocked database manager.
+     * @param consistentId Consistent id returned by the mocked discovery manager.
+     * @param segments WAL segments count returned by the mocked persistent store configuration.
+     */
+    public MockWalIteratorFactory(@Nullable IgniteLogger log, int pageSize, String consistentId, int segments) {
+        this.log = log == null ? Mockito.mock(IgniteLogger.class) : log;
+        this.pageSize = pageSize;
+        this.consistentId = consistentId;
+        this.segments = segments;
+    }
+
+    /**
+     * Creates iterator by starting a WAL manager over mocked configuration and contexts and replaying it
+     * @param wal WAL directory without node id
+     * @param walArchive WAL archive without node id
+     * @return iterator returned by {@code replay(null)} of the started manager
+     * @throws IgniteCheckedException if IO failed
+     */
+    public WALIterator iterator(File wal, File walArchive) throws IgniteCheckedException {
+        final PersistentStoreConfiguration persistentCfg1 = Mockito.mock(PersistentStoreConfiguration.class);
+
+        when(persistentCfg1.getWalStorePath()).thenReturn(wal.getAbsolutePath());
+        when(persistentCfg1.getWalArchivePath()).thenReturn(walArchive.getAbsolutePath());
+        when(persistentCfg1.getWalSegments()).thenReturn(segments);
+        when(persistentCfg1.getTlbSize()).thenReturn(PersistentStoreConfiguration.DFLT_TLB_SIZE);
+        when(persistentCfg1.getWalRecordIteratorBufferSize()).thenReturn(PersistentStoreConfiguration.DFLT_WAL_RECORD_ITERATOR_BUFFER_SIZE);
+
+        final IgniteConfiguration cfg = Mockito.mock(IgniteConfiguration.class);
+
+        when(cfg.getPersistentStoreConfiguration()).thenReturn(persistentCfg1);
+
+        final GridKernalContext ctx = Mockito.mock(GridKernalContext.class);
+
+        when(ctx.config()).thenReturn(cfg);
+        when(ctx.clientNode()).thenReturn(false);
+
+        final GridDiscoveryManager disco = Mockito.mock(GridDiscoveryManager.class);
+
+        when(disco.consistentId()).thenReturn(consistentId);
+        when(ctx.discovery()).thenReturn(disco);
+
+        final IgniteWriteAheadLogManager mgr = new FileWriteAheadLogManager(ctx); // the only non-mocked collaborator in this method
+        final GridCacheSharedContext sctx = Mockito.mock(GridCacheSharedContext.class);
+
+        when(sctx.kernalContext()).thenReturn(ctx);
+        when(sctx.discovery()).thenReturn(disco);
+
+        final GridCacheDatabaseSharedManager database = Mockito.mock(GridCacheDatabaseSharedManager.class);
+
+        when(database.pageSize()).thenReturn(pageSize);
+        when(sctx.database()).thenReturn(database);
+        when(sctx.logger(any(Class.class))).thenReturn(log);
+
+        mgr.start(sctx);
+
+        return mgr.replay(null); // null start pointer - presumably replays from the beginning; confirm against IgniteWriteAheadLogManager.replay
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 351f52e..8018705 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -18,22 +18,18 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
-import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheRebalancingTest;
-import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheRebalancingTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistenceMetricsSelfTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionDuringPartitionClearTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
-import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest;
-import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsNoActualWalHistoryTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest;
-import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryTest;
-import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryTxLogicalRecordsTest;
 import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.IgniteWalReaderTest;
 
 /**
  *
@@ -69,6 +65,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
 
         suite.addTestSuite(IgnitePersistentStoreDataStructuresTest.class);
 
+        suite.addTestSuite(IgniteWalReaderTest.class);
         return suite;
     }
 }


Mime
View raw message