From: ecn@apache.org
To: commits@accumulo.apache.org
Reply-To: dev@accumulo.apache.org
Date: Tue, 13 Oct 2015 20:55:38 -0000
Subject: [2/3] accumulo git commit: Merge branch '1.6' into 1.7

Merge branch '1.6' into 1.7

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/91b161a9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/91b161a9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/91b161a9

Branch: refs/heads/master
Commit: 91b161a932307e2d16845a0c0f6304f123a140b9
Parents: 62821a0 4deaf73
Author: Eric C. Newton
Authored: Tue Oct 13 16:50:18 2015 -0400
Committer: Eric C. Newton
Committed: Tue Oct 13 16:50:18 2015 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/accumulo/master/tableOps/BulkImport.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/91b161a9/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
index 031a80c,e661968..ad20473
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java
@@@ -263,5 -284,337 +263,6 @@@ public class BulkImport extends MasterR
      Utils.unreserveHdfsDirectory(sourceDir, tid);
      Utils.unreserveHdfsDirectory(errorDir, tid);
      Utils.getReadLock(tableId, tid).unlock();
+     ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
    }
  }
-
-class CleanUpBulkImport extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private static final Logger log = Logger.getLogger(CleanUpBulkImport.class);
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String error;
-
-  public CleanUpBulkImport(String tableId, String source, String bulk, String error) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.error = error;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    log.debug("removing the bulk processing flag file in " + bulk);
-    Path bulkDir = new Path(bulk);
-    MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-    MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString());
-    log.debug("removing the metadata table markers for loaded files");
-    Connector conn = master.getConnector();
-    MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid);
-    log.debug("releasing HDFS reservations for " + source + " and " + error);
-    Utils.unreserveHdfsDirectory(source, tid);
-    Utils.unreserveHdfsDirectory(error, tid);
-    Utils.getReadLock(tableId, tid).unlock();
-    log.debug("completing bulk import transaction " + tid);
-    ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid);
-    return null;
-  }
-}
-
-class CompleteBulkImport extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String error;
-
-  public CompleteBulkImport(String tableId, String source, String bulk, String error) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.error = error;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid);
-    return new CopyFailed(tableId, source, bulk, error);
-  }
-}
-
-class CopyFailed extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String error;
-
-  public CopyFailed(String tableId, String source, String bulk, String error) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.error = error;
-  }
-
-  @Override
-  public long isReady(long tid, Master master) throws Exception {
-    Set<TServerInstance> finished = new HashSet<TServerInstance>();
-    Set<TServerInstance> running = master.onlineTabletServers();
-    for (TServerInstance server : running) {
-      try {
-        TServerConnection client = master.getConnection(server);
-        if (client != null && !client.isActive(tid))
-          finished.add(server);
-      } catch (TException ex) {
-        log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex);
-      }
-    }
-    if (finished.containsAll(running))
-      return 0;
-    return 500;
-  }
-
-  @Override
-  public Repo<Master> call(long tid, Master master) throws Exception {
-    // This needs to execute after the arbiter is stopped
-
-    VolumeManager fs = master.getFileSystem();
-
-    if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT)))
-      return new CleanUpBulkImport(tableId, source, bulk, error);
-
-    HashMap<FileRef,String> failures = new HashMap<FileRef,String>();
-    HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>();
-
-    FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT));
-    BufferedReader in = new BufferedReader(new InputStreamReader(failFile, UTF_8));
-    try {
-      String line = null;
-      while ((line = in.readLine()) != null) {
-        Path path = new Path(line);
-        if (!fs.exists(new Path(error, path.getName())))
-          failures.put(new FileRef(line, path), line);
-      }
-    } finally {
-      failFile.close();
-    }
-
-    /*
-     * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that
-     * have no loaded markers.
-     */
-
-    // determine which failed files were loaded
-    Connector conn = master.getConnector();
-    Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
-    mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange());
-    mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME);
-
-    for (Entry<Key,Value> entry : mscanner) {
-      if (Long.parseLong(entry.getValue().toString()) == tid) {
-        FileRef loadedFile = new FileRef(fs, entry.getKey());
-        String absPath = failures.remove(loadedFile);
-        if (absPath != null) {
-          loadedFailures.put(loadedFile, absPath);
-        }
-      }
-    }
-
-    // move failed files that were not loaded
-    for (String failure : failures.values()) {
-      Path orig = new Path(failure);
-      Path dest = new Path(error, orig.getName());
-      fs.rename(orig, dest);
-      log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed");
-    }
-
-    if (loadedFailures.size() > 0) {
-      DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID()
-          + Constants.ZBULK_FAILED_COPYQ);
-
-      HashSet<String> workIds = new HashSet<String>();
-
-      for (String failure : loadedFailures.values()) {
-        Path orig = new Path(failure);
-        Path dest = new Path(error, orig.getName());
-
-        if (fs.exists(dest))
-          continue;
-
-        bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8));
-        workIds.add(orig.getName());
-        log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed");
-      }
-
-      bifCopyQueue.waitUntilDone(workIds);
-    }
-
-    fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT));
-    return new CleanUpBulkImport(tableId, source, bulk, error);
-  }
-
-}
-
-class LoadFiles extends MasterRepo {
-
-  private static final long serialVersionUID = 1L;
-
-  private static ExecutorService threadPool = null;
-  private static final Logger log = Logger.getLogger(BulkImport.class);
-
-  private String tableId;
-  private String source;
-  private String bulk;
-  private String errorDir;
-  private boolean setTime;
-
-  public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) {
-    this.tableId = tableId;
-    this.source = source;
-    this.bulk = bulk;
-    this.errorDir = errorDir;
-    this.setTime = setTime;
-  }
-
-  @Override
-  public long isReady(long tid, Master master) throws Exception {
-    if (master.onlineTabletServers().size() == 0)
-      return 500;
-    return 0;
-  }
-
-  private static synchronized ExecutorService getThreadPool(Master master) {
-    if (threadPool == null) {
-      int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
-      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
-      pool.allowCoreThreadTimeOut(true);
-      threadPool = new TraceExecutorService(pool);
-    }
-    return threadPool;
-  }
-
-  @Override
-  public Repo<Master> call(final long tid, final Master master) throws Exception {
-    ExecutorService executor = getThreadPool(master);
-    final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration();
-    VolumeManager fs = master.getFileSystem();
-    List<FileStatus> files = new ArrayList<FileStatus>();
-    for (FileStatus entry : fs.listStatus(new Path(bulk))) {
-      files.add(entry);
-    }
-    log.debug("tid " + tid + " importing " + files.size() + " files");
-
-    Path writable = new Path(this.errorDir, ".iswritable");
-    if (!fs.createNewFile(writable)) {
-      // Maybe this is a re-try... clear the flag and try again
-      fs.delete(writable);
-      if (!fs.createNewFile(writable))
-        throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY,
-            "Unable to write to " + this.errorDir);
-    }
-    fs.delete(writable);
-
-    final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>());
-    for (FileStatus f : files)
-      filesToLoad.add(f.getPath().toString());
-
-    final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES));
-    for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) {
-      List<Future<List<String>>> results = new ArrayList<Future<List<String>>>();
-
-      if (master.onlineTabletServers().size() == 0)
-        log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")");
-
-      while (master.onlineTabletServers().size() == 0) {
-        UtilWaitThread.sleep(500);
-      }
-
-      // Use the threadpool to assign files one-at-a-time to the server
-      final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
-      for (final String file : filesToLoad) {
-        results.add(executor.submit(new Callable<List<String>>() {
-          @Override
-          public List<String> call() {
-            List<String> failures = new ArrayList<String>();
-            ClientService.Client client = null;
-            String server = null;
-            try {
-              // get a connection to a random tablet server, do not prefer cached connections because
-              // this is running on the master and there are lots of connections to tablet servers
-              // serving the metadata tablets
-              long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
-              Pair<String,ClientService.Client> pair = ServerClient.getConnection(master.getInstance(), false, timeInMillis);
-              client = pair.getSecond();
-              server = pair.getFirst();
-              List<String> attempt = Collections.singletonList(file);
-              log.debug("Asking " + pair.getFirst() + " to bulk import " + file);
-              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt,
-                  errorDir, setTime);
-              if (fail.isEmpty()) {
-                loaded.add(file);
-              } else {
-                failures.addAll(fail);
-              }
-            } catch (Exception ex) {
-              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
-            } finally {
-              ServerClient.close(client);
-            }
-            return failures;
-          }
-        }));
-      }
-      Set<String> failures = new HashSet<String>();
-      for (Future<List<String>> f : results)
-        failures.addAll(f.get());
-      filesToLoad.removeAll(loaded);
-      if (filesToLoad.size() > 0) {
-        log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed");
-        UtilWaitThread.sleep(100);
-      }
-    }
-
-    FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true);
-    BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8));
-    try {
-      for (String f : filesToLoad) {
-        out.write(f);
-        out.write("\n");
-      }
-    } finally {
-      out.close();
-    }
-
-    // return the next step, which will perform cleanup
-    return new CompleteBulkImport(tableId, source, bulk, errorDir);
-  }
-
-  static String sampleList(Collection<?> potentiallyLongList, int max) {
-    StringBuffer result = new StringBuffer();
-    result.append("[");
-    int i = 0;
-    for (Object obj : potentiallyLongList) {
-      result.append(obj);
-      if (i >= max) {
-        result.append("...");
-        break;
-      } else {
-        result.append(", ");
-      }
-      i++;
-    }
-    if (i < max)
-      result.delete(result.length() - 2, result.length());
-    result.append("]");
-    return result.toString();
-  }
-
-}
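
For readers following the refactoring: the classes removed above form a chain of FATE steps, where each call(...) returns the next step to run (LoadFiles -> CompleteBulkImport -> CopyFailed -> CleanUpBulkImport, which returns null to end the chain). The sketch below is a minimal, self-contained model of that hand-off pattern only; the Step interface, the step() helper, and the class name are simplified stand-ins invented for illustration, not the real org.apache.accumulo.fate Repo or Master APIs.

import java.util.ArrayList;
import java.util.List;

public class BulkImportChainSketch {

  // Each step does some work and returns the next step, or null when the chain is done,
  // mirroring how each removed Repo's call(...) hands back the follow-on Repo.
  interface Step {
    Step call(long tid) throws Exception;
  }

  static final List<String> trace = new ArrayList<String>();

  // Build a named step that records its execution and points at the next step.
  static Step step(final String name, final Step next) {
    return new Step() {
      @Override
      public Step call(long tid) {
        trace.add("tid " + tid + " ran " + name);
        return next;
      }
    };
  }

  public static void main(String[] args) throws Exception {
    // Wire the chain back to front, in the same order as the removed classes above.
    Step cleanUp = step("CleanUpBulkImport", null);
    Step copyFailed = step("CopyFailed", cleanUp);
    Step complete = step("CompleteBulkImport", copyFailed);
    Step loadFiles = step("LoadFiles", complete);

    // Drive it the way a FATE-style executor would: keep calling until a step returns null.
    long tid = 42L;
    Step current = loadFiles;
    while (current != null)
      current = current.call(tid);

    for (String line : trace)
      System.out.println(line);
  }
}

Running it prints the four step names in the order the real repos would execute for a single bulk import transaction id.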