activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuconic <...@git.apache.org>
Subject [GitHub] activemq-artemis pull request #1119: ARTEMIS-1025 OutOfDirectMemoryError rai...
Date Wed, 05 Apr 2017 15:38:00 GMT
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1119#discussion_r109950618
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
---
    @@ -211,143 +251,203 @@ public void run() {
     
        @Override
        public ActiveMQBuffer createTransportBuffer(final int size) {
    -      return new ChannelBufferWrapper(PooledByteBufAllocator.DEFAULT.directBuffer(size),
true);
    +      try {
    +         return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true);
    +      } catch (OutOfMemoryError oom) {
    +         if (ActiveMQClientLogger.LOGGER.isTraceEnabled()) {
    +            final long totalPendingWriteBytes = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +            ActiveMQClientLogger.LOGGER.trace("pendingWrites: [NETTY] -> " + totalPendingWriteBytes
+ "[EVENT LOOP] -> " + pendingWritesOnEventLoopView.get() + " causes: " + oom.getMessage(),
oom);
    +         }
    +         throw oom;
    +      }
        }
     
        @Override
    -   public Object getID() {
    +   public final Object getID() {
           // TODO: Think of it
           return channel.hashCode();
        }
     
        // This is called periodically to flush the batch buffer
        @Override
    -   public void checkFlushBatchBuffer() {
    -      if (!batchingEnabled) {
    -         return;
    -      }
    -
    -      if (writeLock.tryAcquire()) {
    -         try {
    -            if (batchBuffer != null && batchBuffer.readable()) {
    -               channel.writeAndFlush(batchBuffer.byteBuf());
    -
    -               batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -         } finally {
    -            writeLock.release();
    +   public final void checkFlushBatchBuffer() {
    +      if (this.batchingEnabled) {
    +         //perform the flush only if necessary
    +         final int batchBufferSize = batchBufferSize(this.channel, this.writeBufferHighWaterMark);
    +         if (batchBufferSize > 0) {
    +            this.channel.flush();
              }
           }
        }
     
        @Override
    -   public void write(final ActiveMQBuffer buffer) {
    +   public final void write(final ActiveMQBuffer buffer) {
           write(buffer, false, false);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer, final boolean flush, final boolean batched)
{
    +   public final void write(ActiveMQBuffer buffer, final boolean flush, final boolean
batched) {
           write(buffer, flush, batched, null);
        }
     
        @Override
    -   public void write(ActiveMQBuffer buffer,
    -                     final boolean flush,
    -                     final boolean batched,
    -                     final ChannelFutureListener futureListener) {
    -
    -      try {
    -         writeLock.acquire();
    -
    -         try {
    -            if (batchBuffer == null && batchingEnabled && batched &&
!flush) {
    -               // Lazily create batch buffer
    -
    -               batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -            }
    -
    -            if (batchBuffer != null) {
    -               batchBuffer.writeBytes(buffer, 0, buffer.writerIndex());
    -
    -               if (batchBuffer.writerIndex() >= BATCHING_BUFFER_SIZE || !batched ||
flush) {
    -                  // If the batch buffer is full or it's flush param or not batched then
flush the buffer
    -
    -                  buffer = batchBuffer;
    -               } else {
    -                  return;
    -               }
    -
    -               if (!batched || flush) {
    -                  batchBuffer = null;
    -               } else {
    -                  // Create a new buffer
    +   public final boolean blockUntilWritable(final int requiredCapacity, final long timeout,
final TimeUnit timeUnit) {
    +      final boolean isAllowedToBlock = isAllowedToBlock();
    +      if (!isAllowedToBlock) {
    +         return canWrite(requiredCapacity);
    +      } else {
    +         final long timeoutNanos = timeUnit.toNanos(timeout);
    +         final long deadline = System.nanoTime() + timeoutNanos;
    +         //choose wait time unit size
    +         final long parkNanos;
    +         //if is requested to wait more than a millisecond than we could use
    +         if (timeoutNanos >= 1_000_000L) {
    +            parkNanos = 100_000L;
    +         } else {
    +            //reduce it doesn't make sense, only a spin loop could be enough precise
with the most OS
    +            parkNanos = 1000L;
    +         }
    +         boolean canWrite;
    +         while (!(canWrite = canWrite(requiredCapacity)) && System.nanoTime()
< deadline) {
    +            LockSupport.parkNanos(parkNanos);
    +         }
    +         return canWrite;
    +      }
    +   }
     
    -                  batchBuffer = ActiveMQBuffers.dynamicBuffer(BATCHING_BUFFER_SIZE);
    -               }
    -            }
    +   private boolean isAllowedToBlock() {
    +      final EventLoop eventLoop = channel.eventLoop();
    +      final boolean inEventLoop = eventLoop.inEventLoop();
    --- End diff --
    
    It's not a blocker for this merge.. it was just a question.. would it return inevenLoop
if it's on the evenLoop for another connection?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message