accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [1/2] accumulo git commit: ACCUMULO-2883 Added locating tablets to API
Date Tue, 08 Dec 2015 20:47:04 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master a3b862d82 -> 1bcddcfa0


ACCUMULO-2883 Added locating tablets to API


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8032d63f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8032d63f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8032d63f

Branch: refs/heads/master
Commit: 8032d63fe053dde2ea7da6cfec7f46ed59cc9a90
Parents: a3b862d
Author: Keith Turner <kturner@apache.org>
Authored: Thu Nov 12 23:31:22 2015 -0500
Committer: Keith Turner <kturner@apache.org>
Committed: Tue Dec 8 14:57:21 2015 -0500

----------------------------------------------------------------------
 .../accumulo/core/client/admin/Locations.java   |  49 +++++++
 .../core/client/admin/TableOperations.java      |  14 ++
 .../core/client/impl/TableOperationsImpl.java   | 112 ++++++++++++++++
 .../core/client/mock/MockTableOperations.java   |   6 +
 .../org/apache/accumulo/core/data/TabletId.java |   7 +
 .../accumulo/core/data/impl/TabletIdImpl.java   |   6 +
 .../client/impl/TableOperationsHelperTest.java  |   6 +
 .../org/apache/accumulo/test/LocatorIT.java     | 130 +++++++++++++++++++
 8 files changed, 330 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8032d63f/core/src/main/java/org/apache/accumulo/core/client/admin/Locations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/Locations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/Locations.java
