accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1444984 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/data/ core/src/test/java/org/apache/accumulo/core/data/ server/src/main/java/org/apache/accumulo/server/data/ server/src/main/java/org/apache/accumulo/server/tablets...
Date Mon, 11 Feb 2013 22:31:51 GMT
Author: kturner
Date: Mon Feb 11 22:31:50 2013
New Revision: 1444984

URL: http://svn.apache.org/r1444984
Log:
ACCUMULO-1010 fixed bug in mutation that was preventing upgrade of accumulo 1.4 to 1.5.

Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java?rev=1444984&r1=1444983&r2=1444984&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/data/Mutation.java Mon Feb 11 22:31:50 2013
@@ -468,35 +468,6 @@ public class Mutation implements Writabl
   }
 
   private ColumnUpdate deserializeColumnUpdate(SimpleReader in) {
-    if (useOldDeserialize)
-      return oldDeserializeColumnUpdate(in);
-    return newDeserializeColumnUpdate(in);
-  }
-  
-  private ColumnUpdate oldDeserializeColumnUpdate(SimpleReader in) {
-    byte[] cf = oldReadBytes(in);
-    byte[] cq = oldReadBytes(in);
-    byte[] cv = oldReadBytes(in);
-    boolean hasts = in.readBoolean();
-    long ts = in.readLong();
-    boolean deleted = in.readBoolean();
-    
-    byte[] val;
-    int valLen = in.readInt();
-    
-    if (valLen < 0) {
-      val = values.get((-1 * valLen) - 1);
-    } else if (valLen == 0) {
-      val = EMPTY_BYTES;
-    } else {
-      val = new byte[valLen];
-      in.readBytes(val);
-    }
-    
-    return newColumnUpdate(cf, cq, cv, hasts, ts, deleted, val);
-  }
-  
-  private ColumnUpdate newDeserializeColumnUpdate(SimpleReader in) {
     byte[] cf = readBytes(in);
     byte[] cq = readBytes(in);
     byte[] cv = readBytes(in);
@@ -557,11 +528,6 @@ public class Mutation implements Writabl
   
   @Override
   public void readFields(DataInput in) throws IOException {
-    byte first = in.readByte();
-    if ((first & 0x80) != 0x80) {
-      oldReadFields(first, in);
-      return;
-    }
     
     // Clear out cached column updates and value lengths so
     // that we recalculate them based on the (potentially) new
@@ -569,7 +535,15 @@ public class Mutation implements Writabl
     updates = null;
     cachedValLens = -1;
     buffer = null;
+    useOldDeserialize = false;
     
+    byte first = in.readByte();
+    if ((first & 0x80) != 0x80) {
+      oldReadFields(first, in);
+      useOldDeserialize = true;
+      return;
+    }
+
     int len = WritableUtils.readVInt(in);
     row = new byte[len];
     in.readFully(row);
@@ -593,14 +567,10 @@ public class Mutation implements Writabl
     }
   }
   
-  public void oldReadFields(byte first, DataInput in) throws IOException {
-    // Clear out cached column updates and value lengths so
-    // that we recalculate them based on the (potentially) new
-    // data we are about to read in.
-    useOldDeserialize = true;
-    updates = null;
-    cachedValLens = -1;
-    buffer = null;
+  protected void droppingOldTimestamp(long ts) {}
+
+  private void oldReadFields(byte first, DataInput in) throws IOException {
+
     byte b = (byte)in.readByte();
     byte c = (byte)in.readByte();
     byte d = (byte)in.readByte();
@@ -610,27 +580,56 @@ public class Mutation implements Writabl
     row = new byte[len];
     in.readFully(row);
     len = in.readInt();
-    data = new byte[len];
-    in.readFully(data);
-    entries = in.readInt();
+    byte[] localData = new byte[len];
+    in.readFully(localData);
+    int localEntries = in.readInt();
     
+    List<byte[]> localValues;
     boolean valuesPresent = in.readBoolean();
     if (!valuesPresent) {
-      values = null;
+      localValues = null;
     } else {
-      values = new ArrayList<byte[]>();
+      localValues = new ArrayList<byte[]>();
       int numValues = in.readInt();
       for (int i = 0; i < numValues; i++) {
         len = in.readInt();
         byte val[] = new byte[len];
         in.readFully(val);
-        values.add(val);
+        localValues.add(val);
       }
     }
-  }
-  
+    
+    // convert data to new format
+    SimpleReader din = new SimpleReader(localData);
+    buffer = new ByteBuffer();
+    for (int i = 0; i < localEntries; i++) {
+      byte[] cf = oldReadBytes(din);
+      byte[] cq = oldReadBytes(din);
+      byte[] cv = oldReadBytes(din);
+      boolean hasts = din.readBoolean();
+      long ts = din.readLong();
+      boolean deleted = din.readBoolean();
+      
+      byte[] val;
+      int valLen = din.readInt();
+      
+      if (valLen < 0) {
+        val = localValues.get((-1 * valLen) - 1);
+      } else if (valLen == 0) {
+        val = EMPTY_BYTES;
+      } else {
+        val = new byte[valLen];
+        din.readBytes(val);
+      }
+      
+      put(cf, cq, cv, hasts, ts, deleted, val);
+      if (!hasts)
+        droppingOldTimestamp(ts);
+    }
 
-  
+    serialize();
+
+  }
   
   @Override
   public void write(DataOutput out) throws IOException {
@@ -640,6 +639,7 @@ public class Mutation implements Writabl
     
     WritableUtils.writeVInt(out, row.length);
     out.write(row);
+
     WritableUtils.writeVInt(out, data.length);
     out.write(data);
     WritableUtils.writeVInt(out, entries);
@@ -691,7 +691,7 @@ public class Mutation implements Writabl
     return new TMutation(java.nio.ByteBuffer.wrap(row), java.nio.ByteBuffer.wrap(data), ByteBufferUtil.toByteBuffers(values), entries);
   }
   
