phoenix-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [phoenix] gokceni commented on a change in pull request #469: PHOENIX-5156 Consistent Global Indexes for Non-Transactional Tables
Date Thu, 02 May 2019 23:30:51 GMT
gokceni commented on a change in pull request #469: PHOENIX-5156 Consistent Global Indexes for Non-Transactional Tables
URL: https://github.com/apache/phoenix/pull/469#discussion_r280630543
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
 ##########
 @@ -506,30 +623,130 @@ public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnviro
           current.addTimelineAnnotation("Built index updates, doing preStep");
           TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
           byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
-          Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = indexUpdates.iterator();
+          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr
= indexUpdates.iterator();
           List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
+          postIndexUpdates = new ArrayList<>(indexUpdates.size());
+          indexUpdatesForDeletes = new ArrayList<>(indexUpdates.size());
           while(indexUpdatesItr.hasNext()) {
-              Pair<Mutation, byte[]> next = indexUpdatesItr.next();
-              if (Bytes.compareTo(next.getSecond(), tableName) == 0) {
-                  localUpdates.add(next.getFirst());
+              Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
+              if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 0) {
+                  localUpdates.add(next.getFirst().getFirst());
                   indexUpdatesItr.remove();
               }
+              else {
+                  // get index maintainer for this index table
+                  IndexMaintainer indexMaintainer = getIndexMaintainer(maintainers, next.getFirst().getSecond());
+                  if (indexMaintainer == null) {
+                      throw new DoNotRetryIOException(
+                              "preBatchMutateWithExceptions: indexMaintainer is null " +
+                                      c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+                  }
+                  byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+                  byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+                  // add the VERIFIED cell, which is the empty cell
+                  Mutation m = next.getFirst().getFirst();
+                  boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
+                  long ts = getMaxTimestamp(m);
+                  if (rebuild) {
+                      if (m instanceof Put) {
+                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, TRUE_BYTES);
+                      }
+                  } else {
+                      if (m instanceof Put) {
+                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, FALSE_BYTES);
+                          // Ignore post index updates (i.e., the third write phase updates)
for this row if it is
+                          // going through concurrent updates
+                          RowKey rowKey = new RowKey(next.getSecond());
+                          if (!context.pendingRows.contains(rowKey)) {
+                              Put put = new Put(m.getRow());
+                              put.addColumn(emptyCF, emptyCQ, ts, TRUE_BYTES);
+                              postIndexUpdates.add(new Pair<>(new Pair<>(put,
next.getFirst().getSecond()), next.getSecond()));
+                          }
+                      } else {
+                          // For a delete mutation, first unverify the exiting row in the
index table and then delete
+                          // the row from the index table after deleting the corresponding
row from the data table
+                          indexUpdatesItr.remove();
+                          Put put = new Put(m.getRow());
+                          put.addColumn(emptyCF, emptyCQ, ts, FALSE_BYTES);
+                          indexUpdatesForDeletes.add(new Pair<>(put, next.getFirst().getSecond()));
+                          // Ignore post index updates (i.e., the third write phase updates)
for this row if it is
+                          // going through concurrent updates
+                          RowKey rowKey = new RowKey(next.getSecond());
+                          if (!context.pendingRows.contains(rowKey)) {
+                              postIndexUpdates.add(next);
+                          }
+                      }
+                  }
+              }
           }
           if (!localUpdates.isEmpty()) {
               miniBatchOp.addOperationsFromCP(0,
                   localUpdates.toArray(new Mutation[localUpdates.size()]));
           }
-          if (!indexUpdates.isEmpty()) {
-              context.indexUpdates = indexUpdates;
-              // write index updates to WAL
-              if (durability != Durability.SKIP_WAL) {
-                  // we have all the WAL durability, so we just update the WAL entry and
move on
-                  for (Pair<Mutation, byte[]> entry : indexUpdates) {
-                    edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
-                  }              
+          if (!indexUpdatesForDeletes.isEmpty()) {
+              context.indexUpdates = indexUpdatesForDeletes;
+          }
+
+          if (!indexUpdates.isEmpty() && context.indexUpdates.isEmpty()) {
+              context.indexUpdates = new ArrayList<>(indexUpdates.size());
+          }
+          for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) {
+                  context.indexUpdates.add(update.getFirst());
+          }
+      }
+      // Sleep for one millisecond if we have prepared the index updates in less than 1 ms.
The sleep is necessary to
+      // get different timestamps for concurrent batches that share common rows. It is very
rare that the index updates
+      // can be prepared in less than one millisecond
+      if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis())
{
+          Thread.sleep(1);
+          LOG.debug("slept 1ms");
+      }
+      // Release the locks before making RPC calls for index updates
+      for (RowLock rowLock : context.rowLocks) {
+          rowLock.release();
+      }
+      context.rowLocks.clear();
+      // Do the index updates
 
 Review comment:
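   Suggestion: Refactor this code so that each step (preparing the index updates, the 1 ms sleep, releasing the row locks, and doing the index updates) is in its own function.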

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message