phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject [2/4] phoenix git commit: PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on server side
Date Wed, 27 Sep 2017 19:16:50 GMT
PHOENIX-4224 Automatic resending cache for HashJoin doesn't work when cache has expired on
server side


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/53016519
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/53016519
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/53016519

Branch: refs/heads/4.x-HBase-1.2
Commit: 53016519df73606f49433470768c5037b69ea185
Parents: d714afc
Author: Sergey Soldatov <ssa@apache.org>
Authored: Mon Sep 25 19:57:49 2017 -0700
Committer: Sergey Soldatov <ssa@apache.org>
Committed: Wed Sep 27 12:09:08 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/HashJoinCacheIT.java | 32 +++++++++++++-
 .../apache/phoenix/cache/ServerCacheClient.java | 46 ++++++++++++++++----
 2 files changed, 68 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/53016519/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
index 76f45e2..cebb9ad 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/HashJoinCacheIT.java
@@ -19,9 +19,13 @@ package org.apache.phoenix.end2end;
 
 import java.io.IOException;
 import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Properties;
 import java.util.Random;
 
 import org.apache.hadoop.hbase.client.Scan;
@@ -31,9 +35,12 @@ import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
@@ -43,6 +50,9 @@ import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
 
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.fail;
+
 @RunWith(Parameterized.class)
 public class HashJoinCacheIT extends HashJoinIT {
     
@@ -426,7 +436,27 @@ public class HashJoinCacheIT extends HashJoinIT {
 	public void testUpsertWithJoin() throws Exception {
 		// TODO: We will enable this test once PHOENIX-3163
 	}
-    
+
+    @Test
+    public void testExpiredCache() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB, "1");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        String tableName1 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
+        String tableName2 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
+        String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name
FROM " +
+                tableName1 + " supp RIGHT JOIN " + tableName2 +
+                " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+        try {
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            rs.next();
+            fail("HashJoinCacheNotFoundException was not thrown or incorrectly handled");
+        } catch (HashJoinCacheNotFoundException e) {
+            //Expected exception
+        }
+    }
+
     public static class InvalidateHashCache extends SimpleRegionObserver {
         public static Random rand= new Random();
         public static List<ImmutableBytesPtr> lastRemovedJoinIds=new ArrayList<ImmutableBytesPtr>();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/53016519/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
index ce46a3e..28a42fa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerCacheClient.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -121,16 +122,24 @@ public class ServerCacheClient {
     public class ServerCache implements SQLCloseable {
         private final int size;
         private final byte[] id;
-        private final Set<HRegionLocation> servers;
+        private final Map<HRegionLocation, Long> servers;
         private ImmutableBytesWritable cachePtr;
         private MemoryChunk chunk;
         private File outputFile;
+        private long maxServerCacheTTL;
         
         
         public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable
cachePtr,
                 ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException
{
+            maxServerCacheTTL = services.getProps().getInt(
+                    QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS_ATTRIB,
+                    QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
             this.id = id;
-            this.servers = new HashSet<HRegionLocation>(servers);
+            this.servers = new HashMap();
+            long currentTime = System.currentTimeMillis();
+            for(HRegionLocation loc : servers) {
+                this.servers.put(loc, currentTime);
+            }
             this.size =  cachePtr.getLength();
             if (storeCacheOnClient) {
                 try {
@@ -171,10 +180,28 @@ public class ServerCacheClient {
         public byte[] getId() {
             return id;
         }
-        
-		public boolean addServer(HRegionLocation loc) {
-			return this.servers.add(loc);
-		}
+
+        public boolean addServer(HRegionLocation loc) {
+            if(this.servers.containsKey(loc)) {
+                return false;
+            } else {
+                this.servers.put(loc, System.currentTimeMillis());
+                return true;
+            }
+        }
+
+        public boolean isExpired(HRegionLocation loc) {
+            if(this.servers.containsKey(loc)) {
+                Long time = this.servers.get(loc);
+                if(System.currentTimeMillis() - time > maxServerCacheTTL)
+                    return true; // cache was send more than maxTTL ms ago, expecting that
it's expired
+            } else {
+                return false; // should be on server yet.
+            }
+            return false; // Unknown region location. Need to send the cache.
+        }
+
+
         
         /**
          * Call to free up cache on region servers when no longer needed
@@ -182,7 +209,7 @@ public class ServerCacheClient {
         @Override
         public void close() throws SQLException {
             try{
-                removeServerCache(this, servers);
+                removeServerCache(this, servers.keySet());
             }finally{
                 cachePtr = null;
                 if (chunk != null) {
@@ -305,8 +332,6 @@ public class ServerCacheClient {
     
     /**
      * Remove the cached table from all region servers
-     * @param cacheId unique identifier for the hash join (returned from {@link #addHashCache(HTable,
Scan, Set)})
-     * @param servers list of servers upon which table was cached (filled in by {@link #addHashCache(HTable,
Scan, Set)})
      * @throws SQLException
      * @throws IllegalStateException if hashed table cannot be removed on any region server
on which it was added
      */
@@ -421,6 +446,9 @@ public class ServerCacheClient {
             byte[] tableName = pTable.getPhysicalName().getBytes();
             table = services.getTable(tableName);
             HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName,
startkeyOfRegion);
+            if(cache.isExpired(tableRegionLocation)) {
+                return false;
+            }
 			if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER,false))
{
 				success = addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(),
cacheFactory,
 						txState);


Mime
View raw message