cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sn...@apache.org
Subject cassandra git commit: fix 2.2 eclipse-warnings
Date Fri, 27 Nov 2015 10:43:07 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 61e0251a1 -> f8fc0311b


fix 2.2 eclipse-warnings

patch by Ariel Weisberg; reviewed by Robert Stupp for CASSANDRA-9800


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8fc0311
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8fc0311
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8fc0311

Branch: refs/heads/cassandra-2.2
Commit: f8fc0311b65b3d82737352f3d01483c0334a6867
Parents: 61e0251
Author: Ariel Weisberg <ariel.weisberg@datastax.com>
Authored: Fri Nov 27 11:40:16 2015 +0100
Committer: Robert Stupp <snazy@snazy.de>
Committed: Fri Nov 27 11:40:16 2015 +0100

----------------------------------------------------------------------
 .../apache/cassandra/cache/AutoSavingCache.java |   1 +
 .../db/WindowsFailedSnapshotTracker.java        |  41 ++---
 .../db/commitlog/CommitLogReplayer.java         |   3 +-
 .../hadoop/AbstractColumnFamilyInputFormat.java |   4 +-
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 160 +++++++++++--------
 .../cassandra/hadoop/pig/CqlNativeStorage.java  |   6 +-
 .../io/util/ChecksummedRandomAccessReader.java  |  29 +++-
 .../apache/cassandra/io/util/SegmentedFile.java |   1 +
 .../cassandra/net/IncomingTcpConnection.java    |   3 +-
 9 files changed, 149 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index c08925d..2c6820e 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -318,6 +318,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate));
         }
 
