accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1189478 - in /incubator/accumulo/trunk: src/server/src/main/java/org/apache/accumulo/server/master/ src/server/src/main/java/org/apache/accumulo/server/tabletserver/ src/server/src/main/java/org/apache/accumulo/server/test/functional/ test...
Date Wed, 26 Oct 2011 21:28:11 GMT
Author: kturner
Date: Wed Oct 26 21:28:10 2011
New Revision: 1189478

URL: http://svn.apache.org/viewvc?rev=1189478&view=rev
Log:
ACCUMULO-72 fixed two merge bugs.  Logical time is now handled.  The batch writer used in
merges is now closed.

Added:
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/LogicalTimeTest.java
    incubator/accumulo/trunk/test/system/auto/simple/logicalTime.py   (with props)
Modified:
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
    incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1189478&r1=1189477&r2=1189478&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
Wed Oct 26 21:28:10 2011
@@ -136,6 +136,7 @@ import org.apache.accumulo.server.monito
 import org.apache.accumulo.server.security.Authenticator;
 import org.apache.accumulo.server.security.SecurityConstants;
 import org.apache.accumulo.server.security.ZKAuthenticator;
+import org.apache.accumulo.server.tabletserver.TabletTime;
 import org.apache.accumulo.server.tabletserver.log.RemoteLogger;
 import org.apache.accumulo.server.trace.TraceFileSystem;
 import org.apache.accumulo.server.util.AddressUtil;
@@ -1587,16 +1588,19 @@ public class Master implements LiveTServ
         start = new Text();
       }
       Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start),
false, stopRow, true);
+      BatchWriter bw = null;
       try {
         long fileCount = 0;
         Connector conn = getConnector();
         // Make file entries in highest tablet
-        BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000000L,
1000L, 1);
+        bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000000L, 1000L, 1);
         Scanner scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
         scanner.setRange(scanRange);
         ColumnFQ.fetch(scanner, Constants.METADATA_PREV_ROW_COLUMN);
+        ColumnFQ.fetch(scanner, Constants.METADATA_TIME_COLUMN);
         scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY);
         Mutation m = new Mutation(stopRow);
+        String maxLogicalTime = null;
         for (Entry<Key,Value> entry : scanner) {
           Key key = entry.getKey();
           Value value = entry.getValue();
@@ -1607,8 +1611,24 @@ public class Master implements LiveTServ
           } else if (Constants.METADATA_PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue
== null) {
             log.debug("prevRow entry for lowest tablet is " + value);
             firstPrevRowValue = new Value(value);
+          } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) {
+            maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString());
+          }
+        }
+        
+        // read the logical time from the last tablet in the merge range, it is not included
in
+        // the loop above
+        scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS);
+        scanner.setRange(new Range(stopRow));
+        ColumnFQ.fetch(scanner, Constants.METADATA_TIME_COLUMN);
+        for (Entry<Key,Value> entry : scanner) {
+          if (Constants.METADATA_TIME_COLUMN.hasColumns(entry.getKey())) {
+            maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, entry.getValue().toString());
           }
         }
+        
+        if (maxLogicalTime != null) ColumnFQ.put(m, Constants.METADATA_TIME_COLUMN, new Value(maxLogicalTime.getBytes()));
+        
         if (!m.getUpdates().isEmpty()) {
           bw.addMutation(m);
           bw.flush();
@@ -1660,6 +1680,12 @@ public class Master implements LiveTServ
         
       } catch (Exception ex) {
         throw new AccumuloException(ex);
+      } finally {
+        if (bw != null) try {
+          bw.close();
+        } catch (Exception ex) {
+          throw new AccumuloException(ex);
+        }
       }
     }
     

Modified: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java?rev=1189478&r1=1189477&r2=1189478&view=diff
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java
(original)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletTime.java
Wed Oct 26 21:28:10 2011
@@ -77,6 +77,32 @@ public abstract class TabletTime {
     
   }
   
