cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sn...@apache.org
Subject [7/9] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Date Wed, 16 Sep 2015 20:05:00 GMT
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e63dacf7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e63dacf7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e63dacf7

Branch: refs/heads/cassandra-3.0
Commit: e63dacf793fedc8a9eed9c7fc635cde5f9fd68f3
Parents: 8b2dc1f e889ee4
Author: Robert Stupp <snazy@snazy.de>
Authored: Wed Sep 16 22:00:25 2015 +0200
Committer: Robert Stupp <snazy@snazy.de>
Committed: Wed Sep 16 22:00:25 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java | 193 ++++++++------
 .../org/apache/cassandra/cache/CacheKey.java    |  14 +-
 .../apache/cassandra/cache/CounterCacheKey.java |  26 +-
 .../org/apache/cassandra/cache/KeyCacheKey.java |  19 +-
 .../org/apache/cassandra/cache/OHCProvider.java |  17 +-
 .../org/apache/cassandra/cache/RowCacheKey.java |  34 +--
 .../org/apache/cassandra/config/CFMetaData.java |   9 +
 .../cassandra/config/DatabaseDescriptor.java    |  19 +-
 .../org/apache/cassandra/config/Schema.java     |  56 +++-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  75 ++----
 src/java/org/apache/cassandra/db/Keyspace.java  |   4 -
 .../org/apache/cassandra/db/RowIndexEntry.java  |   2 +-
 .../db/index/SecondaryIndexManager.java         |  30 +--
 .../io/sstable/format/SSTableReader.java        |  10 +-
 .../io/sstable/format/big/BigTableReader.java   |   2 +-
 .../apache/cassandra/service/CacheService.java  |  58 ++--
 .../cassandra/service/CassandraDaemon.java      |  41 ++-
 .../cassandra/service/StorageService.java       |  31 ++-
 .../org/apache/cassandra/utils/FBUtilities.java |  16 ++
 .../cassandra/cache/AutoSavingCacheTest.java    |   5 +-
 .../cassandra/cache/CacheProviderTest.java      |  17 +-
 .../apache/cassandra/cql3/KeyCacheCqlTest.java  | 266 +++++++++++++++++++
 .../apache/cassandra/db/CounterCacheTest.java   |  70 ++++-
 .../org/apache/cassandra/db/KeyCacheTest.java   |   2 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |  41 ++-
 26 files changed, 760 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7deebcf,207f16a..96ec0fa
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,14 -1,5 +1,15 @@@
 -2.1.10
 +2.2.2
 + * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
 + * Cancel transaction for sstables we wont redistribute index summary
 +   for (CASSANDRA-10270)
 + * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) 
 + * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
 + * Fix failure to start with space in directory path on Windows (CASSANDRA-10239)
 + * Fix repair hang when snapshot failed (CASSANDRA-10057)
 + * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
 +   (CASSANDRA-10199)
 +Merged from 2.1:
+  * Fix cache handling of 2i and base tables (CASSANDRA-10155)
   * Fix NPE in nodetool compactionhistory (CASSANDRA-9758)
   * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
   * BATCH statement is broken in cqlsh (CASSANDRA-10272)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/AutoSavingCache.java
index f0f4e8a,3ebbc76..3ec9d4e
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@@ -61,8 -65,16 +67,16 @@@ public class AutoSavingCache<K extends 
      protected volatile ScheduledFuture<?> saveTask;
      protected final CacheService.CacheType cacheType;
  
 -    private CacheSerializer<K, V> cacheLoader;
 +    private final CacheSerializer<K, V> cacheLoader;
-     private static final String CURRENT_VERSION = "c";
+ 
+     /*
+      * CASSANDRA-10155 required a format change to fix 2i indexes and caching.
+      * 2.2 is already at version "c" and 3.0 is at "d".
+      *
+      * Since cache versions match exactly and there is no partial fallback just add
+      * a minor version letter.
+      */
 -    private static final String CURRENT_VERSION = "ba";
++    private static final String CURRENT_VERSION = "ca";
  
      private static volatile IStreamFactory streamFactory = new IStreamFactory()
      {
@@@ -90,16 -102,9 +104,14 @@@
          this.cacheLoader = cacheloader;
      }
  
