hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From la...@apache.org
Subject svn commit: r1303915 - in /hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver: HRegion.java SplitLogWorker.java
Date Thu, 22 Mar 2012 17:49:32 GMT
Author: larsh
Date: Thu Mar 22 17:49:32 2012
New Revision: 1303915

URL: http://svn.apache.org/viewvc?rev=1303915&view=rev
Log:
HBASE-5542 Unify HRegion.mutateRowsWithLocks() and HRegion.processRow() (Scott Chen)

Modified:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1303915&r1=1303914&r2=1303915&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Thu Mar 22 17:49:32 2012
@@ -49,6 +49,8 @@ import java.util.concurrent.ConcurrentSk
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadFactory;
@@ -95,7 +97,6 @@ import org.apache.hadoop.hbase.client.Ro
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Exec;
 import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
-import org.apache.hadoop.hbase.coprocessor.RowProcessor;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
@@ -232,7 +233,11 @@ public class HRegion implements HeapSize
   final Configuration conf;
   final int rowLockWaitDuration;
   static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
-  static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 10 * 1000L;
+
+  // negative number indicates infinite timeout
+  static final long DEFAULT_ROW_PROCESSOR_TIMEOUT = 60 * 1000L;
+  final ExecutorService rowProcessorExecutor = Executors.newCachedThreadPool();
+
   final HRegionInfo regionInfo;
   final Path regiondir;
   KeyValue.KVComparator comparator;
@@ -486,6 +491,10 @@ public class HRegion implements HeapSize
         "hbase.hregion.keyvalue.timestamp.slop.millisecs",
         HConstants.LATEST_TIMESTAMP);
 
+    /**
+     * Timeout for the process time in processRowsWithLocks().
+     * Use -1 to switch off time bound.
+     */
     this.rowProcessorTimeout = conf.getLong(
         "hbase.hregion.row.processor.timeout", DEFAULT_ROW_PROCESSOR_TIMEOUT);
 
@@ -1676,7 +1685,7 @@ public class HRegion implements HeapSize
   /*
    * @param delete The passed delete is modified by this method. WARNING!
    */
