hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nspiegelb...@apache.org
Subject svn commit: r1181941 - in /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase: client/HBaseLocalityCheck.java master/HMaster.java util/FSUtils.java
Date Tue, 11 Oct 2011 17:43:15 GMT
Author: nspiegelberg
Date: Tue Oct 11 17:43:14 2011
New Revision: 1181941

URL: http://svn.apache.org/viewvc?rev=1181941&view=rev
Log:
Implementing a ThreadPool to parallelize startup region-locality HDFS scans

Summary: By using a ThreadPool we can turn the formerly serialized scans of
HDFS into a parallel operation. Special care was taken to ensure that if any
operation fails, the scans are stopped abruptly and the exception is thrown to a
higher level.
Test Plan: The mechanism is still the same, so to test that it works, one
should just start up the cluster and check with the locality checker that Liyin
created, as before.
Reviewed By: liyintang
Reviewers: liyintang, kannan, nspiegelberg
Commenters: kannan, nspiegelberg
CC: hbase@lists, kannan, liyintang, nspiegelberg, bogdan
Differential Revision: 283312
Task ID: 620114

Conflicts:

	src/main/java/org/apache/hadoop/hbase/util/FSUtils.java

Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java?rev=1181941&r1=1181940&r2=1181941&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java
(original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseLocalityCheck.java
Tue Oct 11 17:43:14 2011
@@ -4,7 +4,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -14,6 +13,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.client.HBaseFsck.HbckInfo;
@@ -54,8 +54,10 @@ public class HBaseLocalityCheck {
     LOG.info("Locality information by region");
 
     // Get the locality info for each region by scanning the file system
-    preferredRegionToRegionServerMapping =
-      FSUtils.getRegionLocalityMappingFromFS(fs, rootdir);
+    preferredRegionToRegionServerMapping = FSUtils
+        .getRegionLocalityMappingFromFS(fs, rootdir,
+            conf.getInt("hbase.client.localityCheck.threadPoolSize", 2),
+            conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 60 * 1000));
 
     Map<String, AtomicInteger> tableToRegionCountMap =
       new HashMap<String, AtomicInteger>();

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1181941&r1=1181940&r2=1181941&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Oct
11 17:43:14 2011
@@ -36,8 +36,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
@@ -45,6 +44,7 @@ import org.apache.commons.cli.ParseExcep
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.ClusterSt
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.Modify;
 import org.apache.hadoop.hbase.HMsg;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -65,17 +66,16 @@ import org.apache.hadoop.hbase.MasterNot
 import org.apache.hadoop.hbase.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.HConstants.Modify;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.MetaScanner;
+import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
-import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
-import org.apache.hadoop.hbase.executor.HBaseExecutorService;
 import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.hadoop.hbase.executor.HBaseExecutorService;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
@@ -428,6 +428,7 @@ public class HMaster extends Thread impl
     return this.address;
   }
 
+  @Override
   public long getProtocolVersion(String protocol, long clientVersion) {
     return HBaseRPCProtocolVersion.versionID;
   }
