phoenix-commits mailing list archives

From maryann...@apache.org
Subject [05/50] [abbrv] phoenix git commit: PHOENIX-2478 Rows committed in transaction overlapping index creation are not populated
Date Tue, 02 Feb 2016 03:59:39 GMT
PHOENIX-2478 Rows committed in transaction overlapping index creation are not populated


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/13699371
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/13699371
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/13699371

Branch: refs/heads/calcite
Commit: 13699371820928cf14e0e2c5bbffe338c7aa2e93
Parents: f591da4
Author: James Taylor <jtaylor@salesforce.com>
Authored: Mon Jan 18 21:14:34 2016 -0800
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Mon Jan 18 21:14:34 2016 -0800

----------------------------------------------------------------------
 .../apache/phoenix/execute/MutationState.java   | 478 ++++++++++---------
 .../apache/phoenix/schema/MetaDataClient.java   |  31 +-
 2 files changed, 265 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/13699371/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index ee694e7..a6fe98d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -121,6 +121,7 @@ public class MutationState implements SQLCloseable {
     private static final Logger logger = LoggerFactory.getLogger(MutationState.class);
     private static final TransactionCodec CODEC = new TransactionCodec();
     private static final int[] EMPTY_STATEMENT_INDEX_ARRAY = new int[0];
+    private static final int MAX_COMMIT_RETRIES = 3;
     
     private final PhoenixConnection connection;
     private final long maxSize;
@@ -160,37 +161,37 @@ public class MutationState implements SQLCloseable {
     }
     
     private MutationState(long maxSize, PhoenixConnection connection, Transaction tx, TransactionContext txContext, long sizeOffset) {
-    	this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext);
+        this(maxSize, connection, Maps.<TableRef, Map<ImmutableBytesPtr,RowMutationState>>newHashMapWithExpectedSize(5), tx, txContext);
         this.sizeOffset = sizeOffset;
     }
     
