Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 39A3F17337 for ; Thu, 19 Feb 2015 20:51:38 +0000 (UTC) Received: (qmail 41622 invoked by uid 500); 19 Feb 2015 20:51:38 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 41590 invoked by uid 500); 19 Feb 2015 20:51:38 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 41581 invoked by uid 99); 19 Feb 2015 20:51:38 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Feb 2015 20:51:38 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 02DE7E042E; Thu, 19 Feb 2015 20:51:38 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ecn@apache.org To: commits@accumulo.apache.org Date: Thu, 19 Feb 2015 20:51:38 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] accumulo git commit: ACCUMULO-3601 transfer migration information to the metadata table filtering iterator Repository: accumulo Updated Branches: refs/heads/master c8a30df18 -> d97a6a2b1 ACCUMULO-3601 transfer migration information to the metadata table filtering iterator Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4936b37f Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4936b37f Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4936b37f Branch: refs/heads/master Commit: 4936b37f6dc5bee2ba6c6b41b8f12c7e02a493f2 Parents: 7a570bd 
Author: Eric Newton Authored: Thu Feb 19 15:38:48 2015 -0500 Committer: Eric Newton Committed: Thu Feb 19 15:38:48 2015 -0500 ---------------------------------------------------------------------- .../server/master/state/CurrentState.java | 3 + .../master/state/MetaDataTableScanner.java | 2 + .../master/state/TabletStateChangeIterator.java | 40 +++++++++ .../java/org/apache/accumulo/master/Master.java | 5 ++ .../apache/accumulo/master/TestMergeState.java | 5 ++ .../org/apache/accumulo/test/BalanceIT.java | 87 ++++++++++++++++++++ .../functional/TabletStateChangeIteratorIT.java | 5 ++ 7 files changed, 147 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java index 501d66a..b07a931 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/CurrentState.java @@ -19,6 +19,8 @@ package org.apache.accumulo.server.master.state; import java.util.Collection; import java.util.Set; +import org.apache.accumulo.core.data.KeyExtent; + public interface CurrentState { Set onlineTables(); @@ -27,4 +29,5 @@ public interface CurrentState { Collection merges(); + Collection migrations(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java 
b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java index 025f082..8adce32 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java @@ -86,6 +86,7 @@ public class MetaDataTableScanner implements ClosableIterator current; Set onlineTables; Map merges; boolean debug = false; + Set migrations; @Override public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws IOException { @@ -64,6 +67,26 @@ public class TabletStateChangeIterator extends SkippingIterator { onlineTables = parseTables(options.get(TABLES_OPTION)); merges = parseMerges(options.get(MERGES_OPTION)); debug = options.containsKey(DEBUG_OPTION); + migrations = parseMigrations(options.get(MIGRATIONS_OPTION)); + } + + private Set parseMigrations(String migrations) { + if (migrations == null) + return Collections.emptySet(); + try { + Set result = new HashSet(); + DataInputBuffer buffer = new DataInputBuffer(); + byte[] data = Base64.decodeBase64(migrations.getBytes(UTF_8)); + buffer.reset(data, data.length); + while (buffer.available() > 0) { + KeyExtent extent = new KeyExtent(); + extent.readFields(buffer); + result.add(extent); + } + return result; + } catch (Exception ex) { + throw new RuntimeException(ex); + } } private Set parseTables(String tables) { @@ -136,6 +159,10 @@ public class TabletStateChangeIterator extends SkippingIterator { // could make this smarter by only returning if the tablet is involved in the merge return; } + // always return the information for migrating tablets + if (migrations.contains(tls.extent)) { + return; + } // is the table supposed to be online or offline? 
boolean shouldBeOnline = onlineTables.contains(tls.extent.getTableId().toString()); @@ -200,4 +227,17 @@ public class TabletStateChangeIterator extends SkippingIterator { cfg.addOption(MERGES_OPTION, encoded); } + public static void setMigrations(IteratorSetting cfg, Collection migrations) { + DataOutputBuffer buffer = new DataOutputBuffer(); + try { + for (KeyExtent extent : migrations) { + extent.write(buffer); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + String encoded = Base64.encodeBase64String(Arrays.copyOf(buffer.getData(), buffer.getLength())); + cfg.addOption(MIGRATIONS_OPTION, encoded); + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 2989608..bc552bc 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -1325,4 +1325,9 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt } } } + + @Override + public Collection migrations() { + return migrations.keySet(); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java index eddbe15..b84df2b 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java +++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java @@ -80,6 +80,11 @@ public class TestMergeState { public Collection merges() { 
return Collections.singleton(mergeInfo); } + + @Override + public Collection migrations() { + return Collections.emptyList(); + } } private static void update(Connector c, Mutation m) throws TableNotFoundException, MutationsRejectedException { http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/test/src/test/java/org/apache/accumulo/test/BalanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java new file mode 100644 index 0000000..7c8e452 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class BalanceIT extends ConfigurableMacIT { + + @Test(timeout = 60 * 1000) + public void testBalance() throws Exception { + String tableName = getUniqueNames(1)[0]; + Connector c = getConnector(); + System.out.println("Creating table"); + c.tableOperations().create(tableName); + SortedSet splits = new TreeSet(); + for (int i = 0; i < 10; i++) { + splits.add(new Text("" + i)); + } + System.out.println("Adding splits"); + c.tableOperations().addSplits(tableName, splits); + System.out.println("Waiting for balance"); + waitForBalance(c); + } + + private void waitForBalance(Connector c) throws Exception { + while (!isBalanced(c)) { + UtilWaitThread.sleep(1000); + } + } + + private boolean isBalanced(Connector c) throws Exception { + Map counts = new HashMap(); + Scanner scanner = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + scanner.setRange(MetadataSchema.TabletsSection.getRange()); + scanner.fetchColumnFamily(MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME); + for (Entry entry : scanner) { + String host = entry.getKey().getColumnQualifier().toString(); + Integer count = counts.get(host); + if (count == null) { + count = new Integer(0); + } + counts.put(host, count.intValue() + 1); + } + if 
(counts.size() < 2) { + return false; + } + Iterator iter = counts.values().iterator(); + int initial = iter.next().intValue(); + while (iter.hasNext()) { + if (Math.abs(iter.next().intValue() - initial) > 2) + return false; + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/4936b37f/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java index ffd7636..3adf6c0 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/TabletStateChangeIteratorIT.java @@ -165,6 +165,11 @@ public class TabletStateChangeIteratorIT extends SharedMiniClusterIT { public Collection merges() { return Collections.emptySet(); } + + @Override + public Collection migrations() { + return Collections.emptyList(); + } } }