From: sijie@apache.org
To: "commits@pulsar.apache.org"
Reply-To: dev@pulsar.apache.org
Date: Mon, 30 Mar 2020 22:04:07 +0000
Subject: [pulsar] branch master updated: [pulsar-broker] add flag to skip broker shutdown on transient OOM (#6634)
Message-ID: <158560584679.30731.12507727804850186895@gitbox.apache.org>
X-Git-Repo: pulsar
X-Git-Refname: refs/heads/master
X-Git-Oldrev: 7bd990a9ba3694d36b9b7eedb6d893f19d25227e
X-Git-Newrev: 2ea2798219945e7898ca1145e9d1d45fe682980c

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 2ea2798  [pulsar-broker] add flag to skip broker shutdown on transient OOM (#6634)
2ea2798 is described below

commit 2ea2798219945e7898ca1145e9d1d45fe682980c
Author: Rajan Dhabalia
AuthorDate: Mon Mar 30 14:59:12 2020 -0700

[pulsar-broker] add flag to skip broker shutdown on transient OOM (#6634)

### Motivation

Sometimes a high dispatch rate on one topic can temporarily cause the broker to run out of memory (OOM). This is a transient error, and the broker can recover within a few seconds, as soon as some memory is released. However, the 2.4 release includes change #4196, which restarts the broker on OOM. This can cause serious instability in the cluster: the topic moves from one broker to another, restarts multiple brokers, and disrupts other topics as well. We have seen a similar kind of issue mentione [...]

```
01:48:49.549 [pulsar-io-22-37] ERROR org.apache.pulsar.PulsarBrokerStarter - -- Shutting down - Received OOM exception: Direct buffer memory
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
    at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) ~[bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.handler.ssl.SslHandler.allocate(SslHandler.java:1912) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.handler.ssl.SslHandler.allocateOutNetBuf(SslHandler.java:1923) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:826) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at org.apache.pulsar.broker.service.Consumer.lambda$sendMessages$51(Consumer.java:265) ~[pulsar-broker-2.4.6-yahoo.jar:2.4.6-yahoo]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) [netty-all-4.1.32.Final.jar:4.1.32.
:
:
01:48:49.549 [pulsar-io-22-39] ERROR org.apache.pulsar.PulsarBrokerStarter - -- Shutting down - Received OOM exception: Direct buffer memory
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
    at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:769) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:745) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:244) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:226) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.PoolArena.allocate(PoolArena.java:146) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:324) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) ~[netty-all-4.1.32.Final.jar:4.1.32.Final]
    at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:164) [bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
    at org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) [bookkeeper-common-allocator-4.9.4.2-yahoo.jar:4.9.4.2-yahoo]
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185) [netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176) [netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53) [netty-all-4.1.32.Final.jar:4.1.32.Final]
    at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114) [netty-all-4.1.32.Final.jar:4.1.32.Final]
```

### Modification

Add dynamic flag to avoid broker shutdown on OOM.
---
 conf/broker.conf                                                 | 3 +++
 conf/standalone.conf                                             | 3 +++
 deployment/terraform-ansible/templates/broker.conf               | 3 +++
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java | 6 ++++++
 .../src/main/java/org/apache/pulsar/PulsarBrokerStarter.java     | 8 ++++++--
 site2/docs/reference-configuration.md                            | 2 ++
 6 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 1a96ff1..2ded719 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -67,6 +67,9 @@ zooKeeperOperationTimeoutSeconds=30
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
 brokerShutdownTimeoutMs=60000
 
+# Flag to skip broker shutdown when broker handles Out of memory error
+skipBrokerShutdownOnOOM=false
+
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index cf32ba3..9b3b615 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -57,6 +57,9 @@ zooKeeperOperationTimeoutSeconds=30
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
 brokerShutdownTimeoutMs=60000
 
+# Flag to skip broker shutdown when broker handles Out of memory error
+skipBrokerShutdownOnOOM=false
+
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
 
diff --git a/deployment/terraform-ansible/templates/broker.conf b/deployment/terraform-ansible/templates/broker.conf
index 1f78a35..8da1a4a 100644
--- a/deployment/terraform-ansible/templates/broker.conf
+++ b/deployment/terraform-ansible/templates/broker.conf
@@ -55,6 +55,9 @@ zooKeeperSessionTimeoutMillis=30000
 # Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
 brokerShutdownTimeoutMs=60000
 
+# Flag to skip broker shutdown when broker handles Out of memory error
+skipBrokerShutdownOnOOM=false
+
 # Enable backlog quota check. Enforces action on topic when the quota is reached
 backlogQuotaCheckEnabled=true
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 190170d..239442e 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -213,6 +213,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
         doc = "Time to wait for broker graceful shutdown. After this time elapses, the process will be killed"
     )
     private long brokerShutdownTimeoutMs = 60000;
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        dynamic = true,
+        doc = "Flag to skip broker shutdown when broker handles Out of memory error"
+    )
+    private boolean skipBrokerShutdownOnOOM = false;
 
     @FieldContext(
         category = CATEGORY_POLICIES,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 37c5a1f..42a2cf1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -331,8 +331,12 @@ public class PulsarBrokerStarter {
         );
 
         PulsarByteBufAllocator.registerOOMListener(oomException -> {
-            log.error("-- Shutting down - Received OOM exception: {}", oomException.getMessage(), oomException);
-            starter.shutdown();
+            if (starter.brokerConfig.isSkipBrokerShutdownOnOOM()) {
+                log.error("-- Received OOM exception: {}", oomException.getMessage(), oomException);
+            } else {
+                log.error("-- Shutting down - Received OOM exception: {}", oomException.getMessage(), oomException);
+                starter.shutdown();
+            }
         });
 
         try {
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index edc3f99..c4ed57a 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -126,6 +126,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 |dispatchThrottlingRatePerReplicatorInByte| The default bytes per second dispatch throttling-limit for every replicator in replication. The value of `0` means disabling replication message-byte dispatch-throttling| 0 |
 |zooKeeperSessionTimeoutMillis| Zookeeper session timeout in milliseconds |30000|
 |brokerShutdownTimeoutMs| Time to wait for broker graceful shutdown. After this time elapses, the process will be killed |60000|
+|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
 |backlogQuotaCheckEnabled| Enable backlog quota check. Enforces action on topic when the quota is reached |true|
 |backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the quota |60|
 |backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit | -1 |
@@ -335,6 +336,7 @@ The [`pulsar-client`](reference-cli-tools.md#pulsar-client) CLI tool can be used
 |clusterName| The name of the cluster that this broker belongs to. |standalone|
 |zooKeeperSessionTimeoutMillis| The ZooKeeper session timeout, in milliseconds. |30000|
 |brokerShutdownTimeoutMs| The time to wait for graceful broker shutdown. After this time elapses, the process will be killed. |60000|
+|skipBrokerShutdownOnOOM| Flag to skip broker shutdown when broker handles Out of memory error. |false|
 |backlogQuotaCheckEnabled| Enable the backlog quota check, which enforces a specified action when the quota is reached. |true|
 |backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the backlog quota. |60|
 |backlogQuotaDefaultLimitGB| The default per-topic backlog quota limit. |10|
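
Editorial sketch (not part of the commit): the listener change in `PulsarBrokerStarter` reads the flag each time an OOM is reported, which is what lets a dynamic setting take effect without a restart. The standalone Java below illustrates that pattern only. `OomListenerSketch`, `newOomListener`, the `AtomicBoolean` flag holder, and the `actions` list are hypothetical stand-ins and are not Pulsar APIs; the real code uses `PulsarByteBufAllocator.registerOOMListener` and `ServiceConfiguration` as shown in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class OomListenerSketch {
    // Hypothetical stand-in for the dynamic skipBrokerShutdownOnOOM setting.
    static final AtomicBoolean skipBrokerShutdownOnOOM = new AtomicBoolean(false);

    // Records what the listener decided, in place of real logging.
    static final List<String> actions = new ArrayList<>();

    // Builds a listener that consults the flag on every OOM notification,
    // so a runtime change to the flag affects the next notification.
    static Consumer<OutOfMemoryError> newOomListener(Runnable shutdown) {
        return oom -> {
            if (skipBrokerShutdownOnOOM.get()) {
                actions.add("logged: " + oom.getMessage());
            } else {
                actions.add("shutdown: " + oom.getMessage());
                shutdown.run();
            }
        };
    }

    public static void main(String[] args) {
        Consumer<OutOfMemoryError> listener =
                newOomListener(() -> System.out.println("broker shutdown requested"));

        // Default (flag is false): the OOM triggers a shutdown request.
        listener.accept(new OutOfMemoryError("Direct buffer memory"));

        // Flag flipped at runtime: the same listener now only records the error.
        skipBrokerShutdownOnOOM.set(true);
        listener.accept(new OutOfMemoryError("Direct buffer memory"));

        System.out.println(actions);
    }
}
```

Because the flag is read inside the lambda rather than captured once at registration time, no re-registration is needed when the value changes; whether skipping the shutdown is safe still depends on the OOM actually being transient, as the motivation above describes.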