-     public File getCacheDataPath(UUID cfId, String version)
 -    public File getCachePath(String version)
++    public File getCacheDataPath(String version)
      {
-         Pair<String, String> names = Schema.instance.getCF(cfId);
-         return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "db");
 -        return DatabaseDescriptor.getSerializedCachePath(cacheType, version);
++        return DatabaseDescriptor.getSerializedCachePath( cacheType, version, "db");
 +    }
 +
-     public File getCacheCrcPath(UUID cfId, String version)
++    public File getCacheCrcPath(String version)
 +    {
-         Pair<String, String> names = Schema.instance.getCF(cfId);
-         return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "crc");
++        return DatabaseDescriptor.getSerializedCachePath( cacheType, version, "crc");
      }
  
      public Writer getWriter(int keysToSave)
@@@ -136,42 -170,65 +177,70 @@@
          long start = System.nanoTime();
  
          // modern format, allows both key and value (so key cache load can be purely sequential)
-         File dataPath = getCacheDataPath(cfs.metadata.cfId, CURRENT_VERSION);
-         File crcPath = getCacheCrcPath(cfs.metadata.cfId, CURRENT_VERSION);
 -        File path = getCachePath(CURRENT_VERSION);
 -        if (path.exists())
++        File dataPath = getCacheDataPath(CURRENT_VERSION);
++        File crcPath = getCacheCrcPath(CURRENT_VERSION);
 +        if (dataPath.exists() && crcPath.exists())
          {
              DataInputStream in = null;
              try
              {
 -                logger.info(String.format("reading saved cache %s", path));
 -                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
 +                logger.info(String.format("reading saved cache %s", dataPath));
 +                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
-                 List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
+                 ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<Future<Pair<K, V>>>();
 -
                  while (in.available() > 0)
                  {
-                     Future<Pair<K, V>> entry = cacheLoader.deserialize(in, cfs);
+                     //ksname and cfname are serialized by the serializers in CacheService
+                     //That is delegated there because there are serializer specific conditions
+                     //where a cache key is skipped and not written
+                     String ksname = in.readUTF();
+                     String cfname = in.readUTF();
+ 
+                     ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(ksname, cfname));
+ 
+                     Future<Pair<K, V>> entryFuture = cacheLoader.deserialize(in, cfs);
                      // Key cache entry can return null, if the SSTable doesn't exist.
-                     if (entry == null)
+                     if (entryFuture == null)
                          continue;
-                     futures.add(entry);
+ 
+                     futures.offer(entryFuture);
                      count++;
+ 
+                     /*
+                      * Kind of unwise to accrue an unbounded number of pending futures
+                      * So now there is this loop to keep a bounded number pending.
+                      */
+                     do
+                     {
+                         while (futures.peek() != null && futures.peek().isDone())
+                         {
+                             Future<Pair<K, V>> future = futures.poll();
+                             Pair<K, V> entry = future.get();
+                             if (entry != null && entry.right != null)
+                                 put(entry.left, entry.right);
+                         }
+ 
+                         if (futures.size() > 1000)
+                             Thread.yield();
+                     } while(futures.size() > 1000);
                  }
  
-                 for (Future<Pair<K, V>> future : futures)
+                 Future<Pair<K, V>> future = null;
+                 while ((future = futures.poll()) != null)
                  {
                      Pair<K, V> entry = future.get();
                      if (entry != null && entry.right != null)
                          put(entry.left, entry.right);
                  }
              }
 +            catch (CorruptFileException e)
 +            {
 +                JVMStabilityInspector.inspectThrowable(e);
 +                logger.warn(String.format("Non-fatal checksum error reading saved cache %s", dataPath.getAbsolutePath()), e);
 +            }
