tephra-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (TEPHRA-243) When transaction log sync is slow, the warning message should contain more information
Date Wed, 13 Sep 2017 22:36:00 GMT

    [ https://issues.apache.org/jira/browse/TEPHRA-243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16165404#comment-16165404 ]

ASF GitHub Bot commented on TEPHRA-243:
---------------------------------------

Github user poornachandra commented on a diff in the pull request:

    https://github.com/apache/incubator-tephra/pull/53#discussion_r138757206
  
    --- Diff: tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java ---
    @@ -85,105 +100,145 @@ public long getTimestamp() {
     
       @Override
       public void append(TransactionEdit edit) throws IOException {
    -    long startTime = System.nanoTime();
    -    synchronized (this) {
    -      ensureAvailable();
    -
    -      Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
    -
    -      // add to pending edits
    -      append(entry);
    -    }
    -
    -    // wait for sync to complete
    -    sync();
    -    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
    -    if (durationMillis > SLOW_APPEND_THRESHOLD) {
    -      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
    -    }
    +    append(Collections.singletonList(edit));
       }
     
       @Override
       public void append(List<TransactionEdit> edits) throws IOException {
    -    long startTime = System.nanoTime();
    -    synchronized (this) {
    -      ensureAvailable();
    -
    +    if (closing) { // or closed, which implies closing
    +      throw new IOException("Log " + getName() + " is closing or already closed, cannot append");
    +    }
    +    if (!initialized) {
    +      init();
    +    }
    +    // synchronizing here ensures that elements in the queue are ordered by seq number
    +    synchronized (logSequence) {
           for (TransactionEdit edit : edits) {
    -        Entry entry = new Entry(new LongWritable(logSequence.getAndIncrement()), edit);
    -
    -        // add to pending edits
    -        append(entry);
    +        pendingWrites.add(new Entry(new LongWritable(logSequence.getAndIncrement()), edit));
           }
         }
    -
    -    // wait for sync to complete
    +    // try to sync all pending edits (competing for this with other threads)
         sync();
    -    long durationMillis = (System.nanoTime() - startTime) / 1000000L;
    -    if (durationMillis > SLOW_APPEND_THRESHOLD) {
    -      LOG.info("Slow append to log " + getName() + ", took " + durationMillis + " msec.");
    -    }
       }
     
    -  private void ensureAvailable() throws IOException {
    -    if (closed) {
    -      throw new IOException("Log " + getName() + " is already closed, cannot append!");
    -    }
    -    if (!initialized) {
    -      init();
    +  /**
    +   * Return all pending writes at the time the method is called, or null if no writes are pending.
    +   *
    +   * Note that after this method returns, there can be additional pending writes,
    +   * added concurrently while the existing pending writes are removed.
    +   */
    +  @Nullable
    +  private Entry[] getPendingWrites() {
    +    synchronized (this) {
    +      if (pendingWrites.isEmpty()) {
    +        return null;
    +      }
    +      Entry[] entriesToSync = new Entry[pendingWrites.size()];
    +      for (int i = 0; i < entriesToSync.length; i++) {
    +        entriesToSync[i] = pendingWrites.remove();
    +      }
    +      return entriesToSync;
         }
       }
     
    -  /*
    -   * Appends new writes to the pendingWrites. It is better to keep it in
    -   * our own queue rather than writing it to the HDFS output stream because
    -   * HDFSOutputStream.writeChunk is not lightweight at all.
    +  /**
    +   * When multiple threads try to log edits at the same time, they all will call {@link #append}
    +   * followed by {@link #sync()}, concurrently. Hence, it can happen that multiple {@code append()}
    +   * are followed by a single {@code sync}, or vice versa.
    +   *
    +   * We want to record the time and position of the first {@code append()} after a {@code sync()},
    +   * then measure the time after the next {@code sync()}, and log a warning if it exceeds a threshold.
    +   * Therefore this is called every time before we write the pending list out to the log writer.
    +   *
    +   * See {@link #stopTimer(TransactionLogWriter)}.
    +   *
    +   * @throws IOException if the position of the writer cannot be determined
        */
    -  private void append(Entry e) throws IOException {
    -    pendingWrites.add(e);
    +  private void startTimerIfNeeded(TransactionLogWriter writer, int entryCount) throws IOException {
    +    // no sync needed because this is only called within a sync block
    +    if (positionBeforeWrite == -1L) {
    +      positionBeforeWrite = writer.getPosition();
    +      countSinceLastSync = 0;
    +      stopWatch.reset().start();
    +    }
    +    countSinceLastSync += entryCount;
       }
     
    -  // Returns all currently pending writes. New writes
    -  // will accumulate in a new list.
    -  private List<Entry> getPendingWrites() {
    -    synchronized (this) {
    -      List<Entry> save = this.pendingWrites;
    -      this.pendingWrites = new LinkedList<>();
    -      return save;
    +  /**
    +   * Called by a {@code sync()} after flushing to file system. Issues a warning if the write(s)+sync
    +   * together exceed a threshold.
    +   *
    +   * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
    +   *
    +   * @throws IOException if the position of the writer cannot be determined
    +   */
    +  private void stopTimer(TransactionLogWriter writer) throws IOException {
    +    // this method is only called by a thread if it actually called sync(), inside a sync block
    +    if (positionBeforeWrite != -1L) { // actually it should never be -1, but just in case
    +      stopWatch.stop();
    +      long elapsed = stopWatch.elapsedMillis();
    +      long bytesWritten = writer.getPosition() - positionBeforeWrite;
    +      if (elapsed >= slowAppendThreshold) {
    +        LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} bytes.",
    +                 getName(), elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
    +      }
    +      metricsCollector.histogram("wal.sync.size", countSinceLastSync);
    +      metricsCollector.histogram("wal.sync.bytes", (int) bytesWritten); // single sync won't exceed max int
         }
    +    positionBeforeWrite = -1L;
    +    countSinceLastSync = 0;
       }
     
       private void sync() throws IOException {
         // writes out pending entries to the HLog
    -    TransactionLogWriter tmpWriter = null;
         long latestSeq = 0;
         int entryCount = 0;
         synchronized (this) {
           if (closed) {
    -        return;
    -      }
    -      // prevent writer being dereferenced
    -      tmpWriter = writer;
    -
    -      List<Entry> currentPending = getPendingWrites();
    -      if (!currentPending.isEmpty()) {
    -        tmpWriter.commitMarker(currentPending.size());
    +        if (pendingWrites.isEmpty()) {
    +          // this is expected: close() sets closed to true after syncing all pending writes (including ours)
    +          return;
    +        }
    +        // this should never happen because close() only sets closed=true after syncing.
    +        // but if it should happen, we must fail this call because we don't know whether the edit was persisted
    +        throw new IOException(
    +          "Unexpected state: Writer is closed but there are pending edits. Cannot guarantee that edits were persisted");
           }
    -
    -      // write out all accumulated entries to log.
    -      for (Entry e : currentPending) {
    -        tmpWriter.append(e);
    -        entryCount++;
    -        latestSeq = Math.max(latestSeq, e.getKey().get());
    +      Entry[] currentPending = getPendingWrites();
    +      if (currentPending != null) {
    +        entryCount = currentPending.length;
    +        startTimerIfNeeded(writer, entryCount);
    +        writer.commitMarker(entryCount);
    +        for (Entry e : currentPending) {
    +          writer.append(e);
    +        }
    +        // sequences are guaranteed to be ascending, so the last one is the greatest
    +        latestSeq = currentPending[currentPending.length - 1].getKey().get();
    +        writtenUpTo = latestSeq;
           }
         }
     
    -    long lastSynced = syncedUpTo.get();
    +    // giving up the sync lock here allows other threads to write their edits before the sync happens.
    +    // hence, we can have the edits from n threads in one sync.
    +
         // someone else might have already synced our edits, avoid double syncing
    -    if (lastSynced < latestSeq) {
    -      tmpWriter.sync();
    -      metricsCollector.histogram("wal.sync.size", entryCount);
    -      syncedUpTo.compareAndSet(lastSynced, latestSeq);
    +    if (syncedUpTo < latestSeq) {
    --- End diff --
    
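    Since this check happens outside the synchronized block, `latestSeq` may not give the right value; we can just use `writtenUpTo` instead of `latestSeq`. For example, if another thread has already taken this thread's edits from the pending queue, `latestSeq` stays at 0 and the check can skip the sync even though those edits may not have been synced yet.
    
    To be safe, let's move the declarations of both `latestSeq` and `entryCount` into the synchronized block on line 196, so that they cannot be used outside the block.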
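    A minimal, in-memory sketch of the group-commit pattern in the diff, with the two review suggestions applied, may help make the point concrete. Everything here is hypothetical and simplified, not Tephra code: `GroupCommitLogSketch` and its fields are invented names, a plain list stands in for the `TransactionLogWriter`, and a counter stands in for `writer.sync()`. What it shows is that a thread whose edits were already written by another thread still compares `syncedUpTo` against `writtenUpTo`, read under the lock, so it cannot return before a sync that covers its own edits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hypothetical sketch (not Tephra's implementation) of group commit with latestSeq/entryCount
 * scoped to the synchronized block and the sync check based on writtenUpTo.
 */
class GroupCommitLogSketch {
  static final class Entry {
    final long seq;
    final String edit;

    Entry(long seq, String edit) {
      this.seq = seq;
      this.edit = edit;
    }
  }

  private final Object seqLock = new Object();   // plays the role of synchronized (logSequence)
  private final Object syncLock = new Object();  // serializes the simulated sync
  private final Queue<Entry> pendingWrites = new ConcurrentLinkedQueue<>();
  private final List<Entry> written = new ArrayList<>(); // guarded by this; stands in for the writer
  private long nextSeq = 0L;                      // guarded by seqLock
  private long writtenUpTo = -1L;                 // guarded by this
  private volatile long syncedUpTo = -1L;
  private int syncCalls = 0;                      // guarded by syncLock

  void append(List<String> edits) {
    // Enqueuing under seqLock keeps the queue ordered by sequence number.
    synchronized (seqLock) {
      for (String edit : edits) {
        pendingWrites.add(new Entry(nextSeq++, edit));
      }
    }
    sync();
  }

  private void sync() {
    long target;
    synchronized (this) {
      // Declared inside the block, as the review suggests, so they cannot leak into the check below.
      long latestSeq = -1L;
      int entryCount = 0;
      Entry e;
      while ((e = pendingWrites.poll()) != null) {
        written.add(e);
        latestSeq = e.seq; // ascending, so the last one is the greatest
        entryCount++;
      }
      if (entryCount > 0) {
        writtenUpTo = latestSeq;
      }
      // Our edits are in `written` by now (maybe written by another thread), so writtenUpTo covers them.
      target = writtenUpTo;
    }
    // Outside the write lock: other threads may keep writing while we wait to sync.
    if (syncedUpTo < target) {
      synchronized (syncLock) {
        if (syncedUpTo < target) { // someone else may have synced our edits meanwhile
          long upTo;
          synchronized (this) {
            upTo = writtenUpTo; // everything written so far is covered by this sync
          }
          syncCalls++; // stands in for writer.sync()
          syncedUpTo = upTo;
        }
      }
    }
  }

  long getSyncedUpTo() {
    return syncedUpTo;
  }

  int getSyncCalls() {
    synchronized (syncLock) {
      return syncCalls;
    }
  }

  synchronized List<Long> getWrittenSeqs() {
    List<Long> seqs = new ArrayList<>();
    for (Entry e : written) {
      seqs.add(e.seq);
    }
    return seqs;
  }
}
```

    Re-reading `writtenUpTo` just before the simulated sync lets a single sync cover edits that other threads wrote while this thread waited for `syncLock`, which is how one `sync()` can end up persisting edits from several threads.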


> When transaction log sync is slow, the warning message should contain more information
> --------------------------------------------------------------------------------------
>
>                 Key: TEPHRA-243
>                 URL: https://issues.apache.org/jira/browse/TEPHRA-243
>             Project: Tephra
>          Issue Type: Improvement
>    Affects Versions: 0.12.0-incubating
>            Reporter: Andreas Neumann
>            Assignee: Andreas Neumann
>             Fix For: 0.13.0-incubating
>
>
> Currently we get this message:
> {noformat}
> 2017-08-12 00:59:46,938 - INFO [TTransactionServer-rpc-857:o.a.t.p.AbstractTransactionLog@102] - Slow append to log txlog.1502517541689, took 1431 msec.
> {noformat}
> It would be more useful to know how many bytes were written and how many edits were in this sync.
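To illustrate the information the issue asks for, here is a hypothetical, simplified sketch of the bookkeeping that `startTimerIfNeeded()` and `stopTimer()` in the diff perform: the first batch written after a sync records the writer position and starts the clock, later batches only add to the entry count, and the next sync reports elapsed time, entries, and bytes. `SlowSyncTimer`, `startIfNeeded`, and `stop` are invented names; writer positions are passed in as plain longs and time comes from an injectable clock (an assumption made so the logic can be exercised without a real writer or stopwatch). The message text mirrors the diff's `LOG.info` format.

```java
import java.util.Optional;
import java.util.function.LongSupplier;

/** Hypothetical sketch of the slow-sync timer bookkeeping; not Tephra's actual class. */
class SlowSyncTimer {
  private final LongSupplier clockMillis;
  private final long slowThresholdMillis;
  private long positionBeforeWrite = -1L; // -1 means no write since the last sync
  private int countSinceLastSync = 0;
  private long startMillis;

  SlowSyncTimer(LongSupplier clockMillis, long slowThresholdMillis) {
    this.clockMillis = clockMillis;
    this.slowThresholdMillis = slowThresholdMillis;
  }

  /** Called before each batch is written; only the first batch after a sync starts the timer. */
  void startIfNeeded(long writerPosition, int entryCount) {
    if (positionBeforeWrite == -1L) {
      positionBeforeWrite = writerPosition;
      countSinceLastSync = 0;
      startMillis = clockMillis.getAsLong();
    }
    countSinceLastSync += entryCount;
  }

  /** Called after a sync; returns a warning if the write(s) plus sync took at least the threshold. */
  Optional<String> stop(String logName, long writerPosition) {
    Optional<String> warning = Optional.empty();
    if (positionBeforeWrite != -1L) {
      long elapsed = clockMillis.getAsLong() - startMillis;
      long bytesWritten = writerPosition - positionBeforeWrite;
      if (elapsed >= slowThresholdMillis) {
        warning = Optional.of(String.format(
            "Slow append to log %s, took %d ms for %d entr%s and %d bytes.",
            logName, elapsed, countSinceLastSync, countSinceLastSync == 1 ? "y" : "ies", bytesWritten));
      }
    }
    // Reset so that the next write after this sync starts a new measurement.
    positionBeforeWrite = -1L;
    countSinceLastSync = 0;
    return warning;
  }
}
```

In a real server the clock could be something monotonic such as `() -> System.nanoTime() / 1_000_000L`; a fake clock like the one in a unit test makes the elapsed times deterministic.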



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
