[TRAFODION-1988] Better Java exception handling in Trafodion - Part 2
This time the focus is on improvements to hbasetmlib2.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/f585bc80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/f585bc80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/f585bc80
Branch: refs/heads/master
Commit: f585bc80f73cd4a303b87be298436e87b405f725
Parents: a4ec8cd
Author: selvaganesang <selva.govindarajan@esgyn.com>
Authored: Fri Jun 24 23:48:04 2016 +0000
Committer: selvaganesang <selva.govindarajan@esgyn.com>
Committed: Sat Jun 25 04:52:03 2016 +0000
----------------------------------------------------------------------
.../hbase/client/transactional/TmDDL.java | 65 +--
.../trafodion/dtm/HBaseAuditControlPoint.java | 150 ++----
.../main/java/org/trafodion/dtm/HBaseTmZK.java | 22 +-
.../java/org/trafodion/dtm/HBaseTxClient.java | 501 +++++++------------
.../java/org/trafodion/dtm/TmAuditTlog.java | 318 +++---------
.../main/java/org/trafodion/dtm/TrafInfo.java | 12 +-
.../java/org/trafodion/sql/HBaseClient.java | 14 +-
.../java/org/trafodion/sql/HBulkLoadClient.java | 23 +-
.../java/org/trafodion/sql/HTableClient.java | 21 +-
.../java/org/trafodion/sql/OrcFileReader.java | 52 +-
.../org/trafodion/sql/SequenceFileWriter.java | 75 ++-
11 files changed, 398 insertions(+), 855 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f585bc80/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
index 783b70c..72f0b9f 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
@@ -68,7 +68,7 @@ public class TmDDL {
private static Object tablePutLock; // Lock for synchronizing table.put operations
private static HTable table;
- public TmDDL (Configuration config) throws Exception {
+ public TmDDL (Configuration config) throws IOException {
this.config = config;
this.dtmid = Integer.parseInt(config.get("dtmid"));
@@ -76,26 +76,14 @@ public class TmDDL {
if (LOG.isTraceEnabled()) LOG.trace("Enter TmDDL constructor for dtmid: " + dtmid);
- try {
- hbadmin = new HBaseAdmin(config);
- } catch(Exception e) {
- LOG.error("Unable to obtain HBaseAdmin accessor, exiting with exception: " + e);
- e.printStackTrace();
- System.exit(1);
- }
+ hbadmin = new HBaseAdmin(config);
boolean tDDLTableExists = hbadmin.tableExists(tablename);
if(tDDLTableExists==false && dtmid ==0) {
- try {
HTableDescriptor desc = new HTableDescriptor(tablename);
desc.addFamily(new HColumnDescriptor(TDDL_FAMILY));
hbadmin.createTable(desc);
- } catch(Exception e) {
- LOG.error("Unable to create TDDL table, exception: " + e);
- e.printStackTrace();
- throw e;
- }
}
tablePutLock = new Object();
@@ -217,40 +205,25 @@ public class TmDDL {
if (LOG.isTraceEnabled()) LOG.trace("TmDDL putRow exit, TxId:" + transid);
}
- public void setState(final long transid, final String state) throws Exception {
+ public void setState(final long transid, final String state) throws IOException {
long threadId = Thread.currentThread().getId();
if (LOG.isTraceEnabled()) LOG.trace("TmDDL setState start in thread: " + threadId + "TxId:" + transid + "State :" + state);
Put p = new Put(Bytes.toBytes(transid));
p.add(TDDL_FAMILY, TDDL_STATE, Bytes.toBytes(state));
- try {
- synchronized (tablePutLock) {
- try {
- if (LOG.isTraceEnabled()) LOG.trace("TmDDL setState method. table.put. TxId: " + transid + "Put:" + p );
-
- table.put(p);
- }
- catch (Exception e2){
- //Avoiding logging within a lock. Throwing Exception.
- throw e2;
- }
- } // End global synchronization
- }
- catch (Exception e) {
- //create record of the exception
- LOG.error("TmDDL setState method. tablePutLock or Table.put Exception. TxID: " + transid + "Exception :" + e);
- throw e;
- }
+ synchronized (tablePutLock) {
+ if (LOG.isTraceEnabled()) LOG.trace("TmDDL setState method. table.put. TxId: " + transid + "Put:" + p );
+ table.put(p);
+ } // End global synchronization
if (LOG.isTraceEnabled()) LOG.trace("TmDDL setState exit, TxID:" + transid);
}
public void getRow(final long lvTransid, StringBuilder state, ArrayList<String> createList, ArrayList<String> dropList, ArrayList<String> truncateList)
- throws IOException, Exception {
+ throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("TmDDL getRow start, TxID: " + lvTransid);
String recordString = null;
StringTokenizer st = null;
byte [] value = null;
- try {
Get g = new Get(Bytes.toBytes(lvTransid));
Result r = table.get(g);
@@ -296,18 +269,11 @@ public class TmDDL {
state.append(Bytes.toString(value));
}
}
-
- }
- catch(Exception e){
- LOG.error("TmDDL getRow Exception, TxId: " + lvTransid + "Exception:" + e);
- throw e;
- }
}
- public void getState(final long lvTransid, StringBuilder state) throws IOException, Exception {
+ public void getState(final long lvTransid, StringBuilder state) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("TmDDL getState start, TxID:" + lvTransid);
byte [] value = null;
- try {
Get g = new Get(Bytes.toBytes(lvTransid));
Result r = table.get(g);
@@ -327,22 +293,11 @@ public class TmDDL {
{
state.append("INVALID");
}
- }
- catch(Exception e){
- LOG.error("TmDDL getState Exception, TxID: " + lvTransid + "Exception: " + e);
- throw e;
- }
}
- public void deleteRow(final long lvTransid) throws IOException, Exception {
+ public void deleteRow(final long lvTransid) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("TmDDL deleteRow start, TxID: " + lvTransid);
- try {
Delete d = new Delete(Bytes.toBytes(lvTransid));
table.delete(d);
- }
- catch(Exception e){
- LOG.error("TmDDL deleteRow Exception, TxID: " + lvTransid + "Exception:" + e);
- throw e;
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f585bc80/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
index e50fe3b..12128f5 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
@@ -96,15 +96,10 @@ public class HBaseAuditControlPoint {
HColumnDescriptor hcol = new HColumnDescriptor(CONTROL_POINT_FAMILY);
disableBlockCache = false;
- try {
- String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE");
- if (blockCacheString != null){
- disableBlockCache = (Integer.parseInt(blockCacheString) != 0);
- if (LOG.isDebugEnabled()) LOG.debug("disableBlockCache != null");
- }
- }
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_DISABLE_BLOCK_CACHE is not in ms.env");
+ String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE");
+ if (blockCacheString != null){
+ disableBlockCache = (Integer.parseInt(blockCacheString) != 0);
+ if (LOG.isDebugEnabled()) LOG.debug("disableBlockCache != null");
}
LOG.info("disableBlockCache is " + disableBlockCache);
if (disableBlockCache) {
@@ -115,15 +110,10 @@ public class HBaseAuditControlPoint {
admin = new HBaseAdmin(config);
useAutoFlush = true;
- try {
- String autoFlush = System.getenv("TM_TLOG_AUTO_FLUSH");
- if (autoFlush != null){
- useAutoFlush = (Integer.parseInt(autoFlush) != 0);
- if (LOG.isDebugEnabled()) LOG.debug("autoFlush != null");
- }
- }
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_AUTO_FLUSH is not in ms.env");
+ String autoFlush = System.getenv("TM_TLOG_AUTO_FLUSH");
+ if (autoFlush != null){
+ useAutoFlush = (Integer.parseInt(autoFlush) != 0);
+ if (LOG.isDebugEnabled()) LOG.debug("autoFlush != null");
}
LOG.info("useAutoFlush is " + useAutoFlush);
@@ -140,22 +130,12 @@ public class HBaseAuditControlPoint {
LOG.error("Table " + CONTROL_POINT_TABLE_NAME + " already exists");
}
}
- try {
- if (LOG.isDebugEnabled()) LOG.debug("try new HTable");
- table = new HTable(config, desc.getName());
- table.setAutoFlushTo(this.useAutoFlush);
- }
- catch (IOException e) {
- LOG.error("new HTable IOException");
- }
+ if (LOG.isDebugEnabled()) LOG.debug("try new HTable");
+ table = new HTable(config, desc.getName());
+ table.setAutoFlushTo(this.useAutoFlush);
if (currControlPt == -1){
- try {
- currControlPt = getCurrControlPt();
- }
- catch (Exception e2) {
- if (LOG.isDebugEnabled()) LOG.debug("Exit getCurrControlPoint() exception " + e2);
- }
+ currControlPt = getCurrControlPt();
}
if (LOG.isDebugEnabled()) LOG.debug("currControlPt is " + currControlPt);
@@ -163,7 +143,7 @@ public class HBaseAuditControlPoint {
return;
}
- public long getCurrControlPt() throws Exception {
+ public long getCurrControlPt() throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("getCurrControlPt: start");
long highKey = -1;
if (LOG.isDebugEnabled()) LOG.debug("new Scan");
@@ -186,10 +166,6 @@ public class HBaseAuditControlPoint {
highKey = currKey;
}
}
- }
- catch (Exception e) {
- LOG.error("getCurrControlPt IOException" + e);
- e.printStackTrace();
} finally {
ss.close();
}
@@ -197,22 +173,16 @@ public class HBaseAuditControlPoint {
return highKey;
}
- public long putRecord(final long ControlPt, final long startingSequenceNumber) throws Exception {
+ public long putRecord(final long ControlPt, final long startingSequenceNumber) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("putRecord starting sequence number (" + String.valueOf(startingSequenceNumber) + ")");
String controlPtString = new String(String.valueOf(ControlPt));
Put p = new Put(Bytes.toBytes(controlPtString));
p.add(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK, Bytes.toBytes(String.valueOf(startingSequenceNumber)));
- try {
- if (LOG.isTraceEnabled()) LOG.trace("try table.put with starting sequence number " + startingSequenceNumber);
- table.put(p);
- if (useAutoFlush == false) {
- if (LOG.isTraceEnabled()) LOG.trace("flushing controlpoint record");
- table.flushCommits();
- }
- }
- catch (Exception e) {
- LOG.error("HBaseAuditControlPoint:putRecord Exception" + e);
- throw e;
+ if (LOG.isTraceEnabled()) LOG.trace("try table.put with starting sequence number " + startingSequenceNumber);
+ table.put(p);
+ if (useAutoFlush == false) {
+ if (LOG.isTraceEnabled()) LOG.trace("flushing controlpoint record");
+ table.flushCommits();
}
if (LOG.isTraceEnabled()) LOG.trace("HBaseAuditControlPoint:putRecord returning " + ControlPt);
return ControlPt;
@@ -243,21 +213,13 @@ public class HBaseAuditControlPoint {
long lvValue = -1;
Get g = new Get(Bytes.toBytes(controlPt));
String recordString;
- try {
- Result r = table.get(g);
- byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
- try {
- recordString = new String (Bytes.toString(currValue));
- if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString);
- lvValue = Long.parseLong(recordString, 10);
- }
- catch (NullPointerException e){
- if (LOG.isDebugEnabled()) LOG.debug("control point " + controlPt + " is not in the table");
- }
- }
- catch (IOException e){
- LOG.error("getRecord IOException");
- throw e;
+ Result r = table.get(g);
+ byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
+ if (currValue != null)
+ {
+ recordString = new String (Bytes.toString(currValue));
+ if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString);
+ lvValue = Long.parseLong(recordString, 10);
}
if (LOG.isTraceEnabled()) LOG.trace("getRecord - exit " + lvValue);
return lvValue;
@@ -276,19 +238,13 @@ public class HBaseAuditControlPoint {
+ CONTROL_POINT_FAMILY + " ASN_HIGH_WATER_MARK " + ASN_HIGH_WATER_MARK);
byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
if (LOG.isDebugEnabled()) LOG.debug("Starting asn setting recordString ");
- String recordString = "";
- try {
- recordString = new String(currValue);
- }
- catch (NullPointerException e) {
- if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum recordString is null");
+ if (currValue == null)
lvAsn = 1;
- if (LOG.isDebugEnabled()) LOG.debug("Starting asn is 1");
- return lvAsn;
+ else
+ {
+ String recordString = new String(currValue);
+ lvAsn = Long.valueOf(recordString);
}
- if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum recordString is good");
- if (LOG.isDebugEnabled()) LOG.debug("Starting asn for control point " + currControlPt + " is " + recordString);
- lvAsn = Long.valueOf(recordString);
if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum - exit returning " + lvAsn);
return lvAsn;
}
@@ -323,19 +279,10 @@ public class HBaseAuditControlPoint {
highValue = currValue;
}
}
- }
- catch (Exception e) {
- LOG.error("getNextAuditSeqNum IOException" + e);
- e.printStackTrace();
} finally {
ss.close();
}
- }
- catch (IOException e) {
- LOG.error("getNextAuditSeqNum IOException setting up scan for " + lv_tName);
- e.printStackTrace();
- }
- finally {
+ } finally {
try {
remoteTable.close();
remoteConnection.close();
@@ -351,17 +298,9 @@ public class HBaseAuditControlPoint {
public long doControlPoint(final long sequenceNumber) throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("doControlPoint start");
- try {
- currControlPt++;
- if (LOG.isTraceEnabled()) LOG.trace("doControlPoint interval (" + currControlPt + "), sequenceNumber (" + sequenceNumber+ ") try putRecord");
- putRecord(currControlPt, sequenceNumber);
- }
- catch (Exception e) {
- LOG.error("doControlPoint Exception" + e);
- }
-
- if (LOG.isTraceEnabled()) LOG.trace("doControlPoint - exit");
+ currControlPt++;
+ if (LOG.isTraceEnabled()) LOG.trace("doControlPoint interval (" + currControlPt + "), sequenceNumber (" + sequenceNumber+ ") try putRecord");
+ putRecord(currControlPt, sequenceNumber);
return currControlPt;
}
@@ -369,16 +308,10 @@ public class HBaseAuditControlPoint {
if (LOG.isTraceEnabled()) LOG.trace("deleteRecord start for control point " + controlPoint);
String controlPtString = new String(String.valueOf(controlPoint));
- try {
- List<Delete> list = new ArrayList<Delete>();
- Delete del = new Delete(Bytes.toBytes(controlPtString));
- if (LOG.isDebugEnabled()) LOG.debug("deleteRecord (" + controlPtString + ") ");
- table.delete(del);
- }
- catch (Exception e) {
- LOG.error("deleteRecord IOException");
- }
-
+ List<Delete> list = new ArrayList<Delete>();
+ Delete del = new Delete(Bytes.toBytes(controlPtString));
+ if (LOG.isDebugEnabled()) LOG.debug("deleteRecord (" + controlPtString + ") ");
+ table.delete(del);
if (LOG.isTraceEnabled()) LOG.trace("deleteRecord - exit");
return true;
}
@@ -404,10 +337,7 @@ public class HBaseAuditControlPoint {
}
if (LOG.isDebugEnabled()) LOG.debug("attempting to delete list with " + deleteList.size() + " elements");
table.delete(deleteList);
- }
- catch (Exception e) {
- LOG.error("deleteAgedRecords IOException ", e);
- }finally {
+ } finally {
ss.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f585bc80/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java
index 27411ec..82eaa9d 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java
@@ -68,9 +68,9 @@ public class HBaseTmZK implements Abortable{
/**
* @param conf
- * @throws Exception
+ * @throws IOException
*/
- public HBaseTmZK(final Configuration conf) throws Exception {
+ public HBaseTmZK(final Configuration conf) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("HBaseTmZK(conf) -- ENTRY");
this.dtmID = 0;
this.zkNode = baseNode + "0";
@@ -80,9 +80,9 @@ public class HBaseTmZK implements Abortable{
/**
* @param conf
* @param dtmID
- * @throws Exception
+ * @throws IOException
*/
- public HBaseTmZK(final Configuration conf, final short dtmID) throws Exception {
+ public HBaseTmZK(final Configuration conf, final short dtmID) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("HBaseTmZK(conf, dtmID) -- ENTRY");
this.dtmID = dtmID;
this.zkNode = baseNode + String.format("%d", dtmID);
@@ -176,7 +176,7 @@ public class HBaseTmZK implements Abortable{
}
ZKUtil.createAndFailSilent(zooKeeper, zkNode + "/" + zNodeKey, data);
} catch (KeeperException e) {
- throw new IOException("HBaseTmZK:createRecoveryzNode: ZKW Unable to create recovery zNode: " + zkNode + " , throwing IOException " + e);
+ throw new IOException("HBaseTmZK:createRecoveryzNode: ZKW Unable to create recovery zNode: " + zkNode + " , throwing IOException ", e);
}
}
/**
@@ -193,7 +193,7 @@ public class HBaseTmZK implements Abortable{
String zNodeKey = dtmID+"";
ZKUtil.createSetData(zooKeeper, zNodeGCPath + "/" + zNodeKey, data);
} catch (KeeperException e) {
- throw new IOException("HBaseTmZK:createGCzNode: ZKW Unable to create GC zNode: " + zNodeGCPath +" , throwing IOException " + e);
+ throw new IOException("HBaseTmZK:createGCzNode: ZKW Unable to create GC zNode: " + zNodeGCPath +" , throwing IOException ", e);
}
}
@@ -216,14 +216,8 @@ public class HBaseTmZK implements Abortable{
String hostName = new String(tok.nextElement().toString());
int portNumber = Integer.parseInt(tok.nextElement().toString());
byte [] lv_byte_region_info = region.toByteArray();
- try{
- LOG.info("Calling createRecoveryzNode for encoded region: " + region.getEncodedName());
- createRecoveryzNode(hostName, portNumber, region.getEncodedName(), lv_byte_region_info);
- }
- catch (Exception e2){
- LOG.error("postAllRegionEntries exception in createRecoveryzNode " + region.getTable().getNameAsString() +
- " exception: " + e2);
- }
+ LOG.info("Calling createRecoveryzNode for encoded region: " + region.getEncodedName());
+ createRecoveryzNode(hostName, portNumber, region.getEncodedName(), lv_byte_region_info);
}// while
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f585bc80/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
index 4b1c4d8..e6e09ac 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@ -71,6 +71,8 @@ import org.apache.hadoop.hbase.regionserver.transactional.IdTm;
import org.apache.hadoop.hbase.regionserver.transactional.IdTmException;
import org.apache.hadoop.hbase.regionserver.transactional.IdTmId;
+import org.apache.zookeeper.KeeperException;
+
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.Callable;
@@ -113,7 +115,7 @@ public class HBaseTxClient {
PropertyConfigurator.configure(confFile);
}
- public boolean init(String hBasePath, String zkServers, String zkPort) throws Exception {
+ public boolean init(String hBasePath, String zkServers, String zkPort) throws IOException {
//System.out.println("In init - hbp");
setupLog4j();
if (LOG.isDebugEnabled()) LOG.debug("Enter init, hBasePath:" + hBasePath);
@@ -135,8 +137,8 @@ public class HBaseTxClient {
useForgotten = (Integer.parseInt(useAuditRecords) != 0);
}
}
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_FORGOTTEN_RECORDS is not in ms.env");
+ catch (NumberFormatException e) {
+ LOG.error("TM_ENABLE_FORGOTTEN_RECORDS is not valid in ms.env");
}
LOG.info("useForgotten is " + useForgotten);
@@ -148,8 +150,8 @@ public class HBaseTxClient {
if (LOG.isDebugEnabled()) LOG.debug("forgottenForce != null");
}
}
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_FORCE_FORGOTTEN is not in ms.env");
+ catch (NumberFormatException e) {
+ LOG.error("TM_TLOG_FORCE_FORGOTTEN is not valid in ms.env");
}
LOG.info("forceForgotten is " + forceForgotten);
@@ -161,32 +163,27 @@ public class HBaseTxClient {
useTlog = useRecovThread = (Integer.parseInt(useAudit) != 0);
}
}
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_TLOG_WRITES is not in ms.env");
+ catch (NumberFormatException e) {
+ LOG.error("TM_ENABLE_TLOG_WRITES is not valid in ms.env");
}
if (useTlog) {
- try {
tLog = new TmAuditTlog(config);
- } catch (Exception e ){
- LOG.error("Unable to create TmAuditTlog, throwing exception");
- throw new RuntimeException(e);
- }
}
try {
trxManager = TransactionManager.getInstance(config);
} catch (IOException e ){
- LOG.error("Unable to create TransactionManager, throwing exception");
- throw new RuntimeException(e);
+ LOG.error("Unable to create TransactionManager, throwing exception", e);
+ throw e;
}
if (useRecovThread) {
if (LOG.isDebugEnabled()) LOG.debug("Starting recovery thread for tm ID: " + dtmID);
try {
tmZK = new HBaseTmZK(config, dtmID);
- }catch (IOException e ){
- LOG.error("Unable to create HBaseTmZK TM-zookeeper class, throwing exception");
- throw new RuntimeException(e);
+ } catch (IOException e ){
+ LOG.error("Unable to create HBaseTmZK TM-zookeeper class, throwing exception", e);
+ throw e;
}
recovThread = new RecoveryThread(tLog, tmZK, trxManager);
recovThread.start();
@@ -195,7 +192,7 @@ public class HBaseTxClient {
return true;
}
- public boolean init(short dtmid) throws Exception {
+ public boolean init(short dtmid) throws IOException {
//System.out.println("In init - dtmId" + dtmid);
setupLog4j();
@@ -214,89 +211,68 @@ public class HBaseTxClient {
String useSSCC = System.getenv("TM_USE_SSCC");
TRANSACTION_ALGORITHM = AlgorithmType.MVCC;
- if (useSSCC != null)
- TRANSACTION_ALGORITHM = (Integer.parseInt(useSSCC) == 1) ? AlgorithmType.SSCC :AlgorithmType.MVCC ;
-
try {
- idServer = new IdTm(false);
- }
- catch (Exception e){
- LOG.error("Exception creating new IdTm: " + e);
+ if (useSSCC != null)
+ TRANSACTION_ALGORITHM = (Integer.parseInt(useSSCC) == 1) ? AlgorithmType.SSCC :AlgorithmType.MVCC ;
+ } catch (NumberFormatException e) {
+ LOG.error("TM_USE_SSCC is not valid in ms.env"); // report the env var actually read; the MVCC default set above stays in effect
+ }
+ idServer = new IdTm(false);
+
+ String useDDLTransactions = System.getenv("TM_ENABLE_DDL_TRANS");
+
try {
- String useDDLTransactions = System.getenv("TM_ENABLE_DDL_TRANS");
if (useDDLTransactions != null) {
useDDLTrans = (Integer.parseInt(useDDLTransactions) != 0);
}
}
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_DDL_TRANS is not in ms.env");
+ catch (NumberFormatException e) {
+ LOG.error("TM_ENABLE_DDL_TRANS is not valid in ms.env");
}
- if(useDDLTrans){
- try {
- tmDDL = new TmDDL(config);
- }
- catch (Exception e) {
- LOG.error("Unable to create TmDDL, throwing exception " + e);
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
+ if (useDDLTrans)
+ tmDDL = new TmDDL(config);
useForgotten = true;
- try {
String useAuditRecords = System.getenv("TM_ENABLE_FORGOTTEN_RECORDS");
+ try {
if (useAuditRecords != null) {
useForgotten = (Integer.parseInt(useAuditRecords) != 0);
}
}
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_FORGOTTEN_RECORDS is not in ms.env");
+ catch (NumberFormatException e) {
+ LOG.error("TM_ENABLE_FORGOTTEN_RECORDS is not valid in ms.env");
}
LOG.info("useForgotten is " + useForgotten);
forceForgotten = false;
- try {
String forgottenForce = System.getenv("TM_TLOG_FORCE_FORGOTTEN");
+ try {
if (forgottenForce != null){
forceForgotten = (Integer.parseInt(forgottenForce) != 0);
- if (LOG.isDebugEnabled()) LOG.debug("forgottenForce != null");
}
}
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_FORCE_FORGOTTEN is not in ms.env");
+ catch (NumberFormatException e) {
+ LOG.error("TM_TLOG_FORCE_FORGOTTEN is not valid in ms.env");
}
LOG.info("forceForgotten is " + forceForgotten);
useTlog = false;
useRecovThread = false;
- try {
String useAudit = System.getenv("TM_ENABLE_TLOG_WRITES");
+ try {
if (useAudit != null){
useTlog = useRecovThread = (Integer.parseInt(useAudit) != 0);
}
}
- catch (Exception e) {
- if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_TLOG_WRITES is not in ms.env");
+ catch (NumberFormatException e) {
+ LOG.error("TM_ENABLE_TLOG_WRITES is not valid in ms.env");
}
if (useTlog) {
- try {
- tLog = new TmAuditTlog(config);
- } catch (Exception e ){
- LOG.error("Unable to create TmAuditTlog, throwing exception " + e);
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- }
- try {
- trxManager = TransactionManager.getInstance(config);
- } catch (IOException e ){
- LOG.error("Unable to create TransactionManager, Exception: " + e + "throwing new RuntimeException");
- throw new RuntimeException(e);
+ tLog = new TmAuditTlog(config);
}
-
+ trxManager = TransactionManager.getInstance(config);
if(useDDLTrans)
trxManager.init(tmDDL);
@@ -304,9 +280,9 @@ public class HBaseTxClient {
if (LOG.isDebugEnabled()) LOG.debug("Entering recovThread Usage");
try {
tmZK = new HBaseTmZK(config, dtmID);
- }catch (IOException e ){
- LOG.error("Unable to create HBaseTmZK TM-zookeeper class, throwing exception");
- throw new RuntimeException(e);
+ } catch (IOException e ) {
+ LOG.error("Unable to create HBaseTmZK TM-zookeeper class, throwing exception", e);
+ throw e;
}
recovThread = new RecoveryThread(tLog,
tmZK,
@@ -332,7 +308,6 @@ public class HBaseTxClient {
if(dtmID == nodeID)
throw new IOException("Down node ID is the same as current dtmID, Incorrect parameter");
- try {
if(mapRecoveryThreads.containsKey(nodeID)) {
if(LOG.isDebugEnabled()) LOG.debug("nodeDown called on a node that already has RecoveryThread running node ID: " + nodeID);
}
@@ -348,10 +323,6 @@ public class HBaseTxClient {
mapRecoveryThreads.put(nodeID, recovThread);
if(LOG.isTraceEnabled()) LOG.trace("nodeDown -- mapRecoveryThreads size: " + mapRecoveryThreads.size());
}
- }
- catch(Exception e) {
- LOG.error("Unable to create rescue recovery thread for TM" + dtmID);
- }
if(LOG.isTraceEnabled()) LOG.trace("nodeDown -- EXIT node ID: " + nodeID);
}
@@ -364,9 +335,15 @@ public class HBaseTxClient {
return;
}
rt.stopThread();
+ boolean loopBack;
+ do { loopBack = false; // reset on every pass so a completed join ends the loop instead of spinning forever
try {
rt.join();
- } catch (Exception e) { LOG.warn("Problem while waiting for the recovery thread to stop for node ID: " + nodeID); }
+ } catch (InterruptedException e) {
+ LOG.warn("Problem while waiting for the recovery thread to stop for node ID: " + nodeID, e);
+ loopBack = true;
+ }
+ } while (loopBack);
mapRecoveryThreads.remove(nodeID);
if(LOG.isTraceEnabled()) LOG.trace("nodeUp -- mapRecoveryThreads size: " + mapRecoveryThreads.size());
if(LOG.isTraceEnabled()) LOG.trace("nodeUp -- EXIT node ID: " + nodeID);
@@ -378,15 +355,17 @@ public class HBaseTxClient {
return TransReturnCode.RET_OK.getShort();
}
- public long beginTransaction(final long transactionId) throws Exception {
+ public long beginTransaction(final long transactionId) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter beginTransaction, txid: " + transactionId);
- TransactionState tx = trxManager.beginTransaction(transactionId);
+ TransactionState tx = null;
+ try {
+ tx = trxManager.beginTransaction(transactionId);
+ } catch (IdTmException ite) {
+ LOG.error("Begin Transaction Error caused by : ", ite);
+ throw new IOException("Begin Transaction Error caused by :", ite);
+ }
if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:beginTransaction new transactionState created: " + tx);
- if(tx == null) {
- LOG.error("null Transaction State returned by the Transaction Manager, txid: " + transactionId);
- throw new Exception("TransactionState is null");
- }
synchronized(mapLock) {
TransactionState tx2 = mapTransactionStates.get(transactionId);
@@ -406,7 +385,7 @@ public class HBaseTxClient {
return transactionId;
}
- public short abortTransaction(final long transactionID) throws Exception {
+ public short abortTransaction(final long transactionID) throws IOException {
if (LOG.isDebugEnabled()) LOG.debug("Enter abortTransaction, txid: " + transactionID);
TransactionState ts = mapTransactionStates.get(transactionID);
@@ -420,14 +399,19 @@ public class HBaseTxClient {
if (useTlog) {
tLog.putSingleRecord(transactionID, -1, "ABORTED", ts.getParticipatingRegions(), false);
}
- } catch(Exception e) {
- LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " tLog.putRecord: EXCEPTION");
+ } catch(IOException e) {
+ LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " tLog.putRecord: EXCEPTION", e);
return TransReturnCode.RET_EXCEPTION.getShort();
}
if ((stallWhere == 1) || (stallWhere == 3)) {
LOG.info("Stalling in phase 2 for abortTransaction");
- Thread.sleep(300000); // Initially set to run every 5 min
+ boolean loopBack;
+ do { loopBack = false; try { // a real do/while: the stall is repeated only when the sleep was interrupted
+ Thread.sleep(300000); // Initially set to run every 5 min
+ } catch(InterruptedException ie) {
+ loopBack = true;
+ } } while (loopBack);
}
try {
@@ -436,11 +420,11 @@ public class HBaseTxClient {
synchronized(mapLock) {
mapTransactionStates.remove(transactionID);
}
- LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " retval: EXCEPTION");
+ LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " retval: EXCEPTION", e);
return TransReturnCode.RET_EXCEPTION.getShort();
}
catch (UnsuccessfulDDLException ddle) {
- LOG.error("FATAL DDL Exception from HBaseTxClient:abort, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionID);
+ LOG.error("FATAL DDL Exception from HBaseTxClient:abort, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionID, ddle);
//Reaching here means several attempts to perform the DDL operation has failed in abort phase.
//Generally if only DML operation is involved, returning error causes TM to call completeRequest()
@@ -452,7 +436,13 @@ public class HBaseTxClient {
Object commitDDLLock = new Object();
synchronized(commitDDLLock)
{
- commitDDLLock.wait();
+ boolean loopBack;
+ do { loopBack = false; try { // a real do/while: an interrupt re-enters wait() instead of busy-spinning with the monitor held
+ commitDDLLock.wait();
+ } catch(InterruptedException ie) {
+ LOG.warn("Interrupting commitDDLLock.wait, but retrying ", ie);
+ loopBack = true;
+ } } while (loopBack);
}
return TransReturnCode.RET_EXCEPTION.getShort();
}
@@ -470,7 +460,7 @@ public class HBaseTxClient {
return TransReturnCode.RET_OK.getShort();
}
- public short prepareCommit(long transactionId) throws Exception {
+ public short prepareCommit(long transactionId) throws IOException {
if (LOG.isDebugEnabled()) LOG.debug("Enter prepareCommit, txid: " + transactionId);
if (LOG.isTraceEnabled()) LOG.trace("mapTransactionStates " + mapTransactionStates + " entries " + mapTransactionStates.size());
TransactionState ts = mapTransactionStates.get(transactionId);
@@ -503,20 +493,16 @@ public class HBaseTxClient {
return TransReturnCode.RET_EXCEPTION.getShort();
}
} catch (CommitUnsuccessfulException e) {
- LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " CommitUnsuccessfulException");
+ LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " CommitUnsuccessfulException", e);
return TransReturnCode.RET_NOCOMMITEX.getShort();
}
catch (IOException e) {
- LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_IOEXCEPTION.toString() + " IOException");
+ LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_IOEXCEPTION.toString() + " IOException", e);
return TransReturnCode.RET_IOEXCEPTION.getShort();
}
- catch (Exception e) {
- LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " Exception " + e);
- return TransReturnCode.RET_NOCOMMITEX.getShort();
- }
}
- public short doCommit(long transactionId) throws Exception {
+ public short doCommit(long transactionId) throws IOException, CommitUnsuccessfulException {
if (LOG.isDebugEnabled()) LOG.debug("Enter doCommit, txid: " + transactionId);
TransactionState ts = mapTransactionStates.get(transactionId);
@@ -534,8 +520,8 @@ public class HBaseTxClient {
idServer.id(ID_TM_SERVER_TIMEOUT, commitId);
if (LOG.isTraceEnabled()) LOG.trace("doCommit idServer.id returned: " + commitId.val);
} catch (IdTmException exc) {
- LOG.error("doCommit: IdTm threw exception " + exc);
- throw new CommitUnsuccessfulException("doCommit: IdTm threw exception " + exc);
+ LOG.error("doCommit: IdTm threw exception " , exc);
+ throw new CommitUnsuccessfulException("doCommit: IdTm threw exception " , exc);
}
}
@@ -548,24 +534,29 @@ public class HBaseTxClient {
if (useTlog) {
tLog.putSingleRecord(transactionId, commitIdVal, "COMMITTED", ts.getParticipatingRegions(), true);
}
- } catch(Exception e) {
- LOG.error("Returning from HBaseTxClient:doCommit, txid: " + transactionId + " tLog.putRecord: EXCEPTION " + e);
+ } catch(IOException e) {
+ LOG.error("Returning from HBaseTxClient:doCommit, txid: " + transactionId + " tLog.putRecord: EXCEPTION ", e);
return TransReturnCode.RET_EXCEPTION.getShort();
}
if ((stallWhere == 2) || (stallWhere == 3)) {
LOG.info("Stalling in phase 2 for doCommit");
- Thread.sleep(300000); // Initially set to run every 5 min
+ boolean loopBack = false;
+ try {
+ Thread.sleep(300000); // Initially set to run every 5 min
+ } catch(InterruptedException ie) {
+ loopBack = true;
+ } while (loopBack);
}
try {
trxManager.doCommit(ts);
} catch (CommitUnsuccessfulException e) {
- LOG.error("Returning from HBaseTxClient:doCommit, retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException" + " txid: " + transactionId);
+ LOG.error("Returning from HBaseTxClient:doCommit, retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException" + " txid: " + transactionId, e);
return TransReturnCode.RET_EXCEPTION.getShort();
}
catch (UnsuccessfulDDLException ddle) {
- LOG.error("FATAL DDL Exception from HBaseTxClient:doCommit, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionId);
+ LOG.error("FATAL DDL Exception from HBaseTxClient:doCommit, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionId, ddle);
//Reaching here means several attempts to perform the DDL operation has failed in commit phase.
//Generally if only DML operation is involved, returning error causes TM to call completeRequest()
@@ -577,7 +568,13 @@ public class HBaseTxClient {
Object commitDDLLock = new Object();
synchronized(commitDDLLock)
{
- commitDDLLock.wait();
+ boolean loopBack = false;
+ try {
+ commitDDLLock.wait();
+ } catch(InterruptedException ie) {
+ LOG.warn("Interrupting commitDDLLock.wait, but retrying ", ie);
+ loopBack = true;
+ } while (loopBack);
}
return TransReturnCode.RET_EXCEPTION.getShort();
}
@@ -597,7 +594,7 @@ public class HBaseTxClient {
return TransReturnCode.RET_OK.getShort();
}
- public short completeRequest(long transactionId) throws Exception {
+ public short completeRequest(long transactionId) throws IOException, CommitUnsuccessfulException {
if (LOG.isDebugEnabled()) LOG.debug("Enter completeRequest, txid: " + transactionId);
TransactionState ts = mapTransactionStates.get(transactionId);
@@ -605,16 +602,17 @@ public class HBaseTxClient {
LOG.error("Returning from HBaseTxClient:completeRequest, (null tx) retval: " + TransReturnCode.RET_NOTX.toString() + " txid: " + transactionId);
return TransReturnCode.RET_NOTX.getShort();
}
-
+
+ boolean loopBack = false;
try {
if (LOG.isTraceEnabled()) LOG.trace("TEMP completeRequest Calling CompleteRequest() Txid :" + transactionId);
ts.completeRequest();
- } catch(Exception e) {
- LOG.error("Returning from HBaseTxClient:completeRequest, ts.completeRequest: txid: " + transactionId + ", EXCEPTION: " + e);
- throw new Exception("Exception during completeRequest, unable to commit. Exception: " + e);
- }
+ } catch(InterruptedException ie) {
+ LOG.warn("Interrupting HBaseTxClient:completeRequest but retrying, ts.completeRequest: txid: " + transactionId + ", EXCEPTION: ", ie);
+ loopBack = true;
+ } while (loopBack);
synchronized(mapLock) {
mapTransactionStates.remove(transactionId);
@@ -624,7 +622,7 @@ public class HBaseTxClient {
return TransReturnCode.RET_OK.getShort();
}
- public short tryCommit(long transactionId) throws Exception {
+ public short tryCommit(long transactionId) throws IOException, CommitUnsuccessfulException {
if (LOG.isDebugEnabled()) LOG.debug("Enter tryCommit, txid: " + transactionId);
short err, commitErr, abortErr = TransReturnCode.RET_OK.getShort();
@@ -650,10 +648,10 @@ public class HBaseTxClient {
if (err != TransReturnCode.RET_OK.getShort()){
if (LOG.isDebugEnabled()) LOG.debug("tryCommit completeRequest for transaction " + transactionId + " failed with error " + err);
}
- } catch(Exception e) {
+ } catch(IOException e) {
mapTransactionStates.remove(transactionId);
- LOG.error("Returning from HBaseTxClient:tryCommit, ts: EXCEPTION" + " txid: " + transactionId);
- throw new Exception("Exception " + e + "during tryCommit, unable to commit.");
+ LOG.error("Returning from HBaseTxClient:tryCommit, ts: EXCEPTION" + " txid: " + transactionId, e);
+ throw new IOException("Exception during tryCommit, unable to commit.", e);
}
synchronized(mapLock) {
@@ -664,10 +662,10 @@ public class HBaseTxClient {
return TransReturnCode.RET_OK.getShort();
}
- public short callCreateTable(long transactionId, byte[] pv_htbldesc, Object[] beginEndKeys) throws Exception
+ public short callCreateTable(long transactionId, byte[] pv_htbldesc, Object[] beginEndKeys) throws IOException
{
TransactionState ts;
- HTableDescriptor htdesc;
+ HTableDescriptor htdesc = null;
if (LOG.isTraceEnabled()) LOG.trace("Enter callCreateTable, txid: [" + transactionId + "], htbldesc bytearray: " + pv_htbldesc + "desc in hex: " + Hex.encodeHexString(pv_htbldesc));
@@ -676,40 +674,17 @@ public class HBaseTxClient {
LOG.error("Returning from HBaseTxClient:callCreateTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort() + " txid: " + transactionId);
return TransReturnCode.RET_NOTX.getShort();
}
-
try {
htdesc = HTableDescriptor.parseFrom(pv_htbldesc);
+ } catch (DeserializationException de) {
+ LOG.error("Error while getting HTableDescriptor caused by : ", de);
+ throw new IOException("Error while getting HTableDescriptor caused by : ", de);
}
- catch(Exception e) {
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callCreateTable exception in htdesc parseFrom, retval: " +
- TransReturnCode.RET_EXCEPTION.toString() +
- " txid: " + transactionId +
- " DeserializationException: " + e);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- LOG.error(sw.toString());
-
- throw new Exception("DeserializationException in callCreateTable parseFrom, unable to send callCreateTable");
- }
-
- try {
- trxManager.createTable(ts, htdesc, beginEndKeys);
- }
- catch (Exception cte) {
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callCreateTable exception trxManager.createTable, retval: " +
- TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + cte);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- cte.printStackTrace(pw);
- LOG.error("HBaseTxClient createTable call error: " + sw.toString());
-
- throw new Exception("createTable call error");
- }
+ trxManager.createTable(ts, htdesc, beginEndKeys);
return TransReturnCode.RET_OK.getShort();
}
- public short callAlterTable(long transactionId, byte[] pv_tblname, Object[] tableOptions) throws Exception
+ public short callAlterTable(long transactionId, byte[] pv_tblname, Object[] tableOptions) throws IOException
{
TransactionState ts;
String strTblName = new String(pv_tblname, "UTF-8");
@@ -722,23 +697,11 @@ public class HBaseTxClient {
return TransReturnCode.RET_NOTX.getShort();
}
- try {
- trxManager.alterTable(ts, strTblName, tableOptions);
- }
- catch (Exception cte) {
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callAlterTable exception trxManager.alterTable, retval: " +
- TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + cte);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- cte.printStackTrace(pw);
- LOG.error("HBaseTxClient alterTable call error: " + sw.toString());
-
- throw new Exception("alterTable call error");
- }
+ trxManager.alterTable(ts, strTblName, tableOptions);
return TransReturnCode.RET_OK.getShort();
}
- public short callRegisterTruncateOnAbort(long transactionId, byte[] pv_tblname) throws Exception
+ public short callRegisterTruncateOnAbort(long transactionId, byte[] pv_tblname) throws IOException
{
TransactionState ts;
String strTblName = new String(pv_tblname, "UTF-8");
@@ -751,23 +714,11 @@ public class HBaseTxClient {
return TransReturnCode.RET_NOTX.getShort();
}
- try {
- trxManager.registerTruncateOnAbort(ts, strTblName);
- }
- catch (Exception e) {
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterTruncateOnAbort exception trxManager.registerTruncateOnAbort, retval: " +
- TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + e);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- String msg = "HBaseTxClient registerTruncateOnAbort call error ";
- LOG.error(msg + " : " + sw.toString());
- throw new Exception(msg);
- }
+ trxManager.registerTruncateOnAbort(ts, strTblName);
return TransReturnCode.RET_OK.getShort();
}
- public short callDropTable(long transactionId, byte[] pv_tblname) throws Exception
+ public short callDropTable(long transactionId, byte[] pv_tblname) throws IOException
{
TransactionState ts;
String strTblName = new String(pv_tblname, "UTF-8");
@@ -780,17 +731,7 @@ public class HBaseTxClient {
return TransReturnCode.RET_NOTX.getShort();
}
- try {
- trxManager.dropTable(ts, strTblName);
- }
- catch (Exception cte) {
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callDropTable exception trxManager.dropTable, retval: " +
- TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + cte);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- cte.printStackTrace(pw);
- LOG.error("HBaseTxClient dropTable call error: " + sw.toString());
- }
+ trxManager.dropTable(ts, strTblName);
return TransReturnCode.RET_OK.getShort();
}
@@ -799,7 +740,7 @@ public class HBaseTxClient {
int pv_port,
byte[] pv_hostname,
long pv_startcode,
- byte[] pv_regionInfo) throws Exception {
+ byte[] pv_regionInfo) throws IOException {
String hostname = new String(pv_hostname);
if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterRegion, txid: [" + transactionId + "], startId: " + startId + ", port: "
+ pv_port + ", hostname: " + hostname + ", reg info len: " + pv_regionInfo.length + " " + new String(pv_regionInfo, "UTF-8"));
@@ -807,16 +748,9 @@ public class HBaseTxClient {
HRegionInfo lv_regionInfo;
try {
lv_regionInfo = HRegionInfo.parseFrom(pv_regionInfo);
- }
- catch (Exception de) {
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion exception in lv_regionInfo parseFrom, retval: " +
- TransReturnCode.RET_EXCEPTION.toString() + " txid: " + transactionId + " DeserializationException: " + de);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- de.printStackTrace(pw);
- LOG.error(sw.toString());
-
- throw new Exception("DeserializationException in lv_regionInfo parseFrom, unable to register region");
+ } catch (DeserializationException de) {
+ LOG.error("Error while getting regionInfo caused by : ", de);
+ throw new IOException("Error while getting regionInfo caused by : ", de);
}
// TODO Not in CDH 5.1 ServerName lv_servername = ServerName.valueOf(hostname, pv_port, pv_startcode);
@@ -832,10 +766,9 @@ public class HBaseTxClient {
") not found in mapTransactionStates of size: " + mapTransactionStates.size());
try {
ts = trxManager.beginTransaction(transactionId);
- }
- catch (IdTmException exc) {
- LOG.error("HBaseTxClient: beginTransaction for tx (" + transactionId + ") caught exception " + exc);
- throw new IdTmException("HBaseTxClient: beginTransaction for tx (" + transactionId + ") caught exception " + exc);
+ } catch (IdTmException ite) {
+ LOG.error("Begin Transaction Error caused by : ", ite);
+ throw new IOException("Begin Transaction Error caused by :", ite);
}
synchronized (mapLock) {
TransactionState ts2 = mapTransactionStates.get(transactionId);
@@ -863,7 +796,7 @@ public class HBaseTxClient {
trxManager.registerRegion(ts, regionLocation);
} catch (IOException e) {
LOG.error("HBaseTxClient:callRegisterRegion exception in registerRegion call, txid: " + transactionId +
- " retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException " + e);
+ " retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException " , e);
return TransReturnCode.RET_EXCEPTION.getShort();
}
@@ -878,7 +811,7 @@ public class HBaseTxClient {
return TransReturnCode.RET_OK.getShort();
}
- public int participatingRegions(long transactionId) throws Exception {
+ public int participatingRegions(long transactionId) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter participatingRegions, txid: " + transactionId);
TransactionState ts = mapTransactionStates.get(transactionId);
if(ts == null) {
@@ -900,17 +833,11 @@ public class HBaseTxClient {
return participants;
}
- public long addControlPoint() throws Exception {
+ public long addControlPoint() throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter addControlPoint");
long result = 0L;
- try {
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient calling tLog.addControlPoint with mapsize " + mapTransactionStates.size());
- result = tLog.addControlPoint(mapTransactionStates);
- }
- catch(IOException e){
- LOG.error("addControlPoint IOException " + e);
- throw e;
- }
+ if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient calling tLog.addControlPoint with mapsize " + mapTransactionStates.size());
+ result = tLog.addControlPoint(mapTransactionStates);
Long lowestStartId = Long.MAX_VALUE;
for(ConcurrentHashMap.Entry<Long, TransactionState> entry : mapTransactionStates.entrySet()){
TransactionState value;
@@ -987,7 +914,7 @@ public class HBaseTxClient {
this.continueThread = false;
}
- private void addRegionToTS(String hostnamePort, byte[] regionInfo, TransactionState ts) throws Exception{
+ private void addRegionToTS(String hostnamePort, byte[] regionInfo, TransactionState ts) throws IOException{
HRegionInfo regionInfoLoc; // = new HRegionInfo();
final byte [] delimiter = ",".getBytes();
String[] result = hostnamePort.split(new String(delimiter), 3);
@@ -998,21 +925,14 @@ public class HBaseTxClient {
String hostname = result[0];
int port = Integer.parseInt(result[1]);
try {
- regionInfoLoc = HRegionInfo.parseFrom(regionInfo);
- }
- catch(Exception e) {
- LOG.error("Unable to parse region byte array, " + e);
- throw e;
+ regionInfoLoc = HRegionInfo.parseFrom(regionInfo);
+ } catch (DeserializationException de) {
+ throw new IOException(de);
}
- /*
ByteArrayInputStream lv_bis = new ByteArrayInputStream(regionInfo);
DataInputStream lv_dis = new DataInputStream(lv_bis);
- try {
- regionInfoLoc.readFields(lv_dis);
- } catch (Exception e) {
- throw new Exception();
- }
- */
+ regionInfoLoc.readFields(lv_dis);
+
//HBase98 TODO: need to set the value of startcode correctly
//HBase98 TODO: Not in CDH 5.1: ServerName lv_servername = ServerName.valueOf(hostname, port, 0);
@@ -1033,17 +953,15 @@ public class HBaseTxClient {
Map<String, byte[]> regions = null;
Map<Long, TransactionState> transactionStates =
new HashMap<Long, TransactionState>();
+ boolean loopBack = false;
+ do {
try {
- regions = zookeeper.checkForRecovery();
- } catch (Exception e) {
- if (regions != null) { // ignore no object returned by zookeeper.checkForRecovery
- LOG.error("An ERROR occurred while checking for regions to recover. " + "TM: " + tmID);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- LOG.error(sw.toString());
- }
+ regions = zookeeper.checkForRecovery();
+ } catch(InterruptedException ie) {
+ loopBack = true;
+ } catch (KeeperException ze) {
}
+ } while (loopBack);
if(regions != null) {
skipSleep = true;
@@ -1090,77 +1008,39 @@ public class HBaseTxClient {
}catch (NotServingRegionException e) {
TxRecoverList = null;
LOG.error("TRAF RCOV THREAD:NotServingRegionException calling recoveryRequest. regionBytes: " + new String(regionBytes) +
- " TM: " + tmID + " hostnamePort: " + hostnamePort);
+ " TM: " + tmID + " hostnamePort: " + hostnamePort, e);
- try {
// First delete the zookeeper entry
LOG.error("TRAF RCOV THREAD:recoveryRequest. Deleting region entry Entry: " + regionEntry);
zookeeper.deleteRegionEntry(regionEntry);
- }
- catch (Exception e2) {
- LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " +
- new String(regionEntry.getValue()) + " exception: " + e2);
- }
- try {
// Create a local HTable object using the regionInfo
HTable table = new HTable(config, HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString());
- try {
- // Repost a zookeeper entry for all current regions in the table
- zookeeper.postAllRegionEntries(table);
- }
- catch (Exception e2) {
- LOG.error("TRAF RCOV THREAD:Error calling postAllRegionEntries. table: " + new String(table.getTableName()) + " exception: " + e2);
- }
- }// try
- catch (Exception e1) {
- LOG.error("TRAF RCOV THREAD:recoveryRequest exception in new HTable " + HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString() + " Exception: " + e1);
- }
+ // Repost a zookeeper entry for all current regions in the table
+ zookeeper.postAllRegionEntries(table);
}// NotServingRegionException
catch (TableNotFoundException tnfe) {
// In this case there is nothing to recover. We just need to delete the region entry.
- try {
// First delete the zookeeper entry
LOG.warn("TRAF RCOV THREAD:TableNotFoundException calling txnManager.recoveryRequest. " + "TM: " +
tmID + " regionBytes: [" + regionBytes + "]. Deleting zookeeper region entry. \n exception: " + tnfe);
zookeeper.deleteRegionEntry(regionEntry);
- }
- catch (Exception e2) {
- LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " +
- new String(regionEntry.getValue()) + " exception: " + e2);
- }
}// TableNotFoundException
catch (DeserializationException de) {
// We are unable to parse the region info from ZooKeeper We just need to delete the region entry.
- try {
// First delete the zookeeper entry
LOG.warn("TRAF RCOV THREAD:DeserializationException calling txnManager.recoveryRequest. " + "TM: " +
tmID + " regionBytes: [" + regionBytes + "]. Deleting zookeeper region entry. \n exception: " + de);
zookeeper.deleteRegionEntry(regionEntry);
- }
- catch (Exception e2) {
- LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " +
- new String(regionEntry.getValue()) + " exception: " + e2);
- }
-
}// DeserializationException
- catch (Exception e) {
- LOG.error("TRAF RCOV THREAD:An ERROR occurred calling txnManager.recoveryRequest. " + "TM: " +
- tmID + " regionBytes: [" + regionBytes + "] exception: " + e);
- }
+
if (TxRecoverList != null) {
if (LOG.isDebugEnabled()) LOG.trace("TRAF RCOV THREAD:size of TxRecoverList " + TxRecoverList.size());
if (TxRecoverList.size() == 0) {
- try {
// First delete the zookeeper entry
LOG.warn("TRAF RCOV THREAD:Leftover Znode calling txnManager.recoveryRequest. " + "TM: " +
tmID + " regionBytes: [" + regionBytes + "]. Deleting zookeeper region entry. ");
zookeeper.deleteRegionEntry(regionEntry);
- }
- catch (Exception e2) {
- LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " +
- new String(regionEntry.getValue()) + " exception: " + e2);
- }
}
for (Long txid : TxRecoverList) {
TransactionState ts = transactionStates.get(txid);
@@ -1171,22 +1051,12 @@ public class HBaseTxClient {
if(hbtx.useDDLTrans){
TmDDL tmDDL = hbtx.getTmDDL();
StringBuilder state = new StringBuilder ();
- try {
- tmDDL.getState(txid,state);
- }
- catch(Exception egetState){
- LOG.error("TRAF RCOV THREAD:Error calling TmDDL getState." + egetState);
- }
+ tmDDL.getState(txid,state);
if(state.toString().equals("VALID"))
ts.setDDLTx(true);
}
}
- try {
- this.addRegionToTS(hostnamePort, regionBytes, ts);
- } catch (Exception e) {
- LOG.error("TRAF RCOV THREAD:Unable to add region to TransactionState, region info: " + new String(regionBytes));
- e.printStackTrace();
- }
+ this.addRegionToTS(hostnamePort, regionBytes, ts);
transactionStates.put(txid, ts);
}
}
@@ -1219,24 +1089,20 @@ public class HBaseTxClient {
txnManager.abort(ts);
}
- }catch (UnsuccessfulDDLException ddle) {
- LOG.error("UnsuccessfulDDLException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " + ddle);
- ddle.printStackTrace();
+ } catch (UnsuccessfulDDLException ddle) {
+ LOG.error("UnsuccessfulDDLException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , ddle);
//Note that there may not be anymore redrive triggers from region server point of view for DDL operation.
//Register this DDL transaction for subsequent redrive from Audit Control Event.
//TODO: Launch a new Redrive Thread out of auditControlPoint().
TmDDL tmDDL = hbtx.getTmDDL();
- try {
- tmDDL.setState(txID,"REDRIVE");
- }
- catch(Exception e){
- LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive" + e);
- }
- }
- catch (Exception e) {
- LOG.error("Unable to get audit record for tx: " + txID + ", audit is throwing exception.");
- e.printStackTrace();
+ tmDDL.setState(txID,"REDRIVE");
+ LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
+ } catch (CommitUnsuccessfulException cue) {
+ LOG.error("CommitUnsuccessfulException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , cue);
+ TmDDL tmDDL = hbtx.getTmDDL();
+ tmDDL.setState(txID,"REDRIVE");
+ LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
}
}
@@ -1257,17 +1123,40 @@ public class HBaseTxClient {
}
}
retryCount = 0;
- } catch (Exception e) {
+ } catch (InterruptedException e) {
LOG.error("Error in recoveryThread: " + e);
}
+ } catch (IOException e) {
+ int possibleRetries = 4;
+ LOG.error("Caught recovery thread exception for tmid: " + tmID + " retries: " + retryCount, e);
+
+ retryCount++;
+ if(retryCount > possibleRetries) {
+ LOG.error("Recovery thread failure, aborting process");
+ System.exit(4);
+ }
+
+ try {
+ Thread.sleep(SLEEP_DELAY / possibleRetries);
+ } catch(InterruptedException se) {
+ }
+ } catch (KeeperException ze) {
+ int possibleRetries = 4;
+ LOG.error("Caught recovery thread exception for tmid: " + tmID + " retries: " + retryCount, ze);
- } catch (Exception e) {
+ retryCount++;
+ if(retryCount > possibleRetries) {
+ LOG.error("Recovery thread failure, aborting process");
+ System.exit(4);
+ }
+
+ try {
+ Thread.sleep(SLEEP_DELAY / possibleRetries);
+ } catch(InterruptedException se) {
+ }
+ } catch (DeserializationException de) {
int possibleRetries = 4;
- LOG.error("Caught recovery thread exception for tmid: " + tmID + " retries: " + retryCount);
- StringWriter sw = new StringWriter();
- PrintWriter pw = new PrintWriter(sw);
- e.printStackTrace(pw);
- LOG.error(sw.toString());
+ LOG.error("Caught recovery thread exception for tmid: " + tmID + " retries: " + retryCount, de);
retryCount++;
if(retryCount > possibleRetries) {
@@ -1277,8 +1166,7 @@ public class HBaseTxClient {
try {
Thread.sleep(SLEEP_DELAY / possibleRetries);
- } catch(Exception se) {
- LOG.error(se);
+ } catch(InterruptedException se) {
}
}
}
@@ -1294,7 +1182,7 @@ public class HBaseTxClient {
// callRequestRegionInfo
// Purpose: Prepares HashMapArray class to get region information
//--------------------------------------------------------------------------------
- public HashMapArray callRequestRegionInfo() throws Exception {
+ public HashMapArray callRequestRegionInfo() throws IOException {
String tablename, encoded_region_name, region_name, is_offline, region_id, hostname, port, thn;
@@ -1308,7 +1196,6 @@ public class HBaseTxClient {
HashMapArray hm = new HashMapArray();
- try{
for(ConcurrentHashMap.Entry<Long, TransactionState> entry : mapTransactionStates.entrySet()){
key = entry.getKey();
value = entry.getValue();
@@ -1375,10 +1262,6 @@ public class HBaseTxClient {
tnum = tnum + 1;
}
- }catch(Exception e){
- if (LOG.isTraceEnabled()) LOG.trace("Error in getting region info. Map might be empty. Please ensure sqlci insert was done");
- }
-
if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient::callRequestRegionInfo:: end size: " + hm.getSize());
return hm;
}
|