incubator-accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1295255 - in /incubator/accumulo/branches/1.4/src/server/src: main/java/org/apache/accumulo/server/master/ main/java/org/apache/accumulo/server/master/state/ test/java/org/apache/accumulo/server/master/
Date Wed, 29 Feb 2012 20:37:50 GMT
Author: kturner
Date: Wed Feb 29 20:37:49 2012
New Revision: 1295255

URL: http://svn.apache.org/viewvc?rev=1295255&view=rev
Log:
ACCUMULO-436
 * made merge resilient to process death in the middle of metadata updates
 * made delete rows remove the chopped flag from the last tablet
 * added sanity check in getHighTablet()

Modified:
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
    incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java
    incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1295255&r1=1295254&r2=1295255&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java Wed Feb 29 20:37:49 2012
@@ -41,11 +41,11 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchDeleter;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.RowIterator;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -1506,57 +1506,36 @@ public class Master implements LiveTServ
     private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) {
       for (MergeStats stats : mergeStatsCache.values()) {
         try {
-          MergeState update = stats.nextMergeState();
-          if (update == MergeState.MERGING) {
-            if (mergeStarted(stats.getMergeInfo()) || stats.verifyMergeConsistency(getConnector(),
Master.this)) {
-              try {
-                if (stats.getMergeInfo().isDelete()) {
-                  deleteTablets(stats.getMergeInfo());
-                } else {
-                  mergeMetadataRecords(stats.getMergeInfo());
-                }
-                setMergeState(stats.getMergeInfo(), update = MergeState.COMPLETE);
-              } catch (Exception ex) {
-                log.error("Unable merge metadata table records", ex);
-              }
-            }
-          }
+          MergeState update = stats.nextMergeState(getConnector(), Master.this);
+          
+          // when next state is MERGING, its important to persist this before
+          // starting the merge... the verification check that is done before
+          // moving into the merging state could fail if merge starts but does
+          // not finish
           if (update == MergeState.COMPLETE)
             update = MergeState.NONE;
           if (update != stats.getMergeInfo().getState()) {
             setMergeState(stats.getMergeInfo(), update);
           }
+
+          if (update == MergeState.MERGING) {
+            try {
+              if (stats.getMergeInfo().isDelete()) {
+                deleteTablets(stats.getMergeInfo());
+              } else {
+                mergeMetadataRecords(stats.getMergeInfo());
+              }
+              setMergeState(stats.getMergeInfo(), update = MergeState.COMPLETE);
+            } catch (Exception ex) {
+              log.error("Unable merge metadata table records", ex);
+            }
+          }
         } catch (Exception ex) {
           log.error("Unable to update merge state for merge " + stats.getMergeInfo().getRange(),
ex);
         }
       }
     }
 
-    /**
-     * Determine if a merge has been started, and was interrupted and needs to be completed.
-     * 
-     * @param mergeInfo
-     * @return
-     * @throws AccumuloException
-     */
-    private boolean mergeStarted(MergeInfo mergeInfo) throws AccumuloException {
-      KeyExtent merge = mergeInfo.getRange();
-      KeyExtent highTablet = getHighTablet(merge);
-      // merge specifies -inf
-      if (merge.getPrevEndRow() == null) {
-        // lasttablet must start at -inf
-        return highTablet.getPrevEndRow() == null;
-      }
-      // upper end of merge is set -inf
-      if (highTablet.getPrevEndRow() == null) {
-        // nothing can come before this: merge is started
-        return true;
-      }
-      // if prevRow of last tablet is <= merge start, merge has started
-      Text mergeStart = merge.getPrevEndRow();
-      return highTablet.getPrevEndRow().compareTo(mergeStart.getBytes(), 0, mergeStart.getLength())
<= 0;
-    }
-
     private void deleteTablets(MergeInfo info) throws AccumuloException {
       KeyExtent range = info.getRange();
       log.debug("Deleting tablets for " + range);
@@ -1604,18 +1583,21 @@ public class Master implements LiveTServ
           }
         }
         MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials());
-        log.debug("Removing metadata table entries in range " + deleteRange);
-        BatchDeleter bd = conn.createBatchDeleter(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS,
4, 100000l, 1000l, 4);
-        bd.setRanges(Collections.singleton(deleteRange));
-        bd.delete();
-        bd.close();
         
