spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aarondav <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-6578] [core] Fix thread-safety issue in...
Date Tue, 31 Mar 2015 02:37:46 GMT
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/5234#discussion_r27449119
  
    --- Diff: network/common/src/main/java/org/apache/spark/network/protocol/MessageEncoder.java
---
    @@ -72,9 +80,84 @@ public void encode(ChannelHandlerContext ctx, Message in, List<Object>
out) {
         in.encode(header);
         assert header.writableBytes() == 0;
     
    -    out.add(header);
         if (body != null && bodyLength > 0) {
    -      out.add(body);
    +      out.add(new MessageWithHeader(header, headerLength, body, bodyLength));
    +    } else {
    +      out.add(header);
         }
       }
    +
    +  /**
    +   * A wrapper message that holds two separate pieces (a header and a body) to avoid
    +   * copying the body's content.
    +   */
    +  private static class MessageWithHeader extends AbstractReferenceCounted implements
FileRegion {
    +
    +    private final ByteBuf header;
    +    private final int headerLength;
    +    private final Object body;
    +    private final long bodyLength;
    +    private int bytesTransferred;
    +
    +    MessageWithHeader(ByteBuf header, int headerLength, Object body, long bodyLength)
{
    +      this.header = header;
    +      this.headerLength = headerLength;
    +      this.body = body;
    +      this.bodyLength = bodyLength;
    +    }
    +
    +    @Override
    +    public long count() {
    +      return headerLength + bodyLength;
    +    }
    +
    +    @Override
    +    public long position() {
    +      return 0;
    +    }
    +
    +    @Override
    +    public long transfered() {
    +      long total = bytesTransferred;
    +      if (body instanceof FileRegion) {
    +        total += ((FileRegion)body).transfered();
    +      }
    +      return total;
    +    }
    +
    +    @Override
    +    public long transferTo(WritableByteChannel target, long position) throws IOException
{
    +      Preconditions.checkArgument(position >= 0 && position < count(),
"Invalid position.");
    +
    +      if (position < headerLength && position >= 0) {
    +        header.skipBytes(Ints.checkedCast(position));
    +        int remaining = header.readableBytes();
    +        target.write(header.nioBuffer());
    +        bytesTransferred += remaining;
    +      }
    +
    +      long bodyPos = position > headerLength ? position - headerLength : 0;
    +      if (body instanceof FileRegion) {
    +        ((FileRegion)body).transferTo(target, bodyPos);
    --- End diff --
    
    For this transferTo and the target.write() below, we will similarly have to store the
actual bytes transferred; additionally, the return value should be the bytes transferred in
this transaction rather than total bytes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message