new file mode 100644
index 0000000..11a0254
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/Locations.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.admin;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TabletId;
+
+/**
+ * A snapshot of metadata information about where a specified set of ranges are located returned
by {@link TableOperations#locate(String, java.util.Collection)}
+ *
+ * @since 1.8.0
+ */
+public interface Locations {
+
+  /**
+   * For all of the ranges passed to {@link TableOperations#locate(String, java.util.Collection)},
return a map of the tablets each range overlaps.
+   */
+  public Map<Range,List<TabletId>> groupByRange();
+
+  /**
+   * For all of the ranges passed to {@link TableOperations#locate(String, java.util.Collection)},
return a map of the ranges each tablet overlaps.
+   */
+  public Map<TabletId,List<Range>> groupByTablet();
+
+  /**
+   * For any TabletId returned by {@link #getOverlappingTablets(Range)}, the method will
return the tablet server location for that tablet.
+   *
+   * @return A tablet server location in the form of {@code <host>:<port>}
+   */
+  public String getTabletLocation(TabletId tabletId);
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8032d63f/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index fa6fef4..b20de49 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.security.Authorizations;
@@ -231,6 +232,19 @@ public interface TableOperations {
   Collection<Text> listSplits(String tableName, int maxSplits) throws TableNotFoundException,
AccumuloSecurityException, AccumuloException;
 
   /**
+   * Locates the tablet servers and tablets that would service a collections of ranges. If
a range covers multiple tablets, it will occur multiple times in the
+   * returned map.
+   *
+   * @param ranges
+   *          The input ranges that should be mapped to tablet servers and tablets.
+   *
+   * @throws TableOfflineException
+   *           if the table is offline or goes offline during the operation
+   * @since 1.8.0
+   */
+  Locations locate(String tableName, Collection<Range> ranges) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException;
+
+  /**
    * Finds the max row within a given range. To find the max row in a table, pass null for
start and end row.
    *
    * @param auths

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8032d63f/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
index 8434f2f..b8ca626 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
@@ -66,6 +66,7 @@ import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.DiskUsage;
 import org.apache.accumulo.core.client.admin.FindMax;
+import org.apache.accumulo.core.client.admin.Locations;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.SamplerConfiguration;
 import org.apache.accumulo.core.client.admin.TableOperations;
@@ -83,8 +84,10 @@ import org.apache.accumulo.core.constraints.Constraint;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TabletId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.data.impl.TabletIdImpl;
 import org.apache.accumulo.core.iterators.IteratorUtil;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
@@ -109,6 +112,7 @@ import org.apache.accumulo.core.util.OpTimer;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.core.volume.VolumeConfiguration;
+import org.apache.accumulo.fate.zookeeper.Retry;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -120,6 +124,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
 import com.google.common.net.HostAndPort;
 
 public class TableOperationsImpl extends TableOperationsHelper {
@@ -1513,4 +1518,111 @@ public class TableOperationsImpl extends TableOperationsHelper {
     return sci.toSamplerConfiguration();
   }
 
+  private static class LoctionsImpl implements Locations {
+
+    private Map<Range,List<TabletId>> groupedByRanges;
+    private Map<TabletId,List<Range>> groupedByTablets;
+    private Map<TabletId,String> tabletLocations;
+
+    public LoctionsImpl(Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
{
+      groupedByTablets = new HashMap<>();
+      groupedByRanges = null;
+      tabletLocations = new HashMap<>();
+
+      for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet())
{
+        String location = entry.getKey();
+
+        for (Entry<KeyExtent,List<Range>> entry2 : entry.getValue().entrySet())
{
+          TabletIdImpl tabletId = new TabletIdImpl(entry2.getKey());
+          tabletLocations.put(tabletId, location);
+          List<Range> prev = groupedByTablets.put(tabletId, Collections.unmodifiableList(entry2.getValue()));
+          if (prev != null) {
+            throw new RuntimeException("Unexpected : tablet at multiple locations : " + location
+ " " + tabletId);
+          }
+        }
+      }
+
+      groupedByTablets = Collections.unmodifiableMap(groupedByTablets);
+    }
+
+    @Override
+    public String getTabletLocation(TabletId tabletId) {
+      return tabletLocations.get(tabletId);
+    }
+
+    @Override
+    public Map<Range,List<TabletId>> groupByRange() {
+      if (groupedByRanges == null) {
+        Map<Range,List<TabletId>> tmp = new HashMap<>();
+
+        for (Entry<TabletId,List<Range>> entry : groupedByTablets.entrySet())
{
+          for (Range range : entry.getValue()) {
+            List<TabletId> tablets = tmp.get(range);
+            if (tablets == null) {
+              tablets = new ArrayList<>();
+              tmp.put(range, tablets);
+            }
+
+            tablets.add(entry.getKey());
+          }
+        }
+
+        Map<Range,List<TabletId>> tmp2 = new HashMap<>();
+        for (Entry<Range,List<TabletId>> entry : tmp.entrySet()) {
+          tmp2.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+        }
+
+        groupedByRanges = Collections.unmodifiableMap(tmp2);
+      }
+
+      return groupedByRanges;
+    }
+
+    @Override
+    public Map<TabletId,List<Range>> groupByTablet() {
+      return groupedByTablets;
+    }
+  }
+
+  @Override
+  public Locations locate(String tableName, Collection<Range> ranges) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException {
+    Preconditions.checkNotNull(tableName, "tableName must be non null");
+    Preconditions.checkNotNull(ranges, "ranges must be non null");
+
+    String tableId = Tables.getTableId(context.getInstance(), tableName);
+    TabletLocator locator = TabletLocator.getLocator(context, new Text(tableId));
+
+    List<Range> rangeList = null;
+    if (ranges instanceof List) {
+      rangeList = (List<Range>) ranges;
+    } else {
+      rangeList = new ArrayList<>(ranges);
+    }
+
+    Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<>();
+
+    locator.invalidateCache();
+
+    Retry retry = new Retry(Long.MAX_VALUE, 100, 100, 2000);
+
+    while (!locator.binRanges(context, rangeList, binnedRanges).isEmpty()) {
+
+      if (!Tables.exists(context.getInstance(), tableId))
+        throw new TableNotFoundException(tableId, tableName, null);
+      if (Tables.getTableState(context.getInstance(), tableId) == TableState.OFFLINE)
+        throw new TableOfflineException(context.getInstance(), tableId);
+
+      binnedRanges.clear();
+
+      try {
+        retry.waitForNextAttempt();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+
+      locator.invalidateCache();
+    }
+
+    return new LoctionsImpl(binnedRanges);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8032d63f/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
index 7ca5766..41b4603 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
@@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.DiskUsage;
 import org.apache.accumulo.core.client.admin.FindMax;
+import org.apache.accumulo.core.client.admin.Locations;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.SamplerConfiguration;
 import org.apache.accumulo.core.client.admin.TimeType;
@@ -497,4 +498,9 @@ class MockTableOperations extends TableOperationsHelper {
   public SamplerConfiguration getSamplerConfiguration(String tableName) throws TableNotFoundException,
AccumuloException, AccumuloSecurityException {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public Locations locate(String tableName, Collection<Range> ranges) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException {
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8032d63f/core/src/main/java/org/apache/accumulo/core/data/TabletId.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/TabletId.java b/core/src/main/java/org/apache/accumulo/core/data/TabletId.java
index 113183d..8680760 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/TabletId.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/TabletId.java
@@ -30,4 +30,11 @@ public interface TabletId extends Comparable<TabletId> {
   public Text getEndRow();
 
   public Text getPrevEndRow();
+
+  /**
+   * @return a range based on the row range of the tablet. The range will cover {@code (<prev
end row>, <end row>]}.
+   * @since 1.8.0
+   */
+  public Range toRange();
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8032d63f/core/src/main/java/org/apache/accumulo/core/data/impl/TabletIdImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/impl/TabletIdImpl.java b/core/src/main/java/org/apache/accumulo/core/data/impl/TabletIdImpl.java
index 61e882a..41ff3f5 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/impl/TabletIdImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/impl/TabletIdImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.accumulo.core.data.impl;
 
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TabletId;
 import org.apache.hadoop.io.Text;
 
@@ -97,4 +98,9 @@ public class TabletIdImpl implements TabletId {
   public String toString() {
     return ke.toString();
   }
+
+  @Override
+  public Range toRange() {
+    return ke.toDataRange();
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8032d63f/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
index 7bf9eb1..86857fa 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
@@ -35,6 +35,7 @@ import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.DiskUsage;
+import org.apache.accumulo.core.client.admin.Locations;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.admin.SamplerConfiguration;
 import org.apache.accumulo.core.client.admin.TimeType;
@@ -243,6 +244,11 @@ public class TableOperationsHelperTest {
     public SamplerConfiguration getSamplerConfiguration(String tableName) throws TableNotFoundException,
AccumuloException, AccumuloSecurityException {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public Locations locate(String tableName, Collection<Range> ranges) throws AccumuloException,
AccumuloSecurityException, TableNotFoundException {
+      throw new UnsupportedOperationException();
+    }
   }
 
   protected TableOperationsHelper getHelper() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/8032d63f/test/src/main/java/org/apache/accumulo/test/LocatorIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/LocatorIT.java b/test/src/main/java/org/apache/accumulo/test/LocatorIT.java
new file mode 100644
index 0000000..193d0d4
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/LocatorIT.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.admin.Locations;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TabletId;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.data.impl.TabletIdImpl;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+public class LocatorIT extends AccumuloClusterHarness {
+
+  private void assertContains(Locations locations, HashSet<String> tservers, Map<Range,ImmutableSet<TabletId>>
expected1,
+      Map<TabletId,ImmutableSet<Range>> expected2) {
+
+    Map<Range,Set<TabletId>> gbr = new HashMap<>();
+    for (Entry<Range,List<TabletId>> entry : locations.groupByRange().entrySet())
{
+      gbr.put(entry.getKey(), new HashSet<>(entry.getValue()));
+    }
+
+    Assert.assertEquals(expected1, gbr);
+
+    Map<TabletId,Set<Range>> gbt = new HashMap<>();
+    for (Entry<TabletId,List<Range>> entry : locations.groupByTablet().entrySet())
{
+      gbt.put(entry.getKey(), new HashSet<>(entry.getValue()));
+
+      TabletId tid = entry.getKey();
+      String location = locations.getTabletLocation(tid);
+      Assert.assertNotNull("Location for " + tid + " was null", location);
+      Assert.assertTrue("Unknown location " + location, tservers.contains(location));
+      Assert.assertTrue("Expected <host>:<port> " + location, location.split(":").length
== 2);
+
+    }
+
+    Assert.assertEquals(expected2, gbt);
+  }
+
+  private static TabletId newTabletId(String tableId, String endRow, String prevRow) {
+    return new TabletIdImpl(new KeyExtent(new Text(tableId), endRow == null ? null : new
Text(endRow), prevRow == null ? null : new Text(prevRow)));
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    Connector conn = getConnector();
+    String tableName = getUniqueNames(1)[0];
+
+    conn.tableOperations().create(tableName);
+
+    Range r1 = new Range("m");
+    Range r2 = new Range("o", "x");
+
+    String tableId = conn.tableOperations().tableIdMap().get(tableName);
+
+    TabletId t1 = newTabletId(tableId, null, null);
+    TabletId t2 = newTabletId(tableId, "r", null);
+    TabletId t3 = newTabletId(tableId, null, "r");
+
+    ArrayList<Range> ranges = new ArrayList<>();
+
+    HashSet<String> tservers = new HashSet<>(conn.instanceOperations().getTabletServers());
+
+    ranges.add(r1);
+    Locations ret = conn.tableOperations().locate(tableName, ranges);
+    assertContains(ret, tservers, ImmutableMap.of(r1, ImmutableSet.of(t1)), ImmutableMap.of(t1,
ImmutableSet.of(r1)));
+
+    ranges.add(r2);
+    ret = conn.tableOperations().locate(tableName, ranges);
+    assertContains(ret, tservers, ImmutableMap.of(r1, ImmutableSet.of(t1), r2, ImmutableSet.of(t1)),
ImmutableMap.of(t1, ImmutableSet.of(r1, r2)));
+
+    TreeSet<Text> splits = new TreeSet<Text>();
+    splits.add(new Text("r"));
+    conn.tableOperations().addSplits(tableName, splits);
+
+    ret = conn.tableOperations().locate(tableName, ranges);
+    assertContains(ret, tservers, ImmutableMap.of(r1, ImmutableSet.of(t2), r2, ImmutableSet.of(t2,
t3)),
+        ImmutableMap.of(t2, ImmutableSet.of(r1, r2), t3, ImmutableSet.of(r2)));
+
+    conn.tableOperations().offline(tableName, true);
+
+    try {
+      conn.tableOperations().locate(tableName, ranges);
+      Assert.fail();
+    } catch (TableOfflineException e) {
+      // expected
+    }
+
+    conn.tableOperations().delete(tableName);
+
+    try {
+      conn.tableOperations().locate(tableName, ranges);
+      Assert.fail();
+    } catch (TableNotFoundException e) {
+      // expected
+    }
+  }
+}


Mime
View raw message