cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [2/3] cassandra git commit: Properly handle range tombstones when reading old format sstables
Date Tue, 20 Oct 2015 13:18:39 GMT
Properly handle range tombstones when reading old format sstables


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fd5e4742
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fd5e4742
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fd5e4742

Branch: refs/heads/10360
Commit: fd5e47426941d21fefbe5d412b8bc2004deb780b
Parents: 10c65d4
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Fri Oct 2 10:39:09 2015 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Mon Oct 19 11:00:34 2015 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/db/LegacyLayout.java   |  50 ---
 .../cassandra/db/UnfilteredDeserializer.java    | 361 ++++++++++++-------
 .../columniterator/SSTableReversedIterator.java |  18 +-
 3 files changed, 253 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd5e4742/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 194b6e8..6cfd5d9 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -2200,54 +2200,4 @@ public abstract class LegacyLayout
             return size;
         }
     }
-
-    public static class TombstoneTracker
-    {
-        private final CFMetaData metadata;
-        private final DeletionTime partitionDeletion;
-        private final List<LegacyRangeTombstone> openTombstones = new ArrayList<>();
-
-        public TombstoneTracker(CFMetaData metadata, DeletionTime partitionDeletion)
-        {
-            this.metadata = metadata;
-            this.partitionDeletion = partitionDeletion;
-        }
-
-        public void update(LegacyAtom atom)
-        {
-            if (atom.isCell())
-            {
-                if (openTombstones.isEmpty())
-                    return;
-
-                Iterator<LegacyRangeTombstone> iter = openTombstones.iterator();
-                while (iter.hasNext())
-                {
-                    LegacyRangeTombstone tombstone = iter.next();
-                    if (metadata.comparator.compare(atom.clustering(), tombstone.stop.bound) >= 0)
-                        iter.remove();
-                }
-            }
-
-            LegacyRangeTombstone tombstone = atom.asRangeTombstone();
-            if (tombstone.deletionTime.supersedes(partitionDeletion) && !tombstone.isRowDeletion(metadata) && !tombstone.isCollectionTombstone())
-                openTombstones.add(tombstone);
-        }
-
-        public boolean isShadowed(LegacyAtom atom)
-        {
-            long timestamp = atom.isCell() ? atom.asCell().timestamp : atom.asRangeTombstone().deletionTime.markedForDeleteAt();
-
-            if (partitionDeletion.deletes(timestamp))
-                return true;
-
-            for (LegacyRangeTombstone tombstone : openTombstones)
-            {
-                if (tombstone.deletionTime.deletes(timestamp))
-                    return true;
-            }
-
-            return false;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd5e4742/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index ef30289..6be587b 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
+import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -218,18 +219,24 @@ public abstract class UnfilteredDeserializer
         private final boolean readAllAsDynamic;
         private boolean skipStatic;
 
-        private boolean isDone;
-        private boolean isStart = true;
-
-        private final LegacyLayout.CellGrouper grouper;
-        private LegacyLayout.LegacyAtom nextAtom;
+        private boolean inputExhausted;
 
-        private boolean staticFinished;
-        private LegacyLayout.LegacyAtom savedAtom;
+        // The next Unfiltered to return, computed by hasNext()
+        private Unfiltered next;
 
-        private final LegacyLayout.TombstoneTracker tombstoneTracker;
+        // In some conditions, we don't use the last atom read from the input for the current iteration
+        // (typically, because we need to close an open tombstone range first). In that case we use
+        // nextAtom to save the read but still unused atom. It is then picked up by the next iteration before
+        // reaching into the input.
+        private LegacyLayout.LegacyAtom nextAtom;
+
+        // In the old format, a range tombstone with an empty start sorts before the static row, but the static row must be
+        // returned first. Such a tombstone is parked here while the static row is read (reading that row may itself save
+        // the atom following it in nextAtom), and is picked up before nextAtom by the next iteration.
+        private LegacyLayout.LegacyRangeTombstone pendingTombstone;
 
-        private RangeTombstoneMarker closingMarker;
+        private final LegacyLayout.CellGrouper grouper;
+        private final TombstoneTracker tombstoneTracker;
 
         private OldFormatDeserializer(CFMetaData metadata,
                                       DataInputPlus in,
@@ -240,7 +247,7 @@ public abstract class UnfilteredDeserializer
             super(metadata, in, helper);
             this.readAllAsDynamic = readAllAsDynamic;
             this.grouper = new LegacyLayout.CellGrouper(metadata, helper);
-            this.tombstoneTracker = new LegacyLayout.TombstoneTracker(metadata, partitionDeletion);
+            this.tombstoneTracker = new TombstoneTracker(partitionDeletion);
         }
 
         public void setSkipStatic()
@@ -250,165 +257,309 @@ public abstract class UnfilteredDeserializer
 
         public boolean hasNext() throws IOException
         {
-            return nextAtom != null || (!isDone && deserializeNextAtom());
-        }
-
-        private boolean deserializeNextAtom() throws IOException
-        {
-            if (staticFinished && savedAtom != null)
-            {
-                nextAtom = savedAtom;
-                savedAtom = null;
-                return true;
-            }
+            LegacyLayout.LegacyAtom atom = null;
 
-            while (true)
+            while (next == null)
             {
-                nextAtom = LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
-                if (nextAtom == null)
+                // Deserialize the next Unfiltered to return, updating the 'next' field.
+                if (atom == null)
                 {
-                    isDone = true;
-                    return false;
+                    // Pick the atom to process: the tombstone parked in pendingTombstone first, then the atom saved
+                    // in nextAtom, and only then a new atom read from the input.
+                    if (pendingTombstone != null)
+                    {
+                        atom = pendingTombstone;
+                        pendingTombstone = null;
+                    }
+                    else if (nextAtom != null)
+                    {
+                        atom = nextAtom;
+                        nextAtom = null;
+                    }
+                    else
+                    {
+                        atom = readNextAtom();
+                        if (atom == null)
+                        {
+                            // The input is exhausted, but the range tombstones still open must be closed before we're
+                            // done, otherwise the returned markers would be unbalanced.
+                            if (!tombstoneTracker.hasOpenTombstones())
+                                return false;
+                            next = tombstoneTracker.popClosingMarker();
+                            return true;
+                        }
+                    }
                 }
-                else if (tombstoneTracker.isShadowed(nextAtom))
+
+                // If a range tombstone closes strictly before this atom, we need to return that close (or boundary) marker first.
+                if (tombstoneTracker.hasClosingMarkerBefore(atom))
                 {
-                    // We don't want to return shadowed data because that would fail the contract
-                    // of UnfilteredRowIterator. However the old format could have shadowed data, so filter it here.
-                    nextAtom = null;
-                    continue;
+                    nextAtom = atom;
+                    next = tombstoneTracker.popClosingMarker();
+                    return true;
                 }
 
-                tombstoneTracker.update(nextAtom);
-
-                // For static compact tables, the "column_metadata" columns are supposed to be static, but in the old
-                // format they are intermingled with other columns. We deal with that with 2 different strategy:
-                //  1) for thrift queries, we basically consider everything as a "dynamic" cell. This is ok because
-                //     that's basically what we end up with on ThriftResultsMerger has done its thing.
-                //  2) otherwise, we make sure to extract the "static" columns first (see AbstractSSTableIterator.readStaticRow
-                //     and SSTableSimpleIterator.readStaticRow) as a first pass. So, when we do a 2nd pass for dynamic columns
-                //     (which in practice we only do for compactions), we want to ignore those extracted static columns.
-                if (skipStatic && metadata.isStaticCompactTable() && nextAtom.isCell())
+                if (isRow(atom))
                 {
-                    LegacyLayout.LegacyCell cell = nextAtom.asCell();
-                    if (cell.name.column.isStatic())
+                    // Read the full row
+                    LegacyLayout.CellGrouper grouper = atom.isStatic()
+                                                     ? LegacyLayout.CellGrouper.staticGrouper(metadata, helper)
+                                                     : this.grouper;
+
+                    grouper.reset();
+                    grouper.addAtom(atom);
+                    while ((atom = readNextAtom()) != null)
                     {
-                        nextAtom = null;
-                        continue;
+                        // Add the atom to the grouper. If it's not an atom belonging to the built row, we're done for this
+                        // row and should save the current atom for the next iteration
+                        if (!grouper.addAtom(atom))
+                        {
+                            assert nextAtom == null;
+                            nextAtom = atom;
+                            break;
+                        }
                     }
-                }
+                    atom = null;
+                    Row row = grouper.getRow();
 
-                // We want to fetch the static row as the first thing this deserializer return.
-                // However, in practice, it's possible to have range tombstone before the static row cells
-                // if that tombstone has an empty start. So if we do, we save it initially so we can get
-                // to the static parts (if there is any).
-                if (isStart)
+                    // When reading old tables, we sometimes want to skip static data (due to how statically defined columns of compact tables are
+                    // handled). So if we're asked to, ignore the row if it's static.
+                    if (!(skipStatic && row.isStatic()))
+                        next = row;
+                }
+                else
                 {
-                    isStart = false;
-                    if (!nextAtom.isCell())
+                    LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone();
+                    atom = null; // It's possible this tombstone doesn't generate something to return right away. That's the case if the
+                                 // beginning of the new range is shadowed by the currently open range for instance (but the end is not
+                                 // shadowed so that the atom wasn't skipped by readNextAtom). In that case, we'll want to clear atom for
+                                 // the next iteration, so clear it now (we'll use the tombstone variable in what follows anyway).
+
+                    // The sstable iterators assume that if there is one, the static row is the first thing this deserializer will return.
+                    // However, in the old format, a range tombstone with an empty start would sort before any static cell. So we should
+                    // detect that case and invert the return order if necessary. If nextAtom is already set, this tombstone was
+                    // parked in pendingTombstone while reading the static row, so that case has already been dealt with.
+                    if (tombstone.start.bound.size() == 0 && nextAtom == null)
                     {
-                        LegacyLayout.LegacyRangeTombstone tombstone = nextAtom.asRangeTombstone();
-                        if (tombstone.start.bound.size() == 0)
+                        LegacyLayout.LegacyAtom followingAtom = readNextAtom();
+                        if (followingAtom != null && followingAtom.isStatic())
+                        {
+                            // We have both a RT starting from the beginning of the partition and a static row. So set atom to
+                            // the static row so it's returned next, and park the RT in pendingTombstone to return it right after
+                            // (not in nextAtom: reading the static row may save the atom following that row there).
+                            atom = followingAtom;
+                            pendingTombstone = tombstone;
+                            continue;
+                        }
+                        else
                         {
-                            savedAtom = tombstone;
-                            nextAtom = LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
-                            if (nextAtom == null)
-                            {
-                                // That was actually the only atom so use it after all
-                                nextAtom = savedAtom;
-                                savedAtom = null;
-                            }
-                            else if (!nextAtom.isStatic())
-                            {
-                                // We don't have anything static. So we do want to send first
-                                // the saved atom, so switch
-                                LegacyLayout.LegacyAtom atom = nextAtom;
-                                nextAtom = savedAtom;
-                                savedAtom = atom;
-                            }
+                            // The following atom isn't static, so return the RT normally but save that following atom for after
+                            nextAtom = followingAtom;
                         }
                     }
+
+                    // We have an opening range. What we do depends on what already opened range we have.
+                    next = tombstoneTracker.openNew(tombstone);
+
+                    // nextAtom, if set, was read (and checked for shadowing) before this tombstone was tracked: check it again.
+                    if (nextAtom != null && tombstoneTracker.isShadowed(nextAtom))
+                        nextAtom = null;
                 }
+            }
+            // We only get here once next != null, so we have something to return
+            return true;
+        }
 
-                return true;
+        // Reads the next atom from the input, skipping the fully shadowed ones. Returns null once the input is exhausted.
+        private LegacyLayout.LegacyAtom readNextAtom() throws IOException
+        {
+            while (!inputExhausted)
+            {
+                LegacyLayout.LegacyAtom atom = LegacyLayout.readLegacyAtom(metadata, in, readAllAsDynamic);
+                // We don't want to return shadowed data because that would fail the contract
+                // of UnfilteredRowIterator. However the old format could have shadowed data, so filter it here.
+                if (atom == null)
+                    inputExhausted = true;
+                else if (!tombstoneTracker.isShadowed(atom))
+                    return atom;
             }
+            return null;
         }
 
-        private void checkReady() throws IOException
+        private boolean isRow(LegacyLayout.LegacyAtom atom)
         {
-            if (nextAtom == null)
-                hasNext();
-            assert !isDone;
+            if (atom.isCell())
+                return true;
+
+            LegacyLayout.LegacyRangeTombstone tombstone = atom.asRangeTombstone();
+            return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata);
         }
 
         public int compareNextTo(Slice.Bound bound) throws IOException
         {
-            checkReady();
-            int cmp = metadata.comparator.compare(nextAtom.clustering(), bound);
-            if (cmp != 0 || nextAtom.isCell() || !nextIsRow())
-                return cmp;
-
-            // Comparing the clustering of the LegacyAtom to the bound work most of the time. There is the case
-            // of LegacyRangeTombstone that are either a collectionTombstone or a rowDeletion. In those case, their
-            // clustering will be the inclusive start of the row they are a tombstone for, which can be equal to
-            // the slice bound. But we don't want to return equality because the LegacyTombstone should stand for
-            // it's row and should sort accordingly. This matter particularly because SSTableIterator will skip
-            // equal results for the start bound (see SSTableIterator.handlePreSliceData for details).
-            return bound.isStart() ? 1 : -1;
+            // Not an assert: hasNext() computes 'next', so it has to run even when assertions are disabled.
+            if (!hasNext())
+                throw new IllegalStateException();
+            return metadata.comparator.compare(next.clustering(), bound);
         }
 
         public boolean nextIsRow() throws IOException
         {
-            checkReady();
-            if (nextAtom.isCell())
-                return true;
-
-            LegacyLayout.LegacyRangeTombstone tombstone = nextAtom.asRangeTombstone();
-            return tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata);
+            if (!hasNext())
+                throw new IllegalStateException();
+            return next.isRow();
         }
 
         public boolean nextIsStatic() throws IOException
         {
-            checkReady();
-            return nextAtom.isStatic();
+            return nextIsRow() && ((Row)next).isStatic();
         }
 
         public Unfiltered readNext() throws IOException
         {
-            if (!nextIsRow())
-            {
-                LegacyLayout.LegacyRangeTombstone tombstone = nextAtom.asRangeTombstone();
-                // TODO: this is actually more complex, we can have repeated markers etc....
-                if (closingMarker == null)
-                    throw new UnsupportedOperationException();
-                closingMarker = new RangeTombstoneBoundMarker(tombstone.stop.bound, tombstone.deletionTime);
-                return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime);
-            }
-
-            LegacyLayout.CellGrouper grouper = nextAtom.isStatic()
-                                             ? LegacyLayout.CellGrouper.staticGrouper(metadata, helper)
-                                             : this.grouper;
-
-            grouper.reset();
-            grouper.addAtom(nextAtom);
-            while (deserializeNextAtom() && grouper.addAtom(nextAtom))
-            {
-                // Nothing to do, deserializeNextAtom() changes nextAtom and it's then added to the grouper
-            }
-
-            // if this was the first static row, we're done with it. Otherwise, we're also done with static.
-            staticFinished = true;
-            return grouper.getRow();
+            if (!hasNext())
+                throw new IllegalStateException();
+            Unfiltered toReturn = next;
+            next = null;
+            return toReturn;
         }
 
         public void skipNext() throws IOException
         {
-            readNext();
+            if (!hasNext())
+                throw new IllegalStateException();
+            next = null;
         }
 
         public void clearState()
         {
-            isDone = false;
+            next = null;
             nextAtom = null;
+            pendingTombstone = null;
+            inputExhausted = false;
+            // Open tombstones tracked before a seek don't apply to the new position: we rely on the old format repeating the
+            // tombstones still open at the start of each index block (the "repeated markers"), so start over.
+            tombstoneTracker.openTombstones.clear();
+        }
+
+        /**
+         * Tracks which range tombstones are open when deserializing the old format.
+         * <p>
+         * The old format stores range tombstones as (possibly overlapping) ranges: this turns the ones that aren't fully
+         * shadowed into the open, close and boundary markers returned by this deserializer.
+         */
+        private class TombstoneTracker
+        {
+            private final DeletionTime partitionDeletion;
+
+            // Open tombstones sorted by their closing bound (i.e. first tombstone is the first to close).
+            // As we only track non-fully-shadowed ranges, the first range is necessarily the currently
+            // open tombstone (the one with the highest timestamp).
+            private final SortedSet<LegacyLayout.LegacyRangeTombstone> openTombstones;
+
+            public TombstoneTracker(DeletionTime partitionDeletion)
+            {
+                this.partitionDeletion = partitionDeletion;
+                this.openTombstones = new TreeSet<>((rt1, rt2) -> metadata.comparator.compare(rt1.stop.bound, rt2.stop.bound));
+            }
+
+            /**
+             * Checks if the provided atom is fully shadowed by the open tombstones of this tracker (or the partition deletion).
+             */
+            public boolean isShadowed(LegacyLayout.LegacyAtom atom)
+            {
+                long timestamp = atom.isCell() ? atom.asCell().timestamp : atom.asRangeTombstone().deletionTime.markedForDeleteAt();
+
+                if (partitionDeletion.deletes(timestamp))
+                    return true;
+
+                for (LegacyLayout.LegacyRangeTombstone tombstone : openTombstones)
+                {
+                    // A tracked tombstone may end before the atom (its closing marker is only popped after the next atom is read),
+                    // so a cell is shadowed only if the tombstone still covers it, and a RT only if it ends no later than the
+                    // tombstone. Otherwise keep looking, as a tombstone closing later may still shadow the atom.
+                    if (tombstone.deletionTime.deletes(timestamp)
+                        && (atom.isCell()
+                            ? metadata.comparator.compare(tombstone.stop.bound, atom.clustering()) >= 0
+                            : metadata.comparator.compare(atom.asRangeTombstone().stop.bound, tombstone.stop.bound) <= 0))
+                        return true;
+                }
+
+                return false;
+            }
+
+            /**
+             * Whether the currently open marker closes strictly before the provided atom.
+             */
+            public boolean hasClosingMarkerBefore(LegacyLayout.LegacyAtom atom)
+            {
+                return !openTombstones.isEmpty()
+                    && metadata.comparator.compare(openTombstones.first().stop.bound, atom.clustering()) < 0;
+            }
+
+            /**
+             * Returns the unfiltered corresponding to closing the currently open marker (and update the tracker accordingly).
+             */
+            public Unfiltered popClosingMarker()
+            {
+                assert !openTombstones.isEmpty();
+
+                Iterator<LegacyLayout.LegacyRangeTombstone> iter = openTombstones.iterator();
+                LegacyLayout.LegacyRangeTombstone first = iter.next();
+                iter.remove();
+
+                // If that was the last open tombstone, we just want to close it. Otherwise, we have a boundary with the
+                // next tombstone
+                if (!iter.hasNext())
+                    return new RangeTombstoneBoundMarker(first.stop.bound, first.deletionTime);
+
+                LegacyLayout.LegacyRangeTombstone next = iter.next();
+                return RangeTombstoneBoundaryMarker.makeBoundary(false, first.stop.bound, first.stop.bound.invert(), first.deletionTime, next.deletionTime);
+            }
+
+            /**
+             * Update the tracker given the provided newly open tombstone. This returns the Unfiltered corresponding to the opening
+             * of said tombstone: this can be a simple open marker, a boundary (if there was an open tombstone superseded by this new one)
+             * or even null (if the start of the new tombstone is superseded by the currently open tombstone).
+             *
+             * Note that this method assumes the added tombstone is not fully shadowed, i.e. that !isShadowed(tombstone). It also
+             * assumes no opened tombstone closes before that tombstone (so !hasClosingMarkerBefore(tombstone)).
+             */
+            public Unfiltered openNew(LegacyLayout.LegacyRangeTombstone tombstone)
+            {
+                if (openTombstones.isEmpty())
+                {
+                    openTombstones.add(tombstone);
+                    return new RangeTombstoneBoundMarker(tombstone.start.bound, tombstone.deletionTime);
+                }
+
+                // As !hasClosingMarkerBefore(tombstone), the currently open tombstone doesn't close before the new one starts.
+                LegacyLayout.LegacyRangeTombstone first = openTombstones.first();
+                assert metadata.comparator.compare(tombstone.start.bound, first.stop.bound) <= 0;
+
+                // Stop tracking the open tombstones the new one supersedes (or equals) and outlives: none of them can be the
+                // currently open deletion anymore. This keeps the first tracked tombstone being the currently open one, and
+                // ensures the new tombstone isn't rejected by the set because a tracked tombstone has the same closing bound.
+                openTombstones.removeIf(rt -> !rt.deletionTime.supersedes(tombstone.deletionTime)
+                                              && metadata.comparator.compare(rt.stop.bound, tombstone.stop.bound) <= 0);
+                openTombstones.add(tombstone);
+
+                // If the new tombstone supersedes the currently open one, produce a boundary closing the latter and opening the
+                // former. Otherwise the currently open deletion still applies at the start of the new tombstone and there is nothing
+                // to return yet: popClosingMarker() will open the new tombstone once the tombstones superseding it have closed.
+                if (tombstone.deletionTime.supersedes(first.deletionTime))
+                    return RangeTombstoneBoundaryMarker.makeBoundary(false, tombstone.start.bound.invert(), tombstone.start.bound, first.deletionTime, tombstone.deletionTime);
+
+                return null;
+            }
+
+            /**
+             * Whether some range tombstone is currently open, and so still has to be closed.
+             */
+            public boolean hasOpenTombstones()
+            {
+                return !openTombstones.isEmpty();
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd5e4742/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 06855e3..01a8fb2 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -155,17 +155,23 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
             buffer.reset();
 
             boolean isFirst = true;
+            boolean isDone = false;
 
             // If the start might be in this block, skip everything that comes before it.
             if (start != null)
             {
-                while (deserializer.hasNext() && deserializer.compareNextTo(start) <= 0 && !stopReadingDisk())
+                while (!isDone && deserializer.hasNext() && deserializer.compareNextTo(start) <= 0)
                 {
                     isFirst = false;
                     if (deserializer.nextIsRow())
                         deserializer.skipNext();
                     else
                         updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
+
+                    // Note that because 'deserializer.hasNext()' may advance our file pointer, we need to always check stopReadingDisk() before any call to it,
+                    // i.e. just after we've called readNext/skipNext
+                    if (stopReadingDisk())
+                        isDone = true;
                 }
             }
 
@@ -177,14 +183,17 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
             }
 
             // Now deserialize everything until we reach our requested end (if we have one)
-            while (deserializer.hasNext()
-                   && (end == null || deserializer.compareNextTo(end) <= 0)
-                   && !stopReadingDisk())
+            while (!isDone
+                   && deserializer.hasNext()
+                   && (end == null || deserializer.compareNextTo(end) <= 0))
             {
                 Unfiltered unfiltered = deserializer.readNext();
                 if (!isFirst || includeFirst)
                     buffer.add(unfiltered);
 
+                if (stopReadingDisk())
+                    isDone = true;
+
                 isFirst = false;
 
                 if (unfiltered.isRangeTombstoneMarker())
@@ -317,6 +326,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
                 ClusteringPrefix firstOfCurrent = indexState.index(currentBlock).firstName;
                 includeFirst = metadata().comparator.compare(lastOfPrevious, firstOfCurrent) != 0;
             }
+
             loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null, includeFirst);
         }
 


Mime
View raw message