jackrabbit-oak-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ju...@apache.org
Subject svn commit: r1583839 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/segment/ main/java/org/apache/jackrabbit/oak/plugins/segment/file/ test/java/org/apache/jackrabbit/oak/plugins/segment/file/
Date Wed, 02 Apr 2014 00:39:34 GMT
Author: jukka
Date: Wed Apr  2 00:39:33 2014
New Revision: 1583839

URL: http://svn.apache.org/r1583839
Log:
OAK-631: SegmentMK: Implement garbage collection

Split TarFile into TarReader/Writer

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java   (with props)
Removed:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/MappedAccess.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/RandomAccess.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileAccess.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarEntry.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Wed Apr  2 00:39:33 2014
@@ -124,6 +124,10 @@ public class Segment {
         this.data = checkNotNull(data);
 
         if (id.isDataSegmentId()) {
+            checkState(data.get(0) == '0'
+                    && data.get(1) == 'a'
+                    && data.get(2) == 'K'
+                    && data.get(3) == '\n');
             this.refids = new SegmentId[getRefCount()];
             refids[0] = id;
         } else {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileAccess.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileAccess.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileAccess.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileAccess.java Wed Apr  2 00:39:33 2014
@@ -16,22 +16,130 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.file;
 
+import static com.google.common.base.Preconditions.checkState;
+import static java.nio.channels.FileChannel.MapMode.READ_ONLY;
+
+import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.zip.CRC32;
 
-interface FileAccess {
-
-    int length() throws IOException;
-
-    long crc32(int position, int size) throws IOException;
-
-    ByteBuffer read(int position, int length) throws IOException;
-
-    void write(int position, byte[] b, int offset, int length)
-            throws IOException;
+abstract class FileAccess {
 
-    void flush() throws IOException;
+    static FileAccess open(File file, boolean memoryMapping)
+            throws IOException {
+        RandomAccessFile access = new RandomAccessFile(file, "r");
+        if (memoryMapping) {
+            return new Mapped(access);
+        } else {
+            return new Random(access);
+        }
+    }
+
+    abstract boolean isMemoryMapped();
+
+    abstract int length() throws IOException;
+
+    abstract long crc32(int position, int size) throws IOException;
+
+    abstract ByteBuffer read(int position, int length) throws IOException;
+
+    abstract void close() throws IOException;
+
+    //-----------------------------------------------------------< private >--
+
+    private static class Mapped extends FileAccess {
+
+        private final MappedByteBuffer buffer;
+
+        Mapped(RandomAccessFile file) throws IOException {
+            try {
+                buffer = file.getChannel().map(READ_ONLY, 0, file.length());
+            } finally {
+                file.close();
+            }
+        }
+
+        @Override
+        boolean isMemoryMapped() {
+            return true;
+        }
+
+        @Override
+        public int length() {
+            return buffer.remaining();
+        }
+
+        @Override
+        public long crc32(int position, int length) {
+            ByteBuffer entry = buffer.asReadOnlyBuffer();
+            entry.position(entry.position() + position);
+
+            byte[] data = new byte[length];
+            entry.get(data);
+
+            CRC32 checksum = new CRC32();
+            checksum.update(data);
+            return checksum.getValue();
+        }
+
+        @Override
+        public ByteBuffer read(int position, int length) {
+            ByteBuffer entry = buffer.asReadOnlyBuffer();
+            entry.position(entry.position() + position);
+            entry.limit(entry.position() + length);
+            return entry.slice();
+        }
+
+        @Override
+        public void close() {
+        }
+
+    }
+
+    private static class Random extends FileAccess {
+
+        private final RandomAccessFile file;
+
+        Random(RandomAccessFile file) {
+            this.file = file;
+        }
+
+        @Override
+        boolean isMemoryMapped() {
+            return false;
+        }
+
+        @Override
+        public int length() throws IOException {
+            long length = file.length();
+            checkState(length < Integer.MAX_VALUE);
+            return (int) length;
+        }
+
+        @Override
+        public long crc32(int position, int length) throws IOException {
+            CRC32 checksum = new CRC32();
+            checksum.update(read(position, length).array());
+            return checksum.getValue();
+        }
+
+        @Override
+        public synchronized ByteBuffer read(int position, int length)
+                throws IOException {
+            ByteBuffer entry = ByteBuffer.allocate(length);
+            file.seek(position);
+            file.readFully(entry.array());
+            return entry;
+        }
+
+        @Override
+        public synchronized void close() throws IOException {
+            file.close();
+        }
 
-    void close() throws IOException;
+    }
 
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Apr  2 00:39:33 2014
@@ -20,11 +20,11 @@ import static com.google.common.base.Pre
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.collect.Lists.newArrayListWithCapacity;
-import static com.google.common.collect.Lists.newCopyOnWriteArrayList;
 import static com.google.common.collect.Lists.newLinkedList;
 import static com.google.common.collect.Maps.newHashMap;
 import static com.google.common.collect.Maps.newTreeMap;
 import static java.lang.String.format;
+import static java.util.Collections.singletonMap;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
@@ -84,7 +84,13 @@ public class FileStore implements Segmen
 
     private final boolean memoryMapping;
 
-    private final List<TarFile> files;
+    private volatile List<TarReader> readers;
+
+    private int writeNumber;
+
+    private File writeFile;
+
+    private TarWriter writer;
 
     private final RandomAccessFile journalFile;
 
@@ -147,14 +153,23 @@ public class FileStore implements Segmen
         this.memoryMapping = memoryMapping;
 
         Map<Integer, File> map = collectFiles(directory);
-        List<TarFile> list = newArrayListWithCapacity(map.size());
+        this.readers = newArrayListWithCapacity(map.size());
         Integer[] indices = map.keySet().toArray(new Integer[map.size()]);
         Arrays.sort(indices);
-        for (Integer index : indices) {
-            File file = map.get(index);
-            list.add(new TarFile(file, maxFileSize, memoryMapping));
+        for (int i = indices.length - 1; i >= 0; i--) {
+            readers.add(TarReader.open(
+                    singletonMap('a', map.get(indices[i])), memoryMapping));
         }
-        this.files = newCopyOnWriteArrayList(list);
+
+        if (indices.length > 0) {
+            this.writeNumber = indices[indices.length - 1] + 1;
+        } else {
+            this.writeNumber = 0;
+        }
+        this.writeFile = new File(
+                directory,
+                String.format(FILE_NAME_FORMAT, writeNumber, "a"));
+        this.writer = new TarWriter(writeFile);
 
         journalFile = new RandomAccessFile(
                 new File(directory, JOURNAL_FILE_NAME), "rw");
@@ -284,16 +299,7 @@ public class FileStore implements Segmen
                 tracker.getWriter().flush();
 
                 synchronized (this) {
-                    boolean success = true;
-                    for (TarFile file : files) {
-                        success = success && file.flush();
-                    }
-                    if (!success) {
-                        log.warn("Failed to sync one ore more tar files with"
-                                + " the underlying file system, possibly because of"
-                                + " http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6539707."
-                                + " Will retry later.");
-                    }
+                    writer.flush();
                     journalFile.writeBytes(after + " root\n");
                     journalFile.getChannel().force(false);
                     persistedHead.set(after);
@@ -302,10 +308,15 @@ public class FileStore implements Segmen
         }
     }
 
-    public Iterable<SegmentId> getSegmentIds() {
+    public synchronized Iterable<SegmentId> getSegmentIds() {
         List<SegmentId> ids = newArrayList();
-        for (TarFile file : files) {
-            for (UUID uuid : file.getUUIDs()) {
+        for (UUID uuid : writer.getUUIDs()) {
+            ids.add(tracker.getSegmentId(
+                    uuid.getMostSignificantBits(),
+                    uuid.getLeastSignificantBits()));
+        }
+        for (TarReader reader : readers) {
+            for (UUID uuid : reader.getUUIDs()) {
                 ids.add(tracker.getSegmentId(
                         uuid.getMostSignificantBits(),
                         uuid.getLeastSignificantBits()));
@@ -346,12 +357,14 @@ public class FileStore implements Segmen
             synchronized (this) {
                 flush();
 
+                writer.close();
                 journalFile.close();
 
-                for (TarFile file : files) {
-                    file.close();
+                List<TarReader> list = readers;
+                readers = newArrayList();
+                for (TarReader reader : list) {
+                    reader.close();
                 }
-                files.clear();
 
                 System.gc(); // for any memory-mappings that are no longer used
             }
@@ -373,17 +386,14 @@ public class FileStore implements Segmen
     }
 
     private boolean containsSegment(long msb, long lsb) {
-        for (TarFile file : files.toArray(new TarFile[0])) {
-            try {
-                ByteBuffer buffer = file.readEntry(msb, lsb);
-                if (buffer != null) {
-                    return true;
-                }
-            } catch (IOException e) {
-                log.warn("Failed to access file " + file, e);
+        for (TarReader reader : readers) {
+            if (reader.containsEntry(msb, lsb)) {
+                return true;
             }
         }
-        return false;
+        synchronized (this) {
+            return writer.containsEntry(msb, lsb);
+        }
     }
 
     @Override
@@ -391,14 +401,32 @@ public class FileStore implements Segmen
         long msb = id.getMostSignificantBits();
         long lsb = id.getLeastSignificantBits();
 
-        for (TarFile file : files) {
+        for (TarReader reader : readers) {
+            try {
+                ByteBuffer buffer = reader.readEntry(msb, lsb);
+                if (buffer != null) {
+                    return new Segment(tracker, id, buffer);
+                }
+            } catch (IOException e) {
+                log.warn("Failed to read from tar file " + reader, e);
+            }
+        }
+
+        synchronized (this) {
+            ByteBuffer buffer = writer.readEntry(msb, lsb);
+            if (buffer != null) {
+                return new Segment(tracker, id, buffer);
+            }
+        }
+
+        for (TarReader reader : readers) {
             try {
-                ByteBuffer buffer = file.readEntry(msb, lsb);
+                ByteBuffer buffer = reader.readEntry(msb, lsb);
                 if (buffer != null) {
                     return new Segment(tracker, id, buffer);
                 }
             } catch (IOException e) {
-                log.warn("Failed to access file " + file, e);
+                log.warn("Failed to read from tar file " + reader, e);
             }
         }
 
@@ -409,16 +437,24 @@ public class FileStore implements Segmen
     public synchronized void writeSegment(
             SegmentId id, byte[] data, int offset, int length) {
         try {
-            UUID uuid = new UUID(
+            long size = writer.writeEntry(
                     id.getMostSignificantBits(),
-                    id.getLeastSignificantBits());
-            if (files.isEmpty() || !files.get(files.size() - 1).writeEntry(
-                    uuid, data, offset, length)) {
-                String name = format(FILE_NAME_FORMAT, files.size(), "a");
-                File file = new File(directory, name);
-                TarFile last = new TarFile(file, maxFileSize, memoryMapping);
-                checkState(last.writeEntry(uuid, data, offset, length));
-                files.add(last);
+                    id.getLeastSignificantBits(),
+                    data, offset, length);
+            if (size >= maxFileSize) {
+                writer.close();
+
+                List<TarReader> list =
+                        newArrayListWithCapacity(1 + readers.size());
+                list.add(new TarReader(writeFile, memoryMapping));
+                list.addAll(readers);
+                readers = list;
+
+                writeNumber++;
+                writeFile = new File(
+                        directory,
+                        String.format(FILE_NAME_FORMAT, writeNumber, "a"));
+                writer = new TarWriter(writeFile);
             }
         } catch (IOException e) {
             throw new RuntimeException(e);

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarEntry.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarEntry.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarEntry.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarEntry.java Wed Apr  2 00:39:33 2014
@@ -20,20 +20,20 @@ import java.util.Comparator;
 
 class TarEntry {
 
-    static final Comparator<TarEntry> REVERSE_OFFSET = new Comparator<TarEntry>() {
+    static final Comparator<TarEntry> OFFSET_ORDER = new Comparator<TarEntry>() {
         @Override
         public int compare(TarEntry a, TarEntry b) {
             if (a.offset > b.offset) {
-                return -1;
-            } else if (a.offset < b.offset) {
                 return 1;
+            } else if (a.offset < b.offset) {
+                return -1;
             } else {
                 return 0;
             }
         }
     };
 
-    static final Comparator<TarEntry> IDENTIFIER = new Comparator<TarEntry>() {
+    static final Comparator<TarEntry> IDENTIFIER_ORDER = new Comparator<TarEntry>() {
         @Override
         public int compare(TarEntry a, TarEntry b) {
             if (a.msb > b.msb) {

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java?rev=1583839&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java Wed Apr  2 00:39:33 2014
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.file;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.collect.Maps.newLinkedHashMap;
+import static com.google.common.collect.Sets.newHashSetWithExpectedSize;
+import static org.apache.jackrabbit.oak.plugins.segment.Segment.REF_COUNT_OFFSET;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentId.isDataSegmentId;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.CRC32;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class TarReader {
+
+    /** Logger instance */
+    private static final Logger log = LoggerFactory.getLogger(TarReader.class);
+
+    /** Magic byte sequence at the end of the index block. */
+    private static final int INDEX_MAGIC = TarWriter.INDEX_MAGIC;
+
+    /** Pattern of the segment entry names */
+    private static final Pattern NAME_PATTERN = Pattern.compile(
+            "([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
+            + "(\\.([0-9a-f]{8}))?");
+
+    /** The tar file block size. */
+    private static final int BLOCK_SIZE = TarWriter.BLOCK_SIZE;
+
+    private static final int getEntrySize(int size) {
+        return BLOCK_SIZE + size + TarWriter.getPaddingSize(size);
+    }
+
+    static TarReader open(Map<Character, File> files, boolean memoryMapping)
+            throws IOException {
+        Character[] generations =
+                files.keySet().toArray(new Character[files.size()]);
+        Arrays.sort(generations);
+        for (int i = generations.length - 1; i >= 0; i--) {
+            File file = files.get(generations[i]);
+            try {
+                TarReader reader = new TarReader(file, memoryMapping);
+                if (reader.index != null) {
+                    // found a generation with a valid index, drop the others
+                    for (File other : files.values()) {
+                        if (other != file) {
+                            log.info("Removing unused tar file {}", other);
+                            other.delete();
+                        }
+                    }
+                    return reader;
+                } else {
+                    reader.close();
+                }
+            } catch (IOException e) {
+                log.warn("Failed to access tar file " + file, e);
+            }
+        }
+
+        // no generation has a valid index, so recover as much as we can
+        LinkedHashMap<UUID, byte[]> entries = newLinkedHashMap();
+        for (File file : files.values()) {
+            try {
+                FileAccess access = FileAccess.open(file, memoryMapping);
+                try {
+                    recoverEntries(file, access, entries);
+                } finally {
+                    access.close();
+                }
+            } catch (IOException e) {
+                log.warn("Failed to access tar file " + file, e);
+            }
+        }
+
+        // regenerate the first generation based on the recovered data
+        File file = files.get(generations[0]);
+        File backup = new File(file.getParentFile(), file.getName() + ".bak");
+        if (backup.exists()) {
+            log.info("Removing old backup file " + backup);
+            backup.delete();
+        }
+        if (!file.renameTo(backup)) {
+            throw new IOException("Could not backup tar file " + file);
+        }
+
+        log.info("Regenerating tar file " + file);
+        TarWriter writer = new TarWriter(file);
+        for (Map.Entry<UUID, byte[]> entry : entries.entrySet()) {
+            UUID uuid = entry.getKey();
+            byte[] data = entry.getValue();
+            writer.writeEntry(
+                    uuid.getMostSignificantBits(),
+                    uuid.getLeastSignificantBits(),
+                    data, 0, data.length);
+        }
+        writer.close();
+
+        log.info("Tar file regenerated, removing backup file " + backup);
+        backup.delete();
+        for (File other : files.values()) {
+            if (other != file) {
+                log.info("Removing unused tar file {}", other);
+                other.delete();
+            }
+        }
+
+        return new TarReader(file, memoryMapping);
+    }
+
+    /**
+     * Scans through the tar file, looking for all segment entries.
+     *
+     * @return map of all segment entries in this tar file
+     * @throws IOException if the tar file could not be read
+     */
+    private static void recoverEntries(
+            File file, FileAccess access, LinkedHashMap<UUID, byte[]> entries)
+            throws IOException {
+        int position = 0;
+        int length = access.length();
+        while (position + TarWriter.BLOCK_SIZE <= length) {
+            // read the tar header block
+            ByteBuffer header = access.read(position, BLOCK_SIZE);
+            int pos = header.position();
+            String name = readString(header, 100);
+            header.position(pos + 124);
+            int size = readNumber(header, 12);
+
+            if (name.isEmpty() && size == 0) {
+                return; // no more entries in this file
+            } else if (position + BLOCK_SIZE + size > length) {
+                log.warn("Invalid entry {} in tar file {}", name, file);
+                return; // invalid entry, stop here
+            }
+
+            Matcher matcher = NAME_PATTERN.matcher(name);
+            if (matcher.matches()) {
+                UUID id = UUID.fromString(matcher.group(1));
+
+                String checksum = matcher.group(3);
+                if (checksum == null && entries.containsKey(id)) {
+                    // entry already loaded, so skip
+                } else {
+                    byte[] data = new byte[size];
+                    access.read(position + BLOCK_SIZE, size).get(data);
+
+                    if (checksum == null) {
+                        entries.put(id, data);
+                    } else {
+                        CRC32 crc = new CRC32();
+                        crc.update(data);
+                        if (crc.getValue() == Long.parseLong(checksum, 16)) {
+                            entries.put(id, data);
+                        } else {
+                            log.warn("Checksum mismatch in entry {} of tar file {}", name, file);
+                        }
+                    }
+                }
+            } else if (!name.equals(file.getName() + ".idx")) {
+                log.warn("Ignoring unexpected entry {} in tar file {}",
+                        name, file);
+            }
+
+            position += getEntrySize(size);
+        }
+    }
+
+    private final File file;
+
+    private final FileAccess access;
+
+    private final ByteBuffer index;
+
+    TarReader(File file, boolean memoryMapping) throws IOException {
+        this.file = file;
+        this.access = FileAccess.open(file, memoryMapping);
+
+        ByteBuffer index = null;
+        try {
+            index = loadAndValidateIndex();
+        } catch (IOException e) {
+            log.warn("Unable to access tar file " + file, e);
+        }
+        this.index = index;
+    }
+
+    Set<UUID> getUUIDs() {
+        Set<UUID> uuids = newHashSetWithExpectedSize(index.remaining() / 24);
+        int position = index.position();
+        while (position < index.limit()) {
+            uuids.add(new UUID(
+                    index.getLong(position),
+                    index.getLong(position + 8)));
+            position += 24;
+        }
+        return uuids;
+    }
+
+    boolean containsEntry(long msb, long lsb) {
+        return findEntry(msb, lsb) != -1;
+    }
+
+    ByteBuffer readEntry(long msb, long lsb) throws IOException {
+        int position = findEntry(msb, lsb);
+        if (position != -1) {
+            return access.read(
+                    index.getInt(position + 16),
+                    index.getInt(position + 20));
+        } else {
+            return null;
+        }
+    }
+
+    private int findEntry(long msb, long lsb) {
+        // The segment identifiers are randomly generated with uniform
+        // distribution, so we can use interpolation search to find the
+        // matching entry in the index. The average runtime is O(log log n).
+
+        int lowIndex = 0;
+        int highIndex = index.remaining() / 24 - 1;
+        float lowValue = Long.MIN_VALUE;
+        float highValue = Long.MAX_VALUE;
+        float targetValue = msb;
+
+        while (lowIndex <= highIndex) {
+            int guessIndex = lowIndex + Math.round(
+                    (highIndex - lowIndex)
+                    * (targetValue - lowValue)
+                    / (highValue - lowValue));
+            int position = index.position() + guessIndex * 24;
+            long m = index.getLong(position);
+            if (msb < m) {
+                highIndex = guessIndex - 1;
+                highValue = m;
+            } else if (msb > m) {
+                lowIndex = guessIndex + 1;
+                lowValue = m;
+            } else {
+                // getting close...
+                long l = index.getLong(position + 8);
+                if (lsb < l) {
+                    highIndex = guessIndex - 1;
+                    highValue = m;
+                } else if (lsb > l) {
+                    lowIndex = guessIndex + 1;
+                    lowValue = m;
+                } else {
+                    // found it!
+                    return position;
+                }
+            }
+        }
+
+        // not found
+        return -1;
+    }
+
+    synchronized TarReader cleanup(Set<UUID> referencedIds) throws IOException {
+        TarEntry[] sorted = new TarEntry[index.remaining() / 24];
+        int position = index.position();
+        for (int i = 0; position < index.limit(); i++) {
+            sorted[i]  = new TarEntry(
+                    index.getLong(position),
+                    index.getLong(position + 8),
+                    index.getInt(position + 16),
+                    index.getInt(position + 20));
+            position += 24;
+        }
+        Arrays.sort(sorted, TarEntry.OFFSET_ORDER);
+
+        int size = 0;
+        int count = 0;
+        for (int i = sorted.length - 1; i >= 0; i--) {
+            TarEntry entry = sorted[i];
+            UUID id = new UUID(entry.msb(), entry.lsb());
+            if (!referencedIds.remove(id)) {
+                // this segment is not referenced anywhere
+                sorted[i] = null;
+            } else {
+                size += getEntrySize(entry.size());
+                count += 1;
+
+                if (isDataSegmentId(entry.lsb())) {
+                    // this is a referenced data segment, so follow the graph
+                    ByteBuffer segment = access.read(
+                            entry.offset(),
+                            Math.min(entry.size(), 16 * 256));
+                    int pos = segment.position();
+                    int refcount = segment.get(pos + REF_COUNT_OFFSET) & 0xff;
+                    int refend = pos + 16 * (refcount + 1);
+                    for (int refpos = pos + 16; refpos < refend; refpos += 16) {
+                        referencedIds.add(new UUID(
+                                segment.getLong(refpos),
+                                segment.getLong(refpos + 8)));
+                    }
+                }
+            }
+        }
+        size += getEntrySize(24 * count + 16);
+        size += 2 * BLOCK_SIZE;
+
+        if (size >= access.length() * 3 / 4) {
+            // the space savings are not worth it at less than 25%
+            return this;
+        }
+
+        String name = file.getName();
+        int pos = name.length() - "a.tar".length();
+        char generation = name.charAt(pos);
+        if (generation == 'z') {
+            // no garbage collection after reaching generation z
+            return this;
+        }
+
+        File newFile = new File(
+                file.getParentFile(),
+                name.substring(0, pos) + (char) (generation + 1) + ".tar");
+        TarWriter writer = new TarWriter(newFile);
+        for (int i = 0; i < sorted.length; i++) {
+            TarEntry entry = sorted[i];
+            if (entry != null) {
+                byte[] data = new byte[entry.size()];
+                access.read(entry.offset(), entry.size()).get(data);
+                writer.writeEntry(
+                        entry.msb(), entry.lsb(), data, 0, entry.size());
+            }
+        }
+        writer.close();
+
+        return new TarReader(newFile, access.isMemoryMapped());
+    }
+
+    void close() throws IOException {
+        access.close();
+    }
+
+    //-----------------------------------------------------------< private >--
+
+    /**
+     * Tries to read an existing index from the tar file. The index is
+     * returned if it is found and looks valid (correct checksum, passes
+     * sanity checks).
+     *
+     * @return tar index, or {@code null} if not found or not valid
+     * @throws IOException if the tar file could not be read
+     */
+    private ByteBuffer loadAndValidateIndex() throws IOException {
+        long length = file.length();
+        if (length % BLOCK_SIZE != 0
+                || length < 6 * BLOCK_SIZE
+                || length > Integer.MAX_VALUE) {
+            log.warn("Unexpected size {} of tar file {}", length, file);
+            return null; // unexpected file size
+        }
+
+        // read the index metadata just before the two final zero blocks
+        ByteBuffer meta = access.read((int) (length - 2 * BLOCK_SIZE - 16), 16);
+        int crc32 = meta.getInt();
+        int count = meta.getInt();
+        int bytes = meta.getInt();
+        int magic = meta.getInt();
+
+        if (magic != INDEX_MAGIC) {
+            log.warn("No index found in tar file {}", file);
+            return null; // magic byte mismatch
+        }
+
+        if (count < 1 || bytes < count * 24 + 16 || bytes % BLOCK_SIZE != 0) {
+            log.warn("Invalid index metadata in tar file {}", file);
+            return null; // impossible entry and/or byte counts
+        }
+
+        ByteBuffer index = access.read(
+                (int) (length - 2 * BLOCK_SIZE - 16 - count * 24),
+                count * 24);
+        index.mark();
+
+        CRC32 checksum = new CRC32();
+        long limit = length - 2 * BLOCK_SIZE - bytes - BLOCK_SIZE;
+        long lastmsb = Long.MIN_VALUE;
+        long lastlsb = Long.MIN_VALUE;
+        byte[] entry = new byte[24];
+        for (int i = 0; i < count; i++) {
+            index.get(entry);
+            checksum.update(entry);
+
+            ByteBuffer buffer = ByteBuffer.wrap(entry);
+            long msb   = buffer.getLong();
+            long lsb   = buffer.getLong();
+            int offset = buffer.getInt();
+            int size   = buffer.getInt();
+
+            if (lastmsb > msb || (lastmsb == msb && lastlsb > lsb)) {
+                log.warn("Incorrect index ordering in tar file {}", file);
+                return null;
+            } else if (lastmsb == msb && lastlsb == lsb && i > 0) {
+                log.warn("Duplicate index entry in tar file {}", file);
+                return null;
+            } else if (offset < 0 || offset % BLOCK_SIZE != 0) {
+                log.warn("Invalid index entry offset in tar file {}", file);
+                return null;
+            } else if (size < 1 || offset + size > limit) {
+                log.warn("Invalid index entry size in tar file {}", file);
+                return null;
+            }
+
+            lastmsb = msb;
+            lastlsb = lsb;
+        }
+
+        if (crc32 != (int) checksum.getValue()) {
+            log.warn("Invalid index checksum in tar file {}", file);
+            return null; // checksum mismatch
+        }
+
+        index.reset();
+        return index;
+    }
+
+    private static String readString(ByteBuffer buffer, int fieldSize) {
+        byte[] b = new byte[fieldSize];
+        buffer.get(b);
+        int n = 0;
+        while (n < fieldSize && b[n] != 0) {
+            n++;
+        }
+        return new String(b, 0, n, UTF_8);
+    }
+
+    private static int readNumber(ByteBuffer buffer, int fieldSize) {
+        byte[] b = new byte[fieldSize];
+        buffer.get(b);
+        int number = 0;
+        for (int i = 0; i < fieldSize; i++) {
+            int digit = b[i] & 0xff;
+            if ('0' <= digit && digit <= '7') {
+                number = number * 8 + digit - '0';
+            } else {
+                break;
+            }
+        }
+        return number;
+    }
+
+    //------------------------------------------------------------< Object >--
+
+    @Override
+    public String toString() {
+        return file.toString();
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarReader.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java?rev=1583839&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java Wed Apr  2 00:39:33 2014
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment.file;
+
+import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Sets.newHashSet;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.zip.CRC32;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class TarWriter {
+
+    /** Logger instance */
+    private static final Logger log = LoggerFactory.getLogger(TarWriter.class);
+
+    /** Magic byte sequence at the end of the index block. */
+    static final int INDEX_MAGIC =
+            ('\n' << 24) + ('0' << 16) + ('K' << 8) + '\n';
+
+    /** The tar file block size. */
+    static final int BLOCK_SIZE = 512;
+
+    private static final byte[] ZERO_BYTES = new byte[BLOCK_SIZE];
+
+    static final int getPaddingSize(int size) {
+        int remainder = size % BLOCK_SIZE;
+        if (remainder > 0) {
+            return BLOCK_SIZE - remainder;
+        } else {
+            return 0;
+        }
+    }
+
+    private final File file;
+
+    private RandomAccessFile access = null;
+
+    private final Map<UUID, TarEntry> index = newHashMap();
+
+    TarWriter(File file) {
+        this.file = file;
+    }
+
+    synchronized Set<UUID> getUUIDs() {
+        return newHashSet(index.keySet());
+    }
+
+    synchronized boolean containsEntry(long msb, long lsb) {
+        return index.containsKey(new UUID(msb, lsb));
+    }
+
+    synchronized ByteBuffer readEntry(long msb, long lsb) {
+        TarEntry entry = index.get(new UUID(msb, lsb));
+        if (entry != null) {
+            checkState(access != null); // implied by entry != null
+            try {
+                try {
+                    byte[] data = new byte[entry.size()];
+                    access.seek(entry.offset());
+                    access.readFully(data);
+                    return ByteBuffer.wrap(data);
+                } finally {
+                    access.seek(access.length());
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(
+                        "Unable to read from tar file " + file, e);
+            }
+        } else {
+            return null;
+        }
+    }
+
+    long writeEntry(
+            long msb, long lsb, byte[] data, int offset, int size)
+            throws IOException {
+        checkNotNull(data);
+        checkPositionIndexes(offset, offset + size, data.length);
+
+        UUID uuid = new UUID(msb, lsb);
+        CRC32 checksum = new CRC32();
+        checksum.update(data, offset, size);
+        String entryName = String.format("%s.%08x", uuid, checksum.getValue());
+        byte[] header = newEntryHeader(entryName, size);
+
+        log.debug("Writing segment {} to {}", uuid, file);
+        return writeEntry(uuid, header, data, offset, size);
+    }
+
+    private synchronized long writeEntry(
+            UUID uuid, byte[] header, byte[] data, int offset, int size)
+            throws IOException {
+        if (access == null) {
+            access = new RandomAccessFile(file, "rw");
+        }
+
+        access.write(header);
+        access.write(data, offset, size);
+        int padding = getPaddingSize(size);
+        if (padding > 0) {
+            access.write(ZERO_BYTES, 0, padding);
+        }
+
+        long length = access.getFilePointer();
+        checkState(length <= Integer.MAX_VALUE);
+        TarEntry entry = new TarEntry(
+                uuid.getMostSignificantBits(), uuid.getLeastSignificantBits(),
+                (int) (length - size - padding), size);
+        index.put(uuid, entry);
+
+        return length;
+    }
+
+    synchronized void flush() throws IOException {
+        if (access != null) {
+            access.getFD().sync();
+        }
+    }
+
+    synchronized void close() throws IOException {
+        if (access != null) {
+            int indexSize = index.size() * 24 + 16;
+            int padding = getPaddingSize(indexSize);
+
+            String indexName = file.getName() + ".idx";
+            byte[] header = newEntryHeader(indexName, indexSize);
+
+            ByteBuffer buffer = ByteBuffer.allocate(indexSize);
+            TarEntry[] sorted = index.values().toArray(new TarEntry[index.size()]);
+            Arrays.sort(sorted, TarEntry.IDENTIFIER_ORDER);
+            for (TarEntry entry : sorted) {
+                buffer.putLong(entry.msb());
+                buffer.putLong(entry.lsb());
+                buffer.putInt(entry.offset());
+                buffer.putInt(entry.size());
+            }
+
+            CRC32 checksum = new CRC32();
+            checksum.update(buffer.array(), 0, buffer.position());
+            buffer.putInt((int) checksum.getValue());
+            buffer.putInt(index.size());
+            buffer.putInt(padding + indexSize);
+            buffer.putInt(INDEX_MAGIC);
+
+            access.write(header);
+            if (padding > 0) {
+                // padding comes *before* the index!
+                access.write(ZERO_BYTES, 0, padding);
+            }
+            access.write(buffer.array());
+            access.write(ZERO_BYTES);
+            access.write(ZERO_BYTES);
+            access.close();
+
+            access = null;
+        }
+    }
+
+    private byte[] newEntryHeader(String name, int size) throws IOException {
+        byte[] header = new byte[BLOCK_SIZE];
+
+        // File name
+        byte[] nameBytes = name.getBytes(UTF_8);
+        System.arraycopy(
+                nameBytes, 0, header, 0, Math.min(nameBytes.length, 100));
+
+        // File mode
+        System.arraycopy(
+                String.format("%07o", 0400).getBytes(UTF_8), 0,
+                header, 100, 7);
+
+        // User's numeric user ID
+        System.arraycopy(
+                String.format("%07o", 0).getBytes(UTF_8), 0,
+                header, 108, 7);
+
+        // Group's numeric user ID
+        System.arraycopy(
+                String.format("%07o", 0).getBytes(UTF_8), 0,
+                header, 116, 7);
+
+        // File size in bytes (octal basis)
+        System.arraycopy(
+                String.format("%011o", size).getBytes(UTF_8), 0,
+                header, 124, 11);
+
+        // Last modification time in numeric Unix time format (octal)
+        long time = System.currentTimeMillis() / 1000;
+        System.arraycopy(
+                String.format("%011o", time).getBytes(UTF_8), 0,
+                header, 136, 11);
+
+        // Checksum for header record
+        System.arraycopy(
+                new byte[] { ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ' }, 0,
+                header, 148, 8);
+
+        // Type flag
+        header[156] = '0';
+
+        // Compute checksum
+        int checksum = 0;
+        for (int i = 0; i < header.length; i++) {
+            checksum += header[i] & 0xff;
+        }
+        System.arraycopy(
+                String.format("%06o", checksum).getBytes(UTF_8), 0,
+                header, 148, 6);
+        header[154] = 0;
+
+        return header;
+    }
+
+    //------------------------------------------------------------< Object >--
+
+    @Override
+    public String toString() {
+        return file.toString();
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarWriter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStoreTest.java Wed Apr  2 00:39:33 2014
@@ -46,7 +46,7 @@ public class FileStoreTest {
 
     @Test
     public void testRecovery() throws IOException {
-        FileStore store = new FileStore(directory, 1);
+        FileStore store = new FileStore(directory, 1, false);
         store.flush(); // first 1kB
 
         SegmentNodeState base = store.getHead();
@@ -61,27 +61,25 @@ public class FileStoreTest {
         store.setHead(base, builder.getNodeState());
         store.close(); // third 1kB
 
-        store = new FileStore(directory, 1);
+        store = new FileStore(directory, 1, false);
         assertEquals("b", store.getHead().getString("step"));
         store.close();
 
         RandomAccessFile file = new RandomAccessFile(
                 new File(directory, "data00000a.tar"), "rw");
-        file.seek(2048);
-        file.write(new byte[1024], 0, 1024);
+        file.setLength(2048);
         file.close();
 
-        store = new FileStore(directory, 1);
+        store = new FileStore(directory, 1, false);
         assertEquals("a", store.getHead().getString("step"));
         store.close();
 
         file = new RandomAccessFile(
                 new File(directory, "data00000a.tar"), "rw");
-        file.seek(1024);
-        file.write(new byte[1024], 0, 1024);
+        file.setLength(1024);
         file.close();
 
-        store = new FileStore(directory, 1);
+        store = new FileStore(directory, 1, false);
         assertFalse(store.getHead().hasProperty("step"));
         store.close();
     }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java?rev=1583839&r1=1583838&r2=1583839&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFileTest.java Wed Apr  2 00:39:33 2014
@@ -34,18 +34,7 @@ public class TarFileTest {
 
     @Before
     public void setUp() throws IOException {
-        file = File.createTempFile("TarFileTest", ".tar");
-    }
-
-    @After
-    public void tearDown() {
-        file.delete();
-    }
-
-    @Test
-    public void testOpenClose() throws IOException {
-        new TarFile(file, 10240, true).close();
-        new TarFile(file, 10240, false).close();
+        file = File.createTempFile("TarFileTest", ".tar", new File("target"));
     }
 
     @Test
@@ -55,21 +44,31 @@ public class TarFileTest {
         long lsb = id.getLeastSignificantBits();
         byte[] data = "Hello, World!".getBytes(UTF_8);
 
-        TarFile tar = new TarFile(file, 10240, false);
+        TarWriter writer = new TarWriter(file);
         try {
-            tar.writeEntry(id, data, 0, data.length);
-            assertEquals(ByteBuffer.wrap(data), tar.readEntry(msb, lsb));
+            writer.writeEntry(
+                    id.getMostSignificantBits(),
+                    id.getLeastSignificantBits(),
+                    data, 0, data.length);
+            assertEquals(ByteBuffer.wrap(data), writer.readEntry(msb, lsb));
         } finally {
-            tar.close();
+            writer.close();
         }
 
-        assertEquals(10240, file.length());
+        assertEquals(3072, file.length());
+
+        TarReader reader = new TarReader(file, false);
+        try {
+            assertEquals(ByteBuffer.wrap(data), reader.readEntry(msb, lsb));
+        } finally {
+            reader.close();
+        }
 
-        tar = new TarFile(file, 10240, false);
+        reader = new TarReader(file, false);
         try {
-            assertEquals(ByteBuffer.wrap(data), tar.readEntry(msb, lsb));
+            assertEquals(ByteBuffer.wrap(data), reader.readEntry(msb, lsb));
         } finally {
-            tar.close();
+            reader.close();
         }
     }
 



Mime
View raw message