accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [6/6] accumulo git commit: ACCUMULO-1798 Add ability to specify compaction strategy for user specified compactions.
Date Fri, 05 Dec 2014 03:51:36 GMT
ACCUMULO-1798 Add ability to specify compaction strategy for user specified compactions.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2f788f48
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2f788f48
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2f788f48

Branch: refs/heads/master
Commit: 2f788f4826d0fd6d96ad04def79320e8479c9108
Parents: 433b6df
Author: Keith Turner <kturner@apache.org>
Authored: Thu Dec 4 22:33:00 2014 -0500
Committer: Keith Turner <kturner@apache.org>
Committed: Thu Dec 4 22:33:00 2014 -0500

----------------------------------------------------------------------
 .../core/client/admin/CompactionConfig.java     |  155 ++
 .../client/admin/CompactionStrategyConfig.java  |   74 +
 .../core/client/admin/TableOperations.java      |   17 +
 .../impl/CompactionStrategyConfigUtil.java      |   98 ++
 .../core/client/impl/TableOperationsImpl.java   |   20 +-
 .../core/client/mock/MockTableOperations.java   |   13 +
 .../client/impl/TableOperationsHelperTest.java  |    5 +
 proxy/src/main/cpp/AccumuloProxy.cpp            | 1479 ++++++++---------
 proxy/src/main/cpp/AccumuloProxy.h              |   25 +-
 .../main/cpp/AccumuloProxy_server.skeleton.cpp  |    2 +-
 proxy/src/main/cpp/proxy_types.cpp              |   99 ++
 proxy/src/main/cpp/proxy_types.h                |   51 +
 .../org/apache/accumulo/proxy/ProxyServer.java  |   18 +-
 .../accumulo/proxy/thrift/AccumuloProxy.java    | 1545 ++++++++++--------
 .../proxy/thrift/CompactionStrategyConfig.java  |  556 +++++++
 proxy/src/main/python/AccumuloProxy-remote      |    8 +-
 proxy/src/main/python/AccumuloProxy.py          |  560 ++++---
 proxy/src/main/python/ttypes.py                 |   82 +
 proxy/src/main/ruby/accumulo_proxy.rb           |   14 +-
 proxy/src/main/ruby/proxy_types.rb              |   18 +
 proxy/src/main/thrift/proxy.thrift              |    9 +-
 .../master/tableOps/CompactionIterators.java    |  106 --
 .../master/tableOps/UserCompactionConfig.java   |  120 ++
 .../accumulo/master/FateServiceHandler.java     |    5 +-
 .../accumulo/master/tableOps/CompactRange.java  |   99 +-
 .../apache/accumulo/tserver/TabletServer.java   |    9 +-
 .../tserver/TabletServerResourceManager.java    |   10 +-
 .../tserver/compaction/CompactionPlan.java      |    3 +-
 .../tserver/compaction/CompactionStrategy.java  |   10 +
 .../compaction/DefaultCompactionStrategy.java   |    5 +-
 .../EverythingCompactionStrategy.java           |   39 +
 .../compaction/SizeLimitCompactionStrategy.java |    4 +-
 .../apache/accumulo/tserver/tablet/Tablet.java  |  134 +-
 .../accumulo/shell/commands/CompactCommand.java |   55 +-
 .../apache/accumulo/proxy/SimpleProxyIT.java    |   47 +-
 .../org/apache/accumulo/test/ShellServerIT.java |   13 +-
 .../accumulo/test/UserCompactionStrategyIT.java |  337 ++++
 .../test/functional/FunctionalTestUtils.java    |   13 +-
 38 files changed, 3834 insertions(+), 2023 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java
