accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject [3/3] git commit: ACCUMULO-1451 make output file parameters configurable; more test stabilization
Date Tue, 22 Oct 2013 16:25:45 GMT
ACCUMULO-1451 make output file parameters configurable; more test stabilization


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bcaefcd2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bcaefcd2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bcaefcd2

Branch: refs/heads/master
Commit: bcaefcd2ed4f8807ca6a042580ca17da69f4d60d
Parents: 9b29eea
Author: Eric Newton <eric.newton@gmail.com>
Authored: Tue Oct 22 12:25:58 2013 -0400
Committer: Eric Newton <eric.newton@gmail.com>
Committed: Tue Oct 22 12:25:58 2013 -0400

----------------------------------------------------------------------
 .../accumulo/server/tabletserver/Compactor.java |   8 +-
 .../accumulo/server/tabletserver/Tablet.java    |  28 ++++-
 .../tabletserver/compaction/CompactionPlan.java |  17 ++-
 .../compaction/WriteParameters.java             |  56 +++++++++
 .../test/ConfigurableMajorCompactionIT.java     | 122 +++++++++++++++++++
 .../org/apache/accumulo/test/ShellServerIT.java |   2 +
 .../test/TestConfigurableMajorCompactionIT.java | 110 -----------------
 7 files changed, 224 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bcaefcd2/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
index 5751f1e..fb66661 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Compactor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.KeyExtent;
@@ -55,7 +56,6 @@ import org.apache.accumulo.core.tabletserver.thrift.CompactionReason;
 import org.apache.accumulo.core.tabletserver.thrift.CompactionType;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
