phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [1/3] phoenix git commit: PHOENIX-2557 Track unfree memory for server-side cache
Date Mon, 04 Jan 2016 17:37:55 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 139e0d74e -> 423bd134f


PHOENIX-2557 Track unfree memory for server-side cache


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/31a414c8
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/31a414c8
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/31a414c8

Branch: refs/heads/4.x-HBase-1.0
Commit: 31a414c84e64a1de366703cb1faa25c9e506d1a3
Parents: 139e0d7
Author: James Taylor <jtaylor@salesforce.com>
Authored: Sun Jan 3 15:51:40 2016 -0800
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Mon Jan 4 09:36:30 2016 -0800

----------------------------------------------------------------------
 .../ConnectionQueryServicesTestImpl.java        |   4 +-
 .../org/apache/phoenix/cache/GlobalCache.java   |  23 ++-
 .../org/apache/phoenix/cache/TenantCache.java   |   4 +-
 .../apache/phoenix/cache/TenantCacheImpl.java   |  28 ++-
 .../coprocessor/MetaDataEndpointImpl.java       |   5 +-
 .../coprocessor/ServerCachingEndpointImpl.java  |   7 +-
 .../coprocessor/generated/MetaDataProtos.java   | 171 +++++++++++++++----
 .../phoenix/memory/GlobalMemoryManager.java     |   6 +
 .../phoenix/query/ConnectionQueryServices.java  |   2 +-
 .../query/ConnectionQueryServicesImpl.java      |  39 +++--
 .../query/ConnectionlessQueryServicesImpl.java  |   9 +-
 .../query/DelegateConnectionQueryServices.java  |   4 +-
 .../apache/phoenix/cache/TenantCacheTest.java   | 100 +++++++++++
 phoenix-protocol/src/main/MetaDataService.proto |   1 +
 14 files changed, 332 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index d941fd5..f5d7f18 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -81,7 +81,9 @@ public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl
             Set<PhoenixConnection> connections = this.connections;
             this.connections = Sets.newHashSet();
             SQLCloseables.closeAll(connections);
