hadoop-common-commits mailing list archives

From: Apache Wiki <wikidi...@apache.org>
Subject: [Hadoop Wiki] Update of "Hive/Tutorial" by StevenWong
Date: Sat, 18 Jun 2011 01:22:53 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hadoop Wiki" for change notification.

The "Hive/Tutorial" page has been changed by StevenWong:
http://wiki.apache.org/hadoop/Hive/Tutorial?action=diff&rev1=37&rev2=38

  == What is Hive ==
  Hive is a data warehousing infrastructure based on Hadoop. Hadoop provides massive scale-out
and fault-tolerance capabilities for data storage and processing (using the map-reduce
programming paradigm) on commodity hardware.
  
- Hive is designed to enable easy data summarization, ad-hoc querying and analysis of large
volumes of data. It provides a simple query language called Hive QL, which is based on SQL
and which enables users familiar with SQL to do ad-hoc querying, summarization and data analysis
easily. At the same time, Hive QL also allows traditional map/reduce programmers to be able
to plug in their custom mappers and reducers to do more sophisticated analysis that may not
be supported by the built-in capabilities of the language. 
+ Hive is designed to enable easy data summarization, ad-hoc querying and analysis of large
volumes of data. It provides a simple query language called Hive QL, which is based on SQL
and which enables users familiar with SQL to do ad-hoc querying, summarization and data analysis
easily. At the same time, Hive QL also allows traditional map/reduce programmers to plug in
their custom mappers and reducers to do more sophisticated analysis that may not be supported
by the built-in capabilities of the language.
  
  == What is NOT Hive ==
  Hadoop is a batch processing system, and Hadoop jobs tend to have high latency and incur
substantial overheads in job submission and scheduling. As a result, latency for Hive queries
is generally very high (minutes) even when the data sets involved are very small (say a few
hundred megabytes). Hive therefore cannot be compared with systems such as Oracle, where analyses
are conducted on a significantly smaller amount of data but proceed much more iteratively,
with response times between iterations of less than a few minutes. Hive aims to provide
acceptable (but not optimal) latency for interactive data browsing, queries over small data
sets, or test queries.
@@ -55, +55 @@

      . |→DOUBLE
       . |→BIGINT
        . |→INT
-        . |→TINYINT     
+        . |→TINYINT
       . |→FLOAT
        . |→INT
         . |→TINYINT
     . |→STRING
     . |→BOOLEAN
  
- 
  This type hierarchy defines how types are implicitly converted in the query language.
Implicit conversion is allowed from a type to any of its ancestors. So when a query expression
expects type1 and the data is of type2, type2 is implicitly converted to type1 if type1 is
an ancestor of type2 in the type hierarchy. Apart from these fundamental rules for implicit
conversion based on the type system, Hive also allows the following special case of conversion:
  
   * STRING → DOUBLE
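  
  For illustration, a query along the following lines (a sketch, assuming a hypothetical table t
with a TINYINT column small_col and a STRING column str_col) relies on these implicit conversions:
small_col is promoted up the hierarchy to DOUBLE when added to a DOUBLE literal, and str_col is
converted from STRING to DOUBLE under the special-case rule.
  
  {{{
      SELECT t.small_col + 1.5,
             t.str_col + 0.0
      FROM t;
  }}}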
@@ -86, +85 @@

   * Relational Operators - The following operators compare the passed operands and generate
a TRUE or FALSE value depending on whether the comparison between the operands holds or not.
  
  ||'''Relational Operator''' ||'''Operand types''' ||'''Description''' ||
- || ''???'' ||||surely there are operators for equality and lack of equality? ||
+ ||''???'' ||||<style="text-align: center;">surely there are operators for equality
and lack of equality? ||
  ||A < B ||all primitive types ||TRUE if expression A is less than expression B otherwise
FALSE ||
  ||A <= B ||all primitive types ||TRUE if expression A is less than or equal to expression
B otherwise FALSE ||
  ||A > B ||all primitive types ||TRUE if expression A is greater than expression B otherwise
FALSE ||
@@ -112, +111 @@

  ||~A ||all number types ||Gives the result of bitwise NOT of A. The type of the result is
the same as the type of A. ||
  
  
- 
- 
   * Logical Operators - The following operators provide support for creating logical expressions.
All of them return boolean TRUE or FALSE depending upon the boolean values of the operands.
  
  ||'''Logical Operators''' ||'''Operand types''' ||'''Description''' ||
@@ -124, +121 @@

  ||NOT A ||boolean ||TRUE if A is FALSE, otherwise FALSE ||
  ||! A ||boolean ||Same as NOT A ||
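  
  As a small illustration (a sketch that reuses the page_view_stg table from the examples later
on this page), the following query combines relational and logical operators in a WHERE clause
to select rows whose viewTime lies strictly between 0 and 100:
  
  {{{
      SELECT pvs.userid
      FROM page_view_stg pvs
      WHERE pvs.viewTime > 0 AND NOT (pvs.viewTime >= 100);
  }}}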
  
+ 
  * Operators on Complex Types - The following operators provide mechanisms to access elements
in Complex Types
- 
  ||'''Operator''' ||'''Operand types''' ||'''Description''' ||
  ||A[n] ||A is an Array and n is an int ||returns the nth element in the array A. The first
element has index 0 e.g. if A is an array comprising ['foo', 'bar'] then A[0] returns 'foo'
and A[1] returns 'bar' ||
  ||M[key] ||M is a Map<K, V> and key has type K ||returns the value corresponding to
the key in the map e.g. if M is a map comprising {'f' -> 'foo', 'b' -> 'bar', 'all'
-> 'foobar'} then M['all'] returns 'foobar' ||
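  
  For example (a sketch, assuming a page_view table pv with an array column friends and a
map<string, string> column properties, as in the examples later on this page), these indexing
operators can be used directly in a SELECT:
  
  {{{
      SELECT pv.friends[0], pv.properties['page type']
      FROM page_view pv;
  }}}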
@@ -155, +152 @@

  ||string ||regexp_replace(string A, string B, string C) ||returns the string resulting from
replacing all substrings in A that match the Java regular expression B (see [[http://java.sun.com/j2se/1.4.2/docs/api/java/util/regex/Pattern.html|Java
regular expressions syntax]]) with C. For example, regexp_replace('foobar', 'oo<nowiki>|</nowiki>ar',
"") returns 'fb' ||
  ||int ||size(Map<K.V>) ||returns the number of elements in the map type ||
  ||int ||size(Array<T>) ||returns the number of elements in the array type ||
- || <type> ||cast(expr as <type>) ||converts the results of the expression expr
to <type> e.g. cast('1' as BIGINT) will convert the string '1' to it integral representation.
A null is returned if the conversion does not succeed. ||
+ ||<type> ||cast(expr as <type>) ||converts the results of the expression expr
to <type>, e.g. cast('1' as BIGINT) will convert the string '1' to its integral representation.
A null is returned if the conversion does not succeed. ||
  ||string ||from_unixtime(int unixtime) ||convert the number of seconds from unix epoch (1970-01-01
00:00:00 UTC) to a string representing the timestamp of that moment in the current system
time zone in the format of "1970-01-01 00:00:00" ||
  ||string ||to_date(string timestamp) ||Return the date part of a timestamp string: to_date("1970-01-01
00:00:00") = "1970-01-01" ||
  ||int ||year(string date) ||Return the year part of a date or a timestamp string: year("1970-01-01
00:00:00") = 1970, year("1970-01-01") = 1970 ||
@@ -164, +161 @@

  ||string ||get_json_object(string json_string, string path) ||Extracts a json object from a
json string based on the specified json path, and returns the json string of the extracted json
object. It will return null if the input json string is invalid ||
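  
  To illustrate a few of these functions together (a sketch, using the page_view_stg table from
the examples below and assuming pvs.viewTime holds seconds since the unix epoch):
  
  {{{
      SELECT cast(pvs.userid as BIGINT),
             to_date(from_unixtime(pvs.viewTime)),
             year(from_unixtime(pvs.viewTime))
      FROM page_view_stg pvs;
  }}}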
  
  
- 
- 
  * The following built-in aggregate functions are supported in Hive:
- 
  ||'''Return Type''' ||'''Aggregation Function Name (Signature)''' ||'''Description''' ||
  ||BIGINT ||count(*), count(expr), count(DISTINCT expr[, expr...]) ||count(*) - Returns the
total number of retrieved rows, including rows containing NULL values; count(expr) - Returns
the number of rows for which the supplied expression is non-NULL; count(DISTINCT expr[, expr])
- Returns the number of rows for which the supplied expression(s) are unique and non-NULL.
||
  ||DOUBLE ||sum(col), sum(DISTINCT col) ||returns the sum of the elements in the group or
the sum of the distinct values of the column in the group ||
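  
  As a brief illustration (a sketch over the page_view table used elsewhere on this page),
aggregate functions are typically combined with a GROUP BY clause:
  
  {{{
      SELECT pv.userid, count(*), count(DISTINCT pv.page_url)
      FROM page_view pv
      GROUP BY pv.userid;
  }}}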
@@ -431, +425 @@

  The first insert clause sends the results of the first group by to a Hive table while the
second one sends the results to Hadoop DFS files.
  
  == Dynamic-partition Insert ==
- 
- In the previous examples, the user has to know which partition to insert into and only one
partition can be inserted in one insert statement. If you want to load into multiple partitions,
you have to use multi-insert statement as illustrated below. 
+ In the previous examples, the user has to know which partition to insert into, and only one
partition can be inserted with one insert statement. If you want to load into multiple partitions,
you have to use a multi-insert statement as illustrated below.
+ 
  {{{
      FROM page_view_stg pvs
-     INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='US') 
+     INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='US')
             SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null,
pvs.ip WHERE pvs.country = 'US'
-     INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='CA') 
+     INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='CA')
             SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null,
pvs.ip WHERE pvs.country = 'CA'
-     INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='UK') 
+     INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='UK')
-            SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null,
pvs.ip WHERE pvs.country = 'UK';    
+            SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null,
pvs.ip WHERE pvs.country = 'UK';
  }}}
- 
- In order to load data into all country partitions in a particular day, you have to add an
insert statement for each country in the input data. This is very inconvenient since you have
to have the priori knowledge of the list of countries exist in the input data and create the
partitions beforehand. If the list changed for another day, you have to modify your insert
DML as well as the partition creation DDLs. It is also inefficient since each insert statement
may be turned into a MapReduce Job. 
+ In order to load data into all country partitions for a particular day, you have to add an
insert statement for each country in the input data. This is very inconvenient since you have
to have a priori knowledge of the list of countries that exist in the input data and create the
partitions beforehand. If the list changes on another day, you have to modify your insert
DML as well as the partition creation DDLs. It is also inefficient since each insert statement
may be turned into a MapReduce job.
  
  ''Dynamic-partition insert'' (or multi-partition insert) is designed to solve this problem
by dynamically determining which partitions should be created and populated while scanning
the input table. This is a newly added feature that is only available from version 0.6.0 (trunk
now). In a dynamic partition insert, the input column values are evaluated to determine
which partition each row should be inserted into. If that partition has not been created,
it will be created automatically. Using this feature you need only one insert statement
to create and populate all necessary partitions. In addition, since there is only one insert
statement, there is only one corresponding MapReduce job. This significantly improves performance
and reduces the Hadoop cluster workload compared to the multiple-insert case.
  
@@ -451, +444 @@

  
  {{{
      FROM page_view_stg pvs
-     INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country) 
+     INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country)
             SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null,
pvs.ip, pvs.country
  }}}
- 
- There are several syntactic differences from the multi-insert statement: 
+ There are several syntactic differences from the multi-insert statement:
+ 
-   * country appears in the PARTITION specification, but with no value associated. In this
case, country is a ''dynamic partition column''. On the other hand, ds has a value associated
with it, which means it is a ''static partition column''. If a column is dynamic partition
column, its value will be coming from the input column. Currently we only allow dynamic partition
columns to be the last column(s) in the partition clause because the partition column order
indicates its hierarchical order (meaning dt is the root partition, and country is the child
partition). You cannot specify a partition clause with (dt, country='US') because that means
you need to update all partitions with any date and its country sub-partition is 'US'. 
+  * country appears in the PARTITION specification, but with no value associated. In this
case, country is a ''dynamic partition column''. On the other hand, dt has a value associated
with it, which means it is a ''static partition column''. If a column is a dynamic partition
column, its value will come from the corresponding input column. Currently we only allow dynamic
partition columns to be the last column(s) in the partition clause because the partition column
order indicates their hierarchical order (meaning dt is the root partition, and country is the
child partition). You cannot specify a partition clause such as (dt, country='US') because that
would mean you need to update, for every date, the partitions whose country sub-partition is 'US'.
-   * An additional pvs.country column is added in the select statement. This is the corresponding
input column for the dynamic partition column. Note that you do not need to add an input column
for the static partition column because its value is already known in the PARTITION clause.
Note that the dynamic partition values are selected by ordering, not name, and taken as the
last columns from the select clause.
+  * An additional pvs.country column is added in the select statement. This is the corresponding
input column for the dynamic partition column. Note that you do not need to add an input column
for the static partition column because its value is already known in the PARTITION clause.
Note that the dynamic partition values are selected by ordering, not name, and taken as the
last columns from the select clause.
  
  Semantics of the dynamic partition insert statement:
+ 
-   * When there are already non-empty partitions exists for the dynamic partition columns,
(e.g., country='CA' exists under some ds root partition), it will be overwritten if the dynamic
partition insert saw the same value (say 'CA') in the input data. This is in line with the
'insert overwrite' semantics. However, if the partition value 'CA' does not appear in the
input data, the existing partition will not be overwritten. 
+  * When non-empty partitions already exist for the dynamic partition columns (e.g., country='CA'
exists under some dt root partition), a partition will be overwritten if the dynamic
partition insert sees the same value (say 'CA') in the input data. This is in line with the
'insert overwrite' semantics. However, if the partition value 'CA' does not appear in the
input data, the existing partition will not be overwritten.
-   * Since a Hive partition corresponds to a directory in HDFS, the partition value has to
conform to the HDFS path format (URI in Java). Any character having a special meaning in URI
(e.g., '%', ':', '/', '#') will be escaped with '%' followed by 2 bytes of its ASCII value.
 
+  * Since a Hive partition corresponds to a directory in HDFS, the partition value has to
conform to the HDFS path format (URI in Java). Any character having a special meaning in a URI
(e.g., '%', ':', '/', '#') will be escaped with '%' followed by the 2-digit hexadecimal
representation of its ASCII value.
-   * If the input column is a type different than STRING, its value will be first converted
to STRING to be used to construct the HDFS path. 
+  * If the input column is of a type different from STRING, its value will first be converted
to STRING before being used to construct the HDFS path.
-   * If the input column value is NULL or empty string, the row will be put into a special
partition, whose name is controlled by the hive parameter hive.exec.default.partition.name.
The default value is `__HIVE_DEFAULT_PARTITION__`. Basically this partition will contain all
"bad" rows whose value are not valid partition names. The caveat of this approach is that
the bad value will be lost and is replaced by `__HIVE_DEFAULT_PARTITION__` if you select them
Hive. JIRA HIVE-1309 is a solution to let user specify "bad file" to retain the input partition
column values as well.
+  * If the input column value is NULL or an empty string, the row will be put into a special
partition, whose name is controlled by the Hive parameter hive.exec.default.partition.name.
The default value is `__HIVE_DEFAULT_PARTITION__`. Basically this partition will contain all
"bad" rows whose values are not valid partition names. The caveat of this approach is that
the bad value is lost and replaced by `__HIVE_DEFAULT_PARTITION__` if you select it from
Hive. JIRA HIVE-1309 is a solution to let the user specify a "bad file" to retain the input
partition column values as well.
-   * Dynamic partition insert could potentially resource hog in that it could generate a
large number of partitions in a short time. To get yourself buckled, we define three parameters:
+  * Dynamic partition insert could potentially be a resource hog in that it could generate a
large number of partitions in a short time. To guard against this, we define three parameters:
-     * '''hive.exec.max.dynamic.partitions.pernode''' (default value being 100) is the maximum
dynamic partitions that can be created by each mapper or reducer. If one mapper or reducer
created more than that the threshold, a fatal error will be raised from the mapper/reducer
(through counter) and the whole job will be killed. 
+   * '''hive.exec.max.dynamic.partitions.pernode''' (default value being 100) is the maximum
number of dynamic partitions that can be created by each mapper or reducer. If one mapper or
reducer creates more than the threshold, a fatal error will be raised from the mapper/reducer
(through a counter) and the whole job will be killed.
-     * '''hive.exec.max.dynamic.partitions''' (default value being 1000) is the total number
of dynamic partitions could be created by one DML. If each mapper/reducer did not exceed the
limit but the total number of dynamic partitions does, then an exception is raised at the
end of the job before the intermediate data are moved to the final destination.
+   * '''hive.exec.max.dynamic.partitions''' (default value being 1000) is the total number
of dynamic partitions that can be created by one DML statement. If no single mapper/reducer
exceeds its limit but the total number of dynamic partitions does, then an exception is raised
at the end of the job before the intermediate data are moved to the final destination.
-     * '''hive.max.created.files''' (default value being 100000) is the maximum total number
of files created by all mappers and reducers. This is implemented by updating a Hadoop counter
by each mapper/reducer whenever a new file is created. If the total number is exceeding hive.max.created.files,
a fatal error will be thrown and the job will be killed. 
+   * '''hive.max.created.files''' (default value being 100000) is the maximum total number
of files created by all mappers and reducers. This is implemented by having each mapper/reducer
update a Hadoop counter whenever a new file is created. If the total number exceeds
hive.max.created.files, a fatal error will be thrown and the job will be killed.
-  
+ 
-   * Another situation we want to protect against dynamic partition insert is that the user
may accidentally specify all partitions to be dynamic partitions without specifying one static
partition, while the original intention is to just overwrite the sub-partitions of one root
partition. We define another parameter hive.exec.dynamic.partition.mode=strict to prevent
the all-dynamic partition case. In the strict mode, you have to specify at least one static
partition. The default mode is strict. In addition, we have a parameter hive.exec.dynamic.partition=true/false
to control whether to allow dynamic partition at all. The default value is false. 
+  * Another situation we want to protect against in dynamic partition insert is the user
accidentally specifying all partitions to be dynamic partitions without specifying one static
partition, when the original intention is to just overwrite the sub-partitions of one root
partition. We define another parameter, hive.exec.dynamic.partition.mode=strict, to prevent
the all-dynamic-partition case. In strict mode, you have to specify at least one static
partition. The default mode is strict. In addition, we have a parameter hive.exec.dynamic.partition=true/false
to control whether to allow dynamic partitions at all. The default value is false. (A sample
session configuration is sketched after this list.)
-   * In Hive 0.6, dynamic partition insert does not work with hive.merge.mapfiles=true or
hive.merge.mapredfiles=true, so it internally turns off the merge parameters. Merging files
in dynamic partition inserts are supported in Hive 0.7 (see JIRA HIVE-1307 for details). 
+  * In Hive 0.6, dynamic partition insert does not work with hive.merge.mapfiles=true or
hive.merge.mapredfiles=true, so it internally turns off the merge parameters. Merging files
in dynamic partition inserts is supported in Hive 0.7 (see JIRA HIVE-1307 for details).
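  
  Putting the above parameters together, a minimal session setup for a dynamic-partition insert
might look like the following sketch (the limit values shown are simply the defaults quoted
above, not tuning recommendations):
  
  {{{
      set hive.exec.dynamic.partition=true;
      set hive.exec.dynamic.partition.mode=nonstrict;
      set hive.exec.max.dynamic.partitions.pernode=100;
      set hive.exec.max.dynamic.partitions=1000;
      set hive.max.created.files=100000;
  }}}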
  
  Troubleshooting and best practices:
+ 
-   * As stated above, there are too many dynamic partitions created by a particular mapper/reducer,
a fatal error could be raised and the job will be killed. The error message looks something
like: {{{
+  * As stated above, when there are too many dynamic partitions created by a particular mapper/reducer,
a fatal error could be raised and the job will be killed. The error message looks something
like:
+  {{{
      hive> set hive.exec.dynamic.partition.mode=nonstrict;
      hive> FROM page_view_stg pvs
-           INSERT OVERWRITE TABLE page_view PARTITION(dt, country) 
+           INSERT OVERWRITE TABLE page_view PARTITION(dt, country)
-                  SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null,
null, pvs.ip, 
+                  SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null,
null, pvs.ip,
                        from_unixtime(pvs.viewTime, 'yyyy-MM-dd') ds, pvs.country;
  ...
  2010-05-07 11:10:19,816 Stage-1 map = 0%,  reduce = 0%
  [Fatal Error] Operator FS_28 (id=41): fatal error. Killing the job.
  Ended Job = job_201005052204_28178 with errors
+ ...
+ }}}
- ...}}} The problem of this that one mapper will take a random set of rows and it is very
likely that the number of distinct (dt, country) pairs will exceed the limit of hive.exec.max.dynamic.partitions.pernode.
 One way around it is to group the rows by the dynamic partition columns in the mapper and
distribute them to the reducers where the dynamic partitions will be created. In this case
the number of distinct dynamic partitions will be significantly reduced. The above example
query could be rewritten to: {{{
+  The problem with this is that one mapper will take a random set of rows, and it is very likely
that the number of distinct (dt, country) pairs will exceed the limit of hive.exec.max.dynamic.partitions.pernode.
One way around it is to group the rows by the dynamic partition columns in the mapper and
distribute them to the reducers where the dynamic partitions will be created. In this case
the number of distinct dynamic partitions per reducer will be significantly reduced. The above
example query could be rewritten as:
+  {{{
      hive> set hive.exec.dynamic.partition.mode=nonstrict;
      hive> FROM page_view_stg pvs
-           INSERT OVERWRITE TABLE page_view PARTITION(dt, country) 
+           INSERT OVERWRITE TABLE page_view PARTITION(dt, country)
-                  SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null,
null, pvs.ip, 
+                  SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null,
null, pvs.ip,
-                         from_unixtimestamp(pvs.viewTime, 'yyyy-MM-dd') ds, pvs.country 
+                         from_unixtime(pvs.viewTime, 'yyyy-MM-dd') ds, pvs.country
                   DISTRIBUTE BY ds, country;
+ }}}
- }}} This query will generate a MapReduce job rather than Map-only job. The SELECT-clause
will be converted to a plan to the mappers and the output will be distributed to the reducers
based on the value of (ds, country) pairs. The INSERT-clause will be converted to the plan
in the reducer which writes to the dynamic partitions. 
+  This query will generate a MapReduce job rather than a Map-only job. The SELECT-clause will
be converted to a plan for the mappers, and the output will be distributed to the reducers based
on the value of the (ds, country) pairs. The INSERT-clause will be converted to a plan in the
reducers that writes to the dynamic partitions.
  
  == Inserting into local files ==
  In certain situations you would want to write the output into a local file so that you could
load it into an Excel spreadsheet. This can be accomplished with the following command:
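  
  A typical form of this command is an INSERT OVERWRITE LOCAL DIRECTORY statement, for example
(a sketch; the output directory '/tmp/pv_out' is illustrative):
  
  {{{
      INSERT OVERWRITE LOCAL DIRECTORY '/tmp/pv_out'
      SELECT pv.*
      FROM page_view pv;
  }}}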
@@ -560, +560 @@

     SELECT pv.userid, size(pv.friends)
     FROM page_view pv;
  }}}
- 
- 
  == Map (Associative Arrays) Operations ==
  Maps provide collections similar to associative arrays. Such structures can currently only
be created programmatically. We will be extending this soon. For the purpose of the current
example assume that pv.properties is of the type map<String, String>, i.e. it is an associative
array from strings to strings. Accordingly, the following query:
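  
  (sketched here in a likely form; the specific property key 'page type' is illustrative)
  
  {{{
      SELECT pv.properties['page type']
      FROM page_view pv;
  }}}
  
  can be used to look up the value stored under the 'page type' key for each row of page_view.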
  
