phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject phoenix git commit: Fix upgrade for system.sequence table
Date Fri, 07 Nov 2014 18:27:03 GMT
Repository: phoenix
Updated Branches:
  refs/heads/3.0 cb4d2f01e -> bef7c0ed2


Fix upgrade for system.sequence table


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/bef7c0ed
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/bef7c0ed
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/bef7c0ed

Branch: refs/heads/3.0
Commit: bef7c0ed2f6cf98834f1827239f4816acf5bf4a9
Parents: cb4d2f0
Author: James Taylor <jtaylor@salesforce.com>
Authored: Fri Nov 7 10:16:03 2014 -0800
Committer: James Taylor <jtaylor@salesforce.com>
Committed: Fri Nov 7 10:26:36 2014 -0800

----------------------------------------------------------------------
 .../org/apache/phoenix/util/UpgradeUtil.java    | 116 +++++++++++++++++--
 1 file changed, 109 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/bef7c0ed/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
index 12d3938..1e57941 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/UpgradeUtil.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
@@ -53,33 +54,134 @@ public class UpgradeUtil {
     private UpgradeUtil() {
     }
 
+    private static byte[] getSequenceSnapshotName() {
+        return Bytes.toBytes("_BAK_" + PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
+    }
+    
+    private static void createSequenceSnapshot(HBaseAdmin admin, PhoenixConnection conn)
throws SQLException {
+        byte[] tableName = getSequenceSnapshotName();
+        HColumnDescriptor columnDesc = new HColumnDescriptor(PhoenixDatabaseMetaData.SEQUENCE_FAMILY_BYTES);
+        HTableDescriptor desc = new HTableDescriptor(tableName);
+        desc.addFamily(columnDesc);
+        try {
+            admin.createTable(desc);
+            copyTable(conn, PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES, tableName);
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        }
+    }
+    
+    private static void restoreSequenceSnapshot(HBaseAdmin admin, PhoenixConnection conn)
throws SQLException {
+        byte[] tableName = getSequenceSnapshotName();
+        copyTable(conn, tableName, PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
+    }
+    
+    private static void deleteSequenceSnapshot(HBaseAdmin admin) throws SQLException {
+        byte[] tableName = getSequenceSnapshotName();
+        try {
+            admin.disableTable(tableName);;
+            admin.deleteTable(tableName);
+        } catch (IOException e) {
+            throw ServerUtil.parseServerException(e);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    private static void copyTable(PhoenixConnection conn, byte[] sourceName, byte[] targetName)
throws SQLException {
+        int batchSizeBytes = 100 * 1024; // 100K chunks
+        int sizeBytes = 0;
+        List<Mutation> mutations =  Lists.newArrayListWithExpectedSize(10000);
+
+        Scan scan = new Scan();
+        scan.setRaw(true);
+        scan.setMaxVersions(MetaDataProtocol.DEFAULT_MAX_META_DATA_VERSIONS);
+        ResultScanner scanner = null;
+        HTableInterface source = null;
+        HTableInterface target = null;
+        try {
+            source = conn.getQueryServices().getTable(sourceName);
+            target = conn.getQueryServices().getTable(targetName);
+            scanner = source.getScanner(scan);
+            Result result;
+             while ((result = scanner.next()) != null) {
+                for (KeyValue keyValue : result.raw()) {
+                    sizeBytes += keyValue.getLength();
+                    if (KeyValue.Type.codeToType(keyValue.getType()) == KeyValue.Type.Put)
{
+                        // Put new value
+                        Put put = new Put(keyValue.getRow());
+                        put.add(keyValue);
+                        mutations.add(put);
+                    } else if (KeyValue.Type.codeToType(keyValue.getType()) == KeyValue.Type.Delete){
+                        // Copy delete marker using new key so that it continues
+                        // to delete the key value preceding it that will be updated
+                        // as well.
+                        Delete delete = new Delete(keyValue.getRow());
+                        delete.addDeleteMarker(keyValue);
+                        mutations.add(delete);
+                    }
+                }
+                if (sizeBytes >= batchSizeBytes) {
+                    logger.info("Committing bactch of temp rows");
+                    target.batch(mutations);
+                    mutations.clear();
+                    sizeBytes = 0;
+                }
+            }
+            if (!mutations.isEmpty()) {
+                logger.info("Committing last bactch of temp rows");
+                target.batch(mutations);
+            }
+            logger.info("Successfully completed copy");
+        } catch (SQLException e) {
+            throw e;
+        } catch (Exception e) {
+            throw ServerUtil.parseServerException(e);
+        } finally {
+            try {
+                if (scanner != null) scanner.close();
+            } finally {
+                try {
+                    if (source != null) source.close();
+                } catch (IOException e) {
+                    logger.warn("Exception during close of source table",e);
+                } finally {
+                    try {
+                        if (target != null) target.close();
+                    } catch (IOException e) {
+                        logger.warn("Exception during close of target table",e);
+                    }
+                }
+            }
+        }
+    }
+    
     private static void preSplitSequenceTable(PhoenixConnection conn, int nSaltBuckets) throws
SQLException {
         HBaseAdmin admin = conn.getQueryServices().getAdmin();
         boolean snapshotCreated = false;
+        boolean success = false;
         try {
             if (nSaltBuckets <= 0) {
                 return;
             }
             logger.warn("Pre-splitting SYSTEM.SEQUENCE table " + nSaltBuckets + "-ways. This
may take some time - please do not close window.");
             HTableDescriptor desc = admin.getTableDescriptor(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME_BYTES);
-            admin.snapshot(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME, PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
+            createSequenceSnapshot(admin, conn);
             snapshotCreated = true;
             admin.disableTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
             admin.deleteTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
             byte[][] splitPoints = SaltingUtil.getSalteByteSplitPoints(nSaltBuckets);
             admin.createTable(desc, splitPoints);
-            admin.disableTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
-            admin.restoreSnapshot(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
-            admin.enableTable(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
+            restoreSequenceSnapshot(admin, conn);
+            success = true;
             logger.warn("Completed pre-splitting SYSTEM.SEQUENCE table");
         } catch (IOException e) {
             throw new SQLException("Unable to pre-split SYSTEM.SEQUENCE table", e);
         } finally {
             try {
-                if (snapshotCreated) {
+                if (snapshotCreated && success) {
                     try {
-                        admin.deleteSnapshot(PhoenixDatabaseMetaData.SEQUENCE_FULLNAME);
-                    } catch (IOException e) {
+                        deleteSequenceSnapshot(admin);
+                    } catch (SQLException e) {
                         logger.warn("Exception while deleting SYSTEM.SEQUENCE snapshot during
pre-split", e);
                     }
                 }


Mime
View raw message