phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [2/7] phoenix git commit: PHOENIX-4070 Delete row should mask upserts at same timestamp
Date Tue, 08 Aug 2017 06:06:13 GMT
http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index b81b904..18e543c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -64,6 +64,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.WhereCompiler;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
+import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService;
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
@@ -113,8 +114,8 @@ import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.transaction.PhoenixTransactionContext;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ServiceException;
 
 public class IndexUtil {
     public static final String INDEX_COLUMN_NAME_SEP = ":";
@@ -308,7 +309,7 @@ public class IndexUtil {
                     	}
         
                         @Override
-                        public ImmutableBytesWritable getLatestValue(ColumnReference ref)
{
+                        public ImmutableBytesWritable getLatestValue(ColumnReference ref,
long ts) {
                             // Always return null for our empty key value, as this will cause
the index
                             // maintainer to always treat this Put as a new row.
                             if (isEmptyKeyValue(table, ref)) {
@@ -712,8 +713,8 @@ public class IndexUtil {
             HConstants.NO_NONCE, HConstants.NO_NONCE);
     }
 
-    public static MetaDataMutationResult setIndexDisableTimeStamp(String indexTableName,
long minTimeStamp,
-            HTableInterface metaTable, PIndexState newState) throws ServiceException, Throwable
{
+    public static MetaDataMutationResult updateIndexState(String indexTableName, long minTimeStamp,
+            HTableInterface metaTable, PIndexState newState) throws Throwable {
         byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
         // Mimic the Put that gets generated by the client on an update of the index state
         Put put = new Put(indexTableKey);
@@ -786,5 +787,41 @@ public class IndexUtil {
     public static boolean isLocalIndexFamily(String family) {
         return family.indexOf(LOCAL_INDEX_COLUMN_FAMILY_PREFIX) != -1;
     }
+
+    public static void updateIndexState(PhoenixConnection conn, String indexTableName,
+    		PIndexState newState, Long indexDisableTimestamp) throws SQLException {
+        if (newState == PIndexState.ACTIVE) {
+            Preconditions.checkArgument(indexDisableTimestamp == 0,
+                "Index disable timestamp has to be 0 when marking an index as active");
+        }
+    	byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName);
+    	String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName);
+    	String indexName = SchemaUtil.getTableNameFromFullName(indexTableName);
+    	// Mimic the Put that gets generated by the client on an update of the
+    	// index state
+    	Put put = new Put(indexTableKey);
+    	put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES,
+    			newState.getSerializedBytes());
+        if (indexDisableTimestamp != null) {
+            put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES,
+                PLong.INSTANCE.toBytes(indexDisableTimestamp));
+        }
+        if (newState == PIndexState.ACTIVE) {
+            put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES,
+                PhoenixDatabaseMetaData.ASYNC_REBUILD_TIMESTAMP_BYTES, PLong.INSTANCE.toBytes(0));
+        }
+    	final List<Mutation> tableMetadata = Collections.<Mutation> singletonList(put);
+    	MetaDataMutationResult result = conn.getQueryServices().updateIndexState(tableMetadata,
null);
+    	MutationCode code = result.getMutationCode();
+    	if (code == MutationCode.TABLE_NOT_FOUND) {
+    		throw new TableNotFoundException(schemaName, indexName);
+    	}
+    	if (code == MutationCode.UNALLOWED_TABLE_MUTATION) {
+    		throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_INDEX_STATE_TRANSITION)
+    				.setMessage("indexState=" + newState).setSchemaName(schemaName)
+    				.setTableName(indexName).build().buildException();
+    	}
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
index 941493e..dd027b0 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java
@@ -37,6 +37,7 @@ import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 import org.apache.phoenix.hbase.index.covered.data.LocalTable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.apache.phoenix.hbase.index.scanner.ScannerBuilder.CoveredDeleteScanner;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -102,7 +103,7 @@ public class TestLocalTableState {
     ColumnReference col = new ColumnReference(fam, qual);
     table.setCurrentTimestamp(ts);
     //check that our value still shows up first on scan, even though this is a lazy load
-    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col),
false, false, indexMetaData);
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col),
false, false, indexMetaData);
     Scanner s = p.getFirst();
     assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0),
s.next());
   }
@@ -185,7 +186,7 @@ public class TestLocalTableState {
     ColumnReference col = new ColumnReference(fam, qual);
     table.setCurrentTimestamp(ts);
     //check that our value still shows up first on scan, even though this is a lazy load
-    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col),
false, false, indexMetaData);
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col),
false, false, indexMetaData);
     Scanner s = p.getFirst();
     assertEquals("Didn't get the pending mutation's value first", m.get(fam, qual).get(0),
s.next());
   }
