cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdusba...@apache.org
Subject svn commit: r944344 - in /cassandra/trunk/src/java/org/apache/cassandra/streaming: IncomingStreamReader.java StreamOut.java StreamOutManager.java StreamingService.java
Date Fri, 14 May 2010 17:07:12 GMT
Author: gdusbabek
Date: Fri May 14 17:07:11 2010
New Revision: 944344

URL: http://svn.apache.org/viewvc?rev=944344&view=rev
Log:
a better StreamingServcice.getStatus. Patch by gdusbabek, reviewed by stuhood. CASSANDRA-1076

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=944344&r1=944343&r2=944344&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Fri
May 14 17:07:11 2010
@@ -49,7 +49,7 @@ public class IncomingStreamReader
 
     public void read() throws IOException
     {
-        StreamingService.instance.setStatus("Receiving stream");
+        logger.debug("Receiving stream");
         InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.socket().getRemoteSocketAddress();
         if (logger.isDebugEnabled())
           logger.debug("Creating file for " + pendingFile.getFilename());
@@ -63,7 +63,7 @@ public class IncomingStreamReader
                 bytesRead += fc.transferFrom(socketChannel, bytesRead, FileStreamTask.CHUNK_SIZE);
                 pendingFile.update(bytesRead);
             }
-            StreamingService.instance.setStatus("Receiving stream: finished reading chunk,
awaiting more");
+            logger.debug("Receiving stream: finished reading chunk, awaiting more");
         }
         catch (IOException ex)
         {
@@ -73,7 +73,7 @@ public class IncomingStreamReader
             /* Delete the orphaned file. */
             File file = new File(pendingFile.getFilename());
             file.delete();
-            StreamingService.instance.setStatus("Receiving stream: recovering from IO error");
+            logger.debug("Receiving stream: recovering from IO error");
             throw ex;
         }
         finally
@@ -88,7 +88,6 @@ public class IncomingStreamReader
                 logger.debug("Removing stream context " + pendingFile);
             }
             fc.close();
-            StreamingService.instance.setStatus(StreamingService.NOTHING);
             handleStreamCompletion(remoteAddress.getAddress());
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=944344&r1=944343&r2=944344&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Fri May 14 17:07:11
2010
@@ -57,13 +57,6 @@ public class StreamOut
     private static Logger logger = LoggerFactory.getLogger(StreamOut.class);
 
     static String TABLE_NAME = "STREAMING-TABLE-NAME";
-    
-    private static void updateStatus(String msg)
-    {
-        StreamingService.instance.setStatus(msg);
-        if (logger.isInfoEnabled() && !StreamingService.NOTHING.equals(msg))
-            logger.info(msg);
-    }
 
     /**
      * Split out files for all tables on disk locally for each range and then stream them
to the target endpoint.
@@ -75,7 +68,7 @@ public class StreamOut
         // this is so that this target shows up as a destination while anticompaction is
happening.
         StreamOutManager.pendingDestinations.add(target);
 
-        logger.debug("Beginning transfer process to " + target + " for ranges " + StringUtils.join(ranges,
", "));
+        logger.info("Beginning transfer process to " + target + " for ranges " + StringUtils.join(ranges,
", "));
 
         /*
          * (1) dump all the memtables to disk.
@@ -85,7 +78,7 @@ public class StreamOut
         try
         {
             Table table = Table.open(tableName);
-            updateStatus("Flushing memtables for " + tableName + "...");
+            logger.info("Flushing memtables for " + tableName + "...");
             for (Future f : table.flush())
             {
                 try
@@ -101,7 +94,7 @@ public class StreamOut
                     throw new RuntimeException(e);
                 }
             }
-            updateStatus("Performing anticompaction ...");
+            logger.info("Performing anticompaction ...");
             /* Get the list of files that need to be streamed */
             transferSSTables(target, table.forceAntiCompaction(ranges, target), tableName);
// SSTR GC deletes the file when done
         }
@@ -111,7 +104,6 @@ public class StreamOut
         }
         finally
         {
-            StreamingService.instance.setStatus(StreamingService.NOTHING);
             StreamOutManager.remove(target);
         }
         if (callback != null)
