From commits-return-16919-archive-asf-public=cust-asf.ponee.io@iotdb.apache.org Tue Aug 24 03:42:30 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-ec2-va.apache.org (mxout1-ec2-va.apache.org [3.227.148.255]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id C802E180608 for ; Tue, 24 Aug 2021 05:42:29 +0200 (CEST) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-ec2-va.apache.org (ASF Mail Server at mxout1-ec2-va.apache.org) with SMTP id 0D2D63EB94 for ; Tue, 24 Aug 2021 03:42:29 +0000 (UTC) Received: (qmail 15648 invoked by uid 500); 24 Aug 2021 03:42:28 -0000 Mailing-List: contact commits-help@iotdb.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@iotdb.apache.org Delivered-To: mailing list commits@iotdb.apache.org Received: (qmail 15631 invoked by uid 99); 24 Aug 2021 03:42:27 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Aug 2021 03:42:27 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 732AE81EF0; Tue, 24 Aug 2021 03:42:27 +0000 (UTC) Date: Tue, 24 Aug 2021 03:42:23 +0000 To: "commits@iotdb.apache.org" Subject: [iotdb] 01/04: optimize session creation MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: rong@apache.org In-Reply-To: <162977654011.20809.2365043409251466368@gitbox.apache.org> References: <162977654011.20809.2365043409251466368@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: iotdb X-Git-Refname: refs/heads/session-pool-optimization-0.12 X-Git-Reftype: branch X-Git-Rev: 44d3d493a08556ead510cbb4d80fb5ea651a1b42 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20210824034227.732AE81EF0@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch session-pool-optimization-0.12 in repository https://gitbox.apache.org/repos/asf/iotdb.git commit 44d3d493a08556ead510cbb4d80fb5ea651a1b42 Author: Steve Yurong Su AuthorDate: Thu Aug 19 19:43:02 2021 +0800 optimize session creation --- .../org/apache/iotdb/session/pool/SessionPool.java | 140 ++++++++++----------- 1 file changed, 70 insertions(+), 70 deletions(-) diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java index 79a539d..ed04509 100644 --- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java +++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java @@ -174,7 +174,6 @@ public class SessionPool { // if this method throws an exception, either the server is broken, or the ip/port/user/password // is incorrect. - @SuppressWarnings({"squid:S3776", "squid:S2446"}) // Suppress high Cognitive Complexity warning private Session getSession() throws IoTDBConnectionException { Session session = queue.poll(); @@ -183,91 +182,92 @@ public class SessionPool { } if (session != null) { return session; - } else { - long start = System.currentTimeMillis(); - boolean canCreate = false; + } + + boolean shouldCreate = false; + + long start = System.currentTimeMillis(); + while (session == null) { synchronized (this) { if (size < maxSize) { // we can create more session size++; - canCreate = true; + shouldCreate = true; // but we do it after skip synchronized block because connection a session is time // consuming. + break; } - } - if (canCreate) { - // create a new one. - if (logger.isDebugEnabled()) { - logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password); - } - session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader); + + // we have to wait for someone returns a session. try { - session.open(enableCompression); - // avoid someone has called close() the session pool - synchronized (this) { - if (closed) { - // have to release the connection... - session.close(); - throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED); - } else { - return session; - } + if (logger.isDebugEnabled()) { + logger.debug("no more sessions can be created, wait... queue.size={}", queue.size()); } - } catch (IoTDBConnectionException e) { - // if exception, we will throw the exception. - // Meanwhile, we have to set size-- - synchronized (this) { - size--; - // we do not need to notifyAll as any waited thread can continue to work after waked up. - this.notify(); - if (logger.isDebugEnabled()) { - logger.debug("open session failed, reduce the count and notify others..."); + this.wait(1000); + long time = timeout < 60_000 ? timeout : 60_000; + if (System.currentTimeMillis() - start > time) { + logger.warn( + "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}", + (System.currentTimeMillis() - start) / 1000, + ip, + port, + user, + password); + logger.warn( + "current occupied size {}, queue size {}, considered size {} ", + occupied.size(), + queue.size(), + size); + if (System.currentTimeMillis() - start > timeout) { + throw new IoTDBConnectionException( + String.format("timeout to get a connection from %s:%s", ip, port)); } } - throw e; + } catch (InterruptedException e) { + logger.error("the SessionPool is damaged", e); + Thread.currentThread().interrupt(); } - } else { - while (session == null) { - synchronized (this) { - if (closed) { - throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED); - } - // we have to wait for someone returns a session. - try { - if (logger.isDebugEnabled()) { - logger.debug( - "no more sessions can be created, wait... queue.size={}", queue.size()); - } - this.wait(1000); - long time = timeout < 60_000 ? timeout : 60_000; - if (System.currentTimeMillis() - start > time) { - logger.warn( - "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}", - (System.currentTimeMillis() - start) / 1000, - ip, - port, - user, - password); - logger.warn( - "current occupied size {}, queue size {}, considered size {} ", - occupied.size(), - queue.size(), - size); - if (System.currentTimeMillis() - start > timeout) { - throw new IoTDBConnectionException( - String.format("timeout to get a connection from %s:%s", ip, port)); - } - } - } catch (InterruptedException e) { - logger.error("the SessionPool is damaged", e); - Thread.currentThread().interrupt(); - } - session = queue.poll(); + + session = queue.poll(); + + if (closed) { + throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED); + } + } + } + + if (shouldCreate) { + // create a new one. + if (logger.isDebugEnabled()) { + logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password); + } + session = new Session(ip, port, user, password, fetchSize, zoneId, enableCacheLeader); + try { + session.open(enableCompression); + // avoid someone has called close() the session pool + synchronized (this) { + if (closed) { + // have to release the connection... + session.close(); + throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED); } } - return session; + } catch (IoTDBConnectionException e) { + // if exception, we will throw the exception. + // Meanwhile, we have to set size-- + synchronized (this) { + size--; + // we do not need to notifyAll as any waited thread can continue to work after waked up. + this.notify(); + if (logger.isDebugEnabled()) { + logger.debug("open session failed, reduce the count and notify others..."); + } + } + throw e; } } + + return session; } public int currentAvailableSize() {