1. 基础概念
Flink UDF(User-Defined Function)是用户自定义函数,用于扩展 Flink SQL 或 Table API 的内置函数功能。
2、分类
常见的 UDF 类型包括:
(1)flink UDF
Flink UDF(User-Defined Function) 为标量函数。
特点
单/多 字段输入,单字段输出,编写函数时,继承 Scalar Function。
使用场景
适合数据转换和简单的计算。如:字符串的格式转换,类型转换,根据某些条件计算新的字段值。
实现与注册
标量函数需继承 ScalarFunction 并实现 eval 方法。例如,实现一个字符串大写转换函数:
public class UpperCase extends ScalarFunction {
public String eval(String str) {
return str.toUpperCase();
}
}
注册方式:
tableEnv.createTemporarySystemFunction("MY_UPPER", UpperCase.class);
SQL 使用:
SELECT MY_UPPER(name) FROM users;
实际场景
根据 json 的 key,获取对应的 value,这在数仓的业务场景中,是非常基础的一个 udf 函数,同时也是使用最广泛、且重要的一个函数。
public class JsonExtract extends ScalarFunction {
public String eval(String json, String key) {
try {
JSONObject obj = new JSONObject(json);
return obj.getString(key);
} catch (JSONException e) {
return null;
}
}
}
SQL 使用:
SELECT JsonExtract(log, 'userId') AS userId FROM logs;
(2)flink UDTF
Flink UDTF(User-Defined Table Function) 为表函数。
特点为
单输入/多输入,多输出。编写函数时,继承 Table Function。
使用场景
数据拆分和数据扩展。例如:输入一个 json,返回 json 中的多个字段。或者根据某些规则生成额外的行数据。
实现步骤
- 继承 TableFunction,定义输出类型 T。
- 实现 eval 方法,并通过 collect(T) 输出多行。
示例:实现拆分字符串为多行的表函数:
public class Split extends TableFunction<String> {
public void eval(String str) {
for (String s : str.split(",")) {
collect(s);
}
}
}
SQL 使用:
SELECT name, word FROM users, LATERAL TABLE(Split(name)) AS T(word);
(3)Flink UDAF
Flink UDAF(User-Defined Aggregate Function)为聚合函数。
特点
对一组数据进行聚合计算。可以维护中间状态,逐步累积计算结果。编写函数时,继承 Table Function。
使用场景
- 常见的聚合操作:如求平均值、总和、最大值、最小值等。
- 自定义的复杂聚合逻辑,比如计算移动平均值等。
实现步骤
聚合函数需实现 AggregateFunction,其中 OUT 是输出类型,ACC 是累加器类型。核心方法:
- createAccumulator():初始化累加器。
- accumulate(ACC acc, …):累加输入数据。
- getValue(ACC acc):从累加器生成最终结果。
示例:实现求平均值的聚合函数:
public class Avg extends AggregateFunction<Double, AvgAccum> {
public AvgAccum createAccumulator() {
return new AvgAccum(0, 0);
}
public void accumulate(AvgAccum acc, Integer value) {
acc.sum += value;
acc.count++;
}
public Double getValue(AvgAccum acc) {
return acc.sum / (double) acc.count;
}
}
public static class AvgAccum {
public int sum;
public int count;
// 构造函数、getter/setter 省略
}
3、Flink UDF 中如何处理状态(State)
UDF:通常无状态,需保证线程安全。
UDAF:通过累加器(Accumulator)隐式管理状态,Flink 自动容错。
手动状态管理:在 RichFunction(如 RichMapFunction)中通过 RuntimeContext 访问状态 API,例如:
public class StatefulUDF extends RichMapFunction<String, String> {
private ValueState<String> state;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<String> descriptor =
new ValueStateDescriptor<>("state", String.class);
state = getRuntimeContext().getState(descriptor);
}
@Override
public String map(String value) throws Exception {
String current = state.value();
// 更新状态逻辑
state.update(newValue);
return result;
}
}
4、性能调优
(1)避免复杂对象创建:在 eval 方法中重用对象,减少 GC 压力。
(2)使用原生类型:优先使用 int、double 而非 Integer、Double。
(3)并行度调优:根据数据量和资源调整 UDF 算子的并行度。
(4)状态后端选择:对状态频繁更新的场景,使用 RocksDB 状态后端。
(5)禁用 Operator Chain:对计算密集型的 UDF,通过 disableChaining() 避免与上下游算子链化,提升资源利用率。
5. 异常处理
如果 UDF 抛出异常,如何避免整个作业失败呢?有如下几种方法:
(1)Try-Catch 处理:在 eval 方法内部捕获异常,返回默认值或错误标识。
(2)Side Output:将异常数据路由到侧输出流,单独处理。
(3)重启策略:配置 Flink 重启策略(如 fixed-delay),但需谨慎避免无限重启。