Stay hungry, Stay foolish

0%

Pig编程实践

简介

Hadoop这个名词的火起让不少人恨不得马上安装好环境小试身手一番,虽然Map/Reduce应用程序并不十分复杂,但是仍然需要一些开发经验并且开发效率不高,Pig就是在这样的一种情况下诞生的,作为Hadoop生态环境比较高的一层,其由Pig Latin和运行Pig Latin程序的环境组成,提供易于编程、自动优化、可扩展等特性以提高开发效率,如:Twitter数据分析的80%以上的代码都是由Pig产生

背景

笔者使用Pig也仅仅数月。以移动APP应用的分析统计为例写下这篇入门级的实例,仅为了对阶段性的工作/学习做一个总结。如果能让刚接触Pig的同学从中受益或者能得到高手的指点,则荣幸之至

开胃菜

先以一个简单的程序来开始吧——统计每天的启动次数

1
2
3
4
5
6
7
8
9
start_log = load 'yidong.event_start' USING org.apache.hcatalog.pig.HCatLoader();
day_start_log = filter start_log by dateline == '$date';

day_start_d = foreach day_start_log generate appid,channel,platform,1 as start_num;
start_g = group day_start_d by (appid,channel,platform);
start_gd = foreach start_g generate FLATTEN(group),SUM(day_start_d.start_num) as start_num;
start_save_d = foreach start_gd generate appid,channel,platform,start_num;

STORE start_save_d INTO '/tmp/start_result' using PigStorage('\t');

“麻雀虽小,五脏俱全”。这个程序展示了Pig程序的开发流程:获取数据->分析统计->存储结果

获取数据

  • 直接读取文件

直接LOAD原始文件(HDFS或者Hive表在HDFS中的),用美元符号($)加列序号(0,1,2,…)来获取对应的列数据

1
2
start_log = load '/user/hive/warehouse/yidong.db/event_start/dateline=$date'
day_start_d = foreach start_log generate $1 as appid,$2 as channel;

$date是传递的参数

  • HCatLoader

这种方式适合于读取Hive中的数据,会自动产生字段及数据的映射

1
start_log = load 'yidong.event_start' USING org.apache.hcatalog.pig.HCatLoader();

直接用HCatLoader获取的是所有数据的meta,可以用filter来过滤,如

1
day_start_log = filter start_log by dateline == '$date';

类型及转换

