From commits-return-2853-apmail-trafodion-commits-archive=trafodion.apache.org@trafodion.apache.org Fri Jul 8 07:39:34 2016 Return-Path: X-Original-To: apmail-trafodion-commits-archive@www.apache.org Delivered-To: apmail-trafodion-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 27F3410459 for ; Fri, 8 Jul 2016 07:39:34 +0000 (UTC) Received: (qmail 13929 invoked by uid 500); 8 Jul 2016 07:39:34 -0000 Delivered-To: apmail-trafodion-commits-archive@trafodion.apache.org Received: (qmail 13897 invoked by uid 500); 8 Jul 2016 07:39:34 -0000 Mailing-List: contact commits-help@trafodion.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: commits@trafodion.apache.org Delivered-To: mailing list commits@trafodion.apache.org Received: (qmail 13888 invoked by uid 99); 8 Jul 2016 07:39:33 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jul 2016 07:39:33 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 6B45DC0457 for ; Fri, 8 Jul 2016 07:39:33 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id O75TD1vlcqSK for ; Fri, 8 Jul 2016 07:39:26 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 29AE05FBAA for ; Fri, 8 Jul 2016 07:39:24 +0000 (UTC) Received: (qmail 13537 invoked by uid 99); 8 Jul 2016 07:39:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jul 2016 07:39:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 15BEEE03C0; Fri, 8 Jul 2016 07:39:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sandhya@apache.org To: commits@trafodion.incubator.apache.org Date: Fri, 08 Jul 2016 07:39:24 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/6] incubator-trafodion git commit: [TRAFODION-1988] Better Java exception handling in Trafodion - Part2 [TRAFODION-1988] Better Java exception handling in Trafodion - Part2 This time the focus is on hbasetmlib2 improvements Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/f585bc80 Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/f585bc80 Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/f585bc80 Branch: refs/heads/master Commit: f585bc80f73cd4a303b87be298436e87b405f725 Parents: a4ec8cd Author: selvaganesang Authored: Fri Jun 24 23:48:04 2016 +0000 Committer: selvaganesang Committed: Sat Jun 25 04:52:03 2016 +0000 ---------------------------------------------------------------------- .../hbase/client/transactional/TmDDL.java | 65 +-- .../trafodion/dtm/HBaseAuditControlPoint.java | 150 ++---- .../main/java/org/trafodion/dtm/HBaseTmZK.java | 22 +- .../java/org/trafodion/dtm/HBaseTxClient.java | 501 +++++++------------ .../java/org/trafodion/dtm/TmAuditTlog.java | 318 +++--------- .../main/java/org/trafodion/dtm/TrafInfo.java | 12 +- .../java/org/trafodion/sql/HBaseClient.java | 14 +- .../java/org/trafodion/sql/HBulkLoadClient.java | 23 +- .../java/org/trafodion/sql/HTableClient.java | 21 +- .../java/org/trafodion/sql/OrcFileReader.java | 52 +- .../org/trafodion/sql/SequenceFileWriter.java | 75 ++- 11 files changed, 398 insertions(+), 855 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f585bc80/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java index 783b70c..72f0b9f 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java @@ -68,7 +68,7 @@ public class TmDDL { private static Object tablePutLock; // Lock for synchronizing table.put operations private static HTable table; - public TmDDL (Configuration config) throws Exception { + public TmDDL (Configuration config) throws IOException { this.config = config; this.dtmid = Integer.parseInt(config.get("dtmid")); @@ -76,26 +76,14 @@ public class TmDDL { if (LOG.isTraceEnabled()) LOG.trace("Enter TmDDL constructor for dtmid: " + dtmid); - try { - hbadmin = new HBaseAdmin(config); - } catch(Exception e) { - LOG.error("Unable to obtain HBaseAdmin accessor, exiting with exception: " + e); - e.printStackTrace(); - System.exit(1); - } + hbadmin = new HBaseAdmin(config); boolean tDDLTableExists = hbadmin.tableExists(tablename); if(tDDLTableExists==false && dtmid ==0) { - try { HTableDescriptor desc = new HTableDescriptor(tablename); desc.addFamily(new HColumnDescriptor(TDDL_FAMILY)); hbadmin.createTable(desc); - } catch(Exception e) { - LOG.error("Unable to create TDDL table, exception: " + e); - e.printStackTrace(); - throw e; - } } tablePutLock = new Object(); @@ -217,40 +205,25 @@ public class TmDDL { if (LOG.isTraceEnabled()) LOG.trace("TmDDL putRow exit, TxId:" + transid); } - public void setState(final long transid, final String state) throws Exception { + public void setState(final long transid, final String state) throws IOException { long threadId = Thread.currentThread().getId(); if (LOG.isTraceEnabled()) LOG.trace("TmDDL setState start in thread: " + threadId + "TxId:" + transid + "State :" + state); Put p = new Put(Bytes.toBytes(transid)); p.add(TDDL_FAMILY, TDDL_STATE, Bytes.toBytes(state)); - try { - synchronized (tablePutLock) { - try { - if (LOG.isTraceEnabled()) LOG.trace("TmDDL setState method. table.put. TxId: " + transid + "Put:" + p ); - - table.put(p); - } - catch (Exception e2){ - //Avoiding logging within a lock. Throwing Exception. - throw e2; - } - } // End global synchronization - } - catch (Exception e) { - //create record of the exception - LOG.error("TmDDL setState method. tablePutLock or Table.put Exception. TxID: " + transid + "Exception :" + e); - throw e; - } + synchronized (tablePutLock) { + if (LOG.isTraceEnabled()) LOG.trace("TmDDL setState method. table.put. TxId: " + transid + "Put:" + p ); + table.put(p); + } // End global synchronization if (LOG.isTraceEnabled()) LOG.trace("TmDDL setState exit, TxID:" + transid); } public void getRow(final long lvTransid, StringBuilder state, ArrayList createList, ArrayList dropList, ArrayList truncateList) - throws IOException, Exception { + throws IOException { if (LOG.isTraceEnabled()) LOG.trace("TmDDL getRow start, TxID: " + lvTransid); String recordString = null; StringTokenizer st = null; byte [] value = null; - try { Get g = new Get(Bytes.toBytes(lvTransid)); Result r = table.get(g); @@ -296,18 +269,11 @@ public class TmDDL { state.append(Bytes.toString(value)); } } - - } - catch(Exception e){ - LOG.error("TmDDL getRow Exception, TxId: " + lvTransid + "Exception:" + e); - throw e; - } } - public void getState(final long lvTransid, StringBuilder state) throws IOException, Exception { + public void getState(final long lvTransid, StringBuilder state) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("TmDDL getState start, TxID:" + lvTransid); byte [] value = null; - try { Get g = new Get(Bytes.toBytes(lvTransid)); Result r = table.get(g); @@ -327,22 +293,11 @@ public class TmDDL { { state.append("INVALID"); } - } - catch(Exception e){ - LOG.error("TmDDL getState Exception, TxID: " + lvTransid + "Exception: " + e); - throw e; - } } - public void deleteRow(final long lvTransid) throws IOException, Exception { + public void deleteRow(final long lvTransid) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("TmDDL deleteRow start, TxID: " + lvTransid); - try { Delete d = new Delete(Bytes.toBytes(lvTransid)); table.delete(d); - } - catch(Exception e){ - LOG.error("TmDDL deleteRow Exception, TxID: " + lvTransid + "Exception:" + e); - throw e; - } } } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f585bc80/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java index e50fe3b..12128f5 100644 --- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java +++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java @@ -96,15 +96,10 @@ public class HBaseAuditControlPoint { HColumnDescriptor hcol = new HColumnDescriptor(CONTROL_POINT_FAMILY); disableBlockCache = false; - try { - String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE"); - if (blockCacheString != null){ - disableBlockCache = (Integer.parseInt(blockCacheString) != 0); - if (LOG.isDebugEnabled()) LOG.debug("disableBlockCache != null"); - } - } - catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_DISABLE_BLOCK_CACHE is not in ms.env"); + String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE"); + if (blockCacheString != null){ + disableBlockCache = (Integer.parseInt(blockCacheString) != 0); + if (LOG.isDebugEnabled()) LOG.debug("disableBlockCache != null"); } LOG.info("disableBlockCache is " + disableBlockCache); if (disableBlockCache) { @@ -115,15 +110,10 @@ public class HBaseAuditControlPoint { admin = new HBaseAdmin(config); useAutoFlush = true; - try { - String autoFlush = System.getenv("TM_TLOG_AUTO_FLUSH"); - if (autoFlush != null){ - useAutoFlush = (Integer.parseInt(autoFlush) != 0); - if (LOG.isDebugEnabled()) LOG.debug("autoFlush != null"); - } - } - catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_AUTO_FLUSH is not in ms.env"); + String autoFlush = System.getenv("TM_TLOG_AUTO_FLUSH"); + if (autoFlush != null){ + useAutoFlush = (Integer.parseInt(autoFlush) != 0); + if (LOG.isDebugEnabled()) LOG.debug("autoFlush != null"); } LOG.info("useAutoFlush is " + useAutoFlush); @@ -140,22 +130,12 @@ public class HBaseAuditControlPoint { LOG.error("Table " + CONTROL_POINT_TABLE_NAME + " already exists"); } } - try { - if (LOG.isDebugEnabled()) LOG.debug("try new HTable"); - table = new HTable(config, desc.getName()); - table.setAutoFlushTo(this.useAutoFlush); - } - catch (IOException e) { - LOG.error("new HTable IOException"); - } + if (LOG.isDebugEnabled()) LOG.debug("try new HTable"); + table = new HTable(config, desc.getName()); + table.setAutoFlushTo(this.useAutoFlush); if (currControlPt == -1){ - try { - currControlPt = getCurrControlPt(); - } - catch (Exception e2) { - if (LOG.isDebugEnabled()) LOG.debug("Exit getCurrControlPoint() exception " + e2); - } + currControlPt = getCurrControlPt(); } if (LOG.isDebugEnabled()) LOG.debug("currControlPt is " + currControlPt); @@ -163,7 +143,7 @@ public class HBaseAuditControlPoint { return; } - public long getCurrControlPt() throws Exception { + public long getCurrControlPt() throws IOException { if (LOG.isTraceEnabled()) LOG.trace("getCurrControlPt: start"); long highKey = -1; if (LOG.isDebugEnabled()) LOG.debug("new Scan"); @@ -186,10 +166,6 @@ public class HBaseAuditControlPoint { highKey = currKey; } } - } - catch (Exception e) { - LOG.error("getCurrControlPt IOException" + e); - e.printStackTrace(); } finally { ss.close(); } @@ -197,22 +173,16 @@ public class HBaseAuditControlPoint { return highKey; } - public long putRecord(final long ControlPt, final long startingSequenceNumber) throws Exception { + public long putRecord(final long ControlPt, final long startingSequenceNumber) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("putRecord starting sequence number (" + String.valueOf(startingSequenceNumber) + ")"); String controlPtString = new String(String.valueOf(ControlPt)); Put p = new Put(Bytes.toBytes(controlPtString)); p.add(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK, Bytes.toBytes(String.valueOf(startingSequenceNumber))); - try { - if (LOG.isTraceEnabled()) LOG.trace("try table.put with starting sequence number " + startingSequenceNumber); - table.put(p); - if (useAutoFlush == false) { - if (LOG.isTraceEnabled()) LOG.trace("flushing controlpoint record"); - table.flushCommits(); - } - } - catch (Exception e) { - LOG.error("HBaseAuditControlPoint:putRecord Exception" + e); - throw e; + if (LOG.isTraceEnabled()) LOG.trace("try table.put with starting sequence number " + startingSequenceNumber); + table.put(p); + if (useAutoFlush == false) { + if (LOG.isTraceEnabled()) LOG.trace("flushing controlpoint record"); + table.flushCommits(); } if (LOG.isTraceEnabled()) LOG.trace("HBaseAuditControlPoint:putRecord returning " + ControlPt); return ControlPt; @@ -243,21 +213,13 @@ public class HBaseAuditControlPoint { long lvValue = -1; Get g = new Get(Bytes.toBytes(controlPt)); String recordString; - try { - Result r = table.get(g); - byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK); - try { - recordString = new String (Bytes.toString(currValue)); - if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString); - lvValue = Long.parseLong(recordString, 10); - } - catch (NullPointerException e){ - if (LOG.isDebugEnabled()) LOG.debug("control point " + controlPt + " is not in the table"); - } - } - catch (IOException e){ - LOG.error("getRecord IOException"); - throw e; + Result r = table.get(g); + byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK); + if (currValue != null) + { + recordString = new String (Bytes.toString(currValue)); + if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString); + lvValue = Long.parseLong(recordString, 10); } if (LOG.isTraceEnabled()) LOG.trace("getRecord - exit " + lvValue); return lvValue; @@ -276,19 +238,13 @@ public class HBaseAuditControlPoint { + CONTROL_POINT_FAMILY + " ASN_HIGH_WATER_MARK " + ASN_HIGH_WATER_MARK); byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK); if (LOG.isDebugEnabled()) LOG.debug("Starting asn setting recordString "); - String recordString = ""; - try { - recordString = new String(currValue); - } - catch (NullPointerException e) { - if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum recordString is null"); + if (currValue == null) lvAsn = 1; - if (LOG.isDebugEnabled()) LOG.debug("Starting asn is 1"); - return lvAsn; + else + { + String recordString = new String(currValue); + lvAsn = Long.valueOf(recordString); } - if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum recordString is good"); - if (LOG.isDebugEnabled()) LOG.debug("Starting asn for control point " + currControlPt + " is " + recordString); - lvAsn = Long.valueOf(recordString); if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum - exit returning " + lvAsn); return lvAsn; } @@ -323,19 +279,10 @@ public class HBaseAuditControlPoint { highValue = currValue; } } - } - catch (Exception e) { - LOG.error("getNextAuditSeqNum IOException" + e); - e.printStackTrace(); } finally { ss.close(); } - } - catch (IOException e) { - LOG.error("getNextAuditSeqNum IOException setting up scan for " + lv_tName); - e.printStackTrace(); - } - finally { + } finally { try { remoteTable.close(); remoteConnection.close(); @@ -351,17 +298,9 @@ public class HBaseAuditControlPoint { public long doControlPoint(final long sequenceNumber) throws IOException { - if (LOG.isTraceEnabled()) LOG.trace("doControlPoint start"); - try { - currControlPt++; - if (LOG.isTraceEnabled()) LOG.trace("doControlPoint interval (" + currControlPt + "), sequenceNumber (" + sequenceNumber+ ") try putRecord"); - putRecord(currControlPt, sequenceNumber); - } - catch (Exception e) { - LOG.error("doControlPoint Exception" + e); - } - - if (LOG.isTraceEnabled()) LOG.trace("doControlPoint - exit"); + currControlPt++; + if (LOG.isTraceEnabled()) LOG.trace("doControlPoint interval (" + currControlPt + "), sequenceNumber (" + sequenceNumber+ ") try putRecord"); + putRecord(currControlPt, sequenceNumber); return currControlPt; } @@ -369,16 +308,10 @@ public class HBaseAuditControlPoint { if (LOG.isTraceEnabled()) LOG.trace("deleteRecord start for control point " + controlPoint); String controlPtString = new String(String.valueOf(controlPoint)); - try { - List list = new ArrayList(); - Delete del = new Delete(Bytes.toBytes(controlPtString)); - if (LOG.isDebugEnabled()) LOG.debug("deleteRecord (" + controlPtString + ") "); - table.delete(del); - } - catch (Exception e) { - LOG.error("deleteRecord IOException"); - } - + List list = new ArrayList(); + Delete del = new Delete(Bytes.toBytes(controlPtString)); + if (LOG.isDebugEnabled()) LOG.debug("deleteRecord (" + controlPtString + ") "); + table.delete(del); if (LOG.isTraceEnabled()) LOG.trace("deleteRecord - exit"); return true; } @@ -404,10 +337,7 @@ public class HBaseAuditControlPoint { } if (LOG.isDebugEnabled()) LOG.debug("attempting to delete list with " + deleteList.size() + " elements"); table.delete(deleteList); - } - catch (Exception e) { - LOG.error("deleteAgedRecords IOException ", e); - }finally { + } finally { ss.close(); } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f585bc80/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java index 27411ec..82eaa9d 100644 --- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java +++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java @@ -68,9 +68,9 @@ public class HBaseTmZK implements Abortable{ /** * @param conf - * @throws Exception + * @throws IOException */ - public HBaseTmZK(final Configuration conf) throws Exception { + public HBaseTmZK(final Configuration conf) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("HBaseTmZK(conf) -- ENTRY"); this.dtmID = 0; this.zkNode = baseNode + "0"; @@ -80,9 +80,9 @@ public class HBaseTmZK implements Abortable{ /** * @param conf * @param dtmID - * @throws Exception + * @throws IOException */ - public HBaseTmZK(final Configuration conf, final short dtmID) throws Exception { + public HBaseTmZK(final Configuration conf, final short dtmID) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("HBaseTmZK(conf, dtmID) -- ENTRY"); this.dtmID = dtmID; this.zkNode = baseNode + String.format("%d", dtmID); @@ -176,7 +176,7 @@ public class HBaseTmZK implements Abortable{ } ZKUtil.createAndFailSilent(zooKeeper, zkNode + "/" + zNodeKey, data); } catch (KeeperException e) { - throw new IOException("HBaseTmZK:createRecoveryzNode: ZKW Unable to create recovery zNode: " + zkNode + " , throwing IOException " + e); + throw new IOException("HBaseTmZK:createRecoveryzNode: ZKW Unable to create recovery zNode: " + zkNode + " , throwing IOException ", e); } } /** @@ -193,7 +193,7 @@ public class HBaseTmZK implements Abortable{ String zNodeKey = dtmID+""; ZKUtil.createSetData(zooKeeper, zNodeGCPath + "/" + zNodeKey, data); } catch (KeeperException e) { - throw new IOException("HBaseTmZK:createGCzNode: ZKW Unable to create GC zNode: " + zNodeGCPath +" , throwing IOException " + e); + throw new IOException("HBaseTmZK:createGCzNode: ZKW Unable to create GC zNode: " + zNodeGCPath +" , throwing IOException ", e); } } @@ -216,14 +216,8 @@ public class HBaseTmZK implements Abortable{ String hostName = new String(tok.nextElement().toString()); int portNumber = Integer.parseInt(tok.nextElement().toString()); byte [] lv_byte_region_info = region.toByteArray(); - try{ - LOG.info("Calling createRecoveryzNode for encoded region: " + region.getEncodedName()); - createRecoveryzNode(hostName, portNumber, region.getEncodedName(), lv_byte_region_info); - } - catch (Exception e2){ - LOG.error("postAllRegionEntries exception in createRecoveryzNode " + region.getTable().getNameAsString() + - " exception: " + e2); - } + LOG.info("Calling createRecoveryzNode for encoded region: " + region.getEncodedName()); + createRecoveryzNode(hostName, portNumber, region.getEncodedName(), lv_byte_region_info); }// while } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f585bc80/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java index 4b1c4d8..e6e09ac 100644 --- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java +++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java @@ -71,6 +71,8 @@ import org.apache.hadoop.hbase.regionserver.transactional.IdTm; import org.apache.hadoop.hbase.regionserver.transactional.IdTmException; import org.apache.hadoop.hbase.regionserver.transactional.IdTmId; +import org.apache.zookeeper.KeeperException; + import java.util.Map; import java.util.HashMap; import java.util.concurrent.Callable; @@ -113,7 +115,7 @@ public class HBaseTxClient { PropertyConfigurator.configure(confFile); } - public boolean init(String hBasePath, String zkServers, String zkPort) throws Exception { + public boolean init(String hBasePath, String zkServers, String zkPort) throws IOException { //System.out.println("In init - hbp"); setupLog4j(); if (LOG.isDebugEnabled()) LOG.debug("Enter init, hBasePath:" + hBasePath); @@ -135,8 +137,8 @@ public class HBaseTxClient { useForgotten = (Integer.parseInt(useAuditRecords) != 0); } } - catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_FORGOTTEN_RECORDS is not in ms.env"); + catch (NumberFormatException e) { + LOG.error("TM_ENABLE_FORGOTTEN_RECORDS is not valid in ms.env"); } LOG.info("useForgotten is " + useForgotten); @@ -148,8 +150,8 @@ public class HBaseTxClient { if (LOG.isDebugEnabled()) LOG.debug("forgottenForce != null"); } } - catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_FORCE_FORGOTTEN is not in ms.env"); + catch (NumberFormatException e) { + LOG.error("TM_TLOG_FORCE_FORGOTTEN is not valid in ms.env"); } LOG.info("forceForgotten is " + forceForgotten); @@ -161,32 +163,27 @@ public class HBaseTxClient { useTlog = useRecovThread = (Integer.parseInt(useAudit) != 0); } } - catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_TLOG_WRITES is not in ms.env"); + catch (NumberFormatException e) { + LOG.error("TM_ENABLE_TLOG_WRITES is not valid in ms.env"); } if (useTlog) { - try { tLog = new TmAuditTlog(config); - } catch (Exception e ){ - LOG.error("Unable to create TmAuditTlog, throwing exception"); - throw new RuntimeException(e); - } } try { trxManager = TransactionManager.getInstance(config); } catch (IOException e ){ - LOG.error("Unable to create TransactionManager, throwing exception"); - throw new RuntimeException(e); + LOG.error("Unable to create TransactionManager, throwing exception", e); + throw e; } if (useRecovThread) { if (LOG.isDebugEnabled()) LOG.debug("Starting recovery thread for tm ID: " + dtmID); try { tmZK = new HBaseTmZK(config, dtmID); - }catch (IOException e ){ - LOG.error("Unable to create HBaseTmZK TM-zookeeper class, throwing exception"); - throw new RuntimeException(e); + } catch (IOException e ){ + LOG.error("Unable to create HBaseTmZK TM-zookeeper class, throwing exception", e); + throw e; } recovThread = new RecoveryThread(tLog, tmZK, trxManager); recovThread.start(); @@ -195,7 +192,7 @@ public class HBaseTxClient { return true; } - public boolean init(short dtmid) throws Exception { + public boolean init(short dtmid) throws IOException { //System.out.println("In init - dtmId" + dtmid); setupLog4j(); @@ -214,89 +211,68 @@ public class HBaseTxClient { String useSSCC = System.getenv("TM_USE_SSCC"); TRANSACTION_ALGORITHM = AlgorithmType.MVCC; - if (useSSCC != null) - TRANSACTION_ALGORITHM = (Integer.parseInt(useSSCC) == 1) ? AlgorithmType.SSCC :AlgorithmType.MVCC ; - try { - idServer = new IdTm(false); - } - catch (Exception e){ - LOG.error("Exception creating new IdTm: " + e); + if (useSSCC != null) + TRANSACTION_ALGORITHM = (Integer.parseInt(useSSCC) == 1) ? AlgorithmType.SSCC :AlgorithmType.MVCC ; + } catch (NumberFormatException e) { + LOG.error("TRANSACTION_ALGORITHM is not valid in ms.env"); } + idServer = new IdTm(false); + + String useDDLTransactions = System.getenv("TM_ENABLE_DDL_TRANS"); + try { - String useDDLTransactions = System.getenv("TM_ENABLE_DDL_TRANS"); if (useDDLTransactions != null) { useDDLTrans = (Integer.parseInt(useDDLTransactions) != 0); } } - catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_DDL_TRANS is not in ms.env"); + catch (NumberFormatException e) { + LOG.error("TM_ENABLE_DDL_TRANS is not valid in ms.env"); } - if(useDDLTrans){ - try { - tmDDL = new TmDDL(config); - } - catch (Exception e) { - LOG.error("Unable to create TmDDL, throwing exception " + e); - e.printStackTrace(); - throw new RuntimeException(e); - } - } + if (useDDLTrans) + tmDDL = new TmDDL(config); useForgotten = true; - try { String useAuditRecords = System.getenv("TM_ENABLE_FORGOTTEN_RECORDS"); + try { if (useAuditRecords != null) { useForgotten = (Integer.parseInt(useAuditRecords) != 0); } } - catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_FORGOTTEN_RECORDS is not in ms.env"); + catch (NumberFormatException e) { + LOG.error("TM_ENABLE_FORGOTTEN_RECORDS is not valid in ms.env"); } LOG.info("useForgotten is " + useForgotten); forceForgotten = false; - try { String forgottenForce = System.getenv("TM_TLOG_FORCE_FORGOTTEN"); + try { if (forgottenForce != null){ forceForgotten = (Integer.parseInt(forgottenForce) != 0); - if (LOG.isDebugEnabled()) LOG.debug("forgottenForce != null"); } } - catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_FORCE_FORGOTTEN is not in ms.env"); + catch (NumberFormatException e) { + LOG.error("TM_TLOG_FORCE_FORGOTTEN is not valid in ms.env"); } LOG.info("forceForgotten is " + forceForgotten); useTlog = false; useRecovThread = false; - try { String useAudit = System.getenv("TM_ENABLE_TLOG_WRITES"); + try { if (useAudit != null){ useTlog = useRecovThread = (Integer.parseInt(useAudit) != 0); } } - catch (Exception e) { - if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_TLOG_WRITES is not in ms.env"); + catch (NumberFormatException e) { + LOG.error("TM_ENABLE_TLOG_WRITES is not valid in ms.env"); } if (useTlog) { - try { - tLog = new TmAuditTlog(config); - } catch (Exception e ){ - LOG.error("Unable to create TmAuditTlog, throwing exception " + e); - e.printStackTrace(); - throw new RuntimeException(e); - } - } - try { - trxManager = TransactionManager.getInstance(config); - } catch (IOException e ){ - LOG.error("Unable to create TransactionManager, Exception: " + e + "throwing new RuntimeException"); - throw new RuntimeException(e); + tLog = new TmAuditTlog(config); } - + trxManager = TransactionManager.getInstance(config); if(useDDLTrans) trxManager.init(tmDDL); @@ -304,9 +280,9 @@ public class HBaseTxClient { if (LOG.isDebugEnabled()) LOG.debug("Entering recovThread Usage"); try { tmZK = new HBaseTmZK(config, dtmID); - }catch (IOException e ){ - LOG.error("Unable to create HBaseTmZK TM-zookeeper class, throwing exception"); - throw new RuntimeException(e); + } catch (IOException e ) { + LOG.error("Unable to create HBaseTmZK TM-zookeeper class, throwing exception", e); + throw e; } recovThread = new RecoveryThread(tLog, tmZK, @@ -332,7 +308,6 @@ public class HBaseTxClient { if(dtmID == nodeID) throw new IOException("Down node ID is the same as current dtmID, Incorrect parameter"); - try { if(mapRecoveryThreads.containsKey(nodeID)) { if(LOG.isDebugEnabled()) LOG.debug("nodeDown called on a node that already has RecoveryThread running node ID: " + nodeID); } @@ -348,10 +323,6 @@ public class HBaseTxClient { mapRecoveryThreads.put(nodeID, recovThread); if(LOG.isTraceEnabled()) LOG.trace("nodeDown -- mapRecoveryThreads size: " + mapRecoveryThreads.size()); } - } - catch(Exception e) { - LOG.error("Unable to create rescue recovery thread for TM" + dtmID); - } if(LOG.isTraceEnabled()) LOG.trace("nodeDown -- EXIT node ID: " + nodeID); } @@ -364,9 +335,15 @@ public class HBaseTxClient { return; } rt.stopThread(); + boolean loopBack = false; + do { try { rt.join(); - } catch (Exception e) { LOG.warn("Problem while waiting for the recovery thread to stop for node ID: " + nodeID); } + } catch (InterruptedException e) { + LOG.warn("Problem while waiting for the recovery thread to stop for node ID: " + nodeID, e); + loopBack = true; + } + } while (loopBack); mapRecoveryThreads.remove(nodeID); if(LOG.isTraceEnabled()) LOG.trace("nodeUp -- mapRecoveryThreads size: " + mapRecoveryThreads.size()); if(LOG.isTraceEnabled()) LOG.trace("nodeUp -- EXIT node ID: " + nodeID); @@ -378,15 +355,17 @@ public class HBaseTxClient { return TransReturnCode.RET_OK.getShort(); } - public long beginTransaction(final long transactionId) throws Exception { + public long beginTransaction(final long transactionId) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("Enter beginTransaction, txid: " + transactionId); - TransactionState tx = trxManager.beginTransaction(transactionId); + TransactionState tx = null; + try { + tx = trxManager.beginTransaction(transactionId); + } catch (IdTmException ite) { + LOG.error("Begin Transaction Error caused by : ", ite); + throw new IOException("Begin Transaction Error caused by :", ite); + } if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:beginTransaction new transactionState created: " + tx); - if(tx == null) { - LOG.error("null Transaction State returned by the Transaction Manager, txid: " + transactionId); - throw new Exception("TransactionState is null"); - } synchronized(mapLock) { TransactionState tx2 = mapTransactionStates.get(transactionId); @@ -406,7 +385,7 @@ public class HBaseTxClient { return transactionId; } - public short abortTransaction(final long transactionID) throws Exception { + public short abortTransaction(final long transactionID) throws IOException { if (LOG.isDebugEnabled()) LOG.debug("Enter abortTransaction, txid: " + transactionID); TransactionState ts = mapTransactionStates.get(transactionID); @@ -420,14 +399,19 @@ public class HBaseTxClient { if (useTlog) { tLog.putSingleRecord(transactionID, -1, "ABORTED", ts.getParticipatingRegions(), false); } - } catch(Exception e) { - LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " tLog.putRecord: EXCEPTION"); + } catch(IOException e) { + LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " tLog.putRecord: EXCEPTION", e); return TransReturnCode.RET_EXCEPTION.getShort(); } if ((stallWhere == 1) || (stallWhere == 3)) { LOG.info("Stalling in phase 2 for abortTransaction"); - Thread.sleep(300000); // Initially set to run every 5 min + boolean loopBack = true; + try { + Thread.sleep(300000); // Initially set to run every 5 min + } catch(InterruptedException ie) { + loopBack = true; + } while (loopBack); } try { @@ -436,11 +420,11 @@ public class HBaseTxClient { synchronized(mapLock) { mapTransactionStates.remove(transactionID); } - LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " retval: EXCEPTION"); + LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " retval: EXCEPTION", e); return TransReturnCode.RET_EXCEPTION.getShort(); } catch (UnsuccessfulDDLException ddle) { - LOG.error("FATAL DDL Exception from HBaseTxClient:abort, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionID); + LOG.error("FATAL DDL Exception from HBaseTxClient:abort, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionID, ddle); //Reaching here means several attempts to perform the DDL operation has failed in abort phase. //Generally if only DML operation is involved, returning error causes TM to call completeRequest() @@ -452,7 +436,13 @@ public class HBaseTxClient { Object commitDDLLock = new Object(); synchronized(commitDDLLock) { - commitDDLLock.wait(); + boolean loopBack = false; + try { + commitDDLLock.wait(); + } catch(InterruptedException ie) { + LOG.warn("Interrupting commitDDLLock.wait, but retrying ", ie); + loopBack = true; + } while (loopBack); } return TransReturnCode.RET_EXCEPTION.getShort(); } @@ -470,7 +460,7 @@ public class HBaseTxClient { return TransReturnCode.RET_OK.getShort(); } - public short prepareCommit(long transactionId) throws Exception { + public short prepareCommit(long transactionId) throws IOException { if (LOG.isDebugEnabled()) LOG.debug("Enter prepareCommit, txid: " + transactionId); if (LOG.isTraceEnabled()) LOG.trace("mapTransactionStates " + mapTransactionStates + " entries " + mapTransactionStates.size()); TransactionState ts = mapTransactionStates.get(transactionId); @@ -503,20 +493,16 @@ public class HBaseTxClient { return TransReturnCode.RET_EXCEPTION.getShort(); } } catch (CommitUnsuccessfulException e) { - LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " CommitUnsuccessfulException"); + LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " CommitUnsuccessfulException", e); return TransReturnCode.RET_NOCOMMITEX.getShort(); } catch (IOException e) { - LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_IOEXCEPTION.toString() + " IOException"); + LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_IOEXCEPTION.toString() + " IOException", e); return TransReturnCode.RET_IOEXCEPTION.getShort(); } - catch (Exception e) { - LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " Exception " + e); - return TransReturnCode.RET_NOCOMMITEX.getShort(); - } } - public short doCommit(long transactionId) throws Exception { + public short doCommit(long transactionId) throws IOException, CommitUnsuccessfulException { if (LOG.isDebugEnabled()) LOG.debug("Enter doCommit, txid: " + transactionId); TransactionState ts = mapTransactionStates.get(transactionId); @@ -534,8 +520,8 @@ public class HBaseTxClient { idServer.id(ID_TM_SERVER_TIMEOUT, commitId); if (LOG.isTraceEnabled()) LOG.trace("doCommit idServer.id returned: " + commitId.val); } catch (IdTmException exc) { - LOG.error("doCommit: IdTm threw exception " + exc); - throw new CommitUnsuccessfulException("doCommit: IdTm threw exception " + exc); + LOG.error("doCommit: IdTm threw exception " , exc); + throw new CommitUnsuccessfulException("doCommit: IdTm threw exception " , exc); } } @@ -548,24 +534,29 @@ public class HBaseTxClient { if (useTlog) { tLog.putSingleRecord(transactionId, commitIdVal, "COMMITTED", ts.getParticipatingRegions(), true); } - } catch(Exception e) { - LOG.error("Returning from HBaseTxClient:doCommit, txid: " + transactionId + " tLog.putRecord: EXCEPTION " + e); + } catch(IOException e) { + LOG.error("Returning from HBaseTxClient:doCommit, txid: " + transactionId + " tLog.putRecord: EXCEPTION ", e); return TransReturnCode.RET_EXCEPTION.getShort(); } if ((stallWhere == 2) || (stallWhere == 3)) { LOG.info("Stalling in phase 2 for doCommit"); - Thread.sleep(300000); // Initially set to run every 5 min + boolean loopBack = false; + try { + Thread.sleep(300000); // Initially set to run every 5 min + } catch(InterruptedException ie) { + loopBack = true; + } while (loopBack); } try { trxManager.doCommit(ts); } catch (CommitUnsuccessfulException e) { - LOG.error("Returning from HBaseTxClient:doCommit, retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException" + " txid: " + transactionId); + LOG.error("Returning from HBaseTxClient:doCommit, retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException" + " txid: " + transactionId, e); return TransReturnCode.RET_EXCEPTION.getShort(); } catch (UnsuccessfulDDLException ddle) { - LOG.error("FATAL DDL Exception from HBaseTxClient:doCommit, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionId); + LOG.error("FATAL DDL Exception from HBaseTxClient:doCommit, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionId, ddle); //Reaching here means several attempts to perform the DDL operation has failed in commit phase. //Generally if only DML operation is involved, returning error causes TM to call completeRequest() @@ -577,7 +568,13 @@ public class HBaseTxClient { Object commitDDLLock = new Object(); synchronized(commitDDLLock) { - commitDDLLock.wait(); + boolean loopBack = false; + try { + commitDDLLock.wait(); + } catch(InterruptedException ie) { + LOG.warn("Interrupting commitDDLLock.wait, but retrying ", ie); + loopBack = true; + } while (loopBack); } return TransReturnCode.RET_EXCEPTION.getShort(); } @@ -597,7 +594,7 @@ public class HBaseTxClient { return TransReturnCode.RET_OK.getShort(); } - public short completeRequest(long transactionId) throws Exception { + public short completeRequest(long transactionId) throws IOException, CommitUnsuccessfulException { if (LOG.isDebugEnabled()) LOG.debug("Enter completeRequest, txid: " + transactionId); TransactionState ts = mapTransactionStates.get(transactionId); @@ -605,16 +602,17 @@ public class HBaseTxClient { LOG.error("Returning from HBaseTxClient:completeRequest, (null tx) retval: " + TransReturnCode.RET_NOTX.toString() + " txid: " + transactionId); return TransReturnCode.RET_NOTX.getShort(); } - + + boolean loopBack = false; try { if (LOG.isTraceEnabled()) LOG.trace("TEMP completeRequest Calling CompleteRequest() Txid :" + transactionId); ts.completeRequest(); - } catch(Exception e) { - LOG.error("Returning from HBaseTxClient:completeRequest, ts.completeRequest: txid: " + transactionId + ", EXCEPTION: " + e); - throw new Exception("Exception during completeRequest, unable to commit. Exception: " + e); - } + } catch(InterruptedException ie) { + LOG.warn("Interrupting HBaseTxClient:completeRequest but retrying, ts.completeRequest: txid: " + transactionId + ", EXCEPTION: ", ie); + loopBack = true; + } while (loopBack); synchronized(mapLock) { mapTransactionStates.remove(transactionId); @@ -624,7 +622,7 @@ public class HBaseTxClient { return TransReturnCode.RET_OK.getShort(); } - public short tryCommit(long transactionId) throws Exception { + public short tryCommit(long transactionId) throws IOException, CommitUnsuccessfulException { if (LOG.isDebugEnabled()) LOG.debug("Enter tryCommit, txid: " + transactionId); short err, commitErr, abortErr = TransReturnCode.RET_OK.getShort(); @@ -650,10 +648,10 @@ public class HBaseTxClient { if (err != TransReturnCode.RET_OK.getShort()){ if (LOG.isDebugEnabled()) LOG.debug("tryCommit completeRequest for transaction " + transactionId + " failed with error " + err); } - } catch(Exception e) { + } catch(IOException e) { mapTransactionStates.remove(transactionId); - LOG.error("Returning from HBaseTxClient:tryCommit, ts: EXCEPTION" + " txid: " + transactionId); - throw new Exception("Exception " + e + "during tryCommit, unable to commit."); + LOG.error("Returning from HBaseTxClient:tryCommit, ts: EXCEPTION" + " txid: " + transactionId, e); + throw new IOException("Exception during tryCommit, unable to commit.", e); } synchronized(mapLock) { @@ -664,10 +662,10 @@ public class HBaseTxClient { return TransReturnCode.RET_OK.getShort(); } - public short callCreateTable(long transactionId, byte[] pv_htbldesc, Object[] beginEndKeys) throws Exception + public short callCreateTable(long transactionId, byte[] pv_htbldesc, Object[] beginEndKeys) throws IOException { TransactionState ts; - HTableDescriptor htdesc; + HTableDescriptor htdesc = null; if (LOG.isTraceEnabled()) LOG.trace("Enter callCreateTable, txid: [" + transactionId + "], htbldesc bytearray: " + pv_htbldesc + "desc in hex: " + Hex.encodeHexString(pv_htbldesc)); @@ -676,40 +674,17 @@ public class HBaseTxClient { LOG.error("Returning from HBaseTxClient:callCreateTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort() + " txid: " + transactionId); return TransReturnCode.RET_NOTX.getShort(); } - try { htdesc = HTableDescriptor.parseFrom(pv_htbldesc); + } catch (DeserializationException de) { + LOG.error("Error while getting HTableDescriptor caused by : ", de); + throw new IOException("Error while getting HTableDescriptor caused by : ", de); } - catch(Exception e) { - if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callCreateTable exception in htdesc parseFrom, retval: " + - TransReturnCode.RET_EXCEPTION.toString() + - " txid: " + transactionId + - " DeserializationException: " + e); - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - e.printStackTrace(pw); - LOG.error(sw.toString()); - - throw new Exception("DeserializationException in callCreateTable parseFrom, unable to send callCreateTable"); - } - - try { - trxManager.createTable(ts, htdesc, beginEndKeys); - } - catch (Exception cte) { - if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callCreateTable exception trxManager.createTable, retval: " + - TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + cte); - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - cte.printStackTrace(pw); - LOG.error("HBaseTxClient createTable call error: " + sw.toString()); - - throw new Exception("createTable call error"); - } + trxManager.createTable(ts, htdesc, beginEndKeys); return TransReturnCode.RET_OK.getShort(); } - public short callAlterTable(long transactionId, byte[] pv_tblname, Object[] tableOptions) throws Exception + public short callAlterTable(long transactionId, byte[] pv_tblname, Object[] tableOptions) throws IOException { TransactionState ts; String strTblName = new String(pv_tblname, "UTF-8"); @@ -722,23 +697,11 @@ public class HBaseTxClient { return TransReturnCode.RET_NOTX.getShort(); } - try { - trxManager.alterTable(ts, strTblName, tableOptions); - } - catch (Exception cte) { - if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callAlterTable exception trxManager.alterTable, retval: " + - TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + cte); - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - cte.printStackTrace(pw); - LOG.error("HBaseTxClient alterTable call error: " + sw.toString()); - - throw new Exception("alterTable call error"); - } + trxManager.alterTable(ts, strTblName, tableOptions); return TransReturnCode.RET_OK.getShort(); } - public short callRegisterTruncateOnAbort(long transactionId, byte[] pv_tblname) throws Exception + public short callRegisterTruncateOnAbort(long transactionId, byte[] pv_tblname) throws IOException { TransactionState ts; String strTblName = new String(pv_tblname, "UTF-8"); @@ -751,23 +714,11 @@ public class HBaseTxClient { return TransReturnCode.RET_NOTX.getShort(); } - try { - trxManager.registerTruncateOnAbort(ts, strTblName); - } - catch (Exception e) { - if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterTruncateOnAbort exception trxManager.registerTruncateOnAbort, retval: " + - TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + e); - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - e.printStackTrace(pw); - String msg = "HBaseTxClient registerTruncateOnAbort call error "; - LOG.error(msg + " : " + sw.toString()); - throw new Exception(msg); - } + trxManager.registerTruncateOnAbort(ts, strTblName); return TransReturnCode.RET_OK.getShort(); } - public short callDropTable(long transactionId, byte[] pv_tblname) throws Exception + public short callDropTable(long transactionId, byte[] pv_tblname) throws IOException { TransactionState ts; String strTblName = new String(pv_tblname, "UTF-8"); @@ -780,17 +731,7 @@ public class HBaseTxClient { return TransReturnCode.RET_NOTX.getShort(); } - try { - trxManager.dropTable(ts, strTblName); - } - catch (Exception cte) { - if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callDropTable exception trxManager.dropTable, retval: " + - TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + cte); - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - cte.printStackTrace(pw); - LOG.error("HBaseTxClient dropTable call error: " + sw.toString()); - } + trxManager.dropTable(ts, strTblName); return TransReturnCode.RET_OK.getShort(); } @@ -799,7 +740,7 @@ public class HBaseTxClient { int pv_port, byte[] pv_hostname, long pv_startcode, - byte[] pv_regionInfo) throws Exception { + byte[] pv_regionInfo) throws IOException { String hostname = new String(pv_hostname); if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterRegion, txid: [" + transactionId + "], startId: " + startId + ", port: " + pv_port + ", hostname: " + hostname + ", reg info len: " + pv_regionInfo.length + " " + new String(pv_regionInfo, "UTF-8")); @@ -807,16 +748,9 @@ public class HBaseTxClient { HRegionInfo lv_regionInfo; try { lv_regionInfo = HRegionInfo.parseFrom(pv_regionInfo); - } - catch (Exception de) { - if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion exception in lv_regionInfo parseFrom, retval: " + - TransReturnCode.RET_EXCEPTION.toString() + " txid: " + transactionId + " DeserializationException: " + de); - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - de.printStackTrace(pw); - LOG.error(sw.toString()); - - throw new Exception("DeserializationException in lv_regionInfo parseFrom, unable to register region"); + } catch (DeserializationException de) { + LOG.error("Error while getting regionInfo caused by : ", de); + throw new IOException("Error while getting regionInfo caused by : ", de); } // TODO Not in CDH 5.1 ServerName lv_servername = ServerName.valueOf(hostname, pv_port, pv_startcode); @@ -832,10 +766,9 @@ public class HBaseTxClient { ") not found in mapTransactionStates of size: " + mapTransactionStates.size()); try { ts = trxManager.beginTransaction(transactionId); - } - catch (IdTmException exc) { - LOG.error("HBaseTxClient: beginTransaction for tx (" + transactionId + ") caught exception " + exc); - throw new IdTmException("HBaseTxClient: beginTransaction for tx (" + transactionId + ") caught exception " + exc); + } catch (IdTmException ite) { + LOG.error("Begin Transaction Error caused by : ", ite); + throw new IOException("Begin Transaction Error caused by :", ite); } synchronized (mapLock) { TransactionState ts2 = mapTransactionStates.get(transactionId); @@ -863,7 +796,7 @@ public class HBaseTxClient { trxManager.registerRegion(ts, regionLocation); } catch (IOException e) { LOG.error("HBaseTxClient:callRegisterRegion exception in registerRegion call, txid: " + transactionId + - " retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException " + e); + " retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException " , e); return TransReturnCode.RET_EXCEPTION.getShort(); } @@ -878,7 +811,7 @@ public class HBaseTxClient { return TransReturnCode.RET_OK.getShort(); } - public int participatingRegions(long transactionId) throws Exception { + public int participatingRegions(long transactionId) throws IOException { if (LOG.isTraceEnabled()) LOG.trace("Enter participatingRegions, txid: " + transactionId); TransactionState ts = mapTransactionStates.get(transactionId); if(ts == null) { @@ -900,17 +833,11 @@ public class HBaseTxClient { return participants; } - public long addControlPoint() throws Exception { + public long addControlPoint() throws IOException { if (LOG.isTraceEnabled()) LOG.trace("Enter addControlPoint"); long result = 0L; - try { - if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient calling tLog.addControlPoint with mapsize " + mapTransactionStates.size()); - result = tLog.addControlPoint(mapTransactionStates); - } - catch(IOException e){ - LOG.error("addControlPoint IOException " + e); - throw e; - } + if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient calling tLog.addControlPoint with mapsize " + mapTransactionStates.size()); + result = tLog.addControlPoint(mapTransactionStates); Long lowestStartId = Long.MAX_VALUE; for(ConcurrentHashMap.Entry entry : mapTransactionStates.entrySet()){ TransactionState value; @@ -987,7 +914,7 @@ public class HBaseTxClient { this.continueThread = false; } - private void addRegionToTS(String hostnamePort, byte[] regionInfo, TransactionState ts) throws Exception{ + private void addRegionToTS(String hostnamePort, byte[] regionInfo, TransactionState ts) throws IOException{ HRegionInfo regionInfoLoc; // = new HRegionInfo(); final byte [] delimiter = ",".getBytes(); String[] result = hostnamePort.split(new String(delimiter), 3); @@ -998,21 +925,14 @@ public class HBaseTxClient { String hostname = result[0]; int port = Integer.parseInt(result[1]); try { - regionInfoLoc = HRegionInfo.parseFrom(regionInfo); - } - catch(Exception e) { - LOG.error("Unable to parse region byte array, " + e); - throw e; + regionInfoLoc = HRegionInfo.parseFrom(regionInfo); + } catch (DeserializationException de) { + throw new IOException(de); } - /* ByteArrayInputStream lv_bis = new ByteArrayInputStream(regionInfo); DataInputStream lv_dis = new DataInputStream(lv_bis); - try { - regionInfoLoc.readFields(lv_dis); - } catch (Exception e) { - throw new Exception(); - } - */ + regionInfoLoc.readFields(lv_dis); + //HBase98 TODO: need to set the value of startcode correctly //HBase98 TODO: Not in CDH 5.1: ServerName lv_servername = ServerName.valueOf(hostname, port, 0); @@ -1033,17 +953,15 @@ public class HBaseTxClient { Map regions = null; Map transactionStates = new HashMap(); + boolean loopBack = false; + do { try { - regions = zookeeper.checkForRecovery(); - } catch (Exception e) { - if (regions != null) { // ignore no object returned by zookeeper.checkForRecovery - LOG.error("An ERROR occurred while checking for regions to recover. " + "TM: " + tmID); - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - e.printStackTrace(pw); - LOG.error(sw.toString()); - } + regions = zookeeper.checkForRecovery(); + } catch(InterruptedException ie) { + loopBack = true; + } catch (KeeperException ze) { } + } while (loopBack); if(regions != null) { skipSleep = true; @@ -1090,77 +1008,39 @@ public class HBaseTxClient { }catch (NotServingRegionException e) { TxRecoverList = null; LOG.error("TRAF RCOV THREAD:NotServingRegionException calling recoveryRequest. regionBytes: " + new String(regionBytes) + - " TM: " + tmID + " hostnamePort: " + hostnamePort); + " TM: " + tmID + " hostnamePort: " + hostnamePort, e); - try { // First delete the zookeeper entry LOG.error("TRAF RCOV THREAD:recoveryRequest. Deleting region entry Entry: " + regionEntry); zookeeper.deleteRegionEntry(regionEntry); - } - catch (Exception e2) { - LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " + - new String(regionEntry.getValue()) + " exception: " + e2); - } - try { // Create a local HTable object using the regionInfo HTable table = new HTable(config, HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString()); - try { - // Repost a zookeeper entry for all current regions in the table - zookeeper.postAllRegionEntries(table); - } - catch (Exception e2) { - LOG.error("TRAF RCOV THREAD:Error calling postAllRegionEntries. table: " + new String(table.getTableName()) + " exception: " + e2); - } - }// try - catch (Exception e1) { - LOG.error("TRAF RCOV THREAD:recoveryRequest exception in new HTable " + HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString() + " Exception: " + e1); - } + // Repost a zookeeper entry for all current regions in the table + zookeeper.postAllRegionEntries(table); }// NotServingRegionException catch (TableNotFoundException tnfe) { // In this case there is nothing to recover. We just need to delete the region entry. - try { // First delete the zookeeper entry LOG.warn("TRAF RCOV THREAD:TableNotFoundException calling txnManager.recoveryRequest. " + "TM: " + tmID + " regionBytes: [" + regionBytes + "]. Deleting zookeeper region entry. \n exception: " + tnfe); zookeeper.deleteRegionEntry(regionEntry); - } - catch (Exception e2) { - LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " + - new String(regionEntry.getValue()) + " exception: " + e2); - } }// TableNotFoundException catch (DeserializationException de) { // We are unable to parse the region info from ZooKeeper We just need to delete the region entry. - try { // First delete the zookeeper entry LOG.warn("TRAF RCOV THREAD:DeserializationException calling txnManager.recoveryRequest. " + "TM: " + tmID + " regionBytes: [" + regionBytes + "]. Deleting zookeeper region entry. \n exception: " + de); zookeeper.deleteRegionEntry(regionEntry); - } - catch (Exception e2) { - LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " + - new String(regionEntry.getValue()) + " exception: " + e2); - } - }// DeserializationException - catch (Exception e) { - LOG.error("TRAF RCOV THREAD:An ERROR occurred calling txnManager.recoveryRequest. " + "TM: " + - tmID + " regionBytes: [" + regionBytes + "] exception: " + e); - } + if (TxRecoverList != null) { if (LOG.isDebugEnabled()) LOG.trace("TRAF RCOV THREAD:size of TxRecoverList " + TxRecoverList.size()); if (TxRecoverList.size() == 0) { - try { // First delete the zookeeper entry LOG.warn("TRAF RCOV THREAD:Leftover Znode calling txnManager.recoveryRequest. " + "TM: " + tmID + " regionBytes: [" + regionBytes + "]. Deleting zookeeper region entry. "); zookeeper.deleteRegionEntry(regionEntry); - } - catch (Exception e2) { - LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " + - new String(regionEntry.getValue()) + " exception: " + e2); - } } for (Long txid : TxRecoverList) { TransactionState ts = transactionStates.get(txid); @@ -1171,22 +1051,12 @@ public class HBaseTxClient { if(hbtx.useDDLTrans){ TmDDL tmDDL = hbtx.getTmDDL(); StringBuilder state = new StringBuilder (); - try { - tmDDL.getState(txid,state); - } - catch(Exception egetState){ - LOG.error("TRAF RCOV THREAD:Error calling TmDDL getState." + egetState); - } + tmDDL.getState(txid,state); if(state.toString().equals("VALID")) ts.setDDLTx(true); } } - try { - this.addRegionToTS(hostnamePort, regionBytes, ts); - } catch (Exception e) { - LOG.error("TRAF RCOV THREAD:Unable to add region to TransactionState, region info: " + new String(regionBytes)); - e.printStackTrace(); - } + this.addRegionToTS(hostnamePort, regionBytes, ts); transactionStates.put(txid, ts); } } @@ -1219,24 +1089,20 @@ public class HBaseTxClient { txnManager.abort(ts); } - }catch (UnsuccessfulDDLException ddle) { - LOG.error("UnsuccessfulDDLException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " + ddle); - ddle.printStackTrace(); + } catch (UnsuccessfulDDLException ddle) { + LOG.error("UnsuccessfulDDLException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , ddle); //Note that there may not be anymore redrive triggers from region server point of view for DDL operation. //Register this DDL transaction for subsequent redrive from Audit Control Event. //TODO: Launch a new Redrive Thread out of auditControlPoint(). TmDDL tmDDL = hbtx.getTmDDL(); - try { - tmDDL.setState(txID,"REDRIVE"); - } - catch(Exception e){ - LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive" + e); - } - } - catch (Exception e) { - LOG.error("Unable to get audit record for tx: " + txID + ", audit is throwing exception."); - e.printStackTrace(); + tmDDL.setState(txID,"REDRIVE"); + LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive"); + } catch (CommitUnsuccessfulException cue) { + LOG.error("CommitUnsuccessfulException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , cue); + TmDDL tmDDL = hbtx.getTmDDL(); + tmDDL.setState(txID,"REDRIVE"); + LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive"); } } @@ -1257,17 +1123,40 @@ public class HBaseTxClient { } } retryCount = 0; - } catch (Exception e) { + } catch (InterruptedException e) { LOG.error("Error in recoveryThread: " + e); } + } catch (IOException e) { + int possibleRetries = 4; + LOG.error("Caught recovery thread exception for tmid: " + tmID + " retries: " + retryCount, e); + + retryCount++; + if(retryCount > possibleRetries) { + LOG.error("Recovery thread failure, aborting process"); + System.exit(4); + } + + try { + Thread.sleep(SLEEP_DELAY / possibleRetries); + } catch(InterruptedException se) { + } + } catch (KeeperException ze) { + int possibleRetries = 4; + LOG.error("Caught recovery thread exception for tmid: " + tmID + " retries: " + retryCount, ze); - } catch (Exception e) { + retryCount++; + if(retryCount > possibleRetries) { + LOG.error("Recovery thread failure, aborting process"); + System.exit(4); + } + + try { + Thread.sleep(SLEEP_DELAY / possibleRetries); + } catch(InterruptedException se) { + } + } catch (DeserializationException de) { int possibleRetries = 4; - LOG.error("Caught recovery thread exception for tmid: " + tmID + " retries: " + retryCount); - StringWriter sw = new StringWriter(); - PrintWriter pw = new PrintWriter(sw); - e.printStackTrace(pw); - LOG.error(sw.toString()); + LOG.error("Caught recovery thread exception for tmid: " + tmID + " retries: " + retryCount, de); retryCount++; if(retryCount > possibleRetries) { @@ -1277,8 +1166,7 @@ public class HBaseTxClient { try { Thread.sleep(SLEEP_DELAY / possibleRetries); - } catch(Exception se) { - LOG.error(se); + } catch(InterruptedException se) { } } } @@ -1294,7 +1182,7 @@ public class HBaseTxClient { // callRequestRegionInfo // Purpose: Prepares HashMapArray class to get region information //-------------------------------------------------------------------------------- - public HashMapArray callRequestRegionInfo() throws Exception { + public HashMapArray callRequestRegionInfo() throws IOException { String tablename, encoded_region_name, region_name, is_offline, region_id, hostname, port, thn; @@ -1308,7 +1196,6 @@ public class HBaseTxClient { HashMapArray hm = new HashMapArray(); - try{ for(ConcurrentHashMap.Entry entry : mapTransactionStates.entrySet()){ key = entry.getKey(); value = entry.getValue(); @@ -1375,10 +1262,6 @@ public class HBaseTxClient { tnum = tnum + 1; } - }catch(Exception e){ - if (LOG.isTraceEnabled()) LOG.trace("Error in getting region info. Map might be empty. Please ensure sqlci insert was done"); - } - if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient::callRequestRegionInfo:: end size: " + hm.getSize()); return hm; }