+        @SuppressWarnings("resource")
         public void saveCache()
         {
             logger.trace("Deleting old {} files.", cacheType);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
index 9e6bb47..7cc7893 100644
--- a/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
+++ b/src/java/org/apache/cassandra/db/WindowsFailedSnapshotTracker.java
@@ -52,32 +52,33 @@ public class WindowsFailedSnapshotTracker
         {
             try
             {
-                BufferedReader reader = new BufferedReader(new FileReader(TODELETEFILE));
-                String snapshotDirectory;
-                while ((snapshotDirectory = reader.readLine()) != null)
+                try (BufferedReader reader = new BufferedReader(new FileReader(TODELETEFILE)))
                 {
-                    File f = new File(snapshotDirectory);
+                    String snapshotDirectory;
+                    while ((snapshotDirectory = reader.readLine()) != null)
+                    {
+                        File f = new File(snapshotDirectory);
 
-                    // Skip folders that aren't a subset of temp or a data folder. We don't
want people to accidentally
-                    // delete something important by virtue of adding something invalid to
the .toDelete file.
-                    boolean validFolder = FileUtils.isSubDirectory(new File(System.getenv("TEMP")),
f);
-                    for (String s : DatabaseDescriptor.getAllDataFileLocations())
-                        validFolder |= FileUtils.isSubDirectory(new File(s), f);
+                        // Skip folders that aren't a subset of temp or a data folder. We
don't want people to accidentally
+                        // delete something important by virtue of adding something invalid
to the .toDelete file.
+                        boolean validFolder = FileUtils.isSubDirectory(new File(System.getenv("TEMP")),
f);
+                        for (String s : DatabaseDescriptor.getAllDataFileLocations())
+                            validFolder |= FileUtils.isSubDirectory(new File(s), f);
 
-                    if (!validFolder)
-                    {
-                        logger.warn("Skipping invalid directory found in .toDelete: {}. Only
%TEMP% or data file subdirectories are valid.", f);
-                        continue;
-                    }
+                        if (!validFolder)
+                        {
+                            logger.warn("Skipping invalid directory found in .toDelete: {}.
Only %TEMP% or data file subdirectories are valid.", f);
+                            continue;
+                        }
 
-                    // Could be a non-existent directory if deletion worked on previous JVM
shutdown.
-                    if (f.exists())
-                    {
-                        logger.warn("Discovered obsolete snapshot. Deleting directory [{}]",
snapshotDirectory);
-                        FileUtils.deleteRecursive(new File(snapshotDirectory));
+                        // Could be a non-existent directory if deletion worked on previous
JVM shutdown.
+                        if (f.exists())
+                        {
+                            logger.warn("Discovered obsolete snapshot. Deleting directory
[{}]", snapshotDirectory);
+                            FileUtils.deleteRecursive(new File(snapshotDirectory));
+                        }
                     }
                 }
-                reader.close();
 
                 // Only delete the old .toDelete file if we succeed in deleting all our known
bad snapshots.
                 Files.delete(Paths.get(TODELETEFILE));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index cb02a8c..98fb556 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -197,7 +197,7 @@ public class CommitLogReplayer
         }
         return end;
     }
-    
+
     abstract static class ReplayFilter
     {
         public abstract Iterable<ColumnFamily> filter(Mutation mutation);
@@ -273,6 +273,7 @@ public class CommitLogReplayer
         }
     }
 
+    @SuppressWarnings("resource")
     public void recover(File file, boolean tolerateTruncation) throws IOException
     {
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 148c08a..3c088c2 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -117,9 +117,9 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends
InputFormat<
             }
         }
 
-        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","),
conf))
+        try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","),
conf);
+             Session session = cluster.connect())
         {
-            Session session = cluster.connect();
             Metadata metadata = session.getCluster().getMetadata();
 
             for (TokenRange range : masterRangeNodes.keySet())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index 14e24fb..84102a5 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.util.Progressable;
 /**
  * The <code>CqlRecordWriter</code> maps the output &lt;key, value&gt;
  * pairs to a Cassandra table. In particular, it applies the binded variables
- * in the value to the prepared statement, which it associates with the key, and in 
+ * in the value to the prepared statement, which it associates with the key, and in
  * turn the responsible endpoint.
  *
  * <p>
@@ -112,11 +112,11 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>,
List<ByteBuf
         this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
         batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
         this.clients = new HashMap<>();
+        String keyspace = ConfigHelper.getOutputKeyspace(conf);
 
-        try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf),
conf))
+        try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf),
conf);
+             Session client = cluster.connect(keyspace))
         {
-            String keyspace = ConfigHelper.getOutputKeyspace(conf);
-            Session client = cluster.connect(keyspace);
             ringCache = new NativeRingCache(conf);
             if (client != null)
             {
@@ -179,7 +179,7 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>,
List<ByteBuf
         if (clientException != null)
             throw clientException;
     }
-    
+
     /**
      * If the key is to be associated with a valid value, a mutation is created
      * for it with the given table and columns. In the event the value
@@ -225,6 +225,20 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>,
List<ByteBuf
             HadoopCompat.progress(context);
     }
 
+    private static void closeSession(Session session)
+    {
+        //Close the session to satisfy to avoid warnings for the resource not being closed
+        try
+        {
+            if (session != null)
+                session.close();
+        }
+        catch (Throwable t)
+        {
+            logger.warn("Error closing connection", t);
+        }
+    }
+
     /**
      * A client that runs in a threadpool and connects to the list of endpoints for a particular
      * range. Bound variables for keys in that range are sent to this client via a queue.
@@ -273,94 +287,104 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>,
List<ByteBuf
                 }
             }
         }
-        
+
         /**
          * Loops collecting cql binded variable values from the queue and sending to Cassandra
          */
+        @SuppressWarnings("resource")
         public void run()
         {
             Session session = null;
-            outer:
-            while (run || !queue.isEmpty())
+            try
             {
-                List<ByteBuffer> bindVariables;
-                try
+                outer:
+                while (run || !queue.isEmpty())
                 {
-                    bindVariables = queue.take();
-                }
-                catch (InterruptedException e)
-                {
-                    // re-check loop condition after interrupt
-                    continue;
-                }
+                    List<ByteBuffer> bindVariables;
+                    try
+                    {
+                        bindVariables = queue.take();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // re-check loop condition after interrupt
+                        continue;
+                    }
 
-                ListIterator<InetAddress> iter = endpoints.listIterator();
-                while (true)
-                {
-                    // send the mutation to the last-used endpoint.  first time through,
this will NPE harmlessly.
-                    if (session != null)
+                    ListIterator<InetAddress> iter = endpoints.listIterator();
+                    while (true)
                     {
-                        try
+                        // send the mutation to the last-used endpoint.  first time through,
this will NPE harmlessly.
+                        if (session != null)
                         {
-                            int i = 0;
-                            PreparedStatement statement = preparedStatement(session);
-                            while (bindVariables != null)
+                            try
                             {
-                                BoundStatement boundStatement = new BoundStatement(statement);
-                                for (int columnPosition = 0; columnPosition < bindVariables.size();
columnPosition++)
+                                int i = 0;
+                                PreparedStatement statement = preparedStatement(session);
+                                while (bindVariables != null)
                                 {
-                                    boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                    BoundStatement boundStatement = new BoundStatement(statement);
+                                    for (int columnPosition = 0; columnPosition < bindVariables.size();
columnPosition++)
+                                    {
+                                        boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                                    }
+                                    session.execute(boundStatement);
+                                    i++;
+
+                                    if (i >= batchThreshold)
+                                        break;
+                                    bindVariables = queue.poll();
                                 }
-                                session.execute(boundStatement);
-                                i++;
-
-                                if (i >= batchThreshold)
-                                    break;
-                                bindVariables = queue.poll();
+                                break;
                             }
-                            break;
+                            catch (Exception e)
+                            {
+                                closeInternal();
+                                if (!iter.hasNext())
+                                {
+                                    lastException = new IOException(e);
+                                    break outer;
+                                }
+                            }
+                        }
+
+                        // attempt to connect to a different endpoint
+                        try
+                        {
+                            InetAddress address = iter.next();
+                            String host = address.getHostName();
+                            cluster = CqlConfigHelper.getOutputCluster(host, conf);
+                            closeSession(session);
+                            session = cluster.connect();
                         }
                         catch (Exception e)
                         {
+                            //If connection died due to Interrupt, just try connecting to
the endpoint again.
+                            //There are too many ways for the Thread.interrupted() state
to be cleared, so
+                            //we can't rely on that here. Until the java driver gives us
a better way of knowing
+                            //that this exception came from an InterruptedException, this
is the best solution.
+                            if (canRetryDriverConnection(e))
+                            {
+                                iter.previous();
+                            }
                             closeInternal();
-                            if (!iter.hasNext())
+
+                            // Most exceptions mean something unexpected went wrong to that
endpoint, so
+                            // we should try again to another.  Other exceptions (auth or
invalid request) are fatal.
+                            if ((e instanceof AuthenticationException || e instanceof InvalidQueryException)
|| !iter.hasNext())
                             {
                                 lastException = new IOException(e);
                                 break outer;
                             }
-                        }                        
-                    }
-
-                    // attempt to connect to a different endpoint
-                    try
-                    {
-                        InetAddress address = iter.next();
-                        String host = address.getHostName();
-                        cluster = CqlConfigHelper.getOutputCluster(host, conf);
-                        session = cluster.connect();
-                    }
-                    catch (Exception e)
-                    {
-                        //If connection died due to Interrupt, just try connecting to the
endpoint again.
-                        //There are too many ways for the Thread.interrupted() state to be
cleared, so
-                        //we can't rely on that here. Until the java driver gives us a better
way of knowing
-                        //that this exception came from an InterruptedException, this is
the best solution.
-                        if (canRetryDriverConnection(e))
-                        {
-                            iter.previous();
-                        }
-                        closeInternal();
-
-                        // Most exceptions mean something unexpected went wrong to that endpoint,
so
-                        // we should try again to another.  Other exceptions (auth or invalid
request) are fatal.
-                        if ((e instanceof AuthenticationException || e instanceof InvalidQueryException)
|| !iter.hasNext())
-                        {
-                            lastException = new IOException(e);
-                            break outer;
                         }
                     }
                 }
             }
