hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject hadoop git commit: HDFS-7054. Make DFSOutputStream tracing more fine-grained (cmccabe)
Date Thu, 19 Mar 2015 01:15:40 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 388696c08 -> 94976cb36


HDFS-7054. Make DFSOutputStream tracing more fine-grained (cmccabe)

(cherry picked from commit 8234fd0e1087e0e49aa1d6f286f292b7f70b368e)
(cherry picked from commit 79c07bbacae94bb0aae476dfbc17ae38564e2028)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/94976cb3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/94976cb3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/94976cb3

Branch: refs/heads/branch-2.7
Commit: 94976cb3694113c147725a9c8c227ab0ec88d9e4
Parents: 388696c
Author: Colin Patrick Mccabe <cmccabe@cloudera.com>
Authored: Wed Mar 18 18:06:17 2015 -0700
Committer: Colin Patrick Mccabe <cmccabe@cloudera.com>
Committed: Wed Mar 18 18:15:34 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 318 ++++++++++++-------
 .../java/org/apache/hadoop/hdfs/DFSPacket.java  |  73 +++++
 .../org/apache/hadoop/hdfs/TestDFSPacket.java   |  25 ++
 4 files changed, 301 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/94976cb3/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index f01f047..6c3a3a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -437,6 +437,8 @@ Release 2.7.0 - UNRELEASED
     HDFS-7940. Add tracing to DFSClient#setQuotaByStorageType (Rakesh R via
     Colin P. McCabe)
 
+    HDFS-7054. Make DFSOutputStream tracing more fine-grained (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94976cb3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 0a8720a..a5983c7 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -95,8 +95,11 @@ import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.Time;
+import org.apache.htrace.NullScope;
+import org.apache.htrace.Sampler;
 import org.apache.htrace.Span;
 import org.apache.htrace.Trace;
+import org.apache.htrace.TraceInfo;
 import org.apache.htrace.TraceScope;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -270,17 +273,11 @@ public class DFSOutputStream extends FSOutputSummer
     /** Append on an existing block? */
     private final boolean isAppend;
 
-    private final Span traceSpan;
-
-    /**
-     * construction with tracing info
-     */
-    private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, Span span) {
+    private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
       isAppend = false;
       isLazyPersistFile = isLazyPersist(stat);
       this.block = block;
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
-      traceSpan = span;
     }
     
     /**
@@ -291,10 +288,9 @@ public class DFSOutputStream extends FSOutputSummer
      * @throws IOException if error occurs
      */
     private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
-        int bytesPerChecksum, Span span) throws IOException {
+        int bytesPerChecksum) throws IOException {
       isAppend = true;
       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
-      traceSpan = span;
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
       accessToken = lastBlock.getBlockToken();
@@ -385,12 +381,8 @@ public class DFSOutputStream extends FSOutputSummer
     @Override
     public void run() {
       long lastPacket = Time.now();
-      TraceScope traceScope = null;
-      if (traceSpan != null) {
-        traceScope = Trace.continueSpan(traceSpan);
-      }
+      TraceScope scope = NullScope.INSTANCE;
       while (!streamerClosed && dfsClient.clientRunning) {
-
         // if the Responder encountered an error, shutdown Responder
         if (hasError && response != null) {
           try {
@@ -436,11 +428,18 @@ public class DFSOutputStream extends FSOutputSummer
             // get packet to be sent.
             if (dataQueue.isEmpty()) {
               one = createHeartbeatPacket();
+              assert one != null;
             } else {
               one = dataQueue.getFirst(); // regular data packet
+              long parents[] = one.getTraceParents();
+              if (parents.length > 0) {
+                scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0]));
+                // TODO: use setParents API once it's available from HTrace 3.2
+//                scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS);
+//                scope.getSpan().setParents(parents);
+              }
             }
           }
-          assert one != null;
 
           // get new block from namenode.
           if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
@@ -486,9 +485,12 @@ public class DFSOutputStream extends FSOutputSummer
           }
           
           // send the packet
