Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1B784200C81 for ; Thu, 20 Apr 2017 19:35:27 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1A632160BB0; Thu, 20 Apr 2017 17:35:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 17900160B9F for ; Thu, 20 Apr 2017 19:35:25 +0200 (CEST) Received: (qmail 17508 invoked by uid 500); 20 Apr 2017 17:35:25 -0000 Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bookkeeper-dev@bookkeeper.apache.org Delivered-To: mailing list commits@bookkeeper.apache.org Received: (qmail 17443 invoked by uid 99); 20 Apr 2017 17:35:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Apr 2017 17:35:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 03D7EF4A25; Thu, 20 Apr 2017 17:35:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sijie@apache.org To: commits@bookkeeper.apache.org Date: Thu, 20 Apr 2017 17:35:24 -0000 Message-Id: <511d0b31c84a4e5dbd49325ff28d1846@git.apache.org> In-Reply-To: <70cd479885334178accd53aff931aa07@git.apache.org> References: <70cd479885334178accd53aff931aa07@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/7] bookkeeper git commit: fix merge conflicts archived-at: Thu, 20 Apr 2017 17:35:27 -0000 fix merge conflicts Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/014512c3 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/014512c3 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/014512c3 Branch: refs/heads/sijie/bookkeeper_fallocate Commit: 014512c3563bd06bd90789db0c5d0369fe421a62 Parents: d980296 Author: Sijie Guo Authored: Thu Nov 17 17:32:15 2016 -0800 Committer: Sijie Guo Committed: Thu Nov 17 17:32:15 2016 -0800 ---------------------------------------------------------------------- .../bookie/BookKeeperServerStats.java | 1 + .../bookkeeper/bookie/BufferedChannel.java | 8 +++-- .../org/apache/bookkeeper/bookie/Journal.java | 34 +++++++++----------- 3 files changed, 23 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java index 239f923..79c0d61 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java @@ -53,6 +53,7 @@ public interface BookKeeperServerStats { public final static String JOURNAL_FORCE_WRITE_BATCH_ENTRIES = "JOURNAL_FORCE_WRITE_BATCH_ENTRIES"; public final static String JOURNAL_FORCE_WRITE_BATCH_BYTES = "JOURNAL_FORCE_WRITE_BATCH_BYTES"; public final static String JOURNAL_FLUSH_LATENCY = "JOURNAL_FLUSH_LATENCY"; + public final static String JOURNAL_FLUSH_IN_MEM_ADD = "JOURNAL_FLUSH_IN_MEM_ADD"; public final static String JOURNAL_QUEUE_LATENCY = "JOURNAL_QUEUE_LATENCY"; public final static String JOURNAL_PROCESS_TIME_LATENCY = "JOURNAL_PROCESS_TIME_LATENCY"; public final static String JOURNAL_CREATION_LATENCY = "JOURNAL_CREATION_LATENCY"; http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java index cb7d914..0492943 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java @@ -24,9 +24,10 @@ package org.apache.bookkeeper.bookie; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import org.apache.bookkeeper.util.ZeroBuffer; import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.util.ZeroBuffer; + /** * Provides a buffering layer in front of a FileChannel. */ @@ -64,8 +65,9 @@ public class BufferedChannel extends BufferedReadChannel { * @param src The source ByteBuffer which contains the data to be written. * @throws IOException if a write operation fails. */ - synchronized public void write(ByteBuffer src) throws IOException { + synchronized public int write(ByteBuffer src) throws IOException { int copied = 0; + int flushes = 0; while(src.remaining() > 0) { int truncated = 0; if (writeBuffer.remaining() < src.remaining()) { @@ -78,9 +80,11 @@ public class BufferedChannel extends BufferedReadChannel { // if we have run out of buffer space, we should flush to the file if (writeBuffer.remaining() == 0) { flushInternal(); + ++flushes; } } position += copied; + return flushes; } /** http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/014512c3/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index dd62d28..e8adfff 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -30,8 +30,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -39,18 +37,15 @@ import com.google.common.base.Stopwatch; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; -<<<<<<< HEAD import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -======= -import org.apache.bookkeeper.stats.BookkeeperServerStatsLogger; -import org.apache.bookkeeper.stats.ServerStatsProvider; ->>>>>>> 2d5718f... bookie: fallocate & sync_file_range import org.apache.bookkeeper.util.DaemonThreadFactory; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.OrderedSafeExecutor; +import org.apache.bookkeeper.util.SafeRunnable; import org.apache.bookkeeper.util.ZeroBuffer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -295,7 +290,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { } @Override - public void run() { + public void safeRun() { if (LOG.isDebugEnabled()) { LOG.debug("Acknowledge Ledger: {}, Entry: {}", ledgerId, entryId); } @@ -541,10 +536,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource { private final LedgerDirsManager ledgerDirsManager; // Expose Stats + private final StatsLogger statsLogger; private final OpStatsLogger journalAddEntryStats; - private final OpStatsLogger journalSyncStats; + private final OpStatsLogger journalMemAddEntryStats; private final OpStatsLogger journalCreationStats; private final OpStatsLogger journalFlushStats; + private final OpStatsLogger journalMemAddFlushStats; private final OpStatsLogger journalProcessTimeStats; private final OpStatsLogger journalQueueStats; private final OpStatsLogger forceWriteGroupingCountStats; @@ -580,7 +577,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { this.cbThreadPool = OrderedSafeExecutor.newBuilder() .name("BookieJournal") .numThreads(conf.getNumJournalCallbackThreads()) - .statsLogger(Stats.get().getStatsLogger("journal")) + .statsLogger(statsLogger) .threadFactory(new DaemonThreadFactory()) .build(); @@ -594,10 +591,12 @@ class Journal extends BookieCriticalThread implements CheckpointSource { LOG.debug("Last Log Mark : {}", lastLogMark.getCurMark()); // Expose Stats + this.statsLogger = statsLogger; journalAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_ADD_ENTRY); - journalSyncStats = statsLogger.getOpStatsLogger(JOURNAL_SYNC); + journalMemAddEntryStats = statsLogger.getOpStatsLogger(JOURNAL_MEM_ADD_ENTRY); journalCreationStats = statsLogger.getOpStatsLogger(JOURNAL_CREATION_LATENCY); journalFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_LATENCY); + journalMemAddFlushStats = statsLogger.getOpStatsLogger(JOURNAL_FLUSH_IN_MEM_ADD); journalQueueStats = statsLogger.getOpStatsLogger(JOURNAL_QUEUE_LATENCY); journalProcessTimeStats = statsLogger.getOpStatsLogger(JOURNAL_PROCESS_TIME_LATENCY); forceWriteGroupingCountStats = statsLogger.getOpStatsLogger(JOURNAL_FORCE_WRITE_GROUPING_COUNT); @@ -928,8 +927,8 @@ class Journal extends BookieCriticalThread implements CheckpointSource { if (!enableGroupForceWrites) { logFile.startSyncRange(prevFlushPosition, lastFlushPosition); } - journalFlushLatencyStats.registerSuccessfulEvent( - journalFlushWatcher.stop().elapsed(TimeUnit.MICROSECONDS)); + journalFlushStats.registerSuccessfulEvent( + journalFlushWatcher.stop().elapsedTime(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); // Trace the lifetime of entries through persistence if (LOG.isDebugEnabled()) { @@ -983,9 +982,9 @@ class Journal extends BookieCriticalThread implements CheckpointSource { flushes += bc.write(lenBuff); flushes += bc.write(qe.entry); - journalMemAddFlushTimesStats.registerSuccessfulEvent(flushes); - journalMemAddLatencyStats.registerSuccessfulEvent( - MathUtils.elapsedMicroSec(qe.enqueueTime)); + journalMemAddFlushStats.registerSuccessfulValue(flushes); + journalMemAddEntryStats.registerSuccessfulEvent( + MathUtils.elapsedMicroSec(qe.enqueueTime), TimeUnit.MICROSECONDS); toFlush.add(qe); qe = null; @@ -1018,8 +1017,7 @@ class Journal extends BookieCriticalThread implements CheckpointSource { LOG.info("Shutting down Journal"); forceWriteThread.shutdown(); cbThreadPool.shutdown(); - ; - if (!cbThreadPool.forceShutdown(5, TimeUnit.SECONDS)) { + if (!cbThreadPool.awaitTermination(5, TimeUnit.SECONDS)) { LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing"); } running = false;