phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [1/2] phoenix git commit: PHOENIX-4278 Implement pure client side transactional index maintenance (Ohad Shacham)
Date Mon, 12 Feb 2018 23:24:29 GMT
Repository: phoenix
Updated Branches:
  refs/heads/master 9461d0d6a -> 0d0c0feae


http://git-wip-us.apache.org/repos/asf/phoenix/blob/0d0c0fea/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
index 5b85da5..7c154f0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexPartialBuildMapper.java
@@ -33,9 +33,11 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -105,6 +107,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
         try {
             byte[] attribValue = ByteUtil.copyKeyBytesIfNecessary(maintainers);
             byte[] uuidValue = ServerCacheClient.generateId();
+            byte[] clientVersion = Bytes.toBytes(MetaDataProtocol.PHOENIX_VERSION);
             Put put = null;
             Delete del = null;
             for (Cell cell : value.rawCells()) {
@@ -113,6 +116,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
                         put = new Put(CellUtil.cloneRow(cell));
                         put.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                         put.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
+                        put.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersion);
                         put.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
                         mutations.add(put);
                     }
@@ -122,6 +126,7 @@ public class PhoenixIndexPartialBuildMapper extends TableMapper<ImmutableBytesWr
                         del = new Delete(CellUtil.cloneRow(cell));
                         del.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                         del.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, attribValue);
+                        del.setAttribute(PhoenixIndexCodec.CLIENT_VERSION, clientVersion);
                         del.setAttribute(BaseScannerRegionObserver.REPLAY_WRITES, BaseScannerRegionObserver.REPLAY_ONLY_INDEX_WRITES);
                         mutations.add(del);
                     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0d0c0fea/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 5b7735e..6926c4e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -176,7 +176,6 @@ import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.PhoenixIndexBuilder;
 import org.apache.phoenix.index.PhoenixIndexCodec;