+        BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000000l,
100l, 1);
+        try {
+          deleteTablets(deleteRange, bw, conn);
+        } finally {
+          bw.close();
+        }
+
         if (followingTablet != null) {
           log.debug("Updating prevRow of " + followingTablet + " to " + range.getPrevEndRow());
-          BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000l, 100l,
1);
+          bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000l, 100l, 1);
           try {
             Mutation m = new Mutation(followingTablet.getMetadataEntry());
             ColumnFQ.put(m, Constants.METADATA_PREV_ROW_COLUMN, KeyExtent.encodePrevEndRow(range.getPrevEndRow()));
+            ColumnFQ.putDelete(m, Constants.METADATA_CHOPPED_COLUMN);
             bw.addMutation(m);
             bw.flush();
           } finally {
@@ -1643,7 +1625,7 @@ public class Master implements LiveTServ
       if (start == null) {
         start = new Text();
       }
-      Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start),
false, stopRow, true);
+      Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start),
false, stopRow, false);
       BatchWriter bw = null;
       try {
         long fileCount = 0;
@@ -1654,14 +1636,13 @@ public class Master implements LiveTServ
         scanner.setRange(scanRange);
         ColumnFQ.fetch(scanner, Constants.METADATA_PREV_ROW_COLUMN);
         ColumnFQ.fetch(scanner, Constants.METADATA_TIME_COLUMN);
+        ColumnFQ.fetch(scanner, Constants.METADATA_DIRECTORY_COLUMN);
         scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
         Mutation m = new Mutation(stopRow);
         String maxLogicalTime = null;
         for (Entry<Key,Value> entry : scanner) {
           Key key = entry.getKey();
           Value value = entry.getValue();
-          if (key.getRow().equals(stopRow))
-            break;
           if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) {
             m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
             fileCount++;
@@ -1670,6 +1651,8 @@ public class Master implements LiveTServ
             firstPrevRowValue = new Value(value);
           } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) {
             maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
+          } else if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
+            bw.addMutation(MetadataTable.createDeleteMutation(range.getTableId().toString(),
entry.getValue().toString()));
           }
         }
         
@@ -1688,8 +1671,10 @@ public class Master implements LiveTServ
         
         if (!m.getUpdates().isEmpty()) {
           bw.addMutation(m);
-          bw.flush();
         }
+        
+        bw.flush();
+
         log.debug("Moved " + fileCount + " files to " + stop);
         
         if (firstPrevRowValue == null) {
@@ -1702,38 +1687,13 @@ public class Master implements LiveTServ
         log.debug("Setting the prevRow for last tablet: " + stop);
         bw.addMutation(updatePrevRow);
         bw.flush();
-        
-        // Delete everything in the other tablets
-        scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
-        log.debug("Scanning range " + scanRange);
-        scanner.setRange(scanRange);
-        for (Entry<Key,Value> entry : scanner) {
-          Key key = entry.getKey();
-          if (key.getRow().equals(stopRow))
-            break;
-          if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) {
-            bw.addMutation(MetadataTable.createDeleteMutation(range.getTableId().toString(),
entry.getValue().toString()));
-          }
-          
-          // TODO could group by row
-          m = new Mutation(key.getRow());
-          m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
-          log.debug("deleting entry " + key);
-          bw.addMutation(m);
-        }
-        bw.flush();
+
+        deleteTablets(scanRange, bw, conn);
         
         // Clean-up the last chopped marker
-        scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
-        scanner.fetchColumnFamily(Constants.METADATA_CHOPPED_COLUMN_FAMILY);
-        scanner.setRange(new Range(stopRow, stopRow));
-        for (Entry<Key,Value> entry : scanner) {
-          Key key = entry.getKey();
-          m = new Mutation(key.getRow());
-          m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
-          log.debug("deleting entry " + key);
-          bw.addMutation(m);
-        }
+        m = new Mutation(stopRow);
+        ColumnFQ.putDelete(m, Constants.METADATA_CHOPPED_COLUMN);
+        bw.addMutation(m);
         bw.flush();
         
       } catch (Exception ex) {
@@ -1746,6 +1706,36 @@ public class Master implements LiveTServ
         }
       }
     }
