phoenix-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ortutay <...@git.apache.org>
Subject [GitHub] phoenix pull request #298: PHOENIX-4666 Persistent subquery cache for hash j...
Date Wed, 16 May 2018 21:23:42 GMT
Github user ortutay commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/298#discussion_r188777276
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java ---
    @@ -77,57 +153,164 @@ public MemoryManager getMemoryManager() {
             return memoryManager;
         }
     
    -    private Cache<ImmutableBytesPtr,Closeable> getServerCaches() {
    +    private Cache<ImmutableBytesPtr,CacheEntry> getServerCaches() {
             /* Delay creation of this map until it's needed */
             if (serverCaches == null) {
                 synchronized(this) {
                     if (serverCaches == null) {
    -                    serverCaches = CacheBuilder.newBuilder()
    -                        .expireAfterAccess(maxTimeToLiveMs, TimeUnit.MILLISECONDS)
    -                        .ticker(getTicker())
    -                        .removalListener(new RemovalListener<ImmutableBytesPtr, Closeable>(){
    -                            @Override
    -                            public void onRemoval(RemovalNotification<ImmutableBytesPtr,
Closeable> notification) {
    -                                Closeables.closeAllQuietly(Collections.singletonList(notification.getValue()));
    -                            }
    -                        })
    -                        .build();
    +                    serverCaches = buildCache(maxTimeToLiveMs, false);
                     }
                 }
             }
             return serverCaches;
         }
    +
    +    private Cache<ImmutableBytesPtr,CacheEntry> getPersistentServerCaches() {
    +        /* Delay creation of this map until it's needed */
    +        if (persistentServerCaches == null) {
    +            synchronized(this) {
    +                if (persistentServerCaches == null) {
    +                    persistentServerCaches = buildCache(maxPersistenceTimeToLiveMs, true);
    +                }
    +            }
    +        }
    +        return persistentServerCaches;
    +    }
    +
    +    private Cache<ImmutableBytesPtr, CacheEntry> buildCache(final int ttl, final
boolean isPersistent) {
    +        return CacheBuilder.newBuilder()
    +            .expireAfterAccess(ttl, TimeUnit.MILLISECONDS)
    +            .ticker(getTicker())
    +            .removalListener(new RemovalListener<ImmutableBytesPtr, CacheEntry>(){
    +                @Override
    +                public void onRemoval(RemovalNotification<ImmutableBytesPtr, CacheEntry>
notification) {
    +                	if (isPersistent || !notification.getValue().getUsePersistentCache())
{
    +                        Closeables.closeAllQuietly(Collections.singletonList(notification.getValue()));
    +                	}
    +                }
    +            })
    +            .build();
    +    }
         
    -    @Override
    +    private void evictInactiveEntries(long bytesNeeded) {
    +        CacheEntry[] entries = getPersistentServerCaches().asMap().values().toArray(new
CacheEntry[]{});
    +        Arrays.sort(entries);
    +        long available = this.getMemoryManager().getAvailableMemory();
    +        for (int i = 0; i < entries.length && available < bytesNeeded;
i++) {
    +            CacheEntry entry = entries[i];
    +            if (!entry.isLive()) {
    +            	getServerCaches().invalidate(entry.getCacheId());
    +            	getPersistentServerCaches().invalidate(entry.getCacheId());
    +                available = this.getMemoryManager().getAvailableMemory();
    +            }
    +        }
    +    }
    +
    +    private CacheEntry maybeGet(ImmutableBytesPtr cacheId) {
    +        maybePromote(cacheId);
    +        CacheEntry entry = getServerCaches().getIfPresent(cacheId);
    +        return entry;
    +    }
    +
    +    private void maybePromote(ImmutableBytesPtr cacheId) {
    +        CacheEntry entry = getPersistentServerCaches().getIfPresent(cacheId);
    +        if (entry == null) {
    +            return;
    +        }
    +        getServerCaches().put(cacheId, entry);
    +    }
    +    
    +    private void maybeDemote(ImmutableBytesPtr cacheId) {
    +        CacheEntry entry = getServerCaches().getIfPresent(cacheId);
    +        if (entry == null) {
    +            return;
    +        }
    +        entry.decrementLiveQueryCount();
    +        if (!entry.isLive()) {
    +            getServerCaches().invalidate(cacheId);
    +        }
    +    }
    +    
    +    public void debugPrintCaches() {
    +		System.out.println("Live cache:" + getServerCaches());
    +		for (ImmutableBytesPtr key : getServerCaches().asMap().keySet()) {
    +			System.out.println("- " + Hex.encodeHexString(key.get()) +
    +					" -> " + getServerCaches().getIfPresent(key).size +
    +					" lq:" + getServerCaches().getIfPresent(key).liveQueriesCount +
    +					" " + Hex.encodeHexString(getServerCaches().getIfPresent(key).cachePtr.get()));
    +		}
    +		System.out.println("Persistent cache:" + getPersistentServerCaches());
    +		for (ImmutableBytesPtr key : getPersistentServerCaches().asMap().keySet()) {
    +			System.out.println("- " + Hex.encodeHexString(key.get()) +
    +					" -> " + getPersistentServerCaches().getIfPresent(key).size +
    +					" " + Hex.encodeHexString(getPersistentServerCaches().getIfPresent(key).cachePtr.get()));
    +		}
    +	}
    +
    +	@Override
         public Closeable getServerCache(ImmutableBytesPtr cacheId) {
             getServerCaches().cleanUp();
    -        return getServerCaches().getIfPresent(cacheId);
    +        CacheEntry entry = maybeGet(cacheId);
    +        if (entry == null) {
    +            return null;
    +        }
    +        return entry.closeable;
         }
         
         @Override
    -    public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable
cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer,
int clientVersion) throws SQLException {
    +    public boolean checkServerCache(ImmutableBytesPtr cacheId, boolean shouldIncrementLiveQueryCount)
{
             getServerCaches().cleanUp();
    -        MemoryChunk chunk = this.getMemoryManager().allocate(cachePtr.getLength() + txState.length);
    +        CacheEntry entry = maybeGet(cacheId);
    +        if (entry != null && shouldIncrementLiveQueryCount) {
    +        	entry.incrementLiveQueryCount();
    +        }
    +		return entry != null;
    +    }
    +    
    +    @Override
    +    public Closeable addServerCache(ImmutableBytesPtr cacheId, ImmutableBytesWritable
cachePtr, byte[] txState, ServerCacheFactory cacheFactory, boolean useProtoForIndexMaintainer,
boolean usePersistentCache, int clientVersion) throws SQLException {
    +        getServerCaches().cleanUp();
    +        long available = this.getMemoryManager().getAvailableMemory();
    +        int size = cachePtr.getLength() + txState.length;
    +        if (size > available) {
    +            evictInactiveEntries(Math.max(size - available + EVICTION_MARGIN_BYTES, EVICTION_MARGIN_BYTES));
    +        }
    +        MemoryChunk chunk = this.getMemoryManager().allocate(size);
             boolean success = false;
             try {
    -            Closeable element = cacheFactory.newCache(cachePtr, txState, chunk, useProtoForIndexMaintainer,
clientVersion);
    -            getServerCaches().put(cacheId, element);
    +            CacheEntry entry;
    +            synchronized(this) {
    +                entry = maybeGet(cacheId);
    +                if (entry == null) {
    +                    entry = new CacheEntry(
    +                            cacheId, cachePtr, cacheFactory, txState, chunk,
    +                            usePersistentCache, useProtoForIndexMaintainer,
    +                            clientVersion);
    +                    getServerCaches().put(cacheId, entry);
    +                    available = this.getMemoryManager().getAvailableMemory();
    +                    if (usePersistentCache) {
    +                        getPersistentServerCaches().put(cacheId, entry);
    +                    }
    +                }
    +                entry.incrementLiveQueryCount();
    +            }
                 success = true;
    -            return element;
    +            return entry;
             } finally {
                 if (!success) {
                     Closeables.closeAllQuietly(Collections.singletonList(chunk));
                 }
    -        }           
    +        }
         }
    -    
    +
         @Override
    -    public void removeServerCache(ImmutableBytesPtr cacheId) {
    -        getServerCaches().invalidate(cacheId);
    +    synchronized public void removeServerCache(ImmutableBytesPtr cacheId) {
    --- End diff --
    
    Yea good point, removed `maybeDemote`


---

Mime
View raw message