phoenix-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [phoenix] kadirozde commented on a change in pull request #469: PHOENIX-5156 Consistent Global Indexes for Non-Transactional Tables
Date Fri, 03 May 2019 00:55:52 GMT
kadirozde commented on a change in pull request #469: PHOENIX-5156 Consistent Global Indexes
for Non-Transactional Tables
URL: https://github.com/apache/phoenix/pull/469#discussion_r280642905
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
 ##########
 @@ -506,30 +623,130 @@ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
           current.addTimelineAnnotation("Built index updates, doing preStep");
           TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
           byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
-          Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
+          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr
= indexUpdates.iterator();
           List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
+          postIndexUpdates = new ArrayList<>(indexUpdates.size());
+          indexUpdatesForDeletes = new ArrayList<>(indexUpdates.size());
           while(indexUpdatesItr.hasNext()) {
-              Pair<Mutation, byte[]> next = indexUpdatesItr.next();
-              if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
-                  localUpdates.add(next.getFirst());
+              Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
+              if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 0) {
+                  localUpdates.add(next.getFirst().getFirst());
                   indexUpdatesItr.remove();
               }
+              else {
+                  // get index maintainer for this index table
+                  IndexMaintainer indexMaintainer = getIndexMaintainer(maintainers, next.getFirst().getSecond());
+                  if (indexMaintainer == null) {
+                      throw new DoNotRetryIOException(
+                              "preBatchMutateWithExceptions: indexMaintainer is null " +
+                                      c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+                  }
+                  byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+                  byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+                  // add the VERIFIED cell, which is the empty cell
+                  Mutation m = next.getFirst().getFirst();
+                  boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
+                  long ts = getMaxTimestamp(m);
+                  if (rebuild) {
+                      if (m instanceof Put) {
+                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, TRUE_BYTES);
+                      }
+                  } else {
+                      if (m instanceof Put) {
+                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, FALSE_BYTES);
+                          // Ignore post index updates (i.e., the third write phase updates) for this row if it is
+                          // going through concurrent updates
+                          RowKey rowKey = new RowKey(next.getSecond());
+                          if (!context.pendingRows.contains(rowKey)) {
+                              Put put = new Put(m.getRow());
+                              put.addColumn(emptyCF, emptyCQ, ts, TRUE_BYTES);
+                              postIndexUpdates.add(new Pair<>(new Pair<>(put,
next.getFirst().getSecond()), next.getSecond()));
+                          }
+                      } else {
+                          // For a delete mutation, first unverify the existing row in the index table and then delete
+                          // the row from the index table after deleting the corresponding row from the data table
+                          indexUpdatesItr.remove();
+                          Put put = new Put(m.getRow());
+                          put.addColumn(emptyCF, emptyCQ, ts, FALSE_BYTES);
+                          indexUpdatesForDeletes.add(new Pair<>(put, next.getFirst().getSecond()));
+                          // Ignore post index updates (i.e., the third write phase updates) for this row if it is
+                          // going through concurrent updates
+                          RowKey rowKey = new RowKey(next.getSecond());
+                          if (!context.pendingRows.contains(rowKey)) {
+                              postIndexUpdates.add(next);
+                          }
+                      }
+                  }
+              }
           }
           if (!localUpdates.isEmpty()) {
               miniBatchOp.addOperationsFromCP(0,
                   localUpdates.toArray(new Mutation[localUpdates.size()]));
           }
-          if (!indexUpdates.isEmpty()) {
-              context.indexUpdates = indexUpdates;
-              // write index updates to WAL
-              if (durability != Durability.SKIP_WAL) {
-                  // we have all the WAL durability, so we just update the WAL entry and move on
-                  for (Pair<Mutation, byte[]> entry : indexUpdates) {
-                    edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
-                  }              
+          if (!indexUpdatesForDeletes.isEmpty()) {
+              context.indexUpdates = indexUpdatesForDeletes;
+          }
+
+          if (!indexUpdates.isEmpty() && context.indexUpdates.isEmpty()) {
+              context.indexUpdates = new ArrayList<>(indexUpdates.size());
+          }
+          for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) {
+                  context.indexUpdates.add(update.getFirst());
+          }
+      }
+      // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
+      // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates
+      // can be prepared in less than one millisecond
+      if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis())
{
+          Thread.sleep(1);
+          LOG.debug("slept 1ms");
 
 Review comment:
   The debug log is there to make sure that this sleep is a rare event. I will add the name of the index table to the log message.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message