activemq-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (ARTEMIS-1025) OutOfDirectMemoryError raised from Netty
Date Wed, 05 Apr 2017 15:38:41 GMT

    [ https://issues.apache.org/jira/browse/ARTEMIS-1025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957046#comment-15957046
] 

ASF GitHub Bot commented on ARTEMIS-1025:
-----------------------------------------

Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109950618
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
---
    @@ -211,143 +251,203 @@ public void run() {
     
        @Override
        public ActiveMQBuffer createTransportBuffer(final int size) {
    -      return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size),
true);
    +      try {
    +         return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
    +      } catch (OutOfMemoryError oom) {
    +         if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    +            final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +            ActiveMQClientLogger.LOGGER.trace("pendingWrites: [NETTY] -> " + totalPendingWriteBytes
+ "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(),
oom);
    +         }
    +         throw oom;
    +      }
        }
     
        @Override
    -   public Object getID() {
    +   public final Object getID() {
           // TODO: Think of it
           return channel.hashCode();
        }
     
        // This is called periodically to flush the batch buffer
        @Override
    -   public void checkFlushBatchBuffer() {
    -      if (!batchingEnabled) {
    -         return;
    -      }
    -
    -      if (writeLock.tryAcquire()) {
    -         try {
    -            if (batchBuffer != null && batchBuffer.readable()) {
    -               channel.writeAndFlush(batchBuffer.byteBuf());
    -
    -               batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -         } finally {
    -            writeLock.release();
    +   public final void checkFlushBatchBuffer() {
    +      if (this.batchingEnabled) {
    +         //perform the flush only if necessary
    +         final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +         if (batchBufferSize > 0) {
    +            this.channel.flush();
              }
           }
        }
     
        @Override
    -   public void write(final ActiveMQBuffer buffer) {
    +   public final void write(final ActiveMQBuffer buffer) {
           write(buffer, false, false);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched)
{
    +   public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean
batched) {
           write(buffer, flush, batched, null);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer,
    -                     final boolean flush,
    -                     final boolean batched,
    -                     final ChannelFutureListener futureListener) {
    -
    -      try {
    -         writeLock.acquire();
    -
    -         try {
    -            if (batchBuffer == null && batchingEnabled && batched &&
!flush) {
    -               // Lazily create batch buffer
    -
    -               batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -
    -            if (batchBuffer != null) {
    -               batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
    -
    -               if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched ||
flush) {
    -                  // If the batch buffer is full or it's flush param or not batched then
flush the buffer
    -
    -                  buffer = batchBuffer;
    -               } else {
    -                  return;
    -               }
    -
    -               if (!batched || flush) {
    -                  batchBuffer = null;
    -               } else {
    -                  // Create a new buffer
    +   public final boolean blockUntilWritable(final int requiredCapacity, final long timeout,
final TimeUnit timeUnit) {
    +      final boolean isAllowedToBlock = isAllowedToBlock();
    +      if (!isAllowedToBlock) {
    +         return canWrite(requiredCapacity);
    +      } else {
    +         final long timeoutNanos = timeUnit.toNanos(timeout);
    +         final long deadline = System.nanoTime() + timeoutNanos;
    +         //choose wait time unit size
    +         final long parkNanos;
    +         //if is requested to wait more than a millisecond than we could use
    +         if (timeoutNanos >= 1_000_000L) {
    +            parkNanos = 100_000L;
    +         } else {
    +            //reduce it doesn't make sense, only a spin loop could be enough precise
with the most OS
    +            parkNanos = 1000L;
    +         }
    +         boolean canWrite;
    +         while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime()
< deadline) {
    +            LockSupport.parkNanos(parkNanos);
    +         }
    +         return canWrite;
    +      }
    +   }
     
    -                  batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -               }
    -            }
    +   private boolean isAllowedToBlock() {
    +      final EventLoop eventLoop = channel.eventLoop();
    +      final boolean inEventLoop = eventLoop.inEventLoop();
    --- End diff --
    
    It's not a blocker for this merge.. it was just a question.. would it return inevenLoop
if it's on the evenLoop for another connection?


> OutOfDirectMemoryError raised from Netty
> ----------------------------------------
>
>                 Key: ARTEMIS-1025
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1025
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: Broker
>            Reporter: Francesco Nigro
>            Assignee: Francesco Nigro
>
> If you send and receive a lot of messages in short time to Artemis via Netty connector,
the OutOfDirectMemoryError exception is thrown from the client.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message