Hive 随笔
Hive 是一个数据仓库软件,支持使用 SQL 读取、写入、管理分布存储中的大型数据集。可以将结构投影到已经存储的数据上,为用户提供了命令行工具和 JDBC 驱动程序连接 Hive。
Hive 是一个基于 Apache Hadoop 的数据仓库基础设施,Hadoop 为在普通硬件上存储和计算提供了大规模扩展和容错能力。
Hive 的设计目的是支持简单数据摘要、特别查询、分析大量数据。它提供 SQL使用户方便的进行特别查询、汇总和数据分析,同时 Hive 为用户提供位置集成自己的功能进行自定义分析,例如用户自定义函数(UDFs)。
快速开始
MapReduce 模式与 Local 模式
对于大多数查询,Hive 编译器生成 MapReduce job,Job 会被提交到变量(mapred.job.tracker)指定的 MapReduce 集群。
这种情况下,job 通过被提交到具有多个节点的 MapReduce 集群,除此之外 Hadoop 还提供了一个选项,用户可以在 workstation 上本地运行 MapReduce 任务。这对于小型数据集的查询非常有用,因为对于小型数据在本地模型执行通常比向大型集群提交作业快得多。本地模式仅运行一个 Reducer。
本地模式开启命令:hive> SET mapreduce.framework.name=local;
DDL
创建表:
hive> CREATE TABLE pokes (foo INT, bar STRING);
hive> CREATE TABLE invites (foo INT, bar STRING) PARTITIONED BY (ds STRING);
创建一张表,拥有两个 column 和一个 partition column,其中 partition column 是一个虚拟 column,它不是数据本身的一部分,而是来自于装载特定数据集的 partition。
默认情况下,表被假设为文本输入模式,分隔符为^A(ctrl-a)
。
浏览表
hive> SHOW TABLES;
hive> SHOW TABLES '.*s';
列出所有表或者列出以 ’s‘ 为后缀的表。
hive> DESCRIBE invites;
列出 columns。
更改、删除表
改变表名、增加或者改变 column
hive> ALTER TABLE events RENAME TO 3koobecaf;
hive> ALTER TABLE pokes ADD COLUMNS (new_col INT);
hive> ALTER TABLE invites ADD COLUMNS (new_col2 INT COMMENT 'a comment');
hive> ALTER TABLE invites REPLACE COLUMNS (foo INT, bar STRING, baz INT COMMENT 'baz replaces new_col2');
REPLACE COLUMNS 替换所有存在的 columns,并且只更改了表的 schema,不改变数据。当然也可以用来删除 column。
hive> ALTER TABLE invites REPLACE COLUMNS (foo INT COMMENT 'only keep the first column');
删除表:
hive> DROP TABLE pokes;
metadata 存储
metadata 位于嵌入式 Derby 数据库中,磁盘存储位置由变量 javax.jdo.option.ConnectionURL
决定,默认值为./metastore_db
(see conf/hive-default.xml
).
目前默认情况下,metadata 在某一时刻仅能由一名用户查看。
Metadata 可以被支持 JPOX 的任何数据库存储。RDBMS 的位置和类型可以由两个变量控制:javax.jdo.option.ConnectionURL
and javax.jdo.option.ConnectionDriverName
。
DML
加载文件到 Hive:
hive> LOAD DATA LOCAL INPATH './examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;
将会加载带有两个 column 的以 ctrl-a 作为分隔符文件到 Hive 中,’LOCAL‘ 表示输入文件在本地文件系统上,如果没有 ’LOCAL‘ ,则会到 HDFS 去寻找。
关键字 ‘OVERWRITE’ 表示已经存在于表中的数据被删除,如果 ‘OVERWRITE’ 被省略,数据将会以 append 的方式增加到 Hive 表中。
LOAD 命令是不对数据进行验证的。如果文件在 HDFS 中,则会被移动到 Hive 控制的文件系统命名空间中。
hive> LOAD DATA LOCAL INPATH './examples/files/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');
hive> LOAD DATA LOCAL INPATH './examples/files/kv3.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-08');
上述两条命令将文件加载到 Hive 中不同的 partition 中。但是只有在表以 partition 的方式被创建时才会成功。
hive> LOAD DATA INPATH '/user/myname/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');
上述命令会将 HDFS 中的文件内容加载到 HDFS 中。由于从 HDFS 中 load 文件会导致文件的移动,因此操作是瞬间完成的。
SQL
SELECTS and FILTERS
hive> SELECT a.foo FROM invites a WHERE a.ds='2008-08-15';
结果不存储在任何地方,仅显示在 console 中。
hive> INSERT OVERWRITE DIRECTORY '/tmp/hdfs_out' SELECT a.* FROM invites a WHERE a.ds='2008-08-15';
将查询的结果写入到 HDFS 目录中。结果存储在该目录的文件中,文件多少由 mapper 数量决定。
partition columns 也通过 * 被指定。
partitioned table 在查询时必须在 where 子句中选择分区。
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/local_out' SELECT a.* FROM pokes a;
获取表中的所有记录写入到本地文件夹中。
INSERT 有三种,分别为:
- TABLE:数据表中
- LOCAL DIRECTORY:本地文件夹
- DIRECTORY:HDFS 文件夹
hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a;
hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a WHERE a.key < 100;
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/reg_3' SELECT a.* FROM events a;
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_4' select a.invites, a.pokes FROM profiles a;
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT COUNT(*) FROM invites a WHERE a.ds='2008-08-15';
hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT a.foo, a.bar FROM invites a;
hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/sum' SELECT SUM(a.pc) FROM pc1 a;
可以使用 sum、avg、min、max 等方法。
GROUP BY
hive> FROM invites a INSERT OVERWRITE TABLE events SELECT a.bar, count(*) WHERE a.foo > 0 GROUP BY a.bar;
hive> INSERT OVERWRITE TABLE events SELECT a.bar, count(*) FROM invites a WHERE a.foo > 0 GROUP BY a.bar;
JOIN
hive> FROM pokes t1 JOIN invites t2 ON (t1.bar = t2.bar) INSERT OVERWRITE TABLE events SELECT t1.bar, t1.foo, t2.foo;
MULTITABLE INSERT
FROM src
INSERT OVERWRITE TABLE dest1 SELECT src.* WHERE src.key < 100
INSERT OVERWRITE TABLE dest2 SELECT src.key, src.value WHERE src.key >= 100 and src.key < 200
INSERT OVERWRITE TABLE dest3 PARTITION(ds='2008-04-08', hr='12') SELECT src.key WHERE src.key >= 200 and src.key < 300
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/dest4.out' SELECT src.value WHERE src.key >= 300;
STREAMING
hive> FROM invites a INSERT OVERWRITE TABLE events SELECT TRANSFORM(a.foo, a.bar) AS (oof, rab) USING '/bin/cat' WHERE a.ds > '2008-08-09';
这将在 map 阶段通过脚本 ‘/bin/cat’ 流化数据,流也可以在 Reduce 过程使用。
Case
MovieLens User Ratings
创建表
CREATE TABLE u_data (
userid INT,
movieid INT,
rating INT,
unixtime STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
load 本地数据
LOAD DATA LOCAL INPATH '<path>/u.data'
OVERWRITE INTO TABLE u_data;
获取数据行数
SELECT COUNT(*) FROM u_data;
创建 weekday_mapper.py
import sys
import datetime
for line in sys.stdin:
line = line.strip()
userid, movieid, rating, unixtime = line.split('\t')
weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
print '\t'.join([userid, movieid, rating, str(weekday)])
使用 script
CREATE TABLE u_data_new (
userid INT,
movieid INT,
rating INT,
weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
add FILE weekday_mapper.py;
INSERT OVERWRITE TABLE u_data_new
SELECT
TRANSFORM (userid, movieid, rating, unixtime)
USING 'python weekday_mapper.py'
AS (userid, movieid, rating, weekday)
FROM u_data;
SELECT weekday, COUNT(*)
FROM u_data_new
GROUP BY weekday;
Apache Weblog Data
CREATE TABLE apachelog (
host STRING,
identity STRING,
user STRING,
time STRING,
request STRING,
status STRING,
size STRING,
referer STRING,
agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "([^]*) ([^]*) ([^]*) (-|\\[^\\]*\\]) ([^ \"]*|\"[^\"]*\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \"]*|\".*\") ([^ \"]*|\".*\"))?"
)
STORED AS TEXTFILE;
概念
数据单元
根据粒度从大到小,Hive 数据组织为:
Databases
命名空间的功能,以避免表、视图、分区、列名的冲突,还可以用来加强用户或用户组的安全性。
Tables
具有相同 schema 的数据单元
Partitions
每个 table 可以有一个或多个 partition key,这决定了数据的存储方式。partition 还可以用来标识满足指定条件的行,partition column 是虚拟的 column,不是数据本身的一部分,是在 load 时生成的。
Buckets (or Clusters)
每个 partition 中的数据可以根据某些 column 中的哈希值划分为不同的 bucket。划分为 bucket 的列不能是 partition column,通过 bucket 可以有效率的采样数据。
数据类型
关键类型
整数
- TINYINT—1 byte integer
- SMALLINT—2 byte integer
- INT—4 byte integer
- BIGINT—8 byte integer
布尔
- BOOLEAN—TRUE/FALSE
浮点数
- FLOAT—single precision
- DOUBLE—Double precision
定点数
- DECIMAL—a fixed point value of user defined scale and precision
字符串
- STRING—sequence of characters in a specified character set
- VARCHAR—sequence of characters in a specified character set with a maximum length
- CHAR—sequence of characters in a specified character set with a defined length
日期/时间
- TIMESTAMP — A date and time without a timezone (“LocalDateTime” semantics)
- TIMESTAMP WITH LOCAL TIME ZONE — A point in time measured down to nanoseconds (“Instant” semantics)
- DATE—a date
二进制
- BINARY—a sequence of bytes
类型层次结构
- Type
- Primitive Type
- Number
- DOUBLE
- FLOAT
- BIGINT
- INT
- SMALLINT
- TINYINT
- SMALLINT
- INT
- BIGINT
- STRING
- FLOAT
- DOUBLE
- BOOLEAN
- Number
- Primitive Type
上述类型层次结构定义了如何在查询语言中隐式转换类型,允许子类型到父类型的隐式转换。其中特别注意,STRING 类型可以隐式转换成 DORBLE 类型
除此之外可以使用 cast 关键字进行显示类型转换。
复合类型
复杂类型可以通过基本类型和其他复合类型构建。
Structs
可以通过 . 访问类型中的元素,例如 STRUCT {a INT; b INT},可以通过 c.a 访问 a。
Maps (key-value tuples)
可以通过 [‘element name’] 访问类型中的元素
Arrays (indexable lists)
数组中的元素必须是相同类型的,可以通过索引访问元素。
内置运算符和函数
操作符
关系操作符
常见关系操作符:
=、!=、<、>、>=、<=、IS NULL、IS NOT NULL、
特殊关系操作符:
-
LIKE:SQL simple regular expression
使用 SQL 简单正则表达式,比较是逐个字符进行的。
_
用来代替任意字符(类似于.
);%
匹配任意数量的字符(类似于.*
); -
RLIKE:Java regular expression
-
REGEXP:与 RLIKE 一样
算数运算符
常见算数运算符
+、-、*、/、%、
特殊算数运算符
- &:按位与操作
-
:按位或操作 - ^:按位异或操作
- ~:按位非操作
逻辑运算符
AND = &&
OR = |
NOT = !
内置函数
参见源文档
查询与插入数据
简单查询
SELECT user.*
FROM user
WHERE user.active = 1;
基于分区的查询
INSERT OVERWRITE TABLE xyz_com_page_views
SELECT page_views.*
FROM page_views
WHERE page_views.date >= '2008-03-01' AND page_views.date <= '2008-03-31' AND
page_views.referrer_url like '%xyz.com';
Join
INSERT OVERWRITE TABLE pv_users
SELECT pv.*, u.gender, u.age
FROM user u JOIN page_view pv ON (pv.userid = u.id)
WHERE pv.date = '2008-03-03';
INSERT OVERWRITE TABLE pv_users
SELECT pv.*, u.gender, u.age
FROM user u FULL OUTER JOIN page_view pv ON (pv.userid = u.id)
WHERE pv.date = '2008-03-03';
INSERT OVERWRITE TABLE pv_users
SELECT u.*
FROM user u LEFT SEMI JOIN page_view pv ON (pv.userid = u.id)
WHERE pv.date = '2008-03-03';
INSERT OVERWRITE TABLE pv_friends
SELECT pv.*, u.gender, u.age, f.friends
FROM page_view pv JOIN user u ON (pv.userid = u.id) JOIN friend_list f ON (u.id = f.uid)
WHERE pv.date = '2008-03-03';
聚合
INSERT OVERWRITE TABLE pv_gender_sum
SELECT pv_users.gender, count (DISTINCT pv_users.userid)
FROM pv_users
GROUP BY pv_users.gender;
INSERT OVERWRITE TABLE pv_gender_agg
SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(*), sum(DISTINCT pv_users.userid)
FROM pv_users
GROUP BY pv_users.gender;
其中下面的是不允许的:
INSERT OVERWRITE TABLE pv_gender_agg
SELECT pv_users.gender, count(DISTINCT pv_users.userid), count(DISTINCT pv_users.ip)
FROM pv_users
GROUP BY pv_users.gender;
多表插入
FROM pv_users
INSERT OVERWRITE TABLE pv_gender_sum
SELECT pv_users.gender, count_distinct(pv_users.userid)
GROUP BY pv_users.gender
INSERT OVERWRITE DIRECTORY '/user/data/tmp/pv_age_sum'
SELECT pv_users.age, count_distinct(pv_users.userid)
GROUP BY pv_users.age;
动态 partition 插入
FROM page_view_stg pvs
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='US')
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip WHERE pvs.country = 'US'
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='CA')
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip WHERE pvs.country = 'CA'
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='UK')
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip WHERE pvs.country = 'UK';
插入本地文件
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/pv_gender_sum'
SELECT pv_gender_sum.*
FROM pv_gender_sum;
抽样
INSERT OVERWRITE TABLE pv_gender_sum_sample
SELECT pv_gender_sum.*
FROM pv_gender_sum TABLESAMPLE(BUCKET 3 OUT OF 32);
TABLESAMPLE(BUCKET x OUT OF y)
y 必须是 buckets 数量的乘数或者除数
Union All
INSERT OVERWRITE TABLE actions_users
SELECT u.id, actions.date
FROM (
SELECT av.uid AS uid
FROM action_video av
WHERE av.date = '2008-06-03'
UNION ALL
SELECT ac.uid AS uid
FROM action_comment ac
WHERE ac.date = '2008-06-03'
) actions JOIN users u ON(u.id = actions.uid);