Return-Path: X-Original-To: apmail-chukwa-commits-archive@www.apache.org Delivered-To: apmail-chukwa-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 75CD610FA9 for ; Sun, 11 Jan 2015 19:57:45 +0000 (UTC) Received: (qmail 97541 invoked by uid 500); 11 Jan 2015 19:57:47 -0000 Delivered-To: apmail-chukwa-commits-archive@chukwa.apache.org Received: (qmail 97507 invoked by uid 500); 11 Jan 2015 19:57:47 -0000 Mailing-List: contact commits-help@chukwa.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@chukwa.apache.org Delivered-To: mailing list commits@chukwa.apache.org Received: (qmail 97495 invoked by uid 99); 11 Jan 2015 19:57:46 -0000 Received: from eris.apache.org (HELO hades.apache.org) (140.211.11.105) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 11 Jan 2015 19:57:46 +0000 Received: from hades.apache.org (localhost [127.0.0.1]) by hades.apache.org (ASF Mail Server at hades.apache.org) with ESMTP id 86BC1AC0143; Sun, 11 Jan 2015 19:57:46 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1650958 - in /chukwa/trunk: CHANGES.txt src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java Date: Sun, 11 Jan 2015 19:57:46 -0000 To: commits@chukwa.apache.org From: eyang@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20150111195746.86BC1AC0143@hades.apache.org> Author: eyang Date: Sun Jan 11 19:57:45 2015 New Revision: 1650958 URL: http://svn.apache.org/r1650958 Log: CHUKWA-723. Update Chukwa code to use new HBase HConnection API. (Sreepathi Prasanna via Eric Yang) Modified: chukwa/trunk/CHANGES.txt chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java Modified: chukwa/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/chukwa/trunk/CHANGES.txt?rev=1650958&r1=1650957&r2=1650958&view=diff ============================================================================== --- chukwa/trunk/CHANGES.txt (original) +++ chukwa/trunk/CHANGES.txt Sun Jan 11 19:57:45 2015 @@ -6,6 +6,8 @@ Trunk (unreleased changes) IMPROVEMENTS + CHUKWA-723. Update Chukwa code to use new HBase HConnection API. (Sreepathi Prasanna via Eric Yang) + BUGS Release 0.6 - 09/28/2014 Modified: chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java URL: http://svn.apache.org/viewvc/chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java?rev=1650958&r1=1650957&r2=1650958&view=diff ============================================================================== --- chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java (original) +++ chukwa/trunk/src/main/java/org/apache/hadoop/chukwa/datastore/ChukwaHBaseStore.java Sun Jan 11 19:57:45 2015 @@ -25,6 +25,8 @@ import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -33,14 +35,15 @@ import org.apache.hadoop.chukwa.hicc.bea import org.apache.hadoop.chukwa.hicc.bean.Heatmap; import org.apache.hadoop.chukwa.hicc.bean.Series; import org.apache.hadoop.chukwa.util.ExceptionUtil; - +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -52,7 +55,8 @@ import org.apache.log4j.Logger; public class ChukwaHBaseStore { private static Configuration hconf = HBaseConfiguration.create(); - private static HTablePool pool = new HTablePool(hconf, 60); + private static HConnection connection = null; + private static final int POOL_SIZE = 60; static Logger log = Logger.getLogger(ChukwaHBaseStore.class); public static Series getSeries(String tableName, String rkey, String family, String column, @@ -66,7 +70,7 @@ public class ChukwaHBaseStore { Series series = new Series(seriesName.toString()); try { - HTableInterface table = pool.getTable(tableName); + HTableInterface table = getHTableConnection().getTable(tableName); Calendar c = Calendar.getInstance(); c.setTimeInMillis(startTime); c.set(Calendar.MINUTE, 0); @@ -109,7 +113,7 @@ public class ChukwaHBaseStore { public static Set getFamilyNames(String tableName) { Set familyNames = new CopyOnWriteArraySet(); try { - HTableInterface table = pool.getTable(tableName); + HTableInterface table = getHTableConnection().getTable(tableName); Set families = table.getTableDescriptor().getFamiliesKeys(); for(byte[] name : families) { familyNames.add(new String(name)); @@ -139,9 +143,9 @@ public class ChukwaHBaseStore { public static void getColumnNamesHelper(SetcolumnNames, Iterator it) { Result result = it.next(); if(result!=null) { - List kvList = result.list(); - for(KeyValue kv : kvList) { - columnNames.add(new String(kv.getQualifier())); + List cList = result.listCells(); + for(Cell cell : cList) { + columnNames.add(new String(CellUtil.cloneQualifier(cell))); } } } @@ -149,7 +153,7 @@ public class ChukwaHBaseStore { public static Set getColumnNames(String tableName, String family, long startTime, long endTime, boolean fullScan) { Set columnNames = new CopyOnWriteArraySet(); try { - HTableInterface table = pool.getTable(tableName); + HTableInterface table = getHTableConnection().getTable(tableName); Scan scan = new Scan(); if(!fullScan) { // Take sample columns of the recent time. @@ -187,8 +191,8 @@ public class ChukwaHBaseStore { public static Set getRowNames(String tableName, String family, String qualifier, long startTime, long endTime, boolean fullScan) { Set rows = new HashSet(); - HTableInterface table = pool.getTable(tableName); try { + HTableInterface table = getHTableConnection().getTable(tableName); Scan scan = new Scan(); scan.addColumn(family.getBytes(), qualifier.getBytes()); if(!fullScan) { @@ -234,9 +238,9 @@ public class ChukwaHBaseStore { String family = "system"; String column = "ctags"; Set clusters = new HashSet(); - HTableInterface table = pool.getTable(tableName); Pattern p = Pattern.compile("\\s*cluster=\"(.*?)\""); try { + HTableInterface table = getHTableConnection().getTable(tableName); Scan scan = new Scan(); scan.addColumn(family.getBytes(), column.getBytes()); scan.setTimeRange(startTime, endTime); @@ -262,8 +266,9 @@ public class ChukwaHBaseStore { long startTime, long endTime, double max, double scale, int height) { final long MINUTE = TimeUnit.MINUTES.toMillis(1); Heatmap heatmap = new Heatmap(); - HTableInterface table = pool.getTable(tableName); + try { + HTableInterface table = getHTableConnection().getTable(tableName); Scan scan = new Scan(); ColumnPrefixFilter cpf = new ColumnPrefixFilter(column.getBytes()); scan.addFamily(family.getBytes()); @@ -278,13 +283,13 @@ public class ChukwaHBaseStore { HashMap keyMap = new HashMap(); while(it.hasNext()) { Result result = it.next(); - List kvList = result.list(); - for(KeyValue kv : kvList) { + List cList = result.listCells(); + for(Cell cell : cList) { String key = parseRowKey(result.getRow()); StringBuilder tmp = new StringBuilder(); tmp.append(key); tmp.append(":"); - tmp.append(new String(kv.getQualifier())); + tmp.append(new String(CellUtil.cloneQualifier(cell))); String seriesName = tmp.toString(); long time = parseTime(result.getRow()); // Time display in x axis @@ -296,7 +301,7 @@ public class ChukwaHBaseStore { y = index; index++; } - double v = Double.parseDouble(new String(kv.getValue())); + double v = Double.parseDouble(new String(CellUtil.cloneValue(cell))); heatmap.put(x, y, v); if(v > max) { max = v; @@ -333,5 +338,27 @@ public class ChukwaHBaseStore { long time = Long.parseLong(parts[0]); return time; } + + private static HConnection getHTableConnection() { + if(connection == null) { + synchronized(ChukwaHBaseStore.class) { + try { + ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE); + /* Set the hbase client properties to unblock immediately in case + * hbase goes down. This will ensure we timeout on socket connection to + * hbase early. + */ + hconf.setInt("hbase.client.operation.timeout", 60000); + hconf.setLong("hbase.client.pause", 1000); + hconf.setInt("hbase.client.retries.number", 1); + connection = HConnectionManager.createConnection(hconf, pool); + }catch(IOException e) { + log.error("Unable to obtain connection to HBase " + e.getMessage()); + e.printStackTrace(); + } + } + } + return connection; + } }