+            finally
+            {
+                closeSession(session);
+            }
+
             // close all our connections once we are done.
             closeInternal();
         }
@@ -489,9 +513,9 @@ class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>,
List<ByteBuf
         private void refreshEndpointMap()
         {
             String keyspace = ConfigHelper.getOutputKeyspace(conf);
-            try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf),
conf))
+            try (Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf),
conf);
+                 Session session = cluster.connect(keyspace))
             {
-                Session session = cluster.connect(keyspace);
                 rangeMap = new HashMap<>();
                 metadata = session.getCluster().getMetadata();
                 Set<TokenRange> ranges = metadata.getTokenRanges();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 74058b1..8831cf2 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -690,7 +690,7 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface,
Lo
             else
                 throw new IOException("bulk_insert_statement is missing in input url parameter");
             if (bulkTableAlias != null)
-                CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family); 
+                CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family);
             CqlBulkOutputFormat.setDeleteSourceOnSuccess(conf, bulkDeleteSourceOnSuccess);
             if (bulkOutputLocation != null)
                 conf.set(CqlBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation);
@@ -724,9 +724,9 @@ public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface,
Lo
         // Only get the schema if we haven't already gotten it
         if (!properties.containsKey(signature))
         {
-            try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf),
conf))
+            try (Cluster cluster = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf),
conf);
+                 Session client = cluster.connect())
             {
-                Session client = cluster.connect();
                 client.execute("USE " + keyspace);
 
                 // compose the CfDef for the columfamily

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
index 442236d..9015b61 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -48,15 +48,36 @@ public class ChecksummedRandomAccessReader extends RandomAccessReader
         this.file = file;
     }
 
