From: adamjshook@apache.org
To: commits@accumulo.apache.org
Date: Wed, 14 Feb 2018 19:37:42 +0000
Subject: [accumulo] 01/01: Merge branch '1.8'

This is an automated email from the ASF dual-hosted git repository.

adamjshook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 23cabbc7a487b8844cbbe48a89a26741b7a696d8
Merge: 2a58165 927e673
Author: Adam J. Shook <adamjshook@apache.org>
AuthorDate: Wed Feb 14 14:26:12 2018 -0500

    Merge branch '1.8'

 .../org/apache/accumulo/tserver/log/DfsLogger.java |   5 +-
 .../org/apache/accumulo/tserver/log/LogSorter.java |  68 ++---
 .../apache/accumulo/tserver/logger/LogReader.java  |  43 +--
 .../tserver/replication/AccumuloReplicaSystem.java | 297 ++++++++++-----------
 4 files changed, 207 insertions(+), 206 deletions(-)

diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index fc72b98,ba5e488..c35a315
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@@ -113,44 -113,46 +113,46 @@@ public class LogSorter
        // the following call does not throw an exception if the file/dir does not exist
        fs.deleteRecursively(new Path(destPath));
  
-       DFSLoggerInputStreams inputStreams;
-       try {
-         inputStreams = DfsLogger.readHeaderAndReturnStream(fs, srcPath, conf);
-       } catch (LogHeaderIncompleteException e) {
-         log.warn("Could not read header from write-ahead log {}. Not sorting.", srcPath);
-         // Creating a 'finished' marker will cause recovery to proceed normally and the
-         // empty file will be correctly ignored downstream.
-         fs.mkdirs(new Path(destPath));
-         writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
-         fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
-         return;
-       }
- 
-       this.input = inputStreams.getOriginalInput();
-       this.decryptingInput = inputStreams.getDecryptingInputStream();
- 
-       final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
-       Thread.currentThread().setName("Sorting " + name + " for recovery");
-       while (true) {
-         final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+       try (final FSDataInputStream fsinput = fs.open(srcPath)) {
+         DFSLoggerInputStreams inputStreams;
          try {
-           long start = input.getPos();
-           while (input.getPos() - start < bufferSize) {
-             LogFileKey key = new LogFileKey();
-             LogFileValue value = new LogFileValue();
-             key.readFields(decryptingInput);
-             value.readFields(decryptingInput);
-             buffer.add(new Pair<>(key, value));
+           inputStreams = DfsLogger.readHeaderAndReturnStream(fsinput, conf);
+         } catch (LogHeaderIncompleteException e) {
 -          log.warn("Could not read header from write-ahead log " + srcPath + ". Not sorting.");
++          log.warn("Could not read header from write-ahead log {}. Not sorting.", srcPath);
+           // Creating a 'finished' marker will cause recovery to proceed normally and the
+           // empty file will be correctly ignored downstream.
+           fs.mkdirs(new Path(destPath));
+           writeBuffer(destPath, Collections.<Pair<LogFileKey,LogFileValue>> emptyList(), part++);
+           fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+           return;
+         }
+ 
+         this.input = inputStreams.getOriginalInput();
+         this.decryptingInput = inputStreams.getDecryptingInputStream();
+ 
 -        final long bufferSize = conf.getMemoryInBytes(Property.TSERV_SORT_BUFFER_SIZE);
++        final long bufferSize = conf.getAsBytes(Property.TSERV_SORT_BUFFER_SIZE);
+         Thread.currentThread().setName("Sorting " + name + " for recovery");
+         while (true) {
+           final ArrayList<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+           try {
+             long start = input.getPos();
+             while (input.getPos() - start < bufferSize) {
+               LogFileKey key = new LogFileKey();
+               LogFileValue value = new LogFileValue();
+               key.readFields(decryptingInput);
+               value.readFields(decryptingInput);
+               buffer.add(new Pair<>(key, value));
+             }
+             writeBuffer(destPath, buffer, part++);
+             buffer.clear();
+           } catch (EOFException ex) {
+             writeBuffer(destPath, buffer, part++);
+             break;
            }
-           writeBuffer(destPath, buffer, part++);
-           buffer.clear();
-         } catch (EOFException ex) {
-           writeBuffer(destPath, buffer, part++);
-           break;
          }
+         fs.create(new Path(destPath, "finished")).close();
 -        log.info("Finished log sort " + name + " " + getBytesCopied() + " bytes " + part + " parts in " + getSortTime() + "ms");
++        log.info("Finished log sort {} {} bytes {} parts in {}ms", name, getBytesCopied(), part, getSortTime());
        }
-       fs.create(new Path(destPath, "finished")).close();
-       log.info("Finished log sort {} {} bytes {} parts in {}ms", name, getBytesCopied(), part, getSortTime());
      } catch (Throwable t) {
        try {
          // parent dir may not exist
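The refactored loop above buffers (LogFileKey, LogFileValue) pairs from the
decrypting stream until roughly TSERV_SORT_BUFFER_SIZE bytes of the underlying
file have been consumed, flushes each batch as a numbered part through
writeBuffer(), and flushes the remainder when an EOFException marks the end of
the log. The following is a minimal, self-contained sketch of that buffering
pattern, not Accumulo's code: records are plain readUTF strings, a
hand-maintained byte counter stands in for FSDataInputStream.getPos(), and
writePart() stands in for writeBuffer().

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.EOFException;
  import java.io.IOException;
  import java.nio.charset.StandardCharsets;
  import java.util.ArrayList;
  import java.util.List;

  public class SortLoopSketch {

    // Stand-in for LogSorter.writeBuffer(destPath, buffer, part): the real
    // method sorts the buffer and writes it out as a map file part.
    static void writePart(List<String> buffer, int part) {
      System.out.printf("part %d: %d records%n", part, buffer.size());
    }

    static void sort(byte[] walBytes, long bufferSize) throws IOException {
      try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(walBytes))) {
        int part = 0;
        long pos = 0; // plays the role of input.getPos() on the raw stream
        while (true) {
          List<String> buffer = new ArrayList<>();
          try {
            long start = pos;
            while (pos - start < bufferSize) {
              String record = in.readUTF(); // throws EOFException at end of file
              pos += 2 + record.getBytes(StandardCharsets.UTF_8).length; // 2-byte length prefix
              buffer.add(record);
            }
            writePart(buffer, part++); // threshold reached: flush this part
          } catch (EOFException eof) {
            writePart(buffer, part++); // flush whatever was buffered, then stop,
            break;                     // mirroring the EOFException branch above
          }
        }
        // LogSorter then creates the 'finished' marker so recovery knows the
        // sort completed; that step is omitted here.
      }
    }

    public static void main(String[] args) throws IOException {
      ByteArrayOutputStream raw = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(raw)) {
        for (int i = 0; i < 10; i++) {
          out.writeUTF("mutation-" + i);
        }
      }
      sort(raw.toByteArray(), 32); // tiny threshold so several parts are cut
    }
  }

The final, partially filled buffer is flushed from the EOFException branch,
exactly as in the loop above.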
diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 479550f,fb286c4..76056b4
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@@ -101,28 -101,30 +102,30 @@@ public class LogReader
        LogFileValue value = new LogFileValue();
  
        if (fs.isFile(path)) {
-         // read log entries from a simple hdfs file
-         DFSLoggerInputStreams streams;
-         try {
-           streams = DfsLogger.readHeaderAndReturnStream(fs, path, SiteConfiguration.getInstance());
-         } catch (LogHeaderIncompleteException e) {
-           log.warn("Could not read header for {}. Ignoring...", path);
-           continue;
-         }
-         DataInputStream input = streams.getDecryptingInputStream();
+         try (final FSDataInputStream fsinput = fs.open(path)) {
+           // read log entries from a simple hdfs file
+           DFSLoggerInputStreams streams;
+           try {
+             streams = DfsLogger.readHeaderAndReturnStream(fsinput, SiteConfiguration.getInstance());
+           } catch (LogHeaderIncompleteException e) {
 -            log.warn("Could not read header for " + path + ". Ignoring...");
++            log.warn("Could not read header for {} . Ignoring...", path);
+             continue;
+           }
+           DataInputStream input = streams.getDecryptingInputStream();
  
-         try {
-           while (true) {
-             try {
-               key.readFields(input);
-               value.readFields(input);
-             } catch (EOFException ex) {
-               break;
+           try {
+             while (true) {
+               try {
+                 key.readFields(input);
+                 value.readFields(input);
+               } catch (EOFException ex) {
+                 break;
+               }
+               printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
              }
-             printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+           } finally {
+             input.close();
            }
-         } finally {
-           input.close();
          }
        } else {
          // read the log entries sorted in a map file
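The LogReader change has the same shape as the LogSorter one: the raw file
stream now lives in a try-with-resources block, so it is closed on every exit
path, including the early continue taken when a write-ahead log header is
incomplete. Below is a small sketch of that pattern; readHeader(), the 2-byte
magic number, and HeaderIncompleteException are hypothetical stand-ins for
DfsLogger.readHeaderAndReturnStream() and LogHeaderIncompleteException, which
differ in detail.

  import java.io.ByteArrayInputStream;
  import java.io.DataInputStream;
  import java.io.EOFException;
  import java.io.IOException;
  import java.util.List;

  public class LogReadSketch {

    static class HeaderIncompleteException extends IOException {
      private static final long serialVersionUID = 1L;
    }

    // Stand-in for DfsLogger.readHeaderAndReturnStream: check a 2-byte magic
    // number; the real method also selects a decrypting stream.
    static DataInputStream readHeader(DataInputStream raw) throws IOException {
      int magic;
      try {
        magic = raw.readUnsignedShort();
      } catch (EOFException e) {
        throw new HeaderIncompleteException();
      }
      if (magic != 0xACC0) {
        throw new HeaderIncompleteException();
      }
      return raw;
    }

    public static void main(String[] args) throws IOException {
      byte[] good = {(byte) 0xAC, (byte) 0xC0, 0, 3, 'k', 'e', 'y'};
      byte[] truncated = {(byte) 0xAC}; // header cut short mid-write
      for (byte[] file : List.of(good, truncated)) {
        // try-with-resources closes the raw stream on every exit path,
        // including the early 'continue' below -- the shape of the change.
        try (DataInputStream raw = new DataInputStream(new ByteArrayInputStream(file))) {
          DataInputStream input;
          try {
            input = readHeader(raw);
          } catch (HeaderIncompleteException e) {
            System.out.println("Could not read header. Ignoring...");
            continue;
          }
          while (true) { // read entries until EOF, as LogReader's inner loop does
            try {
              System.out.println("entry: " + input.readUTF());
            } catch (EOFException eof) {
              break;
            }
          }
        }
      }
    }
  }

Run as-is, it prints the single entry from the well-formed file and skips the
truncated one, with the stream closed either way.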
--
To stop receiving notification emails like this one, please contact
adamjshook@apache.org.