accumulo-commits mailing list archives

From ktur...@apache.org
Subject svn commit: r1356891 - in /accumulo/branches/1.4: src/core/src/main/java/org/apache/accumulo/core/ src/core/src/main/java/org/apache/accumulo/core/conf/ src/core/src/main/java/org/apache/accumulo/core/zookeeper/ src/server/src/main/java/org/apache/accu...
Date Tue, 03 Jul 2012 19:46:52 GMT
Author: kturner
Date: Tue Jul  3 19:46:50 2012
New Revision: 1356891

URL: http://svn.apache.org/viewvc?rev=1356891&view=rev
Log:
ACCUMULO-409 Make tservers copy failed bulk import files instead of master. 
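
For orientation, a minimal sketch of how the pieces in this commit fit together. The ZooKeeper path, file names, and pool setup here are illustrative, not taken from the diff; the real wiring appears in BulkImport.java and TabletServer.java below. The master enqueues a "source,dest" payload keyed by file name, and any tablet server picks it up and performs the copy:

    import java.util.Collections;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;

    import org.apache.accumulo.server.tabletserver.BulkFailedCopyProcessor;
    import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;

    public class BulkFailedCopyFlowSketch {
      public static void main(String[] args) throws Exception {
        // Illustrative path; the real one is built from Constants.ZROOT, the
        // instance id, and the new Constants.ZBULK_FAILED_COPYQ.
        DistributedWorkQueue copyQ = new DistributedWorkQueue("/accumulo/instance-id/bulk_failed_copyq");

        // Tablet server side: start pulling work items and copying files.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        copyQ.startProcessing(new BulkFailedCopyProcessor(), pool);

        // Master side: enqueue one failed file, then block until some tserver
        // has processed it (the work node is deleted once processing succeeds).
        copyQ.addWork("mf01", "hdfs://nn/bulk/mf01,hdfs://nn/err/mf01".getBytes());
        copyQ.waitUntilDone(Collections.singleton("mf01"));
      }
    }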

Added:
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
Modified:
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/Constants.java
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/IZooReaderWriter.java
    accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
    accumulo/branches/1.4/test/system/test4/bulk_import_test.sh

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/Constants.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/Constants.java?rev=1356891&r1=1356890&r2=1356891&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/Constants.java (original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/Constants.java Tue Jul  3 19:46:50 2012
@@ -77,6 +77,8 @@ public class Constants {
   
   public static final String ZNEXT_FILE = "/next_file";
   
+  public static final String ZBULK_FAILED_COPYQ = "/bulk_failed_copyq";
+
   public static final String ZHDFS_RESERVATIONS = "/hdfs_reservations";
   
   public static final String METADATA_TABLE_ID = "!0";
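
The new constant is a relative ZooKeeper path segment; elsewhere in this commit it is appended to the instance root to form the queue's full path. An illustrative composition (the instance id is a placeholder):

    // How ZBULK_FAILED_COPYQ is composed in BulkImport.java and
    // TabletServer.java below; instanceId here is a placeholder value.
    String instanceId = "a1b2c3d4";
    String queuePath = Constants.ZROOT + "/" + instanceId + Constants.ZBULK_FAILED_COPYQ;
    // -> /accumulo/a1b2c3d4/bulk_failed_copyq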

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1356891&r1=1356890&r2=1356891&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/conf/Property.java Tue Jul  3 19:46:50 2012
@@ -153,7 +153,9 @@ public enum Property {
   TSERV_HOLD_TIME_SUICIDE("tserver.hold.time.max", "5m", PropertyType.TIMEDURATION,
       "The maximum time for a tablet server to be in the \"memory full\" state.  If the tablet
server cannot write out memory"
           + " in this much time, it will assume there is some failure local to its node,
and quit.  A value of zero is equivalent to forever."),
-  
+  TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT,
+      "The number of threads for the distributed workq.  These threads are used for copying
failed bulk files."),
+
   // properties that are specific to logger server behavior
  LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"),
  LOGGER_PORT("logger.port.client", "11224", PropertyType.PORT, "The port used for write-ahead logger services"),
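
As a usage note, this mirrors the wiring added to TabletServer.java later in this commit: the property is read through the system configuration to size a fixed thread pool for the work queue.

    // Fragment mirroring TabletServer.java below: size the pool from the property.
    int threads = ServerConfiguration.getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS);
    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
        threads, new NamingThreadFactory("distributed work queue"));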

Modified: accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java?rev=1356891&r1=1356890&r2=1356891&view=diff
==============================================================================
--- accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java (original)
+++ accumulo/branches/1.4/src/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooUtil.java Tue Jul  3 19:46:50 2012
@@ -221,6 +221,10 @@ public class ZooUtil {
     return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL);
   }
   
+  public static String putEphemeralData(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL);
+  }
+
  public static String putEphemeralSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException {
     return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL);
   }
@@ -258,5 +262,4 @@ public class ZooUtil {
     Stat stat = zk.exists(lid.path + "/" + lid.node, false);
     return stat != null && stat.getEphemeralOwner() == lid.eid;
   }
-  
 }
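
The plain EPHEMERAL mode added here is what lets DistributedWorkQueue (added later in this commit) treat node creation as a try-lock: creation fails fast with NodeExistsException when the lock is held, and the node evaporates if the holder's ZooKeeper session dies. A sketch of that pattern, as it appears in the queue code below:

    try {
      // Try-lock: create fails immediately if another server holds the lock,
      // and the ephemeral node is removed automatically if our session dies.
      zoo.putEphemeralData(lockPath, new byte[0]);
      // ... lock acquired, do the work ...
    } catch (NodeExistsException nee) {
      // someone else has reserved it; skip rather than wait
    }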

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1356891&r1=1356890&r2=1356891&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Tue Jul  3 19:46:50 2012
@@ -63,8 +63,6 @@ import org.apache.accumulo.core.util.Uti
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TServiceClient;
@@ -298,48 +296,16 @@ public class BulkImporter {
     if (completeFailures.size() == 0)
       return Collections.emptySet();
     
-    log.error("The following map files failed completely, saving this info to : " + new Path(failureDir, "failures.seq"));
+    log.debug("The following map files failed ");
     
     for (Entry<Path,List<KeyExtent>> entry : es) {
       List<KeyExtent> extents = entry.getValue();
       
       for (KeyExtent keyExtent : extents)
-        log.error("\t" + entry.getKey() + " -> " + keyExtent);
+        log.debug("\t" + entry.getKey() + " -> " + keyExtent);
     }
-    
-    try {
-      
-      Writer outSeq = SequenceFile.createWriter(fs, conf, new Path(failureDir, "failures.seq"), Text.class, KeyExtent.class);
-      
-      for (Entry<Path,List<KeyExtent>> entry : es) {
-        List<KeyExtent> extents = entry.getValue();
-        
-        for (KeyExtent keyExtent : extents)
-          outSeq.append(new Text(entry.getKey().toString()), keyExtent);
-      }
-      
-      outSeq.close();
-    } catch (IOException ioe) {
-      log.error("Failed to create " + new Path(failureDir, "failures.seq") + " : " + ioe.getMessage());
-    }
-    
-    // we should make copying multi-threaded
-    Set<Path> failedCopies = new HashSet<Path>();
-    
-    for (Entry<Path,List<KeyExtent>> entry : es) {
-      Path dest = new Path(failureDir, entry.getKey().getName());
-      
-      log.debug("Copying " + entry.getKey() + " to " + dest);
-      
-      try {
-        org.apache.hadoop.fs.FileUtil.copy(fs, entry.getKey(), fs, dest, false, conf);
-      } catch (IOException ioe) {
-        log.error("Failed to copy " + entry.getKey() + " : " + ioe.getMessage());
-        failedCopies.add(entry.getKey());
-      }
-    }
-    
-    return failedCopies;
+
+    return Collections.emptySet();
   }
   
   private class AssignmentInfo {

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1356891&r1=1356890&r2=1356891&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Tue Jul  3 19:46:50 2012
@@ -16,13 +16,19 @@
  */
 package org.apache.accumulo.server.master.tableOps;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -36,6 +42,8 @@ import org.apache.accumulo.cloudtrace.in
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.impl.ServerClient;
 import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
@@ -45,6 +53,9 @@ import org.apache.accumulo.core.client.i
 import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.security.thrift.AuthInfo;
@@ -63,12 +74,15 @@ import org.apache.accumulo.server.securi
 import org.apache.accumulo.server.tabletserver.UniqueNameAllocator;
 import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.MetadataTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
@@ -276,24 +290,6 @@ class CleanUpBulkImport extends MasterRe
   }
   
   @Override
-  public long isReady(long tid, Master master) throws Exception {
-    Set<TServerInstance> finished = new HashSet<TServerInstance>();
-    Set<TServerInstance> running = master.onlineTabletServers();
-    for (TServerInstance server : running) {
-      try {
-        TServerConnection client = master.getConnection(server);
-        if (client != null && !client.isActive(tid))
-          finished.add(server);
-      } catch (TException ex) {
-        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
-      }
-    }
-    if (finished.containsAll(running))
-      return 0;
-    return 1000;
-  }
-  
-  @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     log.debug("removing the bulk processing flag file in " + bulk);
     Path bulkDir = new Path(bulk);
@@ -330,8 +326,125 @@ class CompleteBulkImport extends MasterR
   @Override
   public Repo<Master> call(long tid, Master master) throws Exception {
     ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
+    return new CopyFailed(tableId, source, bulk, error);
+  }
+}
+
+class CopyFailed extends MasterRepo {
+  
+  private static final long serialVersionUID = 1L;
+
+  private String tableId;
+  private String source;
+  private String bulk;
+  private String error;
+  
+  public CopyFailed(String tableId, String source, String bulk, String error) {
+    this.tableId = tableId;
+    this.source = source;
+    this.bulk = bulk;
+    this.error = error;
+  }
+  
+  @Override
+  public long isReady(long tid, Master master) throws Exception {
+    Set<TServerInstance> finished = new HashSet<TServerInstance>();
+    Set<TServerInstance> running = master.onlineTabletServers();
+    for (TServerInstance server : running) {
+      try {
+        TServerConnection client = master.getConnection(server);
+        if (client != null && !client.isActive(tid))
+          finished.add(server);
+      } catch (TException ex) {
+        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
+      }
+    }
+    if (finished.containsAll(running))
+      return 0;
+    return 500;
+  }
+  
+  @Override
+  public Repo<Master> call(long tid, Master environment) throws Exception {
+	//This needs to execute after the arbiter is stopped  
+	  
+    FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
+        ServerConfiguration.getSiteConfiguration()));
+	  
+    if (!fs.exists(new Path(error, "failures.txt")))
+      return new CleanUpBulkImport(tableId, source, bulk, error);
+    
+    HashMap<String,String> failures = new HashMap<String,String>();
+    HashMap<String,String> loadedFailures = new HashMap<String,String>();
+    
+    FSDataInputStream failFile = fs.open(new Path(error, "failures.txt"));
+    BufferedReader in = new BufferedReader(new InputStreamReader(failFile));
+    try {
+      String line = null;
+      while ((line = in.readLine()) != null) {
+        Path path = new Path(line);
+        if (!fs.exists(new Path(error, path.getName())))
+          failures.put("/" + path.getParent().getName() + "/" + path.getName(), line);
+      }
+    } finally {
+      failFile.close();
+    }
+    
+    /*
+     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
+     * have no loaded markers.
+     */
+
+    // determine which failed files were loaded
+    AuthInfo creds = SecurityConstants.getSystemCredentials();
+    Connector conn = HdfsZooInstance.getInstance().getConnector(creds.user, creds.password);
+    Scanner mscanner = new IsolatedScanner(conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS));
+    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
+    mscanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY);
+    
+    for (Entry<Key,Value> entry : mscanner) {
+      if (Long.parseLong(entry.getValue().toString()) == tid) {
+        String loadedFile = entry.getKey().getColumnQualifierData().toString();
+        String absPath = failures.remove(loadedFile);
+        if (absPath != null) {
+          loadedFailures.put(loadedFile, absPath);
+        }
+      }
+    }
+    
+    // move failed files that were not loaded
+    for (String failure : failures.values()) {
+      Path orig = new Path(failure);
+      Path dest = new Path(error, orig.getName());
+      fs.rename(orig, dest);
+      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": failed");
+    }
+    
+    if (loadedFailures.size() > 0) {
+      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
+          + Constants.ZBULK_FAILED_COPYQ);
+      
+      HashSet<String> workIds = new HashSet<String>();
+      
+      for (String failure : loadedFailures.values()) {
+        Path orig = new Path(failure);
+        Path dest = new Path(error, orig.getName());
+        
+        if (fs.exists(dest))
+          continue;
+        
+        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes());
+        workIds.add(orig.getName());
+        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
+      }
+      
+      bifCopyQueue.waitUntilDone(workIds);
+    }
+
+    fs.delete(new Path(error, "failures.txt"), true);
     return new CleanUpBulkImport(tableId, source, bulk, error);
   }
+  
 }
 
 class LoadFiles extends MasterRepo {
@@ -455,23 +568,18 @@ class LoadFiles extends MasterRepo {
         UtilWaitThread.sleep(100);
       }
     }
-    // Copy/Create failed file markers
-    for (String f : filesToLoad) {
-      Path orig = new Path(f);
-      Path dest = new Path(errorDir, orig.getName());
-      try {
-        FileUtil.copy(fs, orig, fs, dest, false, true, CachedConfiguration.getInstance());
-        log.debug("tid " + tid + " copied " + orig + " to " + dest + ": failed");
-      } catch (IOException ex) {
-        try {
-          fs.create(dest).close();
-          log.debug("tid " + tid + " marked " + dest + " failed");
-        } catch (IOException e) {
-          log.error("Unable to create failure flag file " + dest, e);
-        }
+    
+    FSDataOutputStream failFile = fs.create(new Path(errorDir, "failures.txt"), true);
+    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile));
+    try {
+      for (String f : filesToLoad) {
+        out.write(f);
+        out.write("\n");
       }
+    } finally {
+      out.close();
     }
-    
+
     // return the next step, which will perform cleanup
     return new CompleteBulkImport(tableId, source, bulk, errorDir);
   }
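
To summarize the branch in CopyFailed above: a failed file with no loaded marker can simply be renamed into the error directory, while a file that was loaded may still be referenced (for example by a clone), so its copy is delegated to a tablet server through the work queue. A condensed, illustrative restatement with setup omitted:

    // Condensed view of CopyFailed.call() above; see the full method for setup.
    for (String failure : failures.values()) {            // never loaded
      Path orig = new Path(failure);
      fs.rename(orig, new Path(error, orig.getName()));   // cheap metadata move
    }
    for (String failure : loadedFailures.values()) {      // loaded: may be referenced
      Path orig = new Path(failure);
      Path dest = new Path(error, orig.getName());
      bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes());
    }
    // ... then bifCopyQueue.waitUntilDone(workIds) before cleanup proceeds.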

Added: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java?rev=1356891&view=auto
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java (added)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java Tue Jul  3 19:46:50 2012
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.server.conf.ServerConfiguration;
+import org.apache.accumulo.server.trace.TraceFileSystem;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue.Processor;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+/**
+ * Copy failed bulk imports.
+ */
+public class BulkFailedCopyProcessor implements Processor {
+  
+  private static final Logger log = Logger.getLogger(BulkFailedCopyProcessor.class);
+  
+  @Override
+  public Processor newProcessor() {
+    return new BulkFailedCopyProcessor();
+  }
+  
+  @Override
+  public void process(String workID, byte[] data) {
+    
+    String paths[] = new String(data).split(",");
+    
+    Path orig = new Path(paths[0]);
+    Path dest = new Path(paths[1]);
+    Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp");
+    
+    try {
+      FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
+          ServerConfiguration.getSiteConfiguration()));
+      
+      FileUtil.copy(fs, orig, fs, tmp, false, true, CachedConfiguration.getInstance());
+      fs.rename(tmp, dest);
+      log.debug("copied " + orig + " to " + dest);
+    } catch (IOException ex) {
+      try {
+        FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(),
+            ServerConfiguration.getSiteConfiguration()));
+        
+        fs.create(dest).close();
+        log.warn(" marked " + dest + " failed", ex);
+      } catch (IOException e) {
+        log.error("Unable to create failure flag file " + dest, e);
+      }
+    }
+
+  }
+  
+}
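
Worth noting in the processor above is the copy-to-temp-then-rename idiom: because an HDFS rename within a directory is atomic, readers of dest never observe a half-copied file. In isolation:

    // The publish idiom used by BulkFailedCopyProcessor.process() above.
    Path tmp = new Path(dest.getParent(), dest.getName() + ".tmp");
    FileUtil.copy(fs, orig, fs, tmp, false, true, conf);  // slow, may fail midway
    fs.rename(tmp, dest);                                 // atomic publish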

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1356891&r1=1356890&r2=1356891&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Tue Jul  3 19:46:50 2012
@@ -50,8 +50,10 @@ import java.util.concurrent.ArrayBlockin
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -176,10 +178,12 @@ import org.apache.accumulo.server.util.H
 import org.apache.accumulo.server.util.MapCounter;
 import org.apache.accumulo.server.util.MetadataTable;
 import org.apache.accumulo.server.util.MetadataTable.LogEntry;
+import org.apache.accumulo.server.util.NamingThreadFactory;
 import org.apache.accumulo.server.util.TServerUtils;
 import org.apache.accumulo.server.util.TServerUtils.ServerPort;
 import org.apache.accumulo.server.util.time.RelativeTime;
 import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.accumulo.server.zookeeper.IZooReaderWriter;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.accumulo.server.zookeeper.ZooCache;
@@ -2514,6 +2518,8 @@ public class TabletServer extends Abstra
   
   private TServer server;
   
+  private DistributedWorkQueue bulkFailedCopyQ;
+  
   private static final String METRICS_PREFIX = "tserver";
   
   private static ObjectName OBJECT_NAME = null;
@@ -2677,6 +2683,17 @@ public class TabletServer extends Abstra
     clientAddress = new InetSocketAddress(clientAddress.getAddress(), clientPort);
     announceExistence();
     
+    ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
+        ServerConfiguration.getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS),
+        new NamingThreadFactory("distributed work queue"));
+
+    bulkFailedCopyQ = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ);
+    try {
+      bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool);
+    } catch (Exception e1) {
+      throw new RuntimeException("Failed to start distributed work queue for copying ", e1);
+    }
+
     try {
      OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
       // Do this because interface not in same package.

Added: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java?rev=1356891&view=auto
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java (added)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java Tue Jul  3 19:46:50 2012
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.zookeeper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.util.time.SimpleTimer;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Provides a way to push work out to tablet servers via zookeeper and wait for that work to be done. Any tablet server can pick up a work item and process it.
+ * 
+ * Worker processes watch a zookeeper node for tasks to be performed. After getting an exclusive lock on the node, the worker will perform the task.
+ */
+public class DistributedWorkQueue {
+  
+  private static final String LOCKS_NODE = "locks";
+
+  private static final Logger log = Logger.getLogger(DistributedWorkQueue.class);
+  
+  private ThreadPoolExecutor threadPool;
+  private ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+  private String path;
+
+  private AtomicInteger numTask = new AtomicInteger(0);
+
+  private void lookForWork(final Processor processor, List<String> children) {
+    if (children.size() == 0)
+      return;
+    
+    if (numTask.get() >= threadPool.getCorePoolSize())
+      return;
+    
+    Random random = new Random();
+    Collections.shuffle(children, random);
+    try {
+      for (final String child : children) {
+        
+        if (child.equals(LOCKS_NODE))
+          continue;
+
+        final String lockPath = path + "/locks/" + child;
+
+        try {
+          // no need to use zoolock, because a queue (ephemeral sequential) is not needed
+          // if can not get the lock right now then do not want to wait
+          zoo.putEphemeralData(lockPath, new byte[0]);
+        } catch (NodeExistsException nee) {
+          // someone else has reserved it
+          continue;
+        }
+
+        final String childPath = path + "/" + child;
+        
+        // check to see if another node processed it already
+        if (!zoo.exists(childPath)) {
+          zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
+          continue;
+        }
+
+        // Great... we got the lock, but maybe we're too busy
+        if (numTask.get() >= threadPool.getCorePoolSize()) {
+          zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
+          break;
+        }
+        
+        log.debug("got lock for " + child);
+        
+        Runnable task = new Runnable() {
+
+          @Override
+          public void run() {
+            try {
+              try {
+                processor.newProcessor().process(child, zoo.getData(childPath, null));
+                
+                // if the task fails, then its entry in the Q is not deleted... so it will be retried
+                try {
+                  zoo.recursiveDelete(childPath, NodeMissingPolicy.SKIP);
+                } catch (Exception e) {
+                  log.error("Error received when trying to delete entry in zookeeper " + childPath, e);
+                }
+                
+                try {
+                  zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
+                } catch (Exception e) {
+                  log.error("Error received when trying to delete entry in zookeeper " + childPath, e);
+                }
+                
+              } catch (Exception e) {
+                log.warn("Failed to process work " + child, e);
+              }
+              
+            } finally {
+              numTask.decrementAndGet();
+            }
+            
+            try {
+              // its important that this is called after numTask is decremented
+              lookForWork(processor, zoo.getChildren(path));
+            } catch (KeeperException e) {
+              log.error("Failed to look for work", e);
+            } catch (InterruptedException e) {
+              log.info("Interrupted looking for work", e);
+            }
+          }
+        };
+        
+        numTask.incrementAndGet();
+        threadPool.execute(task);
+
+      }
+    } catch (Throwable t) {
+      log.error("Unexpected error", t);
+    }
+  }
+
+  public interface Processor {
+    Processor newProcessor();
+
+    void process(String workID, byte[] data);
+  }
+  
+  public DistributedWorkQueue(String path) {
+    this.path = path;
+  }
+  
+  public void startProcessing(final Processor processor, ThreadPoolExecutor executorService) throws KeeperException, InterruptedException {
+    
+    threadPool = (ThreadPoolExecutor) executorService;
+
+    zoo.mkdirs(path);
+    zoo.mkdirs(path + "/" + LOCKS_NODE);
+
+    List<String> children = zoo.getChildren(path, new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        switch (event.getType()) {
+          case NodeChildrenChanged:
+            if (event.getPath().equals(path))
+              try {
+                lookForWork(processor, zoo.getChildren(path, this));
+              } catch (KeeperException e) {
+                log.error("Failed to look for work", e);
+              } catch (InterruptedException e) {
+                log.info("Interrupted looking for work", e);
+              }
+            else
+              log.info("Unexpected path for NodeChildrenChanged event " + event.getPath());
+            break;
+          case NodeCreated:
+          case NodeDataChanged:
+          case NodeDeleted:
+          case None:
+            log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
+            break;
+        
+        }
+      }
+    });
+    
+    lookForWork(processor, children);
+    
+    Random r = new Random();
+    // Add a little jitter to avoid all the tservers slamming zookeeper at once
+    SimpleTimer.getInstance().schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          lookForWork(processor, zoo.getChildren(path));
+        } catch (KeeperException e) {
+          log.error("Failed to look for work", e);
+        } catch (InterruptedException e) {
+          log.info("Interrupted looking for work", e);
+        }
+      }
+    }, r.nextInt(60 * 1000), 60 * 1000);
+  }
+  
+  public void addWork(String workId, byte[] data) throws KeeperException, InterruptedException {
+    if (workId.equalsIgnoreCase(LOCKS_NODE))
+      throw new IllegalArgumentException("locks is reserved work id");
+
+    zoo.mkdirs(path);
+    zoo.putPersistentData(path + "/" + workId, data, NodeExistsPolicy.SKIP);
+  }
+  
+  public void waitUntilDone(Set<String> workIDs) throws KeeperException, InterruptedException {
+    
+    final String condVar = new String("cond");
+    
+    Watcher watcher = new Watcher() {
+      @Override
+      public void process(WatchedEvent event) {
+        switch (event.getType()) {
+          case NodeChildrenChanged:
+            synchronized (condVar) {
+              condVar.notify();
+            }
+            break;
+          case NodeCreated:
+          case NodeDataChanged:
+          case NodeDeleted:
+          case None:
+            log.info("Got unexpected zookeeper event: " + event.getType() + " for " + path);
+            break;
+        
+        }
+      }
+    };
+    
+    List<String> children = zoo.getChildren(path, watcher);
+    
+    while (!Collections.disjoint(children, workIDs)) {
+      synchronized (condVar) {
+        condVar.wait(10000);
+      }
+      children = zoo.getChildren(path, watcher);
+    }
+  }
+}
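
For reference, the ZooKeeper layout the queue maintains under its path (the item name is illustrative), and the recovery story that follows from the code above: a work node is deleted only after process() returns normally, so a failed item is retried on a later pass (via the child watch or the jittered one-minute timer), and locks are ephemeral, so a crashed worker's reservation disappears with its session.

    // Illustrative layout under the queue path:
    //   <path>/mf01         persistent node whose data is the work payload
    //   <path>/locks/mf01   ephemeral node held while a tserver processes mf01
    //
    // Consumer wiring (pool sizing as in TabletServer earlier in this commit):
    DistributedWorkQueue q = new DistributedWorkQueue(queuePath);
    q.startProcessing(new BulkFailedCopyProcessor(), pool);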

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/IZooReaderWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/IZooReaderWriter.java?rev=1356891&r1=1356890&r2=1356891&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/IZooReaderWriter.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/IZooReaderWriter.java Tue Jul  3 19:46:50 2012
@@ -50,6 +50,8 @@ public interface IZooReaderWriter extend
   
  public abstract String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException;
   
+  public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException;
+
  public abstract void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException;
   
   public abstract void delete(String path, int version) throws InterruptedException, KeeperException;

Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java?rev=1356891&r1=1356890&r2=1356891&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooReaderWriter.java Tue Jul  3 19:46:50 2012
@@ -98,6 +98,11 @@ public class ZooReaderWriter extends Zoo
   }
   
   @Override
+  public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException {
+    return ZooUtil.putEphemeralData(getZooKeeper(), zPath, data);
+  }
+  
+  @Override
  public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException {
     return ZooUtil.putEphemeralSequential(getZooKeeper(), zPath, data);
   }

Modified: accumulo/branches/1.4/test/system/test4/bulk_import_test.sh
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/test/system/test4/bulk_import_test.sh?rev=1356891&r1=1356890&r2=1356891&view=diff
==============================================================================
--- accumulo/branches/1.4/test/system/test4/bulk_import_test.sh (original)
+++ accumulo/branches/1.4/test/system/test4/bulk_import_test.sh Tue Jul  3 19:46:50 2012
@@ -19,11 +19,11 @@ hadoop dfs -rmr /testmf
 
 echo "creating first set of map files"
 
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf01 -timestamp 1 -size 50 -random 56 1000000 0 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf02 -timestamp 1 -size 50 -random 56 1000000 1000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf03 -timestamp 1 -size 50 -random 56 1000000 2000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf04 -timestamp 1 -size 50 -random 56 1000000 3000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf05 -timestamp 1 -size 50 -random 56 1000000 4000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf01 -timestamp 1 -size 50 -random 56 1000000 0 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf02 -timestamp 1 -size 50 -random 56 1000000 1000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf03 -timestamp 1 -size 50 -random 56 1000000 2000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf04 -timestamp 1 -size 50 -random 56 1000000 3000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf05 -timestamp 1 -size 50 -random 56 1000000 4000000 1 &
 
 wait
 
@@ -46,11 +46,11 @@ hadoop dfs -rmr /testmf
 
 echo "creating second set of map files"
 
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf01 -timestamp 2 -size 50 -random 57 1000000 0 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf02 -timestamp 2 -size 50 -random 57 1000000 1000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf03 -timestamp 2 -size 50 -random 57 1000000 2000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf04 -timestamp 2 -size 50 -random 57 1000000 3000000 1 &
-../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf05 -timestamp 2 -size 50 -random 57 1000000 4000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf01 -timestamp 2 -size 50 -random 57 1000000 0 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf02 -timestamp 2 -size 50 -random 57 1000000 1000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf03 -timestamp 2 -size 50 -random 57 1000000 2000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf04 -timestamp 2 -size 50 -random 57 1000000 3000000 1 &
+../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf05 -timestamp 2 -size 50 -random 57 1000000 4000000 1 &
 
 wait
 


