phoenix-commits mailing list archives

From jamestay...@apache.org
Subject [1/2] phoenix git commit: PHOENIX-3744 Support snapshot scanners for MR-based Non-aggregate queries (Akshita Malhotra)
Date Fri, 02 Jun 2017 23:45:31 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master ed77c36b0 -> faadb5448


http://git-wip-us.apache.org/repos/asf/phoenix/blob/faadb544/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java
new file mode 100644
index 0000000..31f04f2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java
@@ -0,0 +1,165 @@
+package org.apache.phoenix.iterate;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+public class SnapshotScanner extends AbstractClientScanner {
+
+  private static final Log LOG = LogFactory.getLog(SnapshotScanner.class);
+
+  private RegionScanner scanner = null;
+  private HRegion region;
+  List<Cell> values;
+
+  public SnapshotScanner(Configuration conf, FileSystem fs, Path rootDir,
+      HTableDescriptor htd, HRegionInfo hri, Scan scan) throws Throwable {
+
+    scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
+    values = new ArrayList<>();
+    this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null);
+
+    // process the region scanner for non-aggregate queries
+    PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
+    boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan);
+
+    RegionCoprocessorEnvironment snapshotEnv = getSnapshotContextEnvironment(conf);
+
+    RegionScannerFactory regionScannerFactory;
+    if (scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null) {
+      regionScannerFactory = new NonAggregateRegionScannerFactory(snapshotEnv, useNewValueColumnQualifier, encodingScheme);
+    } else {
+      /* Future work: snapshot M/R jobs for aggregate queries */
+      throw new UnsupportedOperationException("Snapshot M/R jobs not available for aggregate queries");
+    }
+
+    this.scanner = regionScannerFactory.getRegionScanner(scan, region.getScanner(scan));
+    region.startRegionOperation();
+  }
+
+
+  @Override
+  public Result next() throws IOException {
+    values.clear();
+    scanner.nextRaw(values);
+    if (values.isEmpty()) {
+      //we are done
+      return null;
+    }
+
+    return Result.create(values);
+  }
+
+  @Override
+  public void close() {
+    if (this.scanner != null) {
+      try {
+        this.scanner.close();
+        this.scanner = null;
+      } catch (IOException e) {
+        LOG.warn("Exception while closing scanner", e);
+      }
+    }
+    if (this.region != null) {
+      try {
+        this.region.closeRegionOperation();
+        this.region.close(true);
+        this.region = null;
+      } catch (IOException e) {
+        LOG.warn("Exception while closing region", e);
+      }
+    }
+  }
+
+  @Override
+  public boolean renewLease() {
+    return false;
+  }
+
+  private RegionCoprocessorEnvironment getSnapshotContextEnvironment(final Configuration conf) {
+    return new RegionCoprocessorEnvironment() {
+      @Override
+      public Region getRegion() {
+        return region;
+      }
+
+      @Override
+      public HRegionInfo getRegionInfo() {
+        return region.getRegionInfo();
+      }
+
+      @Override
+      public RegionServerServices getRegionServerServices() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public ConcurrentMap<String, Object> getSharedData() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public int getVersion() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public String getHBaseVersion() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Coprocessor getInstance() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public int getPriority() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public int getLoadSequence() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Configuration getConfiguration() {
+        return conf;
+      }
+
+      @Override
+      public HTableInterface getTable(TableName tableName) throws IOException {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public HTableInterface getTable(TableName tableName, ExecutorService executorService)
+          throws IOException {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public ClassLoader getClassLoader() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+}
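
A minimal, hypothetical sketch of driving the new scanner directly (not part of the patch): htd, hri, and restoreDir are assumed to come from RestoreSnapshotHelper.copySnapshotForScanner(), which is how TableSnapshotResultIterator (next file) obtains them.

    // Hypothetical helper: count the rows of one restored snapshot region.
    static long countRows(Configuration conf, Path restoreDir,
        HTableDescriptor htd, HRegionInfo hri) throws Throwable {
      FileSystem fs = FSUtils.getRootDir(conf).getFileSystem(conf);
      Scan scan = new Scan();
      // Only a non-null attribute value is checked, so any marker works here.
      scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, Bytes.toBytes(true));
      SnapshotScanner scanner = new SnapshotScanner(conf, fs, restoreDir, htd, hri, scan);
      long rows = 0;
      try {
        for (Result r = scanner.next(); r != null; r = scanner.next()) {
          rows++;
        }
      } finally {
        scanner.close();
      }
      return rows;
    }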

http://git-wip-us.apache.org/repos/asf/phoenix/blob/faadb544/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
new file mode 100644
index 0000000..a3f6224
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableSnapshotResultIterator.java
@@ -0,0 +1,138 @@
+package org.apache.phoenix.iterate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.monitoring.ScanMetricsHolder;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+public class TableSnapshotResultIterator implements ResultIterator {
+
+  private final Scan scan;
+  private ResultIterator scanIterator;
+  private Configuration configuration;
+  private final ScanMetricsHolder scanMetricsHolder;
+  private Tuple lastTuple = null;
+  private static final ResultIterator UNINITIALIZED_SCANNER = ResultIterator.EMPTY_ITERATOR;
+  private ArrayList<HRegionInfo> regions;
+  private HTableDescriptor htd;
+  private String snapshotName;
+
+  private Path restoreDir;
+  private Path rootDir;
+  private FileSystem fs;
+  private int currentRegion;
+  private boolean closed = false;
+
+  public TableSnapshotResultIterator(Configuration configuration, Scan scan, ScanMetricsHolder scanMetricsHolder)
+      throws IOException {
+    this.configuration = configuration;
+    this.currentRegion = -1;
+    this.scan = scan;
+    this.scanMetricsHolder = scanMetricsHolder;
+    this.scanIterator = UNINITIALIZED_SCANNER;
+    this.restoreDir = new Path(configuration.get(PhoenixConfigurationUtil.RESTORE_DIR_KEY));
+    this.snapshotName = configuration.get(
+        PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+    this.rootDir = FSUtils.getRootDir(configuration);
+    this.fs = rootDir.getFileSystem(configuration);
+    init();
+  }
+
+  private void init() throws IOException {
+    RestoreSnapshotHelper.RestoreMetaChanges meta =
+        RestoreSnapshotHelper.copySnapshotForScanner(this.configuration, this.fs,
+            this.rootDir, this.restoreDir, this.snapshotName);
+    List<HRegionInfo> restoredRegions = meta.getRegionsToAdd();
+    this.htd = meta.getTableDescriptor();
+    this.regions = new ArrayList<>(restoredRegions.size());
+
+    // Keep only the restored regions whose key range overlaps the scan.
+    for (HRegionInfo hri : restoredRegions) {
+      if(CellUtil.overlappingKeys(this.scan.getStartRow(), this.scan.getStopRow(),
+          hri.getStartKey(), hri.getEndKey())) {
+        this.regions.add(hri);
+      }
+    }
+
+    Collections.sort(this.regions);
+  }
+
+  public boolean initSnapshotScanner() throws SQLException {
+    if (closed) {
+      return false; // a closed iterator has nothing more to scan
+    }
+    ResultIterator delegate = this.scanIterator;
+    if (delegate == UNINITIALIZED_SCANNER) {
+      ++this.currentRegion;
+      if (this.currentRegion >= this.regions.size())
+        return false;
+      try {
+        HRegionInfo hri = regions.get(this.currentRegion);
+        this.scanIterator =
+            new ScanningResultIterator(new SnapshotScanner(configuration, fs, restoreDir, htd, hri, scan),
+                scan, scanMetricsHolder);
+      } catch (Throwable e) {
+        throw ServerUtil.parseServerException(e);
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public Tuple next() throws SQLException {
+    while (true) {
+      if (!initSnapshotScanner())
+        return null;
+      try {
+        lastTuple = scanIterator.next();
+        if (lastTuple != null) {
+          ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+          lastTuple.getKey(ptr);
+          return lastTuple;
+        }
+      } finally {
+        if (lastTuple == null) {
+          scanIterator.close();
+          scanIterator = UNINITIALIZED_SCANNER;
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() throws SQLException {
+    closed = true; // ok to say closed even if the below code throws an exception
+    try {
+      scanIterator.close();
+      fs.delete(this.restoreDir, true);
+    } catch (IOException e) {
+      throw ServerUtil.parseServerException(e);
+    } finally {
+      scanIterator = UNINITIALIZED_SCANNER;
+    }
+  }
+
+  @Override
+  public void explain(List<String> planSteps) {
+    // noop
+  }
+
+}
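
A hedged sketch of configuring this iterator by hand; PhoenixRecordReader (next file) does the equivalent whenever SNAPSHOT_NAME_KEY is present. The snapshot name, restore directory, and metricsHolder are placeholders/assumptions.

    Configuration conf = HBaseConfiguration.create();
    conf.set(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY, "my_snapshot");        // placeholder
    conf.set(PhoenixConfigurationUtil.RESTORE_DIR_KEY, "/tmp/phoenix-restore"); // placeholder
    // metricsHolder obtained as in PhoenixRecordReader via ScanMetricsHolder.getInstance(...)
    ResultIterator it = new TableSnapshotResultIterator(conf, new Scan(), metricsHolder);
    try {
      for (Tuple t = it.next(); t != null; t = it.next()) {
        // consume each Tuple
      }
    } finally {
      it.close(); // also deletes the restore directory
    }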

http://git-wip-us.apache.org/repos/asf/phoenix/blob/faadb544/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index d4d6734..ec1b451 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -35,15 +35,9 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
-import org.apache.phoenix.iterate.ConcatResultIterator;
-import org.apache.phoenix.iterate.LookAheadResultIterator;
-import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
-import org.apache.phoenix.iterate.PeekingResultIterator;
-import org.apache.phoenix.iterate.ResultIterator;
-import org.apache.phoenix.iterate.RoundRobinResultIterator;
-import org.apache.phoenix.iterate.SequenceResultIterator;
-import org.apache.phoenix.iterate.TableResultIterator;
+import org.apache.phoenix.iterate.*;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.monitoring.ReadMetricQueue;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -110,6 +104,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
             StatementContext ctx = queryPlan.getContext();
             ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
             String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
+            String snapshotName = this.configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
 
             // Clear the table region boundary cache to make sure long running jobs stay up to date
             byte[] tableNameBytes = queryPlan.getTableRef().getTable().getPhysicalName().getBytes();
@@ -121,15 +116,25 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
             for (Scan scan : scans) {
                 // For MR, skip the region boundary check exception if we encounter a split. ref: PHOENIX-2599
                 scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, Bytes.toBytes(true));
+
+                PeekingResultIterator peekingResultIterator;
                 ScanMetricsHolder scanMetricsHolder =
-                        ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
-                            isRequestMetricsEnabled);
-                final TableResultIterator tableResultIterator =
-                        new TableResultIterator(
-                                queryPlan.getContext().getConnection().getMutationState(), scan,
-                                scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
-                                MapReduceParallelScanGrouper.getInstance());
-                PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+                  ScanMetricsHolder.getInstance(readMetrics, tableName, scan,
+                      isRequestMetricsEnabled);
+                if (snapshotName != null) {
+                  // result iterator to read snapshots
+                  final TableSnapshotResultIterator tableSnapshotResultIterator = new TableSnapshotResultIterator(configuration, scan,
+                      scanMetricsHolder);
+                  peekingResultIterator = LookAheadResultIterator.wrap(tableSnapshotResultIterator);
+                } else {
+                  final TableResultIterator tableResultIterator =
+                      new TableResultIterator(
+                          queryPlan.getContext().getConnection().getMutationState(), scan,
+                          scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
+                          MapReduceParallelScanGrouper.getInstance());
+                  peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
+                }
+
                 iterators.add(peekingResultIterator);
             }
             ResultIterator iterator = queryPlan.useRoundRobinIterator() ? RoundRobinResultIterator.newIterator(iterators, queryPlan) : ConcatResultIterator.newIterator(iterators);
@@ -139,13 +144,14 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
             this.resultIterator = iterator;
             // Clone the row projector as it's not thread safe and would be used simultaneously by
             // multiple threads otherwise.
+
             this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector().cloneIfNecessary(), queryPlan.getContext());
         } catch (SQLException e) {
             LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage()));
             Throwables.propagate(e);
         }
    }
-    
+
    @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
         if (key == null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/faadb544/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 1d2cbbe..1302f85 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -111,6 +111,10 @@ public final class PhoenixConfigurationUtil {
 
     public static final boolean DEFAULT_SPLIT_BY_STATS = true;
 
+    public static final String SNAPSHOT_NAME_KEY = "phoenix.mapreduce.snapshot.name";
+
+    public static final String RESTORE_DIR_KEY = "phoenix.tableSnapshot.restore.dir";
+
     public enum SchemaType {
         TABLE,
         QUERY;
@@ -192,6 +196,18 @@ public final class PhoenixConfigurationUtil {
     public static void setUpsertColumnNames(final Configuration configuration,final String[] columns) {
         setValues(configuration, columns, MAPREDUCE_UPSERT_COLUMN_COUNT, MAPREDUCE_UPSERT_COLUMN_VALUE_PREFIX);
     }
+
+    public static void setSnapshotNameKey(final Configuration configuration, final String snapshotName) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(snapshotName);
+        configuration.set(SNAPSHOT_NAME_KEY, snapshotName);
+    }
+
+    public static void setRestoreDirKey(final Configuration configuration, final String restoreDir) {
+        Preconditions.checkNotNull(configuration);
+        Preconditions.checkNotNull(restoreDir);
+        configuration.set(RESTORE_DIR_KEY, restoreDir);
+    }
     
     public static List<String> getUpsertColumnNames(final Configuration configuration) {
         return getValues(configuration, MAPREDUCE_UPSERT_COLUMN_COUNT, MAPREDUCE_UPSERT_COLUMN_VALUE_PREFIX);
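
The two new setters are thin wrappers over Configuration.set() with null checks; a minimal illustration (values are placeholders, and jobs would normally go through PhoenixMapReduceUtil.setInput below):

    Configuration conf = job.getConfiguration();
    PhoenixConfigurationUtil.setSnapshotNameKey(conf, "my_snapshot");        // placeholder
    PhoenixConfigurationUtil.setRestoreDirKey(conf, "/tmp/phoenix-restore"); // placeholder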

http://git-wip-us.apache.org/repos/asf/phoenix/blob/faadb544/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index 98f0364..b0981ef 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -18,12 +18,16 @@
 package org.apache.phoenix.mapreduce.util;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
 import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 
+import java.io.IOException;
+import java.util.UUID;
+
 /**
  * Utility class for setting Configuration parameters for the Map Reduce job
  */
@@ -63,6 +67,66 @@ public final class PhoenixMapReduceUtil {
         PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
      }
 
+    /**
+     *
+     * @param job The MapReduce job to configure
+     * @param inputClass DBWritable class
+     * @param snapshotName The name of a snapshot (of a table) to read from
+     * @param tableName Input table name
+     * @param restoreDir a temporary dir to copy the snapshot files into
+     * @param conditions Condition clause to be added to the WHERE clause. Can be <tt>null</tt> if there are no conditions.
+     * @param fieldNames fields being projected for the SELECT query.
+     */
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String snapshotName, String tableName,
+        Path restoreDir, final String conditions, final String... fieldNames) throws
+        IOException {
+        final Configuration configuration = setSnapshotInput(job, inputClass, snapshotName, tableName, restoreDir);
+        if(conditions != null) {
+            PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions);
+        }
+        PhoenixConfigurationUtil.setSelectColumnNames(configuration, fieldNames);
+    }
+
+    /**
+     *
+     * @param job The MapReduce job to configure
+     * @param inputClass DBWritable class
+     * @param snapshotName The name of a snapshot (of a table) to read from
+     * @param tableName Input table name
+     * @param restoreDir a temporary dir to copy the snapshot files into
+     * @param inputQuery The select query
+     */
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String snapshotName, String tableName,
+        Path restoreDir, String inputQuery) throws
+        IOException {
+        final Configuration configuration = setSnapshotInput(job, inputClass, snapshotName, tableName, restoreDir);
+        if(inputQuery != null) {
+            PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery);
+        }
+
+    }
+
+    /**
+     *
+     * @param job The MapReduce job to configure
+     * @param inputClass DBWritable class
+     * @param snapshotName The name of a snapshot (of a table) to read from
+     * @param tableName Input table name
+     * @param restoreDir a temporary dir to copy the snapshot files into
+     */
+    private static Configuration setSnapshotInput(Job job, Class<? extends DBWritable> inputClass, String snapshotName,
+        String tableName, Path restoreDir) {
+        job.setInputFormatClass(PhoenixInputFormat.class);
+        final Configuration configuration = job.getConfiguration();
+        PhoenixConfigurationUtil.setInputClass(configuration, inputClass);
+        PhoenixConfigurationUtil.setSnapshotNameKey(configuration, snapshotName);
+        PhoenixConfigurationUtil.setInputTableName(configuration, tableName);
+
+        PhoenixConfigurationUtil.setRestoreDirKey(configuration, new Path(restoreDir, UUID.randomUUID().toString()).toString());
+        PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY);
+        return configuration;
+    }
+
     private static Configuration setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName){
         job.setInputFormatClass(PhoenixInputFormat.class);
         final Configuration configuration = job.getConfiguration();
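
Putting the new overloads together, a hedged end-to-end sketch of a snapshot-backed MR job setup; MyDBWritable, the snapshot name, table name, and restore path are all placeholders:

    // setSnapshotInput appends a random UUID under restoreDir, so concurrent
    // jobs restoring the same snapshot do not collide.
    Job job = Job.getInstance(HBaseConfiguration.create(), "phoenix-snapshot-read");
    PhoenixMapReduceUtil.setInput(job, MyDBWritable.class, "my_snapshot", "MY_TABLE",
        new Path("/tmp/phoenix-restore"), null /* no conditions */, "ID", "NAME");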

http://git-wip-us.apache.org/repos/asf/phoenix/blob/faadb544/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 986debd..2f65647 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
@@ -504,7 +503,7 @@ public class IndexUtil {
         return QueryUtil.getViewStatement(index.getSchemaName().getString(), index.getTableName().getString(), buf.toString());
     }
     
-    public static void wrapResultUsingOffset(final ObserverContext<RegionCoprocessorEnvironment> c,
+    public static void wrapResultUsingOffset(final RegionCoprocessorEnvironment environment,
             List<Cell> result, final int offset, ColumnReference[] dataColumns,
             TupleProjector tupleProjector, Region dataRegion, IndexMaintainer indexMaintainer,
             byte[][] viewConstants, ImmutableBytesWritable ptr) throws IOException {
@@ -529,11 +528,11 @@ public class IndexUtil {
                 joinResult = dataRegion.get(get);
             } else {
                 TableName dataTable =
-                        TableName.valueOf(MetaDataUtil.getLocalIndexUserTableName(c.getEnvironment()
-                                .getRegion().getTableDesc().getNameAsString()));
+                        TableName.valueOf(MetaDataUtil.getLocalIndexUserTableName(
+                            environment.getRegion().getTableDesc().getNameAsString()));
                 HTableInterface table = null;
                 try {
-                    table = c.getEnvironment().getTable(dataTable);
+                    table = environment.getTable(dataTable);
                     joinResult = table.get(get);
                 } finally {
                     if (table != null) table.close();
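
Since wrapResultUsingOffset now takes the RegionCoprocessorEnvironment itself, coprocessor call sites presumably just unwrap their ObserverContext; an assumed one-liner:

    // c is the ObserverContext<RegionCoprocessorEnvironment> inside a coprocessor hook.
    IndexUtil.wrapResultUsingOffset(c.getEnvironment(), result, offset, dataColumns,
        tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr);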