@@ -634,10 +635,12 @@ public class HMaster extends Thread impl
             1 * 60 * 1000);
       LOG.debug("get preferredRegionToHostMapping; expecting pause here");
       try {
-        this.preferredRegionToRegionServerMapping =
-          FSUtils.getRegionLocalityMappingFromFS(fs, rootdir);
+        this.preferredRegionToRegionServerMapping = FSUtils
+            .getRegionLocalityMappingFromFS(fs, rootdir,
+                conf.getInt("hbase.master.localityCheck.threadPoolSize", 5),
+                threadWakeFrequency);
       } catch (Exception e) {
-        LOG.equals("Got unexpected exception when getting " +
+        LOG.error("Got unexpected exception when getting " +
             "preferredRegionToHostMapping : " + e.toString());
         // do not pause the master's construction
         preferredRegionToRegionServerMapping = null;
@@ -815,6 +818,7 @@ public class HMaster extends Thread impl
     this.serverManager.notifyServers();
   }
 
+  @Override
   public MapWritable regionServerStartup(final HServerInfo serverInfo)
   throws IOException {
     // Set the ip into the passed in serverInfo.  Its ip is more than likely
@@ -845,6 +849,7 @@ public class HMaster extends Thread impl
     return mw;
   }
 
+  @Override
   public HMsg [] regionServerReport(HServerInfo serverInfo, HMsg msgs[],
     HRegionInfo[] mostLoadedRegions)
   throws IOException {
@@ -864,16 +869,19 @@ public class HMaster extends Thread impl
     return msgs;
   }
 
+  @Override
   public boolean isMasterRunning() {
     return !this.closed.get();
   }
 
+  @Override
   public void shutdown() {
     LOG.info("Cluster shutdown requested. Starting to quiesce servers");
     this.shutdownRequested.set(true);
     this.zooKeeperWrapper.setClusterState(false);
   }
 
+  @Override
   public void createTable(HTableDescriptor desc, byte [][] splitKeys)
   throws IOException {
     if (!isMasterRunning()) {
@@ -952,6 +960,7 @@ public class HMaster extends Thread impl
     regionManager.metaScannerThread.triggerNow();
   }
 
+  @Override
   public void deleteTable(final byte [] tableName) throws IOException {
     if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
       throw new IOException("Can't delete root table");
@@ -960,22 +969,26 @@ public class HMaster extends Thread impl
     LOG.info("deleted table: " + Bytes.toString(tableName));
   }
 
+  @Override
   public void addColumn(byte [] tableName, HColumnDescriptor column)
   throws IOException {
     new AddColumn(this, tableName, column).process();
   }
 
+  @Override
   public void modifyColumn(byte [] tableName, byte [] columnName,
     HColumnDescriptor descriptor)
   throws IOException {
     new ModifyColumn(this, tableName, columnName, descriptor).process();
   }
 
+  @Override
   public void deleteColumn(final byte [] tableName, final byte [] c)
   throws IOException {
     new DeleteColumn(this, tableName, KeyValue.parseColumn(c)[0]).process();
   }
 
+  @Override
   public void enableTable(final byte [] tableName) throws IOException {
     if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
       throw new IOException("Can't enable root table");
@@ -983,6 +996,7 @@ public class HMaster extends Thread impl
     new ChangeTableState(this, tableName, true).process();
   }
 
+  @Override
   public void disableTable(final byte [] tableName) throws IOException {
     if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
       throw new IOException("Can't disable root table");
@@ -1130,6 +1144,7 @@ public class HMaster extends Thread impl
     return this.connection.getHTableDescriptor(tableName);
   }
 
+  @Override
   public void modifyTable(final byte[] tableName, HConstants.Modify op,
       Writable[] args)
   throws IOException {
@@ -1270,6 +1285,7 @@ public class HMaster extends Thread impl
   /**
    * @return cluster status
    */
+  @Override
   public ClusterStatus getClusterStatus() {
     ClusterStatus status = new ClusterStatus();
     status.setHBaseVersion(VersionInfo.getVersion());

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=1181941&r1=1181940&r2=1181941&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java Tue Oct 11
17:43:14 2011
@@ -19,6 +19,23 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,22 +57,6 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
 import org.apache.hadoop.io.SequenceFile;
 
-import java.io.DataInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.lang.reflect.InvocationTargetException;
-import java.io.PrintStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * Utility methods for interacting with the underlying file system.
  */
@@ -572,6 +573,7 @@ public class FSUtils {
       this.fs = fs;
     }
 
+    @Override
     public boolean accept(Path p) {
       boolean isdir = false;
       try {
@@ -702,99 +704,126 @@ public class FSUtils {
    * mapping between the region name and its best locality region server
    *
    * @param fs
+   *          the file system to use
    * @param rootPath
-   * @param out
-   * @return
+   *          the root path to start from
+   * @param threadPoolSize
+   *          the thread pool size to use
+   * @param threadWakeFrequency
+   *          the wake frequency to perform stat printing
+   * @return the mapping to consider as best possible assignment
    * @throws IOException
+   *           in case of file system errors or interrupts
    */
-  public static  Map<String, String> getRegionLocalityMappingFromFS(
-      final FileSystem fs,  final Path rootPath)
+  public static Map<String, String> getRegionLocalityMappingFromFS(
+      final FileSystem fs, final Path rootPath, int threadPoolSize,
+      final int threadWakeFrequency)
       throws IOException {
     // region name to its best locality region server mapping
     Map<String, String> regionToBestLocalityRSMapping =
-       new HashMap<String,  String>();
-    // keep the most block count mapping
-    HashMap<String, AtomicInteger> blockCountMap =
-      new HashMap<String, AtomicInteger>();
+       new ConcurrentHashMap<String,  String>();
 
     long startTime = System.currentTimeMillis();
     Path queryPath = new Path(rootPath.toString() + "/*/*/");
     FileStatus[] statusList = fs.globStatus(queryPath);
+
     LOG.debug("Query Path: " + queryPath + " ; # list of files: " +
         statusList.length);
 
-		if (statusList == null) {
-			return regionToBestLocalityRSMapping;
-		}
-    for (FileStatus regionStatus : statusList) {
-      if(!regionStatus.isDir()) {
-        continue;
-      }
+    if (null == statusList) {
+      return regionToBestLocalityRSMapping;
+    }
 
-			// get the region name; it may get some noise data
-      Path regionPath = regionStatus.getPath();
-      String regionName = regionPath.getName();
-      if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
-        continue;
+    // run in multiple threads
+    int totalGoodRegionFiles = 0;
+    ThreadPoolExecutor tpe = null;
+    FSRegionScanner[] parallelTasks = null;
+    try {
+      // lower the number of threads in case we have very few expected regions
+      int maxRegions = statusList.length;
+
+      if (maxRegions < threadPoolSize) {
+        threadPoolSize = maxRegions;
       }
-      // ignore the empty directory
-      FileStatus[] cfList = fs.listStatus(regionPath);
-      if (cfList == null) {
-				continue;
-			}
-
-			//get table name
-      String tableName = regionPath.getParent().getName();
-      int totalBlkCount = 0;
-      blockCountMap.clear();
-
-      // for each cf, get all the blocks information
-			for (FileStatus cfStatus : cfList) {
-        if (!cfStatus.isDir()) {
-          // skip because this is not a CF directory
+
+      // initialize executor service
+      tpe = new ThreadPoolExecutor(threadPoolSize,
+          threadPoolSize, 60, TimeUnit.SECONDS,
+          new ArrayBlockingQueue<Runnable>(
+              threadPoolSize));
+
+      // set defaults
+      FSRegionScanner.setFileSystem(fs);
+      FSRegionScanner
+          .setRegionToBestLocalityRSMapping(regionToBestLocalityRSMapping);
+
+      // start initializing "thread pool" and threads
+      parallelTasks = new FSRegionScanner[threadPoolSize];
+      ArrayList<LinkedList<Path>> buckets = new ArrayList<LinkedList<Path>>();
+      for (int i = 0; i < threadPoolSize; ++i) {
+        // create buckets for each thread
+        LinkedList<Path> bucket = new LinkedList<Path>();
+        buckets.add(bucket);
+        parallelTasks[i] = new FSRegionScanner(bucket);
+      }
+
+      // ignore all file status items that are not of interest
+      int current = 0;
+      for (FileStatus regionStatus : statusList) {
+        if (null == regionStatus) {
           continue;
         }
-        FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
-				if (storeFileLists == null) {
-					continue;
-				}
-        for (FileStatus storeFile : storeFileLists) {
-          BlockLocation[] blkLocations =
-            fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
-					if (blkLocations == null) {
-						continue;
-					}
-					totalBlkCount += blkLocations.length;
-          for(BlockLocation blk: blkLocations) {
-            for (String host: blk.getHosts()) {
-              AtomicInteger count = blockCountMap.get(host);
-              if (count == null) {
-                count = new AtomicInteger(0);
-                blockCountMap.put(host, count);
-              }
-             count.incrementAndGet();
-            }
-          }
+
+        if (!regionStatus.isDir()) {
+          continue;
         }
-      }
 
-      int largestBlkCount = 0;
-      String hostToRun = null;
-      for (String host: blockCountMap.keySet()) {
-        int tmp = blockCountMap.get(host).get();
-        if (tmp > largestBlkCount) {
-          largestBlkCount = tmp;
-          hostToRun = host;
+        // get the region name; it may get some noise data
+        Path regionPath = regionStatus.getPath();
+        String regionName = regionPath.getName();
+        if (!regionName.toLowerCase().matches("[0-9a-f]+")) {
+          continue;
         }
-      }
 
-      if (hostToRun.endsWith(".")) {
-        hostToRun = hostToRun.substring(0, hostToRun.length()-1);
-      }
-      String name = tableName + ":" + regionName;
-      regionToBestLocalityRSMapping.put(name,hostToRun);
+        // add to respective bucket, do round-robin additions to make sure all
+        // threads get things to do; can create empty buckets in the rare case
+        // in which we end up getting less region paths than we have threads to
+        // handle them
+        buckets.get(current % threadPoolSize).add(regionPath);
+        ++totalGoodRegionFiles;
+        ++current;
+      }
+
+      // start each thread in executor service
+      for (FSRegionScanner task : parallelTasks) {
+        tpe.execute(task);
+      }
+    } finally {
+      if (null != tpe && null != parallelTasks) {
+        tpe.shutdown();
+        try {
+          // here we wait until TPE terminates, which is either naturally or by
+          // exceptions in the execution of the threads
+          while (!tpe.awaitTermination(threadWakeFrequency,
+              TimeUnit.MILLISECONDS)) {
+            int filesDone = 0;
+            for (FSRegionScanner rs : parallelTasks) {
+              filesDone += rs.getNumberOfFinishedRegions();
+            }
 
+            // printing out rough estimate, so as to not introduce
+            // AtomicInteger
+            LOG.info("Locality checking is underway: { THREADS : "
+                + tpe.getCompletedTaskCount() + "/" + parallelTasks.length
+                + " , Scanned Regions : " + filesDone + "/"
+                + totalGoodRegionFiles + " }");
+          }
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        }
+      }
     }
+
     long overhead = System.currentTimeMillis() - startTime;
     String overheadMsg = "Scan DFS for locality info takes " + overhead + " ms";
 
@@ -803,3 +832,143 @@ public class FSUtils {
   }
 
 }
+
+/**
+ * Thread to be used for
+ */
+class FSRegionScanner implements Runnable {
+  /**
+   * The shared block count map
+   */
+  private HashMap<String, AtomicInteger> blockCountMap;
+
+  /**
+   * The file system used
+   */
+  static private FileSystem fs;
+
+  static void setFileSystem(FileSystem fs) {
+    FSRegionScanner.fs = fs;
+  }
+
+  /**
+   * The locality mapping returned by the above getRegionLocalityMappingFromFS
+   * method
+   */
+  static private Map<String, String> regionToBestLocalityRSMapping;
+
+  static void setRegionToBestLocalityRSMapping(
+      Map<String, String> regionToBestLocalityRSMapping) {
+    FSRegionScanner.regionToBestLocalityRSMapping =
+      regionToBestLocalityRSMapping;
+  }
+
+  /**
+   * The respective paths to analyze for each thread
+   */
+  private LinkedList<Path> paths;
+
+  /**
+   * Number of finished blocks by now
+   */
+  private int numerOfFinishedRegions;
+
+  public int getNumberOfFinishedRegions() {
+    return numerOfFinishedRegions;
+  }
+
+  FSRegionScanner(LinkedList<Path> paths) {
+    this.paths = paths;
+    this.numerOfFinishedRegions = 0;
+    this.blockCountMap = new HashMap<String, AtomicInteger>();
+  }
+
+  @Override
+  public void run() {
+    try {
+      for (Path regionPath : paths) {
+        try {
+
+          // break here in case this for loop selects some of the null-pading
+          // elements at the end of the receiving arrays
+          if (null == regionPath) {
+            break;
+          }
+          //get table name
+          String tableName = regionPath.getParent().getName();
+          int totalBlkCount = 0;
+
+          // ignore null
+          FileStatus[] cfList = fs.listStatus(regionPath);
+          if (null == cfList) {
+            continue;
+          }
+
+          // for each cf, get all the blocks information
+          for (FileStatus cfStatus : cfList) {
+            if (!cfStatus.isDir()) {
+              // skip because this is not a CF directory
+              continue;
+            }
+            FileStatus[] storeFileLists = fs.listStatus(cfStatus.getPath());
+            if (null == storeFileLists) {
+              continue;
+            }
+
+            for (FileStatus storeFile : storeFileLists) {
+              BlockLocation[] blkLocations =
+                fs.getFileBlockLocations(storeFile, 0, storeFile.getLen());
+              if (null == blkLocations) {
+                continue;
+              }
+
+              totalBlkCount += blkLocations.length;
+              for(BlockLocation blk: blkLocations) {
+                for (String host: blk.getHosts()) {
+                  AtomicInteger count = blockCountMap.get(host);
+                  if (count == null) {
+                    count = new AtomicInteger(0);
+                    blockCountMap.put(host, count);
+                  }
+                 count.incrementAndGet();
+                }
+              }
+            }
+          }
+
+          int largestBlkCount = 0;
+          String hostToRun = null;
+          for (String host: blockCountMap.keySet()) {
+            int tmp = blockCountMap.get(host).get();
+            if (tmp > largestBlkCount) {
+              largestBlkCount = tmp;
+              hostToRun = host;
+            }
+          }
+
+          // empty regions could make this null
+          if (null == hostToRun) {
+            continue;
+          }
+
+          if (hostToRun.endsWith(".")) {
+            hostToRun = hostToRun.substring(0, hostToRun.length()-1);
+          }
+          String name = tableName + ":" + regionPath.getName();
+          regionToBestLocalityRSMapping.put(name,hostToRun);
+
+          this.numerOfFinishedRegions++;
+        } catch (IOException e) {
+          continue;
+        } catch (RuntimeException e) {
+          continue;
+        }
+      }
+    } finally {
+      // sanity check
+      assert this.numerOfFinishedRegions <= this.paths.size();
+    }
+  }
+  // we can use the error mechanism to stop the whole process if we want,
+  // instead of continuing on errors...
+}



Mime
View raw message