ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [29/47] ignite git commit: IGNITE-5267 - Moved ignite-ps module to ignite-core
Date Sun, 11 Jun 2017 20:03:55 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
new file mode 100644
index 0000000..be1e477
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * File input.
+ */
+public final class FileInput implements ByteBufferBackedDataInput {
+    /** */
+    private ByteBuffer buf;
+
+    /** */
+    private FileChannel ch;
+
+    /** */
+    private long pos;
+
+    /**
+     * @param ch  Channel.
+     * @param buf Buffer.
+     */
+    public FileInput(FileChannel ch, ByteBuffer buf) throws IOException {
+        assert ch != null;
+
+        this.ch = ch;
+        this.buf = buf;
+
+        pos = ch.position();
+
+        clearBuffer();
+    }
+
+    /**
+     * Clear buffer.
+     */
+    private void clearBuffer() {
+        buf.clear();
+        buf.limit(0);
+
+        assert buf.remaining() == 0; // Buffer is empty.
+    }
+
+    /**
+     * @param pos Position in bytes from file begin.
+     */
+    public void seek(long pos) throws IOException {
+        if (pos > ch.size())
+            throw new EOFException();
+
+        ch.position(pos);
+
+        this.pos = pos;
+
+        clearBuffer();
+    }
+
+    /**
+     * @return Underlying buffer.
+     */
+    @Override public ByteBuffer buffer() {
+        return buf;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override public void ensure(int requested) throws IOException {
+        int available = buf.remaining();
+
+        if (available >= requested)
+            return;
+
+        if (buf.capacity() < requested)
+            throw new IOException("Requested size is greater than buffer: " + requested);
+
+        buf.compact();
+
+        do {
+            int read = ch.read(buf);
+
+            if (read == -1)
+                throw new EOFException();
+
+            available += read;
+
+            pos += read;
+        }
+        while (available < requested);
+
+        buf.flip();
+    }
+
+    /**
+     * @return Position in the stream.
+     */
+    public long position() {
+        return pos - buf.remaining();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void readFully(@NotNull byte[] b) throws IOException {
+        ensure(b.length);
+
+        buf.get(b);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public void readFully(@NotNull byte[] b, int off, int len) throws IOException
{
+        ensure(len);
+
+        buf.get(b, off, len);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public int skipBytes(int n) throws IOException {
+        if (buf.remaining() >= n)
+            buf.position(buf.position() + n);
+        else
+            seek(pos + n);
+
+        return n;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public boolean readBoolean() throws IOException {
+        return readByte() == 1;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public byte readByte() throws IOException {
+        ensure(1);
+
+        return buf.get();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public int readUnsignedByte() throws IOException {
+        return readByte() & 0xFF;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public short readShort() throws IOException {
+        ensure(2);
+
+        return buf.getShort();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public int readUnsignedShort() throws IOException {
+        return readShort() & 0xFFFF;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public char readChar() throws IOException {
+        ensure(2);
+
+        return buf.getChar();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public int readInt() throws IOException {
+        ensure(4);
+
+        return buf.getInt();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public long readLong() throws IOException {
+        ensure(8);
+
+        return buf.getLong();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public float readFloat() throws IOException {
+        ensure(4);
+
+        return buf.getFloat();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public double readDouble() throws IOException {
+        ensure(8);
+
+        return buf.getDouble();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public String readLine() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public String readUTF() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * @param skipCheck If CRC check should be skipped.
+     * @return autoclosable fileInput, after its closing crc32 will be calculated and compared
with saved one
+     */
+    public Crc32CheckingFileInput startRead(boolean skipCheck) {
+        return new Crc32CheckingFileInput(buf.position(), skipCheck);
+    }
+
+    /**
+     * Checking of CRC32.
+     */
+    public class Crc32CheckingFileInput implements ByteBufferBackedDataInput, AutoCloseable
{
+        /** */
+        private final PureJavaCrc32 crc32 = new PureJavaCrc32();
+
+        /** Last calc position. */
+        private int lastCalcPosition;
+
+        /** Skip crc check. */
+        private boolean skipCheck;
+
+        /**
+         * @param position Position.
+         */
+        public Crc32CheckingFileInput(int position, boolean skipCheck) {
+            this.lastCalcPosition = position;
+            this.skipCheck = skipCheck;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void ensure(int requested) throws IOException {
+            int available = buf.remaining();
+
+            if (available >= requested)
+                return;
+
+            updateCrc();
+
+            FileInput.this.ensure(requested);
+
+            lastCalcPosition = 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+            updateCrc();
+
+            int val = crc32.getValue();
+
+            int writtenCrc =  this.readInt();
+
+            if ((val ^ writtenCrc) != 0 && !skipCheck) {
+                // If it last message we will skip it (EOF will be thrown).
+                ensure(5);
+
+                throw new IgniteDataIntegrityViolationException(
+                    "val: " + val + " writtenCrc: " + writtenCrc
+                );
+            }
+        }
+
+        /**
+         *
+         */
+        private void updateCrc() {
+            if (skipCheck)
+                return;
+
+            int oldPos = buf.position();
+
+            buf.position(lastCalcPosition);
+
+            crc32.update(buf, oldPos - lastCalcPosition);
+
+            lastCalcPosition = oldPos;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int skipBytes(int n) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public void readFully(@NotNull byte[] b) throws IOException {
+            ensure(b.length);
+
+            buf.get(b);
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public void readFully(@NotNull byte[] b, int off, int len) throws IOException
{
+            ensure(len);
+
+            buf.get(b, off, len);
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public boolean readBoolean() throws IOException {
+            return readByte() == 1;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public byte readByte() throws IOException {
+            ensure(1);
+
+            return buf.get();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public int readUnsignedByte() throws IOException {
+            return readByte() & 0xFF;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public short readShort() throws IOException {
+            ensure(2);
+
+            return buf.getShort();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public int readUnsignedShort() throws IOException {
+            return readShort() & 0xFFFF;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public char readChar() throws IOException {
+            ensure(2);
+
+            return buf.getChar();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public int readInt() throws IOException {
+            ensure(4);
+
+            return buf.getInt();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public long readLong() throws IOException {
+            ensure(8);
+
+            return buf.getLong();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public float readFloat() throws IOException {
+            ensure(4);
+
+            return buf.getFloat();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public double readDouble() throws IOException {
+            ensure(8);
+
+            return buf.getDouble();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public String readLine() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public String readUTF() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ByteBuffer buffer() {
+            return FileInput.this.buffer();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6bf5ce46/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
new file mode 100644
index 0000000..033b1c4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * File WAL pointer.
+ */
+public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> {
+    /** */
+    private final long idx;
+
+    /** */
+    private final int fileOffset;
+
+    /** Written record length */
+    private int len;
+
+    /** Force flush flag. Used in BACKGROUND WAL mode. */
+    private boolean forceFlush;
+
+    /**
+     * @param idx File timestamp index.
+     * @param fileOffset Offset in file, from the beginning.
+     * @param len Record length.
+     */
+    public FileWALPointer(long idx, int fileOffset, int len) {
+        this(idx, fileOffset, len, false);
+    }
+
+    /**
+     * @param idx File timestamp index.
+     * @param fileOffset Offset in file, from the beginning.
+     * @param len Record length.
+     * @param forceFlush Force flush flag.
+     */
+    public FileWALPointer(long idx, int fileOffset, int len, boolean forceFlush) {
+        this.idx = idx;
+        this.fileOffset = fileOffset;
+        this.len = len;
+        this.forceFlush = forceFlush;
+    }
+
+    /**
+     * @return Timestamp index.
+     */
+    public long index() {
+        return idx;
+    }
+
+    /**
+     * @return File offset.
+     */
+    public int fileOffset() {
+        return fileOffset;
+    }
+
+    /**
+     * @return Record length.
+     */
+    public int length() {
+        return len;
+    }
+
+    /**
+     * @param len Record length.
+     */
+    public void length(int len) {
+        this.len = len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public WALPointer next() {
+        if (len == 0)
+            throw new IllegalStateException("Failed to calculate next WAL pointer " +
+                "(this pointer is a terminal): " + this);
+
+        // Return a terminal pointer.
+        return new FileWALPointer(idx, fileOffset + len, 0);
+    }
+
+    /**
+     * @return Force flush flag.
+     */
+    public boolean forceFlush() {
+        return forceFlush;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof FileWALPointer))
+            return false;
+
+        FileWALPointer that = (FileWALPointer)o;
+
+        return idx == that.idx && fileOffset == that.fileOffset;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int result = (int)(idx ^ (idx >>> 32));
+
+        result = 31 * result + fileOffset;
+
+        return result;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(FileWALPointer o) {
+        int res = Long.compare(idx, o.idx);
+
+        return res == 0 ? Integer.compare(fileOffset, o.fileOffset) : res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(FileWALPointer.class, this);
+    }
+}


Mime
View raw message