Pig提供以下9种数据类型:

  • 标量
    • int
    • long
    • float
    • double
  • 数组
    • chararray - 字符数组
    • bytearray - 字节数组
  • 复杂数据类型
    • tuple 有序字段集((1,3,2))
    • bag 元组集合({(1,3),(4,5),(9,8)})
    • map 键值对([open#apache])

在获取的过程中或之后,可以对数据进行类型转换,如:

1
2
3
start_log = load '/user/hive/warehouse/yidong.db/event_start/dateline=$date' as (appid:chararray,channel:chararray,userid:chararray);

day_start_d = foreach start_log generate appid,channle,(long)userid;

使用load … as格式必须指定所有的字段,这里为了演示方便只写了三个

分析统计

  • group

通过group对我们要处理的数据进行归类,比如按产品ID、渠道、平台这三个维度来进行统计

1
start_g = group day_start_d  by (appid,channel,platform);

每一次变换都会生成一个关系(relation),可以通过describe打印出关系的结构(describe start_g;)

1
start_g: {group: (appid: chararray,channel: chararray,platform: chararray),day_start_d: {(ngtime: chararray,appid: chararray,channel: chararray,platform: chararray,start_num: int)}}

通过dump打印出关系中的数据(dump start_g;)

贴出两条来看一下

1
2
((03c79c5a227e41584e14d4415eba42f3,2ac848933b7c0d407dde8e2d00d953e0,iOS),{(2014-01-06 17:43:58,03c79c5a227e41584e14d4415eba42f3,2ac848933b7c0d407dde8e2d00d953e0,iOS,1)})
((03c79c5a227e41584e14d4415eba42f3,2ac848933b7c0d407dde8e2d00d953e0,android),{(2014-01-06 14:57:55,03c79c5a227e41584e14d4415eba42f3,2ac848933b7c0d407dde8e2d00d953e0,android,1)})

我经常把它写成dump(start_gd);,T_T!

  • foreach … generate

foreach可以对数据进行遍历,在前面已经看到过这样的例子了。在group操作之后,foreach可以嵌套其他的词法对分组内的数据进行处理,比如order、limit等

对于上面的例子,如果需要统计UV,就需要得到每个分组中的唯一用户的集合

1
2
3
4
5
start_g = group day_start_d  by (appid,channel,platform);
start_gd = foreach start_g {
user_set = DISTINCT day_start_d.userid;
generate FLATTEN(group),COUNT(user_set) as start_uv,SUM(day_start_d.start_num) as start_num;
}

COUNT()、SUM()等内部函数的作用这里就不啰嗦了,唯一一点注意的是这些函数必须是大写,不然会报Could not resolve xxx(函数名) using imports错误

PS:Pig的关键字是非大小写敏感的(group和GROUP是一样的),BIF与UDF是大小写敏感的

  • FALTTEN()

进行分组之后,在关系中会有一个group的序列,包含分组的字段,对这些字段的访问需要先通过FLATTEN()来进行“展平”

如果group里只有一个字段,FLATTEN之后,直接用group获取该字段即可

1
2
3
start_g = group day_start_d by appid;
start_gd = foreach start_g generate FLATTEN(group),SUM(day_start_d.start_num) as start_num;
save_d = foreach start_gd generate group as appid,start_num;

如果是多个字段,就要用group::相应的字段名来获取

1
save_d = foreach start_gd generate group::appid as appid,group::channel as channel,group::platform as platform,start_num;
  • order、limit

继续增加需求——按服务器的时间(ngtime)取出每个分组中前十个访问的用户

1
2
3
4
5
6
start_g = group day_start_d  by (appid,channel,platform);
start_gd = foreach start_g {
order_d = order day_start_d by ngtime asc;
limit_d = limit order_d 10;
generate FLATTEN(group),FLATTEN(limit_d);
}

order可以放到外层,效果是一样的

1
2
3
4
5
6
order_d = order day_start_d by ngtime asc;
start_g = group order_d by (appid,channel,platform)
start_gd = foreach start_g {
limit_d = limit day_start_d 10;
generate FLATTEN(group),FLATTEN(limit_d);
}

但是limit放到外层的话,效果就不一样了——内层表示每个group中取十条,而外层则是从全集中取十条
limit放到外层的应用场景通常是与dump配合输出少量的数据进行调试(更高级的可查看illustrateexample)

  • 三元运算符

求启动次数的等级

1
level_d = foreach start_gd generate group::appid as appid,group::channel as channel,group::platform as platform,((start_num <= 100 ) ? 1 : ((start_num <= 200) ? 2 : ((start_num <= 300) ? 3 : 4 ))) as level;

括号看着有些晕,但是Pig中没有类似于switch这样的语法,只能要么这样来实现,要么自己写UDF
在实际开发中发现三元运算符的嵌套有限制,如果超出了会报错,这时就得对其进行拆分

  • split

APP上有一个位置(下载链接),需要统计这个位置的展示、点击、下载成功、下载失败,因些在收集数据时设计了一个action字段来标识,由APP上报,然后服务端把它们拆分开来统计

1
split log into display_log if action=='display',click_log if action=='click',downsuc_log if action=='down_success',downfail_log if action == 'down_fail';

当然,我们可以用filter来实现,但是效率不高

  • union

union用来合并两个相同结构的关系(relation)

对于split中的例子,对所有的下载进行统计(不管成功失败)

1
download_log = union downsuc_log,downfail_log;
  • join

join用来关联其他的关系(relation),有内联、外联、左联、右联等

现在来看一个例子——已知有一个用户表,记录着新增时的appid、channel等信息,一个当天的启动用户表,用户有可能更换渠道了,需要得到他新增时的渠道

1
2
3
4

join_d = join day_start_d by (appid,userid) left,user_d by (appid,userid);

result = foreach join_d generate day_start_d::appid as appid,day_start_d::userid,user_d::channel as channel;

看一下join的关系结构(describe join_d;)

1
join_d: {day_start_d::ngtime: chararray,day_start_d::appid: chararray,day_start_d::channel: chararray,day_start_d::platform: chararray,day_start_d::userid: chararray,day_start_d::start_num: int,user_d::appid: chararray,user_d::channel: chararray,user_d::platform: chararray,user_d::userid: chararray,user_d::ngtime: chararray}
  • cogroup
1
cogroup_d =  cogroup day_start_d by (appid,userid), user_d by (appid,userid);

describe输出:

1
cogroup_d: {group: (appid: chararray,userid: chararray),day_start_d: {(appid: chararray,channel: chararray,platform: chararray,userid: chararray,ngtime: chararray)},user_d: {(appid: chararray,channel: chararray,platform: chararray,userid: chararray,ngtime: chararray)}}

从describe的输出可以看出来,cogroup类似于group与join的一个合体

  • explain

查看执行计划

1
explain start_gd;

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-25
Map Plan
start_g: Local Rearrange[tuple]{tuple}(false) - scope-13
| |
| Project[chararray][1] - scope-14
|
|---day_start_d: New For Each(false,false,false,false,false)[bag] - scope-10
| |
| Project[chararray][0] - scope-1
| |
| Project[chararray][3] - scope-3
| |
| Project[chararray][2] - scope-5
| |
| Project[chararray][1] - scope-7
| |
| Constant(1) - scope-9
|
|---start_log: Load(yidong.event_start:org.apache.hcatalog.pig.HCatLoader) - scope-0--------
Reduce Plan
start_gd: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-24
|
|---start_gd: New For Each(true,true)[bag] - scope-23
| |
| Project[chararray][0] - scope-15
| |
| POUserFunc(org.apache.pig.impl.builtin.IdentityColumn)[bag] - scope-18
| |
| |---RelationToExpressionProject[bag][*] - scope-17
| |
| |---limit_d: Limit - scope-22
| |
| |---Project[bag][1] - scope-19
|
|---start_g: Package[tuple]{chararray} - scope-12--------
Global sort: false
Secondary sort: true
----------------

存储结果

  • 存储到文件中
1
store save_d into '/logs/tmp/day_start_user';
  • 存储到MySQL数据库
1
2
STORE save_d  INTO 'start_t' using org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', '$jdbc','$user','$password',
'INSERT INTO start_t (app_id, channel_id, platform, start_num,local_date,create_time,update_time) VALUES (?, ?, ?, ?, unix_timestamp(\'$date\'),unix_timestamp(),unix_timestamp())ON DUPLICATE KEY UPDATE start_num=?,update_time=unix_timestamp()');

操作HDFS

在Pig里,可以使用catlsmkdirrm等类UNIX命令对HDFS进行操作

比如:把分析结果存储到/logs/tmp/day_start_user中,但是第二次操作就会报错——提示目录已经存在,所以需要先删除,再保存

1
2
rmf /logs/tmp/day_start_user;
store save_d into '/logs/tmp/day_start_user';

但是实际应用中这些对HDFS文件/目录的删除操作一般是通过配置oozie来实现,这里仅拿此来举例

1
2
3
<prepare>
<delete path="${nameNode}/logs/tmp/day_start_user"/>
</prepare>

内建函数(BIF)

前面已经见到了不少BIF的应用,下面再介绍一些经常会用到的

  • ToDate()

获取前一天的数据

1
day1_user_log = filter new_log by dateline == ToString(AddDuration(ToDate('$date','yyyyMMdd'),'P-1D'),'yyyyMMdd')

ToDate()把字符串转换为日期,与之对应的是ToString()
AddDuration(datetime,duration)的duration采用ISO 8601格式,该例中的’P-1D’为”减1天”

  • UPPER()

全部转换为大写

1
day_start_d = foreach day_start_log generate appid,channel,UPPER(platform) as platform,1 as start_num;

与之相关的有LOWER(全部转为小写)

  • SIZE()

已知appid不超过32位,可以通过SIZE()与filter配合来过滤掉长度不合规则的日志

1
day_start_log = filter start_log by dateline == '$date' and SIZE(appid) <= 32;

使用JAVA的类库

1
2
DEFINE UrlEncode InvokeForString('java.net.URLEncoder.encode', 'String String'); 
event_log_data = foreach filter_log generate UrlEncode(defineid,'UTF-8') as defineid,UrlEncode(paramkey,'UTF-8') as paramkey,UrlEncode(paramvalue,'UTF-8') as paramvalue;

自定义函数(UDF)

UPPER()函数为例来自己实现一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package org.sunzy.pig.udf;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.WrappedIOException;

@SuppressWarnings("deprecation")
public class MyUpper extends EvalFunc <String> {
public String exec(Tuple input) throws IOException{
if (null == input || 0 == input.size())
return null;
try{
String str = (String) input.get(0);
return str.toUpperCase();
}catch(Exception e){
throw WrappedIOException.wrap("Caught exception processing input row ",e);
}
}
}

使用

1
2
3
register myudfs.jar;

day_start_d = foreach day_start_log generate ngtime,appid,channel,myudfs.MyUpper(platform) as platform;

一些坑

  • 分析与入库的值不一样

用describe检查一下分析结果的数据类型是否与数据库的数据类型一致

  • 部分数据未更新成功

遇到这种问题是因为我把对同一个表的多次插入更新操作放到了一个pig文件中执行,拆开后即可

与Pig相关的资料

两个官方文档,一本书,足矣

据说打赏我的人,代码没有BUG