-             catch (Exception e)
+             catch (Throwable t)
              {
-                 JVMStabilityInspector.inspectThrowable(e);
-                 logger.debug(String.format("harmless error reading saved cache %s", dataPath.getAbsolutePath()), e);
+                 JVMStabilityInspector.inspectThrowable(t);
 -                logger.info(String.format("Harmless error reading saved cache %s", path.getAbsolutePath()), t);
++                logger.info(String.format("Harmless error reading saved cache %s", dataPath.getAbsolutePath()), t);
              }
              finally
              {
@@@ -236,11 -284,9 +305,10 @@@
          public CompactionInfo getCompactionInfo()
          {
              // keyset can change in size, thus total can too
 -            return info.forProgress(keysWritten, Math.max(keysWritten, keys.size()));
 +            // TODO need to check for this one... was: info.forProgress(keysWritten, Math.max(keysWritten, keys.size()));
 +            return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate));
          }
  
-         @SuppressWarnings("resource")
          public void saveCache()
          {
              logger.debug("Deleting old {} files.", cacheType);
@@@ -254,37 -300,25 +322,26 @@@
  
              long start = System.nanoTime();
  
-             HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
-             HashMap<UUID, OutputStream> streams = new HashMap<>();
-             HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
- 
 -            DataOutputStreamPlus writer = null;
 -            File tempCacheFile = tempCacheFile();
++            WrappedDataOutputStreamPlus writer = null;
++            Pair<File, File> cacheFilePaths = tempCacheFiles();
              try
              {
+                 try
+                 {
 -                    writer = new DataOutputStreamPlus(streamFactory.getOutputStream(tempCacheFile));
++                    writer = new WrappedDataOutputStreamPlus(streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right));
+                 }
+                 catch (FileNotFoundException e)
+                 {
+                     throw new RuntimeException(e);
+                 }
+ 
 -                for (K key : keys)
 +                while (keyIterator.hasNext())
                  {
 +                    K key = keyIterator.next();
-                     UUID cfId = key.getCFId();
-                     if (!Schema.instance.hasCF(key.getCFId()))
-                         continue; // the table has been dropped.
  
-                     DataOutputPlus writer = writers.get(cfId);
-                     if (writer == null)
-                     {
-                         Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
-                         OutputStream stream;
-                         try
-                         {
-                             stream = streamFactory.getOutputStream(cacheFilePaths.left, cacheFilePaths.right);
-                             writer = new WrappedDataOutputStreamPlus(stream);
-                         }
-                         catch (FileNotFoundException e)
-                         {
-                             throw new RuntimeException(e);
-                         }
-                         paths.put(cfId, cacheFilePaths);
-                         streams.put(cfId, stream);
-                         writers.put(cfId, writer);
-                     }
+                     ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(key.ksAndCFName);
+                     if (cfs == null)
+                         continue; // the table or 2i has been dropped.
  
                      try
                      {
@@@ -292,7 -326,7 +349,7 @@@
                      }
                      catch (IOException e)
                      {
-                         throw new FSWriteError(e, paths.get(cfId).left);
 -                        throw new FSWriteError(e, tempCacheFile);
++                        throw new FSWriteError(e, cacheFilePaths.left);
                      }
  
                      keysWritten++;
@@@ -302,49 -334,24 +359,31 @@@
              }
              finally
              {
-                 if (keyIterator instanceof Closeable)
-                     try
-                     {
-                         ((Closeable)keyIterator).close();
-                     }
-                     catch (IOException ignored)
-                     {
-                         // not thrown (by OHC)
-                     }
- 
-                 for (OutputStream writer : streams.values())
-                 {
+                 if (writer != null)
                      FileUtils.closeQuietly(writer);
-                 }
              }
  
-             for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
-             {
-                 UUID cfId = entry.getKey();
 -            File cacheFile = getCachePath(CURRENT_VERSION);
++            File cacheFile = getCacheDataPath(CURRENT_VERSION);
++            File crcFile = getCacheCrcPath(CURRENT_VERSION);
  
-                 Pair<File, File> tmpFiles = paths.get(cfId);
-                 File cacheFile = getCacheDataPath(cfId, CURRENT_VERSION);
-                 File crcFile = getCacheCrcPath(cfId, CURRENT_VERSION);
+             cacheFile.delete(); // ignore error if it didn't exist
++            crcFile.delete();
 +
-                 cacheFile.delete(); // ignore error if it didn't exist
-                 crcFile.delete();
++            if (!cacheFilePaths.left.renameTo(cacheFile))
++                logger.error("Unable to rename {} to {}", cacheFilePaths.left, cacheFile);
  
-                 if (!tmpFiles.left.renameTo(cacheFile))
-                     logger.error("Unable to rename {} to {}", tmpFiles.left, cacheFile);
- 
-                 if (!tmpFiles.right.renameTo(crcFile))
-                     logger.error("Unable to rename {} to {}", tmpFiles.right, crcFile);
-             }
 -            if (!tempCacheFile.renameTo(cacheFile))
 -                logger.error("Unable to rename {} to {}", tempCacheFile, cacheFile);
++            if (!cacheFilePaths.right.renameTo(crcFile))
++                logger.error("Unable to rename {} to {}", cacheFilePaths.right, crcFile);
  
 -            logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +            logger.info("Saved {} ({} items) in {} ms", cacheType, keysWritten, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
          }
  
-         private Pair<File, File> tempCacheFiles(UUID cfId)
 -        private File tempCacheFile()
++        private Pair<File, File> tempCacheFiles()
          {
-             File dataPath = getCacheDataPath(cfId, CURRENT_VERSION);
-             File crcPath = getCacheCrcPath(cfId, CURRENT_VERSION);
 -            File path = getCachePath(CURRENT_VERSION);
 -            return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
++            File dataPath = getCacheDataPath(CURRENT_VERSION);
++            File crcPath = getCacheCrcPath(CURRENT_VERSION);
 +            return Pair.create(FileUtils.createTempFile(dataPath.getName(), null, dataPath.getParentFile()),
 +                               FileUtils.createTempFile(crcPath.getName(), null, crcPath.getParentFile()));
          }
  
          private void deleteOldCacheFiles()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/OHCProvider.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/OHCProvider.java
index e4cfb69,0000000..9b1c8cf
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/cache/OHCProvider.java
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@@ -1,282 -1,0 +1,285 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.cache;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.nio.channels.WritableByteChannel;
 +import java.util.Iterator;
- import java.util.UUID;
 +
 +import com.google.common.base.Function;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamily;
 +import org.apache.cassandra.db.TypeSizes;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.Memory;
 +import org.apache.cassandra.net.MessagingService;
++import org.apache.cassandra.utils.Pair;
 +import org.caffinitas.ohc.OHCache;
 +import org.caffinitas.ohc.OHCacheBuilder;
 +
 +public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
 +{
 +    public ICache<RowCacheKey, IRowCacheEntry> create()
 +    {
 +        OHCacheBuilder<RowCacheKey, IRowCacheEntry> builder = OHCacheBuilder.newBuilder();
 +        builder.capacity(DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024)
 +               .keySerializer(new KeySerializer())
 +               .valueSerializer(new ValueSerializer())
 +               .throwOOME(true);
 +
 +        return new OHCacheAdapter(builder.build());
 +    }
 +
 +    private static class OHCacheAdapter implements ICache<RowCacheKey, IRowCacheEntry>
 +    {
 +        private final OHCache<RowCacheKey, IRowCacheEntry> ohCache;
 +
 +        public OHCacheAdapter(OHCache<RowCacheKey, IRowCacheEntry> ohCache)
 +        {
 +            this.ohCache = ohCache;
 +        }
 +
 +        public long capacity()
 +        {
 +            return ohCache.capacity();
 +        }
 +
 +        public void setCapacity(long capacity)
 +        {
 +            ohCache.setCapacity(capacity);
 +        }
 +
 +        public void put(RowCacheKey key, IRowCacheEntry value)
 +        {
 +            ohCache.put(key, value);
 +        }
 +
 +        public boolean putIfAbsent(RowCacheKey key, IRowCacheEntry value)
 +        {
 +            return ohCache.putIfAbsent(key, value);
 +        }
 +
 +        public boolean replace(RowCacheKey key, IRowCacheEntry old, IRowCacheEntry value)
 +        {
 +            return ohCache.addOrReplace(key, old, value);
 +        }
 +
 +        public IRowCacheEntry get(RowCacheKey key)
 +        {
 +            return ohCache.get(key);
 +        }
 +
 +        public void remove(RowCacheKey key)
 +        {
 +            ohCache.remove(key);
 +        }
 +
 +        public int size()
 +        {
 +            return (int) ohCache.size();
 +        }
 +
 +        public long weightedSize()
 +        {
 +            return ohCache.size();
 +        }
 +
 +        public void clear()
 +        {
 +            ohCache.clear();
 +        }
 +
 +        public Iterator<RowCacheKey> hotKeyIterator(int n)
 +        {
 +            return ohCache.hotKeyIterator(n);
 +        }
 +
 +        public Iterator<RowCacheKey> keyIterator()
 +        {
 +            return ohCache.keyIterator();
 +        }
 +
 +        public boolean containsKey(RowCacheKey key)
 +        {
 +            return ohCache.containsKey(key);
 +        }
 +    }
 +
 +    private static class KeySerializer implements org.caffinitas.ohc.CacheSerializer<RowCacheKey>
 +    {
 +        public void serialize(RowCacheKey rowCacheKey, DataOutput dataOutput) throws IOException
 +        {
-             dataOutput.writeLong(rowCacheKey.cfId.getMostSignificantBits());
-             dataOutput.writeLong(rowCacheKey.cfId.getLeastSignificantBits());
++            dataOutput.writeUTF(rowCacheKey.ksAndCFName.left);
++            dataOutput.writeUTF(rowCacheKey.ksAndCFName.right);
 +            dataOutput.writeInt(rowCacheKey.key.length);
 +            dataOutput.write(rowCacheKey.key);
 +        }
 +
 +        public RowCacheKey deserialize(DataInput dataInput) throws IOException
 +        {
-             long msb = dataInput.readLong();
-             long lsb = dataInput.readLong();
++            String ksName = dataInput.readUTF();
++            String cfName = dataInput.readUTF();
 +            byte[] key = new byte[dataInput.readInt()];
 +            dataInput.readFully(key);
-             return new RowCacheKey(new UUID(msb, lsb), key);
++            return new RowCacheKey(Pair.create(ksName, cfName), key);
 +        }
 +
 +        public int serializedSize(RowCacheKey rowCacheKey)
 +        {
-             return 20 + rowCacheKey.key.length;
++            return TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.left)
++                    + TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.right)
++                    + 4
++                    + rowCacheKey.key.length;
 +        }
 +    }
 +
 +    private static class ValueSerializer implements org.caffinitas.ohc.CacheSerializer<IRowCacheEntry>
 +    {
 +        public void serialize(IRowCacheEntry entry, DataOutput out) throws IOException
 +        {
 +            assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
 +            boolean isSentinel = entry instanceof RowCacheSentinel;
 +            out.writeBoolean(isSentinel);
 +            if (isSentinel)
 +                out.writeLong(((RowCacheSentinel) entry).sentinelId);
 +            else
 +                ColumnFamily.serializer.serialize((ColumnFamily) entry, new DataOutputPlusAdapter(out), MessagingService.current_version);
 +        }
 +
 +        public IRowCacheEntry deserialize(DataInput in) throws IOException
 +        {
 +            boolean isSentinel = in.readBoolean();
 +            if (isSentinel)
 +                return new RowCacheSentinel(in.readLong());
 +            return ColumnFamily.serializer.deserialize(in, MessagingService.current_version);
 +        }
 +
 +        public int serializedSize(IRowCacheEntry entry)
 +        {
 +            TypeSizes typeSizes = TypeSizes.NATIVE;
 +            int size = typeSizes.sizeof(true);
 +            if (entry instanceof RowCacheSentinel)
 +                size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId);
 +            else
 +                size += ColumnFamily.serializer.serializedSize((ColumnFamily) entry, typeSizes, MessagingService.current_version);
 +            return size;
 +        }
 +    }
 +
 +    static class DataOutputPlusAdapter implements DataOutputPlus
 +    {
 +        private final DataOutput out;
 +
 +        public void write(byte[] b) throws IOException
 +        {
 +            out.write(b);
 +        }
 +
 +        public void write(byte[] b, int off, int len) throws IOException
 +        {
 +            out.write(b, off, len);
 +        }
 +
 +        public void write(int b) throws IOException
 +        {
 +            out.write(b);
 +        }
 +
 +        public void writeBoolean(boolean v) throws IOException
 +        {
 +            out.writeBoolean(v);
 +        }
 +
 +        public void writeByte(int v) throws IOException
 +        {
 +            out.writeByte(v);
 +        }
 +
 +        public void writeBytes(String s) throws IOException
 +        {
 +            out.writeBytes(s);
 +        }
 +
 +        public void writeChar(int v) throws IOException
 +        {
 +            out.writeChar(v);
 +        }
 +
 +        public void writeChars(String s) throws IOException
 +        {
 +            out.writeChars(s);
 +        }
 +
 +        public void writeDouble(double v) throws IOException
 +        {
 +            out.writeDouble(v);
 +        }
 +
 +        public void writeFloat(float v) throws IOException
 +        {
 +            out.writeFloat(v);
 +        }
 +
 +        public void writeInt(int v) throws IOException
 +        {
 +            out.writeInt(v);
 +        }
 +
 +        public void writeLong(long v) throws IOException
 +        {
 +            out.writeLong(v);
 +        }
 +
 +        public void writeShort(int v) throws IOException
 +        {
 +            out.writeShort(v);
 +        }
 +
 +        public void writeUTF(String s) throws IOException
 +        {
 +            out.writeUTF(s);
 +        }
 +
 +        public DataOutputPlusAdapter(DataOutput out)
 +        {
 +            this.out = out;
 +        }
 +
 +        public void write(ByteBuffer buffer) throws IOException
 +        {
 +            if (buffer.hasArray())
 +                out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
 +            else
 +                throw new UnsupportedOperationException("IMPLEMENT ME");
 +        }
 +
 +        public void write(Memory memory, long offset, long length) throws IOException
 +        {
 +            throw new UnsupportedOperationException("IMPLEMENT ME");
 +        }
 +
 +        public <R> R applyToChannel(Function<WritableByteChannel, R> c) throws IOException
 +        {
 +            throw new UnsupportedOperationException("IMPLEMENT ME");
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/RowCacheKey.java
index ccb85d8,c959fd1..e02db42
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@@ -33,20 -31,14 +31,20 @@@ public final class RowCacheKey extends 
  
      private static final long EMPTY_SIZE = ObjectSizes.measure(new RowCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
  
-     public RowCacheKey(UUID cfId, byte[] key)
++    public RowCacheKey(Pair<String, String> ksAndCFName, byte[] key)
 +    {
-         this.cfId = cfId;
++        super(ksAndCFName);
 +        this.key = key;
 +    }
 +
-     public RowCacheKey(UUID cfId, DecoratedKey key)
+     public RowCacheKey(Pair<String, String> ksAndCFName, DecoratedKey key)
      {
-         this(cfId, key.getKey());
+         this(ksAndCFName, key.getKey());
      }
  
-     public RowCacheKey(UUID cfId, ByteBuffer key)
+     public RowCacheKey(Pair<String, String> ksAndCFName, ByteBuffer key)
      {
-         this.cfId = cfId;
+         super(ksAndCFName);
          this.key = ByteBufferUtil.getArray(key);
          assert this.key != null;
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index 6468973,2939f09..348eb89
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -47,11 -48,15 +47,12 @@@ import org.apache.cassandra.db.marshal.
  import org.apache.cassandra.exceptions.*;
  import org.apache.cassandra.io.compress.CompressionParameters;
  import org.apache.cassandra.io.compress.LZ4Compressor;
 -import org.apache.cassandra.io.sstable.Descriptor;
 -import org.apache.cassandra.serializers.MarshalException;
 -import org.apache.cassandra.thrift.CfDef;
 -import org.apache.cassandra.thrift.CqlResult;
 -import org.apache.cassandra.thrift.CqlRow;
 -import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.io.sstable.format.Version;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.schema.LegacySchemaTables;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.UUIDGen;
  import org.github.jamm.Unmetered;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 545ad05,84381a0..c459b5d
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -1480,20 -1431,17 +1480,11 @@@ public class DatabaseDescripto
          return conf.max_hint_window_in_ms;
      }
  
-     public static File getSerializedCachePath(String ksName,
-                                               String cfName,
-                                               UUID cfId,
-                                               CacheService.CacheType cacheType,
-                                               String version,
-                                               String extension)
 -    @Deprecated
 -    public static Integer getIndexInterval()
--    {
-         StringBuilder builder = new StringBuilder();
-         builder.append(ksName).append('-');
-         builder.append(cfName).append('-');
-         builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-');
-         builder.append(cacheType);
-         builder.append((version == null ? "" : "-" + version + "." + extension));
-         return new File(conf.saved_caches_directory, builder.toString());
 -        return conf.index_interval;
 -    }
 -
 -    public static File getSerializedCachePath(CacheService.CacheType cacheType, String version)
++    public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension)
+     {
+         String name = cacheType.toString()
 -                + (version == null ? "" : "-" + version + ".db");
++                + (version == null ? "" : "-" + version + "." + extension);
+         return new File(conf.saved_caches_directory, name);
      }
  
      public static int getDynamicUpdateInterval()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/Schema.java
index 548341e,fada670..00c9358
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@@ -26,18 -28,14 +26,19 @@@ import com.google.common.collect.Sets
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 +import org.apache.cassandra.cql3.functions.Functions;
 +import org.apache.cassandra.cql3.functions.UDAggregate;
 +import org.apache.cassandra.cql3.functions.UDFunction;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.commitlog.CommitLog;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.marshal.UserType;
+ import org.apache.cassandra.db.index.SecondaryIndex;
 -import org.apache.cassandra.db.index.SecondaryIndexManager;
  import org.apache.cassandra.io.sstable.Descriptor;
 +import org.apache.cassandra.schema.LegacySchemaTables;
  import org.apache.cassandra.service.MigrationManager;
  import org.apache.cassandra.utils.ConcurrentBiMap;
 -import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.Pair;
  import org.cliffc.high_scale_lib.NonBlockingHashMap;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 343ecee,ffaa276..a8a8910
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1665,9 -1655,9 +1626,9 @@@ public class ColumnFamilyStore implemen
      private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter)
      {
          assert isRowCacheEnabled()
 -               : String.format("Row cache is not enabled on column family [" + name + "]");
 +               : String.format("Row cache is not enabled on table [" + name + "]");
  
-         RowCacheKey key = new RowCacheKey(cfId, filter.key);
+         RowCacheKey key = new RowCacheKey(metadata.ksAndCFName, filter.key);
  
          // attempt a sentinel-read-cache sequence.  if a write invalidates our sentinel, we'll return our
          // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
@@@ -2075,23 -2026,19 +2036,23 @@@
      {
          Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
  
 -        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
 +        for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator();
 +             keyIter.hasNext(); )
          {
 +            RowCacheKey key = keyIter.next();
              DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
-             if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
+             if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
                  invalidateCachedRow(dk);
          }
  
          if (metadata.isCounter())
          {
 -            for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
 +            for (Iterator<CounterCacheKey> keyIter = CacheService.instance.counterCache.keyIterator();
 +                 keyIter.hasNext(); )
              {
 +                CounterCacheKey key = keyIter.next();
                  DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
-                 if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
+                 if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges))
                      CacheService.instance.counterCache.remove(key);
              }
          }
@@@ -2965,13 -2955,21 +2926,13 @@@
          }
      }
  
 -    /**
 -     * Returns the creation time of the oldest memtable not fully flushed yet.
 -     */
 -    public long oldestUnflushedMemtable()
 -    {
 -        return data.getView().getOldestMemtable().creationTime();
 -    }
 -
      public boolean isEmpty()
      {
 -        DataTracker.View view = data.getView();
 -        return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable();
 +        View view = data.getView();
 +        return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.liveMemtables.size() <= 1 && view.flushingMemtables.size() == 0;
      }
  
-     private boolean isRowCacheEnabled()
+     public boolean isRowCacheEnabled()
      {
          return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0;
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e63dacf7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------


Mime
View raw message