-	MutationState(long maxSize, PhoenixConnection connection,
-			Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
-			Transaction tx, TransactionContext txContext) {
-		this.maxSize = maxSize;
-		this.connection = connection;
-		this.mutations = mutations;
-		boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
-		this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
-				: NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
-		this.tx = tx;
-		if (tx == null) {
+    MutationState(long maxSize, PhoenixConnection connection,
+            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
+            Transaction tx, TransactionContext txContext) {
+        this.maxSize = maxSize;
+        this.connection = connection;
+        this.mutations = mutations;
+        boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled();
+        this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue()
+                : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE;
+        this.tx = tx;
+        if (tx == null) {
             this.txAwares = Collections.emptyList();
-		    if (txContext == null) {
-    			TransactionSystemClient txServiceClient = this.connection
-    					.getQueryServices().getTransactionSystemClient();
-    			this.txContext = new TransactionContext(txServiceClient);
-		    } else {
-		        isExternalTxContext = true;
-		        this.txContext = txContext;
-		    }
-		} else {
-			// this code path is only used while running child scans, we can't pass the txContext to child scans
-			// as it is not thread safe, so we use the tx member variable
-			this.txAwares = Lists.newArrayList();
-			this.txContext = null;
-		}
-	}
+            if (txContext == null) {
+                TransactionSystemClient txServiceClient = this.connection
+                        .getQueryServices().getTransactionSystemClient();
+                this.txContext = new TransactionContext(txServiceClient);
+            } else {
+                isExternalTxContext = true;
+                this.txContext = txContext;
+            }
+        } else {
+            // this code path is only used while running child scans, we can't pass the txContext to child scans
+            // as it is not thread safe, so we use the tx member variable
+            this.txAwares = Lists.newArrayList();
+            this.txContext = null;
+        }
+    }
 
     public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mutations, long sizeOffset, long maxSize, PhoenixConnection connection) {
         this(maxSize, connection, null, null, sizeOffset);
@@ -217,9 +218,11 @@ public class MutationState implements SQLCloseable {
     public void commitWriteFence(PTable dataTable) throws SQLException {
         if (dataTable.isTransactional()) {
             byte[] key = SchemaUtil.getTableKey(dataTable);
+            boolean success = false;
             try {
                 FenceWait fenceWait = VisibilityFence.prepareWait(key, connection.getQueryServices().getTransactionSystemClient());
                 fenceWait.await(10000, TimeUnit.MILLISECONDS);
+                success = true;
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.INTERRUPTED_EXCEPTION).setRootCause(e).build().buildException();
@@ -235,6 +238,7 @@ public class MutationState implements SQLCloseable {
                // TODO: seems like an autonomous tx capability in Tephra would be useful here.
                 try {
                     txContext.start();
+                    if (logger.isInfoEnabled() && success) logger.info("Added write fence at ~" + getTransaction().getReadPointer());
                 } catch (TransactionFailureException e) {
                     throw TransactionUtil.getTransactionFailureException(e);
                 }
@@ -306,18 +310,18 @@ public class MutationState implements SQLCloseable {
             }
             if (hasUncommittedData) {
                 try {
-                	if (txContext == null) {
-                		currentTx = tx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
-                	}  else {
-                		txContext.checkpoint();
-                		currentTx = tx = txContext.getCurrentTransaction();
-                	}
+                    if (txContext == null) {
+                        currentTx = tx = connection.getQueryServices().getTransactionSystemClient().checkpoint(currentTx);
+                    }  else {
+                        txContext.checkpoint();
+                        currentTx = tx = txContext.getCurrentTransaction();
+                    }
                    // Since we've checkpointed, we can clear out uncommitted set, since a statement run afterwards
                     // should see all this data.
                     uncommittedPhysicalNames.clear();
                 } catch (TransactionFailureException e) {
                     throw new SQLException(e);
-				} 
+                } 
             }
             // Since we're querying our own table while mutating it, we must exclude
             // see our current mutations, otherwise we can get erroneous results (for DELETE)
@@ -356,7 +360,7 @@ public class MutationState implements SQLCloseable {
     }
     
     public PhoenixConnection getConnection() {
-    	return connection;
+        return connection;
     }
     
     // Kept private as the Transaction may change when check pointed. Keeping it private ensures
@@ -366,7 +370,7 @@ public class MutationState implements SQLCloseable {
     }
     
     public boolean isTransactionStarted() {
-    	return getTransaction() != null;
+        return getTransaction() != null;
     }
     
     public long getInitialWritePointer() {
@@ -391,11 +395,11 @@ public class MutationState implements SQLCloseable {
             throw new SQLExceptionInfo.Builder(SQLExceptionCode.NULL_TRANSACTION_CONTEXT).build().buildException();
         }
         
-		if (connection.getSCN() != null) {
-			throw new SQLExceptionInfo.Builder(
-					SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
-					.build().buildException();
-		}
+        if (connection.getSCN() != null) {
+            throw new SQLExceptionInfo.Builder(
+                    SQLExceptionCode.CANNOT_START_TRANSACTION_WITH_SCN_SET)
+                    .build().buildException();
+        }
         
         try {
             if (!isTransactionStarted()) {
@@ -460,9 +464,9 @@ public class MutationState implements SQLCloseable {
                 // Loop through new rows and replace existing with new
                for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : entry.getValue().entrySet()) {
                     // Replace existing row with new row
-                	RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
+                    RowMutationState existingRowMutationState = existingRows.put(rowEntry.getKey(), rowEntry.getValue());
                     if (existingRowMutationState != null) {
-                    	Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
+                        Map<PColumn,byte[]> existingValues = existingRowMutationState.getColumnValues();
                         if (existingValues != PRow.DELETE_MARKER) {
                             Map<PColumn,byte[]> newRow = rowEntry.getValue().getColumnValues();
                            // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row. 
@@ -502,7 +506,7 @@ public class MutationState implements SQLCloseable {
     }
     
 
-	private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) {
+    private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr ptr, long rowTimestamp, PTable table) {
         RowKeySchema schema = table.getRowKeySchema();
         int rowTimestampColPos = table.getRowTimestampColPos();
         Field rowTimestampField = schema.getField(rowTimestampColPos); 
@@ -523,7 +527,7 @@ public class MutationState implements SQLCloseable {
         return ptr;
     }
     
-	private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) { 
+    private Iterator<Pair<byte[],List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values, final long timestamp, boolean includeMutableIndexes, final boolean sendAll) { 
         final PTable table = tableRef.getTable();
            final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
                 (table.isImmutableRows() || includeMutableIndexes) ? 
@@ -554,13 +558,13 @@ public class MutationState implements SQLCloseable {
                                 connection.getKeyValueBuilder(), connection);
                    // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
                     if (!sendAll) {
-	                    TableRef key = new TableRef(index);
-						Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key);
-	                    if (rowToColumnMap!=null) {
-		                    final List<Mutation> deleteMutations = Lists.newArrayList();
-		                    generateMutations(tableRef, timestamp, rowToColumnMap, deleteMutations, null);
-		                    indexMutations.addAll(deleteMutations);
-	                    }
+                        TableRef key = new TableRef(index);
+                        Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key);
+                        if (rowToColumnMap!=null) {
+                            final List<Mutation> deleteMutations = Lists.newArrayList();
+                            generateMutations(tableRef, timestamp, rowToColumnMap, deleteMutations, null);
+                            indexMutations.addAll(deleteMutations);
+                        }
                     }
                 } catch (SQLException e) {
                     throw new IllegalDataException(e);
@@ -676,32 +680,32 @@ public class MutationState implements SQLCloseable {
         };
     }
         
-	/**
-	 * Validates that the meta data is valid against the server meta data if we haven't yet done so.
-	 * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
-	 * has changed.
-	 * @param connection
-	 * @return the server time to use for the upsert
-	 * @throws SQLException if the table or any columns no longer exist
-	 */
-	private long[] validateAll() throws SQLException {
-	    int i = 0;
-	    long[] timeStamps = new long[this.mutations.size()];
-	    for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
-	        TableRef tableRef = entry.getKey();
-	        timeStamps[i++] = validate(tableRef, entry.getValue());
-	    }
-	    return timeStamps;
-	}
-	
-	private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
-	    Long scn = connection.getSCN();
-	    MetaDataClient client = new MetaDataClient(connection);
-	    long serverTimeStamp = tableRef.getTimeStamp();
-	    // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
-	    // so no need to do it again here.
-	    if (!connection.getAutoCommit()) {
-	        PTable table = tableRef.getTable();
+    /**
+     * Validates that the meta data is valid against the server meta data if we haven't yet done so.
+     * Otherwise, for every UPSERT VALUES call, we'd need to hit the server to see if the meta data
+     * has changed.
+     * @param connection
+     * @return the server time to use for the upsert
+     * @throws SQLException if the table or any columns no longer exist
+     */
+    private long[] validateAll() throws SQLException {
+        int i = 0;
+        long[] timeStamps = new long[this.mutations.size()];
+        for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
+            TableRef tableRef = entry.getKey();
+            timeStamps[i++] = validate(tableRef, entry.getValue());
+        }
+        return timeStamps;
+    }
+    
+    private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
+        Long scn = connection.getSCN();
+        MetaDataClient client = new MetaDataClient(connection);
+        long serverTimeStamp = tableRef.getTimeStamp();
+        // If we're auto committing, we've already validated the schema when we got the ColumnResolver,
+        // so no need to do it again here.
+        if (!connection.getAutoCommit()) {
+            PTable table = tableRef.getTable();
            MetaDataMutationResult result = client.updateCache(table.getSchemaName().getString(), table.getTableName().getString());
             PTable resolvedTable = result.getTable();
             if (resolvedTable == null) {
@@ -717,14 +721,14 @@ public class MutationState implements SQLCloseable {
                     // TODO: use bitset?
                     PColumn[] columns = new PColumn[resolvedTable.getColumns().size()];
                    for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
-                    	RowMutationState valueEntry = rowEntry.getValue();
+                        RowMutationState valueEntry = rowEntry.getValue();
                         if (valueEntry != null) {
-                        	Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
-                        	if (colValues != PRow.DELETE_MARKER) {
+                            Map<PColumn, byte[]> colValues = valueEntry.getColumnValues();
+                            if (colValues != PRow.DELETE_MARKER) {
                                 for (PColumn column : colValues.keySet()) {
                                     columns[column.getPosition()] = column;
                                 }
-                        	}
+                            }
                         }
                     }
                     for (PColumn column : columns) {
@@ -735,8 +739,8 @@ public class MutationState implements SQLCloseable {
                 }
             }
         }
-	    return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
-	}
+        return scn == null ? serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp : scn;
+    }
     
     private static long calculateMutationSize(List<Mutation> mutations) {
         long byteSize = 0;
@@ -845,74 +849,74 @@ public class MutationState implements SQLCloseable {
         // add tracing for this operation
        try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
             Span span = trace.getSpan();
-	        ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
-	        boolean isTransactional;
-	        while (tableRefIterator.hasNext()) {
-	        	// at this point we are going through mutations for each table
-	            final TableRef tableRef = tableRefIterator.next();
-	            valuesMap = mutations.get(tableRef);
-	            if (valuesMap == null || valuesMap.isEmpty()) {
-	                continue;
-	            }
+            ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable();
+            boolean isTransactional;
+            while (tableRefIterator.hasNext()) {
+                // at this point we are going through mutations for each table
+                final TableRef tableRef = tableRefIterator.next();
+                valuesMap = mutations.get(tableRef);
+                if (valuesMap == null || valuesMap.isEmpty()) {
+                    continue;
+                }
                // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
                long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
-	            final PTable table = tableRef.getTable();
-	            // Track tables to which we've sent uncommitted data
-	            if (isTransactional = table.isTransactional()) {
-	                txTableRefs.add(tableRef);
-	                uncommittedPhysicalNames.add(table.getPhysicalName().getString());
-	            }
-	            boolean isDataTable = true;
+                final PTable table = tableRef.getTable();
+                // Track tables to which we've sent uncommitted data
+                if (isTransactional = table.isTransactional()) {
+                    txTableRefs.add(tableRef);
+                    uncommittedPhysicalNames.add(table.getPhysicalName().getString());
+                }
+                boolean isDataTable = true;
                 table.getIndexMaintainers(indexMetaDataPtr, connection);
-	            Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
-	            while (mutationsIterator.hasNext()) {
-	                Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
-	                byte[] htableName = pair.getFirst();
-	                List<Mutation> mutationList = pair.getSecond();
-	                
-	                //create a span per target table
-	                //TODO maybe we can be smarter about the table name to string here?
-	                Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
-	
-	                int retryCount = 0;
-	                boolean shouldRetry = false;
-	                do {
-	                    final ServerCache cache = isDataTable ? setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr) : null;
-	                
-	                    // If we haven't retried yet, retry for this case only, as it's possible that
-	                    // a split will occur after we send the index metadata cache to all known
-	                    // region servers.
-	                    shouldRetry = cache != null;
-	                    SQLException sqlE = null;
-	                    HTableInterface hTable = connection.getQueryServices().getTable(htableName);
-	                    try {
-	                        if (isTransactional) {
-	                            // If we have indexes, wrap the HTable in a delegate HTable that
-	                            // will attach the necessary index meta data in the event of a
-	                            // rollback
-	                            if (!table.getIndexes().isEmpty()) {
-	                                hTable = new MetaDataAwareHTable(hTable, tableRef);
-	                            }
-	                            TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table);
-	                            // Don't add immutable indexes (those are the only ones that would participate
-	                            // during a commit), as we don't need conflict detection for these.
-	                            if (isDataTable) {
-	                                // Even for immutable, we need to do this so that an abort has the state
-	                                // necessary to generate the rows to delete.
-	                                addTransactionParticipant(txnAware);
-	                            } else {
-	                                txnAware.startTx(getTransaction());
-	                            }
-	                            hTable = txnAware;
-	                        }
-	                        long numMutations = mutationList.size();
+                Iterator<Pair<byte[],List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
+                while (mutationsIterator.hasNext()) {
+                    Pair<byte[],List<Mutation>> pair = mutationsIterator.next();
+                    byte[] htableName = pair.getFirst();
+                    List<Mutation> mutationList = pair.getSecond();
+                    
+                    //create a span per target table
+                    //TODO maybe we can be smarter about the table name to string here?
+                    Span child = Tracing.child(span,"Writing mutation batch for table: "+Bytes.toString(htableName));
+    
+                    int retryCount = 0;
+                    boolean shouldRetry = false;
+                    do {
+                        final ServerCache cache = isDataTable ? setMetaDataOnMutations(tableRef, mutationList, indexMetaDataPtr) : null;
+                    
+                        // If we haven't retried yet, retry for this case only, as it's possible that
+                        // a split will occur after we send the index metadata cache to all known
+                        // region servers.
+                        shouldRetry = cache != null;
+                        SQLException sqlE = null;
+                        HTableInterface hTable = connection.getQueryServices().getTable(htableName);
+                        try {
+                            if (isTransactional) {
+                                // If we have indexes, wrap the HTable in a delegate HTable that
+                                // will attach the necessary index meta data in the event of a
+                                // rollback
+                                if (!table.getIndexes().isEmpty()) {
+                                    hTable = new MetaDataAwareHTable(hTable, tableRef);
+                                }
+                                TransactionAwareHTable txnAware = TransactionUtil.getTransactionAwareHTable(hTable, table);
+                                // Don't add immutable indexes (those are the only ones that would participate
+                                // during a commit), as we don't need conflict detection for these.
+                                if (isDataTable) {
+                                    // Even for immutable, we need to do this so that an abort has the state
+                                    // necessary to generate the rows to delete.
+                                    addTransactionParticipant(txnAware);
+                                } else {
+                                    txnAware.startTx(getTransaction());
+                                }
+                                hTable = txnAware;
+                            }
+                            long numMutations = mutationList.size();
                             GLOBAL_MUTATION_BATCH_SIZE.update(numMutations);
                             
                             long startTime = System.currentTimeMillis();
                             child.addTimelineAnnotation("Attempt " + retryCount);
-	                        hTable.batch(mutationList);
-	                        child.stop();
-	                        child.stop();
+                            hTable.batch(mutationList);
+                            child.stop();
+                            child.stop();
                             shouldRetry = false;
                             long mutationCommitTime = System.currentTimeMillis() - startTime;
                             GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
@@ -920,80 +924,80 @@ public class MutationState implements SQLCloseable {
                             long mutationSizeBytes = calculateMutationSize(mutationList);
                            MutationMetric mutationsMetric = new MutationMetric(numMutations, mutationSizeBytes, mutationCommitTime);
                            mutationMetricQueue.addMetricsForTable(Bytes.toString(htableName), mutationsMetric);
-	                    } catch (Exception e) {
-	                        SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
-	                        if (inferredE != null) {
-	                            if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
-	                                // Swallow this exception once, as it's possible that we split after sending the index metadata
-	                                // and one of the region servers doesn't have it. This will cause it to have it the next go around.
-	                                // If it fails again, we don't retry.
-	                                String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
-	                                logger.warn(LogUtil.addCustomAnnotations(msg, connection));
-	                                connection.getQueryServices().clearTableRegionCache(htableName);
-	
-	                                // add a new child span as this one failed
-	                                child.addTimelineAnnotation(msg);
-	                                child.stop();
-	                                child = Tracing.child(span,"Failed batch, attempting retry");
-	
-	                                continue;
-	                            }
-	                            e = inferredE;
-	                        }
-	                        // Throw to client an exception that indicates the statements that
-	                        // were not committed successfully.
-	                        sqlE = new CommitException(e, getUncommittedStatementIndexes());
-	                    } finally {
-	                        try {
-	                            if (cache != null) {
-	                                cache.close();
-	                            }
-	                        } finally {
-	                            try {
-	                                hTable.close();
-	                            } 
-	                            catch (IOException e) {
-	                                if (sqlE != null) {
-	                                    sqlE.setNextException(ServerUtil.parseServerException(e));
-	                                } else {
-	                                    sqlE = ServerUtil.parseServerException(e);
-	                                }
-	                            } 
-	                            if (sqlE != null) {
-	                                throw sqlE;
-	                            }
-	                        }
-	                    }
-	                } while (shouldRetry && retryCount++ < 1);
-	                isDataTable = false;
-	            }
-	            if (tableRef.getTable().getType() != PTableType.INDEX) {
-	                numRows -= valuesMap.size();
-	            }
-	            // For transactions, track the statement indexes as we send data
-	            // over because our CommitException should include all statements
-	            // involved in the transaction since none of them would have been
-	            // committed in the event of a failure.
-	            if (isTransactional) {
-	                addUncommittedStatementIndexes(valuesMap.values());
-	                if (txMutations == null) {
-	                    txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
-	                }
-	                // Keep all mutations we've encountered until a commit or rollback.
-	                // This is not ideal, but there's not good way to get the values back
-	                // in the event that we need to replay the commit.
-	                txMutations.put(tableRef, valuesMap);
-	            }
+                        } catch (Exception e) {
+                            SQLException inferredE = ServerUtil.parseServerExceptionOrNull(e);
+                            if (inferredE != null) {
+                                if (shouldRetry && retryCount == 0 && inferredE.getErrorCode() == SQLExceptionCode.INDEX_METADATA_NOT_FOUND.getErrorCode()) {
+                                    // Swallow this exception once, as it's possible that we split after sending the index metadata
+                                    // and one of the region servers doesn't have it. This will cause it to have it the next go around.
+                                    // If it fails again, we don't retry.
+                                    String msg = "Swallowing exception and retrying after clearing meta cache on connection. " + inferredE;
+                                    logger.warn(LogUtil.addCustomAnnotations(msg, connection));
+                                    connection.getQueryServices().clearTableRegionCache(htableName);
+    
+                                    // add a new child span as this one failed
+                                    child.addTimelineAnnotation(msg);
+                                    child.stop();
+                                    child = Tracing.child(span,"Failed batch, attempting retry");
+    
+                                    continue;
+                                }
+                                e = inferredE;
+                            }
+                            // Throw to client an exception that indicates the statements that
+                            // were not committed successfully.
+                            sqlE = new CommitException(e, getUncommittedStatementIndexes());
+                        } finally {
+                            try {
+                                if (cache != null) {
+                                    cache.close();
+                                }
+                            } finally {
+                                try {
+                                    hTable.close();
+                                } 
+                                catch (IOException e) {
+                                    if (sqlE != null) {
+                                        sqlE.setNextException(ServerUtil.parseServerException(e));
+                                    } else {
+                                        sqlE = ServerUtil.parseServerException(e);
+                                    }
+                                } 
+                                if (sqlE != null) {
+                                    throw sqlE;
+                                }
+                            }
+                        }
+                    } while (shouldRetry && retryCount++ < 1);
+                    isDataTable = false;
+                }
+                if (tableRef.getTable().getType() != PTableType.INDEX) {
+                    numRows -= valuesMap.size();
+                }
+                // For transactions, track the statement indexes as we send data
+                // over because our CommitException should include all statements
+                // involved in the transaction since none of them would have been
+                // committed in the event of a failure.
+                if (isTransactional) {
+                    addUncommittedStatementIndexes(valuesMap.values());
+                    if (txMutations == null) {
+                        txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
+                    }
+                    // Keep all mutations we've encountered until a commit or rollback.
+                    // This is not ideal, but there's not good way to get the values back
+                    // in the event that we need to replay the commit.
+                    txMutations.put(tableRef, valuesMap);
+                }
                 // Remove batches as we process them
-	            if (sendAll) {
-	                // Iterating through map key set in this case, so we cannot use
-	                // the remove method without getting a concurrent modification
-	                // exception.
-	            	tableRefIterator.remove();
-	            } else {
-	            	mutations.remove(tableRef);
-	            }
-	        }
+                if (sendAll) {
+                    // Iterating through map key set in this case, so we cannot use
+                    // the remove method without getting a concurrent modification
+                    // exception.
+                    tableRefIterator.remove();
+                } else {
+                    mutations.remove(tableRef);
+                }
+            }
         }
     }
 
@@ -1006,7 +1010,7 @@ public class MutationState implements SQLCloseable {
     }
     
     public static Transaction decodeTransaction(byte[] txnBytes) throws IOException {
-    	return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes);
+        return (txnBytes == null || txnBytes.length==0) ? null : CODEC.decode(txnBytes);
     }
 
    private ServerCache setMetaDataOnMutations(TableRef tableRef, List<? extends Mutation> mutations,
@@ -1059,10 +1063,10 @@ public class MutationState implements SQLCloseable {
     }
     
     private int[] getUncommittedStatementIndexes() {
-    	for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) {
-    	    addUncommittedStatementIndexes(rowMutationMap.values());
-    	}
-    	return uncommittedStatementIndexes;
+        for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) {
+            addUncommittedStatementIndexes(rowMutationMap.values());
+        }
+        return uncommittedStatementIndexes;
     }
     
     @Override
@@ -1101,9 +1105,9 @@ public class MutationState implements SQLCloseable {
        Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
         int retryCount = 0;
         do {
-        	boolean sendSuccessful=false;
-        	boolean retryCommit = false;
-        	SQLException sqlE = null;
+            boolean sendSuccessful=false;
+            boolean retryCommit = false;
+            SQLException sqlE = null;
             try {
                 send();
                 txMutations = this.txMutations;
@@ -1121,7 +1125,8 @@ public class MutationState implements SQLCloseable {
                                 finishSuccessful = true;
                             }
                         } catch (TransactionFailureException e) {
-                            retryCommit = (e instanceof TransactionConflictException && retryCount == 0);
+                            if (logger.isInfoEnabled()) logger.info(e.getClass().getName() + " at timestamp " + getInitialWritePointer() + " with retry count of " + retryCount);
+                            retryCommit = (e instanceof TransactionConflictException && retryCount < MAX_COMMIT_RETRIES);
                             txFailure = e;
                             SQLException nextE = TransactionUtil.getTransactionFailureException(e);
                             if (sqlE == null) {
@@ -1134,7 +1139,9 @@ public class MutationState implements SQLCloseable {
                             if (!finishSuccessful) {
                                 try {
                                     txContext.abort(txFailure);
+                                    if (logger.isInfoEnabled()) logger.info("Abort successful");
                                 } catch (TransactionFailureException e) {
+                                    if (logger.isInfoEnabled()) logger.info("Abort failed with " + e);
                                     SQLException nextE = TransactionUtil.getTransactionFailureException(e);
                                     if (sqlE == null) {
                                         sqlE = nextE;
@@ -1151,8 +1158,15 @@ public class MutationState implements SQLCloseable {
                     } finally {
                         if (retryCommit) {
                             startTransaction();
+                            // Add back read fences
+                            Set<TableRef> txTableRefs = txMutations.keySet();
+                            for (TableRef tableRef : txTableRefs) {
+                                PTable dataTable = tableRef.getTable();
+                                addReadFence(dataTable);
+                            }
                             try {
-                                retryCommit = wasIndexAdded(txMutations.keySet());
+                                // Only retry if an index was added
+                                retryCommit = wasIndexAdded(txTableRefs);
                             } catch (SQLException e) {
                                 retryCommit = false;
                                 if (sqlE == null) {
@@ -1173,7 +1187,9 @@ public class MutationState implements SQLCloseable {
                 break;
             }
             retryCount++;
-            mutations.putAll(txMutations);
+            if (txMutations != null) {
+                mutations.putAll(txMutations);
+            }
         } while (true);
     }
 
@@ -1183,6 +1199,7 @@ public class MutationState implements SQLCloseable {
      * @throws SQLException 
      */
     private boolean wasIndexAdded(Set<TableRef> txTableRefs) throws SQLException {
+        if (logger.isInfoEnabled()) logger.info("Checking for index updates as of "  + getInitialWritePointer());
         MetaDataClient client = new MetaDataClient(connection);
         PMetaData cache = connection.getMetaDataCache();
         boolean addedIndexes = false;
@@ -1198,6 +1215,7 @@ public class MutationState implements SQLCloseable {
                 throw new TableNotFoundException(dataTable.getSchemaName().getString(), dataTable.getTableName().getString());
             }
             if (!result.wasUpdated()) {
+                if (logger.isInfoEnabled()) logger.info("No updates to " + dataTable.getName().getString() + " as of "  + timestamp);
                 continue;
             }
             if (!addedIndexes) {
@@ -1205,8 +1223,10 @@ public class MutationState implements SQLCloseable {
                 // that an index was dropped and recreated with the same name but different
                 // indexed/covered columns.
                 addedIndexes = (!oldIndexes.equals(result.getTable().getIndexes()));
+                if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "as of "  + timestamp + " to " + dataTable.getName().getString() + " with indexes " + dataTable.getIndexes());
             }
         }
+        if (logger.isInfoEnabled()) logger.info((addedIndexes ? "Updates " : "No updates ") + "to indexes as of "  + getInitialWritePointer());
         return addedIndexes;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/13699371/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index ee212ed..d134f08 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -479,8 +479,8 @@ public class MetaDataClient {
         // Do not make rpc to getTable if 
         // 1. table is a system table
         // 2. table was already resolved as of that timestamp
-		if (table != null && !alwaysHitServer
-				&& (systemTable || resolvedTimestamp == tableResolvedTimestamp || connection.getMetaDataCache().getAge(tableRef) < table.getUpdateCacheFrequency())) {
+        if (table != null && !alwaysHitServer
+                && (systemTable || resolvedTimestamp == tableResolvedTimestamp || connection.getMetaDataCache().getAge(tableRef) < table.getUpdateCacheFrequency())) {
            return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, QueryConstants.UNSET_TIMESTAMP, table);
         }
 
@@ -1383,6 +1383,7 @@ public class MetaDataClient {
             return new MutationState(0,connection);
         }
 
+        if (logger.isInfoEnabled()) logger.info("Created index " + table.getName().getString() + " at " + table.getTimeStamp());
         // In async process, we return immediately as the MR job needs to be triggered .
         if(statement.isAsync()) {
             return new MutationState(0, connection);
@@ -2887,19 +2888,19 @@ public class MetaDataClient {
                     String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
                    long resolvedTimeStamp = TransactionUtil.getResolvedTime(connection, result);
                    if (table.getIndexes().isEmpty() || (numPkColumnsAdded==0 && !nonTxToTx)) {
-						connection.addColumn(
-								tenantId,
-								fullTableName,
-								columns,
-								result.getMutationTime(),
-								seqNum,
-								isImmutableRows == null ? table.isImmutableRows() : isImmutableRows,
-								disableWAL == null ? table.isWALDisabled() : disableWAL,
-								multiTenant == null ? table.isMultiTenant() : multiTenant,
-								storeNulls == null ? table.getStoreNulls() : storeNulls, 
-								isTransactional == null ? table.isTransactional() : isTransactional,
-								updateCacheFrequency == null ? table.getUpdateCacheFrequency() : updateCacheFrequency,
-								resolvedTimeStamp);
+                        connection.addColumn(
+                                tenantId,
+                                fullTableName,
+                                columns,
+                                result.getMutationTime(),
+                                seqNum,
+                                isImmutableRows == null ? table.isImmutableRows() : isImmutableRows,
+                                disableWAL == null ? table.isWALDisabled() : disableWAL,
+                                multiTenant == null ? table.isMultiTenant() : multiTenant,
+                                storeNulls == null ? table.getStoreNulls() : storeNulls,
+                                isTransactional == null ? table.isTransactional() : isTransactional,
+                                updateCacheFrequency == null ? table.getUpdateCacheFrequency() : updateCacheFrequency,
+                                resolvedTimeStamp);
                     } else if (updateCacheFrequency != null) {
                         // Force removal from cache as the update cache frequency has changed
                         // Note that clients outside this JVM won't be affected.