new file mode 100644
index 0000000..f59c70b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionConfig.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.admin;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.hadoop.io.Text;
+
+/**
+ * This class exists to pass parameters to {@link TableOperations#compact(String, CompactionConfig)}.
+ *
+ * @since 1.7.0
+ */
+
+public class CompactionConfig {
+  private Text start = null;
+  private Text end = null;
+  private boolean flush = true;
+  private boolean wait = true;
+  private List<IteratorSetting> iterators = Collections.emptyList();
+  // Default strategy: EverythingCompactionStrategy. This default instance rejects setOptions(); to use options or another strategy, pass a new
+  // CompactionStrategyConfig to setCompactionStrategy().
+  private CompactionStrategyConfig compactionStrategy = new CompactionStrategyConfig("org.apache.accumulo.tserver.compaction.EverythingCompactionStrategy") {
+    @Override
+    public CompactionStrategyConfig setOptions(Map<String,String> opts) {
+      throw new UnsupportedOperationException("The default compaction strategy can not be modified, use setCompactionStrategy() to set a different one");
+    }
+  };
+
+  /**
+   * @param start
+   *          First tablet to be compacted contains the row after this row, null means the first tablet in table. The default is null.
+   * @return this
+   */
+  public CompactionConfig setStartRow(Text start) {
+    this.start = start;
+    return this;
+  }
+
+  /**
+   * @return The previously set start row. The default is null.
+   */
+  public Text getStartRow() {
+    return start;
+  }
+
+  /**
+   * @param end
+   *          Last tablet to be compacted contains this row, null means the last tablet in table. The default is null.
+   * @return this
+   */
+  public CompactionConfig setEndRow(Text end) {
+    this.end = end;
+    return this;
+  }
+
+  /**
+   * @return The previously set end row. The default is null.
+   */
+  public Text getEndRow() {
+    return end;
+  }
+
+  /**
+   * @param flush
+   *          If set to true, will flush in memory data of all tablets in range before compacting. If not set, the default is true.
+   * @return this
+   */
+  public CompactionConfig setFlush(boolean flush) {
+    this.flush = flush;
+    return this;
+  }
+
+  /**
+   * @return The previously set flush. The default is true.
+   */
+  public boolean getFlush() {
+    return flush;
+  }
+
+  /**
+   * @param wait
+   *          If set to true, will cause compact operation to wait for all tablets in range to compact. If not set, the default is true.
+   * @return this
+   */
+  public CompactionConfig setWait(boolean wait) {
+    this.wait = wait;
+    return this;
+  }
+
+  /**
+   * @return The previously set wait. The default is true.
+   */
+  public boolean getWait() {
+    return wait;
+  }
+
+  /**
+   * @param iterators
+   *          configures the iterators that will be used when compacting tablets. These iterators are merged with current iterators configured for the
+   *          table. The list itself (not its IteratorSetting elements) is copied, so later changes to it do not affect this config.
+   * @return this
+   * @throws NullPointerException
+   *           if iterators is null
+   */
+  public CompactionConfig setIterators(List<IteratorSetting> iterators) {
+    Preconditions.checkNotNull(iterators, "iterators is null");
+    this.iterators = new ArrayList<>(iterators);
+    return this;
+  }
+
+  /**
+   * @return The previously set iterators. Returns an empty list if not set. The returned list is unmodifiable.
+   */
+  public List<IteratorSetting> getIterators() {
+    return Collections.unmodifiableList(iterators);
+  }
+
+  /**
+   * @param csConfig
+   *          configures the strategy that will be used by each tablet to select files. If no strategy is set, then all files will be compacted.
+   * @return this
+   * @throws NullPointerException
+   *           if csConfig is null
+   */
+  public CompactionConfig setCompactionStrategy(CompactionStrategyConfig csConfig) {
+    Preconditions.checkNotNull(csConfig, "csConfig is null");
+    this.compactionStrategy = csConfig;
+    return this;
+  }
+
+  /**
+   * @return The previously set compaction strategy. Defaults to a configuration of org.apache.accumulo.tserver.compaction.EverythingCompactionStrategy
+   *         which always compacts all files. The default configuration rejects calls to
+   *         {@link CompactionStrategyConfig#setOptions(Map)} with an UnsupportedOperationException.
+   */
+  public CompactionStrategyConfig getCompactionStrategy() {
+    return compactionStrategy;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java
new file mode 100644
index 0000000..14b275e
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/CompactionStrategyConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.admin;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Names a compaction strategy class and the options to initialize it with, for use with
+ * {@link CompactionConfig#setCompactionStrategy(CompactionStrategyConfig)}.
+ *
+ * @since 1.7.0
+ */
+
+public class CompactionStrategyConfig {
+  private final String className;
+  private Map<String,String> options = Collections.emptyMap();
+
+  /**
+   * @param className
+   *          The name of a class that implements org.apache.accumulo.tserver.compaction.CompactionStrategy. This class must exist on tservers.
+   * @throws NullPointerException
+   *           if className is null
+   */
+  public CompactionStrategyConfig(String className) {
+    Preconditions.checkNotNull(className, "className is null");
+    this.className = className;
+  }
+
+  /**
+   * @return the class name passed to the constructor.
+   */
+  public String getClassName() {
+    return className;
+  }
+
+  /**
+   * @param opts
+   *          The options that will be passed to the init() method of the compaction strategy when it's instantiated on a tserver. This method copies
+   *          the map, so later changes to {@code opts} are not seen. The default is an empty map.
+   * @return this
+   * @throws NullPointerException
+   *           if opts is null
+   */
+  public CompactionStrategyConfig setOptions(Map<String,String> opts) {
+    Preconditions.checkNotNull(opts, "opts is null");
+    this.options = new HashMap<>(opts);
+    return this;
+  }
+
+  /**
+   * @return The previously set options, as an unmodifiable map. The default is an empty map.
+   */
+  public Map<String,String> getOptions() {
+    return Collections.unmodifiableMap(options);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
index 97f538d..75bdf8d 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/TableOperations.java
@@ -289,6 +289,23 @@ public interface TableOperations {
       TableNotFoundException, AccumuloException;
 
   /**
+   * Starts a full major compaction of the tablets in the range (start, end]. If the config does not specify a compaction strategy, then all files in a
+   * tablet are compacted. The compaction is performed even for tablets that have only one file.
+   *
+   * <p>
+   * Only one compact call at a time can pass iterators and/or a compaction strategy. If two threads call compaction with iterators and/or a compaction
+   * strategy, then one will fail.
+   *
+   * @param tableName
+   *          the table to compact
+   * @param config
+   *          the configuration to use (row range, flush and wait flags, iterators, and compaction strategy)
+   *
+   * @since 1.7.0
+   */
+  void compact(String tableName, CompactionConfig config) throws AccumuloSecurityException, TableNotFoundException, AccumuloException;
+
+  /**
    * Cancels a user initiated major compaction of a table initiated with {@link #compact(String,
Text, Text, boolean, boolean)} or
    * {@link #compact(String, Text, Text, List, boolean, boolean)}. Compactions of tablets
that are currently running may finish, but new compactions of tablets
    * will not start.

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java
new file mode 100644
index 0000000..8dce877
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/CompactionStrategyConfigUtil.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+
+/**
+ * Serializes {@link CompactionStrategyConfig} objects to and from a binary form. The layout is: a magic int, a version byte, the class name, the number
+ * of options, and then each option key followed by its value. Strings are written with {@link DataOutput#writeUTF(String)}.
+ */
+public class CompactionStrategyConfigUtil {
+
+  // Leading marker used to detect input that is not an encoded CompactionStrategyConfig.
+  private static final int MAGIC = 0xcc5e6024;
+  // Version of the encoded layout; decode() rejects any other version.
+  private static final byte VERSION = 1;
+
+  /**
+   * Writes {@code csc} to {@code dout} in the format read by {@link #decode(DataInput)}.
+   *
+   * @throws IOException
+   *           if writing fails, including when a string is too long for {@link DataOutput#writeUTF(String)}
+   */
+  public static void encode(DataOutput dout, CompactionStrategyConfig csc) throws IOException {
+    dout.writeInt(MAGIC);
+    dout.writeByte(VERSION);
+
+    dout.writeUTF(csc.getClassName());
+    dout.writeInt(csc.getOptions().size());
+
+    for (Entry<String,String> entry : csc.getOptions().entrySet()) {
+      dout.writeUTF(entry.getKey());
+      dout.writeUTF(entry.getValue());
+    }
+  }
+
+  /**
+   * @return {@code csc} encoded as a byte array; any IOException is rethrown wrapped in a RuntimeException.
+   */
+  public static byte[] encode(CompactionStrategyConfig csc) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    // Close the data stream before reading the buffer so everything written is in baos.
+    try (DataOutputStream dos = new DataOutputStream(baos)) {
+      encode(dos, csc);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+    return baos.toByteArray();
+  }
+
+  /**
+   * Reads a config written by {@link #encode(DataOutput, CompactionStrategyConfig)}.
+   *
+   * @throws IllegalArgumentException
+   *           if the magic number or version does not match
+   * @throws IOException
+   *           if reading fails
+   */
+  public static CompactionStrategyConfig decode(DataInput din) throws IOException {
+    int magic = din.readInt();
+    if (magic != MAGIC) {
+      throw new IllegalArgumentException("Unexpected MAGIC " + Integer.toHexString(magic));
+    }
+
+    byte version = din.readByte();
+    if (version != VERSION) {
+      throw new IllegalArgumentException("Unexpected version " + version);
+    }
+
+    String classname = din.readUTF();
+    int numEntries = din.readInt();
+
+    HashMap<String,String> opts = new HashMap<>();
+
+    for (int i = 0; i < numEntries; i++) {
+      String k = din.readUTF();
+      String v = din.readUTF();
+      opts.put(k, v);
+    }
+
+    return new CompactionStrategyConfig(classname).setOptions(opts);
+  }
+
+  /**
+   * @return the config decoded from {@code encodedCsc}; any IOException is rethrown wrapped in a RuntimeException.
+   */
+  public static CompactionStrategyConfig decode(byte[] encodedCsc) {
+    try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(encodedCsc))) {
+      return decode(dis);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
index 77b6a01..1def091 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableOperationsImpl.java
@@ -48,6 +48,8 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -114,7 +116,6 @@ import org.apache.log4j.Logger;
 import org.apache.thrift.TApplicationException;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransportException;
-
 import com.google.common.base.Joiner;
 
 public class TableOperationsImpl extends TableOperationsHelper {
@@ -775,20 +776,29 @@ public class TableOperationsImpl extends TableOperationsHelper {
  @Override
  public void compact(String tableName, Text start, Text end, List<IteratorSetting>
iterators, boolean flush, boolean wait) throws AccumuloSecurityException,
      TableNotFoundException, AccumuloException {
+    // Older overload: bundle the individual arguments into a CompactionConfig and delegate to compact(String, CompactionConfig).
+    // No strategy is set here, so the CompactionConfig default strategy is used.
+    compact(tableName, new CompactionConfig().setStartRow(start).setEndRow(end).setIterators(iterators).setFlush(flush).setWait(wait));
+  }
+
+  @Override
+  public void compact(String tableName, CompactionConfig config) throws AccumuloSecurityException,
TableNotFoundException, AccumuloException {
     checkArgument(tableName != null, "tableName is null");
     ByteBuffer EMPTY = ByteBuffer.allocate(0);
 
     String tableId = Tables.getTableId(context.getInstance(), tableName);
 
-    if (flush)
+    Text start = config.getStartRow();
+    Text end = config.getEndRow();
+
+    if (config.getFlush())
       _flush(tableId, start, end, true);
 
-    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes(UTF_8)),
start == null ? EMPTY : TextUtil.getByteBuffer(start), end == null ? EMPTY
-        : TextUtil.getByteBuffer(end), ByteBuffer.wrap(IteratorUtil.encodeIteratorSettings(iterators)));
+    List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(tableId.getBytes(UTF_8)),
start == null ? EMPTY : TextUtil.getByteBuffer(start),
+        end == null ? EMPTY : TextUtil.getByteBuffer(end), ByteBuffer.wrap(IteratorUtil.encodeIteratorSettings(config.getIterators())),
+        ByteBuffer.wrap(CompactionStrategyConfigUtil.encode(config.getCompactionStrategy())));
 
     Map<String,String> opts = new HashMap<String,String>();
     try {
-      doFateOperation(FateOperation.TABLE_COMPACT, args, opts, wait);
+      doFateOperation(FateOperation.TABLE_COMPACT, args, opts, config.getWait());
     } catch (TableExistsException e) {
       // should not happen
       throw new AssertionError(e);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
index 59afc8b..f8d2ccd 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.NamespaceNotFoundException;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.DiskUsage;
 import org.apache.accumulo.core.client.admin.FindMax;
 import org.apache.accumulo.core.client.admin.TimeType;
@@ -399,6 +400,18 @@ class MockTableOperations extends TableOperationsHelper {
       TableNotFoundException, AccumuloException {
     if (!exists(tableName))
       throw new TableNotFoundException(tableName, tableName, "");
+
+    if (iterators != null && iterators.size() > 0)
+      throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void compact(String tableName, CompactionConfig config) throws AccumuloSecurityException, TableNotFoundException, AccumuloException {
+    if (!exists(tableName))
+      throw new TableNotFoundException(tableName, tableName, "");
+
+    // getCompactionStrategy() never returns null: CompactionConfig starts with a default strategy and setCompactionStrategy() rejects null.
+    // A null check would therefore reject every call, so only reject strategies that differ from the default.
+    boolean defaultStrategy = config.getCompactionStrategy().getClassName().equals("org.apache.accumulo.tserver.compaction.EverythingCompactionStrategy")
+        && config.getCompactionStrategy().getOptions().isEmpty();
+
+    if (config.getIterators().size() > 0 || !defaultStrategy)
+      throw new UnsupportedOperationException("Mock does not support iterators or compaction strategies for compactions");
  }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2f788f48/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
index 02838ed..f7a7395 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
@@ -28,6 +28,8 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.IteratorSetting;
@@ -116,6 +118,9 @@ public class TableOperationsHelperTest {
         TableNotFoundException, AccumuloException {}
 
    @Override
+    // Intentionally empty, like the neighbouring stubs; presumably present only because TableOperations gained
+    // compact(String, CompactionConfig).
+    public void compact(String tableName, CompactionConfig config) throws AccumuloSecurityException,
TableNotFoundException, AccumuloException {}
+
+    @Override
     public void delete(String tableName) throws AccumuloException, AccumuloSecurityException,
TableNotFoundException {}
 
     @Override


Mime
View raw message