phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From an...@apache.org
Subject phoenix git commit: PHOENIX-3797 Local Index - Compaction fails on table with local index due to non-increasing bloom keys
Date Tue, 06 Jun 2017 20:13:51 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 31107556e -> 378a1884a


PHOENIX-3797 Local Index - Compaction fails on table with local index due to non-increasing bloom keys


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/378a1884
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/378a1884
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/378a1884

Branch: refs/heads/4.x-HBase-1.2
Commit: 378a1884ada67d2c6e9d3decbda2f98ab28f3904
Parents: 3110755
Author: Ankit Singhal <ankitsinghal59@gmail.com>
Authored: Tue Jun 6 13:13:43 2017 -0700
Committer: Ankit Singhal <ankitsinghal59@gmail.com>
Committed: Tue Jun 6 13:13:43 2017 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/index/LocalIndexIT.java     | 17 +++--
 .../DataTableLocalIndexRegionScanner.java       | 77 ++++++++++++++++----
 .../IndexHalfStoreFileReaderGenerator.java      | 17 ++++-
 .../UngroupedAggregateRegionObserver.java       | 15 ++--
 .../org/apache/phoenix/util/RepairUtil.java     |  7 +-
 .../org/apache/phoenix/util/ServerUtil.java     |  5 ++
 6 files changed, 98 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/378a1884/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 32254b0..27edfb7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -585,16 +585,16 @@ public class LocalIndexIT extends BaseLocalIndexIT {
             String indexName = "IDX_T_AUTO_MATIC_REPAIR";
             String indexName1 = "IDX_T_AUTO_MATIC_REPAIR_1";
             statement.execute("create table " + tableName + " (id integer not null,fn varchar,"
-                    + "cf1.ln varchar constraint pk primary key(id)) split on (1,2,3,4,5)");
+                    + "cf1.ln varchar constraint pk primary key(id)) split on (400,800,1200,1600)");
             statement.execute("create local index " + indexName + " on " + tableName + "
 (fn,cf1.ln)");
             statement.execute("create local index " + indexName1 + " on " + tableName + "
 (fn)");
-            for (int i = 0; i < 7; i++) {
+            for (int i = 0; i < 2000; i++) {
                 statement.execute("upsert into " + tableName + "  values(" + i + ",'fn" +
i + "','ln" + i + "')");
             }
             conn.commit();
             ResultSet rs = statement.executeQuery("SELECT COUNT(*) FROM " + indexName);
             assertTrue(rs.next());
-            assertEquals(7, rs.getLong(1));
+            assertEquals(2000, rs.getLong(1));
             List<HRegionInfo> tableRegions = admin.getTableRegions(TableName.valueOf(tableName));
             admin.disableTable(tableName);
             copyLocalIndexHFiles(config, tableRegions.get(0), tableRegions.get(1), false);
@@ -602,17 +602,20 @@ public class LocalIndexIT extends BaseLocalIndexIT {
             admin.enableTable(tableName);
 
             int count=getCount(conn, tableName, "L#0");
-            assertTrue(count > 14);
+            assertTrue(count > 4000);
             admin.majorCompact(TableName.valueOf(tableName));
             int tryCount = 5;// need to wait for rebuilding of corrupted local index region
-            while (tryCount-- > 0 && count != 14) {
+            while (tryCount-- > 0 && count != 4000) {
                 Thread.sleep(15000);
                 count = getCount(conn, tableName, "L#0");
             }
-            assertEquals(14, count);
+            assertEquals(4000, count);
             rs = statement.executeQuery("SELECT COUNT(*) FROM " + indexName1);
             assertTrue(rs.next());
-            assertEquals(7, rs.getLong(1));
+            assertEquals(2000, rs.getLong(1));
+            rs = statement.executeQuery("SELECT COUNT(*) FROM " + indexName);
+            assertTrue(rs.next());
+            assertEquals(2000, rs.getLong(1));
             statement.execute("DROP INDEX " + indexName1 + " ON " + tableName);
             admin.majorCompact(TableName.valueOf(tableName));
             statement.execute("DROP INDEX " + indexName + " ON " + tableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/378a1884/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
index e4486bc..4c44e82 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/DataTableLocalIndexRegionScanner.java
@@ -17,20 +17,36 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.coprocessor.DelegateRegionScanner;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.MutationList;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
-
+import org.apache.phoenix.util.ServerUtil;
+/*
+ * Scanner to read data store and regenerate the local index data
+ */
 public class DataTableLocalIndexRegionScanner extends DelegateRegionScanner {
     MultiKeyValueTuple result = new MultiKeyValueTuple();
     ImmutableBytesWritable ptr = new ImmutableBytesWritable();
@@ -39,34 +55,48 @@ public class DataTableLocalIndexRegionScanner extends DelegateRegionScanner
{
     private byte[] startKey;
     private byte[] endKey;
     private byte[] localIndexFamily;
-
+    private Region region;
+    long maxBatchSizeBytes;
+    int maxBatchSize;
+    private MutationList mutationList;
+    
+    
+    /**
+     * @param scanner Scanner for data table stores 
+     * @param region 
+     * @param indexMaintainers Maintainer of local Indexes which needs to built
+     * @param localIndexFamily LocalIndex family needs to be built.
+     * @param conf
+     * @throws IOException
+     */
     public DataTableLocalIndexRegionScanner(RegionScanner scanner, Region region,
-            List<IndexMaintainer> indexMaintainers, byte[] localIndexFamily) throws
IOException {
+            List<IndexMaintainer> indexMaintainers, byte[] localIndexFamily,Configuration
conf) throws IOException {
         super(scanner);
         this.indexMaintainers = indexMaintainers;
         this.startKey = region.getRegionInfo().getStartKey();
         this.endKey = region.getRegionInfo().getEndKey();
         this.localIndexFamily = localIndexFamily;
+        this.region=region;
+        maxBatchSize = conf.getInt(MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+        maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
+            QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
+        mutationList=new UngroupedAggregateRegionObserver.MutationList(maxBatchSize);   
     }
 
     @Override
     public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
IOException {
         List<Cell> dataTableResults = new ArrayList<Cell>();
         boolean next = super.next(dataTableResults, scannerContext);
-        getLocalIndexCellsFromDataTable(dataTableResults, outResult);
-        return next;
-    }
-
-    @Override
-    public boolean next(List<Cell> results) throws IOException {
-        List<Cell> dataTableResults = new ArrayList<Cell>();
-        boolean next = super.next(dataTableResults);
-        getLocalIndexCellsFromDataTable(dataTableResults, results);
+        addMutations(dataTableResults);
+        if (ServerUtil.readyToCommit(mutationList.size(), mutationList.byteSize(), maxBatchSize,
maxBatchSizeBytes)||!next) {
+            region.batchMutate(mutationList.toArray(new Mutation[mutationList.size()]), HConstants.NO_NONCE,
+                    HConstants.NO_NONCE);
+            mutationList.clear();
+        }
         return next;
     }
 
-    private void getLocalIndexCellsFromDataTable(List<Cell> dataTableResults, List<Cell>
localIndexResults)
-            throws IOException {
+    private void addMutations(List<Cell> dataTableResults) throws IOException {
         if (!dataTableResults.isEmpty()) {
             result.setKeyValues(dataTableResults);
             for (IndexMaintainer maintainer : indexMaintainers) {
@@ -76,11 +106,26 @@ public class DataTableLocalIndexRegionScanner extends DelegateRegionScanner
{
                 List<Cell> list = maintainer.buildUpdateMutation(kvBuilder, valueGetter,
ptr,
                         dataTableResults.get(0).getTimestamp(), startKey, endKey).getFamilyCellMap()
                         .get(localIndexFamily);
-                if (list != null) {
-                    localIndexResults.addAll(list);
+                Put put = null;
+                Delete del = null;
+                for (Cell cell : list) {
+                    if (KeyValue.Type.codeToType(cell.getTypeByte()) == KeyValue.Type.Put)
{
+                        if (put == null) {
+                            put = new Put(CellUtil.cloneRow(cell));
+                            mutationList.add(put);
+                        }
+                        put.add(cell);
+                    } else {
+                        if (del == null) {
+                            del = new Delete(CellUtil.cloneRow(cell));
+                            mutationList.add(del);
+                        }
+                        del.addDeleteMarker(cell);
+                    }
                 }
             }
         }
     }
+    
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/378a1884/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
index 1e9151a..88154a7 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReaderGenerator.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellUtil;
@@ -68,6 +70,9 @@ import com.google.common.collect.Lists;
 
 public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver {
     
+    private static final String LOCAL_INDEX_AUTOMATIC_REPAIR = "local.index.automatic.repair";
+    public static final Log LOG = LogFactory.getLog(IndexHalfStoreFileReaderGenerator.class);
+
     @Override
     public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment>
ctx,
             FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
@@ -196,7 +201,13 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver
{
         if (!store.hasReferences()) {
             InternalScanner repairScanner = null;
             if (request.isMajor() && (!RepairUtil.isLocalIndexStoreFilesConsistent(c.getEnvironment(),
store))) {
-                repairScanner = getRepairScanner(c.getEnvironment(), store);
+                LOG.info("we have found inconsistent data for local index for region:"
+                        + c.getEnvironment().getRegion().getRegionInfo());
+                if (c.getEnvironment().getConfiguration().getBoolean(LOCAL_INDEX_AUTOMATIC_REPAIR,
true)) {
+                    LOG.info("Starting automatic repair of local Index for region:"
+                            + c.getEnvironment().getRegion().getRegionInfo());
+                    repairScanner = getRepairScanner(c.getEnvironment(), store);
+                }
             }
             if (repairScanner != null) {
                 return repairScanner;
@@ -271,7 +282,7 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver
{
             }
         }
         try {
-            PhoenixConnection conn = QueryUtil.getConnection(env.getConfiguration())
+            PhoenixConnection conn = QueryUtil.getConnectionOnServer(env.getConfiguration())
                     .unwrap(PhoenixConnection.class);
             PTable dataPTable = IndexUtil.getPDataTable(conn, env.getRegion().getTableDesc());
             final List<IndexMaintainer> maintainers = Lists
@@ -282,7 +293,7 @@ public class IndexHalfStoreFileReaderGenerator extends BaseRegionObserver
{
                 }
             }
             return new DataTableLocalIndexRegionScanner(env.getRegion().getScanner(scan),
env.getRegion(),
-                    maintainers, store.getFamily().getName());
+                    maintainers, store.getFamily().getName(),env.getConfiguration());
             
 
         } catch (ClassNotFoundException | SQLException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/378a1884/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 16f19fb..75654c9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -287,7 +287,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         return s;
     }
 
-    class MutationList extends ArrayList<Mutation> {
+   public static class MutationList extends ArrayList<Mutation> {
         private long byteSize = 0l;
         public MutationList() {
             super();
@@ -702,14 +702,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                 }
                             }
                         }
-                        if (readyToCommit(rowCount, mutations.byteSize(), maxBatchSize, maxBatchSizeBytes))
{
+                        if (ServerUtil.readyToCommit(rowCount, mutations.byteSize(), maxBatchSize,
maxBatchSizeBytes)) {
                             commit(region, mutations, indexUUID, blockingMemStoreSize, indexMaintainersPtr,
txState,
                                     areMutationInSameRegion, targetHTable, useIndexProto);
                             mutations.clear();
                         }
                         // Commit in batches based on UPSERT_BATCH_SIZE_BYTES_ATTRIB in config
 
-                        if (readyToCommit(rowCount, indexMutations.byteSize(), maxBatchSize,
maxBatchSizeBytes)) {
+                        if (ServerUtil.readyToCommit(rowCount, indexMutations.byteSize(),
maxBatchSize, maxBatchSizeBytes)) {
                             commitBatch(region, indexMutations, null, blockingMemStoreSize,
null, txState,
                                     useIndexProto);
                             indexMutations.clear();
@@ -792,12 +792,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             commitBatch(region, mutations, indexUUID, blockingMemstoreSize, indexMaintainersPtr,
txState, useIndexProto);
         }
     }
-
-    private boolean readyToCommit(int rowCount, long mutationSize, int maxBatchSize, long
maxBatchSizeBytes) {
-        return maxBatchSize > 0 && rowCount > maxBatchSize
-                || (maxBatchSizeBytes > 0 && mutationSize > maxBatchSizeBytes);
-    }
-
+    
     @Override
     public InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment>
c, final Store store,
             final InternalScanner scanner, final ScanType scanType) throws IOException {
@@ -888,7 +883,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                                 del.addDeleteMarker(cell);
                             }
                         }
-                        if (readyToCommit(rowCount, mutations.byteSize(), maxBatchSize, maxBatchSizeBytes))
{
+                        if (ServerUtil.readyToCommit(rowCount, mutations.byteSize(), maxBatchSize,
maxBatchSizeBytes)) {
                             region.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
HConstants.NO_NONCE,
                                     HConstants.NO_NONCE);
                             uuidValue = ServerCacheClient.generateId();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/378a1884/phoenix-core/src/main/java/org/apache/phoenix/util/RepairUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/RepairUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/RepairUtil.java
index ea14715..d394a68 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/RepairUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/RepairUtil.java
@@ -31,10 +31,9 @@ public class RepairUtil {
         for (StoreFile file : store.getStorefiles()) {
             if (file.getReader() != null && file.getReader().getFirstKey() != null)
{
                 byte[] fileFirstRowKey = KeyValue.createKeyValueFromKey(file.getReader().getFirstKey()).getRow();
-                ;
-                if ((fileFirstRowKey != null && Bytes.compareTo(file.getReader().getFirstKey(),
0,
-                        indexKeyEmbedded.length, indexKeyEmbedded, 0, indexKeyEmbedded.length)
!= 0)
-                /* || (endKey.length > 0 && Bytes.compareTo(file.getLastKey(),
endKey) < 0) */) { return false; }
+                if ((fileFirstRowKey != null && Bytes.compareTo(fileFirstRowKey,
0,
+                        indexKeyEmbedded.length, indexKeyEmbedded, 0, indexKeyEmbedded.length)
!= 0)) {
+                    return false; }
             }
         }
         return true;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/378a1884/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
index c90d061..0c2495a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ServerUtil.java
@@ -213,4 +213,9 @@ public class ServerUtil {
         return new DoNotRetryIOException(msg, t);
     }
     
+    public static boolean readyToCommit(int rowCount, long mutationSize, int maxBatchSize,
long maxBatchSizeBytes) {
+        return maxBatchSize > 0 && rowCount > maxBatchSize
+                || (maxBatchSizeBytes > 0 && mutationSize > maxBatchSizeBytes);
+    }
+    
 }


Mime
View raw message