accumulo-commits mailing list archives

From mwa...@apache.org
Subject [4/7] accumulo-examples git commit: ACCUMULO-4511 Adding examples from Accumulo repo
Date Fri, 09 Dec 2016 17:12:17 GMT
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputStream.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputStream.java b/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputStream.java
new file mode 100644
index 0000000..9d6b59d
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/filedata/ChunkInputStream.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.filedata;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An input stream that reads file data stored in one or more Accumulo values. Used by {@link ChunkInputFormat} to present input streams to a mapper.
+ */
+public class ChunkInputStream extends InputStream {
+  private static final Logger log = LoggerFactory.getLogger(ChunkInputStream.class);
+
+  protected PeekingIterator<Entry<Key,Value>> source;
+  protected Key currentKey;
+  protected Set<Text> currentVis;
+  protected int currentChunk;
+  protected int currentChunkSize;
+  protected boolean gotEndMarker;
+
+  protected byte buf[];
+  protected int count;
+  protected int pos;
+
+  public ChunkInputStream() {
+    source = null;
+  }
+
+  public ChunkInputStream(PeekingIterator<Entry<Key,Value>> in) throws IOException {
+    setSource(in);
+  }
+
+  public void setSource(PeekingIterator<Entry<Key,Value>> in) throws IOException {
+    if (source != null)
+      throw new IOException("setting new source without closing old one");
+    this.source = in;
+    currentVis = new TreeSet<>();
+    count = pos = 0;
+    if (!source.hasNext()) {
+      log.debug("source has no next");
+      gotEndMarker = true;
+      return;
+    }
+
+    // read forward until we reach a chunk
+    Entry<Key,Value> entry = source.next();
+    currentKey = entry.getKey();
+    buf = entry.getValue().get();
+    while (!currentKey.getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
+      log.debug("skipping key: " + currentKey.toString());
+      if (!source.hasNext())
+        return;
+      entry = source.next();
+      currentKey = entry.getKey();
+      buf = entry.getValue().get();
+    }
+    log.debug("starting chunk: " + currentKey.toString());
+    count = buf.length;
+    currentVis.add(currentKey.getColumnVisibility());
+    currentChunk = FileDataIngest.bytesToInt(currentKey.getColumnQualifier().getBytes(), 4);
+    currentChunkSize = FileDataIngest.bytesToInt(currentKey.getColumnQualifier().getBytes(), 0);
+    gotEndMarker = false;
+    if (buf.length == 0)
+      gotEndMarker = true;
+    if (currentChunk != 0) {
+      source = null;
+      throw new IOException("starting chunk number isn't 0 for " + currentKey.getRow());
+    }
+  }
+
+  private int fill() throws IOException {
+    if (source == null || !source.hasNext()) {
+      if (gotEndMarker)
+        return count = pos = 0;
+      else
+        throw new IOException("no end chunk marker but source has no data");
+    }
+
+    Entry<Key,Value> entry = source.peek();
+    Key thisKey = entry.getKey();
+    log.debug("evaluating key: " + thisKey.toString());
+
+    // check that we're still on the same row
+    if (!thisKey.equals(currentKey, PartialKey.ROW)) {
+      if (gotEndMarker)
+        return -1;
+      else {
+        String currentRow = currentKey.getRow().toString();
+        clear();
+        throw new IOException("got to the end of the row without end chunk marker " + currentRow);
+      }
+    }
+    log.debug("matches current key");
+
+    // ok to advance the iterator
+    source.next();
+
+    // check that this is part of a chunk
+    if (!thisKey.getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
+      log.debug("skipping non-chunk key");
+      return fill();
+    }
+    log.debug("is a chunk");
+
+    // check that the chunk size is the same as the one being read
+    if (currentChunkSize != FileDataIngest.bytesToInt(thisKey.getColumnQualifier().getBytes(), 0)) {
+      log.debug("skipping chunk of different size");
+      return fill();
+    }
+
+    // add the visibility to the list if it's not there
+    if (!currentVis.contains(thisKey.getColumnVisibility()))
+      currentVis.add(thisKey.getColumnVisibility());
+
+    // check to see if it is an identical chunk with a different visibility
+    if (thisKey.getColumnQualifier().equals(currentKey.getColumnQualifier())) {
+      log.debug("skipping identical chunk with different visibility");
+      return fill();
+    }
+
+    if (gotEndMarker) {
+      log.debug("got another chunk after end marker: " + currentKey.toString() + " " + thisKey.toString());
+      clear();
+      throw new IOException("found extra chunk after end marker");
+    }
+
+    // got new chunk of the same file, check that it's the next chunk
+    int thisChunk = FileDataIngest.bytesToInt(thisKey.getColumnQualifier().getBytes(), 4);
+    if (thisChunk != currentChunk + 1) {
+      log.debug("new chunk same file, unexpected chunkID: " + currentKey.toString() + " " + thisKey.toString());
+      clear();
+      throw new IOException("missing chunks between " + currentChunk + " and " + thisChunk);
+    }
+
+    currentKey = thisKey;
+    currentChunk = thisChunk;
+    buf = entry.getValue().get();
+    pos = 0;
+
+    // check to see if it's the last chunk
+    if (buf.length == 0) {
+      gotEndMarker = true;
+      return fill();
+    }
+
+    return count = buf.length;
+  }
+
+  public Set<Text> getVisibilities() {
+    if (source != null)
+      throw new IllegalStateException("don't get visibilities before chunks have been completely read");
+    return currentVis;
+  }
+
+  @Override
+  public int read() throws IOException {
+    if (source == null)
+      return -1;
+    log.debug("pos: " + pos + " count: " + count);
+    if (pos >= count) {
+      if (fill() <= 0) {
+        log.debug("done reading input stream at key: " + (currentKey == null ? "null" : currentKey.toString()));
+        if (source != null && source.hasNext())
+          log.debug("next key: " + source.peek().getKey());
+        clear();
+        return -1;
+      }
+    }
+    return buf[pos++] & 0xff;
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return 0;
+    }
+
+    log.debug("filling buffer " + off + " " + len);
+    int total = 0;
+    while (total < len) {
+      int avail = count - pos;
+      log.debug(avail + " available in current local buffer");
+      if (avail <= 0) {
+        if (fill() <= 0) {
+          log.debug("done reading input stream at key: " + (currentKey == null ? "null" : currentKey.toString()));
+          if (source != null && source.hasNext())
+            log.debug("next key: " + source.peek().getKey());
+          clear();
+          log.debug("filled " + total + " bytes");
+          return total == 0 ? -1 : total;
+        }
+        avail = count - pos;
+      }
+
+      int cnt = (avail < len - total) ? avail : len - total;
+      log.debug("copying from local buffer: local pos " + pos + " into pos " + off + " len " + cnt);
+      System.arraycopy(buf, pos, b, off, cnt);
+      pos += cnt;
+      off += cnt;
+      total += cnt;
+    }
+    log.debug("filled " + total + " bytes");
+    return total;
+  }
+
+  public void clear() {
+    source = null;
+    buf = null;
+    currentKey = null;
+    currentChunk = 0;
+    pos = count = 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      while (fill() > 0) {}
+    } catch (IOException e) {
+      clear();
+      throw new IOException(e);
+    }
+    clear();
+  }
+}
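For reference, a minimal sketch of how ChunkInputStream might be used outside of ChunkInputFormat, assuming a table already populated by FileDataIngest (the table name, hash row, and helper name are placeholders for illustration):

    import java.io.ByteArrayOutputStream;
    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.security.Authorizations;
    import org.apache.accumulo.core.util.PeekingIterator;

    // Hypothetical helper: reassemble one stored file from its chunk entries.
    static byte[] readStoredFile(Connector conn, String table, String hash, Authorizations auths) throws Exception {
      Scanner scanner = conn.createScanner(table, auths);
      scanner.setRange(new Range(hash));  // one row per file hash; refs entries are skipped by setSource()
      ChunkInputStream in = new ChunkInputStream(new PeekingIterator<Entry<Key,Value>>(scanner.iterator()));
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] block = new byte[4096];
      int n;
      while ((n = in.read(block)) > 0) {
        out.write(block, 0, n);           // bytes arrive in chunk order
      }
      in.close();
      return out.toByteArray();
    }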

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/filedata/FileDataIngest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/filedata/FileDataIngest.java b/src/main/java/org/apache/accumulo/examples/filedata/FileDataIngest.java
new file mode 100644
index 0000000..6ee2d11
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/filedata/FileDataIngest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.filedata;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Takes a list of files and archives them into Accumulo keyed on hashes of the files.
+ */
+public class FileDataIngest {
+  public static final Text CHUNK_CF = new Text("~chunk");
+  public static final Text REFS_CF = new Text("refs");
+  public static final String REFS_ORIG_FILE = "name";
+  public static final String REFS_FILE_EXT = "filext";
+  public static final ByteSequence CHUNK_CF_BS = new ArrayByteSequence(CHUNK_CF.getBytes(), 0, CHUNK_CF.getLength());
+  public static final ByteSequence REFS_CF_BS = new ArrayByteSequence(REFS_CF.getBytes(), 0, REFS_CF.getLength());
+
+  int chunkSize;
+  byte[] chunkSizeBytes;
+  byte[] buf;
+  MessageDigest md5digest;
+  ColumnVisibility cv;
+
+  public FileDataIngest(int chunkSize, ColumnVisibility colvis) {
+    this.chunkSize = chunkSize;
+    chunkSizeBytes = intToBytes(chunkSize);
+    buf = new byte[chunkSize];
+    try {
+      md5digest = MessageDigest.getInstance("MD5");
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+    cv = colvis;
+  }
+
+  public String insertFileData(String filename, BatchWriter bw) throws MutationsRejectedException, IOException {
+    if (chunkSize == 0)
+      return "";
+    md5digest.reset();
+    String uid = hexString(md5digest.digest(filename.getBytes()));
+
+    // read through file once, calculating hashes
+    md5digest.reset();
+    InputStream fis = null;
+    int numRead = 0;
+    try {
+      fis = new FileInputStream(filename);
+      numRead = fis.read(buf);
+      while (numRead >= 0) {
+        if (numRead > 0) {
+          md5digest.update(buf, 0, numRead);
+        }
+        numRead = fis.read(buf);
+      }
+    } finally {
+      if (fis != null) {
+        fis.close();
+      }
+    }
+
+    String hash = hexString(md5digest.digest());
+    Text row = new Text(hash);
+
+    // write info to accumulo
+    Mutation m = new Mutation(row);
+    m.put(REFS_CF, KeyUtil.buildNullSepText(uid, REFS_ORIG_FILE), cv, new Value(filename.getBytes()));
+    String fext = getExt(filename);
+    if (fext != null)
+      m.put(REFS_CF, KeyUtil.buildNullSepText(uid, REFS_FILE_EXT), cv, new Value(fext.getBytes()));
+    bw.addMutation(m);
+
+    // read through file again, writing chunks to accumulo
+    int chunkCount = 0;
+    try {
+      fis = new FileInputStream(filename);
+      numRead = fis.read(buf);
+      while (numRead >= 0) {
+        while (numRead < buf.length) {
+          int moreRead = fis.read(buf, numRead, buf.length - numRead);
+          if (moreRead > 0)
+            numRead += moreRead;
+          else if (moreRead < 0)
+            break;
+        }
+        m = new Mutation(row);
+        Text chunkCQ = new Text(chunkSizeBytes);
+        chunkCQ.append(intToBytes(chunkCount), 0, 4);
+        m.put(CHUNK_CF, chunkCQ, cv, new Value(buf, 0, numRead));
+        bw.addMutation(m);
+        if (chunkCount == Integer.MAX_VALUE)
+          throw new RuntimeException("too many chunks for file " + filename + ", try raising chunk size");
+        chunkCount++;
+        numRead = fis.read(buf);
+      }
+    } finally {
+      if (fis != null) {
+        fis.close();
+      }
+    }
+    m = new Mutation(row);
+    Text chunkCQ = new Text(chunkSizeBytes);
+    chunkCQ.append(intToBytes(chunkCount), 0, 4);
+    m.put(new Text(CHUNK_CF), chunkCQ, cv, new Value(new byte[0]));
+    bw.addMutation(m);
+    return hash;
+  }
+
+  public static int bytesToInt(byte[] b, int offset) {
+    if (b.length <= offset + 3)
+      throw new NumberFormatException("couldn't pull integer from bytes at offset " + offset);
+    int i = (((b[offset] & 255) << 24) + ((b[offset + 1] & 255) << 16) + ((b[offset + 2] & 255) << 8) + ((b[offset + 3] & 255) << 0));
+    return i;
+  }
+
+  public static byte[] intToBytes(int l) {
+    byte[] b = new byte[4];
+    b[0] = (byte) (l >>> 24);
+    b[1] = (byte) (l >>> 16);
+    b[2] = (byte) (l >>> 8);
+    b[3] = (byte) (l >>> 0);
+    return b;
+  }
+
+  private static String getExt(String filename) {
+    if (filename.indexOf(".") == -1)
+      return null;
+    return filename.substring(filename.lastIndexOf(".") + 1);
+  }
+
+  public String hexString(byte[] bytes) {
+    StringBuilder sb = new StringBuilder();
+    for (byte b : bytes) {
+      sb.append(String.format("%02x", b));
+    }
+    return sb.toString();
+  }
+
+  public static class Opts extends ClientOnRequiredTable {
+    @Parameter(names = "--vis", description = "use a given visibility for the ingested file data", converter = VisibilityConverter.class)
+    ColumnVisibility visibility = new ColumnVisibility();
+
+    @Parameter(names = "--chunk", description = "size of the chunks used to store partial files")
+    int chunkSize = 64 * 1024;
+
+    @Parameter(description = "<file> { <file> ... }")
+    List<String> files = new ArrayList<>();
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(FileDataIngest.class.getName(), args, bwOpts);
+
+    Connector conn = opts.getConnector();
+    if (!conn.tableOperations().exists(opts.getTableName())) {
+      conn.tableOperations().create(opts.getTableName());
+      conn.tableOperations().attachIterator(opts.getTableName(), new IteratorSetting(1, ChunkCombiner.class));
+    }
+    BatchWriter bw = conn.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());
+    FileDataIngest fdi = new FileDataIngest(opts.chunkSize, opts.visibility);
+    for (String filename : opts.files) {
+      fdi.insertFileData(filename, bw);
+    }
+    bw.close();
+    opts.stopTracing();
+  }
+}
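As a hedged illustration of the API above (the connector, table creation, and path are assumed; the helper name is hypothetical):

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.security.ColumnVisibility;

    // Hypothetical helper: archive one local file and return the MD5 hex hash it was keyed under.
    static String archiveFile(Connector conn, String table, String path) throws Exception {
      BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
      FileDataIngest fdi = new FileDataIngest(64 * 1024, new ColumnVisibility());
      String hash = fdi.insertFileData(path, bw);   // writes the refs entries plus the ~chunk entries
      bw.close();
      return hash;
    }

Each chunk lands under column family ~chunk with an 8-byte qualifier: the first four bytes are the chunk size and the last four are the chunk index, both encoded with intToBytes above.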

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/filedata/FileDataQuery.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/filedata/FileDataQuery.java b/src/main/java/org/apache/accumulo/examples/filedata/FileDataQuery.java
new file mode 100644
index 0000000..b034168
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/filedata/FileDataQuery.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.filedata;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.PeekingIterator;
+
+/**
+ * Retrieves file data based on the hash of the file. Used by the {@link org.apache.accumulo.examples.dirlist.Viewer}. See README.dirlist for
+ * instructions.
+ */
+public class FileDataQuery {
+  private Connector conn = null;
+  List<Entry<Key,Value>> lastRefs;
+  private ChunkInputStream cis;
+  Scanner scanner;
+
+  public FileDataQuery(String instanceName, String zooKeepers, String user, AuthenticationToken token, String tableName, Authorizations auths)
+      throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    ZooKeeperInstance instance = new ZooKeeperInstance(ClientConfiguration.loadDefault().withInstance(instanceName).withZkHosts(zooKeepers));
+    conn = instance.getConnector(user, token);
+    lastRefs = new ArrayList<>();
+    cis = new ChunkInputStream();
+    scanner = conn.createScanner(tableName, auths);
+  }
+
+  public List<Entry<Key,Value>> getLastRefs() {
+    return lastRefs;
+  }
+
+  public ChunkInputStream getData(String hash) throws IOException {
+    scanner.setRange(new Range(hash));
+    scanner.setBatchSize(1);
+    lastRefs.clear();
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<>(scanner.iterator());
+    if (pi.hasNext()) {
+      while (!pi.peek().getKey().getColumnFamily().equals(FileDataIngest.CHUNK_CF)) {
+        lastRefs.add(pi.peek());
+        pi.next();
+      }
+    }
+    cis.clear();
+    cis.setSource(pi);
+    return cis;
+  }
+
+  public String getSomeData(String hash, int numBytes) throws IOException {
+    ChunkInputStream is = getData(hash);
+    byte[] buf = new byte[numBytes];
+    if (is.read(buf) >= 0) {
+      return new String(buf);
+    } else {
+      return "";
+    }
+  }
+}
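A small, hypothetical caller (instance, ZooKeeper, credential, hash, and table values are placeholders; PasswordToken is one AuthenticationToken implementation):

    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.accumulo.core.security.Authorizations;

    FileDataQuery query = new FileDataQuery("instance", "zoo1:2181", "root",
        new PasswordToken("secret"), "dataTable", Authorizations.EMPTY);
    // Print up to the first kilobyte of the file stored under this (placeholder) hash.
    System.out.println(query.getSomeData("9a0364b9e99bb480dd25e1f0284c8555", 1024));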

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/filedata/KeyUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/filedata/KeyUtil.java b/src/main/java/org/apache/accumulo/examples/filedata/KeyUtil.java
new file mode 100644
index 0000000..a2ffc1a
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/filedata/KeyUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.filedata;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * A utility for creating and parsing null-byte separated strings into/from Text objects.
+ */
+public class KeyUtil {
+  public static final byte[] nullbyte = new byte[] {0};
+
+  /**
+   * Join some number of strings using a null byte separator into a text object.
+   *
+   * @param s
+   *          strings
+   * @return a text object containing the strings separated by null bytes
+   */
+  public static Text buildNullSepText(String... s) {
+    Text t = new Text(s[0]);
+    for (int i = 1; i < s.length; i++) {
+      t.append(nullbyte, 0, 1);
+      t.append(s[i].getBytes(), 0, s[i].length());
+    }
+    return t;
+  }
+
+  /**
+   * Split a text object using a null byte separator into an array of strings.
+   *
+   * @param t
+   *          null-byte separated text object
+   * @return an array of strings
+   */
+  public static String[] splitNullSepText(Text t) {
+    ArrayList<String> s = new ArrayList<>();
+    byte[] b = t.getBytes();
+    int lastindex = 0;
+    for (int i = 0; i < t.getLength(); i++) {
+      if (b[i] == (byte) 0) {
+        s.add(new String(b, lastindex, i - lastindex));
+        lastindex = i + 1;
+      }
+    }
+    s.add(new String(b, lastindex, t.getLength() - lastindex));
+    return s.toArray(new String[s.size()]);
+  }
+}
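A quick round trip showing the encoding these helpers implement (the uid below is a placeholder; note buildNullSepText assumes single-byte characters, which holds for the hex hashes and names FileDataIngest passes in):

    import org.apache.hadoop.io.Text;

    Text cq = KeyUtil.buildNullSepText("d41d8cd98f00b204e9800998ecf8427e", FileDataIngest.REFS_ORIG_FILE);
    String[] parts = KeyUtil.splitNullSepText(cq);
    // parts[0] is the uid, parts[1] is "name"; the two were joined by a single 0x00 byte.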

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/filedata/VisibilityCombiner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/filedata/VisibilityCombiner.java b/src/main/java/org/apache/accumulo/examples/filedata/VisibilityCombiner.java
new file mode 100644
index 0000000..819f710
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/filedata/VisibilityCombiner.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.filedata;
+
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.data.ByteSequence;
+
+/**
+ * A utility for merging visibilities into the form {@code (VIS1)|(VIS2)|...|(VISN)}. Used by the {@link ChunkCombiner}.
+ */
+public class VisibilityCombiner {
+
+  private TreeSet<String> visibilities = new TreeSet<>();
+
+  void add(ByteSequence cv) {
+    if (cv.length() == 0)
+      return;
+
+    int depth = 0;
+    int offset = 0;
+
+    for (int i = 0; i < cv.length(); i++) {
+      switch (cv.byteAt(i)) {
+        case '(':
+          depth++;
+          break;
+        case ')':
+          depth--;
+          if (depth < 0)
+            throw new IllegalArgumentException("Invalid vis " + cv);
+          break;
+        case '|':
+          if (depth == 0) {
+            insert(cv.subSequence(offset, i));
+            offset = i + 1;
+          }
+
+          break;
+      }
+    }
+
+    insert(cv.subSequence(offset, cv.length()));
+
+    if (depth != 0)
+      throw new IllegalArgumentException("Invalid vis " + cv);
+
+  }
+
+  private void insert(ByteSequence cv) {
+    for (int i = 0; i < cv.length(); i++) {
+
+    }
+
+    String cvs = cv.toString();
+
+    if (cvs.charAt(0) != '(')
+      cvs = "(" + cvs + ")";
+    else {
+      int depth = 0;
+      int depthZeroCloses = 0;
+      for (int i = 0; i < cv.length(); i++) {
+        switch (cv.byteAt(i)) {
+          case '(':
+            depth++;
+            break;
+          case ')':
+            depth--;
+            if (depth == 0)
+              depthZeroCloses++;
+            break;
+        }
+      }
+
+      if (depthZeroCloses > 1)
+        cvs = "(" + cvs + ")";
+    }
+
+    visibilities.add(cvs);
+  }
+
+  byte[] get() {
+    StringBuilder sb = new StringBuilder();
+    String sep = "";
+    for (String cvs : visibilities) {
+      sb.append(sep);
+      sep = "|";
+      sb.append(cvs);
+    }
+
+    return sb.toString().getBytes();
+  }
+}
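To make the merged form concrete, a sketch of how add() and get() compose (both methods are package-private, so this would run from code in the same package, such as a test):

    import org.apache.accumulo.core.data.ArrayByteSequence;

    VisibilityCombiner vc = new VisibilityCombiner();
    vc.add(new ArrayByteSequence("A&B".getBytes()));
    vc.add(new ArrayByteSequence("X|Y".getBytes()));
    // get() yields the bytes of "(A&B)|(X)|(Y)": top-level terms are parenthesized and OR'd together.
    String merged = new String(vc.get());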

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/helloworld/InsertWithBatchWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/helloworld/InsertWithBatchWriter.java b/src/main/java/org/apache/accumulo/examples/helloworld/InsertWithBatchWriter.java
new file mode 100644
index 0000000..8fa9fcf
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/helloworld/InsertWithBatchWriter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.helloworld;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Inserts 10K rows (50K entries) into Accumulo, with each row having 5 entries.
+ */
+public class InsertWithBatchWriter {
+
+  public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, MutationsRejectedException, TableExistsException,
+      TableNotFoundException {
+    ClientOnRequiredTable opts = new ClientOnRequiredTable();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(InsertWithBatchWriter.class.getName(), args, bwOpts);
+
+    Connector connector = opts.getConnector();
+    MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(bwOpts.getBatchWriterConfig());
+
+    if (!connector.tableOperations().exists(opts.getTableName()))
+      connector.tableOperations().create(opts.getTableName());
+    BatchWriter bw = mtbw.getBatchWriter(opts.getTableName());
+
+    Text colf = new Text("colfam");
+    System.out.println("writing ...");
+    for (int i = 0; i < 10000; i++) {
+      Mutation m = new Mutation(new Text(String.format("row_%d", i)));
+      for (int j = 0; j < 5; j++) {
+        m.put(colf, new Text(String.format("colqual_%d", j)), new Value((String.format("value_%d_%d", i, j)).getBytes()));
+      }
+      bw.addMutation(m);
+      if (i % 100 == 0)
+        System.out.println(i);
+    }
+    mtbw.close();
+  }
+
+}
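One plausible way to launch this example from a shell, assuming the examples jar is on the Accumulo classpath (instance, ZooKeeper, credential, and table values are placeholders; the flags come from the shared ClientOnRequiredTable/BatchWriterOpts options):

    accumulo org.apache.accumulo.examples.helloworld.InsertWithBatchWriter \
        -i instance -z localhost:2181 -u root -p secret -t hellotable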

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/helloworld/ReadData.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/helloworld/ReadData.java b/src/main/java/org/apache/accumulo/examples/helloworld/ReadData.java
new file mode 100644
index 0000000..4b9b993
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/helloworld/ReadData.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.helloworld;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Reads all data between two rows, all data after a given row, or all data in a table, depending on whether --startKey and/or --endKey are given.
+ */
+public class ReadData {
+
+  private static final Logger log = LoggerFactory.getLogger(ReadData.class);
+
+  static class Opts extends ClientOnRequiredTable {
+    @Parameter(names = "--startKey")
+    String startKey;
+    @Parameter(names = "--endKey")
+    String endKey;
+  }
+
+  public static void main(String[] args) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+    Opts opts = new Opts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    opts.parseArgs(ReadData.class.getName(), args, scanOpts);
+
+    Connector connector = opts.getConnector();
+
+    Scanner scan = connector.createScanner(opts.getTableName(), opts.auths);
+    scan.setBatchSize(scanOpts.scanBatchSize);
+    Key start = null;
+    if (opts.startKey != null)
+      start = new Key(new Text(opts.startKey));
+    Key end = null;
+    if (opts.endKey != null)
+      end = new Key(new Text(opts.endKey));
+    scan.setRange(new Range(start, end));
+    Iterator<Entry<Key,Value>> iter = scan.iterator();
+
+    while (iter.hasNext()) {
+      Entry<Key,Value> e = iter.next();
+      Text colf = e.getKey().getColumnFamily();
+      Text colq = e.getKey().getColumnQualifier();
+      log.trace("row: " + e.getKey().getRow() + ", colf: " + colf + ", colq: " + colq);
+      log.trace(", value: " + e.getValue().toString());
+    }
+  }
+}
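A hedged example invocation (same placeholder client flags as above). Note that the results go to the logger at TRACE level, so nothing appears unless trace logging is enabled for this class:

    accumulo org.apache.accumulo.examples.helloworld.ReadData \
        -i instance -z localhost:2181 -u root -p secret -t hellotable \
        --startKey row_0 --endKey row_1001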

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/isolation/InterferenceTest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/isolation/InterferenceTest.java b/src/main/java/org/apache/accumulo/examples/isolation/InterferenceTest.java
new file mode 100644
index 0000000..594330f
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/isolation/InterferenceTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.isolation;
+
+import java.util.HashSet;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * This example shows how a concurrent reader and writer can interfere with each other. It creates a writer thread and a reader thread that repeatedly write to
+ * and scan the same table.
+ *
+ * When the example is run with isolation enabled, no interference will be observed.
+ *
+ * When the example is run without isolation, the reader will see partial mutations of a row.
+ *
+ */
+
+public class InterferenceTest {
+
+  private static final int NUM_ROWS = 500;
+  private static final int NUM_COLUMNS = 113; // scanner batches 1000 entries by default, so use a column count that does not evenly divide the batch size
+  private static final Logger log = LoggerFactory.getLogger(InterferenceTest.class);
+
+  static class Writer implements Runnable {
+
+    private final BatchWriter bw;
+    private final long iterations;
+
+    Writer(BatchWriter bw, long iterations) {
+      this.bw = bw;
+      this.iterations = iterations;
+    }
+
+    @Override
+    public void run() {
+      int row = 0;
+      int value = 0;
+
+      for (long i = 0; i < iterations; i++) {
+        Mutation m = new Mutation(new Text(String.format("%03d", row)));
+        row = (row + 1) % NUM_ROWS;
+
+        for (int cq = 0; cq < NUM_COLUMNS; cq++)
+          m.put(new Text("000"), new Text(String.format("%04d", cq)), new Value(("" + value).getBytes()));
+
+        value++;
+
+        try {
+          bw.addMutation(m);
+        } catch (MutationsRejectedException e) {
+          log.error("Mutation was rejected.", e);
+          System.exit(-1);
+        }
+      }
+      try {
+        bw.close();
+      } catch (MutationsRejectedException e) {
+        log.error("Mutation was rejected on BatchWriter close.", e);
+      }
+    }
+  }
+
+  static class Reader implements Runnable {
+
+    private Scanner scanner;
+    volatile boolean stop = false;
+
+    Reader(Scanner scanner) {
+      this.scanner = scanner;
+    }
+
+    @Override
+    public void run() {
+      while (!stop) {
+        ByteSequence row = null;
+        int count = 0;
+
+        // all columns in a row should have the same value,
+        // use this hash set to track that
+        HashSet<String> values = new HashSet<>();
+
+        for (Entry<Key,Value> entry : scanner) {
+          if (row == null)
+            row = entry.getKey().getRowData();
+
+          if (!row.equals(entry.getKey().getRowData())) {
+            if (count != NUM_COLUMNS)
+              System.err.println("ERROR Did not see " + NUM_COLUMNS + " columns in row " + row);
+
+            if (values.size() > 1)
+              System.err.println("ERROR Columns in row " + row + " had multiple values " + values);
+
+            row = entry.getKey().getRowData();
+            count = 0;
+            values.clear();
+          }
+
+          count++;
+
+          values.add(entry.getValue().toString());
+        }
+
+        if (count > 0 && count != NUM_COLUMNS)
+          System.err.println("ERROR Did not see " + NUM_COLUMNS + " columns in row " + row);
+
+        if (values.size() > 1)
+          System.err.println("ERROR Columns in row " + row + " had multiple values " + values);
+      }
+    }
+
+    public void stopNow() {
+      stop = true;
+    }
+  }
+
+  static class Opts extends ClientOnRequiredTable {
+    @Parameter(names = "--iterations", description = "number of times to run", required = true)
+    long iterations = 0;
+    @Parameter(names = "--isolated", description = "use isolated scans")
+    boolean isolated = false;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(InterferenceTest.class.getName(), args, bwOpts);
+
+    if (opts.iterations < 1)
+      opts.iterations = Long.MAX_VALUE;
+
+    Connector conn = opts.getConnector();
+    if (!conn.tableOperations().exists(opts.getTableName()))
+      conn.tableOperations().create(opts.getTableName());
+
+    Thread writer = new Thread(new Writer(conn.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig()), opts.iterations));
+    writer.start();
+    Reader r;
+    if (opts.isolated)
+      r = new Reader(new IsolatedScanner(conn.createScanner(opts.getTableName(), opts.auths)));
+    else
+      r = new Reader(conn.createScanner(opts.getTableName(), opts.auths));
+    Thread reader;
+    reader = new Thread(r);
+    reader.start();
+    writer.join();
+    r.stopNow();
+    reader.join();
+    System.out.println("finished");
+  }
+}
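A hedged example run (placeholder flags as before). With --isolated the reader should stay quiet; without it, the ERROR lines above are printed to stderr whenever a partially written row is observed:

    accumulo org.apache.accumulo.examples.isolation.InterferenceTest \
        -i instance -z localhost:2181 -u root -p secret -t isotest \
        --iterations 100000 --isolated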

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/NGramIngest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/NGramIngest.java b/src/main/java/org/apache/accumulo/examples/mapreduce/NGramIngest.java
new file mode 100644
index 0000000..9179e3a
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/NGramIngest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce;
+
+import java.io.IOException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Map job to ingest n-gram files from http://storage.googleapis.com/books/ngrams/books/datasetsv2.html
+ */
+public class NGramIngest extends Configured implements Tool {
+
+  private static final Logger log = LoggerFactory.getLogger(NGramIngest.class);
+
+  static class Opts extends MapReduceClientOnRequiredTable {
+    @Parameter(names = "--input", required = true)
+    String inputDirectory;
+  }
+
+  static class NGramMapper extends Mapper<LongWritable,Text,Text,Mutation> {
+
+    @Override
+    protected void map(LongWritable location, Text value, Context context) throws IOException, InterruptedException {
+      String parts[] = value.toString().split("\\t");
+      if (parts.length >= 4) {
+        Mutation m = new Mutation(parts[0]);
+        m.put(parts[1], String.format("%010d", Long.parseLong(parts[2])), new Value(parts[3].trim().getBytes()));
+        context.write(null, m);
+      }
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(getClass().getName(), args);
+
+    Job job = Job.getInstance(getConf());
+    job.setJobName(getClass().getSimpleName());
+    job.setJarByClass(getClass());
+
+    opts.setAccumuloConfigs(job);
+    job.setInputFormatClass(TextInputFormat.class);
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+
+    job.setMapperClass(NGramMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Mutation.class);
+
+    job.setNumReduceTasks(0);
+    job.setSpeculativeExecution(false);
+
+    if (!opts.getConnector().tableOperations().exists(opts.getTableName())) {
+      log.info("Creating table " + opts.getTableName());
+      opts.getConnector().tableOperations().create(opts.getTableName());
+      SortedSet<Text> splits = new TreeSet<>();
+      String numbers[] = "1 2 3 4 5 6 7 8 9".split("\\s");
+      String lower[] = "a b c d e f g h i j k l m n o p q r s t u v w x y z".split("\\s");
+      String upper[] = "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z".split("\\s");
+      for (String[] array : new String[][] {numbers, lower, upper}) {
+        for (String s : array) {
+          splits.add(new Text(s));
+        }
+      }
+      opts.getConnector().tableOperations().addSplits(opts.getTableName(), splits);
+    }
+
+    TextInputFormat.addInputPath(job, new Path(opts.inputDirectory));
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new NGramIngest(), args);
+    if (res != 0)
+      System.exit(res);
+  }
+
+}
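For a concrete sense of what the mapper above produces, a tab-separated input line such as

    hello<TAB>1970<TAB>6435<TAB>452

becomes a mutation with row "hello", column family "1970", column qualifier "0000006435" (the third field zero-padded to ten digits), and value "452".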

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/RegexExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/RegexExample.java b/src/main/java/org/apache/accumulo/examples/mapreduce/RegexExample.java
new file mode 100644
index 0000000..baf8412
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/RegexExample.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.RegExFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+
+public class RegexExample extends Configured implements Tool {
+  public static class RegexMapper extends Mapper<Key,Value,Key,Value> {
+    @Override
+    public void map(Key row, Value data, Context context) throws IOException, InterruptedException {
+      context.write(row, data);
+    }
+  }
+
+  static class Opts extends MapReduceClientOnRequiredTable {
+    @Parameter(names = "--rowRegex")
+    String rowRegex;
+    @Parameter(names = "--columnFamilyRegex")
+    String columnFamilyRegex;
+    @Parameter(names = "--columnQualifierRegex")
+    String columnQualifierRegex;
+    @Parameter(names = "--valueRegex")
+    String valueRegex;
+    @Parameter(names = "--output", required = true)
+    String destination;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(getClass().getName(), args);
+
+    Job job = Job.getInstance(getConf());
+    job.setJobName(getClass().getSimpleName());
+    job.setJarByClass(getClass());
+
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    opts.setAccumuloConfigs(job);
+
+    IteratorSetting regex = new IteratorSetting(50, "regex", RegExFilter.class);
+    RegExFilter.setRegexs(regex, opts.rowRegex, opts.columnFamilyRegex, opts.columnQualifierRegex, opts.valueRegex, false);
+    AccumuloInputFormat.addIterator(job, regex);
+
+    job.setMapperClass(RegexMapper.class);
+    job.setMapOutputKeyClass(Key.class);
+    job.setMapOutputValueClass(Value.class);
+
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(TextOutputFormat.class);
+    TextOutputFormat.setOutputPath(job, new Path(opts.destination));
+
+    System.out.println("setRowRegex: " + opts.rowRegex);
+    System.out.println("setColumnFamilyRegex: " + opts.columnFamilyRegex);
+    System.out.println("setColumnQualifierRegex: " + opts.columnQualifierRegex);
+    System.out.println("setValueRegex: " + opts.valueRegex);
+
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new RegexExample(), args);
+    if (res != 0)
+      System.exit(res);
+  }
+}
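A hedged example invocation (client flags and output path are placeholders, and the launcher may differ depending on how Hadoop is set up). The four regex options are passed straight through to RegExFilter.setRegexs as configured above, so any of them may be omitted:

    accumulo org.apache.accumulo.examples.mapreduce.RegexExample \
        -i instance -z localhost:2181 -u root -p secret -t input \
        --rowRegex 'row_00.*' --output /tmp/regexOutput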

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/RowHash.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/RowHash.java b/src/main/java/org/apache/accumulo/examples/mapreduce/RowHash.java
new file mode 100644
index 0000000..b2d0926
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/RowHash.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce;
+
+import java.io.IOException;
+import java.util.Base64;
+import java.util.Collections;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+
+public class RowHash extends Configured implements Tool {
+  /**
+   * The Mapper class that, for each input entry, writes back a mutation holding the MD5 hash of that entry's value.
+   */
+  public static class HashDataMapper extends Mapper<Key,Value,Text,Mutation> {
+    @Override
+    public void map(Key row, Value data, Context context) throws IOException, InterruptedException {
+      Mutation m = new Mutation(row.getRow());
+      m.put(new Text("cf-HASHTYPE"), new Text("cq-MD5BASE64"), new Value(Base64.getEncoder().encode(MD5Hash.digest(data.toString()).getDigest())));
+      context.write(null, m);
+      context.progress();
+    }
+
+    @Override
+    public void setup(Context job) {}
+  }
+
+  private static class Opts extends MapReduceClientOnRequiredTable {
+    @Parameter(names = "--column", required = true)
+    String column;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Job job = Job.getInstance(getConf());
+    job.setJobName(this.getClass().getName());
+    job.setJarByClass(this.getClass());
+    Opts opts = new Opts();
+    opts.parseArgs(RowHash.class.getName(), args);
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    opts.setAccumuloConfigs(job);
+
+    String col = opts.column;
+    int idx = col.indexOf(":");
+    Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
+    Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
+    if (cf.getLength() > 0)
+      AccumuloInputFormat.fetchColumns(job, Collections.singleton(new Pair<>(cf, cq)));
+
+    job.setMapperClass(HashDataMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Mutation.class);
+
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new RowHash(), args);
+  }
+}
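A hedged example invocation (placeholder flags as before). For every entry in the fetched column, the job writes a new entry back into the same row under cf-HASHTYPE:cq-MD5BASE64 whose value is the Base64-encoded MD5 of the original value:

    accumulo org.apache.accumulo.examples.mapreduce.RowHash \
        -i instance -z localhost:2181 -u root -p secret -t input --column cf:cq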

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/TableToFile.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/TableToFile.java b/src/main/java/org/apache/accumulo/examples/mapreduce/TableToFile.java
new file mode 100644
index 0000000..f1c199d
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/TableToFile.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce;
+
+import java.io.IOException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.format.DefaultFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Takes a table and outputs the specified columns to a set of part files on HDFS:
+ * {@code accumulo org.apache.accumulo.examples.mapreduce.TableToFile --columns <columns> --output <hdfs-output-path>}
+ * (plus the connection options inherited from {@link MapReduceClientOnRequiredTable}).
+ */
+public class TableToFile extends Configured implements Tool {
+
+  static class Opts extends MapReduceClientOnRequiredTable {
+    @Parameter(names = "--output", description = "output directory", required = true)
+    String output;
+    @Parameter(names = "--columns", description = "columns to extract, in cf:cq{,cf:cq,...} form")
+    String columns = "";
+  }
+
+  /**
+   * The Mapper class that converts each key/value entry read from the table into a formatted output line.
+   */
+  public static class TTFMapper extends Mapper<Key,Value,NullWritable,Text> {
+    @Override
+    public void map(Key row, Value data, Context context) throws IOException, InterruptedException {
+      Map.Entry<Key,Value> entry = new SimpleImmutableEntry<>(row, data);
+      context.write(NullWritable.get(), new Text(DefaultFormatter.formatEntry(entry, false)));
+      context.setStatus("Outputted Value");
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException {
+    Job job = Job.getInstance(getConf());
+    job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+    job.setJarByClass(this.getClass());
+    Opts opts = new Opts();
+    opts.parseArgs(getClass().getName(), args);
+
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    opts.setAccumuloConfigs(job);
+
+    HashSet<Pair<Text,Text>> columnsToFetch = new HashSet<>();
+    for (String col : opts.columns.split(",")) {
+      int idx = col.indexOf(":");
+      Text cf = new Text(idx < 0 ? col : col.substring(0, idx));
+      Text cq = idx < 0 ? null : new Text(col.substring(idx + 1));
+      if (cf.getLength() > 0)
+        columnsToFetch.add(new Pair<>(cf, cq));
+    }
+    if (!columnsToFetch.isEmpty())
+      AccumuloInputFormat.fetchColumns(job, columnsToFetch);
+
+    job.setMapperClass(TTFMapper.class);
+    job.setMapOutputKeyClass(NullWritable.class);
+    job.setMapOutputValueClass(Text.class);
+
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(TextOutputFormat.class);
+    TextOutputFormat.setOutputPath(job, new Path(opts.output));
+
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  /**
+   * @param args
+   *          the command line arguments, parsed by {@link Opts}
+   */
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new TableToFile(), args);
+  }
+}

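TTFMapper above writes each table entry through DefaultFormatter.formatEntry. Below is a minimal sketch of that call outside a MapReduce job, assuming the Accumulo core jar is on the classpath; the row, family, qualifier, and value strings are made-up sample data, not output from the example.

    import java.util.AbstractMap.SimpleImmutableEntry;
    import java.util.Map;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.util.format.DefaultFormatter;
    import org.apache.hadoop.io.Text;

    public class FormatEntryDemo {
      public static void main(String[] args) {
        // Build one key/value pair the way TTFMapper receives it from AccumuloInputFormat
        Key key = new Key(new Text("row1"), new Text("cf1"), new Text("cq1"));
        Value value = new Value("hello".getBytes());
        Map.Entry<Key,Value> entry = new SimpleImmutableEntry<>(key, value);
        // false = omit the timestamp, matching the mapper above
        System.out.println(DefaultFormatter.formatEntry(entry, false));
      }
    }
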
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/TeraSortIngest.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/TeraSortIngest.java b/src/main/java/org/apache/accumulo/examples/mapreduce/TeraSortIngest.java
new file mode 100644
index 0000000..a8b9b22
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/TeraSortIngest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.examples.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Generate the *almost* official terasort input data set. (See below) The user specifies the number of rows and the output directory and this class runs a
+ * map/reduce program to generate the data. The format of the data is:
+ * <ul>
+ * <li>(10 bytes key) (10 bytes rowid) (78 bytes filler) \r \n
+ * <li>The keys are random characters from the set ' ' .. '~'.
+ * <li>The rowid is the right justified row id as an int.
+ * <li>The filler consists of 7 runs of 10 characters from 'A' to 'Z'.
+ * </ul>
+ *
+ * This TeraSort is slightly modified to allow for variable length key sizes and value sizes. The row length isn't variable. To generate a terabyte of data the
+ * same way TeraSort does, use 10000000000 rows with a 10/10 byte key length and a 78/78 byte value length. Along with the 10 byte row id and \r\n, this gives
+ * 100 bytes per row * 10000000000 rows = 1 TB. The min/max ranges for the key and value parameters are both inclusive.
+ */
+public class TeraSortIngest extends Configured implements Tool {
+  /**
+   * An input format that assigns ranges of longs to each mapper.
+   */
+  static class RangeInputFormat extends InputFormat<LongWritable,NullWritable> {
+    /**
+     * An input split consisting of a range of numbers.
+     */
+    static class RangeInputSplit extends InputSplit implements Writable {
+      long firstRow;
+      long rowCount;
+
+      public RangeInputSplit() {}
+
+      public RangeInputSplit(long offset, long length) {
+        firstRow = offset;
+        rowCount = length;
+      }
+
+      @Override
+      public long getLength() throws IOException {
+        return 0;
+      }
+
+      @Override
+      public String[] getLocations() throws IOException {
+        return new String[] {};
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        firstRow = WritableUtils.readVLong(in);
+        rowCount = WritableUtils.readVLong(in);
+      }
+
+      @Override
+      public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVLong(out, firstRow);
+        WritableUtils.writeVLong(out, rowCount);
+      }
+    }
+
+    /**
+     * A record reader that will generate a range of numbers.
+     */
+    static class RangeRecordReader extends RecordReader<LongWritable,NullWritable> {
+      long startRow;
+      long finishedRows;
+      long totalRows;
+
+      public RangeRecordReader(RangeInputSplit split) {
+        startRow = split.firstRow;
+        finishedRows = 0;
+        totalRows = split.rowCount;
+      }
+
+      @Override
+      public void close() throws IOException {}
+
+      @Override
+      public float getProgress() throws IOException {
+        return finishedRows / (float) totalRows;
+      }
+
+      @Override
+      public LongWritable getCurrentKey() throws IOException, InterruptedException {
+        return new LongWritable(startRow + finishedRows);
+      }
+
+      @Override
+      public NullWritable getCurrentValue() throws IOException, InterruptedException {
+        return NullWritable.get();
+      }
+
+      @Override
+      public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {}
+
+      @Override
+      public boolean nextKeyValue() throws IOException, InterruptedException {
+        if (finishedRows < totalRows) {
+          ++finishedRows;
+          return true;
+        }
+        return false;
+      }
+    }
+
+    @Override
+    public RecordReader<LongWritable,NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
+      // reporter.setStatus("Creating record reader");
+      return new RangeRecordReader((RangeInputSplit) split);
+    }
+
+    /**
+     * Create the desired number of splits, dividing the number of rows between the mappers.
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext job) {
+      long totalRows = job.getConfiguration().getLong(NUMROWS, 0);
+      int numSplits = job.getConfiguration().getInt(NUMSPLITS, 1);
+      long rowsPerSplit = totalRows / numSplits;
+      System.out.println("Generating " + totalRows + " using " + numSplits + " maps with step of " + rowsPerSplit);
+      ArrayList<InputSplit> splits = new ArrayList<>(numSplits);
+      long currentRow = 0;
+      for (int split = 0; split < numSplits - 1; ++split) {
+        splits.add(new RangeInputSplit(currentRow, rowsPerSplit));
+        currentRow += rowsPerSplit;
+      }
+      splits.add(new RangeInputSplit(currentRow, totalRows - currentRow));
+      System.out.println("Done Generating.");
+      return splits;
+    }
+
+  }
+
+  private static final String NUMSPLITS = "terasort.overridesplits";
+  private static final String NUMROWS = "terasort.numrows";
+
+  static class RandomGenerator {
+    private long seed = 0;
+    private static final long mask32 = (1L << 32) - 1;
+    /**
+     * The number of iterations separating the precomputed seeds.
+     */
+    private static final int seedSkip = 128 * 1024 * 1024;
+    /**
+     * The precomputed seed values after every seedSkip iterations. There should be enough values that 2**32 iterations are covered.
+     */
+    private static final long[] seeds = new long[] {0L, 4160749568L, 4026531840L, 3892314112L, 3758096384L, 3623878656L, 3489660928L, 3355443200L, 3221225472L,
+        3087007744L, 2952790016L, 2818572288L, 2684354560L, 2550136832L, 2415919104L, 2281701376L, 2147483648L, 2013265920L, 1879048192L, 1744830464L,
+        1610612736L, 1476395008L, 1342177280L, 1207959552L, 1073741824L, 939524096L, 805306368L, 671088640L, 536870912L, 402653184L, 268435456L, 134217728L,};
+
+    /**
+     * Start the random number generator on the given iteration.
+     *
+     * @param initialIteration
+     *          the iteration number to start on
+     */
+    RandomGenerator(long initialIteration) {
+      int baseIndex = (int) ((initialIteration & mask32) / seedSkip);
+      seed = seeds[baseIndex];
+      for (int i = 0; i < initialIteration % seedSkip; ++i) {
+        next();
+      }
+    }
+
+    RandomGenerator() {
+      this(0);
+    }
+
+    long next() {
+      seed = (seed * 3141592621L + 663896637) & mask32;
+      return seed;
+    }
+  }
+
+  /**
+   * The Mapper class that, given a row number, will generate the appropriate output line.
+   */
+  public static class SortGenMapper extends Mapper<LongWritable,NullWritable,Text,Mutation> {
+    private Text tableName = null;
+    private int minkeylength = 0;
+    private int maxkeylength = 0;
+    private int minvaluelength = 0;
+    private int maxvaluelength = 0;
+
+    private Text key = new Text();
+    private Text value = new Text();
+    private RandomGenerator rand;
+    private byte[] keyBytes; // = new byte[12];
+    private byte[] spaces = "          ".getBytes();
+    private byte[][] filler = new byte[26][];
+    {
+      for (int i = 0; i < 26; ++i) {
+        filler[i] = new byte[10];
+        for (int j = 0; j < 10; ++j) {
+          filler[i][j] = (byte) ('A' + i);
+        }
+      }
+    }
+
+    private Random random = new Random();
+
+    /**
+     * Add a random key to the text.
+     */
+    private void addKey() {
+      int range = random.nextInt(maxkeylength - minkeylength + 1);
+      int keylen = range + minkeylength;
+      int keyceil = keylen + (4 - (keylen % 4));
+      keyBytes = new byte[keyceil];
+
+      long temp = 0;
+      for (int i = 0; i < keyceil / 4; i++) {
+        temp = rand.next() / 52;
+        keyBytes[3 + 4 * i] = (byte) (' ' + (temp % 95));
+        temp /= 95;
+        keyBytes[2 + 4 * i] = (byte) (' ' + (temp % 95));
+        temp /= 95;
+        keyBytes[1 + 4 * i] = (byte) (' ' + (temp % 95));
+        temp /= 95;
+        keyBytes[4 * i] = (byte) (' ' + (temp % 95));
+      }
+      key.set(keyBytes, 0, keylen);
+    }
+
+    /**
+     * Build the right-justified, space-padded 10 character row id string.
+     */
+    private Text getRowIdString(long rowId) {
+      Text paddedRowIdString = new Text();
+      byte[] rowid = Integer.toString((int) rowId).getBytes();
+      int padSpace = 10 - rowid.length;
+      if (padSpace > 0) {
+        paddedRowIdString.append(spaces, 0, padSpace);
+      }
+      paddedRowIdString.append(rowid, 0, Math.min(rowid.length, 10));
+      return paddedRowIdString;
+    }
+
+    /**
+     * Add the required filler bytes. Each row consists of 7 blocks of 10 characters and 1 block of 8 characters.
+     *
+     * @param rowId
+     *          the current row number
+     */
+    private void addFiller(long rowId) {
+      int base = (int) ((rowId * 8) % 26);
+
+      // Get Random var
+      Random random = new Random(rand.seed);
+
+      int range = random.nextInt(maxvaluelength - minvaluelength + 1);
+      int valuelen = range + minvaluelength;
+
+      while (valuelen > 10) {
+        value.append(filler[(base + valuelen) % 26], 0, 10);
+        valuelen -= 10;
+      }
+
+      if (valuelen > 0)
+        value.append(filler[(base + valuelen) % 26], 0, valuelen);
+    }
+
+    @Override
+    public void map(LongWritable row, NullWritable ignored, Context context) throws IOException, InterruptedException {
+      context.setStatus("Entering");
+      long rowId = row.get();
+      if (rand == null) {
+        // we use 3 random numbers per row
+        rand = new RandomGenerator(rowId * 3);
+      }
+      addKey();
+      value.clear();
+      // addRowId(rowId);
+      addFiller(rowId);
+
+      // New
+      Mutation m = new Mutation(key);
+      m.put(new Text("c"), // column family
+          getRowIdString(rowId), // column qual
+          new Value(value.toString().getBytes())); // data
+
+      context.setStatus("About to add to accumulo");
+      context.write(tableName, m);
+      context.setStatus("Added to accumulo " + key.toString());
+    }
+
+    @Override
+    public void setup(Context job) {
+      minkeylength = job.getConfiguration().getInt("cloudgen.minkeylength", 0);
+      maxkeylength = job.getConfiguration().getInt("cloudgen.maxkeylength", 0);
+      minvaluelength = job.getConfiguration().getInt("cloudgen.minvaluelength", 0);
+      maxvaluelength = job.getConfiguration().getInt("cloudgen.maxvaluelength", 0);
+      tableName = new Text(job.getConfiguration().get("cloudgen.tablename"));
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new TeraSortIngest(), args);
+  }
+
+  static class Opts extends MapReduceClientOnRequiredTable {
+    @Parameter(names = "--count", description = "number of rows to ingest", required = true)
+    long numRows;
+    @Parameter(names = {"-nk", "--minKeySize"}, description = "miniumum key size", required = true)
+    int minKeyLength;
+    @Parameter(names = {"-xk", "--maxKeySize"}, description = "maximum key size", required = true)
+    int maxKeyLength;
+    @Parameter(names = {"-nv", "--minValueSize"}, description = "minimum key size", required = true)
+    int minValueLength;
+    @Parameter(names = {"-xv", "--maxValueSize"}, description = "maximum key size", required = true)
+    int maxValueLength;
+    @Parameter(names = "--splits", description = "number of splits to create in the table")
+    int splits = 0;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Job job = Job.getInstance(getConf());
+    job.setJobName("TeraSortCloud");
+    job.setJarByClass(this.getClass());
+    Opts opts = new Opts();
+    opts.parseArgs(TeraSortIngest.class.getName(), args);
+
+    job.setInputFormatClass(RangeInputFormat.class);
+    job.setMapperClass(SortGenMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Mutation.class);
+
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+    opts.setAccumuloConfigs(job);
+    BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(10L * 1000 * 1000);
+    AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+
+    Configuration conf = job.getConfiguration();
+    conf.setLong(NUMROWS, opts.numRows);
+    conf.setInt("cloudgen.minkeylength", opts.minKeyLength);
+    conf.setInt("cloudgen.maxkeylength", opts.maxKeyLength);
+    conf.setInt("cloudgen.minvaluelength", opts.minValueLength);
+    conf.setInt("cloudgen.maxvaluelength", opts.maxValueLength);
+    conf.set("cloudgen.tablename", opts.getTableName());
+
+    if (opts.splits != 0)
+      conf.setInt(NUMSPLITS, opts.splits);
+
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+}

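To make the sizing in the TeraSortIngest javadoc concrete, the per-row byte count and total data size work out as follows. This is a plain arithmetic sketch rather than part of the commit; the figures come straight from the javadoc above.

    public class TeraSortSizeCheck {
      public static void main(String[] args) {
        // From the javadoc: 10-byte key + 10-byte row id + 78-byte filler + "\r\n" = 100 bytes per row
        long bytesPerRow = 10 + 10 + 78 + 2;
        long rows = 10_000_000_000L;          // the --count suggested for a full terabyte
        long totalBytes = bytesPerRow * rows; // 1,000,000,000,000 bytes
        System.out.println(totalBytes + " bytes = " + totalBytes / 1_000_000_000_000L + " TB (decimal)");
      }
    }
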
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/TokenFileWordCount.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/TokenFileWordCount.java b/src/main/java/org/apache/accumulo/examples/mapreduce/TokenFileWordCount.java
new file mode 100644
index 0000000..5b9935e
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/TokenFileWordCount.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple map reduce job that inserts word counts into accumulo. See the README for instructions on how to run this. As an example of configuring
+ * AccumuloOutputFormat (and, via the same methods, AccumuloInputFormat) directly, this version does not use the ClientOpts class to parse arguments. See README.mapred for more details.
+ *
+ */
+public class TokenFileWordCount extends Configured implements Tool {
+
+  private static final Logger log = LoggerFactory.getLogger(TokenFileWordCount.class);
+
+  public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
+    @Override
+    public void map(LongWritable key, Text value, Context output) throws IOException {
+      String[] words = value.toString().split("\\s+");
+
+      for (String word : words) {
+
+        Mutation mutation = new Mutation(new Text(word));
+        mutation.put(new Text("count"), new Text("20080906"), new Value("1".getBytes()));
+
+        try {
+          output.write(null, mutation);
+        } catch (InterruptedException e) {
+          log.error("Could not write to Context.", e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    String instance = args[0];
+    String zookeepers = args[1];
+    String user = args[2];
+    String tokenFile = args[3];
+    String input = args[4];
+    String tableName = args[5];
+
+    Job job = Job.getInstance(getConf());
+    job.setJobName(TokenFileWordCount.class.getName());
+    job.setJarByClass(this.getClass());
+
+    job.setInputFormatClass(TextInputFormat.class);
+    TextInputFormat.setInputPaths(job, input);
+
+    job.setMapperClass(MapClass.class);
+
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Mutation.class);
+
+    // AccumuloInputFormat not used here, but it uses the same functions.
+    AccumuloOutputFormat.setZooKeeperInstance(job, ClientConfiguration.loadDefault().withInstance(instance).withZkHosts(zookeepers));
+    AccumuloOutputFormat.setConnectorInfo(job, user, tokenFile);
+    AccumuloOutputFormat.setCreateTables(job, true);
+    AccumuloOutputFormat.setDefaultTableName(job, tableName);
+
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new TokenFileWordCount(), args);
+    System.exit(res);
+  }
+}

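TokenFileWordCount reads six positional arguments rather than JCommander options. A minimal launcher sketch showing their order follows; every value below is a placeholder (the instance, zookeeper hosts, user, token file, input path, and table name are not real).

    import org.apache.accumulo.examples.mapreduce.TokenFileWordCount;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;

    public class TokenFileWordCountLauncher {
      public static void main(String[] args) throws Exception {
        // Positional arguments in the order run() reads them; all values below are placeholders.
        String[] toolArgs = {
            "myInstance",                 // args[0]: instance name
            "zkhost1:2181,zkhost2:2181",  // args[1]: comma-separated zookeepers
            "testuser",                   // args[2]: Accumulo user
            "/user/testuser/token.file",  // args[3]: serialized authentication token file
            "/user/testuser/wc-input",    // args[4]: text input path
            "wordCount"                   // args[5]: destination table
        };
        System.exit(ToolRunner.run(new Configuration(), new TokenFileWordCount(), toolArgs));
      }
    }
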
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/UniqueColumns.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/UniqueColumns.java b/src/main/java/org/apache/accumulo/examples/mapreduce/UniqueColumns.java
new file mode 100644
index 0000000..42fd651
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/UniqueColumns.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * A simple map reduce job that computes the unique column families and column qualifiers in a table. This example shows one way to run against an offline
+ * table.
+ */
+public class UniqueColumns extends Configured implements Tool {
+
+  private static final Text EMPTY = new Text();
+
+  public static class UMapper extends Mapper<Key,Value,Text,Text> {
+    private Text temp = new Text();
+    private static final Text CF = new Text("cf:");
+    private static final Text CQ = new Text("cq:");
+
+    @Override
+    public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+      temp.set(CF);
+      ByteSequence cf = key.getColumnFamilyData();
+      temp.append(cf.getBackingArray(), cf.offset(), cf.length());
+      context.write(temp, EMPTY);
+
+      temp.set(CQ);
+      ByteSequence cq = key.getColumnQualifierData();
+      temp.append(cq.getBackingArray(), cq.offset(), cq.length());
+      context.write(temp, EMPTY);
+    }
+  }
+
+  public static class UReducer extends Reducer<Text,Text,Text,Text> {
+    @Override
+    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+      context.write(key, EMPTY);
+    }
+  }
+
+  static class Opts extends MapReduceClientOnRequiredTable {
+    @Parameter(names = "--output", description = "output directory")
+    String output;
+    @Parameter(names = "--reducers", description = "number of reducers to use", required = true)
+    int reducers;
+    @Parameter(names = "--offline", description = "run against an offline table")
+    boolean offline = false;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(UniqueColumns.class.getName(), args);
+
+    String jobName = this.getClass().getSimpleName() + "_" + System.currentTimeMillis();
+
+    Job job = Job.getInstance(getConf());
+    job.setJobName(jobName);
+    job.setJarByClass(this.getClass());
+
+    String clone = opts.getTableName();
+    Connector conn = null;
+
+    opts.setAccumuloConfigs(job);
+
+    if (opts.offline) {
+      /*
+       * this example clones the table and takes it offline. If you plan to run map reduce jobs over a table many times, it may be more efficient to compact the
+       * table, clone it, and then keep using the same clone as input for map reduce.
+       */
+
+      conn = opts.getConnector();
+      clone = opts.getTableName() + "_" + jobName;
+      conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>());
+      conn.tableOperations().offline(clone);
+
+      AccumuloInputFormat.setOfflineTableScan(job, true);
+      AccumuloInputFormat.setInputTableName(job, clone);
+    }
+
+    job.setInputFormatClass(AccumuloInputFormat.class);
+
+    job.setMapperClass(UMapper.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+
+    job.setCombinerClass(UReducer.class);
+    job.setReducerClass(UReducer.class);
+
+    job.setNumReduceTasks(opts.reducers);
+
+    job.setOutputFormatClass(TextOutputFormat.class);
+    TextOutputFormat.setOutputPath(job, new Path(opts.output));
+
+    job.waitForCompletion(true);
+
+    if (opts.offline) {
+      conn.tableOperations().delete(clone);
+    }
+
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new UniqueColumns(), args);
+    System.exit(res);
+  }
+}

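The offline branch of UniqueColumns.run() above clones the input table, takes the clone offline for the scan, and deletes it afterwards. The sketch below isolates that lifecycle, assuming a Connector obtained elsewhere; the clone-name suffix "_mrclone" is a placeholder.

    import java.util.Collections;

    import org.apache.accumulo.core.client.Connector;

    public class OfflineCloneSketch {
      // Clone, take offline, use, and finally delete - mirroring the offline branch of run() above.
      static void runAgainstOfflineClone(Connector conn, String table) throws Exception {
        String clone = table + "_mrclone"; // placeholder clone name
        // flush=true so the clone includes recently written data
        conn.tableOperations().clone(table, clone, true,
            Collections.<String,String> emptyMap(), Collections.<String> emptySet());
        conn.tableOperations().offline(clone);
        try {
          // configure the job with AccumuloInputFormat.setOfflineTableScan(job, true)
          // and AccumuloInputFormat.setInputTableName(job, clone), then run it
        } finally {
          conn.tableOperations().delete(clone);
        }
      }
    }
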
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/main/java/org/apache/accumulo/examples/mapreduce/WordCount.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/WordCount.java b/src/main/java/org/apache/accumulo/examples/mapreduce/WordCount.java
new file mode 100644
index 0000000..859449d
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/WordCount.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnRequiredTable;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * A simple map reduce job that inserts word counts into accumulo. See the README for instructions on how to run this.
+ *
+ */
+public class WordCount extends Configured implements Tool {
+
+  private static final Logger log = LoggerFactory.getLogger(WordCount.class);
+
+  static class Opts extends MapReduceClientOnRequiredTable {
+    @Parameter(names = "--input", description = "input directory")
+    String inputDirectory;
+  }
+
+  public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
+    @Override
+    public void map(LongWritable key, Text value, Context output) throws IOException {
+      String[] words = value.toString().split("\\s+");
+
+      for (String word : words) {
+
+        Mutation mutation = new Mutation(new Text(word));
+        mutation.put(new Text("count"), new Text("20080906"), new Value("1".getBytes()));
+
+        try {
+          output.write(null, mutation);
+        } catch (InterruptedException e) {
+          log.error("Could not write mutation to Context.", e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(WordCount.class.getName(), args);
+
+    Job job = Job.getInstance(getConf());
+    job.setJobName(WordCount.class.getName());
+    job.setJarByClass(this.getClass());
+
+    job.setInputFormatClass(TextInputFormat.class);
+    TextInputFormat.setInputPaths(job, new Path(opts.inputDirectory));
+
+    job.setMapperClass(MapClass.class);
+
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Mutation.class);
+    opts.setAccumuloConfigs(job);
+    job.waitForCompletion(true);
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new WordCount(), args);
+  }
+}

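The MapClass mapper above emits one mutation per word occurrence, with the word as the row, "count" as the column family, a date string as the qualifier, and "1" as the value. A standalone sketch of that layout follows; the word "hello" is just sample input.

    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.Text;

    public class WordCountMutationDemo {
      public static void main(String[] args) {
        // One mutation per word occurrence, exactly as MapClass builds it
        Mutation m = new Mutation(new Text("hello"));
        m.put(new Text("count"), new Text("20080906"), new Value("1".getBytes()));
        System.out.println("row=" + new String(m.getRow()) + ", updates=" + m.getUpdates().size());
      }
    }
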
