drill-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (DRILL-3714) Query runs out of memory and remains in CANCELLATION_REQUESTED state until drillbit is restarted
Date Tue, 12 Apr 2016 18:56:25 GMT

    [ https://issues.apache.org/jira/browse/DRILL-3714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237767#comment-15237767 ]

ASF GitHub Bot commented on DRILL-3714:
---------------------------------------

Github user jacques-n commented on a diff in the pull request:

    https://github.com/apache/drill/pull/463#discussion_r59433373
  
    --- Diff: exec/rpc/src/main/java/org/apache/drill/exec/rpc/RequestIdMap.java ---
    @@ -20,51 +20,82 @@
     import io.netty.buffer.ByteBuf;
     import io.netty.channel.ChannelFuture;
     
    -import java.util.Map;
    -import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicInteger;
     
     import org.apache.drill.common.exceptions.UserRemoteException;
     import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
     
    +import com.carrotsearch.hppc.IntObjectHashMap;
    +import com.carrotsearch.hppc.procedures.IntObjectProcedure;
    +import com.google.common.base.Preconditions;
    +
     /**
    - * Manages the creation of rpc futures for a particular socket.
    + * Manages the creation of rpc futures for a particular socket <--> socket
    + * connection. Generally speaking, there will be two threads working with this
    + * class (the socket thread and the Request generating thread). Synchronization
    + * is simple with the map being the only thing that is protected. Everything
    + * else works via Atomic variables.
      */
    -public class CoordinationQueue {
    -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CoordinationQueue.class);
    +class RequestIdMap {
    +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RequestIdMap.class);
    +
    +  private final AtomicInteger value = new AtomicInteger();
    +  private final AtomicBoolean acceptMessage = new AtomicBoolean(true);
     
    -  private final PositiveAtomicInteger circularInt = new PositiveAtomicInteger();
    -  private final Map<Integer, RpcOutcome<?>> map;
    +  /** Access to map must be protected. **/
    +  private final IntObjectHashMap<RpcOutcome<?>> map;
     
    -  public CoordinationQueue(int segmentSize, int segmentCount) {
    -    map = new ConcurrentHashMap<Integer, RpcOutcome<?>>(segmentSize, 0.75f,
segmentCount);
    +  public RequestIdMap() {
    +    map = new IntObjectHashMap<RpcOutcome<?>>();
       }
     
       void channelClosed(Throwable ex) {
    +    acceptMessage.set(false);
         if (ex != null) {
    -      RpcException e;
    -      if (ex instanceof RpcException) {
    -        e = (RpcException) ex;
    -      } else {
    -        e = new RpcException(ex);
    +      final RpcException e = RpcException.mapException(ex);
    +      synchronized (map) {
    +        map.forEach(new Closer(e));
    +        map.clear();
           }
    -      for (RpcOutcome<?> f : map.values()) {
    -        f.setException(e);
    +    }
    +  }
    +
    +  private class Closer implements IntObjectProcedure<RpcOutcome<?>> {
    +    final RpcException exception;
    +
    +    public Closer(RpcException exception) {
    +      this.exception = exception;
    +    }
    +
    +    @Override
    +    public void apply(int key, RpcOutcome<?> value) {
    +      try{
    +        value.setException(exception);
    +      }catch(Exception e){
    +        logger.warn("Failure while attempting to fail rpc response.", e);
           }
         }
    +
       }
     
    -  public <V> ChannelListenerWithCoordinationId get(RpcOutcomeListener<V>
handler, Class<V> clazz, RemoteConnection connection) {
    -    int i = circularInt.getNext();
    +  public <V> ChannelListenerWithCoordinationId createNewRpcListener(RpcOutcomeListener<V>
handler, Class<V> clazz,
    +      RemoteConnection connection) {
    +    int i = value.incrementAndGet();
         RpcListener<V> future = new RpcListener<V>(handler, clazz, i, connection);
    -    Object old = map.put(i, future);
    -    if (old != null) {
    -      throw new IllegalStateException(
    -          "You attempted to reuse a coordination id when the previous coordination id
has not been removed.  This is likely rpc future callback memory leak.");
    +    final Object old;
    +    synchronized (map) {
    +      Preconditions.checkArgument(acceptMessage.get(),
    +          "Attempted to send a message when connection is no longer valid.");
    +      old = map.put(i, future);
         }
    +    Preconditions.checkArgument(old == null,
    --- End diff --
    
    This is an assertion to ensure that there isn't a bug somewhere.


> Query runs out of memory and remains in CANCELLATION_REQUESTED state until drillbit is
restarted
> ------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-3714
>                 URL: https://issues.apache.org/jira/browse/DRILL-3714
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Execution - Flow
>    Affects Versions: 1.2.0
>            Reporter: Victoria Markman
>            Assignee: Jacques Nadeau
>            Priority: Critical
>             Fix For: 1.7.0
>
>         Attachments: Screen Shot 2015-08-26 at 10.36.33 AM.png, drillbit.log, jstack.txt,
query_profile_2a2210a7-7a78-c774-d54c-c863d0b77bb0.json
>
>
> This is a variation of DRILL-3705; the difference is in how Drill behaves when it hits an OOM condition.
> The query runs out of memory during execution and remains in the "CANCELLATION_REQUESTED" state until the drillbit is restarted.
> Client (sqlline in this case) never gets a response from the server.
> Reproduction details:
>         Single node drillbit installation.
>         DRILL_MAX_DIRECT_MEMORY="8G"
>         DRILL_HEAP="4G"
> Run this query on TPCDS SF100 data set
> {code}
> SELECT SUM(ss.ss_net_paid_inc_tax) OVER (PARTITION BY ss.ss_store_sk) AS TotalSpend FROM
store_sales ss WHERE ss.ss_store_sk IS NOT NULL ORDER BY 1 LIMIT 10;
> {code}
> drillbit.log
> {code}
> 2015-08-26 16:54:58,469 [2a2210a7-7a78-c774-d54c-c863d0b77bb0:frag:3:22] INFO  o.a.d.e.w.f.FragmentStatusReporter
- 2a2210a7-7a78-c774-d54c-c863d0b77bb0:3:22: State to report: RUNNING
> 2015-08-26 16:55:50,498 [BitServer-5] WARN  o.a.drill.exec.rpc.data.DataServer - Message
of mode REQUEST of rpc type 3 took longer than 500ms.  Actual duration was 2569ms.
> 2015-08-26 16:56:31,086 [BitServer-5] ERROR o.a.d.exec.rpc.RpcExceptionHandler - Exception
in RPC communication.  Connection: /10.10.88.133:31012 <--> /10.10.88.133:54554 (data
server).  Closing connection.
> io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
>         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:233)
~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
[netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
[netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250) [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
[netty-common-4.0.27.Final.jar:4.0.27.Final]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory
>         at java.nio.Bits.reserveMemory(Bits.java:658) ~[na:1.7.0_71]
>         at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[na:1.7.0_71]
>         at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306) ~[na:1.7.0_71]
>         at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:437) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:179) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.PoolArena.allocate(PoolArena.java:168) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.PoolArena.reallocate(PoolArena.java:280) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:110) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.WrappedByteBuf.writeBytes(WrappedByteBuf.java:600) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.buffer.UnsafeDirectLittleEndian.writeBytes(UnsafeDirectLittleEndian.java:28)
~[drill-java-exec-1.2.0-SNAPSHOT.jar:4.0.27.Final]
>         at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92)
~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:227)
~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
>         ... 11 common frames omitted
> 2015-08-26 16:56:31,087 [BitServer-5] INFO  o.a.d.exec.rpc.ProtobufLengthDecoder - Channel
is closed, discarding remaining 124958 byte(s) in buffer.
> 2015-08-26 16:56:31,087 [BitClient-1] ERROR o.a.d.exec.rpc.RpcExceptionHandler - Exception
in RPC communication.  Connection: /10.10.88.133:54554 <--> /10.10.88.133:31012 (data
client).  Closing connection.
> java.io.IOException: syscall:read(...)() failed: Connection reset by peer
> 2015-08-26 16:56:31,088 [BitClient-1] INFO  o.a.drill.exec.rpc.data.DataClient - Channel
closed /10.10.88.133:54554 <--> /10.10.88.133:31012.
> 2015-08-26 16:56:35,325 [BitServer-6] ERROR o.a.d.exec.rpc.RpcExceptionHandler - Exception
in RPC communication.  Connection: /10.10.88.133:31012 <--> /10.10.88.133:54555 (data
server).  Closing connection.
> io.netty.handler.codec.DecoderException: java.lang.OutOfMemoryError: Direct buffer memory
>         at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:233)
~[netty-codec-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
[netty-transport-4.0.27.Final.jar:4.0.27.Final]
>         at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:618)
[netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:329)
[netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:250) [netty-transport-native-epoll-4.0.27.Final-linux-x86_64.jar:na]
>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
[netty-common-4.0.27.Final.jar:4.0.27.Final]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
> {code}
> Attached:
>         query_profile_2a2210a7-7a78-c774-d54c-c863d0b77bb0.json
>         drillbit.log
>         jstack.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message