pig-dev mailing list archives

From "Cheolsoo Park (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (PIG-2251) PIG leaks Zookeeper connections when using HBaseStorage
Date Wed, 02 Jan 2013 22:30:14 GMT

     [ https://issues.apache.org/jira/browse/PIG-2251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cheolsoo Park updated PIG-2251:
-------------------------------

    Fix Version/s: 0.12
    
> PIG leaks Zookeeper connections when using HBaseStorage
> -------------------------------------------------------
>
>                 Key: PIG-2251
>                 URL: https://issues.apache.org/jira/browse/PIG-2251
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.8.1, 0.9.0, 0.10.0, 0.11
>         Environment: PIG 0.9 branch
> HBase 0.90.3
> HDFS 0.20-append
>            Reporter: Vincent BARAT
>            Assignee: Jeff Markham
>             Fix For: 0.12
>
>         Attachments: PIG-2251.patch
>
>
> I run a set of PIG jobs from a Java process (using PigServer), most of which use HBaseStorage
> to load data from HBase.
> Each job is run using a new PigServer object, and I correctly call PigServer.shutdown()
> when the PigServer is no longer used.
> Nevertheless, after running for a few hours, I notice that the number of connections to my
> Zookeeper servers reaches the limit (300 in my case).
> It appears that each job leaks 4 or 5 Zookeeper connections.
> This was not the case with PIG 0.6.1 + HBase 0.20.6.
> I work around this issue (temporarily) by killing the process running PIG after a few sets
> of jobs have been run: the connections are then correctly closed.
> My process doesn't use HBase itself, only HBaseStorage, so I guess the leak is in the
> code of HBaseStorage: maybe the connections to HBase are not closed.
> All my requests are simple requests loading data from HBase, like:
> {code}
>     pigServer.registerQuery("start_sessions = LOAD '"
>         + Analytics.getHBaseTableURL("startSession")
>         + "' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('meta:sid meta:infoid meta:imei meta:timestamp') "
>         + "AS (sid:chararray, infoid:chararray, imei:chararray, start:long);");
>     pigServer.registerQuery("end_sessions = LOAD '"
>         + Analytics.getHBaseTableURL("endSession")
>         + "' USING org.apache.pig.backend.hadoop.hbase.HBaseStorage('meta:sid meta:timestamp meta:locid') "
>         + "AS (sid:chararray, end:long, locid:chararray);");
>     pigServer.registerQuery("sessions = JOIN start_sessions BY sid, end_sessions BY sid;");
>     pigServer.store("sessions", Analytics.getOutputFilePath("sessions"), "BinStorage");
> {code}
> Code used to allocate a new PIG server:
> {code}
>   public static PigServer getNewPigServer() throws IOException
>   {
>     /* Create the properties passed to the PIG context */
>     Properties properties = new Properties();
>     /* Set specific Hadoop properties for PIG jobs */
>     properties.setProperty("mapred.child.java.opts", "-Xmx" + childMemory + "m");
>     /* Create PIG context */
>     PigContext context = new PigContext(local ? ExecType.LOCAL : ExecType.MAPREDUCE,
>       properties);
>     /* Create the PIG server */
>     PigServer pigServer = new PigServer(context);
>     /* Register our User Defined Functions (UDFs) */
>     pigServer.registerJar(pigUdfsPath);
>     /* Register shortcuts for our UDFs */
>     pigServer.registerFunction("GetActivitiesLengthsRanges", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetActivitiesLengthsRanges"));
>     pigServer.registerFunction("GetActivitiesLinks", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetActivitiesLinks"));
>     pigServer.registerFunction("GetActivitiesPeriodsAndLengths", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetActivitiesPeriodsAndLengths"));
>     pigServer.registerFunction("GetCountRange", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetCountRange"));
>     pigServer.registerFunction("GetAllPeriods", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetAllPeriods"));
>     pigServer.registerFunction("GetCountRangeLabel", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetCountRangeLabel"));
>     pigServer.registerFunction("GetCountsAndLengthsByName", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetCountsAndLengthsByName"));
>     pigServer.registerFunction("GetCountsByName", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetCountsByName"));
>     pigServer.registerFunction("GetDayPeriod", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetDayPeriod"));
>     pigServer.registerFunction("GetDayWeekMonthPeriods", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetDayWeekMonthPeriods"));
>     pigServer.registerFunction("GetLengthRange", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetLengthRange"));
>     pigServer.registerFunction("GetLengthRangeLabel", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetLengthRangeLabel"));
>     pigServer.registerFunction("GetPeriods", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetPeriods"));
>     pigServer.registerFunction("GetPeriodsAndLengths", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GetPeriodsAndLengths"));
>     pigServer.registerFunction("NormalizeCarrierName", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.NormalizeCarrierName"));
>     pigServer.registerFunction("NormalizeCountryCode", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.NormalizeCountryCode"));
>     pigServer.registerFunction("NormalizeLocaleCode", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.NormalizeLocaleCode"));
>     pigServer.registerFunction("NormalizeNetworkType", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.NormalizeNetworkType"));
>     pigServer.registerFunction("NormalizeNetworkSubType", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.NormalizeNetworkSubType"));
>     pigServer.registerFunction("NormalizePhoneManufacturer", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.NormalizePhoneManufacturer"));
>     pigServer.registerFunction("NormalizePhoneModel", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.NormalizePhoneModel"));
>     pigServer.registerFunction("NormalizeString", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.NormalizeString"));
>     pigServer.registerFunction("SubString", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.SubString"));
>     pigServer.registerFunction("GuessCountryCode", new FuncSpec(
>       "com.ubikod.ermin.analytics.pigudf.GuessCountryCode"));
>     /* Return this new instance of PIG server */
>     return pigServer;
>   }
> {code}
> Code used when the PIG server is no longer used:
> {code}
>     pigServer.shutdown();
> {code}
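
The figures in the report (a limit of 300 Zookeeper connections, 4 or 5 connections leaked per job) give a rough idea of how often the host process would have to be restarted under the temporary workaround. The sketch below only does that arithmetic. ZkLeakBudget and its methods are hypothetical names used for illustration and are not part of Pig, HBase, or the attached patch, and it assumes every job leaks the same number of connections and that nothing else connects to the same Zookeeper servers.

```java
/**
 * Rough budget for the restart workaround: how many jobs can one JVM run
 * before the connections it has leaked reach the Zookeeper limit?
 * Hypothetical helper, not part of Pig or HBase.
 */
final class ZkLeakBudget {

    private final int connectionLimit; // e.g. 300 in the report
    private final int leakedPerJob;    // e.g. 4 or 5 in the report
    private int jobsRun = 0;

    ZkLeakBudget(int connectionLimit, int leakedPerJob) {
        if (connectionLimit <= 0 || leakedPerJob <= 0) {
            throw new IllegalArgumentException("both values must be positive");
        }
        this.connectionLimit = connectionLimit;
        this.leakedPerJob = leakedPerJob;
    }

    /** Largest number of jobs whose leaked connections stay strictly below the limit. */
    int maxJobs() {
        return (connectionLimit - 1) / leakedPerJob;
    }

    /** Call once after each job; returns true when the process should be restarted. */
    boolean recordJob() {
        jobsRun++;
        return jobsRun >= maxJobs();
    }

    public static void main(String[] args) {
        // Worst case from the report: 5 leaked connections per job.
        System.out.println(new ZkLeakBudget(300, 5).maxJobs()); // prints 59
        // Best case from the report: 4 leaked connections per job.
        System.out.println(new ZkLeakBudget(300, 4).maxJobs()); // prints 74
    }
}
```

Under these assumptions a process would need to be recycled after roughly 59 to 74 jobs, which is consistent with the limit being reached after a few hours of running.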

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