@@ -135,22 +127,21 @@ public class StreamOut
                 pendingFiles[i++] = new PendingFile(desc, component, filelen);
             }
         }
-        if (logger.isDebugEnabled())
-            logger.debug("Stream context metadata " + StringUtils.join(pendingFiles, ", "
+ " " + sstables.size() + " sstables."));
+        logger.info("Stream context metadata " + StringUtils.join(pendingFiles, ", " + "
" + sstables.size() + " sstables."));
         StreamOutManager.get(target).addFilesToStream(pendingFiles);
         StreamInitiateMessage biMessage = new StreamInitiateMessage(pendingFiles);
         Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
         message.setHeader(StreamOut.TABLE_NAME, table.getBytes());
-        updateStatus("Sending a stream initiate message to " + target + " ...");
+        logger.info("Sending a stream initiate message to " + target + " ...");
         MessagingService.instance.sendOneWay(message, target);
 
         if (pendingFiles.length > 0)
         {
-            StreamingService.instance.setStatus("Waiting for transfer to " + target + " to
complete");
+            logger.info("Waiting for transfer to " + target + " to complete");
             StreamOutManager.get(target).waitForStreamCompletion();
             // todo: it would be good if there were a dafe way to remove the StreamManager
for target.
             // (StreamManager will delete the streamed file on completion.)
-            updateStatus("Done with transfer to " + target);
+            logger.info("Done with transfer to " + target);
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=944344&r1=944343&r2=944344&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java Fri May
14 17:07:11 2010
@@ -77,6 +77,19 @@ public class StreamOutManager
         hosts.addAll(pendingDestinations);
         return hosts;
     }
+    
+    /** 
+     * this method exists so that we don't have to call StreamOutManager.get() which has
a nasty side-effect of 
+     * indicating that we are streaming to a particular host.
+     **/     
+    public static List<PendingFile> getPendingFiles(InetAddress host)
+    {
+        List<PendingFile> list = new ArrayList<PendingFile>();
+        StreamOutManager manager = streamManagers.get(host);
+        if (manager != null)
+            list.addAll(manager.getFiles());
+        return list;
+    }
 
     // we need sequential and random access to the files. hence, the map and the list.
     private final List<PendingFile> files = new ArrayList<PendingFile>();

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=944344&r1=944343&r2=944344&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java Fri May
14 17:07:11 2010
@@ -35,8 +35,6 @@ public class StreamingService implements
     private static final Logger logger = LoggerFactory.getLogger(StreamingService.class);
     public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.streaming:type=StreamingService";
     public static final StreamingService instance = new StreamingService();
-    static final String NOTHING = "Nothing is happening";
-    private String status = NOTHING;
 
     private StreamingService()
     {
@@ -50,16 +48,29 @@ public class StreamingService implements
             throw new RuntimeException(e);
         }
     }
-    
-    public void setStatus(String s)
-    {
-        assert s != null;
-        status = s;
-    }
 
     public String getStatus()
     {
-        return status;
+        StringBuilder sb = new StringBuilder();
+        sb.append("Receiving from:\n");
+        for (InetAddress source : StreamInManager.getSources())
+        {
+            sb.append(String.format(" %s:\n", source.getHostAddress()));
+            for (PendingFile pf : StreamInManager.getIncomingFiles(source))
+            {
+                sb.append(String.format("  %s %d/%d\n", pf.getFilename(), pf.getPtr(), pf.getExpectedBytes()));
+            }
+        }
+        sb.append("Sending to:\n");
+        for (InetAddress dest : StreamOutManager.getDestinations())
+        {
+            sb.append(String.format(" %s:\n", dest.getHostAddress()));
+            for (PendingFile pf : StreamOutManager.getPendingFiles(dest))
+            {
+                sb.append(String.format("  %s %d/%d\n", pf.getFilename(), pf.getPtr(), pf.getExpectedBytes()));
+            }
+        }
+        return sb.toString();
     }
 
     /** hosts receiving outgoing streams. */



Mime
View raw message