+    @SuppressWarnings("resource")
     public static ChecksummedRandomAccessReader open(File file, File crcFile) throws IOException
     {
         try (ChannelProxy channel = new ChannelProxy(file))
         {
             RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
-            @SuppressWarnings("resource")
-            DataIntegrityMetadata.ChecksumValidator validator =
-                new DataIntegrityMetadata.ChecksumValidator(new Adler32(), crcReader, file.getPath());
-            return new ChecksummedRandomAccessReader(file, channel, validator);
+            boolean closeCrcReader = true;
+            try
+            {
+                DataIntegrityMetadata.ChecksumValidator validator =
+                        new DataIntegrityMetadata.ChecksumValidator(new Adler32(), crcReader,
file.getPath());
+                closeCrcReader = false;
+                boolean closeValidator = true;
+                try
+                {
+                    ChecksummedRandomAccessReader retval = new ChecksummedRandomAccessReader(file,
channel, validator);
+                    closeValidator = false;
+                    return retval;
+                }
+                finally
+                {
+                    if (closeValidator)
+                        validator.close();
+                }
+            }
+            finally
+            {
+                if (closeCrcReader)
+                    crcReader.close();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 30707d8..553cc0d 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -179,6 +179,7 @@ public abstract class SegmentedFile extends SharedCloseableImpl
             return complete(path, -1L);
         }
 
+        @SuppressWarnings("resource")
         public SegmentedFile complete(String path, long overrideLength)
         {
             ChannelProxy channelCopy = getChannel(path);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8fc0311/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
index f6652b0..a972114 100644
--- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
@@ -108,7 +108,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
             close();
         }
     }
-    
+
     @Override
     public void close()
     {
@@ -164,6 +164,7 @@ public class IncomingTcpConnection extends Thread implements Closeable
         }
         else
         {
+            @SuppressWarnings("resource")
             ReadableByteChannel channel = socket.getChannel();
             in = new NIODataInputStream(channel != null ? channel : Channels.newChannel(socket.getInputStream()),
BUFFER_SIZE);
         }


Mime
View raw message