accumulo-commits mailing list archives

From ktur...@apache.org
Subject git commit: ACCUMULO-3021 Added constraint for max size of end rows
Date Thu, 21 Aug 2014 20:57:58 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master c884c8259 -> 98327959f


ACCUMULO-3021 Added constraint for max size of end rows

Signed-off-by: Keith Turner <kturner@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/98327959
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/98327959
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/98327959

Branch: refs/heads/master
Commit: 98327959fe88f3bc6dc5e52672a0d3a9f6832c97
Parents: c884c82
Author: Jenna Huston <jenna.huston22@gmail.com>
Authored: Wed Aug 13 11:03:12 2014 -0400
Committer: Keith Turner <kturner@apache.org>
Committed: Thu Aug 21 16:56:41 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   3 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  38 ++-
 .../apache/accumulo/test/LargeSplitRowIT.java   | 283 +++++++++++++++++++
 3 files changed, 319 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/98327959/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 85c56f8..c303648 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -356,6 +356,7 @@ public enum Property {
           + "Compactions of idle tablets are only started when regular compactions are not
running. Idle "
           + "compactions only take place for tablets that have one or more files."),
   TABLE_SPLIT_THRESHOLD("table.split.threshold", "1G", PropertyType.MEMORY, "When combined size of files exceeds this amount a tablet is split."),
+  TABLE_MAX_END_ROW_SIZE("table.split.endrow.size.max", "10K", PropertyType.MEMORY, "Maximum size of end row"),
   TABLE_MINC_LOGS_MAX("table.compaction.minor.logs.threshold", "3", PropertyType.COUNT,
       "When there are more than this many write-ahead logs against a tablet, it will be minor
compacted. See comment for property tserver.memory.maps.max"),
   TABLE_MINC_COMPACT_IDLETIME("table.compaction.minor.idle", "5m", PropertyType.TIMEDURATION,
@@ -506,7 +507,7 @@ public enum Property {
   /**
    * Gets the key (string) for this property.
    *
-   * @return keuy
+   * @return key
    */
   public String getKey() {
     return this.key;

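For reference, the new table.split.endrow.size.max property defaults to 10K and can be overridden per table like any other table property. A minimal client-side sketch, not part of this commit; the instance name, zookeepers, credentials, and table name are placeholders, and the setProperty(...) call mirrors the usage in LargeSplitRowIT below:

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.accumulo.core.conf.Property;

    public class SetMaxEndRowSize {
      public static void main(String[] args) throws Exception {
        // Placeholder connection details -- adjust for a real instance.
        Connector conn = new ZooKeeperInstance("instance", "zk1:2181")
            .getConnector("root", new PasswordToken("secret"));
        // Lower the cap on split point (end row) size from the 10K default to 1000 bytes.
        conn.tableOperations().setProperty("mytable", Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000");
      }
    }
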
http://git-wip-us.apache.org/repos/asf/accumulo/blob/98327959/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 82045b6..37950fc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -1582,6 +1582,8 @@ public class Tablet implements TabletCommitter {
     // check to see if we're big enough to split
 
     long splitThreshold = tableConfiguration.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD);
+    long maxEndRow = tableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE);
+
     if (extent.isRootTablet() || estimateTabletSize() <= splitThreshold) {
       return null;
     }
@@ -1601,7 +1603,8 @@ public class Tablet implements TabletCommitter {
 
     try {
       // we should make .25 below configurable
-      keys = FileUtil.findMidPoint(getTabletServer().getFileSystem(), getTabletServer().getConfiguration(), extent.getPrevEndRow(), extent.getEndRow(), FileUtil.toPathStrings(files), .25);
+      keys = FileUtil.findMidPoint(getTabletServer().getFileSystem(), getTabletServer().getConfiguration(), extent.getPrevEndRow(), extent.getEndRow(),
+          FileUtil.toPathStrings(files), .25);
     } catch (IOException e) {
       log.error("Failed to find midpoint " + e.getMessage());
       return null;
@@ -1628,6 +1631,15 @@ public class Tablet implements TabletCommitter {
       if (mid.compareRow(lastRow) == 0) {
         if (keys.firstKey() < .5) {
           Key candidate = keys.get(keys.firstKey());
+          if (candidate.getLength() > maxEndRow) {
+            log.warn("Cannot split tablet " + extent + ", selected split point too long.  Length :  " + candidate.getLength());
+
+            sawBigRow = true;
+            timeOfLastMinCWhenBigFreakinRowWasSeen = lastMinorCompactionFinishTime;
+            timeOfLastImportWhenBigFreakinRowWasSeen = lastMapFileImportTime;
+
+            return null;
+          }
           if (candidate.compareRow(lastRow) != 0) {
             // we should use this ratio in split size estimations
             if (log.isTraceEnabled())
@@ -1645,6 +1657,7 @@ public class Tablet implements TabletCommitter {
 
         return null;
       }
+
       Text text = mid.getRow();
       SortedMap<Double,Key> firstHalf = keys.headMap(.5);
       if (firstHalf.size() > 0) {
@@ -1654,12 +1667,24 @@ public class Tablet implements TabletCommitter {
         shorter.set(text.getBytes(), 0, Math.min(text.getLength(), trunc + 1));
         text = shorter;
       }
+
+      if (text.getLength() > maxEndRow) {
+        log.warn("Cannot split tablet " + extent + ", selected split point too long.  Length :  " + text.getLength());
+
+        sawBigRow = true;
+        timeOfLastMinCWhenBigFreakinRowWasSeen = lastMinorCompactionFinishTime;
+        timeOfLastImportWhenBigFreakinRowWasSeen = lastMapFileImportTime;
+
+        return null;
+      }
+
       return new SplitRowSpec(.5, text);
     } catch (IOException e) {
       // don't split now, but check again later
       log.error("Failed to find lastkey " + e.getMessage());
       return null;
     }
+
   }
 
   private static int longestCommonLength(Text text, Text beforeMid) {
@@ -2100,10 +2125,16 @@ public class Tablet implements TabletCommitter {
   }
 
   public TreeMap<KeyExtent,SplitInfo> split(byte[] sp) throws IOException {
-
+	  
     if (sp != null && extent.getEndRow() != null && extent.getEndRow().equals(new Text(sp))) {
       throw new IllegalArgumentException();
     }
+    
+    if (sp != null && sp.length > tableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE)) {
+      String msg = "Cannot split tablet " + extent + ", selected split point too long.  Length :  " + sp.length;
+      log.warn(msg);
+      throw new IOException(msg);
+    }
 
     if (extent.isRootTablet()) {
       String msg = "Cannot split root tablet";
@@ -2130,7 +2161,6 @@ public class Tablet implements TabletCommitter {
       TreeMap<KeyExtent,SplitInfo> newTablets = new TreeMap<KeyExtent,SplitInfo>();
 
       long t1 = System.currentTimeMillis();
-
       // choose a split point
       SplitRowSpec splitPoint;
       if (sp == null)
@@ -2146,7 +2176,7 @@ public class Tablet implements TabletCommitter {
         closeState = CloseState.OPEN;
         return null;
       }
-
+      
       closeState = CloseState.CLOSING;
       completeClose(true, false);
 

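The two new guards above compare the candidate split row's length against the parsed value of table.split.endrow.size.max and, when the row is too long, record that a big row was seen and defer splitting rather than failing. A small sketch of how the "10K" default parses into bytes, assuming the static AccumuloConfiguration.getMemoryInBytes(String) parser that the test below also reaches via TableConfiguration:

    import org.apache.accumulo.core.conf.AccumuloConfiguration;
    import org.apache.accumulo.core.conf.Property;

    public class MaxEndRowDefault {
      public static void main(String[] args) {
        // "10K" parses as 10 * 1024 = 10240 bytes.
        long maxEndRow = AccumuloConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue());
        System.out.println(maxEndRow);
      }
    }
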
http://git-wip-us.apache.org/repos/asf/accumulo/blob/98327959/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java b/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java
new file mode 100644
index 0000000..85aec59
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.AccumuloServerException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LargeSplitRowIT extends ConfigurableMacIT {
+  static private final Logger log = Logger.getLogger(LargeSplitRowIT.class);
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+
+    Map<String,String> siteConfig = new HashMap<String,String>();
+    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "50ms");
+    cfg.setSiteConfig(siteConfig);
+  }
+
+  // User added split
+  @Test(timeout = 60 * 1000)
+  public void userAddedSplit() throws Exception {
+
+    log.info("User added split");
+
+    // make a table and lower the TABLE_MAX_END_ROW_SIZE property
+    final String tableName = getUniqueNames(1)[0];
+    final Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000");
+
+    // Create a BatchWriter and add a mutation to the table
+    BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("Row");
+    m.put("cf", "cq", "value");
+    batchWriter.addMutation(m);
+    batchWriter.close();
+
+    // Create a split point that is too large to be an end row and fill it with all 'm'
+    SortedSet<Text> partitionKeys = new TreeSet<Text>();
+    byte data[] = new byte[(int) (TableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = 'm';
+    }
+    partitionKeys.add(new Text(data));
+
+    // try to add the split point that is too large, if the split point is created the test fails.
+    try {
+      conn.tableOperations().addSplits(tableName, partitionKeys);
+      Assert.fail();
+    } catch (AccumuloServerException e) {}
+
+    // Make sure that the information that was written to the table before we tried to add the split point is still correct
+    int counter = 0;
+    final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+    for (Entry<Key,Value> entry : scanner) {
+      counter++;
+      Key k = entry.getKey();
+      Assert.assertEquals("Row", k.getRow().toString());
+      Assert.assertEquals("cf", k.getColumnFamily().toString());
+      Assert.assertEquals("cq", k.getColumnQualifier().toString());
+      Assert.assertEquals("value", entry.getValue().toString());
+
+    }
+    // Make sure there is only one line in the table
+    Assert.assertEquals(1, counter);
+  }
+
+  // Test tablet server split with 250 entries with all the same prefix
+  @Test(timeout = 60 * 1000)
+  public void automaticSplitWith250Same() throws Exception {
+    log.info("Automatic with 250 with same prefix");
+
+    // make a table and lower the configured properties
+    final String tableName = getUniqueNames(1)[0];
+    final Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000");
+
+    // Create a BatchWriter and key for a table entry that is longer than the allowed size for an end row
+    // Fill this key with all m's except the last spot
+    BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    byte data[] = new byte[(int) (TableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
+    for (int i = 0; i < data.length - 1; i++) {
+      data[i] = (byte) 'm';
+    }
+
+    // Make the last place in the key different for every entry added to the table
+    for (int i = 0; i < 250; i++) {
+      data[data.length - 1] = (byte) i;
+      Mutation m = new Mutation(data);
+      m.put("cf", "cq", "value");
+      batchWriter.addMutation(m);
+    }
+    // Flush the BatchWriter and table and sleep for a bit to make sure that there is enough time for the table to split if need be.
+    batchWriter.close();
+    conn.tableOperations().flush(tableName, new Text(), new Text("z"), true);
+    Thread.sleep(500);
+
+    // Make sure all the data that was put in the table is still correct
+    int count = 0;
+    final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+    for (Entry<Key,Value> entry : scanner) {
+      Key k = entry.getKey();
+      data[data.length - 1] = (byte) count;
+      String expected = new String(data, StandardCharsets.UTF_8);
+      Assert.assertEquals(expected, k.getRow().toString());
+      Assert.assertEquals("cf", k.getColumnFamily().toString());
+      Assert.assertEquals("cq", k.getColumnQualifier().toString());
+      Assert.assertEquals("value", entry.getValue().toString());
+      count++;
+    }
+    Assert.assertEquals(250, count);
+
+    // Make sure no splits occurred in the table
+    Assert.assertEquals(0, conn.tableOperations().listSplits(tableName).size());
+  }
+
+  // 10 0's; 10 2's; 10 4's ... 10 28's
+  @Test(timeout = 60 * 1000)
+  public void automaticSplitWithGaps() throws Exception {
+    log.info("Automatic Split With Gaps");
+
+    automaticSplit(30, 2);
+  }
+
+  // 10 0's; 10 1's; 10 2's ... 10 14's
+  @Test(timeout = 60 * 1000)
+  public void automaticSplitWithoutGaps() throws Exception {
+    log.info("Automatic Split Without Gaps");
+
+    automaticSplit(15, 1);
+  }
+
+  @Test(timeout = 60 * 1000)
+  public void automaticSplitLater() throws Exception {
+    log.info("Split later");
+    automaticSplit(15, 1);
+
+    final Connector conn = getConnector();
+
+    String tableName = new String();
+    java.util.Iterator<String> iterator = conn.tableOperations().list().iterator();
+
+    while (iterator.hasNext()) {
+      String curr = iterator.next();
+      if (!curr.equals("accumulo.metadata") && !curr.equals("accumulo.root")) {
+        tableName = curr;
+      }
+    }
+
+    // Create a BatchWriter and key for a table entry that is longer than the allowed size for an end row
+    BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    byte data[] = new byte[10];
+
+    // Fill key with all j's except for the last spot, which cycles through 0 to 24 for every j value
+    for (int j = 15; j < 150; j += 1) {
+      for (int i = 0; i < data.length - 1; i++) {
+        data[i] = (byte) j;
+      }
+
+      for (int i = 0; i < 25; i++) {
+        data[data.length - 1] = (byte) i;
+        Mutation m = new Mutation(data);
+        m.put("cf", "cq", "value");
+        batchWriter.addMutation(m);
+      }
+    }
+    // Flush the BatchWriter and table and sleep for a bit to make sure that there is enough time for the table to split if need be.
+    batchWriter.close();
+    conn.tableOperations().flush(tableName, new Text(), new Text("z"), true);
+
+    // Make sure a split occurs
+    while (conn.tableOperations().listSplits(tableName).size() == 0) {
+      Thread.sleep(250);
+    }
+
+    Assert.assertTrue(0 < conn.tableOperations().listSplits(tableName).size());
+  }
+
+  private void automaticSplit(int max, int spacing) throws Exception {
+    // make a table and lower the configured properties
+    final String tableName = getUniqueNames(1)[0];
+    final Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000");
+
+    // Create a BatchWriter and key for a table entry that is longer than the allowed size for an end row
+    BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    byte data[] = new byte[(int) (TableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
+
+    // Fill key with all j's except for the last spot, which cycles through 0 to 9 for every j value
+    for (int j = 0; j < max; j += spacing) {
+      for (int i = 0; i < data.length - 1; i++) {
+        data[i] = (byte) j;
+      }
+
+      for (int i = 0; i < 10; i++) {
+        data[data.length - 1] = (byte) i;
+        Mutation m = new Mutation(data);
+        m.put("cf", "cq", "value");
+        batchWriter.addMutation(m);
+      }
+    }
+    // Flush the BatchWriter and table and sleep for a bit to make sure that there is enough time for the table to split if need be.
+    batchWriter.close();
+    conn.tableOperations().flush(tableName, new Text(), new Text("z"), true);
+    Thread.sleep(500);
+
+    // Make sure all the data that was put in the table is still correct
+    int count = 0;
+    int extra = 10;
+    final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+    for (Entry<Key,Value> entry : scanner) {
+      if (extra == 10) {
+        extra = 0;
+        for (int i = 0; i < data.length - 1; i++) {
+          data[i] = (byte) count;
+        }
+        count += spacing;
+
+      }
+      Key k = entry.getKey();
+      data[data.length - 1] = (byte) extra;
+      String expected = new String(data, StandardCharsets.UTF_8);
+      Assert.assertEquals(expected, k.getRow().toString());
+      Assert.assertEquals("cf", k.getColumnFamily().toString());
+      Assert.assertEquals("cq", k.getColumnQualifier().toString());
+      Assert.assertEquals("value", entry.getValue().toString());
+      extra++;
+    }
+    Assert.assertEquals(10, extra);
+    Assert.assertEquals(max, count);
+
+    // Make sure no splits occurred in the table
+    Assert.assertEquals(0, conn.tableOperations().listSplits(tableName).size());
+
+  }
+
+}

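From the client's perspective, the practical effect of this change is that addSplits() with an over-long split point now fails server-side, as exercised by userAddedSplit above. A minimal defensive-handling sketch, with a hypothetical table name and row, catching the same AccumuloServerException the test expects:

    import java.util.SortedSet;
    import java.util.TreeSet;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.impl.AccumuloServerException;
    import org.apache.hadoop.io.Text;

    public class AddSplitSafely {
      // Try to add one split point; treat rejection of an over-long end row as non-fatal.
      static void addSplit(Connector conn, String table, byte[] row) throws Exception {
        SortedSet<Text> splits = new TreeSet<Text>();
        splits.add(new Text(row));
        try {
          conn.tableOperations().addSplits(table, splits);
        } catch (AccumuloServerException e) {
          // The tablet server refused the split point, e.g. because it exceeds
          // table.split.endrow.size.max.
          System.err.println("Split point rejected: " + e.getMessage());
        }
      }
    }
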
