phoenix-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [phoenix] swaroopak commented on a change in pull request #517: PHOENIX-5211 Consistent Immutable Global Indexes for Non-Transactiona…
Date Wed, 12 Jun 2019 05:58:36 GMT
swaroopak commented on a change in pull request #517: PHOENIX-5211 Consistent Immutable Global
Indexes for Non-Transactiona…
URL: https://github.com/apache/phoenix/pull/517#discussion_r292748326
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
 ##########
 @@ -931,220 +944,323 @@ private void send(Iterator<TableRef> tableRefIterator) throws
SQLException {
                     joinMutationState(new TableRef(tableRef), multiRowMutationState, txMutations);
                 }
             }
-            long serverTimestamp = HConstants.LATEST_TIMESTAMP;
+            Map<TableInfo, List<Mutation>> deletedIndexMutations = new LinkedHashMap<>();
+            Map<TableInfo, List<Mutation>> verifiedMutationsForIndex = filterIndexCheckerMutations(
+                    physicalTableMutationMap, deletedIndexMutations);
+            // We need to clone the verifiedMutations because sendMutations will remove entries from the map
+            // We will need the whole map because we will need to set the verified to true.
+            List<TableInfo> clonedIndexTables = new ArrayList<>();
+            boolean hasVerifiedMutations = verifiedMutationsForIndex.size() > 0;
+
+            // Phase 1: Send verified indexes with VERIFIED=false
+            if (hasVerifiedMutations) {
+                clonedIndexTables.addAll(verifiedMutationsForIndex.keySet());
+                // addRowMutations generates the mutations with VERIFIED=false
+                sendMutations(verifiedMutationsForIndex.entrySet().iterator(), span, indexMetaDataPtr);
+            }
+
+            // Phase 2: Send data table and other indexes
             Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator
= physicalTableMutationMap.entrySet()
                     .iterator();
-            while (mutationsIterator.hasNext()) {
-                Entry<TableInfo, List<Mutation>> pair = mutationsIterator.next();
-                TableInfo tableInfo = pair.getKey();
-                byte[] htableName = tableInfo.getHTableName().getBytes();
-                List<Mutation> mutationList = pair.getValue();
-                List<List<Mutation>> mutationBatchList =
-                        getMutationBatchList(batchSize, batchSizeBytes, mutationList);
-
-                // create a span per target table
-                // TODO maybe we can be smarter about the table name to string here?
-                Span child = Tracing.child(span, "Writing mutation batch for table: " + Bytes.toString(htableName));
-
-                int retryCount = 0;
-                boolean shouldRetry = false;
-                long numMutations = 0;
-                long mutationSizeBytes = 0;
-                long mutationCommitTime = 0;
-                long numFailedMutations = 0;
-                ;
-                long startTime = 0;
-                boolean shouldRetryIndexedMutation = false;
-                IndexWriteException iwe = null;
-                do {
-                    TableRef origTableRef = tableInfo.getOrigTableRef();
-                    PTable table = origTableRef.getTable();
-                    table.getIndexMaintainers(indexMetaDataPtr, connection);
-                    final ServerCache cache = tableInfo.isDataTable() ? 
-                            IndexMetaDataCacheClient.setMetaDataOnMutations(connection, table,
-                                    mutationList, indexMetaDataPtr) : null;
-                    // If we haven't retried yet, retry for this case only, as it's possible
that
-                    // a split will occur after we send the index metadata cache to all known
-                    // region servers.
-                    shouldRetry = cache != null;
-                    SQLException sqlE = null;
-                    Table hTable = connection.getQueryServices().getTable(htableName);
-                    try {
-                        if (table.isTransactional()) {
-                            // Track tables to which we've sent uncommitted data
-                            if (tableInfo.isDataTable()) {
-                                uncommittedPhysicalNames.add(table.getPhysicalName().getString());
-                                phoenixTransactionContext.markDMLFence(table);
-                            }
-                            // Only pass true for last argument if the index is being written
to on it's own (i.e. initial
-                            // index population), not if it's being written to for normal
maintenance due to writes to
-                            // the data table. This case is different because the initial
index population does not need
-                            // to be done transactionally since the index is only made active
after all writes have
-                            // occurred successfully.
-                            hTable = phoenixTransactionContext.getTransactionalTableWriter(connection,
table, hTable, tableInfo.isDataTable() && table.getType() == PTableType.INDEX);
+            sendMutations(mutationsIterator, span, indexMetaDataPtr);
+
+            // Phase 3: Send verified indexes with VERIFIED = true
+            if (hasVerifiedMutations) {
+                Map<TableInfo, List<Mutation>>
+                            clonedVerifiedMutations =
+                            setVerifiedColumnForIndexes(clonedIndexTables, clonedDataMutations,
deletedIndexMutations,
+                                    TRUE_BYTES);
+                try {
+                    sendMutations(clonedVerifiedMutations.entrySet().iterator(), span,
+                            indexMetaDataPtr);
+                } catch (SQLException ex) {
+                    LOGGER.warn("Ignoring exception that happened during setting index verified value to TRUE", ex);
+                }
+            }
+        }
+    }
+
+    private void sendMutations(Iterator<Entry<TableInfo, List<Mutation>>>
mutationsIterator, Span span, ImmutableBytesWritable indexMetaDataPtr)
 
 Review comment:
   nice refactoring!

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message