phoenix-commits mailing list archives

From mujt...@apache.org
Subject [09/15] Rename package from org.apache.hadoop.hbase.index.* to org.apache.phoenix.index.* to fix classloader issue causing mutable index performance regression - https://issues.apache.org/jira/browse/PHOENIX-38
Date Sat, 15 Feb 2014 00:07:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexUpdate.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexUpdate.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexUpdate.java
new file mode 100644
index 0000000..e3132d6
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexUpdate.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+
+/**
+ * An update to apply to the index table.
+ */
+public class IndexUpdate {
+  Mutation update;
+  byte[] tableName;
+  ColumnTracker columns;
+
+  IndexUpdate(ColumnTracker tracker) {
+    this.columns = tracker;
+  }
+
+  public void setUpdate(Mutation p) {
+    this.update = p;
+  }
+
+  public void setTable(byte[] tableName) {
+    this.tableName = tableName;
+  }
+
+  public Mutation getUpdate() {
+    return update;
+  }
+
+  public byte[] getTableName() {
+    return tableName;
+  }
+
+  public ColumnTracker getIndexedColumns() {
+    return columns;
+  }
+
+  @Override
+  public String toString() {
+    return "IndexUpdate: \n\ttable - " + Bytes.toString(tableName) + "\n\tupdate: " + update
+        + "\n\tcolumns: " + columns;
+  }
+
+  public static IndexUpdate createIndexUpdateForTesting(ColumnTracker tracker, byte[] table, Put p) {
+    IndexUpdate update = new IndexUpdate(tracker);
+    update.setTable(table);
+    update.setUpdate(p);
+    return update;
+  }
+
+  /**
+   * @return <tt>true</tt> if the necessary state for a valid index update has been set.
+   */
+  public boolean isValid() {
+    return this.tableName != null && this.update != null;
+  }
+}
\ No newline at end of file

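As a point of reference, a minimal sketch of how one of these updates might be assembled and checked. The table name, row key, and column are hypothetical; ColumnTracker and ColumnReference are the classes introduced by this patch:

import java.util.Collections;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;

public class IndexUpdateSketch {
  public static void main(String[] args) {
    // track a single indexed column (hypothetical family/qualifier)
    ColumnTracker tracker = new ColumnTracker(Collections.singletonList(
        new ColumnReference(Bytes.toBytes("fam"), Bytes.toBytes("qual"))));
    IndexUpdate update = IndexUpdate.createIndexUpdateForTesting(tracker,
        Bytes.toBytes("my_index_table"), new Put(Bytes.toBytes("index-row-key")));
    // only an update with both a target table and a mutation set should be written
    System.out.println("valid? " + update.isValid());  // true
  }
}
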
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/KeyValueStore.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/KeyValueStore.java
new file mode 100644
index 0000000..39f9062
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/KeyValueStore.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+
+/**
+ * Store a collection of KeyValues in memory.
+ */
+public interface KeyValueStore {
+
+  /** Add the {@link KeyValue}; if <tt>overwrite</tt>, replace any existing KeyValue with the same key. */
+  public void add(KeyValue kv, boolean overwrite);
+
+  public KeyValueScanner getScanner();
+
+  /** Remove the given {@link KeyValue} from the store, if present. */
+  public void rollback(KeyValue kv);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
new file mode 100644
index 0000000..cb21a00
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.phoenix.hbase.index.covered.data.IndexMemStore;
+import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
+
+/**
+ * Manage the state of the HRegion's view of the table, for a single row.
+ * <p>
+ * Currently, this is a single-use object - you need to create a new one for each row that you need
+ * to manage. In the future, we could make this object reusable, but for the moment it's easier to
+ * manage as a throw-away object.
+ * <p>
+ * This class is <b>not</b> thread-safe - it requires external synchronization if accessed
+ * concurrently.
+ */
+public class LocalTableState implements TableState {
+
+  private long ts;
+  private RegionCoprocessorEnvironment env;
+  private KeyValueStore memstore;
+  private LocalHBaseState table;
+  private Mutation update;
+  private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
+  private ScannerBuilder scannerBuilder;
+  private List<KeyValue> kvs = new ArrayList<KeyValue>();
+  private List<? extends IndexedColumnGroup> hints;
+  private CoveredColumns columnSet;
+
+  public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
+    this.env = environment;
+    this.table = table;
+    this.update = update;
+    this.memstore = new IndexMemStore();
+    this.scannerBuilder = new ScannerBuilder(memstore, update);
+    this.columnSet = new CoveredColumns();
+  }
+
+  public void addPendingUpdates(KeyValue... kvs) {
+    if (kvs == null) return;
+    addPendingUpdates(Arrays.asList(kvs));
+  }
+
+  public void addPendingUpdates(List<KeyValue> kvs) {
+    if(kvs == null) return;
+    setPendingUpdates(kvs);
+    addUpdate(kvs);
+  }
+
+  private void addUpdate(List<KeyValue> list) {
+    addUpdate(list, true);
+  }
+
+  private void addUpdate(List<KeyValue> list, boolean overwrite) {
+    if (list == null) return;
+    for (KeyValue kv : list) {
+      this.memstore.add(kv, overwrite);
+    }
+  }
+
+  @Override
+  public RegionCoprocessorEnvironment getEnvironment() {
+    return this.env;
+  }
+
+  @Override
+  public long getCurrentTimestamp() {
+    return this.ts;
+  }
+
+  @Override
+  public void setCurrentTimestamp(long timestamp) {
+    this.ts = timestamp;
+  }
+
+  public void resetTrackedColumns() {
+    this.trackedColumns.clear();
+  }
+
+  public Set<ColumnTracker> getTrackedColumns() {
+    return this.trackedColumns;
+  }
+
+  @Override
+  public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+      Collection<? extends ColumnReference> indexedColumns) throws IOException {
+    ensureLocalStateInitialized(indexedColumns);
+    // filter out things with a newer timestamp and track the column references to which it applies
+    ColumnTracker tracker = new ColumnTracker(indexedColumns);
+    synchronized (this.trackedColumns) {
+      // we haven't seen this set of columns before, so we need to create a new tracker
+      if (!this.trackedColumns.contains(tracker)) {
+        this.trackedColumns.add(tracker);
+      }
+    }
+
+    Scanner scanner =
+        this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
+
+    return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
+  }
+
+  /**
+   * Initialize the managed local state. Generally, this will only be called by
+   * {@link #getIndexedColumnsTableState(Collection)}, which is unlikely to be called concurrently
+   * from the outside. Even then, there is still fairly low contention as each new Put/Delete will
+   * have its own table state.
+   */
+  private synchronized void ensureLocalStateInitialized(
+      Collection<? extends ColumnReference> columns) throws IOException {
+    // check to see if we haven't initialized any columns yet
+    Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
+    // we have all the columns loaded, so we are good to go.
+    if (toCover.isEmpty()) {
+      return;
+    }
+
+    // add the current state of the row
+    this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false);
+
+    // add the covered columns to the set
+    for (ColumnReference ref : toCover) {
+      this.columnSet.addColumn(ref);
+    }
+  }
+
+  @Override
+  public Map<String, byte[]> getUpdateAttributes() {
+    return this.update.getAttributesMap();
+  }
+
+  @Override
+  public byte[] getCurrentRowKey() {
+    return this.update.getRow();
+  }
+
+  public Result getCurrentRowState() {
+    KeyValueScanner scanner = this.memstore.getScanner();
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    while (scanner.peek() != null) {
+      try {
+        kvs.add(scanner.next());
+      } catch (IOException e) {
+        // this should never happen - something has gone terribly awry if it has
+        throw new RuntimeException("Local MemStore threw IOException!", e);
+      }
+    }
+    return new Result(kvs);
+  }
+
+  /**
+   * Helper to add a {@link Mutation} to the values stored for the current row
+   * @param pendingUpdate update to apply
+   */
+  public void addUpdateForTesting(Mutation pendingUpdate) {
+    for (Map.Entry<byte[], List<KeyValue>> e : pendingUpdate.getFamilyMap().entrySet()) {
+      List<KeyValue> edits = e.getValue();
+      addUpdate(edits);
+    }
+  }
+
+  /**
+   * @param hints index column hints for the current row
+   */
+  public void setHints(List<? extends IndexedColumnGroup> hints) {
+    this.hints = hints;
+  }
+
+  @Override
+  public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+    return this.hints;
+  }
+
+  @Override
+  public Collection<KeyValue> getPendingUpdate() {
+    return this.kvs;
+  }
+
+  /**
+   * Set the {@link KeyValue}s in the update for which we are currently building an index update,
+   * but don't actually apply them.
+   * @param update pending {@link KeyValue}s
+   */
+  public void setPendingUpdates(Collection<KeyValue> update) {
+    this.kvs.clear();
+    this.kvs.addAll(update);
+  }
+
+  /**
+   * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
+   */
+  public void applyPendingUpdates() {
+    this.addUpdate(kvs);
+  }
+
+  /**
+   * Roll back all the given values from the underlying state.
+   * @param values the {@link KeyValue}s to remove
+   */
+  public void rollback(Collection<KeyValue> values) {
+    for (KeyValue kv : values) {
+      this.memstore.rollback(kv);
+    }
+  }
+}
\ No newline at end of file

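A rough sketch of the intended call sequence around the pending-update methods above, using a stub LocalHBaseState. The row/column names are made up, and the coprocessor environment is left null since this sketch never calls getEnvironment():

import java.util.ArrayList;
import java.util.Collection;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.phoenix.hbase.index.covered.LocalTableState;
import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

public class LocalTableStateSketch {
  public static void main(String[] args) {
    // stub out the region lookup - pretend the row has no prior state
    LocalHBaseState noPriorState = new LocalHBaseState() {
      @Override
      public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover) {
        return new Result(new ArrayList<KeyValue>());
      }
    };
    Put pending = new Put(Bytes.toBytes("row"));
    pending.add(Bytes.toBytes("fam"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
    LocalTableState state = new LocalTableState(null, noPriorState, pending);

    // 1. stage the pending KVs without applying them - index cleanup is computed
    //    against the row state as it looks *before* the mutation
    state.setPendingUpdates(pending.getFamilyMap().get(Bytes.toBytes("fam")));
    // 2. fold the pending KVs into the row view before building the new index rows
    state.applyPendingUpdates();
    // 3. if the index write fails, the staged KVs can be removed again
    state.rollback(state.getPendingUpdate());
  }
}
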
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
new file mode 100644
index 0000000..4c4d0b0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/TableState.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.covered;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
+import org.apache.phoenix.hbase.index.scanner.Scanner;
+
+/**
+ * Interface for the current state of the table. This is generally going to be a view of the
+ * HBase table as of a given timestamp, so you don't have to worry about exposing too much
+ * information.
+ */
+public interface TableState {
+
+  // use this to get batch ids/ptable stuff
+  /**
+   * WARNING: messing with this can affect the indexing plumbing. Use with caution :)
+   * @return get the current environment in which this table lives.
+   */
+  public RegionCoprocessorEnvironment getEnvironment();
+
+  /**
+   * @return the current timestamp up to which we are exposing table state.
+   */
+  public long getCurrentTimestamp();
+
+  /**
+   * Set the current timestamp up to which callers are allowed to see the underlying table state.
+   * This overrides the timestamp view provided by the indexer - use with care!
+   * @param timestamp timestamp up to which the table should allow access.
+   */
+  public void setCurrentTimestamp(long timestamp);
+
+  /**
+   * @return the attributes attached to the current update (e.g. {@link Mutation}).
+   */
+  public Map<String, byte[]> getUpdateAttributes();
+
+  /**
+   * Get a scanner on the columns that are needed by the index.
+   * <p>
+   * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given
+   * columns with a timestamp earlier than the timestamp to which the table is currently set (the
+   * current state of the table for which we need to build an update).
+   * <p>
+   * If none of the passed columns matches any of the columns in the pending update (as determined
+   * by {@link ColumnReference#matchesFamily(byte[])} and
+   * {@link ColumnReference#matchesQualifier(byte[])}), then an empty scanner will be returned. This
+   * is because it doesn't make sense to build index updates when there is no change in the table
+   * state for any of the columns you are indexing.
+   * <p>
+   * <i>NOTE:</i> This method should <b>not</b> be used during
+   * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been
+   * applied - you are merely attempting to clean up the current state and therefore do <i>not</i>
+   * need to track the indexed columns.
+   * <p>
+   * As a side-effect, we update the tracked timestamp to the next-most-recent timestamp for the
+   * columns you request - you will never see a column with the timestamp we are tracking, only the
+   * next oldest timestamp for that column.
+   * @param indexedColumns the columns that will be indexed
+   * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
+   *         the builder. Even if no update is necessary for the requested columns, you still need
+   *         to return the {@link IndexUpdate}, just don't set the update for the
+   *         {@link IndexUpdate}.
+   * @throws IOException
+   */
+  Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+      Collection<? extends ColumnReference> indexedColumns) throws IOException;
+
+  /**
+   * @return the row key for the current row for which we are building an index update.
+   */
+  byte[] getCurrentRowKey();
+
+  /**
+   * Get the 'hint' for the columns that were indexed last time for the same set of keyvalues.
+   * Generally, this will only be used when fixing up a 'back in time' put or delete as we need to
+   * fix up all the indexes that reference the changed columns.
+   * @return the hint for the index columns that were queried on the last iteration for the
+   *         changed columns
+   */
+  List<? extends IndexedColumnGroup> getIndexColumnHints();
+
+  /**
+   * Can be used to help the codec to determine which columns it should attempt to index.
+   * @return the keyvalues in the pending update to the table.
+   */
+  Collection<KeyValue> getPendingUpdate();
+}
\ No newline at end of file

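To make the getIndexedColumnsTableState() contract above concrete, here is a sketch of how a codec might consume it. The index row derivation is a placeholder; the Pair/Scanner/IndexUpdate handling is the part the interface prescribes:

import java.io.IOException;
import java.util.Collection;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Pair;

import org.apache.phoenix.hbase.index.covered.IndexUpdate;
import org.apache.phoenix.hbase.index.covered.TableState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.scanner.Scanner;

public class TableStateSketch {
  static IndexUpdate buildUpsert(TableState state,
      Collection<? extends ColumnReference> indexedColumns, byte[] indexTable) throws IOException {
    Pair<Scanner, IndexUpdate> stateInfo = state.getIndexedColumnsTableState(indexedColumns);
    Scanner kvs = stateInfo.getFirst();
    IndexUpdate update = stateInfo.getSecond();
    try {
      KeyValue kv = kvs.next();
      if (kv != null) {
        // placeholder - a real codec composes the index row key from the values it finds
        update.setUpdate(new Put(kv.getRow()));
        update.setTable(indexTable);
      }
      // if nothing matched, we still return the IndexUpdate, just without an update set
      return update;
    } finally {
      kvs.close();
    }
  }
}
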
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/IndexMemStore.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/IndexMemStore.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/IndexMemStore.java
new file mode 100644
index 0000000..3d114c5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/IndexMemStore.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.data;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.SortedSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KeyComparator;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.IndexKeyValueSkipListSet;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MemStore;
+import org.apache.hadoop.hbase.regionserver.NonLazyKeyValueScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.covered.KeyValueStore;
+import org.apache.phoenix.hbase.index.covered.LocalTableState;
+
+/**
+ * Like the HBase {@link MemStore}, but without all that extra work around maintaining snapshots and
+ * sizing (for right now). We still support concurrent access (in case indexes are built in
+ * parallel).
+ * <p>
+ * We basically wrap a KeyValueSkipListSet, just like a regular MemStore, except we are:
+ * <ol>
+ *  <li>not dealing with
+ *    <ul>
+ *      <li>space considerations</li>
+ *      <li>a snapshot set</li>
+ *    </ul>
+ *  </li>
+ *  <li>ignoring memstore timestamps in favor of deciding when we want to overwrite keys based on how
+ *    we obtain them</li>
+ *   <li>ignoring time range updates (so 
+ *    {@link KeyValueScanner#shouldUseScanner(Scan, SortedSet, long)} isn't supported from 
+ *    {@link #getScanner()}).</li>
+ * </ol>
+ * <p>
+ * We can ignore the memstore timestamps because we know that anything we get from the local region
+ * is going to be MVCC visible - so it should just go in. However, we also want to overwrite any
+ * existing state with our pending write that we are indexing, so that needs to clobber the KVs we
+ * get from the HRegion. This got really messy with a regular memstore as each KV from the MemStore
+ * frequently has a higher MemStoreTS, but we can't just up the pending KVs' MemStoreTs because a
+ * memstore relies on the MVCC readpoint, which generally is less than {@link Long#MAX_VALUE}.
+ * <p>
+ * By realizing that we don't need the snapshot or space requirements, we can go much faster than
+ * the previous implementation. Further, by being smart about how we manage the KVs, we can drop the
+ * extra object creation we were doing to wrap the pending KVs (which we did previously to ensure
+ * they sorted before the ones we got from the HRegion). We overwrite {@link KeyValue}s when we add
+ * them from external sources {@link #add(KeyValue, boolean)}, but then don't overwrite existing
+ * keyvalues when reading them from the underlying table (because pending keyvalues should always
+ * overwrite current ones) - this logic is all contained in LocalTableState.
+ * @see LocalTableState
+ */
+public class IndexMemStore implements KeyValueStore {
+
+  private static final Log LOG = LogFactory.getLog(IndexMemStore.class);
+  private IndexKeyValueSkipListSet kvset;
+  private Comparator<KeyValue> comparator;
+
+  /**
+   * Compare two {@link KeyValue}s based only on their row keys. Similar to the standard
+   * {@link KeyValue#COMPARATOR}, but doesn't take into consideration the memstore timestamps. We
+   * instead manage which KeyValue to retain based on how it's loaded here.
+   */
+  public static final Comparator<KeyValue> COMPARATOR = new Comparator<KeyValue>() {
+
+    private final KeyComparator rawcomparator = new KeyComparator();
+
+    @Override
+    public int compare(final KeyValue left, final KeyValue right) {
+      return rawcomparator.compare(left.getBuffer(), left.getOffset() + KeyValue.ROW_OFFSET,
+        left.getKeyLength(), right.getBuffer(), right.getOffset() + KeyValue.ROW_OFFSET,
+        right.getKeyLength());
+    }
+  };
+
+  public IndexMemStore() {
+    this(COMPARATOR);
+  }
+
+  /**
+   * Create a store with the given comparator. This comparator is used to determine both sort order
+   * <b>as well as equality of {@link KeyValue}s</b>.
+   * <p>
+   * Exposed for subclassing/testing.
+   * @param comparator to use
+   */
+  IndexMemStore(Comparator<KeyValue> comparator) {
+    this.comparator = comparator;
+    this.kvset = IndexKeyValueSkipListSet.create(comparator);
+  }
+
+  @Override
+  public void add(KeyValue kv, boolean overwrite) {
+    if (LOG.isDebugEnabled()) {
+      LOG.info("Inserting: " + toString(kv));
+    }
+    // if overwriting, we will always update
+    if (!overwrite) {
+      // null if there was no previous value, so we added the kv
+      kvset.putIfAbsent(kv);
+    } else {
+      kvset.add(kv);
+    }
+
+    if (LOG.isTraceEnabled()) {
+      dump();
+    }
+  }
+
+  private void dump() {
+    LOG.trace("Current kv state:\n");
+    for (KeyValue kv : this.kvset) {
+      LOG.trace("KV: " + toString(kv));
+    }
+    LOG.trace("========== END MemStore Dump ==================\n");
+  }
+
+  private String toString(KeyValue kv) {
+    return kv.toString() + "/value=" + Bytes.toString(kv.getValue());
+  }
+
+  @Override
+  public void rollback(KeyValue kv) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Rolling back: " + toString(kv));
+    }
+    // If the key is in the store, delete it
+    this.kvset.remove(kv);
+    if (LOG.isTraceEnabled()) {
+      dump();
+    }
+  }
+
+  @Override
+  public KeyValueScanner getScanner() {
+    return new MemStoreScanner();
+  }
+
+  /*
+   * MemStoreScanner implements the KeyValueScanner. It lets the caller scan the contents of a
+   * memstore -- both current map and snapshot. This behaves as if it were a real scanner but does
+   * not maintain position.
+   */
+  // This class is adapted from org.apache.hadoop.hbase.MemStore.MemStoreScanner, HBase 0.94.12
+  // It does basically the same thing as the MemStoreScanner, but it only keeps track of a single
+  // set, rather than a primary and a secondary set of KeyValues.
+  protected class MemStoreScanner extends NonLazyKeyValueScanner {
+    // Next row information for the set
+    private KeyValue nextRow = null;
+
+    // last iterated KV for the kvset (to restore iterator state after a reseek)
+    private KeyValue kvsetItRow = null;
+
+    // iterator based scanning.
+    private Iterator<KeyValue> kvsetIt;
+
+    // The kvset at the time of creating this scanner
+    volatile IndexKeyValueSkipListSet kvsetAtCreation;
+
+    MemStoreScanner() {
+      super();
+      kvsetAtCreation = kvset;
+    }
+
+    private KeyValue getNext(Iterator<KeyValue> it) {
+      // in the original implementation we cared about the current thread's readpoint from MVCC.
+      // However, we don't need to worry here because everything the index can see, is also visible
+      // to the client (or is the pending primary table update, so it will be once the index is
+      // written, so it might as well be).
+      KeyValue v = null;
+      try {
+        while (it.hasNext()) {
+          v = it.next();
+          return v;
+        }
+
+        return null;
+      } finally {
+        if (v != null) {
+          kvsetItRow = v;
+        }
+      }
+    }
+
+    /**
+     * Set the scanner at the seek key. Must be called only once: there is no thread safety between
+     * the scanner and the memStore.
+     * @param key seek value
+     * @return false if the key is null or if there is no data
+     */
+    @Override
+    public synchronized boolean seek(KeyValue key) {
+      if (key == null) {
+        close();
+        return false;
+      }
+
+      // kvset will never be null.
+      // if tailSet can't find anything, SortedSet is empty (not null).
+      kvsetIt = kvsetAtCreation.tailSet(key).iterator();
+      kvsetItRow = null;
+
+      return seekInSubLists(key);
+    }
+
+    /**
+     * (Re)initialize the iterators after a seek or a reseek.
+     */
+    private synchronized boolean seekInSubLists(KeyValue key) {
+      nextRow = getNext(kvsetIt);
+      return nextRow != null;
+    }
+
+    /**
+     * Move forward on the sub-lists set previously by seek.
+     * @param key seek value (should be non-null)
+     * @return true if there is at least one KV to read, false otherwise
+     */
+    @Override
+    public synchronized boolean reseek(KeyValue key) {
+      /*
+       * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. This
+       * code is executed concurrently with flush and puts, without locks. Two points must be known
+       * when working on this code: 1) It's not possible to use the 'kvTail' and 'snapshot'
+       * variables, as they are modified during a flush. 2) The ideal implementation for performance
+       * would use the sub skip list implicitly pointed by the iterators 'kvsetIt' and 'snapshotIt'.
+       * Unfortunately the Java API does not offer a method to get it. So we remember the last keys
+       * we iterated to and restore the reseeked set to at least that point.
+       */
+
+      kvsetIt = kvsetAtCreation.tailSet(getHighest(key, kvsetItRow)).iterator();
+      return seekInSubLists(key);
+    }
+
+    /*
+     * Returns the higher of the two key values, or null if they are both null. This uses
+     * comparator.compare() to compare the KeyValue using the memstore comparator.
+     */
+    private KeyValue getHighest(KeyValue first, KeyValue second) {
+      if (first == null && second == null) {
+        return null;
+      }
+      if (first != null && second != null) {
+        int compare = comparator.compare(first, second);
+        return (compare > 0 ? first : second);
+      }
+      return (first != null ? first : second);
+    }
+
+    @Override
+    public synchronized KeyValue peek() {
+      // DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest());
+      return nextRow;
+    }
+
+    @Override
+    public synchronized KeyValue next() {
+      if (nextRow == null) {
+        return null;
+      }
+
+      final KeyValue ret = nextRow;
+
+      // Advance the iterators
+      nextRow = getNext(kvsetIt);
+
+      return ret;
+    }
+
+    @Override
+    public synchronized void close() {
+      this.nextRow = null;
+      this.kvsetIt = null;
+      this.kvsetItRow = null;
+    }
+
+    /**
+     * MemStoreScanner returns max value as sequence id because it will always have the latest data
+     * among all files.
+     */
+    @Override
+    public long getSequenceID() {
+      return Long.MAX_VALUE;
+    }
+
+    @Override
+    public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
+      throw new UnsupportedOperationException(this.getClass().getName()
+          + " doesn't support checking to see if it should use a scanner!");
+    }
+
+    /*
+    @Override
+    public boolean backwardSeek(KeyValue arg0) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean seekToLastRow() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean seekToPreviousRow(KeyValue arg0) throws IOException {
+        throw new UnsupportedOperationException();
+    }
+    */
+  }
+}
\ No newline at end of file

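A small sketch of the overwrite semantics described in the class comment (arbitrary row/column names): pending updates go in with overwrite=true, while KVs read back from the region use overwrite=false so they never clobber a pending KV with the same key:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.phoenix.hbase.index.covered.data.IndexMemStore;

public class IndexMemStoreSketch {
  public static void main(String[] args) throws Exception {
    byte[] row = Bytes.toBytes("row"), fam = Bytes.toBytes("fam"), qual = Bytes.toBytes("qual");
    IndexMemStore store = new IndexMemStore();
    KeyValue pending = new KeyValue(row, fam, qual, 10L, Bytes.toBytes("pending"));
    KeyValue current = new KeyValue(row, fam, qual, 10L, Bytes.toBytes("current"));

    store.add(pending, true);   // pending write: clobber anything already there
    store.add(current, false);  // state read from the region: putIfAbsent keeps "pending"

    KeyValueScanner scanner = store.getScanner();
    if (scanner.seek(KeyValue.createFirstOnRow(row))) {
      System.out.println(Bytes.toString(scanner.peek().getValue()));  // prints "pending"
    }
    store.rollback(pending);    // e.g. undo the pending KV after a failed index write
  }
}
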
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
new file mode 100644
index 0000000..43c4028
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.data;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * {@link ValueGetter} that uses lazy initialization to get the value for the given
+ * {@link ColumnReference}. Once stored, the mapping for that reference is retained.
+ */
+public class LazyValueGetter implements ValueGetter {
+
+  private Scanner scan;
+  private volatile Map<ColumnReference, ImmutableBytesPtr> values;
+  private byte[] row;
+  
+  /**
+   * Back the getter with a {@link Scanner} to actually access the local data.
+   * @param scan backing scanner
+   * @param currentRow row key for the row to seek in the scanner
+   */
+  public LazyValueGetter(Scanner scan, byte[] currentRow) {
+    this.scan = scan;
+    this.row = currentRow;
+  }
+
+  @Override
+  public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException {
+    // ensure we have a backing map; re-check under the lock so we don't clobber a populated map
+    if (values == null) {
+      synchronized (this) {
+        if (values == null) values = Collections.synchronizedMap(new HashMap<ColumnReference, ImmutableBytesPtr>());
+      }
+    }
+
+    // check the value in the map
+    ImmutableBytesPtr value = values.get(ref);
+    if (value == null) {
+      value = get(ref);
+      values.put(ref, value);
+    }
+
+    return value;
+  }
+
+  /**
+   * @param ref column for which to get the most recent value
+   * @return the first value on the scanner for the given column
+   */
+  private ImmutableBytesPtr get(ColumnReference ref) throws IOException {
+    KeyValue first = ref.getFirstKeyValueForRow(row);
+    if (!scan.seek(first)) {
+      return null;
+    }
+    // there is a next value - we only care about the current value, so we can just snag that
+    KeyValue next = scan.next();
+    if (ref.matches(next)) {
+      return new ImmutableBytesPtr(next.getBuffer(), next.getValueOffset(), next.getValueLength());
+    }
+    return null;
+  }
+}
\ No newline at end of file

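Roughly how this getter is meant to be used - a sketch, assuming the caller already holds a Scanner over the row state (for example the one returned by TableState.getIndexedColumnsTableState()); the family/qualifier are hypothetical:

import java.io.IOException;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;

public class LazyValueGetterSketch {
  static void example(Scanner rowScanner, byte[] rowKey) throws IOException {
    ValueGetter getter = new LazyValueGetter(rowScanner, rowKey);
    ColumnReference ref = new ColumnReference(Bytes.toBytes("fam"), Bytes.toBytes("qual"));
    // the first call seeks the scanner to the column and caches the result...
    ImmutableBytesPtr value = getter.getLatestValue(ref);
    // ...a second call for the same reference is served from the cached map
    ImmutableBytesPtr same = getter.getLatestValue(ref);
    System.out.println(value == same);
  }
}
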
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
new file mode 100644
index 0000000..6d20c18
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalHBaseState.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.data;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+
+/**
+ * Access the current state of the row in the local HBase table, given a mutation
+ */
+public interface LocalHBaseState {
+
+  /**
+   * @param m mutation for which we should get the current table state
+   * @param toCover all the columns the current row state needs to cover; hint the underlying lookup
+   *          to save getting all the columns for the row
+   * @return the full state of the given row. Includes all current versions (even if they are not
+   *         usually visible to the client (unless they are also doing a raw scan)). Never returns a
+   *         <tt>null</tt> {@link Result} - instead, when there is not data for the row, returns a
+   *         {@link Result} with no stored {@link KeyValue}s.
+   * @throws IOException if there is an issue reading the row
+   */
+  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> toCover)
+      throws IOException;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
new file mode 100644
index 0000000..f2f247e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.data;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+/**
+ * Wrapper around a lazily instantiated, local HTable.
+ * <p>
+ * Previously, we had used various row and batch caches. However, this ends up being very
+ * complicated when attempting to manage updating and invalidating the cache, with no real gain, as
+ * any row accessed multiple times will likely be in HBase's block cache, obviating any extra
+ * caching we are doing here. In the end, it's simpler and about as efficient to just get the
+ * current state of the row from HBase and let HBase manage caching the row from disk on its own.
+ */
+public class LocalTable implements LocalHBaseState {
+
+  private RegionCoprocessorEnvironment env;
+
+  public LocalTable(RegionCoprocessorEnvironment env) {
+    this.env = env;
+  }
+
+  @Override
+  public Result getCurrentRowState(Mutation m, Collection<? extends ColumnReference> columns)
+      throws IOException {
+    byte[] row = m.getRow();
+    // need to use a scan here so we can get raw state, which Get doesn't provide.
+    Scan s = IndexManagementUtil.newLocalStateScan(Collections.singletonList(columns));
+    s.setStartRow(row);
+    s.setStopRow(row);
+    HRegion region = this.env.getRegion();
+    RegionScanner scanner = region.getScanner(s);
+    List<KeyValue> kvs = new ArrayList<KeyValue>(1);
+    boolean more = scanner.next(kvs);
+    assert !more : "Got more than one result when scanning" + " a single row in the primary table!";
+
+    Result r = new Result(kvs);
+    scanner.close();
+    return r;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/ColumnGroup.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/ColumnGroup.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/ColumnGroup.java
new file mode 100644
index 0000000..ba2b092
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/ColumnGroup.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.covered.example;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * A collection of {@link CoveredColumn}s that should be included in a covered index.
+ */
+public class ColumnGroup implements Iterable<CoveredColumn> {
+
+  private List<CoveredColumn> columns = new ArrayList<CoveredColumn>();
+  private String table;
+
+  public ColumnGroup(String tableName) {
+    this.table = tableName;
+  }
+
+  public void add(CoveredColumn column) {
+    this.columns.add(column);
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  /**
+   * Check to see if any {@link CoveredColumn} in <tt>this</tt> matches the given family
+   * @param family to check
+   * @return <tt>true</tt> if any column covers the family
+   */
+  public boolean matches(String family) {
+    for (CoveredColumn column : columns) {
+      if (column.matchesFamily(family)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Check to see if any column matches the family/qualifier pair
+   * @param family family to match against
+   * @param qualifier qualifier to match, can be <tt>null</tt>, in which case we match all
+   *          qualifiers
+   * @return <tt>true</tt> if any column matches, <tt>false</tt> otherwise
+   */
+  public boolean matches(byte[] family, byte[] qualifier) {
+    // families are always printable characters
+    String fam = Bytes.toString(family);
+    for (CoveredColumn column : columns) {
+      if (column.matchesFamily(fam)) {
+        // check the qualifier
+        if (column.matchesQualifier(qualifier)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * @return the number of columns in the group
+   */
+  public int size() {
+    return this.columns.size();
+  }
+
+  @Override
+  public Iterator<CoveredColumn> iterator() {
+    return columns.iterator();
+  }
+
+  /**
+   * @param index index of the column to get
+   * @return the specified column
+   */
+  public CoveredColumn getColumnForTesting(int index) {
+    return this.columns.get(index);
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnGroup - table: " + table + ", columns: " + columns;
+  }
+
+  public List<CoveredColumn> getColumns() {
+    return this.columns;
+  }
+}
\ No newline at end of file

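For example, a group covering one specific column plus a whole family might be built like this (table and family names are made up):

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.phoenix.hbase.index.covered.example.ColumnGroup;
import org.apache.phoenix.hbase.index.covered.example.CoveredColumn;

public class ColumnGroupSketch {
  public static void main(String[] args) {
    ColumnGroup group = new ColumnGroup("my_index_table");
    group.add(new CoveredColumn("fam1", Bytes.toBytes("qual")));  // one specific column
    group.add(new CoveredColumn("fam2", null));                   // all qualifiers in fam2
    System.out.println(group.matches("fam2"));                                         // true
    System.out.println(group.matches(Bytes.toBytes("fam1"), Bytes.toBytes("other")));  // false
  }
}
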
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumn.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumn.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumn.java
new file mode 100644
index 0000000..5c6989f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumn.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.covered.example;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+
+/**
+ * A single Column (either a Column Family or a full Family:Qualifier pair) in a {@link ColumnGroup}.
+ * If no column qualifier is specified (null), matches all known qualifiers of the family.
+ */
+public class CoveredColumn extends ColumnReference {
+
+  public static final String SEPARATOR = ":";
+  String familyString;
+  private final int hashCode;
+
+  public CoveredColumn(byte[] family, byte[] qualifier){
+    this(Bytes.toString(family), qualifier);
+  }
+
+  public CoveredColumn(String family, byte[] qualifier) {
+    super(Bytes.toBytes(family), qualifier == null ? ColumnReference.ALL_QUALIFIERS : qualifier);
+    this.familyString = family;
+    this.hashCode = calcHashCode(family, qualifier);
+  }
+
+  public static CoveredColumn parse(String spec) {
+    int sep = spec.indexOf(SEPARATOR);
+    if (sep < 0) {
+      throw new IllegalArgumentException(spec + " is not a valid specifier!");
+    }
+    String family = spec.substring(0, sep);
+    String qual = spec.substring(sep + 1);
+    byte[] column = qual.length() == 0 ? null : Bytes.toBytes(qual);
+    return new CoveredColumn(family, column);
+  }
+
+  public String serialize() {
+    return CoveredColumn.serialize(familyString, qualifier);
+  }
+
+  public static String serialize(String first, byte[] second) {
+    String nextValue = first + CoveredColumn.SEPARATOR;
+    if (second != null) {
+      nextValue += Bytes.toString(second);
+    }
+    return nextValue;
+  }
+
+  /**
+   * @param family2 to check
+   * @return <tt>true</tt> if the passed family matches the family this column covers
+   */
+  public boolean matchesFamily(String family2) {
+    return this.familyString.equals(family2);
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  private static int calcHashCode(String familyString, byte[] qualifier) {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + familyString.hashCode();
+    if (qualifier != null) {
+      result = prime * result + Bytes.hashCode(qualifier);
+    }
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (!super.equals(obj)) return false;
+    if (getClass() != obj.getClass()) return false;
+    CoveredColumn other = (CoveredColumn) obj;
+    if (hashCode != other.hashCode) return false;
+    if (!familyString.equals(other.familyString)) return false;
+    return Bytes.equals(qualifier, other.qualifier);
+  }
+
+  @Override
+  public String toString() {
+    String qualString = qualifier == null ? "null" : Bytes.toString(qualifier);
+    return "CoveredColumn:[" + familyString + ":" + qualString + "]";
+  }
+}
\ No newline at end of file

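The specifier format round-trips through parse()/serialize(), with an empty qualifier after the ':' selecting the whole family - a quick sketch:

import org.apache.phoenix.hbase.index.covered.example.CoveredColumn;

public class CoveredColumnSketch {
  public static void main(String[] args) {
    CoveredColumn single = CoveredColumn.parse("fam:qual");
    CoveredColumn wholeFamily = CoveredColumn.parse("fam:");  // empty qualifier = all qualifiers
    System.out.println(single.serialize());                // fam:qual
    System.out.println(single.matchesFamily("fam"));       // true
    System.out.println(wholeFamily.matchesFamily("fam"));  // true
  }
}
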
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
new file mode 100644
index 0000000..691ab08
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.covered.example;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.hbase.index.covered.IndexUpdate;
+import org.apache.phoenix.hbase.index.covered.TableState;
+import org.apache.phoenix.hbase.index.scanner.Scanner;
+import org.apache.phoenix.index.BaseIndexCodec;
+
+/**
+ * Codec that builds covered-column index updates for each configured {@link ColumnGroup}.
+ */
+public class CoveredColumnIndexCodec extends BaseIndexCodec {
+
+  private static final byte[] EMPTY_BYTES = new byte[0];
+  public static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("INDEXED_COLUMNS");
+
+  private List<ColumnGroup> groups;
+
+  /**
+   * @param groups to initialize the codec with
+   * @return an instance that is initialized with the given {@link ColumnGroup}s, for testing
+   *         purposes
+   */
+  public static CoveredColumnIndexCodec getCodecForTesting(List<ColumnGroup> groups) {
+    CoveredColumnIndexCodec codec = new CoveredColumnIndexCodec();
+    codec.groups = Lists.newArrayList(groups);
+    return codec;
+  }
+
+  @Override
+  public void initialize(RegionCoprocessorEnvironment env) {
+    groups = CoveredColumnIndexSpecifierBuilder.getColumns(env.getConfiguration());
+  }
+
+  @Override
+  public Iterable<IndexUpdate> getIndexUpserts(TableState state) {
+    List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+    for (ColumnGroup group : groups) {
+      IndexUpdate update = getIndexUpdateForGroup(group, state);
+      updates.add(update);
+    }
+    return updates;
+  }
+
+  /**
+   * @param group columns that make up a single index
+   * @param state current state of the primary table row
+   * @return the update that should be made to the index table
+   */
+  private IndexUpdate getIndexUpdateForGroup(ColumnGroup group, TableState state) {
+    List<CoveredColumn> refs = group.getColumns();
+    try {
+      Pair<Scanner, IndexUpdate> stateInfo = state.getIndexedColumnsTableState(refs);
+      Scanner kvs = stateInfo.getFirst();
+      Pair<Integer, List<ColumnEntry>> columns =
+          getNextEntries(refs, kvs, state.getCurrentRowKey());
+      // make sure we close the scanner
+      kvs.close();
+      if (columns.getFirst().intValue() == 0) {
+        return stateInfo.getSecond();
+      }
+      // have all the column entries, so just turn them into a Put for the index row
+      // convert the entries to the needed values
+      byte[] rowKey =
+          composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
+      Put p = new Put(rowKey, state.getCurrentTimestamp());
+      // add the columns to the put
+      addColumnsToPut(p, columns.getSecond());
+
+      // update the index info
+      IndexUpdate update = stateInfo.getSecond();
+      update.setTable(Bytes.toBytes(group.getTable()));
+      update.setUpdate(p);
+      return update;
+    } catch (IOException e) {
+      throw new RuntimeException("Unexpected exception when getting state for columns: " + refs);
+    }
+  }
+
+  private static void addColumnsToPut(Put indexInsert, List<ColumnEntry> columns) {
+    // add each of the corresponding families to the put
+    int count = 0;
+    for (ColumnEntry column : columns) {
+      indexInsert.add(INDEX_ROW_COLUMN_FAMILY,
+        ArrayUtils.addAll(Bytes.toBytes(count++), toIndexQualifier(column.ref)), null);
+    }
+  }
+
+  private static byte[] toIndexQualifier(CoveredColumn column) {
+    return ArrayUtils.addAll(Bytes.toBytes(column.familyString + CoveredColumn.SEPARATOR),
+      column.getQualifier());
+  }
+
+  @Override
+  public Iterable<IndexUpdate> getIndexDeletes(TableState state) {
+    List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
+    for (ColumnGroup group : groups) {
+      deletes.add(getDeleteForGroup(group, state));
+    }
+    return deletes;
+  }
+
+
+  /**
+   * Get all the deletes necessary for a group of columns - logically, the cleanup the index table
+   * for a given index.
+   * @param group index information
+   * @return the cleanup for the given index, or <tt>null</tt> if no cleanup is necessary
+   */
+  private IndexUpdate getDeleteForGroup(ColumnGroup group, TableState state) {
+    List<CoveredColumn> refs = group.getColumns();
+    try {
+      Pair<Scanner, IndexUpdate> kvs = state.getIndexedColumnsTableState(refs);
+      Pair<Integer, List<ColumnEntry>> columns =
+          getNextEntries(refs, kvs.getFirst(), state.getCurrentRowKey());
+      // make sure we close the scanner reference
+      kvs.getFirst().close();
+      // no change, just return the passed update
+      if (columns.getFirst() == 0) {
+        return kvs.getSecond();
+      }
+      // have all the column entries, so just turn it into a Delete for the row
+      // convert the entries to the needed values
+      byte[] rowKey =
+          composeRowKey(state.getCurrentRowKey(), columns.getFirst(), columns.getSecond());
+      Delete d = new Delete(rowKey);
+      d.setTimestamp(state.getCurrentTimestamp());
+      IndexUpdate update = kvs.getSecond();
+      update.setUpdate(d);
+      update.setTable(Bytes.toBytes(group.getTable()));
+      return update;
+    } catch (IOException e) {
+      throw new RuntimeException("Unexpected exception when getting state for columns: " + refs, e);
+    }
+  }
+
+  /**
+   * Get the next batch of primary table values for the given columns
+   * @param refs columns to match against
+   * @param kvs scanner over the current state of the row
+   * @param currentRow key of the row whose state is being scanned
+   * @return the total length of all values found and the entries to add for the index
+   */
+  private Pair<Integer, List<ColumnEntry>> getNextEntries(List<CoveredColumn> refs, Scanner kvs,
+      byte[] currentRow) throws IOException {
+    int totalValueLength = 0;
+    List<ColumnEntry> entries = new ArrayList<ColumnEntry>(refs.size());
+
+    // pull out the latest state for each column reference, in order
+    for (CoveredColumn ref : refs) {
+      KeyValue first = ref.getFirstKeyValueForRow(currentRow);
+      if (!kvs.seek(first)) {
+        // no more keys, so add a null value
+        entries.add(new ColumnEntry(null, ref));
+        continue;
+      }
+      // there is a next value - we only care about the current value, so we can just snag that
+      KeyValue next = kvs.next();
+      if (ref.matchesFamily(next.getFamily()) && ref.matchesQualifier(next.getQualifier())) {
+        byte[] v = next.getValue();
+        totalValueLength += v.length;
+        entries.add(new ColumnEntry(v, ref));
+      } else {
+        // this first one didn't match at all, so we have to put in a null entry
+        entries.add(new ColumnEntry(null, ref));
+        continue;
+      }
+      // here's where it gets a little tricky - we either need to decide if we should continue
+      // adding entries (matches all qualifiers) or if we are done (matches a single qualifier)
+      if (!ref.allColumns()) {
+        continue;
+      }
+      // matches all columns, so we need to iterate until we hit the next column with the same
+      // family as the current key
+      byte[] lastQual = next.getQualifier();
+      byte[] nextQual = null;
+      while ((next = kvs.next()) != null) {
+        // different family, done with this column
+        if (!ref.matchesFamily(next.getFamily())) {
+          break;
+        }
+        nextQual = next.getQualifier();
+        // we are still on the same qualifier - skip it, since we already added a column for it
+        if (Arrays.equals(lastQual, nextQual)) {
+          continue;
+        }
+        // this must match the qualifier since it's an all-qualifiers specifier, so we add it
+        byte[] v = next.getValue();
+        totalValueLength += v.length;
+        entries.add(new ColumnEntry(v, ref));
+        // update the last qualifier to check against
+        lastQual = nextQual;
+      }
+    }
+    return new Pair<Integer, List<ColumnEntry>>(totalValueLength, entries);
+  }
+
+  static class ColumnEntry {
+    byte[] value = EMPTY_BYTES;
+    CoveredColumn ref;
+
+    public ColumnEntry(byte[] value, CoveredColumn ref) {
+      this.value = value == null ? EMPTY_BYTES : value;
+      this.ref = ref;
+    }
+  }
+
+  /**
+   * Compose the final index row key.
+   * <p>
+   * This is faster than adding each value independently as we can just build a single array and
+   * copy everything over once.
+   * @param pk primary key of the original row
+   * @param length total number of bytes of all the values that should be added
+   * @param values to use when building the key
+   * @return the composed index row key
+   */
+  static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
+    // now build up expected row key, each of the values, in order, followed by the PK and then some
+    // info about lengths so we can deserialize each value
+    byte[] output = new byte[length + pk.length];
+    int pos = 0;
+    int[] lengths = new int[values.size()];
+    int i = 0;
+    for (ColumnEntry entry : values) {
+      byte[] v = entry.value;
+      // skip doing the copy attempt, if we don't need to
+      if (v.length != 0) {
+        System.arraycopy(v, 0, output, pos, v.length);
+        pos += v.length;
+      }
+      lengths[i++] = v.length;
+    }
+
+    // add the primary key to the end of the row key
+    System.arraycopy(pk, 0, output, pos, pk.length);
+
+    // add the lengths as suffixes so we can deserialize the elements again
+    for (int l : lengths) {
+      output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+    }
+
+    // and the last integer is the number of values
+    return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
+  }
+
+  /**
+   * Essentially a shortcut for building a {@link Put} and extracting its {@link KeyValue}s.
+   * @param pk row key
+   * @param timestamp timestamp of all the keyvalues
+   * @param values expected value-column pairs
+   * @return the keyvalues that the index would contain for the given row at the given timestamp
+   *         with the given value-column pairs
+   */
+  public static List<KeyValue> getIndexKeyValueForTesting(byte[] pk, long timestamp,
+      List<Pair<byte[], CoveredColumn>> values) {
+  
+    int length = 0;
+    List<ColumnEntry> expected = new ArrayList<ColumnEntry>(values.size());
+    for (Pair<byte[], CoveredColumn> value : values) {
+      ColumnEntry entry = new ColumnEntry(value.getFirst(), value.getSecond());
+      length += value.getFirst().length;
+      expected.add(entry);
+    }
+  
+    byte[] rowKey = CoveredColumnIndexCodec.composeRowKey(pk, length, expected);
+    Put p = new Put(rowKey, timestamp);
+    CoveredColumnIndexCodec.addColumnsToPut(p, expected);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    for (Entry<byte[], List<KeyValue>> entry : p.getFamilyMap().entrySet()) {
+      kvs.addAll(entry.getValue());
+    }
+  
+    return kvs;
+  }
+
+  public static List<byte[]> getValues(byte[] bytes) {
+    // get the total number of keys in the bytes
+    int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
+    List<byte[]> keys = new ArrayList<byte[]>(keyCount);
+    int[] lengths = new int[keyCount];
+    int lengthPos = keyCount - 1;
+    int pos = bytes.length - Bytes.SIZEOF_INT;
+    // figure out the length of each key
+    for (int i = 0; i < keyCount; i++) {
+      lengths[lengthPos--] = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+      pos -= Bytes.SIZEOF_INT;
+    }
+
+    int current = 0;
+    for (int length : lengths) {
+      byte[] key = Arrays.copyOfRange(bytes, current, current + length);
+      keys.add(key);
+      current += length;
+    }
+
+    return keys;
+  }
+
+  /**
+   * Read an integer from the preceding {@value Bytes#SIZEOF_INT} bytes
+   * @param bytes array to read from
+   * @param start start point, backwards from which to read. For example, if specifying "25", we
+   *          would try to read an integer from 21 -> 25
+   * @return the integer stored in the preceding {@value Bytes#SIZEOF_INT} bytes
+   */
+  private static int getPreviousInteger(byte[] bytes, int start) {
+    return Bytes.toInt(bytes, start - Bytes.SIZEOF_INT);
+  }
+
+  /**
+   * Check to see if a row key just contains a list of null values.
+   * @param bytes row key to examine
+   * @return <tt>true</tt> if all the values are zero-length, <tt>false</tt> otherwise
+   */
+  public static boolean checkRowKeyForAllNulls(byte[] bytes) {
+    int keyCount = CoveredColumnIndexCodec.getPreviousInteger(bytes, bytes.length);
+    int pos = bytes.length - Bytes.SIZEOF_INT;
+    for (int i = 0; i < keyCount; i++) {
+      int next = CoveredColumnIndexCodec.getPreviousInteger(bytes, pos);
+      if (next > 0) {
+        return false;
+      }
+      pos -= Bytes.SIZEOF_INT;
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean isEnabled(Mutation m) {
+    // this could be a bit smarter, looking at the groups for the mutation, but we leave it at this
+    // simple check for the moment.
+    return groups.size() > 0;
+  }
+}
\ No newline at end of file
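
For readers tracing the row-key format above: a minimal standalone sketch of the same layout, assuming only the encoding described in composeRowKey/getValues - the values in order, then the primary key, then one 4-byte length per value, then a trailing 4-byte count. It uses java.nio.ByteBuffer in place of HBase's Bytes (both big-endian), and the class and variable names are illustrative only, not part of this patch.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowKeyLayoutSketch {

  // mirrors composeRowKey: (v1)..(vN)(pk)(len1)..(lenN)(count)
  static byte[] encode(byte[] pk, List<byte[]> values) {
    int valueLength = 0;
    for (byte[] v : values) {
      valueLength += v.length;
    }
    ByteBuffer buf = ByteBuffer.allocate(valueLength + pk.length + 4 * values.size() + 4);
    for (byte[] v : values) {
      buf.put(v);
    }
    buf.put(pk);
    for (byte[] v : values) {
      buf.putInt(v.length); // big-endian, matching Bytes.toBytes(int)
    }
    buf.putInt(values.size());
    return buf.array();
  }

  // mirrors getValues: read the trailing count, then the lengths, then slice the values
  static List<byte[]> decodeValues(byte[] rowKey) {
    ByteBuffer buf = ByteBuffer.wrap(rowKey);
    int count = buf.getInt(rowKey.length - 4);
    int lengthsStart = rowKey.length - 4 - 4 * count;
    List<byte[]> values = new ArrayList<byte[]>(count);
    int current = 0;
    for (int i = 0; i < count; i++) {
      int length = buf.getInt(lengthsStart + 4 * i);
      values.add(Arrays.copyOfRange(rowKey, current, current + length));
      current += length;
    }
    return values;
  }

  public static void main(String[] args) {
    byte[] pk = "row1".getBytes(StandardCharsets.UTF_8);
    List<byte[]> values = Arrays.asList(
        "a".getBytes(StandardCharsets.UTF_8), "bc".getBytes(StandardCharsets.UTF_8));
    for (byte[] v : decodeValues(encode(pk, values))) {
      System.out.println(new String(v, StandardCharsets.UTF_8)); // prints "a" then "bc"
    }
  }
}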

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
new file mode 100644
index 0000000..9fcd5f3
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexSpecifierBuilder.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.covered.example;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+
+import org.apache.phoenix.hbase.index.Indexer;
+import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.IndexCodec;
+
+/**
+ * Helper to build the configuration for the {@link CoveredColumnIndexer}.
+ * <p>
+ * This class is NOT thread-safe; all concurrent access must be managed externally.
+ */
+public class CoveredColumnIndexSpecifierBuilder {
+
+  private static final String INDEX_TO_TABLE_CONF_PREFX = "hbase.index.covered.";
+  // number of index 'groups'. Each group represents a set of 'joined' columns. The data stored
+  // with each joined column is either just the columns in the group or all the most recent data
+  // in the row (a fully covered index).
+  private static final String COUNT = ".count";
+  private static final String INDEX_GROUPS_COUNT_KEY = INDEX_TO_TABLE_CONF_PREFX + ".groups" + COUNT;
+  private static final String INDEX_GROUP_PREFIX = INDEX_TO_TABLE_CONF_PREFX + "group.";
+  private static final String INDEX_GROUP_COVERAGE_SUFFIX = ".columns";
+  private static final String TABLE_SUFFIX = ".table";
+
+  // right now, we don't support this; it should be easy enough to add later
+  // private static final String INDEX_GROUP_FULLY_COVERED = ".covered";
+
+  List<ColumnGroup> groups = new ArrayList<ColumnGroup>();
+  private Map<String, String> specs = new HashMap<String, String>();
+
+  /**
+   * Add a group of columns to index
+   * @param columns Pairs of cf:cq (full specification of a column) to index
+   * @return the index of the group. This can be used to remove or modify the group via
+   *         {@link #remove(int)} or {@link #get(int)}, any time before building
+   */
+  public int addIndexGroup(ColumnGroup columns) {
+    if (columns == null || columns.size() == 0) {
+      throw new IllegalArgumentException("Must specify some columns to index!");
+    }
+    int size = this.groups.size();
+    this.groups.add(columns);
+    return size;
+  }
+
+  public void remove(int i) {
+    this.groups.remove(i);
+  }
+
+  public ColumnGroup get(int i) {
+    return this.groups.get(i);
+  }
+
+  /**
+   * Clear the stored {@link ColumnGroup}s for reuse.
+   */
+  public void reset() {
+    this.groups.clear();
+  }
+
+  Map<String, String> convertToMap() {
+    int total = this.groups.size();
+    // hbase.index.covered..groups.count = <total> (the doubled dot comes from the trailing-dot prefix)
+    specs.put(INDEX_GROUPS_COUNT_KEY, Integer.toString(total));
+
+    int i = 0;
+    for (ColumnGroup group : groups) {
+      addIndexGroupToSpecs(specs, group, i++);
+    }
+
+    return specs;
+  }
+
+  /**
+   * @param specs map of configuration entries to update
+   * @param columns group of columns to serialize into the configuration
+   * @param index position of the group in the overall list of groups
+   */
+  private void addIndexGroupToSpecs(Map<String, String> specs, ColumnGroup columns, int index) {
+    // hbase.index.covered.group.<i>
+    String prefix = INDEX_GROUP_PREFIX + Integer.toString(index);
+
+    // set the table to which the group writes
+    // hbase.index.covered.group.<i>.table
+    specs.put(prefix + TABLE_SUFFIX, columns.getTable());
+    
+    // a different key for each column in the group
+    // hbase.index.covered.group.<i>.columns
+    String columnPrefix = prefix + INDEX_GROUP_COVERAGE_SUFFIX;
+    // hbase.index.covered.group.<i>.columns.count = <j>
+    String columnsSizeKey = columnPrefix + COUNT;
+    specs.put(columnsSizeKey, Integer.toString(columns.size()));
+    
+    // add each column in the group
+    int i = 0;
+    for (CoveredColumn column : columns) {
+      // hbase.index.covered.group.<i>.columns.<j>
+      String nextKey = columnPrefix + "." + Integer.toString(i);
+      String nextValue = column.serialize();
+      specs.put(nextKey, nextValue);
+      i++;
+    }
+  }
+
+  public void build(HTableDescriptor desc) throws IOException {
+    build(desc, CoveredColumnIndexCodec.class);
+  }
+
+  void build(HTableDescriptor desc, Class<? extends IndexCodec> clazz) throws IOException {
+    // add the codec for the index to the map of options
+    Map<String, String> opts = this.convertToMap();
+    opts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, clazz.getName());
+    Indexer.enableIndexing(desc, CoveredColumnIndexer.class, opts);
+  }
+
+  static List<ColumnGroup> getColumns(Configuration conf) {
+    int count = conf.getInt(INDEX_GROUPS_COUNT_KEY, 0);
+    if (count == 0) {
+      return Collections.emptyList();
+    }
+
+    // parse out all the column groups we should index
+    List<ColumnGroup> columns = new ArrayList<ColumnGroup>(count);
+    for (int i = 0; i < count; i++) {
+      // parse out each group
+      String prefix = INDEX_GROUP_PREFIX + i;
+
+      // hbase.index.covered.group.<i>.table
+      String table = conf.get(prefix + TABLE_SUFFIX);
+      ColumnGroup group = new ColumnGroup(table);
+
+      // parse out each column in the group
+      // hbase.index.covered.group.<i>.columns
+      String columnPrefix = prefix + INDEX_GROUP_COVERAGE_SUFFIX;
+      // hbase.index.covered.group.<i>.columns.count = j
+      String columnsSizeKey = columnPrefix + COUNT;
+      int columnCount = conf.getInt(columnsSizeKey, 0);
+      for (int j = 0; j < columnCount; j++) {
+        String columnKey = columnPrefix + "." + j;
+        CoveredColumn column = CoveredColumn.parse(conf.get(columnKey));
+        group.add(column);
+      }
+
+      // add the group
+      columns.add(group);
+    }
+    return columns;
+  }
+
+  /**
+   * @param key configuration key to set directly
+   * @param value value to store under the key
+   */
+  public void addArbitraryConfigForTesting(String key, String value) {
+    this.specs.put(key, value);
+  }
+}
\ No newline at end of file
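
For orientation, a hedged sketch of the configuration entries convertToMap produces for a single two-column group. Only the key scheme comes from addIndexGroupToSpecs; the table name, the cf:cq serialized form assumed for CoveredColumn.serialize(), and the class name are made up for illustration.

import java.util.LinkedHashMap;
import java.util.Map;

public class IndexSpecKeysSketch {
  public static void main(String[] args) {
    // approximate spec map for one two-column group writing to "MY_INDEX";
    // "f1:c1"/"f1:c2" stand in for whatever CoveredColumn.serialize() emits
    Map<String, String> specs = new LinkedHashMap<String, String>();
    // doubled dot: the prefix constant ends with '.' and ".groups" adds another
    specs.put("hbase.index.covered..groups.count", "1");
    specs.put("hbase.index.covered.group.0.table", "MY_INDEX");
    specs.put("hbase.index.covered.group.0.columns.count", "2");
    specs.put("hbase.index.covered.group.0.columns.0", "f1:c1");
    specs.put("hbase.index.covered.group.0.columns.1", "f1:c2");
    for (Map.Entry<String, String> e : specs.entrySet()) {
      System.out.println(e.getKey() + " = " + e.getValue());
    }
  }
}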

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
new file mode 100644
index 0000000..c5d7119
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.covered.example;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.phoenix.hbase.index.covered.Batch;
+import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.phoenix.hbase.index.covered.LocalTableState;
+import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
+
+/**
+ * Index maintainer that maintains multiple indexes based on '{@link ColumnGroup}s'. Each group is
+ * fully covered within itself and stores the fully 'pre-joined' version of the values for that
+ * group of columns.
+ * <p>
+ * <h2>Index Layout</h2> The row key for a given index entry is the current state of all the
+ * values of the columns in a column group, followed by the primary key (row key) of the original
+ * row, then the length of each value, and finally the total number of values. This is enough
+ * information to completely rebuild the latest value of the row for each column in the group.
+ * <p>
+ * The family is always {@link CoveredColumnIndexCodec#INDEX_ROW_COLUMN_FAMILY}
+ * <p>
+ * The qualifier is prepended with the integer index (serialized with {@link Bytes#toBytes(int)}) of
+ * the column in the group. This index corresponds to the index of the value for the group in the
+ * row key.
+ * 
+ * <pre>
+ *         ROW                            ||   FAMILY     ||    QUALIFIER     ||   VALUE
+ * (v1)(v2)...(vN)(pk)(L1)(L2)...(Ln)(#V) || INDEX_FAMILY ||     1Cf1:Cq1     ||  null
+ * (v1)(v2)...(vN)(pk)(L1)(L2)...(Ln)(#V) || INDEX_FAMILY ||     2Cf2:Cq2     ||  null
+ * ...
+ * (v1)(v2)...(vN)(pk)(L1)(L2)...(Ln)(#V) || INDEX_FAMILY ||     NCfN:CqN     ||  null
+ * </pre>
+ * 
+ * <h2>Index Maintenance</h2>
+ * <p>
+ * When making an insertion into the table, we also attempt to clean up the index. This means that
+ * we need to remove the previous entry from the index. Generally, this is done by issuing a delete
+ * for the index entry built from the previous value of the row.
+ * <p>
+ * The main caveat here is when dealing with custom timestamps. If there is no special timestamp
+ * specified, we can just insert the proper {@link Delete} at the current timestamp and move on.
+ * However, when the client specifies a timestamp, we could see updates out of order. In that case,
+ * we can do an insert using the specified timestamp, but a delete is different...
+ * <p>
+ * Taking the simple case, assume we do a single column in a group. Then if we get an out of order
+ * update, we need to check the current state of that column in the current row. If the current row
+ * is older, we can issue a delete as normal. If the current row is newer, however, we then have to
+ * issue a delete for the index update at the time of the current row. This ensures that the index
+ * update made for the 'future' time still covers the existing row.
+ * <p>
+ * <b>ASSUMPTION:</b> all key-values in a single {@link Delete}/{@link Put} have the same timestamp.
+ * This dramatically simplifies the logic needed to manage updating the index for out-of-order
+ * {@link Put}s as we don't need to manage multiple levels of timestamps across multiple columns.
+ * <p>
+ * We can extend this to multiple columns by picking the latest update of any column in group as the
+ * delete point.
+ * <p>
+ * <b>NOTE:</b> this means that we need to do a lookup (point {@link Get}) of the entire row
+ * <i>every time there is a write to the table</i>.
+ */
+public class CoveredColumnIndexer extends CoveredColumnsIndexBuilder {
+
+  /**
+   * Create the specified index table with the necessary columns
+   * @param admin {@link HBaseAdmin} to use when creating the table
+   * @param indexTable name of the index table.
+   * @throws IOException
+   */
+  public static void createIndexTable(HBaseAdmin admin, String indexTable) throws IOException {
+    createIndexTable(admin, new HTableDescriptor(indexTable));
+  }
+
+  /**
+   * @param admin to create the table
+   * @param index descriptor to update before creating table
+   */
+  public static void createIndexTable(HBaseAdmin admin, HTableDescriptor index) throws IOException {
+    HColumnDescriptor col =
+        new HColumnDescriptor(CoveredColumnIndexCodec.INDEX_ROW_COLUMN_FAMILY);
+    // ensure that we can 'see past' delete markers when doing scans
+    col.setKeepDeletedCells(true);
+    index.addFamily(col);
+    admin.createTable(index);
+  }
+
+  @Override
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
+      Collection<KeyValue> filtered) throws IOException {
+
+    // stores all the return values
+    IndexUpdateManager updateMap = new IndexUpdateManager();
+    // batch the updates by row to make life easier and ordered
+    Collection<Batch> batches = batchByRow(filtered);
+
+    for (Batch batch : batches) {
+      Put p = new Put(batch.getKvs().iterator().next().getRow());
+      for (KeyValue kv : batch.getKvs()) {
+        // we only need to cleanup Put entries
+        byte type = kv.getType();
+        Type t = KeyValue.Type.codeToType(type);
+        if (!t.equals(Type.Put)) {
+          continue;
+        }
+
+        // add the kv independently
+        p.add(kv);
+      }
+
+      // do the usual thing as for deletes
+      Collection<Batch> timeBatch = createTimestampBatchesFromMutation(p);
+      LocalTableState state = new LocalTableState(env, localTable, p);
+      for (Batch entry : timeBatch) {
+        // just set the timestamp on the table - it already has all the future state
+        state.setCurrentTimestamp(entry.getTimestamp());
+        this.addDeleteUpdatesToMap(updateMap, state, entry.getTimestamp());
+      }
+    }
+    return updateMap.toMap();
+  }
+
+
+  /**
+   * @param filtered keyvalues to group into batches
+   * @return the resulting batches of keyvalues
+   */
+  private Collection<Batch> batchByRow(Collection<KeyValue> filtered) {
+    Map<Long, Batch> batches = new HashMap<Long, Batch>();
+    createTimestampBatchesFromKeyValues(filtered, batches);
+    return batches.values();
+  }
+}
\ No newline at end of file
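
Putting the pieces together, a client might enable this example index roughly as below. This is a sketch against the 0.94-era HBase API: the table names and the "f1:c1" column form are assumptions (the exact input CoveredColumn.parse expects is not shown in this diff, nor is its visibility); only addIndexGroup, build, and createIndexTable are taken from the classes above.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

import org.apache.phoenix.hbase.index.covered.example.ColumnGroup;
import org.apache.phoenix.hbase.index.covered.example.CoveredColumn;
import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexSpecifierBuilder;
import org.apache.phoenix.hbase.index.covered.example.CoveredColumnIndexer;

public class EnableCoveredIndexSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
      // one index group writing to "my_index"; the cf:cq parse format is assumed
      ColumnGroup group = new ColumnGroup("my_index");
      group.add(CoveredColumn.parse("f1:c1"));
      group.add(CoveredColumn.parse("f1:c2"));

      CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
      builder.addIndexGroup(group);

      // attach the indexing coprocessor config to the primary table, then create both tables
      HTableDescriptor primary = new HTableDescriptor("my_data");
      primary.addFamily(new HColumnDescriptor("f1"));
      builder.build(primary);
      admin.createTable(primary);
      CoveredColumnIndexer.createIndexTable(admin, "my_index");
    } finally {
      admin.close();
    }
  }
}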

