livy-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [incubator-livy] wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of limits on connections.
Date Tue, 10 Mar 2020 00:30:33 GMT
wypoon commented on a change in pull request #284: [LIVY-752][THRIFT] Fix implementation of
limits on connections.
URL: https://github.com/apache/incubator-livy/pull/284#discussion_r390032048
 
 

 ##########
 File path: thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala
 ##########
 @@ -455,21 +443,58 @@ class LivyThriftSessionManager(val server: LivyThriftServer, val livyConf:
LivyC
     }
   }
 
-  // Taken from Hive
-  private def anyViolations(username: String, ipAddress: String): Option[String] = {
-    val userAndAddress = username + ":" + ipAddress
-    if (trackConnectionsPerUser(username) && !withinLimits(username, userLimit))
{
-      Some(s"Connection limit per user reached (user: $username limit: $userLimit)")
-    } else if (trackConnectionsPerIpAddress(ipAddress) &&
-        !withinLimits(ipAddress, ipAddressLimit)) {
-      Some(s"Connection limit per ipaddress reached (ipaddress: $ipAddress limit: " +
-        s"$ipAddressLimit)")
-    } else if (trackConnectionsPerUserIpAddress(username, ipAddress) &&
-        !withinLimits(userAndAddress, userIpAddressLimit)) {
-      Some(s"Connection limit per user:ipaddress reached (user:ipaddress: $userAndAddress
" +
-        s"limit: $userIpAddressLimit)")
-    } else {
-      None
+  // Visible for testing
+  @throws[HiveSQLException]
+  private[thriftserver] def incrementConnections(
+      username: String,
+      ipAddress: String,
+      forwardedAddresses: util.List[String]): Unit = {
+    val clientIpAddress: String = getOriginClientIpAddress(ipAddress, forwardedAddresses)
+    val userAndAddress = username + ":" + clientIpAddress
+    val trackUser = trackConnectionsPerUser(username)
+    val trackIpAddress = trackConnectionsPerIpAddress(clientIpAddress)
+    val trackUserIpAddress = trackConnectionsPerUserIpAddress(username, clientIpAddress)
+    var userLimitExceeded = false
+    var ipAddressLimitExceeded = false
+    var userIpAddressLimitExceeded = false
+
+    // Optimistically increment the counts while getting them to check for violations.
+    if (trackUser) {
+      val userCount = incrementConnectionsCount(username)
+      if (userCount > userLimit) userLimitExceeded = true
+    }
+    if (trackIpAddress) {
+      val ipAddressCount = incrementConnectionsCount(clientIpAddress)
+      if (ipAddressCount > ipAddressLimit) ipAddressLimitExceeded = true
+    }
+    if (trackUserIpAddress) {
+      val userIpAddressCount = incrementConnectionsCount(userAndAddress)
+      if (userIpAddressCount > userIpAddressLimit) userIpAddressLimitExceeded = true
+    }
+
+    // If any limit has been exceeded, we won't be going ahead with the connection,
+    // so decrement all counts that have been incremented.
+    if (userLimitExceeded || ipAddressLimitExceeded || userIpAddressLimitExceeded) {
+      if (trackUser) decrementConnectionsCount(username)
 
 Review comment:
   I think this is clean and easy to reason about. If I understand you correctly, you are
suggesting not calling incrementConnectionsCount on a key if we already have a limit exceeded
so that we don't have to call decrementConnectionsCount on that same key. That leads to more
complex logic and is harder to reason about.
   
   E.g.,
   ```
       // Optimistically increment the counts while getting them to check for violations.
       // If any limit has been exceeded, we won't be going ahead with the connection,
       // so decrement all counts that have been incremented.
       if (trackUser) {
         val userCount = incrementConnectionsCount(username)
         if (userCount > userLimit) {
           userLimitExceeded = true
           decrementConnectionsCount(username)
         }
       }
       if (trackIpAddress && !userLimitExceeded) {
         val ipAddressCount = incrementConnectionsCount(clientIpAddress)
         if (ipAddressCount > ipAddressLimit) {
           ipAddressLimitExceeded = true
           if (trackUser) decrementConnectionsCount(username)
           decrementConnectionsCount(clientIpAddress)
         }
       }
       if (trackUserIpAddress && !userLimitExceeded && !ipAddressLimitExceeded)
{
         val userIpAddressCount = incrementConnectionsCount(userAndAddress)
         if (userIpAddressCount > userIpAddressLimit) {
           userIpAddressLimitExceeded = true
           if (trackUser) decrementConnectionsCount(username)
           if (trackIpAddress) decrementConnectionsCount(clientIpAddress)
           decrementConnectionsCount(userAndAddress)
         }
       }
   ```
   This logic is, imo, more complex and more difficult to reason about (and prove is correct!).
   Therefore I prefer to stick with the existing version.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message