抱歉,您的浏览器无法访问本站

本页面需要浏览器支持(启用)JavaScript


了解详情 >

Hive 是基于 Hadoop 中的 MapReduce,提供 HQL 查询的数据仓库.

Hive 是一个很开放的系统,很多内容都支持用户定制. 如 : 文件格式、MR脚本、自定义函数、自定义聚合函数 等.

UDF

编写 UDF函数 的时候需要注意一下几点:

  1. 自定义 UDF 需要继承 org.apache.hadoop.hive.ql.UDF
  2. 需要实现 evaluate 函数

以下是两个数求和函数的UDF。evaluate函数代表两个整型数据相加

1
2
3
4
5
6
7
8
9
10
11
12
13
package hive.connect;  

import org.apache.hadoop.hive.ql.exec.UDF;

public final class Add extends UDF {

public Integer evaluate(Integer a, Integer b) {
if (null == a || null == b) {
return null;
}
return a + b;
}
}

UDAF

函数类需要继承 UDAF 类,内部类 Evaluator 需要实现 UDAFEvaluator 接口.

Evaluator 需要实现 init、iterate、terminatePartial、merge、terminate 这几个函数.

  1. init函数实现接口 UDAFEvaluator 的 init 函数.
  2. iterate接收传入的参数,并进行内部的轮转。其返回类型为 boolean.
  3. terminatePartial无参数,其为 iterate 函数轮转结束后,返回轮转数据.
  4. merge 接收 terminatePartial 的返回结果,进行数据 merge 操作,其返回类型为boolean.
  5. terminate 返回最终的聚集函数结果.

下面是一个简单的 UDAF 的 demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.x.user_bhv;

import com.google.common.collect.Maps;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

import java.util.HashMap;
import java.util.Map;

public class UDAFMergeIntToIntMap extends UDAF {

public static class PartialResult {
Map<Integer, Integer> attributes;

PartialResult() {
attributes = Maps.newHashMap();
}
}

public static class UnitIdUDAFEvaluator implements UDAFEvaluator {
private PartialResult partialResult;

public UnitIdUDAFEvaluator() {
super();
init();
}

public void init() {
System.out.println("map init");
partialResult = new PartialResult();
}

public boolean iterate(Map<Integer, Integer> attributes_args) {

if (attributes_args == null || attributes_args.isEmpty()) {
return true;
}

for (Map.Entry<Integer, Integer> entry : attributes_args.entrySet()) {
this.partialResult.attributes.put(entry.getKey(), entry.getValue());
}
return true;
}

public PartialResult terminatePartial() {
return this.partialResult;
}

public boolean merge(PartialResult other) { // 参数不可能为 null

for (Map.Entry<Integer, Integer> entry : other.attributes.entrySet()) {
this.partialResult.attributes.put(entry.getKey(), entry.getValue());
}

return true;
}

public Map<Integer, Integer> terminate() {
if (partialResult == null) {
return new HashMap<Integer, Integer>();
} else {
return this.partialResult.attributes;
}
}
}

public static void main(String[] args) {
}
}

在 Hive 脚本中的使用示例 :

1
2
3
4
5
6
7
8
9
hql="ADD jar ${jar_dir}/user_bhv_for_hive.jar;
CREATE TEMPORARY FUNCTION merge_int_to_int_map AS 'com.x.user_bhv.UDAFMergeIntToIntMap';
INSERT OVERWRITE TABLE ${table_user_buy_category}
SELECT
mobile_number,
merge_int_to_int_map (level1_id_count_map)
FROM
ods_dm_e_coupon
GROUP BY mobile_number

Summary

  1. 重载 evaluate 函数.
  2. UDF 函数中参数类型可以为Writable,也可为java中的基本数据对象.
  3. UDF 支持变长的参数.
  4. Hive 支持隐式类型转换.
  5. 客户端退出时,创建的临时函数自动销毁.
  6. evaluate函数必须要返回类型值,空的话返回null,不能为void类型.
  7. UDF 和 UDAF 都可以重载.
  8. 查看函数 SHOW FUNCTIONS.

UDAF: User Defined Aggregation Function

Reference

Comments