+          Span span = null;
           synchronized (dataQueue) {
             // move packet from dataQueue to ackQueue
             if (!one.isHeartbeatPacket()) {
+              span = scope.detach();
+              one.setTraceSpan(span);
               dataQueue.removeFirst();
               ackQueue.addLast(one);
               dataQueue.notifyAll();
@@ -501,6 +503,7 @@ public class DFSOutputStream extends FSOutputSummer
           }
 
           // write out data to remote datanode
+          TraceScope writeScope = Trace.startSpan("writeTo", span);
           try {
             one.writeTo(blockStream);
             blockStream.flush();   
@@ -513,6 +516,8 @@ public class DFSOutputStream extends FSOutputSummer
             // will be taken out then.
             tryMarkPrimaryDatanodeFailed();
             throw e;
+          } finally {
+            writeScope.close();
           }
           lastPacket = Time.now();
           
@@ -562,11 +567,10 @@ public class DFSOutputStream extends FSOutputSummer
             // Not a datanode issue
             streamerClosed = true;
           }
+        } finally {
+          scope.close();
         }
       }
-      if (traceScope != null) {
-        traceScope.close();
-      }
       closeInternal();
     }
 
@@ -721,6 +725,7 @@ public class DFSOutputStream extends FSOutputSummer
         setName("ResponseProcessor for block " + block);
         PipelineAck ack = new PipelineAck();
 
+        TraceScope scope = NullScope.INSTANCE;
         while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
           // process responses from datanodes.
           try {
@@ -795,6 +800,8 @@ public class DFSOutputStream extends FSOutputSummer
             block.setNumBytes(one.getLastByteOffsetBlock());
 
             synchronized (dataQueue) {
+              scope = Trace.continueSpan(one.getTraceSpan());
+              one.setTraceSpan(null);
               lastAckedSeqno = seqno;
               ackQueue.removeFirst();
               dataQueue.notifyAll();
@@ -819,6 +826,8 @@ public class DFSOutputStream extends FSOutputSummer
               }
               responderClosed = true;
             }
+          } finally {
+            scope.close();
           }
         }
       }
