phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tdsi...@apache.org
Subject phoenix git commit: PHOENIX-2656 Shield Phoenix from Tephra repackaging
Date Tue, 09 Feb 2016 02:03:09 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 525873614 -> dbde8c387


PHOENIX-2656 Shield Phoenix from Tephra repackaging


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/dbde8c38
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/dbde8c38
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/dbde8c38

Branch: refs/heads/4.x-HBase-1.0
Commit: dbde8c387d4028be8a934cc3d83be37bd734c3cc
Parents: 5258736
Author: Thomas D'Silva <tdsilva@salesforce.com>
Authored: Mon Feb 8 14:22:25 2016 -0800
Committer: Thomas D'Silva <tdsilva@salesforce.com>
Committed: Mon Feb 8 17:18:44 2016 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/AlterTableWithViewsIT.java  |   7 +-
 .../org/apache/phoenix/tx/TransactionIT.java    |  12 +-
 .../coprocessor/DelegateRegionObserver.java     | 562 +++++++++++++++++++
 .../PhoenixTransactionalProcessor.java          |  28 +
 .../query/ConnectionQueryServicesImpl.java      |  22 +-
 5 files changed, 610 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbde8c38/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
index e83d4ca..f1816cc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
@@ -58,8 +59,6 @@ import org.apache.phoenix.util.SchemaUtil;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import co.cask.tephra.hbase10.coprocessor.TransactionProcessor;
-
 import com.google.common.base.Objects;
 import com.google.common.collect.Maps;
 
@@ -1126,7 +1125,7 @@ public class AlterTableWithViewsIT extends BaseHBaseManagedTimeIT {
             assertTableDefinition(conn, "VIEWOFTABLE", PTableType.VIEW, "TABLEWITHVIEW", 0, 5, 3, "ID", "COL1", "COL2", "VIEW_COL1", "VIEW_COL2");
             
             HTableInterface htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("TABLEWITHVIEW"));
-            assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+            assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
             assertFalse(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "TABLEWITHVIEW")).isTransactional());
             assertFalse(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "VIEWOFTABLE")).isTransactional());
             
@@ -1135,7 +1134,7 @@ public class AlterTableWithViewsIT extends BaseHBaseManagedTimeIT {
             // query the view to force the table cache to be updated
             conn.createStatement().execute("SELECT * FROM VIEWOFTABLE");
             htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("TABLEWITHVIEW"));
-            assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+            assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
             assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "TABLEWITHVIEW")).isTransactional());
             assertTrue(conn.unwrap(PhoenixConnection.class).getTable(new PTableKey(null, "VIEWOFTABLE")).isTransactional());
         } 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbde8c38/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
index a8e37d0..001d8c6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/tx/TransactionIT.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.Shadower;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -69,7 +70,6 @@ import co.cask.tephra.TransactionContext;
 import co.cask.tephra.TransactionSystemClient;
 import co.cask.tephra.TxConstants;
 import co.cask.tephra.hbase10.TransactionAwareHTable;
-import co.cask.tephra.hbase10.coprocessor.TransactionProcessor;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -311,9 +311,9 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         conn.createStatement().execute("ALTER TABLE NON_TX_TABLE SET TRANSACTIONAL=true");
         
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("NON_TX_TABLE"));
-        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("IDX"));
-        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
 
         conn.createStatement().execute("UPSERT INTO NON_TX_TABLE VALUES (4, 'c')");
         ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ NO_INDEX */ k FROM NON_TX_TABLE WHERE v IS NULL");
@@ -386,7 +386,7 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         assertFalse(rs.next());
         
         htable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes("SYSTEM.NON_TX_TABLE"));
