ignite-notifications mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [ignite] alex-plekhanov commented on a change in pull request #6595: IGNITE-11685 Java thin client: Handle multiple async requests in parallel
Date Thu, 04 Jul 2019 11:29:55 GMT
alex-plekhanov commented on a change in pull request #6595: IGNITE-11685 Java thin client:
Handle multiple async requests in parallel
URL: https://github.com/apache/ignite/pull/6595#discussion_r300357871
 
 

 ##########
 File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
 ##########
 @@ -141,52 +174,108 @@
 
             write(req.array(), req.position());
         }
+        catch (Throwable t) {
+            pendingReqs.remove(id);
+
+            throw t;
+        }
+        finally {
+            sndLock.unlock();
+        }
 
         return id;
     }
 
     /** {@inheritDoc} */
-    @Override public <T> T receive(ClientOperation op, long reqId, Function<BinaryInputStream, T> payloadReader)
+    @Override public <T> T receive(ClientOperation op, long reqId, Function<PayloadInputStream, T> payloadReader)
         throws ClientConnectionException, ClientAuthorizationException {
+        ClientRequestFuture pendingReq = pendingReqs.get(reqId);
 
-        final int MIN_RES_SIZE = 8 + 4; // minimal response size: long (8 bytes) ID + int (4 bytes) status
+        assert pendingReq != null : "Pending request future not found for request " + reqId;
 
-        int resSize = new BinaryHeapInputStream(read(4)).readInt();
+        try {
+            while (true) {
+                if (rcvLock.tryLock()) {
+                    try {
+                        if (!pendingReq.isDone())
+                            processNextResponse();
+                    }
+                    finally {
+                        rcvLock.unlock();
+                    }
+                }
 
-        if (resSize < 0)
+                try {
+                    byte[] payload = pendingReq.get(PAYLOAD_WAIT_TIMEOUT);
+
+                    if (payload == null || payloadReader == null)
+                        return null;
+
+                    return payloadReader.apply(new PayloadInputStream(this, payload));
+                }
+                catch (IgniteFutureTimeoutCheckedException ignore) {
+                    // Next cycle if timed out.
+                }
+            }
+        }
+        catch (IgniteCheckedException e) {
+            if (e.getCause() instanceof ClientError)
+                throw (ClientError)e.getCause();
+
+            if (e.getCause() instanceof ClientException)
+                throw (ClientException)e.getCause();
+
+            throw new ClientException(e.getMessage(), e);
+        }
+        finally {
+            pendingReqs.remove(reqId);
+        }
+    }
+
+    /**
+     * Process next response from the input stream and complete corresponding future.
+     */
+    private void processNextResponse() throws ClientProtocolError, ClientConnectionException {
+        int resSize = readInt();
+
+        if (resSize <= 0)
             throw new ClientProtocolError(String.format("Invalid response size: %s", resSize));
 
-        if (resSize == 0)
-            return null;
+        long bytesReadOnStartReq = totalBytesRead;
+
+        long resId = readLong();
+
+        ClientRequestFuture pendingReq = pendingReqs.get(resId);
 
-        BinaryInputStream resIn = new BinaryHeapInputStream(read(MIN_RES_SIZE));
+        if (pendingReq == null)
+            throw new ClientProtocolError(String.format("Unexpected response ID [%s]", resId));
 
-        long resId = resIn.readLong();
+        int status = 0;
 
-        if (resId != reqId)
-            throw new ClientProtocolError(String.format("Unexpected response ID [%s], [%s] was expected", resId, reqId));
+        BinaryInputStream resIn;
 
-        int status = resIn.readInt();
+        status = readInt();
 
-        if (status != 0) {
-            resIn = new BinaryHeapInputStream(read(resSize - MIN_RES_SIZE));
+        int hdrSize = (int)(totalBytesRead - bytesReadOnStartReq);
 
 Review comment:
   In the current protocol version the header size is fixed, but in later versions it depends
on various conditions (status, topology change, etc.). To calculate the header size we therefore
need either to increment the counter after each readLong/readInt/... call or to increment it
inside readLong/readInt/... Later protocol versions read many more parameters from the header,
so incrementing the counter inline is more error-prone and more verbose.
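
   A minimal sketch of the second option, where the counter is incremented inside the read
methods and the header size is taken as the difference between two counter values. The class
name CountingReader, the helper headerSize and the little-endian byte order are assumptions
made for illustration; this is not the actual TcpClientChannel code.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical reader that counts bytes inside its read methods. */
public class CountingReader {
    private final InputStream in;

    /** Total number of bytes consumed so far. */
    private long totalBytesRead;

    public CountingReader(InputStream in) {
        this.in = in;
    }

    public long totalBytesRead() {
        return totalBytesRead;
    }

    /** Reads exactly len bytes; the counter is updated here and nowhere else. */
    private byte[] read(int len) throws IOException {
        byte[] buf = new byte[len];
        int off = 0;

        while (off < len) {
            int n = in.read(buf, off, len - off);

            if (n < 0)
                throw new EOFException("Stream ended after " + off + " of " + len + " bytes");

            off += n;
        }

        totalBytesRead += len;

        return buf;
    }

    /** Byte order is an assumption of this sketch. */
    public int readInt() throws IOException {
        return ByteBuffer.wrap(read(4)).order(ByteOrder.LITTLE_ENDIAN).getInt();
    }

    public long readLong() throws IOException {
        return ByteBuffer.wrap(read(8)).order(ByteOrder.LITTLE_ENDIAN).getLong();
    }

    /**
     * Parses size, response ID and status, then returns the header size.
     * No per-field bookkeeping is needed, however many fields the header has.
     */
    public static int headerSize(byte[] msg) throws IOException {
        CountingReader r = new CountingReader(new ByteArrayInputStream(msg));

        r.readInt();                      // Response size (not part of the header).

        long bytesReadOnStart = r.totalBytesRead();

        r.readLong();                     // Response ID.
        r.readInt();                      // Status.

        return (int)(r.totalBytesRead() - bytesReadOnStart);
    }
}
```

   With this layout, adding another header field only requires another readXxx call; the
size calculation stays unchanged.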

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