@@ -229,7 +230,7 @@ public class TestLocalTableState {
     ColumnReference col = new ColumnReference(fam, qual);
     table.setCurrentTimestamp(ts);
     // check that the value is there
-    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col),
false, false, indexMetaData);
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col),
false, false, indexMetaData);
     Scanner s = p.getFirst();
     assertEquals("Didn't get the pending mutation's value first", kv, s.next());
 
@@ -273,7 +274,7 @@ public class TestLocalTableState {
     ColumnReference col = new ColumnReference(fam, qual);
     table.setCurrentTimestamp(ts);
     // check that the value is there
-    Pair<Scanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col),
false, false, indexMetaData);
+    Pair<CoveredDeleteScanner, IndexUpdate> p = table.getIndexedColumnsTableState(Arrays.asList(col),
false, false, indexMetaData);
     Scanner s = p.getFirst();
     // make sure it read the table the one time
     assertEquals("Didn't get the stored keyvalue!", storedKv, s.next());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
index 6aabacf..dbf67fc 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -77,7 +78,7 @@ public class IndexMaintainerTest  extends BaseConnectionlessQueryTest {
         return new ValueGetter() {
 
             @Override
-            public ImmutableBytesWritable getLatestValue(ColumnReference ref) {
+            public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {
                 return new ImmutableBytesPtr(valueMap.get(ref));
             }
 
@@ -140,7 +141,7 @@ public class IndexMaintainerTest  extends BaseConnectionlessQueryTest
{
             Mutation indexMutation = indexMutations.get(0);
             ImmutableBytesWritable indexKeyPtr = new ImmutableBytesWritable(indexMutation.getRow());
             ptr.set(rowKeyPtr.get(), rowKeyPtr.getOffset(), rowKeyPtr.getLength());
-            byte[] mutablelndexRowKey = im1.buildRowKey(valueGetter, ptr, null, null);
+            byte[] mutablelndexRowKey = im1.buildRowKey(valueGetter, ptr, null, null, HConstants.LATEST_TIMESTAMP);
             byte[] immutableIndexRowKey = indexKeyPtr.copyBytes();
             assertArrayEquals(immutableIndexRowKey, mutablelndexRowKey);
             for (ColumnReference ref : im1.getCoveredColumns()) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/8bc58328/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 8a5a8e4..266f4da 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -41,6 +41,7 @@ import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
@@ -107,6 +108,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PLongColumn;
 import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PTable;
@@ -120,6 +122,7 @@ import org.apache.phoenix.schema.stats.GuidePostsKey;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
 
@@ -854,4 +857,141 @@ public class TestUtil {
         System.out.println("-----------------------------------------------");
     }
 
+    public static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState
expectedIndexState) throws InterruptedException, SQLException {
+        boolean isActive = false;
+        String schema = SchemaUtil.getSchemaNameFromFullName(fullIndexName);
+        String index = SchemaUtil.getTableNameFromFullName(fullIndexName);
+        int maxTries = 12, nTries = 0;
+        do {
+            Thread.sleep(5 * 1000); // sleep 5 secs
+            String query = "SELECT CAST(" + PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP
+ " AS BIGINT) FROM " +
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME + " WHERE (" + PhoenixDatabaseMetaData.TABLE_SCHEM
+ "," + PhoenixDatabaseMetaData.TABLE_NAME
+                    + ") = (" + "'" + schema + "','" + index + "') "
+                    + "AND " + PhoenixDatabaseMetaData.COLUMN_FAMILY + " IS NULL AND " +
PhoenixDatabaseMetaData.COLUMN_NAME + " IS NULL"
+                    + " AND " + PhoenixDatabaseMetaData.INDEX_STATE + " = '" + expectedIndexState.getSerializedValue()
+ "'";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            if (expectedIndexState == PIndexState.ACTIVE) {
+                if (rs.getLong(1) == 0 && !rs.wasNull()) {
+                    isActive = true;
+                    break;
+                }
+            }
+        } while (++nTries < maxTries);
+        if (expectedIndexState == PIndexState.ACTIVE) {
+            assertTrue(isActive);
+        }
+    }
+
+    public static long scutinizeIndex(Connection conn, String fullTableName, String fullIndexName)
throws SQLException {
+        PhoenixConnection pconn = conn.unwrap(PhoenixConnection.class);
+        PTable ptable = pconn.getTable(new PTableKey(pconn.getTenantId(), fullTableName));
+        PTable pindex = pconn.getTable(new PTableKey(pconn.getTenantId(), fullIndexName));
+        StringBuilder indexQueryBuf = new StringBuilder("SELECT ");
+        for (PColumn dcol : ptable.getPKColumns()) {
+            indexQueryBuf.append("CAST(\"" + IndexUtil.getIndexColumnName(dcol) + "\" AS
" + dcol.getDataType().getSqlTypeName() + ")");
+            indexQueryBuf.append(",");
+        }
+        for (PColumn icol : pindex.getColumns()) {
+            PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
+            if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) {
+                indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " +
dcol.getDataType().getSqlTypeName() + ")");
+                indexQueryBuf.append(",");
+            }
+        }
+        for (PColumn icol : pindex.getColumns()) {
+            if (!SchemaUtil.isPKColumn(icol)) {
+                PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
+                indexQueryBuf.append("CAST (\"" + icol.getName().getString() + "\" AS " +
dcol.getDataType().getSqlTypeName() + ")");
+                indexQueryBuf.append(",");
+            }
+        }
+        indexQueryBuf.setLength(indexQueryBuf.length()-1);
+        indexQueryBuf.append("\nFROM " + fullIndexName);
+        
+        StringBuilder tableQueryBuf = new StringBuilder("SELECT ");
+        for (PColumn dcol : ptable.getPKColumns()) {
+            tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
+            tableQueryBuf.append(",");
+        }
+        for (PColumn icol : pindex.getColumns()) {
+            PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
+            if (SchemaUtil.isPKColumn(icol) && !SchemaUtil.isPKColumn(dcol)) {
+                if (dcol.getFamilyName() != null) {
+                    tableQueryBuf.append("\"" + dcol.getFamilyName().getString() + "\"");
+                    tableQueryBuf.append(".");
+                }
+                tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
+                tableQueryBuf.append(",");
+            }
+        }
+        for (PColumn icol : pindex.getColumns()) {
+            if (!SchemaUtil.isPKColumn(icol)) {
+                PColumn dcol = IndexUtil.getDataColumn(ptable, icol.getName().getString());
+                if (dcol.getFamilyName() != null) {
+                    tableQueryBuf.append("\"" + dcol.getFamilyName().getString() + "\"");
+                    tableQueryBuf.append(".");
+                }
+                tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
+                tableQueryBuf.append(",");
+            }
+        }
+        tableQueryBuf.setLength(tableQueryBuf.length()-1);
+        tableQueryBuf.append("\nFROM " + fullTableName + "\nWHERE (");
+        for (PColumn dcol : ptable.getPKColumns()) {
+            tableQueryBuf.append("\"" + dcol.getName().getString() + "\"");
+            tableQueryBuf.append(",");
+        }
+        tableQueryBuf.setLength(tableQueryBuf.length()-1);
+        tableQueryBuf.append(") = ((");
+        for (int i = 0; i < ptable.getPKColumns().size(); i++) {
+            tableQueryBuf.append("?");
+            tableQueryBuf.append(",");
+        }
+        tableQueryBuf.setLength(tableQueryBuf.length()-1);
+        tableQueryBuf.append("))");
+        
+        String tableQuery = tableQueryBuf.toString();
+        PreparedStatement istmt = conn.prepareStatement(tableQuery);
+        
+        String indexQuery = indexQueryBuf.toString();
+        ResultSet irs = conn.createStatement().executeQuery(indexQuery);
+        ResultSetMetaData irsmd = irs.getMetaData();
+        long icount = 0;
+        while (irs.next()) {
+            icount++;
+            StringBuilder pkBuf = new StringBuilder("(");
+            for (int i = 0; i < ptable.getPKColumns().size(); i++) {
+                PColumn dcol = ptable.getPKColumns().get(i);
+                Object pkVal = irs.getObject(i+1);
+                PDataType pkType = PDataType.fromTypeId(irsmd.getColumnType(i + 1));
+                istmt.setObject(i+1, pkVal, dcol.getDataType().getSqlType());
+                pkBuf.append(pkType.toStringLiteral(pkVal));
+                pkBuf.append(",");
+            }
+            pkBuf.setLength(pkBuf.length()-1);
+            pkBuf.append(")");
+            ResultSet drs = istmt.executeQuery();
+            ResultSetMetaData drsmd = drs.getMetaData();
+            assertTrue("Expected to find PK in data table: " + pkBuf, drs.next());
+            for (int i = 0; i < irsmd.getColumnCount(); i++) {
+                Object iVal = irs.getObject(i + 1);
+                PDataType iType = PDataType.fromTypeId(irsmd.getColumnType(i + 1));
+                Object dVal = drs.getObject(i + 1);
+                PDataType dType = PDataType.fromTypeId(drsmd.getColumnType(i + 1));
+                assertTrue("Expected equality for " + drsmd.getColumnName(i + 1) + ", but
" + iType.toStringLiteral(iVal) + "!=" + dType.toStringLiteral(dVal), Objects.equal(iVal,
dVal));
+            }
+        }
+        
+        long dcount = getRowCount(conn, fullTableName);
+        assertEquals("Expected data table row count to match", icount, dcount);
+        return icount;
+    }
+    
+    private static long getRowCount(Connection conn, String tableName) throws SQLException
{
+        ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ count(*)
FROM " + tableName);
+        assertTrue(rs.next());
+        return rs.getLong(1);
+    }
+
 }


Mime
View raw message