-        assertFalse(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+        assertFalse(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
         assertEquals(1,conn.unwrap(PhoenixConnection.class).getQueryServices().
                 getTableDescriptor(Bytes.toBytes("SYSTEM.NON_TX_TABLE")).
                 getFamily(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES).getMaxVersions());
@@ -479,7 +479,7 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         PTable table = pconn.getTable(new PTableKey(null, "TEST_TRANSACTIONAL_TABLE"));
         HTableInterface htable = pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE"));
         assertTrue(table.isTransactional());
-        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
         
         try {
             ddl = "ALTER TABLE TEST_TRANSACTIONAL_TABLE SET transactional=false";
@@ -510,7 +510,7 @@ public class TransactionIT extends BaseHBaseManagedTimeIT {
         table = pconn.getTable(new PTableKey(null, "TEST_TRANSACTIONAL_TABLE"));
         htable = pconn.getQueryServices().getTable(Bytes.toBytes("TEST_TRANSACTIONAL_TABLE"));
         assertTrue(table.isTransactional());
-        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(TransactionProcessor.class.getName()));
+        assertTrue(htable.getTableDescriptor().getCoprocessors().contains(PhoenixTransactionalProcessor.class.getName()));
     }
 
     public void testCurrentDate() throws Exception {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbde8c38/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
new file mode 100644
index 0000000..82284ec
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionObserver.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.regionserver.DeleteTracker;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALKey;
+
+import com.google.common.collect.ImmutableList;
+
+public class DelegateRegionObserver implements RegionObserver {
+
+    protected final RegionObserver delegate;
+    
+    public DelegateRegionObserver(RegionObserver delegate) {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void preOpen(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
+        delegate.preOpen(c);
+    }
+
+    @Override
+    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
+        delegate.postOpen(c);
+    }
+
+    @Override
+    public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {
+        delegate.postLogReplay(c);
+    }
+
+    @Override
+    public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+            Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+        return delegate.preFlushScannerOpen(c, store, memstoreScanner, s);
+    }
+
+    @Override
+    public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
+        delegate.preFlush(c);
+    }
+
+    @Override
+    public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            InternalScanner scanner) throws IOException {
+        return delegate.preFlush(c, store, scanner);
+    }
+
+    @Override
+    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
+        delegate.postFlush(c);
+    }
+
+    @Override
+    public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            StoreFile resultFile) throws IOException {
+        delegate.postFlush(c, store, resultFile);
+    }
+
+    @Override
+    public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            List<StoreFile> candidates, CompactionRequest request) throws IOException {
+        delegate.preCompactSelection(c, store, candidates, request);
+    }
+
+    @Override
+    public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            List<StoreFile> candidates) throws IOException {
+        delegate.preCompactSelection(c, store, candidates);
+    }
+
+    @Override
+    public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            ImmutableList<StoreFile> selected, CompactionRequest request) {
+        delegate.postCompactSelection(c, store, selected, request);
+    }
+
+    @Override
+    public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            ImmutableList<StoreFile> selected) {
+        delegate.postCompactSelection(c, store, selected);
+    }
+
+    @Override
+    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            InternalScanner scanner, ScanType scanType, CompactionRequest request)
+            throws IOException {
+        return delegate.preCompact(c, store, scanner, scanType, request);
+    }
+
+    @Override
+    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            InternalScanner scanner, ScanType scanType) throws IOException {
+        return delegate.preCompact(c, store, scanner, scanType);
+    }
+
+    @Override
+    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+            Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+            long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException {
+        return delegate.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s,
+            request);
+    }
+
+    @Override
+    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+            Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+            long earliestPutTs, InternalScanner s) throws IOException {
+        return delegate.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s);
+    }
+
+    @Override
+    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            StoreFile resultFile, CompactionRequest request) throws IOException {
+        delegate.postCompact(c, store, resultFile, request);
+    }
+
+    @Override
+    public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            StoreFile resultFile) throws IOException {
+        delegate.postCompact(c, store, resultFile);
+    }
+
+    @Override
+    public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
+        delegate.preSplit(c);
+    }
+
+    @Override
+    public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
+            throws IOException {
+        delegate.preSplit(c, splitRow);
+    }
+
+    @Override
+    public void postSplit(ObserverContext<RegionCoprocessorEnvironment> c, HRegion l, HRegion r)
+            throws IOException {
+        delegate.postSplit(c, l, r);
+    }
+
+    @Override
+    public void preSplitBeforePONR(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            byte[] splitKey, List<Mutation> metaEntries) throws IOException {
+        delegate.preSplitBeforePONR(ctx, splitKey, metaEntries);
+    }
+
+    @Override
+    public void preSplitAfterPONR(ObserverContext<RegionCoprocessorEnvironment> ctx)
+            throws IOException {
+        delegate.preSplitAfterPONR(ctx);
+    }
+
+    @Override
+    public void preRollBackSplit(ObserverContext<RegionCoprocessorEnvironment> ctx)
+            throws IOException {
+        delegate.preRollBackSplit(ctx);
+    }
+
+    @Override
+    public void postRollBackSplit(ObserverContext<RegionCoprocessorEnvironment> ctx)
+            throws IOException {
+        delegate.postRollBackSplit(ctx);
+    }
+
+    @Override
+    public void postCompleteSplit(ObserverContext<RegionCoprocessorEnvironment> ctx)
+            throws IOException {
+        delegate.postCompleteSplit(ctx);
+    }
+
+    @Override
+    public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested)
+            throws IOException {
+        delegate.preClose(c, abortRequested);
+    }
+
+    @Override
+    public void postClose(ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) {
+        delegate.postClose(c, abortRequested);
+    }
+
+    @Override
+    public void preGetClosestRowBefore(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+            byte[] family, Result result) throws IOException {
+        delegate.preGetClosestRowBefore(c, row, family, result);
+    }
+
+    @Override
+    public void postGetClosestRowBefore(ObserverContext<RegionCoprocessorEnvironment> c,
+            byte[] row, byte[] family, Result result) throws IOException {
+        delegate.postGetClosestRowBefore(c, row, family, result);
+    }
+
+    @Override
+    public void
+            preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
+                    throws IOException {
+        delegate.preGetOp(c, get, result);
+    }
+
+    @Override
+    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
+            List<Cell> result) throws IOException {
+        delegate.postGetOp(c, get, result);
+    }
+
+    @Override
+    public boolean preExists(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
+            boolean exists) throws IOException {
+        return delegate.preExists(c, get, exists);
+    }
+
+    @Override
+    public boolean postExists(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
+            boolean exists) throws IOException {
+        return delegate.postExists(c, get, exists);
+    }
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
+            Durability durability) throws IOException {
+        delegate.prePut(c, put, edit, durability);
+    }
+
+    @Override
+    public void postPut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
+            Durability durability) throws IOException {
+        delegate.postPut(c, put, edit, durability);
+    }
+
+    @Override
+    public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
+            WALEdit edit, Durability durability) throws IOException {
+        delegate.preDelete(c, delete, edit, durability);
+    }
+
+    @Override
+    public void prePrepareTimeStampForDeleteVersion(
+            ObserverContext<RegionCoprocessorEnvironment> c, Mutation mutation, Cell cell,
+            byte[] byteNow, Get get) throws IOException {
+        delegate.prePrepareTimeStampForDeleteVersion(c, mutation, cell, byteNow, get);
+    }
+
+    @Override
+    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
+            WALEdit edit, Durability durability) throws IOException {
+        delegate.postDelete(c, delete, edit, durability);
+    }
+
+    @Override
+    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+            MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+        delegate.preBatchMutate(c, miniBatchOp);
+    }
+
+    @Override
+    public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+            MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+        delegate.postBatchMutate(c, miniBatchOp);
+    }
+
+    @Override
+    public void postStartRegionOperation(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            Operation operation) throws IOException {
+        delegate.postStartRegionOperation(ctx, operation);
+    }
+
+    @Override
+    public void postCloseRegionOperation(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            Operation operation) throws IOException {
+        delegate.postCloseRegionOperation(ctx, operation);
+    }
+
+    @Override
+    public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            MiniBatchOperationInProgress<Mutation> miniBatchOp, boolean success) throws IOException {
+        delegate.postBatchMutateIndispensably(ctx, miniBatchOp, success);
+    }
+
+    @Override
+    public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+            byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+            Put put, boolean result) throws IOException {
+        return delegate.preCheckAndPut(c, row, family, qualifier, compareOp, comparator, put,
+            result);
+    }
+
+    @Override
+    public boolean preCheckAndPutAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+            byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
+            ByteArrayComparable comparator, Put put, boolean result) throws IOException {
+        return delegate.preCheckAndPutAfterRowLock(c, row, family, qualifier, compareOp,
+            comparator, put, result);
+    }
+
+    @Override
+    public boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+            byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+            Put put, boolean result) throws IOException {
+        return delegate.postCheckAndPut(c, row, family, qualifier, compareOp, comparator, put,
+            result);
+    }
+
+    @Override
+    public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+            byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+            Delete delete, boolean result) throws IOException {
+        return delegate.preCheckAndDelete(c, row, family, qualifier, compareOp, comparator, delete,
+            result);
+    }
+
+    @Override
+    public boolean preCheckAndDeleteAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+            byte[] row, byte[] family, byte[] qualifier, CompareOp compareOp,
+            ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {
+        return delegate.preCheckAndDeleteAfterRowLock(c, row, family, qualifier, compareOp,
+            comparator, delete, result);
+    }
+
+    @Override
+    public boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+            byte[] family, byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+            Delete delete, boolean result) throws IOException {
+        return delegate.postCheckAndDelete(c, row, family, qualifier, compareOp, comparator,
+            delete, result);
+    }
+
+    @Override
+    public long preIncrementColumnValue(ObserverContext<RegionCoprocessorEnvironment> c,
+            byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
+            throws IOException {
+        return delegate.preIncrementColumnValue(c, row, family, qualifier, amount, writeToWAL);
+    }
+
+    @Override
+    public long postIncrementColumnValue(ObserverContext<RegionCoprocessorEnvironment> c,
+            byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL,
+            long result) throws IOException {
+        return delegate.postIncrementColumnValue(c, row, family, qualifier, amount, writeToWAL,
+            result);
+    }
+
+    @Override
+    public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
+            throws IOException {
+        return delegate.preAppend(c, append);
+    }
+
+    @Override
+    public Result preAppendAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+            Append append) throws IOException {
+        return delegate.preAppendAfterRowLock(c, append);
+    }
+
+    @Override
+    public Result postAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append,
+            Result result) throws IOException {
+        return delegate.postAppend(c, append, result);
+    }
+
+    @Override
+    public Result
+            preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
+                    throws IOException {
+        return delegate.preIncrement(c, increment);
+    }
+
+    @Override
+    public Result preIncrementAfterRowLock(ObserverContext<RegionCoprocessorEnvironment> c,
+            Increment increment) throws IOException {
+        return delegate.preIncrementAfterRowLock(c, increment);
+    }
+
+    @Override
+    public Result postIncrement(ObserverContext<RegionCoprocessorEnvironment> c,
+            Increment increment, Result result) throws IOException {
+        return delegate.postIncrement(c, increment, result);
+    }
+
+    @Override
+    public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
+            RegionScanner s) throws IOException {
+        return delegate.preScannerOpen(c, scan, s);
+    }
+
+    @Override
+    public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+            Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
+            throws IOException {
+        return delegate.preStoreScannerOpen(c, store, scan, targetCols, s);
+    }
+
+    @Override
+    public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+            Scan scan, RegionScanner s) throws IOException {
+        return delegate.postScannerOpen(c, scan, s);
+    }
+
+    @Override
+    public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
+            InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
+        return delegate.preScannerNext(c, s, result, limit, hasNext);
+    }
+
+    @Override
+    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
+            InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
+        return delegate.postScannerNext(c, s, result, limit, hasNext);
+    }
+
+    @Override
+    public boolean postScannerFilterRow(ObserverContext<RegionCoprocessorEnvironment> c,
+            InternalScanner s, byte[] currentRow, int offset, short length, boolean hasMore)
+            throws IOException {
+        return delegate.postScannerFilterRow(c, s, currentRow, offset, length, hasMore);
+    }
+
+    @Override
+    public void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner s)
+            throws IOException {
+        delegate.preScannerClose(c, s);
+    }
+
+    @Override
+    public void
+            postScannerClose(ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner s)
+                    throws IOException {
+        delegate.postScannerClose(c, s);
+    }
+
+    @Override
+    public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+            HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+        delegate.preWALRestore(ctx, info, logKey, logEdit);
+    }
+
+    @Override
+    public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegionInfo info,
+            HLogKey logKey, WALEdit logEdit) throws IOException {
+        delegate.preWALRestore(ctx, info, logKey, logEdit);
+    }
+
+    @Override
+    public void postWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx,
+            HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
+        delegate.postWALRestore(ctx, info, logKey, logEdit);
+    }
+
+    @Override
+    public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> ctx, HRegionInfo info,
+            HLogKey logKey, WALEdit logEdit) throws IOException {
+        delegate.postWALRestore(ctx, info, logKey, logEdit);
+    }
+
+    @Override
+    public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            List<Pair<byte[], String>> familyPaths) throws IOException {
+        delegate.preBulkLoadHFile(ctx, familyPaths);
+    }
+
+    @Override
+    public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
+        return delegate.postBulkLoadHFile(ctx, familyPaths, hasLoaded);
+    }
+
+    @Override
+    public Reader preStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
+            Reference r, Reader reader) throws IOException {
+        return delegate.preStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, reader);
+    }
+
+    @Override
+    public Reader postStoreFileReaderOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            FileSystem fs, Path p, FSDataInputStreamWrapper in, long size, CacheConfig cacheConf,
+            Reference r, Reader reader) throws IOException {
+        return delegate.postStoreFileReaderOpen(ctx, fs, p, in, size, cacheConf, r, reader);
+    }
+
+    @Override
+    public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
+        return delegate.postMutationBeforeWAL(ctx, opType, mutation, oldCell, newCell);
+    }
+
+    @Override
+    public DeleteTracker postInstantiateDeleteTracker(
+            ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
+            throws IOException {
+        return delegate.postInstantiateDeleteTracker(ctx, delTracker);
+    }
+
+    @Override
+    public void start(CoprocessorEnvironment arg0) throws IOException {
+        delegate.start(arg0);
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment arg0) throws IOException {
+        delegate.stop(arg0);
+    }
+    
+    
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbde8c38/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
new file mode 100644
index 0000000..30df42f
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTransactionalProcessor.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import co.cask.tephra.hbase10.coprocessor.TransactionProcessor;
+
+public class PhoenixTransactionalProcessor extends DelegateRegionObserver {
+
+    public PhoenixTransactionalProcessor() {
+        super(new TransactionProcessor());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/dbde8c38/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 593a161..b0e8509 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -58,12 +58,6 @@ import java.util.concurrent.TimeoutException;
 
 import javax.annotation.concurrent.GuardedBy;
 
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.PooledClientProvider;
-import co.cask.tephra.distributed.TransactionServiceClient;
-import co.cask.tephra.hbase10.coprocessor.TransactionProcessor;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -100,6 +94,7 @@ import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode;
 import org.apache.phoenix.coprocessor.MetaDataRegionObserver;
+import org.apache.phoenix.coprocessor.PhoenixTransactionalProcessor;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
 import org.apache.phoenix.coprocessor.SequenceRegionObserver;
 import org.apache.phoenix.coprocessor.ServerCachingEndpointImpl;
@@ -193,6 +188,11 @@ import org.apache.twill.zookeeper.ZKClients;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import co.cask.tephra.TransactionSystemClient;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.distributed.PooledClientProvider;
+import co.cask.tephra.distributed.TransactionServiceClient;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Throwables;
@@ -866,13 +866,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
             }
             
             if (isTransactional) {
-                if (!descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
-                    descriptor.addCoprocessor(TransactionProcessor.class.getName(), null, priority - 10, null);
+                if (!descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) {
+                    descriptor.addCoprocessor(PhoenixTransactionalProcessor.class.getName(), null, priority - 10, null);
                 }
             } else {
                 // If exception on alter table to transition back to non transactional
-                if (descriptor.hasCoprocessor(TransactionProcessor.class.getName())) {
-                    descriptor.removeCoprocessor(TransactionProcessor.class.getName());
+                if (descriptor.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) {
+                    descriptor.removeCoprocessor(PhoenixTransactionalProcessor.class.getName());
                 }                
             }
         } catch (IOException e) {
@@ -1041,7 +1041,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                 } else {
                     // If we think we're creating a non transactional table when it's already
                     // transactional, don't allow.
-                    if (existingDesc.hasCoprocessor(TransactionProcessor.class.getName())) {
+                    if (existingDesc.hasCoprocessor(PhoenixTransactionalProcessor.class.getName())) {
                         throw new SQLExceptionInfo.Builder(SQLExceptionCode.TX_MAY_NOT_SWITCH_TO_NON_TX)
                         .setSchemaName(SchemaUtil.getSchemaNameFromFullName(tableName))
                         .setTableName(SchemaUtil.getTableNameFromFullName(tableName)).build().buildException();


Mime
View raw message