-import org.apache.phoenix.index.PhoenixTransactionalIndexer;
 import org.apache.phoenix.iterate.TableResultIterator;
 import org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -850,19 +849,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                     && !SchemaUtil.isMetaTable(tableName)
                     && !SchemaUtil.isStatsTable(tableName)) {
                 if (isTransactional) {
-                    if (!descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
-                        descriptor.addCoprocessor(PhoenixTransactionalIndexer.class.getName(), null, priority, null);
-                    }
                     // For alter table, remove non transactional index coprocessor
                     if (descriptor.hasCoprocessor(Indexer.class.getName())) {
                         descriptor.removeCoprocessor(Indexer.class.getName());
                     }
                 } else {
                     if (!descriptor.hasCoprocessor(Indexer.class.getName())) {
-                        // If exception on alter table to transition back to non transactional
-                        if (descriptor.hasCoprocessor(PhoenixTransactionalIndexer.class.getName())) {
-                            descriptor.removeCoprocessor(PhoenixTransactionalIndexer.class.getName());
-                        }
                         Map<String, String> opts = Maps.newHashMapWithExpectedSize(1);
                         opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY, PhoenixIndexCodec.class.getName());
                         Indexer.enableIndexing(descriptor, PhoenixIndexBuilder.class, opts, priority);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0d0c0fea/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 9525127..fb30bc7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -1038,32 +1038,21 @@ public class PTableImpl implements PTable {
         @Override
         public void delete() {
             newMutations();
-            // we're using the Tephra column family delete marker here to prevent the translation
-            // of deletes to puts by the Tephra's TransactionProcessor
-            if (PTableImpl.this.isTransactional()) {
-                Put put = new Put(key);
-                if (families.isEmpty()) {
-                    put.add(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker(), ts,
-                            HConstants.EMPTY_BYTE_ARRAY);
-                } else {
-                    for (PColumnFamily colFamily : families) {
-                        put.add(colFamily.getName().getBytes(), TransactionFactory.getTransactionFactory().getTransactionContext().getFamilyDeleteMarker(), ts,
-                                HConstants.EMPTY_BYTE_ARRAY);
-                    }
-                }
-                deleteRow = put;                
+            Delete delete = new Delete(key);
+            if (families.isEmpty()) {
+                delete.deleteFamily(SchemaUtil.getEmptyColumnFamily(PTableImpl.this), ts);
             } else {
-                Delete delete = new Delete(key);
                 for (PColumnFamily colFamily : families) {
-                	delete.deleteFamily(colFamily.getName().getBytes(), ts);
+                    delete.deleteFamily(colFamily.getName().getBytes(), ts);
                 }
-                deleteRow = delete;
             }
+
+            deleteRow = delete;
+
             if (isWALDisabled()) {
                 deleteRow.setDurability(Durability.SKIP_WAL);
             }
         }
-        
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0d0c0fea/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
index f4c83b2..c741f4e 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/TenantCacheTest.java
@@ -28,6 +28,7 @@ import java.sql.SQLException;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.GlobalMemoryManager;
@@ -47,7 +48,7 @@ public class TenantCacheTest {
         TenantCacheImpl newTenantCache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive);
         ImmutableBytesPtr cacheId = new ImmutableBytesPtr(Bytes.toBytes("a"));
         ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
-        newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true);
+        newTenantCache.addServerCache(cacheId, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
         assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
         newTenantCache.removeServerCache(cacheId);
         assertEquals(maxBytes, memoryManager.getAvailableMemory());
@@ -62,7 +63,7 @@ public class TenantCacheTest {
         TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker);
         ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
         ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
-        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true);
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
         assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
         ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
         cache.cleanUp();
@@ -79,7 +80,7 @@ public class TenantCacheTest {
         TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker);
         ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
         ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("a"));
-        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true);
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
         assertEquals(maxBytes-1, memoryManager.getAvailableMemory());
         ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
         assertNull(cache.getServerCache(cacheId1));
@@ -94,12 +95,11 @@ public class TenantCacheTest {
         ManualTicker ticker = new ManualTicker();
         TenantCacheImpl cache = new TenantCacheImpl(memoryManager, maxServerCacheTimeToLive, ticker);
         ImmutableBytesPtr cacheId1 = new ImmutableBytesPtr(Bytes.toBytes("a"));
-        ImmutableBytesPtr cacheId2 = new ImmutableBytesPtr(Bytes.toBytes("b"));
         ImmutableBytesWritable cachePtr = new ImmutableBytesWritable(Bytes.toBytes("12345678"));
-        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true);
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
         assertEquals(2, memoryManager.getAvailableMemory());
         ticker.time += (maxServerCacheTimeToLive + 1) * 1000000;
-        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true);
+        cache.addServerCache(cacheId1, cachePtr, ByteUtil.EMPTY_BYTE_ARRAY, cacheFactory, true, MetaDataProtocol.PHOENIX_VERSION);
         assertEquals(2, memoryManager.getAvailableMemory());
     }
 
@@ -124,7 +124,7 @@ public class TenantCacheTest {
         }
 
         @Override
-        public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer)
+        public Closeable newCache(ImmutableBytesWritable cachePtr, byte[] txState, MemoryChunk chunk, boolean useProtoForIndexMaintainer, int clientVersion)
                 throws SQLException {
             return chunk;
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/0d0c0fea/phoenix-protocol/src/main/ServerCachingService.proto
----------------------------------------------------------------------
diff --git a/phoenix-protocol/src/main/ServerCachingService.proto b/phoenix-protocol/src/main/ServerCachingService.proto
index 044c111..c059a1a 100644
--- a/phoenix-protocol/src/main/ServerCachingService.proto
+++ b/phoenix-protocol/src/main/ServerCachingService.proto
@@ -71,6 +71,7 @@ message AddServerCacheRequest {
   required ServerCacheFactory cacheFactory = 4;
   optional bytes txState = 5;
   optional bool hasProtoBufIndexMaintainer = 6;
+  optional int32 clientVersion = 7;
 }
 
 message AddServerCacheResponse {


Mime
View raw message