chukwa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From asrab...@apache.org
Subject svn commit: r811521 - in /hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util: ConstRateAdaptor.java ConstRateValidator.java
Date Fri, 04 Sep 2009 19:16:40 GMT
Author: asrabkin
Date: Fri Sep  4 19:16:40 2009
New Revision: 811521

URL: http://svn.apache.org/viewvc?rev=811521&view=rev
Log:
CHUKWA-368. Some improvements to display of results

Modified:
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
    hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java?rev=811521&r1=811520&r2=811521&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateAdaptor.java Fri Sep
 4 19:16:40 2009
@@ -89,12 +89,10 @@
   }
 
   public void run() {
-    Random timeCoin = new Random();
+    Random timeCoin = new Random(seed);
     try {
       while (!stopping) {
-        int MSToSleep = timeCoin.nextInt(SLEEP_VARIANCE) + MIN_SLEEP; // between 1 and
-                                                               // 3 secs
-        // FIXME: I think there's still a risk of integer overflow here
+        int MSToSleep = timeCoin.nextInt(SLEEP_VARIANCE) + MIN_SLEEP; 
         int arraySize = (int) (MSToSleep * (long) bytesPerSec / 1000L);
         ChunkImpl evt = nextChunk(arraySize + 8);
 

Modified: hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java
URL: http://svn.apache.org/viewvc/hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java?rev=811521&r1=811520&r2=811521&view=diff
==============================================================================
--- hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java (original)
+++ hadoop/chukwa/trunk/src/java/org/apache/hadoop/chukwa/util/ConstRateValidator.java Fri
Sep  4 19:16:40 2009
@@ -39,6 +39,7 @@
   public static class ByteRange implements WritableComparable<ByteRange> {
     
     String stream;
+    String split ="";
     public long start;
     public long len;
     
@@ -56,16 +57,17 @@
     @Override
     public void readFields(DataInput in) throws IOException {
       stream = in.readUTF();
+      split = in.readUTF();
       start = in.readLong();
       len = in.readLong();
     }
     @Override
     public void write(DataOutput out) throws IOException {
       out.writeUTF(stream);
+      out.writeUTF(split);
       out.writeLong(start);
       out.writeLong(len);
     }
-    
 
     public static ByteRange read(DataInput in) throws IOException {
       ByteRange b = new ByteRange();
@@ -89,14 +91,15 @@
         else if(len < o.len)
           return -1;
         else
-          return 0;
+          return split.compareTo(o.split);
       }
     }
     
     public boolean equals(Object o) {
       if(o instanceof ByteRange) {
         ByteRange rhs = (ByteRange) o;
-        return stream.equals(rhs.stream) && rhs.start == start && rhs.len
== len;
+        return stream.equals(rhs.stream) &&
+         split.equals(rhs.split)&& rhs.start == start && rhs.len == len;
       } else
         return false;
     }
@@ -105,16 +108,19 @@
       return (int) (
           stream.hashCode() ^ (len>>32) ^ (len & 0xFFFFFFFF) ^ (start >>
32)
           ^ (start & 0xFFFFFFFF));
-    }
-    
+    } 
   }
   
+  
+  
+  ///////  State machine; expects chunks in order ////////
   public static class ValidatorSM {
     public long ok=0, missingBytes=0,dupBytes=0;
     long consecDupchunks=0;
     long nextExpectedStart = 0;
     public long chunks;
     public long dupChunks;
+    public Set<String> filesContaining = new LinkedHashSet<String>();
 
     public String closeSM() {
       if(consecDupchunks > 0)
@@ -124,6 +130,9 @@
     }
     
     public String advanceSM(ByteRange b) {
+      if(!b.split.equals(""))
+        filesContaining.add(b.split);
+      
       chunks++;
       
       if(b.start == nextExpectedStart) {
@@ -168,6 +177,8 @@
     } //end advance
   } //end class
   
+  
+  ///////  Map Class /////////
   public static class MapClass extends Mapper <ChukwaArchiveKey, ChunkImpl, ByteRange,
NullWritable> {
     
     @Override
@@ -178,16 +189,19 @@
       boolean valid = ConstRateAdaptor.checkChunk(val);
       String fname = "unknown";
       
+      ByteRange ret = new ByteRange(val);
+      
       InputSplit inSplit = context.getInputSplit();
       if(inSplit instanceof FileSplit) {
         FileSplit fs = (FileSplit) inSplit;
         fname = fs.getPath().getName();
       }
+      ret.split = fname;
       
       if(!valid) {
         context.getCounter("app", "badchunks").increment(1);
       }
-      context.write(new ByteRange(val), NullWritable.get());
+      context.write(ret, NullWritable.get());
     }
   }
     
@@ -210,18 +224,7 @@
 
       if(!curStream.equals(b.stream)) {
         if(!curStream.equals("")) {
-          Text cs = new Text(curStream);
-
-          String t = sm.closeSM();
-          if(t != null)
-            context.write(cs, new Text(t));
-
-          context.write(cs, new Text("total of " + sm.chunks + " chunks ("
-             + sm.dupChunks + " dups). " +" High byte =" + (sm.nextExpectedStart-1)));
-          
-          context.getCounter("app", "missing bytes").increment(sm.missingBytes);
-          context.getCounter("app", "duplicate bytes").increment(sm.dupBytes);
-          context.getCounter("app", "OK Bytes").increment(sm.ok);
+          printEndOfStream(context);
         }
         
         System.out.println("rolling over to new stream " + b.stream);
@@ -238,6 +241,34 @@
       e.printStackTrace();
     }
   }
+    
+    @Override
+    protected void cleanup(Reducer<ByteRange, NullWritable, Text,Text>.Context context)
+    throws IOException, InterruptedException{
+      printEndOfStream(context);
+    }
+    
+    public void printEndOfStream(Reducer<ByteRange, NullWritable, Text,Text>.Context
context) 
+    throws IOException, InterruptedException {
+      Text cs = new Text(curStream);
+
+      String t = sm.closeSM();
+      if(t != null)
+        context.write(cs, new Text(t));
+      if(!sm.filesContaining.isEmpty()) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Data contained in");
+        for(String s: sm.filesContaining) 
+          sb.append(" ").append(s);
+        context.write(cs, new Text(sb.toString()));
+      }
+      context.write(cs, new Text("total of " + sm.chunks + " chunks ("
+         + sm.dupChunks + " dups). " +" High byte =" + (sm.nextExpectedStart-1)));
+      
+      context.getCounter("app", "missing bytes").increment(sm.missingBytes);
+      context.getCounter("app", "duplicate bytes").increment(sm.dupBytes);
+      context.getCounter("app", "OK Bytes").increment(sm.ok);
+    }
   } //end reduce class
 
 



Mime
View raw message