Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 00780200CF3 for ; Wed, 13 Sep 2017 22:41:09 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id F334B1609CA; Wed, 13 Sep 2017 20:41:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 26A611609C3 for ; Wed, 13 Sep 2017 22:41:08 +0200 (CEST) Received: (qmail 92880 invoked by uid 500); 13 Sep 2017 20:41:06 -0000 Mailing-List: contact dev-help@tephra.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tephra.incubator.apache.org Delivered-To: mailing list dev@tephra.incubator.apache.org Received: (qmail 92869 invoked by uid 99); 13 Sep 2017 20:41:06 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Sep 2017 20:41:06 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id D9C641A2049 for ; Wed, 13 Sep 2017 20:41:05 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.02 X-Spam-Level: X-Spam-Status: No, score=-4.02 tagged_above=-999 required=6.31 tests=[KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id hvGoc7gt05r4 for ; Wed, 13 Sep 2017 20:41:04 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 983845FC21 for ; Wed, 13 Sep 2017 20:41:03 +0000 (UTC) Received: (qmail 92730 invoked by uid 99); 13 Sep 2017 20:41:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Sep 2017 20:41:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CFDD5F5713; Wed, 13 Sep 2017 20:41:02 +0000 (UTC) From: anew To: dev@tephra.incubator.apache.org Reply-To: dev@tephra.incubator.apache.org References: In-Reply-To: Subject: [GitHub] incubator-tephra pull request #53: (TEPHRA-243) Improve logging for slow log... Content-Type: text/plain Message-Id: <20170913204102.CFDD5F5713@git1-us-west.apache.org> Date: Wed, 13 Sep 2017 20:41:02 +0000 (UTC) archived-at: Wed, 13 Sep 2017 20:41:09 -0000 Github user anew commented on a diff in the pull request: https://github.com/apache/incubator-tephra/pull/53#discussion_r138733253 --- Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java --- @@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException { } } - /* - * Appends new writes to the pendingWrites. It is better to keep it in - * our own queue rather than writing it to the HDFS output stream because - * HDFSOutputStream.writeChunk is not lightweight at all. + /** + * Return all pending writes at the time the method is called, or null if no writes are pending. + * + * Note that after this method returns, there can be additional pending writes, + * added concurrently while the existing pending writes are removed. 
+ */ + @Nullable + private Entry[] getPendingWrites() { + synchronized (this) { + if (pendingWrites.isEmpty()) { + return null; + } + Entry[] entriesToSync = new Entry[pendingWrites.size()]; + for (int i = 0; i < entriesToSync.length; i++) { + entriesToSync[i] = pendingWrites.remove(); + } + return entriesToSync; + } + } + + /** + * When multiple threads try to log edits at the same time, they all will call {@link #append} + * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()} + * are followed by a single {@code sync}, or vice versa. + * + * We want to record the time and position of the first {@code append()} after a {@code sync()}, + * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold. + * Therefore this is called every time before we write the pending list out to the log writer. + * + * See {@link #stopTimerIfNeeded(TransactionLogWriter)}. + * + * @throws IOException if the position of the writer cannot be determined */ - private void append(Entry e) throws IOException { - pendingWrites.add(e); + private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException { + // no sync needed because this is only called within a sync block + if (positionBeforeWrite == -1L) { + positionBeforeWrite = writer.getPosition(); + countSinceLastSync = 0; + stopWatch.reset().start(); + } + countSinceLastSync += entryCount; } - // Returns all currently pending writes. New writes - // will accumulate in a new list. - private List getPendingWrites() { - synchronized (this) { - List save = this.pendingWrites; - this.pendingWrites = new LinkedList<>(); - return save; + /** + * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync + * together exceed a threshold. + * + * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}. 
+ * + * @throws IOException if the position of the writer cannot be determined + */ + private void stopTimerIfNeeded(TransactionLogWriter writer) throws IOException { + // this method is only called by a thread if it actually called sync(), inside a sync block + if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case + stopWatch.stop(); + long elapsed = stopWatch.elapsedMillis(); + if (elapsed >= slowAppendThreshold) { + long currentPosition = writer.getPosition(); + long bytesWritten = currentPosition - positionBeforeWrite; + LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.", + getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten); + } } + positionBeforeWrite = -1L; + countSinceLastSync = 0; } private void sync() throws IOException { // writes out pending entries to the HLog - TransactionLogWriter tmpWriter = null; long latestSeq = 0; int entryCount = 0; synchronized (this) { if (closed) { return; } - // prevent writer being dereferenced - tmpWriter = writer; - - List currentPending = getPendingWrites(); - if (!currentPending.isEmpty()) { - tmpWriter.commitMarker(currentPending.size()); - } - - // write out all accumulated entries to log. - for (Entry e : currentPending) { - tmpWriter.append(e); - entryCount++; - latestSeq = Math.max(latestSeq, e.getKey().get()); + Entry[] currentPending = getPendingWrites(); + if (currentPending != null) { + entryCount = currentPending.length; + startTimerIfNeeded(writer, entryCount); + writer.commitMarker(entryCount); + for (Entry e : currentPending) { + writer.append(e); + } + // sequence are guaranteed to be ascending, so the last one is the greatest + latestSeq = currentPending[currentPending.length - 1].getKey().get(); + writtenUpTo = latestSeq; } } - long lastSynced = syncedUpTo.get(); + // giving up the sync lock here allows other threads to write their edits before the sync happens. 
+ // hence, we can have the edits from n threads in one sync. + // someone else might have already synced our edits, avoid double syncing - if (lastSynced < latestSeq) { - tmpWriter.sync(); - metricsCollector.histogram("wal.sync.size", entryCount); - syncedUpTo.compareAndSet(lastSynced, latestSeq); + if (syncedUpTo < latestSeq) { + synchronized (this) { + // someone else might have synced our edits while we were waiting + if (syncedUpTo < latestSeq) { + writer.sync(); + syncedUpTo = writtenUpTo; + stopTimerIfNeeded(writer); + } + } } + // in any case, emit metrics for the number of entries we wrote. + // because the thread that actually syncs does not know how many it synced (it may not have written all of them) + metricsCollector.histogram("wal.sync.size", entryCount); --- End diff -- we could emit that metric in stopTimerIfNeeded(). And actually this one, too. ---