GitHub user anew commented on a diff in the pull request:
https://github.com/apache/incubator-tephra/pull/53#discussion_r138733253
--- Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java
---
@@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException {
}
}
- /*
- * Appends new writes to the pendingWrites. It is better to keep it in
- * our own queue rather than writing it to the HDFS output stream because
- * HDFSOutputStream.writeChunk is not lightweight at all.
+ /**
+ * Return all pending writes at the time the method is called, or null if no writes
are pending.
+ *
+ * Note that after this method returns, there can be additional pending writes,
+ * added concurrently while the existing pending writes are removed.
+ */
+ @Nullable
+ private Entry[] getPendingWrites() {
+ synchronized (this) {
+ if (pendingWrites.isEmpty()) {
+ return null;
+ }
+ Entry[] entriesToSync = new Entry[pendingWrites.size()];
+ for (int i = 0; i < entriesToSync.length; i++) {
+ entriesToSync[i] = pendingWrites.remove();
+ }
+ return entriesToSync;
+ }
+ }
+
+ /**
+ * When multiple threads try to log edits at the same time, they all will call (@link
#append}
+ * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code
append()}
+ * are followed by a single {@code sync}, or vice versa.
+ *
+ * We want to record the time and position of the first {@code append()} after a {@code
sync()},
+ * then measure the time after the next {@code sync()}, and log a warning if it exceeds
a threshold.
+ * Therefore this is called every time before we write the pending list out to the
log writer.
+ *
+ * See {@link #stopTimerIfNeeded(TransactionLogWriter)}.
+ *
+ * @throws IOException if the position of the writer cannot be determined
*/
- private void append(Entry e) throws IOException {
- pendingWrites.add(e);
+ private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws
IOException {
+ // no sync needed because this is only called within a sync block
+ if (positionBeforeWrite == -1L) {
+ positionBeforeWrite = writer.getPosition();
+ countSinceLastSync = 0;
+ stopWatch.reset().start();
+ }
+ countSinceLastSync += entryCount;
}
- // Returns all currently pending writes. New writes
- // will accumulate in a new list.
- private List<Entry> getPendingWrites() {
- synchronized (this) {
- List<Entry> save = this.pendingWrites;
- this.pendingWrites = new LinkedList<>();
- return save;
+ /**
+ * Called by a {@code sync()} after flushing to file system. Issues a warning if the
write(s)+sync
+ * together exceed a threshold.
+ *
+ * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
+ *
+ * @throws IOException if the position of the writer cannot be determined
+ */
+ private void stopTimerIfNeeded(TransactionLogWriter writer) throws IOException {
+ // this method is only called by a thread if it actually called sync(), inside a
sync block
+ if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in
case
+ stopWatch.stop();
+ long elapsed = stopWatch.elapsedMillis();
+ if (elapsed >= slowAppendThreshold) {
+ long currentPosition = writer.getPosition();
+ long bytesWritten = currentPosition - positionBeforeWrite;
+ LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
+ getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y"
: "ies", bytesWritten);
+ }
}
+ positionBeforeWrite = -1L;
+ countSinceLastSync = 0;
}
private void sync() throws IOException {
// writes out pending entries to the HLog
- TransactionLogWriter tmpWriter = null;
long latestSeq = 0;
int entryCount = 0;
synchronized (this) {
if (closed) {
return;
}
- // prevent writer being dereferenced
- tmpWriter = writer;
-
- List<Entry> currentPending = getPendingWrites();
- if (!currentPending.isEmpty()) {
- tmpWriter.commitMarker(currentPending.size());
- }
-
- // write out all accumulated entries to log.
- for (Entry e : currentPending) {
- tmpWriter.append(e);
- entryCount++;
- latestSeq = Math.max(latestSeq, e.getKey().get());
+ Entry[] currentPending = getPendingWrites();
+ if (currentPending != null) {
+ entryCount = currentPending.length;
+ startTimerIfNeeded(writer, entryCount);
+ writer.commitMarker(entryCount);
+ for (Entry e : currentPending) {
+ writer.append(e);
+ }
+ // sequence are guaranteed to be ascending, so the last one is the greatest
+ latestSeq = currentPending[currentPending.length - 1].getKey().get();
+ writtenUpTo = latestSeq;
}
}
- long lastSynced = syncedUpTo.get();
+ // giving up the sync lock here allows other threads to write their edits before
the sync happens.
+ // hence, we can have the edits from n threads in one sync.
+
// someone else might have already synced our edits, avoid double syncing
- if (lastSynced < latestSeq) {
- tmpWriter.sync();
- metricsCollector.histogram("wal.sync.size", entryCount);
- syncedUpTo.compareAndSet(lastSynced, latestSeq);
+ if (syncedUpTo < latestSeq) {
+ synchronized (this) {
+ // someone else might have synced our edits while we were waiting
+ if (syncedUpTo < latestSeq) {
+ writer.sync();
+ syncedUpTo = writtenUpTo;
+ stopTimerIfNeeded(writer);
+ }
+ }
}
+ // in any case, emit metrics for the number entries we wrote.
+ // because the thread that actually syncs does not know how many it synced (it not
write all of them)
+ metricsCollector.histogram("wal.sync.size", entryCount);
--- End diff --
We could emit that metric in stopTimerIfNeeded(), and actually this one, too.
---
|