trafodion-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sand...@apache.org
Subject [1/2] incubator-trafodion git commit: [Trafodion-2608] Skips file flush in case of insert drop same transaction.
Date Fri, 12 May 2017 20:45:12 GMT
Repository: incubator-trafodion
Updated Branches:
  refs/heads/master 815d03329 -> 1767649c4


[Trafodion-2608] Skips file flush in case of insert drop same transaction.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/fa55b836
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/fa55b836
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/fa55b836

Branch: refs/heads/master
Commit: fa55b83604fee5c0578e538e6e9fff1288c851dd
Parents: bf3e8d0
Author: Prashant Vasudev <prashanth.vasudev@esgyn.com>
Authored: Thu May 11 20:33:34 2017 +0000
Committer: Prashant Vasudev <prashanth.vasudev@esgyn.com>
Committed: Thu May 11 20:33:34 2017 +0000

----------------------------------------------------------------------
 .../transactional/SplitBalanceHelper.java       |  3 +-
 .../transactional/TrxRegionObserver.java.tmpl   | 34 +++++++++++++++++++-
 2 files changed, 34 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/fa55b836/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
index e30f01e..989415b 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
@@ -244,8 +244,7 @@ public class SplitBalanceHelper {
                 }
             }
             // Reaching here means pendingListClear.
-            if (LOG.isDebugEnabled())
-                LOG.debug("pendingListClear is true because dropTableRecorded is true " +
hri.getRegionNameAsString());
+            LOG.info("pendingListClear is true because dropTableRecorded is true " + hri.getRegionNameAsString());
             return true;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/fa55b836/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java.tmpl
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java.tmpl
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java.tmpl
index 6d944c4..445d2f6 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java.tmpl
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionObserver.java.tmpl
@@ -142,6 +142,7 @@ private ConcurrentHashMap<Long,TransactionalRegionScannerHolder>
scanners =
 static ConcurrentHashMap<String, Object> trxRegionMap;
 
 private ConcurrentHashMap<String, TrxTransactionState> transactionsById = new ConcurrentHashMap<String,
TrxTransactionState>();
+private SortedMap<Long, TrxTransactionState> commitedTransactionsBySequenceNumber =
Collections.synchronizedSortedMap(new TreeMap<Long, TrxTransactionState>());
 private Set<TrxTransactionState> commitPendingTransactions = Collections.synchronizedSet(new
HashSet<TrxTransactionState>());
 private AtomicBoolean blockAll = new AtomicBoolean(false);
 private AtomicBoolean blockNonPhase2 = new AtomicBoolean(false);
@@ -278,6 +279,17 @@ public void start(CoprocessorEnvironment e) throws IOException {
    }
 
    @SuppressWarnings("unchecked")
+   SortedMap<Long, TrxTransactionState> commitedTransactionsBySequenceNumberCheck =
(SortedMap<Long, TrxTransactionState>)
+                                                                       transactionsRefMap
+                                                                       .get(lv_regionName+trxkeycommitedTransactionsBySequenceNumber);
+   if(commitedTransactionsBySequenceNumberCheck != null) {
+       this.commitedTransactionsBySequenceNumber = commitedTransactionsBySequenceNumberCheck;
+   }
+   else {
+       transactionsRefMap.put(lv_regionName+trxkeycommitedTransactionsBySequenceNumber, this.commitedTransactionsBySequenceNumber);
+   }
+   
+   @SuppressWarnings("unchecked")
    Set<TrxTransactionState> commitPendingTransactionsCheck = (Set<TrxTransactionState>)transactionsRefMap
                                                           .get(lv_regionName+trxkeycommitPendingTransactions);
    if(commitPendingTransactionsCheck != null) {
@@ -774,6 +786,11 @@ public void createRecoveryzNode(int node, String encodedName, byte []
data) thro
 
     @Override
     public void    preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean
abortRequested) {
+    
+        LOG.info("preClose - " + regionInfo.getRegionNameAsString() + " - transactionsById
(" + transactionsById.size()
+                        + " ), committed Transactions (" + commitedTransactionsBySequenceNumber.size()
+                        +"), scanners (" + scanners.size() + ")");
+                        
         if (hasFlushed ||
             hasClosed ||
             splitDelayNoFlush ||
@@ -782,7 +799,13 @@ public void createRecoveryzNode(int node, String encodedName, byte []
data) thro
             return;
 
         if(LOG.isTraceEnabled()) LOG.trace("preClose -- commitPendingTransactions (" + commitPendingTransactions.size()
+")");
-
+        
+        //This flag indicates if pendingAndScannersWait call completed successfully.
+        //Setting this flag to true indicates there are no pending scanners or 
+        //commitPendingList outstanding. OR commitPendingList may have transactions
+        //in commitPending that have the table marked for drop in the same transaction.
+        boolean commitAndScannersChecked = false; 
+            
         if (!hasClosed) {
                 blockNonPhase2.set(true);
 
@@ -796,6 +819,7 @@ public void createRecoveryzNode(int node, String encodedName, byte []
data) thro
 	        }
 	        try {
                  sbHelper.pendingAndScannersWait(commitPendingTransactions, scanners, transactionsById,
pendingDelayLen);
+                 commitAndScannersChecked = true;
 	        } catch(IOException ioe) {
 	          LOG.error("Encountered exception when calling pendingAndScannersWait(): " + ioe);
 	        }
@@ -808,6 +832,13 @@ public void createRecoveryzNode(int node, String encodedName, byte []
data) thro
             c.getEnvironment().getRegionServerServices().isStopped() ||
             hasFlushed)
             return;
+        
+        if (((transactionsById.size() <= 0) && (commitPendingTransactions.size()
<= 0)) ||
+             commitAndScannersChecked ) {
+            hasFlushed = true;
+            commitedTransactionsBySequenceNumber.clear();
+            return;
+        }
 
         @SuppressWarnings("rawtypes")
         TrxRegionEndpoint tre = (TrxRegionEndpoint)trxRegionMap.get(regionInfo.getRegionNameAsString()+TrxRegionEndpoint.trxkeyEPCPinstance);
@@ -825,6 +856,7 @@ public void createRecoveryzNode(int node, String encodedName, byte []
data) thro
               }
             }
             hasFlushed = true;
+            commitedTransactionsBySequenceNumber.clear();
           } catch (IOException ioe) {
             if (LOG.isErrorEnabled()) LOG.error("Unable to flush to filesystem");
             hasFlushed = false;


Mime
View raw message