hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbau...@apache.org
Subject svn commit: r1388181 - /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
Date Thu, 20 Sep 2012 19:46:54 GMT
Author: mbautin
Date: Thu Sep 20 19:46:54 2012
New Revision: 1388181

URL: http://svn.apache.org/viewvc?rev=1388181&view=rev
Log:
[jira] [HBASE-6732] [89-fb] Reduce scope of synchronized block in HBaseClient.Connection#sendParam

Author: michalgr

Summary: The sendParam method synchronizes on the out stream (so that only one thread writes to the socket).
 Right now it also prepares (e.g. compresses) the message under this lock. The lock should be taken
only for sending.

Test Plan: Ran the unit tests in MapReduce. Some tests failed. Some of them failed again when repeated
on my computer (TestLogSplitOnMasterFailover, TestDistributedLogSplitting). These tests also failed
when run against trunk.

Reviewers: mbautin, kranganathan

Reviewed By: kranganathan

CC: Karthik, stack, Kannan, Liyin

Differential Revision: https://reviews.facebook.net/D5259

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1388181&r1=1388180&r2=1388181&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Thu Sep 20 19:46:54 2012
@@ -510,55 +510,54 @@ public class HBaseClient {
       DataOutputStream outOS = null;
       Compressor compressor = null;
       try {
-        //noinspection SynchronizeOnNonFinalField
-        synchronized (this.out) { // FindBugs IS2_INCONSISTENT_SYNC
-          if (LOG.isDebugEnabled())
-            LOG.debug(getName() + " sending #" + call.id);
-
-          ByteArrayOutputStream baos = new ByteArrayOutputStream();
-          uncompressedOS = new DataOutputStream(baos);
-          outOS = uncompressedOS;
-          try {
-            // 1. write the call id uncompressed
-            uncompressedOS.writeInt(call.id);
-            // 2. write RPC options uncompressed
-            if (call.version >= HBaseServer.VERSION_RPCOPTIONS) {
-              call.options.write(outOS);
-            }
-            // preserve backwards compatibility
-            if (call.options.getTxCompression() != Compression.Algorithm.NONE) {
-              // 3. setup the compressor
-              compressor = call.options.getTxCompression().getCompressor();
-              OutputStream compressedOutputStream =
+        if (LOG.isDebugEnabled())
+          LOG.debug(getName() + " sending #" + call.id);
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        uncompressedOS = new DataOutputStream(baos);
+        outOS = uncompressedOS;
+        try {
+          // 1. write the call id uncompressed
+          uncompressedOS.writeInt(call.id);
+          // 2. write RPC options uncompressed
+          if (call.version >= HBaseServer.VERSION_RPCOPTIONS) {
+            call.options.write(outOS);
+          }
+          // preserve backwards compatibility
+          if (call.options.getTxCompression() != Compression.Algorithm.NONE) {
+            // 3. setup the compressor
+            compressor = call.options.getTxCompression().getCompressor();
+            OutputStream compressedOutputStream =
                 call.options.getTxCompression().createCompressionStream(
-                  uncompressedOS, compressor, 0);
-              outOS = new DataOutputStream(compressedOutputStream);
-            }
-            // 4. write the output params with the correct compression type
-            call.param.write(outOS);
-            outOS.flush();
-            baos.flush();
-            call.startTime = System.currentTimeMillis();
-          } catch (IOException e) {
-            LOG.error("Failed to prepare request in in-mem buffers!", e);
-            markClosed(e);
+                    uncompressedOS, compressor, 0);
+            outOS = new DataOutputStream(compressedOutputStream);
           }
-          byte[] data = baos.toByteArray();
-          int dataLength = data.length;
-          try {
+          // 4. write the output params with the correct compression type
+          call.param.write(outOS);
+          outOS.flush();
+          baos.flush();
+          call.startTime = System.currentTimeMillis();
+        } catch (IOException e) {
+          LOG.error("Failed to prepare request in in-mem buffers!", e);
+          markClosed(e);
+        }
+        byte[] data = baos.toByteArray();
+        int dataLength = data.length;
+        try {
+          synchronized (this.out) {
             out.writeInt(dataLength);      //first put the data length
             writeToSocket(out, data, 0, dataLength);
             out.flush();
-          } catch (IOException e) {
-            // It is not easy to get an exception here.
-            // The read is what always fails. Write gets accepted into
-            // the socket buffer. If the connection is already dead, even
-            // then read gets called first and fails first.
-            IOException rewrittenException =
-                new SyncFailedException("Failed to write to peer");
-            rewrittenException.initCause(e);
-            markClosed(rewrittenException);
           }
+        } catch (IOException e) {
+          // It is not easy to get an exception here.
+          // The read is what always fails. Write gets accepted into
+          // the socket buffer. If the connection is already dead, even
+          // then read gets called first and fails first.
+          IOException rewrittenException =
+              new SyncFailedException("Failed to write to peer");
+          rewrittenException.initCause(e);
+          markClosed(rewrittenException);
         }
       } finally {
         //the buffer is just an in-memory buffer, but it is still polite to



Mime
View raw message