+  public static String maxMetadataTime(String mv1, String mv2) {
+    if (mv1 == null) {
+      checkType(mv2);
+      return mv2;
+    }
+    
+    if (mv2 == null) {
+      checkType(mv1);
+      return mv1;
+    }
+    
+    if (mv1.charAt(0) != mv2.charAt(0)) throw new IllegalArgumentException("Time types differ
" + mv1 + " " + mv2);
+    checkType(mv1);
+    
+    long t1 = Long.parseLong(mv1.substring(1));
+    long t2 = Long.parseLong(mv2.substring(1));
+    
+    if (t1 < t2) return mv2;
+    else return mv1;
+    
+  }
+  
+  private static void checkType(String mv1) {
+    if (mv1.charAt(0) != LOGICAL_TIME_ID && mv1.charAt(0) != MILLIS_TIME_ID) throw
new IllegalArgumentException("Invalid time type " + mv1);
+  }
+  
   static class MillisTime extends TabletTime {
     
     private long lastTime;

Added: incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/LogicalTimeTest.java
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/LogicalTimeTest.java?rev=1189478&view=auto
==============================================================================
--- incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/LogicalTimeTest.java
(added)
+++ incubator/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional/LogicalTimeTest.java
Wed Oct 26 21:28:10 2011
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.test.functional;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TimeType;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.hadoop.io.Text;
+
+public class LogicalTimeTest extends FunctionalTest {
+  
+  @Override
+  public Map<String,String> getInitialConfig() {
+    return Collections.emptyMap();
+  }
+  
+  @Override
+  public List<TableSetup> getTablesToCreate() {
+    return Collections.emptyList();
+  }
+  
+  @Override
+  public void run() throws Exception {
+    int tc = 0;
+    
+    runMergeTest("foo" + tc++, new String[] {"m"}, new String[] {"a"}, null, null, "b", 2l);
+    runMergeTest("foo" + tc++, new String[] {"m"}, new String[] {"z"}, null, null, "b", 2l);
+    runMergeTest("foo" + tc++, new String[] {"m"}, new String[] {"a", "z"}, null, null, "b",
2l);
+    runMergeTest("foo" + tc++, new String[] {"m"}, new String[] {"a", "c", "z"}, null, null,
"b", 3l);
+    runMergeTest("foo" + tc++, new String[] {"m"}, new String[] {"a", "y", "z"}, null, null,
"b", 3l);
+    
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"a"}, null, null, "b",
2l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"h"}, null, null, "b",
2l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"s"}, null, null, "b",
2l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"a", "h", "s"}, null,
null, "b", 2l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"a", "c", "h", "s"},
null, null, "b", 3l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"a", "h", "s", "i"},
null, null, "b", 3l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"t", "a", "h", "s"},
null, null, "b", 3l);
+    
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"a"}, null, "h", "b",
2l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"h"}, null, "h", "b",
2l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"s"}, null, "h", "b",
1l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"a", "h", "s"}, null,
"h", "b", 2l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"a", "c", "h", "s"},
null, "h", "b", 3l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"a", "h", "s", "i"},
null, "h", "b", 3l);
+    runMergeTest("foo" + tc++, new String[] {"g", "r"}, new String[] {"t", "a", "h", "s"},
null, "h", "b", 2l);
+    
+  }
+  
+  private void runMergeTest(String table, String[] splits, String[] inserts, String start,
String end, String last, long expected) throws Exception {
+    Connector conn = super.getConnector();
+    conn.tableOperations().create(table, true, TimeType.LOGICAL);
+    TreeSet<Text> splitSet = new TreeSet<Text>();
+    for (String split : splits) {
+      splitSet.add(new Text(split));
+    }
+    conn.tableOperations().addSplits(table, splitSet);
+    
+    BatchWriter bw = conn.createBatchWriter(table, 1000000, 60000l, 1);
+    for (String row : inserts) {
+      Mutation m = new Mutation(row);
+      m.put("cf", "cq", "v");
+      bw.addMutation(m);
+    }
+    
+    bw.flush();
+    
+    conn.tableOperations().merge(table, start == null ? null : new Text(start), end == null
? null : new Text(end));
+    
+    Mutation m = new Mutation(last);
+    m.put("cf", "cq", "v");
+    bw.addMutation(m);
+    bw.flush();
+    
+    Scanner scanner = conn.createScanner(table, Constants.NO_AUTHS);
+    scanner.setRange(new Range(last));
+    
+    bw.close();
+    
+    long time = scanner.iterator().next().getKey().getTimestamp();
+    if (time != expected) throw new RuntimeException("unexpected time " + time + " " + expected);
+  }
+  
+  @Override
+  public void cleanup() throws Exception {}
+  
+}

Added: incubator/accumulo/trunk/test/system/auto/simple/logicalTime.py
URL: http://svn.apache.org/viewvc/incubator/accumulo/trunk/test/system/auto/simple/logicalTime.py?rev=1189478&view=auto
==============================================================================
--- incubator/accumulo/trunk/test/system/auto/simple/logicalTime.py (added)
+++ incubator/accumulo/trunk/test/system/auto/simple/logicalTime.py Wed Oct 26 21:28:10 2011
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from JavaTest import JavaTest
+
+import unittest
+
+class LogicalTimeTest(JavaTest):
+    "Logical Time Test"
+
+    order = 21
+    testClass="org.apache.accumulo.server.test.functional.LogicalTimeTest"
+
+
+def suite():
+    result = unittest.TestSuite()
+    result.addTest(LogicalTimeTest())
+    return result

Propchange: incubator/accumulo/trunk/test/system/auto/simple/logicalTime.py
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message