From: kturner@apache.org
To: commits@accumulo.apache.org
Subject: git commit: ACCUMULO-3021 Added constraint for max size of end rows
Date: Thu, 21 Aug 2014 20:57:58 +0000 (UTC)

Repository: accumulo
Updated Branches:
  refs/heads/master c884c8259 -> 98327959f


ACCUMULO-3021 Added constraint for max size of end rows

Signed-off-by: Keith Turner

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/98327959
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/98327959
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/98327959

Branch: refs/heads/master
Commit: 98327959fe88f3bc6dc5e52672a0d3a9f6832c97
Parents: c884c82
Author: Jenna Huston
Authored: Wed Aug 13 11:03:12 2014 -0400
Committer: Keith Turner
Committed: Thu Aug 21 16:56:41 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   3 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  38 ++-
 .../apache/accumulo/test/LargeSplitRowIT.java   | 283 +++++++++++++++++++
 3 files changed, 319 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
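Note: the commit introduces a new per-table property, table.split.endrow.size.max (default 10K), and enforces it both for user-requested splits and for split points the tablet server selects on its own. A minimal sketch of setting it from client code follows; the connector and table name are assumed to exist and are illustrative only, while the property name and default come from the diff below.

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.conf.Property;

    public class MaxEndRowSizeExample {
      public static void configure(Connector conn, String tableName) throws Exception {
        // Rows longer than this cannot become a tablet's end row, so split
        // points near them are rejected (10K is the shipped default).
        conn.tableOperations().setProperty(tableName,
            Property.TABLE_MAX_END_ROW_SIZE.getKey(), "10K");
      }
    }
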
Idle " + "compactions only take place for tablets that have one or more files."), TABLE_SPLIT_THRESHOLD("table.split.threshold", "1G", PropertyType.MEMORY, "When combined size of files exceeds this amount a tablet is split."), + TABLE_MAX_END_ROW_SIZE("table.split.endrow.size.max", "10K", PropertyType.MEMORY, "Maximum size of end row"), TABLE_MINC_LOGS_MAX("table.compaction.minor.logs.threshold", "3", PropertyType.COUNT, "When there are more than this many write-ahead logs against a tablet, it will be minor compacted. See comment for property tserver.memory.maps.max"), TABLE_MINC_COMPACT_IDLETIME("table.compaction.minor.idle", "5m", PropertyType.TIMEDURATION, @@ -506,7 +507,7 @@ public enum Property { /** * Gets the key (string) for this property. * - * @return keuy + * @return key */ public String getKey() { return this.key; http://git-wip-us.apache.org/repos/asf/accumulo/blob/98327959/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 82045b6..37950fc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -1582,6 +1582,8 @@ public class Tablet implements TabletCommitter { // check to see if we're big enough to split long splitThreshold = tableConfiguration.getMemoryInBytes(Property.TABLE_SPLIT_THRESHOLD); + long maxEndRow = tableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE); + if (extent.isRootTablet() || estimateTabletSize() <= splitThreshold) { return null; } @@ -1601,7 +1603,8 @@ public class Tablet implements TabletCommitter { try { // we should make .25 below configurable - keys = FileUtil.findMidPoint(getTabletServer().getFileSystem(), getTabletServer().getConfiguration(), extent.getPrevEndRow(), extent.getEndRow(), FileUtil.toPathStrings(files), .25); + keys = FileUtil.findMidPoint(getTabletServer().getFileSystem(), getTabletServer().getConfiguration(), extent.getPrevEndRow(), extent.getEndRow(), + FileUtil.toPathStrings(files), .25); } catch (IOException e) { log.error("Failed to find midpoint " + e.getMessage()); return null; @@ -1628,6 +1631,15 @@ public class Tablet implements TabletCommitter { if (mid.compareRow(lastRow) == 0) { if (keys.firstKey() < .5) { Key candidate = keys.get(keys.firstKey()); + if (candidate.getLength() > maxEndRow) { + log.warn("Cannot split tablet " + extent + ", selected split point too long. Length : " + candidate.getLength()); + + sawBigRow = true; + timeOfLastMinCWhenBigFreakinRowWasSeen = lastMinorCompactionFinishTime; + timeOfLastImportWhenBigFreakinRowWasSeen = lastMapFileImportTime; + + return null; + } if (candidate.compareRow(lastRow) != 0) { // we should use this ratio in split size estimations if (log.isTraceEnabled()) @@ -1645,6 +1657,7 @@ public class Tablet implements TabletCommitter { return null; } + Text text = mid.getRow(); SortedMap firstHalf = keys.headMap(.5); if (firstHalf.size() > 0) { @@ -1654,12 +1667,24 @@ public class Tablet implements TabletCommitter { shorter.set(text.getBytes(), 0, Math.min(text.getLength(), trunc + 1)); text = shorter; } + + if (text.getLength() > maxEndRow) { + log.warn("Cannot split tablet " + extent + ", selected split point too long. 
Length : " + text.getLength()); + + sawBigRow = true; + timeOfLastMinCWhenBigFreakinRowWasSeen = lastMinorCompactionFinishTime; + timeOfLastImportWhenBigFreakinRowWasSeen = lastMapFileImportTime; + + return null; + } + return new SplitRowSpec(.5, text); } catch (IOException e) { // don't split now, but check again later log.error("Failed to find lastkey " + e.getMessage()); return null; } + } private static int longestCommonLength(Text text, Text beforeMid) { @@ -2100,10 +2125,16 @@ public class Tablet implements TabletCommitter { } public TreeMap split(byte[] sp) throws IOException { - + if (sp != null && extent.getEndRow() != null && extent.getEndRow().equals(new Text(sp))) { throw new IllegalArgumentException(); } + + if (sp != null && sp.length > tableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE)) { + String msg = "Cannot split tablet " + extent + ", selected split point too long. Length : " + sp.length; + log.warn(msg); + throw new IOException(msg); + } if (extent.isRootTablet()) { String msg = "Cannot split root tablet"; @@ -2130,7 +2161,6 @@ public class Tablet implements TabletCommitter { TreeMap newTablets = new TreeMap(); long t1 = System.currentTimeMillis(); - // choose a split point SplitRowSpec splitPoint; if (sp == null) @@ -2146,7 +2176,7 @@ public class Tablet implements TabletCommitter { closeState = CloseState.OPEN; return null; } - + closeState = CloseState.CLOSING; completeClose(true, false); http://git-wip-us.apache.org/repos/asf/accumulo/blob/98327959/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java b/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java new file mode 100644 index 0000000..85aec59 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/98327959/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java b/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java
new file mode 100644
index 0000000..85aec59
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/LargeSplitRowIT.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.AccumuloServerException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LargeSplitRowIT extends ConfigurableMacIT {
+  static private final Logger log = Logger.getLogger(LargeSplitRowIT.class);
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setNumTservers(1);
+
+    Map<String,String> siteConfig = new HashMap<String,String>();
+    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "50ms");
+    cfg.setSiteConfig(siteConfig);
+  }
+
+  // User added split
+  @Test(timeout = 60 * 1000)
+  public void userAddedSplit() throws Exception {
+
+    log.info("User added split");
+
+    // make a table and lower the TABLE_MAX_END_ROW_SIZE property
+    final String tableName = getUniqueNames(1)[0];
+    final Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000");
+
+    // Create a BatchWriter and add a mutation to the table
+    BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("Row");
+    m.put("cf", "cq", "value");
+    batchWriter.addMutation(m);
+    batchWriter.close();
+
+    // Create a split point that is too large to be an end row and fill it with all 'm'
+    SortedSet<Text> partitionKeys = new TreeSet<Text>();
+    byte data[] = new byte[(int) (TableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = 'm';
+    }
+    partitionKeys.add(new Text(data));
+
+    // try to add the split point that is too large; if the split point is created, the test fails
+    try {
+      conn.tableOperations().addSplits(tableName, partitionKeys);
+      Assert.fail();
+    } catch (AccumuloServerException e) {}
+
+    // Make sure that the information that was written to the table before we tried to add the split point is still correct
+    int counter = 0;
+    final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+    for (Entry<Key,Value> entry : scanner) {
+      counter++;
+      Key k = entry.getKey();
+      Assert.assertEquals("Row", k.getRow().toString());
+      Assert.assertEquals("cf", k.getColumnFamily().toString());
+      Assert.assertEquals("cq", k.getColumnQualifier().toString());
+      Assert.assertEquals("value", entry.getValue().toString());
+    }
+    // Make sure there is only one entry in the table
+    Assert.assertEquals(1, counter);
+  }
+
+  // Test tablet server split with 250 entries with all the same prefix
+  @Test(timeout = 60 * 1000)
+  public void automaticSplitWith250Same() throws Exception {
+    log.info("Automatic with 250 with same prefix");
+
+    // make a table and lower the configured properties
+    final String tableName = getUniqueNames(1)[0];
+    final Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000");
+
+    // Create a BatchWriter and a key for a table entry that is longer than the allowed size for an end row.
+    // Fill this key with all m's except the last spot.
+    BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    byte data[] = new byte[(int) (TableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
+    for (int i = 0; i < data.length - 1; i++) {
+      data[i] = (byte) 'm';
+    }
+
+    // Make the last place in the key different for every entry added to the table
+    for (int i = 0; i < 250; i++) {
+      data[data.length - 1] = (byte) i;
+      Mutation m = new Mutation(data);
+      m.put("cf", "cq", "value");
+      batchWriter.addMutation(m);
+    }
+    // Flush the BatchWriter and table and sleep for a bit to make sure that there is enough time for the table to split if need be.
+    batchWriter.close();
+    conn.tableOperations().flush(tableName, new Text(), new Text("z"), true);
+    Thread.sleep(500);
+
+    // Make sure all the data that was put in the table is still correct
+    int count = 0;
+    final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+    for (Entry<Key,Value> entry : scanner) {
+      Key k = entry.getKey();
+      data[data.length - 1] = (byte) count;
+      String expected = new String(data, StandardCharsets.UTF_8);
+      Assert.assertEquals(expected, k.getRow().toString());
+      Assert.assertEquals("cf", k.getColumnFamily().toString());
+      Assert.assertEquals("cq", k.getColumnQualifier().toString());
+      Assert.assertEquals("value", entry.getValue().toString());
+      count++;
+    }
+    Assert.assertEquals(250, count);
+
+    // Make sure no splits occurred in the table
+    Assert.assertEquals(0, conn.tableOperations().listSplits(tableName).size());
+  }
+
+  // 10 0's; 10 2's; 10 4's ... 10 30's, etc.
+  @Test(timeout = 60 * 1000)
+  public void automaticSplitWithGaps() throws Exception {
+    log.info("Automatic Split With Gaps");
+
+    automaticSplit(30, 2);
+  }
+
+  // 10 0's; 10 1's; 10 2's ... 10 15's, etc.
+  @Test(timeout = 60 * 1000)
+  public void automaticSplitWithoutGaps() throws Exception {
+    log.info("Automatic Split Without Gaps");
+
+    automaticSplit(15, 1);
+  }
+
+  @Test(timeout = 60 * 1000)
+  public void automaticSplitLater() throws Exception {
+    log.info("Split later");
+    automaticSplit(15, 1);
+
+    final Connector conn = getConnector();
+
+    String tableName = new String();
+    java.util.Iterator<String> iterator = conn.tableOperations().list().iterator();
+
+    while (iterator.hasNext()) {
+      String curr = iterator.next();
+      if (!curr.equals("accumulo.metadata") && !curr.equals("accumulo.root")) {
+        tableName = curr;
+      }
+    }
+
+    // Create a BatchWriter and a key for a table entry
+    BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    byte data[] = new byte[10];
+
+    // Fill the key with all j's except for the last spot, which cycles through 0 to 24 for every j value
+    for (int j = 15; j < 150; j += 1) {
+      for (int i = 0; i < data.length - 1; i++) {
+        data[i] = (byte) j;
+      }
+
+      for (int i = 0; i < 25; i++) {
+        data[data.length - 1] = (byte) i;
+        Mutation m = new Mutation(data);
+        m.put("cf", "cq", "value");
+        batchWriter.addMutation(m);
+      }
+    }
+    // Flush the BatchWriter and table, then wait for the table to split
+    batchWriter.close();
+    conn.tableOperations().flush(tableName, new Text(), new Text("z"), true);
+
+    // Make sure a split occurs
+    while (conn.tableOperations().listSplits(tableName).size() == 0) {
+      Thread.sleep(250);
+    }
+
+    Assert.assertTrue(0 < conn.tableOperations().listSplits(tableName).size());
+  }
+
+  private void automaticSplit(int max, int spacing) throws Exception {
+    // make a table and lower the configured properties
+    final String tableName = getUniqueNames(1)[0];
+    final Connector conn = getConnector();
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "10K");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSION_TYPE.getKey(), "none");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "64");
+    conn.tableOperations().setProperty(tableName, Property.TABLE_MAX_END_ROW_SIZE.getKey(), "1000");
+
+    // Create a BatchWriter and a key for a table entry that is longer than the allowed size for an end row
+    BatchWriter batchWriter = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    byte data[] = new byte[(int) (TableConfiguration.getMemoryInBytes(Property.TABLE_MAX_END_ROW_SIZE.getDefaultValue()) + 2)];
+
+    // Fill the key with all j's except for the last spot, which cycles through 0 to 9 for every j value
+    for (int j = 0; j < max; j += spacing) {
+      for (int i = 0; i < data.length - 1; i++) {
+        data[i] = (byte) j;
+      }
+
+      for (int i = 0; i < 10; i++) {
+        data[data.length - 1] = (byte) i;
+        Mutation m = new Mutation(data);
+        m.put("cf", "cq", "value");
+        batchWriter.addMutation(m);
+      }
+    }
+    // Flush the BatchWriter and table and sleep for a bit to make sure that there is enough time for the table to split if need be.
+    batchWriter.close();
+    conn.tableOperations().flush(tableName, new Text(), new Text("z"), true);
+    Thread.sleep(500);
+
+    // Make sure all the data that was put in the table is still correct
+    int count = 0;
+    int extra = 10;
+    final Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+    for (Entry<Key,Value> entry : scanner) {
+      if (extra == 10) {
+        extra = 0;
+        for (int i = 0; i < data.length - 1; i++) {
+          data[i] = (byte) count;
+        }
+        count += spacing;
+      }
+      Key k = entry.getKey();
+      data[data.length - 1] = (byte) extra;
+      String expected = new String(data, StandardCharsets.UTF_8);
+      Assert.assertEquals(expected, k.getRow().toString());
+      Assert.assertEquals("cf", k.getColumnFamily().toString());
+      Assert.assertEquals("cq", k.getColumnQualifier().toString());
+      Assert.assertEquals("value", entry.getValue().toString());
+      extra++;
+    }
+    Assert.assertEquals(10, extra);
+    Assert.assertEquals(max, count);
+
+    // Make sure no splits occurred in the table
+    Assert.assertEquals(0, conn.tableOperations().listSplits(tableName).size());
+  }
+
+}
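
Taken together, the tests exercise both enforcement paths: userAddedSplit expects a user-supplied oversized split point to be rejected server-side, and the automaticSplit variants expect the tablet server to decline to split on its own. From a client's perspective, the observable behavior is a failed addSplits call; a hedged sketch of what a caller might see follows, mirroring the test above (connector, table name, and exception handling are illustrative only):

    import java.util.TreeSet;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.hadoop.io.Text;

    public class OversizedSplitExample {
      public static void tryOversizedSplit(Connector conn, String tableName) throws Exception {
        byte[] big = new byte[10 * 1024 + 2]; // just past the 10K default limit
        java.util.Arrays.fill(big, (byte) 'm');

        TreeSet<Text> splits = new TreeSet<Text>();
        splits.add(new Text(big));
        try {
          conn.tableOperations().addSplits(tableName, splits);
          // with the constraint in place, this point should not be reached
        } catch (Exception e) {
          // the IT catches AccumuloServerException; callers should expect the
          // split request to be rejected rather than silently truncated
        }
      }
    }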