accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mwa...@apache.org
Subject [3/7] accumulo-examples git commit: ACCUMULO-4511 Adding examples from Accumulo repo
Date Fri, 09 Dec 2016 17:12:16 GMT
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
new file mode 100644
index 0000000..0c8d1ae
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce.bulk;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Base64;
+import java.util.Collection;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Example map reduce job that bulk ingest data into an accumulo table. The expected input is text files containing tab separated key value pairs on each line.
+ */
+public class BulkIngestExample extends Configured implements Tool {
+  public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
+    private Text outputKey = new Text();
+    private Text outputValue = new Text();
+
+    @Override
+    public void map(LongWritable key, Text value, Context output) throws IOException, InterruptedException {
+      // split on tab
+      int index = -1;
+      for (int i = 0; i < value.getLength(); i++) {
+        if (value.getBytes()[i] == '\t') {
+          index = i;
+          break;
+        }
+      }
+
+      if (index > 0) {
+        outputKey.set(value.getBytes(), 0, index);
+        outputValue.set(value.getBytes(), index + 1, value.getLength() - (index + 1));
+        output.write(outputKey, outputValue);
+      }
+    }
+  }
+
+  public static class ReduceClass extends Reducer<Text,Text,Key,Value> {
+    @Override
+    public void reduce(Text key, Iterable<Text> values, Context output) throws IOException, InterruptedException {
+      // be careful with the timestamp... if you run on a cluster
+      // where the time is whacked you may not see your updates in
+      // accumulo if there is already an existing value with a later
+      // timestamp in accumulo... so make sure ntp is running on the
+      // cluster or consider using logical time... one options is
+      // to let accumulo set the time
+      long timestamp = System.currentTimeMillis();
+
+      int index = 0;
+      for (Text value : values) {
+        Key outputKey = new Key(key, new Text("colf"), new Text(String.format("col_%07d", index)), timestamp);
+        index++;
+
+        Value outputValue = new Value(value.getBytes(), 0, value.getLength());
+        output.write(outputKey, outputValue);
+      }
+    }
+  }
+
+  static class Opts extends MapReduceClientOnRequiredTable {
+    @Parameter(names = "--inputDir", required = true)
+    String inputDir;
+    @Parameter(names = "--workDir", required = true)
+    String workDir;
+  }
+
+  @Override
+  public int run(String[] args) {
+    Opts opts = new Opts();
+    opts.parseArgs(BulkIngestExample.class.getName(), args);
+
+    Configuration conf = getConf();
+    PrintStream out = null;
+    try {
+      Job job = Job.getInstance(conf);
+      job.setJobName("bulk ingest example");
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(TextInputFormat.class);
+
+      job.setMapperClass(MapClass.class);
+      job.setMapOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(Text.class);
+
+      job.setReducerClass(ReduceClass.class);
+      job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+      opts.setAccumuloConfigs(job);
+
+      Connector connector = opts.getConnector();
+
+      TextInputFormat.setInputPaths(job, new Path(opts.inputDir));
+      AccumuloFileOutputFormat.setOutputPath(job, new Path(opts.workDir + "/files"));
+
+      FileSystem fs = FileSystem.get(conf);
+      out = new PrintStream(new BufferedOutputStream(fs.create(new Path(opts.workDir + "/splits.txt"))));
+
+      Collection<Text> splits = connector.tableOperations().listSplits(opts.getTableName(), 100);
+      for (Text split : splits)
+        out.println(Base64.getEncoder().encodeToString(TextUtil.getBytes(split)));
+
+      job.setNumReduceTasks(splits.size() + 1);
+      out.close();
+
+      job.setPartitionerClass(RangePartitioner.class);
+      RangePartitioner.setSplitFile(job, opts.workDir + "/splits.txt");
+
+      job.waitForCompletion(true);
+      Path failures = new Path(opts.workDir, "failures");
+      fs.delete(failures, true);
+      fs.mkdirs(new Path(opts.workDir, "failures"));
+      // With HDFS permissions on, we need to make sure the Accumulo user can read/move the rfiles
+      FsShell fsShell = new FsShell(conf);
+      fsShell.run(new String[] {"-chmod", "-R", "777", opts.workDir});
+      connector.tableOperations().importDirectory(opts.getTableName(), opts.workDir + "/files", opts.workDir + "/failures", false);
+
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    } finally {
+      if (out != null)
+        out.close();
+    }
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new BulkIngestExample(), args);
+    System.exit(res);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
new file mode 100644
index 0000000..4622ea0
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/GenerateTestData.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce.bulk;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Writes sample input for BulkIngestExample: one line per row of the form {@code row_NNNNNNNNNN<TAB>value_NNNNNNNNNN}, numbered consecutively from
+ * --start-row, to a file on the FileSystem of the default Hadoop Configuration.
+ */
+public class GenerateTestData {
+
+  static class Opts extends org.apache.accumulo.core.cli.Help {
+    @Parameter(names = "--start-row", required = true, description = "number of the first row to generate")
+    int startRow = 0;
+    @Parameter(names = "--count", required = true, description = "number of rows to generate")
+    int numRows = 0;
+    @Parameter(names = "--output", required = true, description = "path of the file to write")
+    String outputFile;
+  }
+
+  public static void main(String[] args) throws IOException {
+    Opts opts = new Opts();
+    opts.parseArgs(GenerateTestData.class.getName(), args);
+
+    FileSystem fs = FileSystem.get(new Configuration());
+    PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(new Path(opts.outputFile))));
+    try {
+      for (int i = 0; i < opts.numRows; i++) {
+        out.println(String.format("row_%010d\tvalue_%010d", i + opts.startRow, i + opts.startRow));
+      }
+    } finally {
+      out.close();
+    }
+    // PrintStream swallows I/O errors (including on close); surface them instead of leaving a silently truncated file
+    if (out.checkError()) {
+      throw new IOException("Failed to write test data to " + opts.outputFile);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
new file mode 100644
index 0000000..a469fa6
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce.bulk;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Creates the table named on the command line and, if any split points are given as trailing arguments, pre-splits it at those points.
+ */
+public class SetupTable {
+
+  /** Options: the required table name plus zero or more split points. */
+  static class Opts extends ClientOnRequiredTable {
+    @Parameter(description = "<split> { <split> ... } ")
+    List<String> splits = new ArrayList<>();
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(SetupTable.class.getName(), args);
+
+    Connector conn = opts.getConnector();
+    String table = opts.getTableName();
+    conn.tableOperations().create(table);
+
+    // nothing more to do when no split points were requested
+    if (opts.splits.isEmpty()) {
+      return;
+    }
+
+    // a TreeSet orders the split points and drops duplicates before they are added
+    TreeSet<Text> initialSplits = new TreeSet<>();
+    opts.splits.forEach(split -> initialSplits.add(new Text(split)));
+    conn.tableOperations().addSplits(table, initialSplits);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
new file mode 100644
index 0000000..fb47eef
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce.bulk;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+public class VerifyIngest {
+  private static final Logger log = LoggerFactory.getLogger(VerifyIngest.class);
+
+  static class Opts extends ClientOnRequiredTable {
+    @Parameter(names = "--start-row")
+    int startRow = 0;
+    @Parameter(names = "--count", required = true, description = "number of rows to verify")
+    int numRows = 0;
+  }
+
+  public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Opts opts = new Opts();
+    opts.parseArgs(VerifyIngest.class.getName(), args);
+
+    Connector connector = opts.getConnector();
+    Scanner scanner = connector.createScanner(opts.getTableName(), opts.auths);
+
+    scanner.setRange(new Range(new Text(String.format("row_%010d", opts.startRow)), null));
+
+    Iterator<Entry<Key,Value>> si = scanner.iterator();
+
+    boolean ok = true;
+
+    for (int i = opts.startRow; i < opts.numRows; i++) {
+
+      if (si.hasNext()) {
+        Entry<Key,Value> entry = si.next();
+
+        if (!entry.getKey().getRow().toString().equals(String.format("row_%010d", i))) {
+          log.error("unexpected row key " + entry.getKey().getRow().toString() + " expected " + String.format("row_%010d", i));
+          ok = false;
+        }
+
+        if (!entry.getValue().toString().equals(String.format("value_%010d", i))) {
+          log.error("unexpected value " + entry.getValue().toString() + " expected " + String.format("value_%010d", i));
+          ok = false;
+        }
+
+      } else {
+        log.error("no more rows, expected " + String.format("row_%010d", i));
+        ok = false;
+        break;
+      }
+
+    }
+
+    if (ok) {
+      System.out.println("OK");
+      System.exit(0);
+    } else {
+      System.exit(1);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/reservations/ARS.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/reservations/ARS.java b/src/main/java/org/apache/accumulo/examples/reservations/ARS.java
new file mode 100644
index 0000000..fb0277c
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/reservations/ARS.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.reservations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import jline.console.ConsoleReader;
+
+/**
+ * Accumulo Reservation System: an example reservation system using Accumulo. Supports atomic reservations of a resource at a date. Wait lists are also
+ * supported. To keep the example simple, the date is not validated. The code is also inefficient; if you are interested in improving it, take a look at
+ * the EXCERCISE comments.
+ */
+
+// EXCERCISE create a test that verifies correctness under concurrency. For example, have M threads making reservations against N resources. Each thread could
+// randomly reserve and cancel resources for a single user. When each thread finishes, it knows what the state of its single user should be. When all threads
+// finish, collect their expected state and verify the status of all users and resources. For extra credit, run the test on an IaaS provider using 10 nodes and
+// 10 threads per node.
+
+public class ARS {
+
+  private static final Logger log = LoggerFactory.getLogger(ARS.class);
+
+  private final Connector conn;
+  private final String rTable;
+
+  /** Outcome of a reservation request. */
+  public enum ReservationResult {
+    RESERVED, WAIT_LISTED
+  }
+
+  /**
+   * @param conn
+   *          connector used for all scans and conditional writes
+   * @param rTable
+   *          name of the table holding the reservations
+   */
+  public ARS(Connector conn, String rTable) {
+    this.conn = conn;
+    this.rTable = rTable;
+  }
+
+  /** Not implemented; always throws {@link UnsupportedOperationException}. */
+  public List<String> setCapacity(String what, String when, int count) {
+    // EXCERCISE implement this method which atomically sets a capacity and returns anyone who was moved to the wait list if the capacity was decreased
+
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Reserves {@code what} at {@code when} for {@code who}. Each resource/date is one row ("what:when") holding a tx:seq counter and res:NNNN columns whose
+   * values are reserver names in arrival order; the first res entry holds the reservation and the rest form the wait list. If {@code who} is already in the
+   * row, its current status is returned without writing anything.
+   *
+   * @return RESERVED if {@code who} holds (or now holds) the reservation, WAIT_LISTED otherwise
+   */
+  public ReservationResult reserve(String what, String when, String who) throws Exception {
+
+    String row = what + ":" + when;
+
+    // EXCERCISE This code assumes there is no reservation and tries to create one. If a reservation exists then the update will fail. This is a good
+    // strategy when it is expected there are usually no reservations. Could modify the code to scan first.
+
+    // The following mutation requires that the column tx:seq does not exist and will fail if it does.
+    ConditionalMutation update = new ConditionalMutation(row, new Condition("tx", "seq"));
+    update.put("tx", "seq", "0");
+    update.put("res", String.format("%04d", 0), who);
+
+    ReservationResult result = ReservationResult.RESERVED;
+
+    // it is important to use an isolated scanner so that only whole mutations are seen
+    try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+        Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
+      while (true) {
+        Status status = cwriter.write(update).getStatus();
+        switch (status) {
+          case ACCEPTED:
+            return result;
+          case REJECTED:
+          case UNKNOWN:
+            // read the row and decide what to do; for UNKNOWN the write may already have been applied, in which case the
+            // scan below finds who in the list and reports its status
+            break;
+          default:
+            throw new RuntimeException("Unexpected status " + status);
+        }
+
+        // EXCERCISE in the case of many threads trying to reserve a slot, this approach of immediately retrying is inefficient. Exponential back-off is good
+        // general solution to solve contention problems like this. However in this particular case, exponential back-off could penalize the earliest threads
+        // that attempted to make a reservation by putting them later in the list. A more complex solution could involve having independent sub-queues within
+        // the row that approximately maintain arrival order and use exponential back off to fairly merge the sub-queues into the main queue.
+
+        scanner.setRange(new Range(row));
+
+        int seq = -1;
+        int maxReservation = -1;
+
+        for (Entry<Key,Value> entry : scanner) {
+          String cf = entry.getKey().getColumnFamilyData().toString();
+          String cq = entry.getKey().getColumnQualifierData().toString();
+          String val = entry.getValue().toString();
+
+          if (cf.equals("tx") && cq.equals("seq")) {
+            seq = Integer.parseInt(val);
+          } else if (cf.equals("res")) {
+            // EXCERCISE scanning the entire list to find if reserver is already in the list is inefficient. One possible way to solve this would be to sort the
+            // data differently in Accumulo so that finding the reserver could be done quickly.
+            if (val.equals(who)) {
+              if (maxReservation == -1) {
+                return ReservationResult.RESERVED; // already have the first reservation
+              } else {
+                return ReservationResult.WAIT_LISTED; // already on wait list
+              }
+            }
+
+            // EXCERCISE the way this code finds the max reservation is very inefficient.... it would be better if it did not have to scan the entire row.
+            // One possibility is to just use the sequence number. Could also consider sorting the data in another way and/or using an iterator.
+            maxReservation = Integer.parseInt(cq);
+          }
+        }
+
+        Condition condition = new Condition("tx", "seq");
+        if (seq >= 0) {
+          condition.setValue(Integer.toString(seq)); // only expect a seq # if one was seen
+        }
+
+        // retry, appending who after the last reservation seen; fails if anyone changed the row (bumped seq) in the meantime
+        update = new ConditionalMutation(row, condition);
+        update.put("tx", "seq", Integer.toString(seq + 1));
+        update.put("res", String.format("%04d", maxReservation + 1), who);
+
+        // EXCERCISE if set capacity is implemented, then result should take capacity into account
+        if (maxReservation == -1) {
+          result = ReservationResult.RESERVED; // if successful, will be first reservation
+        } else {
+          result = ReservationResult.WAIT_LISTED;
+        }
+      }
+    }
+  }
+
+  /**
+   * Removes {@code who}'s reservation or wait-list entry for {@code what} at {@code when}, if present. Retries until the delete is accepted or the entry is no
+   * longer found.
+   */
+  public void cancel(String what, String when, String who) throws Exception {
+
+    String row = what + ":" + when;
+
+    // Even though this method is only deleting a column, its important to use a conditional writer. By updating the seq # when deleting a reservation, it
+    // will cause any concurrent reservations to retry. If this delete were done using a batch writer, then a concurrent reservation could report WAIT_LISTED
+    // when it actually got the reservation.
+
+    // its important to use an isolated scanner so that only whole mutations are seen
+    try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig());
+        Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
+      while (true) {
+        scanner.setRange(new Range(row));
+
+        int seq = -1;
+        String reservation = null;
+
+        for (Entry<Key,Value> entry : scanner) {
+          String cf = entry.getKey().getColumnFamilyData().toString();
+          String cq = entry.getKey().getColumnQualifierData().toString();
+          String val = entry.getValue().toString();
+
+          // EXCERCISE avoid linear scan
+
+          if (cf.equals("tx") && cq.equals("seq")) {
+            seq = Integer.parseInt(val);
+          } else if (cf.equals("res") && val.equals(who)) {
+            reservation = cq;
+          }
+        }
+
+        if (reservation == null) {
+          // not reserved, nothing to do
+          break;
+        }
+
+        ConditionalMutation update = new ConditionalMutation(row, new Condition("tx", "seq").setValue(Integer.toString(seq)));
+        update.putDelete("res", reservation);
+        update.put("tx", "seq", Integer.toString(seq + 1));
+
+        Status status = cwriter.write(update).getStatus();
+        switch (status) {
+          case ACCEPTED:
+            // successfully canceled reservation
+            return;
+          case REJECTED:
+          case UNKNOWN:
+            // retry
+            // EXCERCISE exponential back-off could be used here
+            break;
+          default:
+            throw new RuntimeException("Unexpected status " + status);
+        }
+      }
+    }
+  }
+
+  /**
+   * Lists everyone holding or waiting for {@code what} at {@code when}, in reservation order: the first element is the holder and the rest are the wait list.
+   */
+  public List<String> list(String what, String when) throws Exception {
+    String row = what + ":" + when;
+
+    // its important to use an isolated scanner so that only whole mutations are seen
+    try (Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) {
+      scanner.setRange(new Range(row));
+      scanner.fetchColumnFamily(new Text("res"));
+
+      List<String> reservations = new ArrayList<>();
+
+      for (Entry<Key,Value> entry : scanner) {
+        reservations.add(entry.getValue().toString());
+      }
+
+      return reservations;
+    }
+  }
+
+  /** Interactive console: connect, then reserve/cancel/list; an empty or unrecognized command prints the available commands. */
+  public static void main(String[] args) throws Exception {
+    final ConsoleReader reader = new ConsoleReader();
+    ARS ars = null;
+
+    while (true) {
+      String line = reader.readLine(">");
+      if (line == null) {
+        break;
+      }
+
+      // trim first so leading whitespace does not produce an empty first token
+      final String[] tokens = line.trim().split("\\s+");
+
+      if (tokens[0].equals("reserve") && tokens.length >= 4 && ars != null) {
+        // start up multiple threads all trying to reserve the same resource, no more than one should succeed
+
+        final ARS fars = ars;
+        List<Thread> threads = new ArrayList<>();
+        for (int i = 3; i < tokens.length; i++) {
+          final String who = tokens[i];
+          threads.add(new Thread(() -> {
+            try {
+              reader.println("  " + String.format("%20s", who) + " : " + fars.reserve(tokens[1], tokens[2], who));
+            } catch (Exception e) {
+              // either the reservation itself or printing its result failed
+              log.warn("Failed to reserve {} {} for {} or to print the result.", tokens[1], tokens[2], who, e);
+            }
+          }));
+        }
+
+        for (Thread thread : threads) {
+          thread.start();
+        }
+
+        for (Thread thread : threads) {
+          thread.join();
+        }
+
+      } else if (tokens[0].equals("cancel") && tokens.length == 4 && ars != null) {
+        ars.cancel(tokens[1], tokens[2], tokens[3]);
+      } else if (tokens[0].equals("list") && tokens.length == 3 && ars != null) {
+        List<String> reservations = ars.list(tokens[1], tokens[2]);
+        if (!reservations.isEmpty()) {
+          reader.println("  Reservation holder : " + reservations.get(0));
+          if (reservations.size() > 1) {
+            reader.println("  Wait list : " + reservations.subList(1, reservations.size()));
+          }
+        }
+      } else if (tokens[0].equals("quit") && tokens.length == 1) {
+        break;
+      } else if (tokens[0].equals("connect") && tokens.length == 6 && ars == null) {
+        ZooKeeperInstance zki = new ZooKeeperInstance(new ClientConfiguration().withInstance(tokens[1]).withZkHosts(tokens[2]));
+        Connector conn = zki.getConnector(tokens[3], new PasswordToken(tokens[4]));
+        if (conn.tableOperations().exists(tokens[5])) {
+          ars = new ARS(conn, tokens[5]);
+          reader.println("  connected");
+        } else {
+          reader.println("  No Such Table");
+        }
+      } else {
+        // all console output goes through the ConsoleReader so it is not interleaved with System.out
+        reader.println("  Commands : ");
+        if (ars == null) {
+          reader.println("    connect <instance> <zookeepers> <user> <pass> <table>");
+        } else {
+          reader.println("    reserve <what> <when> <who> {who}");
+          reader.println("    cancel <what> <when> <who>");
+          reader.println("    list <what> <when>");
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/sample/SampleExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/sample/SampleExample.java b/src/main/java/org/apache/accumulo/examples/sample/SampleExample.java
new file mode 100644
index 0000000..608607e
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/sample/SampleExample.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.examples.sample;
+
+import java.util.Collections;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
+import org.apache.accumulo.core.client.sample.RowSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.examples.client.RandomBatchWriter;
+import org.apache.accumulo.examples.shard.CutoffIntersectingIterator;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * A simple example of using Accumulo's sampling feature. This example does something similar to what README.sample shows using the shell. Also see
+ * {@link CutoffIntersectingIterator} and README.sample for an example of how to use sample data from within an iterator.
+ */
+public class SampleExample {
+
+  // a compaction strategy that only selects files for compaction that have no sample data or sample data created in a different way than the table's
+  // current sampler configuration
+  static final CompactionStrategyConfig NO_SAMPLE_STRATEGY = new CompactionStrategyConfig(
+      "org.apache.accumulo.tserver.compaction.strategies.ConfigurableCompactionStrategy").setOptions(Collections.singletonMap("SF_NO_SAMPLE", ""));
+
+  /** Command line options; the table name defaults to {@code sampex}. */
+  static class Opts extends ClientOnDefaultTable {
+    public Opts() {
+      super("sampex");
+    }
+  }
+
+  /**
+   * Creates the example table, writes a few documents, and walks through how scans against sample data behave before and after compactions and after the
+   * table's sampler configuration changes. Does nothing if the table already exists.
+   *
+   * @param args command line arguments parsed into {@link Opts} and {@link BatchWriterOpts}
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    // use this class's own name so usage/help output identifies this program
+    opts.parseArgs(SampleExample.class.getName(), args, bwOpts);
+
+    Connector conn = opts.getConnector();
+
+    if (!conn.tableOperations().exists(opts.getTableName())) {
+      conn.tableOperations().create(opts.getTableName());
+    } else {
+      System.out.println("Table exists, not doing anything.");
+      return;
+    }
+
+    // write some data
+    BatchWriter bw = conn.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
+    bw.addMutation(createMutation("9225", "abcde", "file://foo.txt"));
+    bw.addMutation(createMutation("8934", "accumulo scales", "file://accumulo_notes.txt"));
+    bw.addMutation(createMutation("2317", "milk, eggs, bread, parmigiano-reggiano", "file://groceries/9/txt"));
+    bw.addMutation(createMutation("3900", "EC2 ate my homework", "file://final_project.txt"));
+    bw.flush();
+
+    SamplerConfiguration sc1 = new SamplerConfiguration(RowSampler.class.getName());
+    sc1.setOptions(ImmutableMap.of("hasher", "murmur3_32", "modulus", "3"));
+
+    conn.tableOperations().setSamplerConfiguration(opts.getTableName(), sc1);
+
+    Scanner scanner = conn.createScanner(opts.getTableName(), Authorizations.EMPTY);
+    System.out.println("Scanning all data :");
+    print(scanner);
+    System.out.println();
+
+    System.out.println("Scanning with sampler configuration.  Data was written before sampler was set on table, scan should fail.");
+    scanner.setSamplerConfiguration(sc1);
+    try {
+      print(scanner);
+    } catch (SampleNotPresentException e) {
+      System.out.println("  Saw sample not present exception as expected.");
+    }
+    System.out.println();
+
+    // compact table to recreate sample data
+    conn.tableOperations().compact(opts.getTableName(), new CompactionConfig().setCompactionStrategy(NO_SAMPLE_STRATEGY));
+
+    System.out.println("Scanning after compaction (compaction should have created sample data) : ");
+    print(scanner);
+    System.out.println();
+
+    // update a document in the sample data
+    bw.addMutation(createMutation("2317", "milk, eggs, bread, parmigiano-reggiano, butter", "file://groceries/9/txt"));
+    bw.close();
+    System.out.println("Scanning sample after updating content for docId 2317 (should see content change in sample data) : ");
+    print(scanner);
+    System.out.println();
+
+    // change tables sampling configuration...
+    SamplerConfiguration sc2 = new SamplerConfiguration(RowSampler.class.getName());
+    sc2.setOptions(ImmutableMap.of("hasher", "murmur3_32", "modulus", "2"));
+    conn.tableOperations().setSamplerConfiguration(opts.getTableName(), sc2);
+    // compact table to recreate sample data using new configuration
+    conn.tableOperations().compact(opts.getTableName(), new CompactionConfig().setCompactionStrategy(NO_SAMPLE_STRATEGY));
+
+    System.out.println("Scanning with old sampler configuration.  Sample data was created using new configuration with a compaction.  Scan should fail.");
+    try {
+      // try scanning with old sampler configuration
+      print(scanner);
+    } catch (SampleNotPresentException e) {
+      System.out.println("  Saw sample not present exception as expected ");
+    }
+    System.out.println();
+
+    // update expected sampler configuration on scanner
+    scanner.setSamplerConfiguration(sc2);
+
+    System.out.println("Scanning with new sampler configuration : ");
+    print(scanner);
+    System.out.println();
+
+  }
+
+  /** Prints every key/value pair returned by {@code scanner}, one per line, indented by two spaces. */
+  private static void print(Scanner scanner) {
+    for (Entry<Key,Value> entry : scanner) {
+      System.out.println("  " + entry.getKey() + " " + entry.getValue());
+    }
+  }
+
+  /**
+   * Builds a mutation for one document: row = {@code docId}, with column family {@code doc} holding the content (qualifier {@code context}) and the url
+   * (qualifier {@code url}).
+   */
+  private static Mutation createMutation(String docId, String content, String url) {
+    Mutation m = new Mutation(docId);
+    m.put("doc", "context", content);
+    m.put("doc", "url", url);
+    return m;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shard/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shard/ContinuousQuery.java b/src/main/java/org/apache/accumulo/examples/shard/ContinuousQuery.java
new file mode 100644
index 0000000..7251148
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shard/ContinuousQuery.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shard;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.cli.BatchScannerOpts;
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.IntersectingIterator;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.collect.Iterators;
+
+/**
+ * Using the doc2word table created by Reverse.java, this program randomly selects N words per document. Then it continually queries a random set of words in
+ * the shard table (created by {@link Index}) using the {@link IntersectingIterator}.
+ */
+public class ContinuousQuery {
+
+  static class Opts extends ClientOpts {
+    @Parameter(names = "--shardTable", required = true, description = "name of the shard table")
+    String tableName = null;
+    @Parameter(names = "--doc2Term", required = true, description = "name of the doc2Term table")
+    String doc2Term;
+    @Parameter(names = "--terms", required = true, description = "the number of terms in the query")
+    int numTerms;
+    @Parameter(names = "--count", description = "the number of queries to run")
+    long iterations = Long.MAX_VALUE;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    BatchScannerOpts bsOpts = new BatchScannerOpts();
+    opts.parseArgs(ContinuousQuery.class.getName(), args, bsOpts);
+
+    Connector conn = opts.getConnector();
+
+    ArrayList<Text[]> randTerms = findRandomTerms(conn.createScanner(opts.doc2Term, opts.auths), opts.numTerms);
+
+    Random rand = new Random();
+
+    BatchScanner bs = conn.createBatchScanner(opts.tableName, opts.auths, bsOpts.scanThreads);
+    bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
+
+    for (long i = 0; i < opts.iterations; i += 1) {
+      Text[] columns = randTerms.get(rand.nextInt(randTerms.size()));
+
+      bs.clearScanIterators();
+      bs.clearColumns();
+
+      IteratorSetting ii = new IteratorSetting(20, "ii", IntersectingIterator.class);
+      IntersectingIterator.setColumnFamilies(ii, columns);
+      bs.addScanIterator(ii);
+      bs.setRanges(Collections.singleton(new Range()));
+
+      long t1 = System.currentTimeMillis();
+      int count = Iterators.size(bs.iterator());
+      long t2 = System.currentTimeMillis();
+
+      System.out.printf("  %s %,d %6.3f%n", Arrays.asList(columns), count, (t2 - t1) / 1000.0);
+    }
+
+    bs.close();
+
+  }
+
+  private static ArrayList<Text[]> findRandomTerms(Scanner scanner, int numTerms) {
+
+    Text currentRow = null;
+
+    ArrayList<Text> words = new ArrayList<>();
+    ArrayList<Text[]> ret = new ArrayList<>();
+
+    Random rand = new Random();
+
+    for (Entry<Key,Value> entry : scanner) {
+      Key key = entry.getKey();
+
+      if (currentRow == null)
+        currentRow = key.getRow();
+
+      if (!currentRow.equals(key.getRow())) {
+        selectRandomWords(words, ret, rand, numTerms);
+        words.clear();
+        currentRow = key.getRow();
+      }
+
+      words.add(key.getColumnFamily());
+
+    }
+
+    selectRandomWords(words, ret, rand, numTerms);
+
+    return ret;
+  }
+
+  private static void selectRandomWords(ArrayList<Text> words, ArrayList<Text[]> ret, Random rand, int numTerms) {
+    if (words.size() >= numTerms) {
+      Collections.shuffle(words, rand);
+      Text docWords[] = new Text[numTerms];
+      for (int i = 0; i < docWords.length; i++) {
+        docWords[i] = words.get(i);
+      }
+
+      ret.add(docWords);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shard/CutoffIntersectingIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shard/CutoffIntersectingIterator.java b/src/main/java/org/apache/accumulo/examples/shard/CutoffIntersectingIterator.java
new file mode 100644
index 0000000..18fe914
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shard/CutoffIntersectingIterator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.examples.shard;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.sample.RowColumnSampler;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.user.IntersectingIterator;
+
+/**
+ * This iterator uses a sample built from the Column Qualifier to quickly avoid intersecting iterator queries that may return too many documents.
+ */
+
+public class CutoffIntersectingIterator extends IntersectingIterator {
+
+  private IntersectingIterator sampleII;
+  private int sampleMax;
+  private boolean hasTop;
+
+  public static void setCutoff(IteratorSetting iterCfg, int cutoff) {
+    checkArgument(cutoff >= 0);
+    iterCfg.addOption("cutoff", cutoff + "");
+  }
+
+  @Override
+  public boolean hasTop() {
+    return hasTop && super.hasTop();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
+
+    sampleII.seek(range, seekColumnFamilies, inclusive);
+
+    // this check will be redone whenever iterator stack is torn down and recreated.
+    int count = 0;
+    while (count <= sampleMax && sampleII.hasTop()) {
+      sampleII.next();
+      count++;
+    }
+
+    if (count > sampleMax) {
+      // In a real application would probably want to return a key value that indicates too much data. Since this would execute for each tablet, some tablets
+      // may return data. For tablets that did not return data, would want an indication.
+      hasTop = false;
+    } else {
+      hasTop = true;
+      super.seek(range, seekColumnFamilies, inclusive);
+    }
+  }
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    super.init(source, options, env);
+
+    IteratorEnvironment sampleEnv = env.cloneWithSamplingEnabled();
+
+    setMax(sampleEnv, options);
+
+    SortedKeyValueIterator<Key,Value> sampleDC = source.deepCopy(sampleEnv);
+    sampleII = new IntersectingIterator();
+    sampleII.init(sampleDC, options, env);
+
+  }
+
+  static void validateSamplerConfig(SamplerConfiguration sampleConfig) {
+    requireNonNull(sampleConfig);
+    checkArgument(sampleConfig.getSamplerClassName().equals(RowColumnSampler.class.getName()), "Unexpected Sampler " + sampleConfig.getSamplerClassName());
+    checkArgument(sampleConfig.getOptions().get("qualifier").equals("true"), "Expected sample on column qualifier");
+    checkArgument(isNullOrFalse(sampleConfig.getOptions(), "row", "family", "visibility"), "Expected sample on column qualifier only");
+  }
+
+  private void setMax(IteratorEnvironment sampleEnv, Map<String,String> options) {
+    String cutoffValue = options.get("cutoff");
+    SamplerConfiguration sampleConfig = sampleEnv.getSamplerConfiguration();
+
+    // Ensure the sample was constructed in an expected way. If the sample is not built as expected, then can not draw conclusions based on sample.
+    requireNonNull(cutoffValue, "Expected cutoff option is missing");
+    validateSamplerConfig(sampleConfig);
+
+    int modulus = Integer.parseInt(sampleConfig.getOptions().get("modulus"));
+
+    sampleMax = Math.round(Float.parseFloat(cutoffValue) / modulus);
+  }
+
+  private static boolean isNullOrFalse(Map<String,String> options, String... keys) {
+    for (String key : keys) {
+      String val = options.get(key);
+      if (val != null && val.equals("true")) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shard/Index.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shard/Index.java b/src/main/java/org/apache/accumulo/examples/shard/Index.java
new file mode 100644
index 0000000..0325e72
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shard/Index.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shard;
+
+import java.io.File;
+import java.io.FileReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * This program indexes a set of documents given on the command line into a shard table.
+ *
+ * What it writes to the table is row = partition id, column family = term, column qualifier = document id.
+ */
+public class Index {
+
+  /**
+   * Formats a partition number as an 8 digit, zero padded, lowercase hex row id, using the absolute value of {@code partition}.
+   */
+  static Text genPartition(int partition) {
+    return new Text(String.format("%08x", Math.abs(partition)));
+  }
+
+  /**
+   * Indexes one document: every distinct, lower-cased, non-empty token becomes a column family in a single mutation whose row is the document's partition.
+   *
+   * @param numPartitions
+   *          number of partitions; the partition is derived from the document content's hash code
+   * @param docId
+   *          document id, written as the column qualifier
+   * @param doc
+   *          document text
+   * @param splitRegex
+   *          regular expression used to split {@code doc} into tokens
+   * @param bw
+   *          writer that receives the mutation (nothing is written if the document has no tokens)
+   */
+  public static void index(int numPartitions, Text docId, String doc, String splitRegex, BatchWriter bw) throws Exception {
+
+    String[] tokens = doc.split(splitRegex);
+
+    // the remainder lies strictly between -numPartitions and numPartitions, so Math.abs in genPartition cannot overflow
+    Text partition = genPartition(doc.hashCode() % numPartitions);
+
+    Mutation m = new Mutation(partition);
+    Value empty = new Value(new byte[0]);
+
+    HashSet<String> tokensSeen = new HashSet<>();
+
+    for (String token : tokens) {
+      // split() yields a leading empty string when the document starts with a delimiter; never index an empty term
+      if (token.isEmpty()) {
+        continue;
+      }
+
+      // Locale.ROOT keeps lower-casing independent of the JVM default locale (e.g. Turkish dotted/dotless i)
+      String term = token.toLowerCase(Locale.ROOT);
+
+      // Set.add returns false for a term already written for this document
+      if (tokensSeen.add(term)) {
+        m.put(new Text(term), docId, empty);
+      }
+    }
+
+    if (m.size() > 0)
+      bw.addMutation(m);
+  }
+
+  /**
+   * Indexes {@code src}, recursing into directories. Each regular file is read as UTF-8 and indexed with its absolute path as the document id.
+   */
+  public static void index(int numPartitions, File src, String splitRegex, BatchWriter bw) throws Exception {
+    if (src.isDirectory()) {
+      File[] files = src.listFiles();
+      if (files != null) {
+        for (File child : files) {
+          index(numPartitions, child, splitRegex, bw);
+        }
+      }
+    } else {
+      // Reading the whole file in one call leaves no reader open if reading fails, and decoding explicitly as UTF-8 avoids depending on the platform
+      // default charset (which FileReader uses).
+      String contents = new String(Files.readAllBytes(src.toPath()), StandardCharsets.UTF_8);
+
+      index(numPartitions, new Text(src.getAbsolutePath()), contents, splitRegex, bw);
+    }
+
+  }
+
+  static class Opts extends ClientOnRequiredTable {
+    @Parameter(names = "--partitions", required = true, description = "the number of shards to create")
+    int partitions;
+    @Parameter(required = true, description = "<file> { <file> ... }")
+    List<String> files = new ArrayList<>();
+  }
+
+  /** Indexes every file (or directory tree) named on the command line into the table given by the options, splitting on non-word characters. */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(Index.class.getName(), args, bwOpts);
+
+    String splitRegex = "\\W+";
+
+    BatchWriter bw = opts.getConnector().createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
+    for (String filename : opts.files) {
+      index(opts.partitions, new File(filename), splitRegex, bw);
+    }
+    bw.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shard/Query.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shard/Query.java b/src/main/java/org/apache/accumulo/examples/shard/Query.java
new file mode 100644
index 0000000..77b459a
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shard/Query.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shard;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.cli.BatchScannerOpts;
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.IntersectingIterator;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * This program queries a set of terms in the shard table (populated by {@link Index}) using the {@link IntersectingIterator}.
+ */
+public class Query {
+
+  static class Opts extends ClientOnRequiredTable {
+    @Parameter(description = " term { <term> ... }")
+    List<String> terms = new ArrayList<>();
+
+    @Parameter(names = {"--sample"}, description = "Do queries against sample, useful when sample is built using column qualifier")
+    private boolean useSample = false;
+
+    @Parameter(names = {"--sampleCutoff"},
+        description = "Use sample data to determine if a query might return a number of documents over the cutoff.  This check is per tablet.")
+    private Integer sampleCutoff = null;
+  }
+
+  /**
+   * Intersects {@code terms} over the whole table and returns the column qualifiers (document ids, as written by {@link Index}) of the matching entries.
+   * Adds a scan iterator to {@code bs} and sets its ranges.
+   *
+   * @param bs
+   *          batch scanner over the shard table
+   * @param terms
+   *          terms that every returned document must contain
+   * @param cutoff
+   *          if non-null, {@link CutoffIntersectingIterator} is used with this cutoff; otherwise a plain {@link IntersectingIterator}
+   * @return the matching document ids
+   */
+  public static List<String> query(BatchScanner bs, List<String> terms, Integer cutoff) {
+
+    Text[] columns = new Text[terms.size()];
+    int i = 0;
+    for (String term : terms) {
+      columns[i++] = new Text(term);
+    }
+
+    IteratorSetting ii;
+
+    if (cutoff != null) {
+      ii = new IteratorSetting(20, "ii", CutoffIntersectingIterator.class);
+      CutoffIntersectingIterator.setCutoff(ii, cutoff);
+    } else {
+      ii = new IteratorSetting(20, "ii", IntersectingIterator.class);
+    }
+
+    IntersectingIterator.setColumnFamilies(ii, columns);
+    bs.addScanIterator(ii);
+    bs.setRanges(Collections.singleton(new Range()));
+    List<String> result = new ArrayList<>();
+    for (Entry<Key,Value> entry : bs) {
+      result.add(entry.getKey().getColumnQualifier().toString());
+    }
+    return result;
+  }
+
+  /** Runs a single query for the terms on the command line and prints each matching document id. */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    BatchScannerOpts bsOpts = new BatchScannerOpts();
+    opts.parseArgs(Query.class.getName(), args, bsOpts);
+    Connector conn = opts.getConnector();
+    BatchScanner bs = conn.createBatchScanner(opts.getTableName(), opts.auths, bsOpts.scanThreads);
+    try {
+      bs.setTimeout(bsOpts.scanTimeout, TimeUnit.MILLISECONDS);
+      if (opts.useSample) {
+        // fetch the table's sampler configuration once, then validate and use that same instance
+        SamplerConfiguration samplerConfig = conn.tableOperations().getSamplerConfiguration(opts.getTableName());
+        CutoffIntersectingIterator.validateSamplerConfig(samplerConfig);
+        bs.setSamplerConfiguration(samplerConfig);
+      }
+      for (String entry : query(bs, opts.terms, opts.sampleCutoff))
+        System.out.println("  " + entry);
+    } finally {
+      bs.close();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shard/Reverse.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shard/Reverse.java b/src/main/java/org/apache/accumulo/examples/shard/Reverse.java
new file mode 100644
index 0000000..05be206
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shard/Reverse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shard;
+
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Reads the shard table written by {@link Index} and writes the inverse mapping, from document id to term, into another table. {@link ContinuousQuery} uses
+ * that document-to-term table to choose query terms.
+ */
+public class Reverse {
+
+  static class Opts extends ClientOpts {
+    @Parameter(names = "--shardTable")
+    String shardTable = "shard";
+    @Parameter(names = "--doc2Term")
+    String doc2TermTable = "doc2Term";
+  }
+
+  /**
+   * Copies every shard table entry into the doc2Term table, using the shard entry's column qualifier (document id) as the row and its column family (term)
+   * as the column family, with an empty qualifier and value.
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(Reverse.class.getName(), args, scanOpts, bwOpts);
+
+    Connector connector = opts.getConnector();
+
+    Scanner shardScanner = connector.createScanner(opts.shardTable, opts.auths);
+    shardScanner.setBatchSize(scanOpts.scanBatchSize);
+    BatchWriter writer = connector.createBatchWriter(opts.doc2TermTable, bwOpts.getBatchWriterConfig());
+
+    for (Entry<Key,Value> shardEntry : shardScanner) {
+      Key shardKey = shardEntry.getKey();
+      Text docId = shardKey.getColumnQualifier();
+      Text term = shardKey.getColumnFamily();
+
+      Mutation reversed = new Mutation(docId);
+      reversed.put(term, new Text(), new Value(new byte[0]));
+      writer.addMutation(reversed);
+    }
+
+    writer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shell/DebugCommand.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shell/DebugCommand.java b/src/main/java/org/apache/accumulo/examples/shell/DebugCommand.java
new file mode 100644
index 0000000..df68200
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shell/DebugCommand.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shell;
+
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.shell.Shell;
+import org.apache.accumulo.shell.Shell.Command;
+import org.apache.commons.cli.CommandLine;
+
+/**
+ * A trivial shell command, contributed by {@link ExampleShellExtension}, that prints a fixed test message.
+ */
+public class DebugCommand extends Command {
+
+  private static final String TEST_MESSAGE = "This is a test";
+
+  /** Prints the test message through the shell and reports success. */
+  @Override
+  public int execute(String fullCommand, CommandLine cl, Shell shellState) throws Exception {
+    Set<String> output = new TreeSet<>();
+    output.add(TEST_MESSAGE);
+    shellState.printLines(output.iterator(), true);
+    return 0;
+  }
+
+  @Override
+  public String description() {
+    return "prints a message to test extension feature";
+  }
+
+  /** This command takes no arguments. */
+  @Override
+  public int numArgs() {
+    return 0;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/shell/ExampleShellExtension.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/shell/ExampleShellExtension.java b/src/main/java/org/apache/accumulo/examples/shell/ExampleShellExtension.java
new file mode 100644
index 0000000..be94b9c
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/shell/ExampleShellExtension.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.shell;
+
+import org.apache.accumulo.shell.Shell.Command;
+import org.apache.accumulo.shell.ShellExtension;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(ShellExtension.class)
+public class ExampleShellExtension extends ShellExtension {
+
+  @Override
+  public String getExtensionName() {
+    return "ExampleShellExtension";
+  }
+
+  @Override
+  public Command[] getCommands() {
+    return new Command[] {new DebugCommand()};
+  }
+
+}


Mime
View raw message