-import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.problems.ProblemReport;
@@ -138,7 +138,7 @@ public class Compactor implements Callable<CompactionStats> {
   private InMemoryMap imm;
   private FileRef outputFile;
   private boolean propogateDeletes;
-  private TableConfiguration acuTableConf;
+  private AccumuloConfiguration acuTableConf;
   private CompactionEnv env;
   private Configuration conf;
   private VolumeManager fs;
@@ -285,7 +285,7 @@ public class Compactor implements Callable<CompactionStats> {
   }
 
   Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files,
InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
-      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting>
iterators, MajorCompactionReason reason) {
+      AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env, List<IteratorSetting>
iterators, MajorCompactionReason reason) {
     this.extent = extent;
     this.conf = conf;
     this.fs = fs;
@@ -302,7 +302,7 @@ public class Compactor implements Callable<CompactionStats> {
   }
   
   Compactor(Configuration conf, VolumeManager fs, Map<FileRef,DataFileValue> files,
InMemoryMap imm, FileRef outputFile, boolean propogateDeletes,
-      TableConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
+      AccumuloConfiguration acuTableConf, KeyExtent extent, CompactionEnv env) {
     this(conf, fs, files, imm, outputFile, propogateDeletes, acuTableConf, extent, env, new
ArrayList<IteratorSetting>(), null);
   }
   

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bcaefcd2/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
index bf835a2..fc54cab 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/Tablet.java
@@ -47,6 +47,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.impl.ScannerImpl;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
 import org.apache.accumulo.core.conf.ConfigurationObserver;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.constraints.Violations;
@@ -121,6 +122,7 @@ import org.apache.accumulo.server.tabletserver.compaction.CompactionStrategy;
 import org.apache.accumulo.server.tabletserver.compaction.DefaultCompactionStrategy;
 import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionReason;
 import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.server.tabletserver.compaction.WriteParameters;
 import org.apache.accumulo.server.tabletserver.log.DfsLogger;
 import org.apache.accumulo.server.tabletserver.log.MutationReceiver;
 import org.apache.accumulo.server.tabletserver.mastermessage.TabletStatusMessage;
@@ -3111,8 +3113,8 @@ public class Tablet {
     MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, fs, acuTableConf);
     request.setFiles(datafileManager.getDatafileSizes());
     strategy.gatherInformation(request);
-    Map<FileRef,Pair<Key,Key>> firstAndLastKeys = null;
     
+    Map<FileRef,Pair<Key,Key>> firstAndLastKeys = null;
     if (reason == MajorCompactionReason.CHOP) {
         firstAndLastKeys = getFirstAndLastKeys(request);
     }
@@ -3227,6 +3229,8 @@ public class Tablet {
         FileRef fileName = getNextMapFilename((filesToCompact.size() == 0 && !propogateDeletes)
? "A" : "C");
         FileRef compactTmpName = new FileRef(fileName.path().toString() + "_tmp");
         
+        AccumuloConfiguration tableConf = createTableConfiguration(acuTableConf, plan);
+        
         Span span = Trace.start("compactFiles");
         try {
           
@@ -3252,7 +3256,7 @@ public class Tablet {
 
           // always propagate deletes, unless last batch
           boolean lastBatch = filesToCompact.isEmpty();
-          Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, lastBatch
? propogateDeletes : true, acuTableConf, extent,
+          Compactor compactor = new Compactor(conf, fs, copy, null, compactTmpName, lastBatch
? propogateDeletes : true, tableConf, extent,
               cenv, compactionIterators, reason);
           
           CompactionStats mcs = compactor.call();
@@ -3262,7 +3266,7 @@ public class Tablet {
           span.data("written", "" + mcs.getEntriesWritten());
           majCStats.add(mcs);
           
-          if (lastBatch) {
+          if (lastBatch && plan != null && plan.deleteFiles != null) {
             smallestFiles.addAll(plan.deleteFiles);
           }
           datafileManager.bringMajorCompactionOnline(smallestFiles, compactTmpName, fileName,
@@ -3287,6 +3291,24 @@ public class Tablet {
     }
   }
   
+  private AccumuloConfiguration createTableConfiguration(TableConfiguration base, CompactionPlan
plan) {
+    if (plan == null || plan.writeParameters == null)
+      return base; // no plan or no write parameters: use the table configuration unchanged
+    WriteParameters p = plan.writeParameters;
+    ConfigurationCopy result = new ConfigurationCopy(base); // overrides are set on this copy, not on base
+    if (p.getHdfsBlockSize() > 0)
+      result.set(Property.TABLE_FILE_BLOCK_SIZE, "" + p.getHdfsBlockSize());
+    if (p.getBlockSize() > 0)
+      result.set(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE, "" + p.getBlockSize());
+    if (p.getIndexBlockSize() > 0) // fix: write the index block size here, not the data block size
+      result.set(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX, "" + p.getIndexBlockSize());
+    if (p.getCompressType() != null)
+      result.set(Property.TABLE_FILE_COMPRESSION_TYPE, p.getCompressType());
+    if (p.getReplication() != 0)
+      result.set(Property.TABLE_FILE_REPLICATION, "" + p.getReplication());
+    return result; // base plus whichever overrides the plan supplied
+  }
+
   private Set<FileRef> removeSmallest(Map<FileRef,DataFileValue> filesToCompact,
int maxFilesToCompact) {
     // ensure this method works properly when multiple files have the same size
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bcaefcd2/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java
index 33f080b..d1e7b90 100644
--- a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/CompactionPlan.java
@@ -22,12 +22,13 @@ import java.util.List;
 import org.apache.accumulo.server.fs.FileRef;
 
 /**
- * A plan for a compaction: the input files the files that are *not* inputs to a compaction
that should
- * simply be deleted.
+ * A plan for a compaction: the input files, the files that are *not* inputs to a compaction
that should
+ * simply be deleted, and the optional parameters used to create the resulting output file.
  */
 public class CompactionPlan {
   public final List<FileRef> inputFiles = new ArrayList<FileRef>();
   public final List<FileRef> deleteFiles = new ArrayList<FileRef>();
+  public WriteParameters writeParameters = null;
   
   public String toString() {
     StringBuilder b = new StringBuilder();
@@ -35,6 +36,18 @@ public class CompactionPlan {
     if (!deleteFiles.isEmpty()) { 
       b.append(" files to be deleted ");
       b.append(deleteFiles);
+      if (writeParameters != null) {
+        if (writeParameters.getCompressType() != null)
+          b.append(" compress type " + writeParameters.getCompressType());
+        if (writeParameters.getHdfsBlockSize() != 0)
+          b.append(" hdfs block size " + writeParameters.getHdfsBlockSize());
+        if (writeParameters.getBlockSize() != 0)
+          b.append(" data block size " + writeParameters.getBlockSize());
+        if (writeParameters.getIndexBlockSize() != 0)
+          b.append(" index block size " + writeParameters.getIndexBlockSize());
+        if (writeParameters.getReplication() != 0)
+          b.append(" replication " + writeParameters.getReplication());
+      }
     }
     return b.toString(); 
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bcaefcd2/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/WriteParameters.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/WriteParameters.java
b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/WriteParameters.java
new file mode 100644
index 0000000..6cb8254
--- /dev/null
+++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/compaction/WriteParameters.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.tabletserver.compaction;
+
+public class WriteParameters { // optional output-file settings carried by CompactionPlan.writeParameters
+  private String compressType = null; // null = unset; Tablet.createTableConfiguration maps it to TABLE_FILE_COMPRESSION_TYPE
+  private long hdfsBlockSize = 0; // applied to TABLE_FILE_BLOCK_SIZE only when > 0
+  private long blockSize = 0; // applied to TABLE_FILE_COMPRESSED_BLOCK_SIZE only when > 0
+  private long indexBlockSize = 0; // gates the TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX override (only when > 0)
+  private int replication = 0; // applied to TABLE_FILE_REPLICATION only when != 0
+
+  public String getCompressType() {
+    return compressType;
+  }
+  public void setCompressType(String compressType) {
+    this.compressType = compressType;
+  }
+  public long getHdfsBlockSize() {
+    return hdfsBlockSize;
+  }
+  public void setHdfsBlockSize(long hdfsBlockSize) {
+    this.hdfsBlockSize = hdfsBlockSize;
+  }
+  public long getBlockSize() {
+    return blockSize;
+  }
+  public void setBlockSize(long blockSize) {
+    this.blockSize = blockSize;
+  }
+  public long getIndexBlockSize() {
+    return indexBlockSize;
+  }
+  public void setIndexBlockSize(long indexBlockSize) {
+    this.indexBlockSize = indexBlockSize;
+  }
+  public int getReplication() {
+    return replication;
+  }
+  public void setReplication(int replication) {
+    this.replication = replication;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bcaefcd2/test/src/test/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java
b/test/src/test/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java
new file mode 100644
index 0000000..195830f
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/ConfigurableMajorCompactionIT.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
+import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.rfile.RFile.Reader;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.accumulo.server.tabletserver.compaction.CompactionPlan;
+import org.apache.accumulo.server.tabletserver.compaction.CompactionStrategy;
+import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionRequest;
+import org.apache.accumulo.server.tabletserver.compaction.WriteParameters;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+public class ConfigurableMajorCompactionIT extends ConfigurableMacIT { // exercises a table-level CompactionStrategy that also supplies WriteParameters
+  
+  @Override
+  public void configure(MiniAccumuloConfig cfg) {
+    Map<String,String> siteConfig = new HashMap<String, String>();
+    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "1s"); // set the TSERV_MAJC_DELAY site property to one second
+    cfg.setSiteConfig(siteConfig);
+  }
+
+  public static class TestCompactionStrategy extends CompactionStrategy { // set as the table's strategy by class name in test()
+   
+    @Override
+    public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException
{
+      if (request.getFiles().size() != 5) // no plan unless the request holds exactly five files
+        return null;
+      CompactionPlan plan = new CompactionPlan();
+      plan.inputFiles.addAll(request.getFiles().keySet()); // every file in the request is an input
+      plan.writeParameters = new WriteParameters(); // set every write parameter to a non-default value
+      plan.writeParameters.setBlockSize(1024 * 1024);
+      plan.writeParameters.setCompressType("none");
+      plan.writeParameters.setHdfsBlockSize(1024 * 1024);
+      plan.writeParameters.setIndexBlockSize(10);
+      plan.writeParameters.setReplication(7);
+      return plan;
+    }
+  }
+  
+  @Test(timeout = 20 * 1000)
+  public void test() throws Exception {
+    Connector conn = getConnector();
+    String tableName = "test";
+    conn.tableOperations().create(tableName);
+    conn.tableOperations().setProperty(tableName, Property.TABLE_COMPACTION_STRATEGY.getKey(),
TestCompactionStrategy.class.getName());
+    writeFile(conn, tableName);
+    writeFile(conn, tableName);
+    writeFile(conn, tableName);
+    writeFile(conn, tableName);
+    UtilWaitThread.sleep(2*1000); // longer than the 1s delay configured above; the strategy returns null for four files
+    assertEquals(4, countFiles(conn));
+    writeFile(conn, tableName); // fifth file: the strategy now returns a plan
+    int count = countFiles(conn);
+    assertTrue(count == 1 || count == 5); // 5 = not compacted yet, 1 = already compacted
+    while (count != 1) { // poll until one file remains; the @Test timeout bounds the wait
+      UtilWaitThread.sleep(250);
+      count = countFiles(conn);
+    }
+  }
+
+  private int countFiles(Connector conn) throws Exception { // counts data-file entries in the metadata table's tablets section
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.setRange(MetadataSchema.TabletsSection.getRange());
+    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
+    int count = 0;
+    for (@SuppressWarnings("unused") Entry<Key,Value> entry : s) // only the number of entries matters
+      count++;
+    return count;
+  }
+
+
+
+  private void writeFile(Connector conn, String tableName) throws Exception { // writes one mutation, then flushes the table
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    Mutation m = new Mutation("row");
+    m.put("cf", "cq", "value");
+    bw.addMutation(m);
+    bw.close();
+    conn.tableOperations().flush(tableName, null, null, true); // NOTE(review): final 'true' presumably waits for the flush to finish - confirm
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bcaefcd2/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
index e77561f..d8cbe8d 100644
--- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java
@@ -506,6 +506,8 @@ public class ShellServerIT extends SimpleMacIT {
     exec("createtable c -evc");
     exec("constraint -l -t c", true, "VisibilityConstraint=2", true);
     exec("constraint -t c -d 2", true, "Removed constraint 2 from table c");
+    // wait for zookeeper updates to propagate
+    UtilWaitThread.sleep(1000);
     exec("constraint -l -t c", true, "VisibilityConstraint=2", false);
     exec("deletetable -f c");
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bcaefcd2/test/src/test/java/org/apache/accumulo/test/TestConfigurableMajorCompactionIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TestConfigurableMajorCompactionIT.java
b/test/src/test/java/org/apache/accumulo/test/TestConfigurableMajorCompactionIT.java
deleted file mode 100644
index 13f81c5..0000000
--- a/test/src/test/java/org/apache/accumulo/test/TestConfigurableMajorCompactionIT.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.accumulo.test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.accumulo.minicluster.MiniAccumuloConfig;
-import org.apache.accumulo.server.tabletserver.compaction.CompactionPlan;
-import org.apache.accumulo.server.tabletserver.compaction.CompactionStrategy;
-import org.apache.accumulo.server.tabletserver.compaction.MajorCompactionRequest;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.junit.Test;
-
-public class TestConfigurableMajorCompactionIT extends ConfigurableMacIT {
-  
-  @Override
-  public void configure(MiniAccumuloConfig cfg) {
-    Map<String,String> siteConfig = new HashMap<String, String>();
-    siteConfig.put(Property.TSERV_MAJC_DELAY.getKey(), "1s");
-    cfg.setSiteConfig(siteConfig);
-  }
-
-  public static class TestCompactionStrategy implements CompactionStrategy {
-    @Override
-    public void gatherInformation(MajorCompactionRequest request) throws IOException {
-    }
-
-    @Override
-    public CompactionPlan getCompactionPlan(MajorCompactionRequest request) throws IOException
{
-      CompactionPlan plan = new CompactionPlan();
-      plan.inputFiles.addAll(request.getFiles().keySet());
-      return plan;
-    }
-  }
-  
-  @Test(timeout = 20 * 1000)
-  public void test() throws Exception {
-    Connector conn = getConnector();
-    String tableName = "test";
-    conn.tableOperations().create(tableName);
-    conn.tableOperations().setProperty(tableName, Property.TABLE_COMPACTION_STRATEGY.getKey(),
TestCompactionStrategy.class.getName());
-    writeFile(conn, tableName);
-    writeFile(conn, tableName);
-    writeFile(conn, tableName);
-    writeFile(conn, tableName);
-    UtilWaitThread.sleep(2*1000);
-    assertEquals(4, countFiles(conn));
-    writeFile(conn, tableName);
-    int count = countFiles(conn);
-    assertTrue(count == 1 || count == 5);
-    while (count != 1) {
-      UtilWaitThread.sleep(250);
-      count = countFiles(conn);
-    }
-  }
-
-  private int countFiles(Connector conn) throws Exception {
-    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.setRange(MetadataSchema.TabletsSection.getRange());
-    s.fetchColumnFamily(MetadataSchema.TabletsSection.DataFileColumnFamily.NAME);
-    int count = 0;
-    for (@SuppressWarnings("unused") Entry<Key,Value> entry : s)
-      count++;
-    return count;
-  }
-
-
-
-  private void writeFile(Connector conn, String tableName) throws Exception {
-    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
-    Mutation m = new Mutation("row");
-    m.put("cf", "cq", "value");
-    bw.addMutation(m);
-    bw.close();
-    conn.tableOperations().flush(tableName, null, null, true);
-  }
-  
-}


Mime
View raw message