-  private void prepareDelete(Delete delete) throws IOException {
+  void prepareDelete(Delete delete) throws IOException {
     // Check to see if this is a deleteRow insert
     if(delete.getFamilyMap().isEmpty()){
       for(byte [] family : this.htableDescriptor.getFamiliesKeys()){
@@ -1748,7 +1757,7 @@ public class HRegion implements HeapSize
    * @param now
    * @throws IOException
    */
-  private void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
+  void prepareDeleteTimestamps(Delete delete, byte[] byteNow)
       throws IOException {
     Map<byte[], List<KeyValue>> familyMap = delete.getFamilyMap();
     for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
@@ -2367,7 +2376,7 @@ public class HRegion implements HeapSize
    * Replaces any KV timestamps set to {@link HConstants#LATEST_TIMESTAMP}
    * with the provided current timestamp.
    */
-  private void updateKVTimestamps(
+  void updateKVTimestamps(
       final Iterable<List<KeyValue>> keyLists, final byte[] now) {
     for (List<KeyValue> keys: keyLists) {
       if (keys == null) continue;
@@ -2591,7 +2600,7 @@ public class HRegion implements HeapSize
    * Check the collection of families for validity.
    * @throws NoSuchColumnFamilyException if a family does not exist.
    */
-  private void checkFamilies(Collection<byte[]> families)
+  void checkFamilies(Collection<byte[]> families)
   throws NoSuchColumnFamilyException {
     for (byte[] family : families) {
       checkFamily(family);
@@ -2601,7 +2610,7 @@ public class HRegion implements HeapSize
     checkTimestamps(p.getFamilyMap(), now);
   }
 
-  private void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
+  void checkTimestamps(final Map<byte[], List<KeyValue>> familyMap,
       long now) throws DoNotRetryIOException {
     if (timestampSlop == HConstants.LATEST_TIMESTAMP) {
       return;
@@ -4232,243 +4241,123 @@ public class HRegion implements HeapSize
    */
   public void mutateRowsWithLocks(Collection<Mutation> mutations,
       Collection<byte[]> rowsToLock) throws IOException {
-    boolean flush = false;
-
-    startRegionOperation();
-    List<Integer> acquiredLocks = null;
-    try {
-      // 1. run all pre-hooks before the atomic operation
-      // if any pre hook indicates "bypass", bypass the entire operation
-
-      // one WALEdit is used for all edits.
-      WALEdit walEdit = new WALEdit();
-      if (coprocessorHost != null) {
-        for (Mutation m : mutations) {
-          if (m instanceof Put) {
-            if (coprocessorHost.prePut((Put) m, walEdit, m.getWriteToWAL())) {
-              // by pass everything
-              return;
-            }
-          } else if (m instanceof Delete) {
-            Delete d = (Delete) m;
-            prepareDelete(d);
-            if (coprocessorHost.preDelete(d, walEdit, d.getWriteToWAL())) {
-              // by pass everything
-              return;
-            }
-          }
-        }
-      }
-
-      long txid = 0;
-      boolean walSyncSuccessful = false;
-      boolean locked = false;
-
-      // 2. acquire the row lock(s)
-      acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
-      for (byte[] row : rowsToLock) {
-        // attempt to lock all involved rows, fail if one lock times out
-        Integer lid = getLock(null, row, true);
-        if (lid == null) {
-          throw new IOException("Failed to acquire lock on "
-              + Bytes.toStringBinary(row));
-        }
-        acquiredLocks.add(lid);
-      }
-
-      // 3. acquire the region lock
-      this.updatesLock.readLock().lock();
-      locked = true;
-
-      // 4. Get a mvcc write number
-      MultiVersionConsistencyControl.WriteEntry w = mvcc.beginMemstoreInsert();
-
-      long now = EnvironmentEdgeManager.currentTimeMillis();
-      byte[] byteNow = Bytes.toBytes(now);
-      try {
-        // 5. Check mutations and apply edits to a single WALEdit
-        for (Mutation m : mutations) {
-          if (m instanceof Put) {
-            Map<byte[], List<KeyValue>> familyMap = m.getFamilyMap();
-            checkFamilies(familyMap.keySet());
-            checkTimestamps(familyMap, now);
-            updateKVTimestamps(familyMap.values(), byteNow);
-          } else if (m instanceof Delete) {
-            Delete d = (Delete) m;
-            prepareDelete(d);
-            prepareDeleteTimestamps(d, byteNow);
-          } else {
-            throw new DoNotRetryIOException(
-                "Action must be Put or Delete. But was: "
-                    + m.getClass().getName());
-          }
-          if (m.getWriteToWAL()) {
-            addFamilyMapToWALEdit(m.getFamilyMap(), walEdit);
-          }
-        }
-
-        // 6. append all edits at once (don't sync)
-        if (walEdit.size() > 0) {
-          txid = this.log.appendNoSync(regionInfo,
-              this.htableDescriptor.getName(), walEdit,
-              HConstants.DEFAULT_CLUSTER_ID, now, this.htableDescriptor);
-        }
-
-        // 7. apply to memstore
-        long addedSize = 0;
-        for (Mutation m : mutations) {
-          addedSize += applyFamilyMapToMemstore(m.getFamilyMap(), w);
-        }
-        flush = isFlushSize(this.addAndGetGlobalMemstoreSize(addedSize));
-
-        // 8. release region and row lock(s)
-        this.updatesLock.readLock().unlock();
-        locked = false;
-        if (acquiredLocks != null) {
-          for (Integer lid : acquiredLocks) {
-            releaseRowLock(lid);
-          }
-          acquiredLocks = null;
-        }
-
-        // 9. sync WAL if required
-        if (walEdit.size() > 0 &&
-            (this.regionInfo.isMetaRegion() ||
-             !this.htableDescriptor.isDeferredLogFlush())) {
-          this.log.sync(txid);
-        }
-        walSyncSuccessful = true;
-
-        // 10. advance mvcc
-        mvcc.completeMemstoreInsert(w);
-        w = null;
-
-        // 11. run coprocessor post host hooks
-        // after the WAL is sync'ed and all locks are released
-        // (similar to doMiniBatchPut)
-        if (coprocessorHost != null) {
-          for (Mutation m : mutations) {
-            if (m instanceof Put) {
-              coprocessorHost.postPut((Put) m, walEdit, m.getWriteToWAL());
-            } else if (m instanceof Delete) {
-              coprocessorHost.postDelete((Delete) m, walEdit, m.getWriteToWAL());
-            }
-          }
-        }
-      } finally {
-        // 12. clean up if needed
-        if (!walSyncSuccessful) {
-          int kvsRolledback = 0;
-          for (Mutation m : mutations) {
-            for (Map.Entry<byte[], List<KeyValue>> e : m.getFamilyMap()
-                .entrySet()) {
-              List<KeyValue> kvs = e.getValue();
-              byte[] family = e.getKey();
-              Store store = getStore(family);
-              // roll back each kv
-              for (KeyValue kv : kvs) {
-                store.rollback(kv);
-                kvsRolledback++;
-              }
-            }
-          }
-          LOG.info("mutateRowWithLocks: rolled back " + kvsRolledback
-              + " KeyValues");
-        }
-
-        if (w != null) {
-          mvcc.completeMemstoreInsert(w);
-        }
 
-        if (locked) {
-          this.updatesLock.readLock().unlock();
-        }
+    MultiRowMutationProcessor proc =
+        new MultiRowMutationProcessor(mutations, rowsToLock);
+    processRowsWithLocks(proc, -1);
+  }
 
-        if (acquiredLocks != null) {
-          for (Integer lid : acquiredLocks) {
-            releaseRowLock(lid);
-          }
-        }
-      }
-    } finally {
-      if (flush) {
-        // 13. Flush cache if needed. Do it outside update lock.
-        requestFlush();
-      }
-      closeRegionOperation();
-    }
+  /**
+   * Performs atomic multiple reads and writes on a given row.
+   *
+   * @param processor The object defines the reads and writes to a row.
+   */
+  public void processRowsWithLocks(RowProcessor<?> processor)
+      throws IOException {
+    processRowsWithLocks(processor, rowProcessorTimeout);
   }
 
   /**
    * Performs atomic multiple reads and writes on a given row.
+   *
    * @param processor The object defines the reads and writes to a row.
+   * @param timeout The timeout of the processor.process() execution
+   *                Use a negative number to switch off the time bound
    */
-  public void processRow(RowProcessor<?> processor)
+  public void processRowsWithLocks(RowProcessor<?> processor, long timeout)
       throws IOException {
-    byte[] row = processor.getRow();
-    checkRow(row, "processRow");
+
+    for (byte[] row : processor.getRowsToLock()) {
+      checkRow(row, "processRowsWithLocks");
+    }
     if (!processor.readOnly()) {
       checkReadOnly();
     }
     checkResources();
 
-    MultiVersionConsistencyControl.WriteEntry writeEntry = null;
-
     startRegionOperation();
+    WALEdit walEdit = new WALEdit();
 
+    // 1. Run pre-process hook
+    processor.preProcess(this, walEdit);
+
+    // Short circuit the read only case
+    if (processor.readOnly()) {
+      try {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        doProcessRowWithTimeout(
+            processor, now, this, null, null, timeout);
+        processor.postProcess(this, walEdit);
+      } finally {
+        closeRegionOperation();
+      }
+      return;
+    }
+
+    MultiVersionConsistencyControl.WriteEntry writeEntry = null;
     boolean locked = false;
     boolean walSyncSuccessful = false;
-    Integer rowLockID = null;
+    List<Integer> acquiredLocks = null;
     long addedSize = 0;
     List<KeyValue> mutations = new ArrayList<KeyValue>();
+    Collection<byte[]> rowsToLock = processor.getRowsToLock();
     try {
-      // 1. Row lock
-      rowLockID = getLock(null, row, true);
-
-      // 2. Region lock
+      // 2. Acquire the row lock(s)
+      acquiredLocks = new ArrayList<Integer>(rowsToLock.size());
+      for (byte[] row : rowsToLock) {
+        // Attempt to lock all involved rows, fail if one lock times out
+        Integer lid = getLock(null, row, true);
+        if (lid == null) {
+          throw new IOException("Failed to acquire lock on "
+              + Bytes.toStringBinary(row));
+        }
+        acquiredLocks.add(lid);
+      }
+      // 3. Region lock
       this.updatesLock.readLock().lock();
       locked = true;
+
       long now = EnvironmentEdgeManager.currentTimeMillis();
       try {
-        // 3. Let the processor scan the row and generate mutations
-        WALEdit walEdits = new WALEdit();
-        doProcessRowWithTimeout(processor, now, rowScanner, mutations,
-            walEdits, rowProcessorTimeout);
-        if (processor.readOnly() && !mutations.isEmpty()) {
-          throw new IOException(
-              "Processor is readOnly but generating mutations on row:" +
-              Bytes.toStringBinary(row));
-        }
+        // 4. Let the processor scan the rows, generate mutations and add
+        //    waledits
+        doProcessRowWithTimeout(
+            processor, now, this, mutations, walEdit, timeout);
 
         if (!mutations.isEmpty()) {
-          // 4. Get a mvcc write number
+          // 5. Get a mvcc write number
           writeEntry = mvcc.beginMemstoreInsert();
-          // 5. Apply to memstore and a WALEdit
+          // 6. Apply to memstore
           for (KeyValue kv : mutations) {
             kv.setMemstoreTS(writeEntry.getWriteNumber());
-            walEdits.add(kv);
-            addedSize += stores.get(kv.getFamily()).add(kv);
+            byte[] family = kv.getFamily();
+            checkFamily(family);
+            addedSize += stores.get(family).add(kv);
           }
 
           long txid = 0;
-          // 6. Append no sync
-          if (!walEdits.isEmpty()) {
+          // 7. Append no sync
+          if (!walEdit.isEmpty()) {
             txid = this.log.appendNoSync(this.regionInfo,
-                this.htableDescriptor.getName(), walEdits,
+                this.htableDescriptor.getName(), walEdit,
                 processor.getClusterId(), now, this.htableDescriptor);
           }
-          // 7. Release region lock
+          // 8. Release region lock
           if (locked) {
             this.updatesLock.readLock().unlock();
             locked = false;
           }
-          // 8. Release row lock
-          if (rowLockID != null) {
-            releaseRowLock(rowLockID);
-            rowLockID = null;
+          // 9. Release row lock(s)
+          if (acquiredLocks != null) {
+            for (Integer lid : acquiredLocks) {
+              releaseRowLock(lid);
+            }
+            acquiredLocks = null;
           }
-          // 9. Sync edit log
-          if (txid != 0) {
+          // 10. Sync edit log
+          if (txid != 0 &&
+              (this.regionInfo.isMetaRegion() ||
+               !this.htableDescriptor.isDeferredLogFlush())) {
             this.log.sync(txid);
           }
           walSyncSuccessful = true;
@@ -4476,12 +4365,13 @@ public class HRegion implements HeapSize
       } finally {
         if (!mutations.isEmpty() && !walSyncSuccessful) {
           LOG.warn("Wal sync failed. Roll back " + mutations.size() +
-              " memstore keyvalues for row:" + processor.getRow());
+              " memstore keyvalues for row(s):" +
+              processor.getRowsToLock().iterator().next() + "...");
           for (KeyValue kv : mutations) {
             stores.get(kv.getFamily()).rollback(kv);
           }
         }
-        // 10. Roll mvcc forward
+        // 11. Roll mvcc forward
         if (writeEntry != null) {
           mvcc.completeMemstoreInsert(writeEntry);
           writeEntry = null;
@@ -4490,11 +4380,16 @@ public class HRegion implements HeapSize
           this.updatesLock.readLock().unlock();
           locked = false;
         }
-        if (rowLockID != null) {
-          releaseRowLock(rowLockID);
-          rowLockID = null;
+        if (acquiredLocks != null) {
+          for (Integer lid : acquiredLocks) {
+            releaseRowLock(lid);
+          }
         }
       }
+
+      // 12. Run post-process hook
+      processor.postProcess(this, walEdit);
+
     } finally {
       closeRegionOperation();
       if (!mutations.isEmpty() &&
@@ -4506,48 +4401,54 @@ public class HRegion implements HeapSize
 
   private void doProcessRowWithTimeout(final RowProcessor<?> processor,
                                        final long now,
-                                       final RowProcessor.RowScanner scanner,
+                                       final HRegion region,
                                        final List<KeyValue> mutations,
-                                       final WALEdit walEdits,
+                                       final WALEdit walEdit,
                                        final long timeout) throws IOException {
+    // Short circuit the no time bound case.
+    if (timeout < 0) {
+      try {
+        processor.process(now, region, mutations, walEdit);
+      } catch (IOException e) {
+        LOG.warn("RowProcessor:" + processor.getClass().getName() +
+            " throws Exception on row(s):" +
+            Bytes.toStringBinary(
+              processor.getRowsToLock().iterator().next()) + "...", e);
+        throw e;
+      }
+      return;
+    }
+
+    // Case with time bound
     FutureTask<Void> task =
       new FutureTask<Void>(new Callable<Void>() {
         @Override
         public Void call() throws IOException {
-          processor.process(now, scanner, mutations, walEdits);
-          return null;
+          try {
+            processor.process(now, region, mutations, walEdit);
+            return null;
+          } catch (IOException e) {
+            LOG.warn("RowProcessor:" + processor.getClass().getName() +
+                " throws Exception on row(s):" +
+                Bytes.toStringBinary(
+                    processor.getRowsToLock().iterator().next()) + "...", e);
+            throw e;
+          }
         }
       });
-    Thread t = new Thread(task);
-    t.setDaemon(true);
-    t.start();
+    rowProcessorExecutor.execute(task);
     try {
       task.get(timeout, TimeUnit.MILLISECONDS);
     } catch (TimeoutException te) {
-      LOG.error("RowProcessor timeout on row:" +
-          Bytes.toStringBinary(processor.getRow()) + " timeout:" + timeout, te);
+      LOG.error("RowProcessor timeout:" + timeout + " ms on row(s):" +
+          Bytes.toStringBinary(processor.getRowsToLock().iterator().next()) +
+          "...");
       throw new IOException(te);
     } catch (Exception e) {
       throw new IOException(e);
     }
   }
 
-  final private RowProcessor.RowScanner rowScanner =
-      new RowProcessor.RowScanner() {
-    @Override
-    public void doScan(Scan scan, List<KeyValue> result) throws IOException {
-      InternalScanner scanner = null;
-      try {
-        scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
-        scanner = HRegion.this.getScanner(scan);
-        result.clear();
-        scanner.next(result);
-      } finally {
-        if (scanner != null) scanner.close();
-      }
-    }
-  };
-
   // TODO: There's a lot of boiler plate code identical
   // to increment... See how to better unify that.
   /**

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1303915&r1=1303914&r2=1303915&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Thu Mar 22 17:49:32 2012
@@ -138,40 +138,45 @@ public class SplitLogWorker extends ZooK
 
   @Override
   public void run() {
-   try {
-    LOG.info("SplitLogWorker " + this.serverName + " starting");
-    this.watcher.registerListener(this);
-    int res;
-    // wait for master to create the splitLogZnode
-    res = -1;
-    while (res == -1) {
-      try {
-        res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
-      } catch (KeeperException e) {
-        // ignore
-        LOG.warn("Exception when checking for " + watcher.splitLogZNode +
-            " ... retrying", e);
-      }
-      if (res == -1) {
+    try {
+      LOG.info("SplitLogWorker " + this.serverName + " starting");
+      this.watcher.registerListener(this);
+      int res;
+      // wait for master to create the splitLogZnode
+      res = -1;
+      while (res == -1 && !exitWorker) {
         try {
-          LOG.info(watcher.splitLogZNode + " znode does not exist," +
-              " waiting for master to create one");
-          Thread.sleep(1000);
-        } catch (InterruptedException e) {
-          LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode);
-          assert exitWorker == true;
+          res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
+        } catch (KeeperException e) {
+          // ignore
+          LOG.warn("Exception when checking for " + watcher.splitLogZNode +
+              " ... retrying", e);
+        }
+        if (res == -1) {
+          try {
+            LOG.info(watcher.splitLogZNode + " znode does not exist," +
+                " waiting for master to create one");
+            Thread.sleep(1000);
+          } catch (InterruptedException e) {
+            LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode
+                + (exitWorker ? "" : " (ERROR: exitWorker is not set, " +
+                "exiting anyway)"));
+            exitWorker = true;
+            break;
+          }
         }
       }
-    }
 
-    taskLoop();
-   } catch (Throwable t) {
-	   // only a logical error can cause here. Printing it out 
-	   // to make debugging easier
-	   LOG.error("unexpected error ", t);
-   } finally {
-	   LOG.info("SplitLogWorker " + this.serverName + " exiting");
-   }
+      if (!exitWorker) {
+        taskLoop();
+      }
+    } catch (Throwable t) {
+      // only a logical error can cause here. Printing it out
+      // to make debugging easier
+      LOG.error("unexpected error ", t);
+    } finally {
+      LOG.info("SplitLogWorker " + this.serverName + " exiting");
+    }
   }
 
   /**
@@ -183,7 +188,7 @@ public class SplitLogWorker extends ZooK
    * try to grab every task that has been put up
    */
   private void taskLoop() {
-    while (true) {
+    while (!exitWorker) {
       int seq_start = taskReadySeq;
       List<String> paths = getTaskList();
       if (paths == null) {
@@ -197,7 +202,7 @@ public class SplitLogWorker extends ZooK
         // don't call ZKSplitLog.getNodeName() because that will lead to
         // double encoding of the path name
         grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
-        if (exitWorker == true) {
+        if (exitWorker) {
           return;
         }
       }
@@ -207,8 +212,9 @@ public class SplitLogWorker extends ZooK
             taskReadyLock.wait();
           } catch (InterruptedException e) {
             LOG.info("SplitLogWorker interrupted while waiting for task," +
-              " exiting: " + e.toString());
-            assert exitWorker == true;
+                " exiting: " + e.toString() + (exitWorker ? "" :
+                " (ERROR: exitWorker is not set, exiting anyway)"));
+            exitWorker = true;
             return;
           }
         }



Mime
View raw message