@@ -879,6 +888,12 @@ public class DFSOutputStream extends FSOutputSummer
           // a client waiting on close() will be aware that the flush finished.
           synchronized (dataQueue) {
             DFSPacket endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
+            Span span = endOfBlockPacket.getTraceSpan();
+            if (span != null) {
+              // Close any trace span associated with this Packet
+              TraceScope scope = Trace.continueSpan(span);
+              scope.close();
+            }
             assert endOfBlockPacket.isLastPacketInBlock();
             assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
             lastAckedSeqno = endOfBlockPacket.getSeqno();
@@ -1586,11 +1601,7 @@ public class DFSOutputStream extends FSOutputSummer
 
     computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
 
-    Span traceSpan = null;
-    if (Trace.isTracing()) {
-      traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
-    }
-    streamer = new DataStreamer(stat, null, traceSpan);
+    streamer = new DataStreamer(stat, null);
     if (favoredNodes != null && favoredNodes.length != 0) {
       streamer.setFavoredNodes(favoredNodes);
     }
@@ -1600,50 +1611,56 @@ public class DFSOutputStream extends FSOutputSummer
       FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
       short replication, long blockSize, Progressable progress, int buffersize,
       DataChecksum checksum, String[] favoredNodes) throws IOException {
-    HdfsFileStatus stat = null;
-
-    // Retry the create if we get a RetryStartFileException up to a maximum
-    // number of times
-    boolean shouldRetry = true;
-    int retryCount = CREATE_RETRY_COUNT;
-    while (shouldRetry) {
-      shouldRetry = false;
-      try {
-        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
-            new EnumSetWritable<CreateFlag>(flag), createParent, replication,
-            blockSize, SUPPORTED_CRYPTO_VERSIONS);
-        break;
-      } catch (RemoteException re) {
-        IOException e = re.unwrapRemoteException(
-            AccessControlException.class,
-            DSQuotaExceededException.class,
-            FileAlreadyExistsException.class,
-            FileNotFoundException.class,
-            ParentNotDirectoryException.class,
-            NSQuotaExceededException.class,
-            RetryStartFileException.class,
-            SafeModeException.class,
-            UnresolvedPathException.class,
-            SnapshotAccessControlException.class,
-            UnknownCryptoProtocolVersionException.class);
-        if (e instanceof RetryStartFileException) {
-          if (retryCount > 0) {
-            shouldRetry = true;
-            retryCount--;
+    TraceScope scope =
+        dfsClient.getPathTraceScope("newStreamForCreate", src);
+    try {
+      HdfsFileStatus stat = null;
+
+      // Retry the create if we get a RetryStartFileException up to a maximum
+      // number of times
+      boolean shouldRetry = true;
+      int retryCount = CREATE_RETRY_COUNT;
+      while (shouldRetry) {
+        shouldRetry = false;
+        try {
+          stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
+              new EnumSetWritable<CreateFlag>(flag), createParent, replication,
+              blockSize, SUPPORTED_CRYPTO_VERSIONS);
+          break;
+        } catch (RemoteException re) {
+          IOException e = re.unwrapRemoteException(
+              AccessControlException.class,
+              DSQuotaExceededException.class,
+              FileAlreadyExistsException.class,
+              FileNotFoundException.class,
+              ParentNotDirectoryException.class,
+              NSQuotaExceededException.class,
+              RetryStartFileException.class,
+              SafeModeException.class,
+              UnresolvedPathException.class,
+              SnapshotAccessControlException.class,
+              UnknownCryptoProtocolVersionException.class);
+          if (e instanceof RetryStartFileException) {
+            if (retryCount > 0) {
+              shouldRetry = true;
+              retryCount--;
+            } else {
+              throw new IOException("Too many retries because of encryption" +
+                  " zone operations", e);
+            }
           } else {
-            throw new IOException("Too many retries because of encryption" +
-                " zone operations", e);
+            throw e;
           }
-        } else {
-          throw e;
         }
       }
+      Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
+      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
+          flag, progress, checksum, favoredNodes);
+      out.start();
+      return out;
+    } finally {
+      scope.close();
     }
-    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
-    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
-        flag, progress, checksum, favoredNodes);
-    out.start();
-    return out;
   }
 
   /** Construct a new output stream for append. */
@@ -1653,21 +1670,16 @@ public class DFSOutputStream extends FSOutputSummer
     this(dfsClient, src, progress, stat, checksum);
     initialFileSize = stat.getLen(); // length of file when opened
 
-    Span traceSpan = null;
-    if (Trace.isTracing()) {
-      traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
-    }
-
     // The last partial block of the file has to be filled.
     if (!toNewBlock && lastBlock != null) {
       // indicate that we are appending to an existing block
       bytesCurBlock = lastBlock.getBlockSize();
-      streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
+      streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
     } else {
       computePacketChunkSize(dfsClient.getConf().writePacketSize,
           bytesPerChecksum);
       streamer = new DataStreamer(stat,
-          lastBlock != null ? lastBlock.getBlock() : null, traceSpan);
+          lastBlock != null ? lastBlock.getBlock() : null);
     }
     this.fileEncryptionInfo = stat.getFileEncryptionInfo();
   }
@@ -1676,13 +1688,19 @@ public class DFSOutputStream extends FSOutputSummer
       boolean toNewBlock, int bufferSize, Progressable progress,
       LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
       String[] favoredNodes) throws IOException {
-    final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
-        progress, lastBlock, stat, checksum);
-    if (favoredNodes != null && favoredNodes.length != 0) {
-      out.streamer.setFavoredNodes(favoredNodes);
+    TraceScope scope =
+        dfsClient.getPathTraceScope("newStreamForAppend", src);
+    try {
+      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
+          progress, lastBlock, stat, checksum);
+      if (favoredNodes != null && favoredNodes.length != 0) {
+        out.streamer.setFavoredNodes(favoredNodes);
+      }
+      out.start();
+      return out;
+    } finally {
+      scope.close();
     }