-  public SERIALIZED_FORMAT getSerializedFormat() {
+  protected SERIALIZED_FORMAT getSerializedFormat() {
     return this.useOldDeserialize ? SERIALIZED_FORMAT.VERSION1 : SERIALIZED_FORMAT.VERSION2;
   }
 

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java?rev=1444984&r1=1444983&r2=1444984&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/data/MutationTest.java Mon Feb 11 22:31:50 2013
@@ -444,7 +444,7 @@ public class MutationTest extends TestCa
     assertEquals(3, m1.size());
     assertEquals(m1.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
     assertEquals(m1.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
-    assertEquals(m2.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, "");
+    assertEquals(m1.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, "");
     
     Text exampleRow = new Text(" 123456789 123456789 123456789 123456789 123456789");
     int exampleLen = exampleRow.getLength();
@@ -473,4 +473,39 @@ public class MutationTest extends TestCa
     
   }
   
+  public void testReserialize() throws Exception {
+    // test reading in a new mutation from an old mutation and reserializing the new mutation... this was failing
+    OldMutation om = new OldMutation("r1");
+    om.put("cf1", "cq1", "v1");
+    om.put("cf2", "cq2", new ColumnVisibility("cv2"), "v2");
+    om.putDelete("cf3", "cq3");
+    StringBuilder bigVal = new StringBuilder();
+    for (int i = 0; i < 100000; i++) {
+      bigVal.append('a');
+    }
+    om.put("cf2", "big", bigVal);
+
+    
+    Mutation m1 = convert(om);
+
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    m1.write(dos);
+    dos.close();
+    
+    Mutation m2 = new Mutation();
+    
+    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+    DataInputStream dis = new DataInputStream(bis);
+    m2.readFields(dis);
+    
+    assertEquals("r1", new String(m1.getRow()));
+    assertEquals(4, m2.getUpdates().size());
+    assertEquals(4, m2.size());
+    assertEquals(m2.getUpdates().get(0), "cf1", "cq1", "", 0l, false, false, "v1");
+    assertEquals(m2.getUpdates().get(1), "cf2", "cq2", "cv2", 0l, false, false, "v2");
+    assertEquals(m2.getUpdates().get(2), "cf3", "cq3", "", 0l, false, true, "");
+    assertEquals(m2.getUpdates().get(3), "cf2", "big", "", 0l, false, false, bigVal.toString());
+  }
+
 }

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/data/ServerMutation.java?rev=1444984&r1=1444983&r2=1444984&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/data/ServerMutation.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/data/ServerMutation.java Mon Feb 11 22:31:50 2013
@@ -16,8 +16,6 @@
  */
 package org.apache.accumulo.server.data;
 
-import static org.apache.accumulo.core.data.Mutation.SERIALIZED_FORMAT.VERSION2;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -45,21 +43,16 @@ public class ServerMutation extends Muta
   public ServerMutation() {
   }
 
+  protected void droppingOldTimestamp(long ts) {
+    this.systemTime = ts;
+  }
+
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     // new format writes system time with the mutation
-    if (getSerializedFormat() == VERSION2)
+    if (getSerializedFormat() == SERIALIZED_FORMAT.VERSION2)
       systemTime = WritableUtils.readVLong(in);
-    else {
-      // old format stored it in the timestamp of each mutation
-      for (ColumnUpdate upd : getUpdates()) {
-        if (!upd.hasTimestamp()) {
-          systemTime = upd.getTimestamp();
-          break;
-        }
-      }
-    }
   }
   
   @Override

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java?rev=1444984&r1=1444983&r2=1444984&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/log/DfsLogger.java Mon Feb 11 22:31:50 2013
@@ -222,8 +222,8 @@ public class DfsLogger {
     try {
       byte[] magic = LOG_FILE_HEADER_V2.getBytes();
       byte[] buffer = new byte[magic.length];
-      int read = file.read(buffer);
-      if (read == magic.length && Arrays.equals(buffer, magic)) {
+      file.readFully(buffer);
+      if (Arrays.equals(buffer, magic)) {
         int count = file.readInt();
         for (int i = 0; i < count; i++) {
           String key = file.readUTF();



Mime
View raw message