+
+    private void deleteTablets(Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException,
MutationsRejectedException {
+      Scanner scanner;
+      Mutation m;
+      // Delete everything in the other tablets
+      // group all deletes into tablet into one mutation, this makes tablets
+      // either dissapear entirely or not all.. this is important for the case
+      // where the process terminates in the loop below...
+      scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+      log.debug("Deleting range " + scanRange);
+      scanner.setRange(scanRange);
+      RowIterator rowIter = new RowIterator(scanner);
+      while (rowIter.hasNext()) {
+        Iterator<Entry<Key,Value>> row = rowIter.next();
+        m = null;
+        while (row.hasNext()) {
+          Entry<Key,Value> entry = row.next();
+          Key key = entry.getKey();
+
+          if (m == null)
+            m = new Mutation(key.getRow());
+          
+          m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+          log.debug("deleting entry " + key);
+        }
+        bw.addMutation(m);
+      }
+
+      bw.flush();
+    }
     
     private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException {
       try {
@@ -1759,7 +1749,11 @@ public class Master implements LiveTServ
           throw new AccumuloException("No last tablet for a merge " + range);
         }
         Entry<Key,Value> entry = iterator.next();
-        return new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
+        KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue()));
+        if (highTablet.getTableId() != range.getTableId()) {
+          throw new AccumuloException("No last tablet for merge " + range + " " + highTablet);
+        }
+        return highTablet;
       } catch (Exception ex) {
         throw new AccumuloException("Unexpected failure finding the last tablet for a merge
" + range, ex);
       }

Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java?rev=1295255&r1=1295254&r2=1295255&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java Wed Feb 29 20:37:49 2012
@@ -93,7 +93,7 @@ public class MergeStats {
       this.unassigned++;
   }
   
-  public MergeState nextMergeState() throws Exception {
+  public MergeState nextMergeState(Connector connector, CurrentState master) throws Exception
{
     MergeState state = info.getState();
     if (state == MergeState.NONE)
       return state;
@@ -141,7 +141,10 @@ public class MergeStats {
       } else {
         log.info(chopped + " tablets are chopped, " + unassigned + " are offline " + info.getRange());
         if (unassigned == total && chopped == needsToBeChopped) {
-          state = MergeState.MERGING;
+          if (verifyMergeConsistency(connector, master))
+            state = MergeState.MERGING;
+          else
+            log.info("Merge consistency check failed " + info.getRange());
         } else {
           log.info("Waiting for " + unassigned + " unassigned tablets to be " + total + "
" + info.getRange());
         }
@@ -163,7 +166,7 @@ public class MergeStats {
     return state;
   }
   
-  public boolean verifyMergeConsistency(Connector connector, CurrentState master) throws
TableNotFoundException, IOException {
+  private boolean verifyMergeConsistency(Connector connector, CurrentState master) throws
TableNotFoundException, IOException {
     MergeStats verify = new MergeStats(info);
     Scanner scanner = connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
     MetaDataTableScanner.configureScanner(scanner, master);

Modified: incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java?rev=1295255&r1=1295254&r2=1295255&view=diff
==============================================================================
--- incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java (original)
+++ incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java Wed Feb 29 20:37:49 2012
@@ -132,7 +132,7 @@ public class TestMergeState {
     
     // do the state check
     MergeStats stats = scan(state, metaDataStateStore);
-    MergeState newState = stats.nextMergeState();
+    MergeState newState = stats.nextMergeState(connector, state);
     Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
     
     // unassign the tablets
@@ -141,12 +141,9 @@ public class TestMergeState {
     deleter.setRanges(Collections.singletonList(new Range()));
     deleter.delete();
     
-    // now we should be ready to merge
+    // now we should be ready to merge but, we have an inconsistent !METADATA table
     stats = scan(state, metaDataStateStore);
-    Assert.assertEquals(MergeState.MERGING, stats.nextMergeState());
-
-    // but, we have an inconsistent !METADATA table, so double check
-    Assert.assertFalse(stats.verifyMergeConsistency(connector, state));
+    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
     
     // finish the split
     KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o"));
@@ -157,7 +154,7 @@ public class TestMergeState {
     
     // onos... there's a new tablet online
     stats = scan(state, metaDataStateStore);
-    Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState());
+    Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(connector, state));
     
     // chop it
     m = tablet.getPrevRowUpdateMutation();
@@ -165,7 +162,7 @@ public class TestMergeState {
     update(connector, m);
 
     stats = scan(state, metaDataStateStore);
-    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState());
+    Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state));
 
     // take it offline
     m = tablet.getPrevRowUpdateMutation();
@@ -174,10 +171,7 @@ public class TestMergeState {
     
     // now we can split
     stats = scan(state, metaDataStateStore);
-    Assert.assertEquals(MergeState.MERGING, stats.nextMergeState());
-
-    // and we have consistent !METADATA table
-    Assert.assertTrue(stats.verifyMergeConsistency(connector, state));
+    Assert.assertEquals(MergeState.MERGING, stats.nextMergeState(connector, state));
 
   }
 



Mime
View raw message