-    out.start();
-    return out;
   }
   
   private static boolean isLazyPersist(HdfsFileStatus stat) {
@@ -1707,6 +1725,7 @@ public class DFSOutputStream extends FSOutputSummer
   private void queueCurrentPacket() {
     synchronized (dataQueue) {
       if (currentPacket == null) return;
+      currentPacket.addTraceParent(Trace.currentSpan());
       dataQueue.addLast(currentPacket);
       lastQueuedSeqno = currentPacket.getSeqno();
       if (DFSClient.LOG.isDebugEnabled()) {
@@ -1721,23 +1740,39 @@ public class DFSOutputStream extends FSOutputSummer
     synchronized (dataQueue) {
       try {
       // If queue is full, then wait till we have enough space
-      while (!isClosed() && dataQueue.size() + ackQueue.size() > dfsClient.getConf().writeMaxPackets) {
+        boolean firstWait = true;
         try {
-          dataQueue.wait();
-        } catch (InterruptedException e) {
-          // If we get interrupted while waiting to queue data, we still need to get rid
-          // of the current packet. This is because we have an invariant that if
-          // currentPacket gets full, it will get queued before the next writeChunk.
-          //
-          // Rather than wait around for space in the queue, we should instead try to
-          // return to the caller as soon as possible, even though we slightly overrun
-          // the MAX_PACKETS length.
-          Thread.currentThread().interrupt();
-          break;
+          while (!isClosed() && dataQueue.size() + ackQueue.size() >
+              dfsClient.getConf().writeMaxPackets) {
+            if (firstWait) {
+              Span span = Trace.currentSpan();
+              if (span != null) {
+                span.addTimelineAnnotation("dataQueue.wait");
+              }
+              firstWait = false;
+            }
+            try {
+              dataQueue.wait();
+            } catch (InterruptedException e) {
+              // If we get interrupted while waiting to queue data, we still need to get rid
+              // of the current packet. This is because we have an invariant that if
+              // currentPacket gets full, it will get queued before the next writeChunk.
+              //
+              // Rather than wait around for space in the queue, we should instead try to
+              // return to the caller as soon as possible, even though we slightly overrun
+              // the MAX_PACKETS length.
+              Thread.currentThread().interrupt();
+              break;
+            }
+          }
+        } finally {
+          Span span = Trace.currentSpan();
+          if ((span != null) && (!firstWait)) {
+            span.addTimelineAnnotation("end.wait");
+          }
         }
-      }
-      checkClosed();
-      queueCurrentPacket();
+        checkClosed();
+        queueCurrentPacket();
       } catch (ClosedChannelException e) {
       }
     }
@@ -1747,6 +1782,17 @@ public class DFSOutputStream extends FSOutputSummer
   @Override
   protected synchronized void writeChunk(byte[] b, int offset, int len,
       byte[] checksum, int ckoff, int cklen) throws IOException {
+    TraceScope scope =
+        dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
+    try {
+      writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
+    } finally {
+      scope.close();
+    }
+  }
+
+  private synchronized void writeChunkImpl(byte[] b, int offset, int len,
+          byte[] checksum, int ckoff, int cklen) throws IOException {
     dfsClient.checkOpen();
     checkClosed();
 
@@ -1835,12 +1881,24 @@ public class DFSOutputStream extends FSOutputSummer
    */
   @Override
   public void hflush() throws IOException {
-    flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
+    TraceScope scope =
+        dfsClient.getPathTraceScope("hflush", src);
+    try {
+      flushOrSync(false, EnumSet.noneOf(SyncFlag.class));
+    } finally {
+      scope.close();
+    }
   }
 
   @Override
   public void hsync() throws IOException {
-    hsync(EnumSet.noneOf(SyncFlag.class));
+    TraceScope scope =
+        dfsClient.getPathTraceScope("hsync", src);
+    try {
+      flushOrSync(true, EnumSet.noneOf(SyncFlag.class));
+    } finally {
+      scope.close();
+    }
   }
   
   /**
@@ -1857,7 +1915,13 @@ public class DFSOutputStream extends FSOutputSummer
    *          whether or not to update the block length in NameNode.
    */
   public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
-    flushOrSync(true, syncFlags);
+    TraceScope scope =
+        dfsClient.getPathTraceScope("hsync", src);
+    try {
+      flushOrSync(true, syncFlags);
+    } finally {
+      scope.close();
+    }
   }
 
   /**
@@ -2038,33 +2102,38 @@ public class DFSOutputStream extends FSOutputSummer
   }
 
   private void waitForAckedSeqno(long seqno) throws IOException {
-    if (DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Waiting for ack for: " + seqno);
-    }
-    long begin = Time.monotonicNow();
+    TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER);
     try {
-      synchronized (dataQueue) {
-        while (!isClosed()) {
-          checkClosed();
-          if (lastAckedSeqno >= seqno) {
-            break;
-          }
-          try {
-            dataQueue.wait(1000); // when we receive an ack, we notify on
-                                  // dataQueue
-          } catch (InterruptedException ie) {
-            throw new InterruptedIOException(
-                "Interrupted while waiting for data to be acknowledged by pipeline");
+      if (DFSClient.LOG.isDebugEnabled()) {
+        DFSClient.LOG.debug("Waiting for ack for: " + seqno);
+      }
+      long begin = Time.monotonicNow();
+      try {
+        synchronized (dataQueue) {
+          while (!isClosed()) {
+            checkClosed();
+            if (lastAckedSeqno >= seqno) {
+              break;
+            }
+            try {
+              dataQueue.wait(1000); // when we receive an ack, we notify on
+              // dataQueue
+            } catch (InterruptedException ie) {
+              throw new InterruptedIOException(
+                  "Interrupted while waiting for data to be acknowledged by pipeline");
+            }
           }
         }
+        checkClosed();
+      } catch (ClosedChannelException e) {
       }
-      checkClosed();
-    } catch (ClosedChannelException e) {
-    }
-    long duration = Time.monotonicNow() - begin;
-    if (duration > dfsclientSlowLogThresholdMs) {
-      DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
-          + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
+      long duration = Time.monotonicNow() - begin;
+      if (duration > dfsclientSlowLogThresholdMs) {
+        DFSClient.LOG.warn("Slow waitForAckedSeqno took " + duration
+            + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)");
+      }
+    } finally {
+      scope.close();
     }
   }
 
@@ -2129,6 +2198,16 @@ public class DFSOutputStream extends FSOutputSummer
    */
   @Override
   public synchronized void close() throws IOException {
+    TraceScope scope =
+        dfsClient.getPathTraceScope("DFSOutputStream#close", src);
+    try {
+      closeImpl();
+    } finally {
+      scope.close();
+    }
+  }
+
+  private synchronized void closeImpl() throws IOException {
     if (isClosed()) {
       IOException e = lastException.getAndSet(null);
       if (e == null)
@@ -2154,7 +2233,12 @@ public class DFSOutputStream extends FSOutputSummer
       // get last block before destroying the streamer
       ExtendedBlock lastBlock = streamer.getBlock();
       closeThreads(false);
-      completeFile(lastBlock);
+      TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER);
+      try {
+        completeFile(lastBlock);
+      } finally {
+        scope.close();
+      }
       dfsClient.endFileLease(fileId);
     } catch (ClosedChannelException e) {
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94976cb3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
index 9b3ea51..7e7f780 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java
@@ -21,9 +21,12 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.channels.ClosedChannelException;
+import java.util.Arrays;
+
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.util.ByteArrayManager;
+import org.apache.htrace.Span;
 
 /****************************************************************
  * DFSPacket is used by DataStreamer and DFSOutputStream.
@@ -33,6 +36,7 @@ import org.apache.hadoop.hdfs.util.ByteArrayManager;
 
 class DFSPacket {
   public static final long HEART_BEAT_SEQNO = -1L;
+  private static long[] EMPTY = new long[0];
   private final long seqno; // sequence number of buffer in block
   private final long offsetInBlock; // offset in block
   private boolean syncBlock; // this packet forces the current block to disk
@@ -59,6 +63,9 @@ class DFSPacket {
   private int checksumPos;
   private final int dataStart;
   private int dataPos;
+  private long[] traceParents = EMPTY;
+  private int traceParentsUsed;
+  private Span span;
 
   /**
    * Create a new packet.
@@ -267,4 +274,70 @@ class DFSPacket {
         " lastPacketInBlock: " + this.lastPacketInBlock +
         " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
   }
+
+  /**
+   * Add a trace parent span for this packet.<p/>
+   *
+   * Trace parent spans for a packet are the trace spans responsible for
+   * adding data to that packet.  We store them as an array of longs for
+   * efficiency.<p/>
+   *
+   * Protected by the DFSOutputStream dataQueue lock.
+   */
+  public void addTraceParent(Span span) {
+    if (span == null) {
+      return;
+    }
+    addTraceParent(span.getSpanId());
+  }
+
+  public void addTraceParent(long id) {
+    if (traceParentsUsed == traceParents.length) {
+      int newLength = (traceParents.length == 0) ? 8 :
+          traceParents.length * 2;
+      traceParents = Arrays.copyOf(traceParents, newLength);
+    }
+    traceParents[traceParentsUsed] = id;
+    traceParentsUsed++;
+  }
+
+  /**
+   * Get the trace parent spans for this packet.<p/>
+   *
+   * Will always be non-null.<p/>
+   *
+   * Protected by the DFSOutputStream dataQueue lock.
+   */
+  public long[] getTraceParents() {
+    // Remove duplicates from the array.
+    int len = traceParentsUsed;
+    Arrays.sort(traceParents, 0, len);
+    int i = 0, j = 0;
+    long prevVal = 0; // 0 is not a valid span id
+    while (true) {
+      if (i == len) {
+        break;
+      }
+      long val = traceParents[i];
+      if (val != prevVal) {
+        traceParents[j] = val;
+        j++;
+        prevVal = val;
+      }
+      i++;
+    }
+    if (j < traceParents.length) {
+      traceParents = Arrays.copyOf(traceParents, j);
+      traceParentsUsed = traceParents.length;
+    }
+    return traceParents;
+  }
+
+  public void setTraceSpan(Span span) {
+    this.span = span;
+  }
+
+  public Span getTraceSpan() {
+    return span;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/94976cb3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
index 8bf6097..daee608 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSPacket.java
@@ -65,4 +65,29 @@ public class TestDFSPacket {
       }
     }
   }
+
+  @Test
+  public void testAddParentsGetParents() throws Exception {
+    DFSPacket p = new DFSPacket(null, maxChunksPerPacket,
+                                0, 0, checksumSize, false);
+    long parents[] = p.getTraceParents();
+    Assert.assertEquals(0, parents.length);
+    p.addTraceParent(123);
+    p.addTraceParent(123);
+    parents = p.getTraceParents();
+    Assert.assertEquals(1, parents.length);
+    Assert.assertEquals(123, parents[0]);
+    parents = p.getTraceParents(); // test calling 'get' again.
+    Assert.assertEquals(1, parents.length);
+    Assert.assertEquals(123, parents[0]);
+    p.addTraceParent(1);
+    p.addTraceParent(456);
+    p.addTraceParent(789);
+    parents = p.getTraceParents();
+    Assert.assertEquals(4, parents.length);
+    Assert.assertEquals(1, parents[0]);
+    Assert.assertEquals(123, parents[1]);
+    Assert.assertEquals(456, parents[2]);
+    Assert.assertEquals(789, parents[3]);
+  }
 }


Mime
View raw message