-            clearCache();
+            long unfreedBytes = clearCache();
+            // FIXME: once PHOENIX-2556 is fixed, comment this back in
+            // assertEquals(0,unfreedBytes);
         } finally {
             super.close();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
index af5438c..7d04f5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -22,6 +22,7 @@ import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
 
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.ChildMemoryManager;
 import org.apache.phoenix.memory.GlobalMemoryManager;
@@ -36,6 +38,8 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PMetaDataEntity;
 import org.apache.phoenix.util.SizedUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -51,6 +55,7 @@ import com.google.common.cache.Weigher;
  * @since 0.1
  */
 public class GlobalCache extends TenantCacheImpl {
+    private static final Logger logger = LoggerFactory.getLogger(GlobalCache.class);
     private static volatile GlobalCache INSTANCE; 
     
     private final Configuration config;
@@ -59,8 +64,24 @@ public class GlobalCache extends TenantCacheImpl {
     // Cache for lastest PTable for a given Phoenix table
     private volatile Cache<ImmutableBytesPtr,PMetaDataEntity> metaDataCache;
     
-    public void clearTenantCache() {
+    public long clearTenantCache() {
+        long unfreedBytes = getMemoryManager().getMaxMemory() - getMemoryManager().getAvailableMemory();
+        if (unfreedBytes != 0 && logger.isDebugEnabled()) {
+            logger.debug("Found " + (getMemoryManager().getMaxMemory() - getMemoryManager().getAvailableMemory())
+ " bytes not freed from global cache");
+        }
+        removeAllServerCache();
+        for (Map.Entry<ImmutableBytesWritable, TenantCache> entry : perTenantCacheMap.entrySet())
{
+            TenantCache cache = entry.getValue();
+            long unfreedTenantBytes = cache.getMemoryManager().getMaxMemory() - cache.getMemoryManager().getAvailableMemory();
+            if (unfreedTenantBytes != 0 && logger.isDebugEnabled()) {
+                ImmutableBytesWritable cacheId = entry.getKey();
+                logger.debug("Found " + unfreedTenantBytes + " bytes not freed for tenant
" + Bytes.toStringBinary(cacheId.get(), cacheId.getOffset(), cacheId.getLength()));
+            }
+            unfreedBytes += unfreedTenantBytes;
+            cache.removeAllServerCache();
+        }
         perTenantCacheMap.clear();
+        return unfreedBytes;
     }
     
     public Cache<ImmutableBytesPtr,PMetaDataEntity> getMetaDataCache() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
index c7cd58f..5c33967 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -21,7 +21,6 @@ import java.io.Closeable;
 import java.sql.SQLException;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.MemoryManager;
@@ -38,5 +37,6 @@ public interface TenantCache {
     MemoryManager getMemoryManager();
     Closeable getServerCache(ImmutableBytesPtr cacheId);
     Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable cachePtr,
byte[] txState, ServerCacheFactory cacheFactory) throws SQLException;
-    void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException;
+    void removeServerCache(ImmutableBytesPtr cacheId);
+    void removeAllServerCache();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
index 6ef7a6f..658b4cc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -29,6 +29,7 @@ import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.util.Closeables;
 
+import com.google.common.base.Ticker;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
@@ -45,11 +46,30 @@ import com.google.common.cache.RemovalNotification;
 public class TenantCacheImpl implements TenantCache {
     private final int maxTimeToLiveMs;
     private final MemoryManager memoryManager;
+    private final Ticker ticker;
     private volatile Cache<ImmutableBytesPtr, Closeable> serverCaches;
 
     public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs) {
+        this(memoryManager, maxTimeToLiveMs, Ticker.systemTicker());
+    }
+    
+    public TenantCacheImpl(MemoryManager memoryManager, int maxTimeToLiveMs, Ticker ticker)
{
         this.memoryManager = memoryManager;
         this.maxTimeToLiveMs = maxTimeToLiveMs;
+        this.ticker = ticker;
+    }
+    
+    public Ticker getTicker() {
+        return ticker;
+    }
+    
+    // For testing
+    public void cleanUp() {
+        synchronized(this) {
+            if (serverCaches != null) {
+                serverCaches.cleanUp();
+            }
+        }
     }
     
     @Override
@@ -64,6 +84,7 @@ public class TenantCacheImpl implements TenantCache {
                 if (serverCaches == null) {
                     serverCaches = CacheBuilder.newBuilder()
                         .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS)
+                        .ticker(getTicker())
                         .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){
                             @Override
                             public void onRemoval(RemovalNotification<ImmutableBytesPtr,
Closeable> notification) {
@@ -99,7 +120,12 @@ public class TenantCacheImpl implements TenantCache {
     }
     
     @Override
-    public void removeServerCache(ImmutableBytesPtr cacheId) throws SQLException {
+    public void removeServerCache(ImmutableBytesPtr cacheId) {
         getServerCaches().invalidate(cacheId);
     }
+
+    @Override
+    public void removeAllServerCache() {
+        getServerCaches().invalidateAll();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index ea4a7e1..13876a0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -2667,7 +2667,10 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements
Coprocesso
         Cache<ImmutableBytesPtr, PMetaDataEntity> metaDataCache =
                 GlobalCache.getInstance(this.env).getMetaDataCache();
         metaDataCache.invalidateAll();
-        cache.clearTenantCache();
+        long unfreedBytes = cache.clearTenantCache();
+        ClearCacheResponse.Builder builder = ClearCacheResponse.newBuilder();
+        builder.setUnfreedBytes(unfreedBytes);
+        done.run(builder.build());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index 9f3bdb4..bf889d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.coprocessor;
 
 import java.io.IOException;
-import java.sql.SQLException;
 
 import org.apache.hadoop.hbase.Coprocessor;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -91,11 +90,7 @@ public class ServerCachingEndpointImpl extends ServerCachingService implements
C
       tenantId = new ImmutableBytesPtr(request.getTenantId().toByteArray());
     }
     TenantCache tenantCache = GlobalCache.getTenantCache(this.env, tenantId);
-    try {
-      tenantCache.removeServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()));
-    } catch (SQLException e) {
-      ProtobufUtil.setControllerException(controller, new IOException(e));
-    }
+    tenantCache.removeServerCache(new ImmutableBytesPtr(request.getCacheId().toByteArray()));
     RemoveServerCacheResponse.Builder responseBuilder = RemoveServerCacheResponse.newBuilder();
     responseBuilder.setReturn(true);
     RemoveServerCacheResponse result = responseBuilder.build();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
index a121d28..dc5726a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/generated/MetaDataProtos.java
@@ -8767,6 +8767,16 @@ public final class MetaDataProtos {
 
   public interface ClearCacheResponseOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
+
+    // optional int64 unfreedBytes = 1;
+    /**
+     * <code>optional int64 unfreedBytes = 1;</code>
+     */
+    boolean hasUnfreedBytes();
+    /**
+     * <code>optional int64 unfreedBytes = 1;</code>
+     */
+    long getUnfreedBytes();
   }
   /**
    * Protobuf type {@code ClearCacheResponse}
@@ -8801,6 +8811,7 @@ public final class MetaDataProtos {
         com.google.protobuf.ExtensionRegistryLite extensionRegistry)
         throws com.google.protobuf.InvalidProtocolBufferException {
       initFields();
+      int mutable_bitField0_ = 0;
       com.google.protobuf.UnknownFieldSet.Builder unknownFields =
           com.google.protobuf.UnknownFieldSet.newBuilder();
       try {
@@ -8818,6 +8829,11 @@ public final class MetaDataProtos {
               }
               break;
             }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              unfreedBytes_ = input.readInt64();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -8857,7 +8873,25 @@ public final class MetaDataProtos {
       return PARSER;
     }
 
+    private int bitField0_;
+    // optional int64 unfreedBytes = 1;
+    public static final int UNFREEDBYTES_FIELD_NUMBER = 1;
+    private long unfreedBytes_;
+    /**
+     * <code>optional int64 unfreedBytes = 1;</code>
+     */
+    public boolean hasUnfreedBytes() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional int64 unfreedBytes = 1;</code>
+     */
+    public long getUnfreedBytes() {
+      return unfreedBytes_;
+    }
+
     private void initFields() {
+      unfreedBytes_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -8871,6 +8905,9 @@ public final class MetaDataProtos {
     public void writeTo(com.google.protobuf.CodedOutputStream output)
                         throws java.io.IOException {
       getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeInt64(1, unfreedBytes_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -8880,6 +8917,10 @@ public final class MetaDataProtos {
       if (size != -1) return size;
 
       size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(1, unfreedBytes_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -8903,6 +8944,11 @@ public final class MetaDataProtos {
       org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse other =
(org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse) obj;
 
       boolean result = true;
+      result = result && (hasUnfreedBytes() == other.hasUnfreedBytes());
+      if (hasUnfreedBytes()) {
+        result = result && (getUnfreedBytes()
+            == other.getUnfreedBytes());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -8916,6 +8962,10 @@ public final class MetaDataProtos {
       }
       int hash = 41;
       hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (hasUnfreedBytes()) {
+        hash = (37 * hash) + UNFREEDBYTES_FIELD_NUMBER;
+        hash = (53 * hash) + hashLong(getUnfreedBytes());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -9025,6 +9075,8 @@ public final class MetaDataProtos {
 
       public Builder clear() {
         super.clear();
+        unfreedBytes_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
         return this;
       }
 
@@ -9051,6 +9103,13 @@ public final class MetaDataProtos {
 
       public org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse buildPartial()
{
         org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse result
= new org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.unfreedBytes_ = unfreedBytes_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -9066,6 +9125,9 @@ public final class MetaDataProtos {
 
       public Builder mergeFrom(org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse
other) {
         if (other == org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse.getDefaultInstance())
return this;
+        if (other.hasUnfreedBytes()) {
+          setUnfreedBytes(other.getUnfreedBytes());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -9091,6 +9153,40 @@ public final class MetaDataProtos {
         }
         return this;
       }
+      private int bitField0_;
+
+      // optional int64 unfreedBytes = 1;
+      private long unfreedBytes_ ;
+      /**
+       * <code>optional int64 unfreedBytes = 1;</code>
+       */
+      public boolean hasUnfreedBytes() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      /**
+       * <code>optional int64 unfreedBytes = 1;</code>
+       */
+      public long getUnfreedBytes() {
+        return unfreedBytes_;
+      }
+      /**
+       * <code>optional int64 unfreedBytes = 1;</code>
+       */
+      public Builder setUnfreedBytes(long value) {
+        bitField0_ |= 0x00000001;
+        unfreedBytes_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int64 unfreedBytes = 1;</code>
+       */
+      public Builder clearUnfreedBytes() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        unfreedBytes_ = 0L;
+        onChanged();
+        return this;
+      }
 
       // @@protoc_insertion_point(builder_scope:ClearCacheResponse)
     }
@@ -12285,42 +12381,43 @@ public final class MetaDataProtos {
       "ateIndexStateRequest\022\036\n\026tableMetadataMut" +
       "ations\030\001 \003(\014\022\025\n\rclientVersion\030\002 \001(\005\"*\n\021C",
       "learCacheRequest\022\025\n\rclientVersion\030\001 \001(\005\"" +
-      "\024\n\022ClearCacheResponse\"*\n\021GetVersionReque" +
-      "st\022\025\n\rclientVersion\030\001 \001(\005\"%\n\022GetVersionR" +
-      "esponse\022\017\n\007version\030\001 \002(\003\"\205\001\n\032ClearTableF" +
-      "romCacheRequest\022\020\n\010tenantId\030\001 \002(\014\022\022\n\nsch" +
-      "emaName\030\002 \002(\014\022\021\n\ttableName\030\003 \002(\014\022\027\n\017clie"
+
-      "ntTimestamp\030\004 \002(\003\022\025\n\rclientVersion\030\005 \001(\005" +
-      "\"\035\n\033ClearTableFromCacheResponse*\365\002\n\014Muta" +
-      "tionCode\022\030\n\024TABLE_ALREADY_EXISTS\020\000\022\023\n\017TA" +
-      "BLE_NOT_FOUND\020\001\022\024\n\020COLUMN_NOT_FOUND\020\002\022\031\n",
-      "\025COLUMN_ALREADY_EXISTS\020\003\022\035\n\031CONCURRENT_T" +
-      "ABLE_MUTATION\020\004\022\027\n\023TABLE_NOT_IN_REGION\020\005" +
-      "\022\025\n\021NEWER_TABLE_FOUND\020\006\022\034\n\030UNALLOWED_TAB" +
-      "LE_MUTATION\020\007\022\021\n\rNO_PK_COLUMNS\020\010\022\032\n\026PARE" +
-      "NT_TABLE_NOT_FOUND\020\t\022\033\n\027FUNCTION_ALREADY" +
-      "_EXISTS\020\n\022\026\n\022FUNCTION_NOT_FOUND\020\013\022\030\n\024NEW" +
-      "ER_FUNCTION_FOUND\020\014\022\032\n\026FUNCTION_NOT_IN_R" +
-      "EGION\020\r2\304\005\n\017MetaDataService\022/\n\010getTable\022" +
-      "\020.GetTableRequest\032\021.MetaDataResponse\0227\n\014" +
-      "getFunctions\022\024.GetFunctionsRequest\032\021.Met",
-      "aDataResponse\0225\n\013createTable\022\023.CreateTab" +
-      "leRequest\032\021.MetaDataResponse\022;\n\016createFu" +
-      "nction\022\026.CreateFunctionRequest\032\021.MetaDat" +
-      "aResponse\0221\n\tdropTable\022\021.DropTableReques" +
-      "t\032\021.MetaDataResponse\0227\n\014dropFunction\022\024.D" +
-      "ropFunctionRequest\032\021.MetaDataResponse\0221\n" +
-      "\taddColumn\022\021.AddColumnRequest\032\021.MetaData" +
-      "Response\0223\n\ndropColumn\022\022.DropColumnReque" +
-      "st\032\021.MetaDataResponse\022?\n\020updateIndexStat" +
-      "e\022\030.UpdateIndexStateRequest\032\021.MetaDataRe",
-      "sponse\0225\n\nclearCache\022\022.ClearCacheRequest" +
-      "\032\023.ClearCacheResponse\0225\n\ngetVersion\022\022.Ge" +
-      "tVersionRequest\032\023.GetVersionResponse\022P\n\023" +
-      "clearTableFromCache\022\033.ClearTableFromCach" +
-      "eRequest\032\034.ClearTableFromCacheResponseBB" +
-      "\n(org.apache.phoenix.coprocessor.generat" +
-      "edB\016MetaDataProtosH\001\210\001\001\240\001\001"
+      "*\n\022ClearCacheResponse\022\024\n\014unfreedBytes\030\001 " +
+      "\001(\003\"*\n\021GetVersionRequest\022\025\n\rclientVersio" +
+      "n\030\001 \001(\005\"%\n\022GetVersionResponse\022\017\n\007version" +
+      "\030\001 \002(\003\"\205\001\n\032ClearTableFromCacheRequest\022\020\n" +
+      "\010tenantId\030\001 \002(\014\022\022\n\nschemaName\030\002 \002(\014\022\021\n\tt"
+
+      "ableName\030\003 \002(\014\022\027\n\017clientTimestamp\030\004 \002(\003\022" +
+      "\025\n\rclientVersion\030\005 \001(\005\"\035\n\033ClearTableFrom" +
+      "CacheResponse*\365\002\n\014MutationCode\022\030\n\024TABLE_" +
+      "ALREADY_EXISTS\020\000\022\023\n\017TABLE_NOT_FOUND\020\001\022\024\n",
+      "\020COLUMN_NOT_FOUND\020\002\022\031\n\025COLUMN_ALREADY_EX" +
+      "ISTS\020\003\022\035\n\031CONCURRENT_TABLE_MUTATION\020\004\022\027\n" +
+      "\023TABLE_NOT_IN_REGION\020\005\022\025\n\021NEWER_TABLE_FO" +
+      "UND\020\006\022\034\n\030UNALLOWED_TABLE_MUTATION\020\007\022\021\n\rN" +
+      "O_PK_COLUMNS\020\010\022\032\n\026PARENT_TABLE_NOT_FOUND" +
+      "\020\t\022\033\n\027FUNCTION_ALREADY_EXISTS\020\n\022\026\n\022FUNCT" +
+      "ION_NOT_FOUND\020\013\022\030\n\024NEWER_FUNCTION_FOUND\020" +
+      "\014\022\032\n\026FUNCTION_NOT_IN_REGION\020\r2\304\005\n\017MetaDa" +
+      "taService\022/\n\010getTable\022\020.GetTableRequest\032" +
+      "\021.MetaDataResponse\0227\n\014getFunctions\022\024.Get",
+      "FunctionsRequest\032\021.MetaDataResponse\0225\n\013c" +
+      "reateTable\022\023.CreateTableRequest\032\021.MetaDa" +
+      "taResponse\022;\n\016createFunction\022\026.CreateFun" +
+      "ctionRequest\032\021.MetaDataResponse\0221\n\tdropT" +
+      "able\022\021.DropTableRequest\032\021.MetaDataRespon" +
+      "se\0227\n\014dropFunction\022\024.DropFunctionRequest" +
+      "\032\021.MetaDataResponse\0221\n\taddColumn\022\021.AddCo" +
+      "lumnRequest\032\021.MetaDataResponse\0223\n\ndropCo" +
+      "lumn\022\022.DropColumnRequest\032\021.MetaDataRespo" +
+      "nse\022?\n\020updateIndexState\022\030.UpdateIndexSta",
+      "teRequest\032\021.MetaDataResponse\0225\n\nclearCac" +
+      "he\022\022.ClearCacheRequest\032\023.ClearCacheRespo" +
+      "nse\0225\n\ngetVersion\022\022.GetVersionRequest\032\023." +
+      "GetVersionResponse\022P\n\023clearTableFromCach" +
+      "e\022\033.ClearTableFromCacheRequest\032\034.ClearTa" +
+      "bleFromCacheResponseBB\n(org.apache.phoen" +
+      "ix.coprocessor.generatedB\016MetaDataProtos" +
+      "H\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -12398,7 +12495,7 @@ public final class MetaDataProtos {
           internal_static_ClearCacheResponse_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_ClearCacheResponse_descriptor,
-              new java.lang.String[] { });
+              new java.lang.String[] { "UnfreedBytes", });
           internal_static_GetVersionRequest_descriptor =
             getDescriptor().getMessageTypes().get(12);
           internal_static_GetVersionRequest_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
index e70b35f..74f2c5f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/memory/GlobalMemoryManager.java
@@ -73,6 +73,7 @@ public class GlobalMemoryManager implements MemoryManager {
         synchronized(sync) {
             while (usedMemoryBytes + minBytes > maxMemoryBytes) { // Only wait if minBytes
not available
                 try {
+                    logger.debug("Waiting for " + (usedMemoryBytes + minBytes - maxMemoryBytes)
+ " bytes to be free");
                     long remainingWaitTimeMs = maxWaitMs - (System.currentTimeMillis() -
startTimeMs);
                     if (remainingWaitTimeMs <= 0) { // Ran out of time waiting for some
memory to get freed up
                         throw new InsufficientMemoryException("Requested memory of " + minBytes
+ " bytes could not be allocated. Using memory of " + usedMemoryBytes + " bytes from global
pool of " + maxMemoryBytes + " bytes after waiting for " + maxWaitMs + "ms.");
@@ -109,12 +110,15 @@ public class GlobalMemoryManager implements MemoryManager {
 
     private class GlobalMemoryChunk implements MemoryChunk {
         private volatile long size;
+        //private volatile String stack;
 
         private GlobalMemoryChunk(long size) {
             if (size < 0) {
                 throw new IllegalStateException("Size of memory chunk must be greater than
zero, but instead is " + size);
             }
             this.size = size;
+            // Useful for debugging where a piece of memory was allocated
+            // this.stack = ExceptionUtils.getStackTrace(new Throwable());
         }
 
         @Override
@@ -138,6 +142,7 @@ public class GlobalMemoryManager implements MemoryManager {
                 } else {
                     allocateBytes(nAdditionalBytes, nAdditionalBytes);
                     size = nBytes;
+                    //this.stack = ExceptionUtils.getStackTrace(new Throwable());
                 }
             }
         }
@@ -150,6 +155,7 @@ public class GlobalMemoryManager implements MemoryManager {
             try {
                 if (size > 0) {
                     logger.warn("Orphaned chunk of " + size + " bytes found during finalize");
+                    //logger.warn("Orphaned chunk of " + size + " bytes found during finalize
allocated here:\n" + stack);
                 }
                 freeMemory();
             } finally {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index fc41706..d839fa3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -116,7 +116,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
 
     public PTableStats getTableStats(byte[] physicalName, long clientTimeStamp) throws SQLException;
     
-    public void clearCache() throws SQLException;
+    public long clearCache() throws SQLException;
     public int getSequenceSaltBuckets();
 
     TransactionSystemClient getTransactionSystemClient();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 1ea7985..29b9756 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -2505,27 +2505,35 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
      * @throws SQLException
      */
     @Override
-    public void clearCache() throws SQLException {
+    public long clearCache() throws SQLException {
         try {
             SQLException sqlE = null;
             HTableInterface htable = this.getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
             try {
-                htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
-                        HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService, ClearCacheResponse>()
{
-                    @Override
-                    public ClearCacheResponse call(MetaDataService instance) throws IOException
{
-                        ServerRpcController controller = new ServerRpcController();
-                        BlockingRpcCallback<ClearCacheResponse> rpcCallback =
-                                new BlockingRpcCallback<ClearCacheResponse>();
-                        ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder();
-                        builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
-                        instance.clearCache(controller, builder.build(), rpcCallback);
-                        if(controller.getFailedOn() != null) {
-                            throw controller.getFailedOn();
+                final Map<byte[], Long> results =
+                    htable.coprocessorService(MetaDataService.class, HConstants.EMPTY_START_ROW,
+                            HConstants.EMPTY_END_ROW, new Batch.Call<MetaDataService,
Long>() {
+                        @Override
+                        public Long call(MetaDataService instance) throws IOException {
+                            ServerRpcController controller = new ServerRpcController();
+                            BlockingRpcCallback<ClearCacheResponse> rpcCallback =
+                                    new BlockingRpcCallback<ClearCacheResponse>();
+                            ClearCacheRequest.Builder builder = ClearCacheRequest.newBuilder();
+                            builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION,
PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER));
+                            instance.clearCache(controller, builder.build(), rpcCallback);
+                            if(controller.getFailedOn() != null) {
+                                throw controller.getFailedOn();
+                            }
+                            return rpcCallback.get().getUnfreedBytes();
                         }
-                        return rpcCallback.get();
+                      });
+                long unfreedBytes = 0;
+                for (Map.Entry<byte[],Long> result : results.entrySet()) {
+                    if (result.getValue() != null) {
+                        unfreedBytes += result.getValue();
                     }
-                  });
+                }
+                return unfreedBytes;
             } catch (IOException e) {
                 throw ServerUtil.parseServerException(e);
             } catch (Throwable e) {
@@ -2549,6 +2557,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices
implement
         } catch (Exception e) {
             throw new SQLException(ServerUtil.parseServerException(e));
         }
+        return 0;
     }
 
     private void flushTable(byte[] tableName) throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index e983a4c..f1ab319 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -83,13 +83,13 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.SequenceUtil;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import co.cask.tephra.TransactionManager;
 import co.cask.tephra.TransactionSystemClient;
 import co.cask.tephra.inmemory.InMemoryTxSystemClient;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 
 /**
  *
@@ -512,7 +512,8 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices
imple
     }
 
     @Override
-    public void clearCache() throws SQLException {
+    public long clearCache() throws SQLException {
+        return 0;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index ca662be..b56ff85 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -251,8 +251,8 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices
imple
 
 
     @Override
-    public void clearCache() throws SQLException {
-        getDelegate().clearCache();
+    public long clearCache() throws SQLException {
+        return getDelegate().clearCache();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
new file mode 100644
index 0000000..ac2a850
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Closeable;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.memory.GlobalMemoryManager;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.util.ByteUtil;
+import org.junit.Test;
+
+import com.google.common.base.Ticker;
+
+public class TenantCacheTest {
+
+    @Test
+    public void testInvalidateClosesMemoryChunk() throws SQLException {
+        int maxServerCacheTimeToLive = 10000;
+        long maxBytes = 1000;
+        int maxWaitMs = 1000;
+        GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes, maxWaitMs);
+        TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive);
+        ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a"));
+        ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
+        newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory);
+        assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
+        newTenantCache.removeServerCache(cacheId);
+        assertEquals(maxBytes, memoryManager.getAvailableMemory());
+    }
+    
+    @Test
+    public void testTimeoutClosesMemoryChunk() throws Exception {
+        int maxServerCacheTimeToLive = 10;
+        long maxBytes = 1000;
+        int maxWaitMs = 10;
+        GlobalMemoryManager memoryManager = new GlobalMemoryManager(maxBytes, maxWaitMs);
+        ManualTicker ticker = new ManualTicker();
+        TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive,
ticker);
+        ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
+        ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory);
+        assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
+        ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
+        cache.cleanUp();
+        assertEquals(maxBytes, memoryManager.getAvailableMemory());
+    }
+    
+    public static class ManualTicker extends Ticker {
+        public long time = 0;
+        
+        @Override
+        public long read() {
+            return time;
+        }
+        
+    }
+    
+    public static ServerCacheFactory cacheFactory = new ServerCacheFactory() {
+
+        @Override
+        public void readFields(DataInput arg0) throws IOException {
+        }
+
+        @Override
+        public void write(DataOutput arg0) throws IOException {
+        }
+
+        @Override
+        public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk
chunk)
+                throws SQLException {
+            return chunk;
+        }
+        
+    };
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/31a414c8/phoenix-protocol/src/main/MetaDataService.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/MetaDataService.proto b/phoenix-protocol/src/main/MetaDataService.proto
index c265158..c631512 100644
--- a/phoenix-protocol/src/main/MetaDataService.proto
+++ b/phoenix-protocol/src/main/MetaDataService.proto
@@ -119,6 +119,7 @@ message ClearCacheRequest {
 }
 
 message ClearCacheResponse {
+  optional int64 unfreedBytes = 1;
 }
 
 message GetVersionRequest {


Mime
View raw message