flink UDF使用

1. 基础概念

  Flink UDF(User-Defined Function)是用户自定义函数,用于扩展 Flink SQL 或 Table API 的内置函数功能。

2、分类

常见的 UDF 类型包括:

(1)flink UDF

Flink UDF(User-Defined Function) 为标量函数。

特点

单/多 字段输入,单字段输出,编写函数时,继承 Scalar Function。

使用场景

  适合数据转换和简单的计算。如:字符串的格式转换,类型转换,根据某些条件计算新的字段值。

实现与注册

  标量函数需继承 ScalarFunction 并实现 eval 方法。例如,实现一个字符串大写转换函数:

public class UpperCase extends ScalarFunction {
    public String eval(String str) {
        return str.toUpperCase();
    }
}

注册方式:

tableEnv.createTemporarySystemFunction("MY_UPPER", UpperCase.class);

SQL 使用:

SELECT MY_UPPER(name) FROM users;

实际场景

  根据 json 的 key,获取对应的 value,这在数仓的业务场景中,是非常基础的一个 udf 函数,同时也是使用最广泛、且重要的一个函数。

public class JsonExtract extends ScalarFunction {
    public String eval(String json, String key) {
        try {
            JSONObject obj = new JSONObject(json);
            return obj.getString(key);
        } catch (JSONException e) {
            return null;
        }
    }
}

SQL 使用:

SELECT JsonExtract(log, 'userId') AS userId FROM logs;

(2)flink UDTF

Flink UDTF(User-Defined Table Function) 为表函数。

特点为

单输入/多输入,多输出。编写函数时,继承 Table Function。

使用场景

  数据拆分和数据扩展。例如:输入一个 json,返回 json 中的多个字段。或者根据某些规则生成额外的行数据。

实现步骤

  • 继承 TableFunction,定义输出类型 T。
  • 实现 eval 方法,并通过 collect(T) 输出多行。

示例:实现拆分字符串为多行的表函数:

public class Split extends TableFunction<String> {
    public void eval(String str) {
        for (String s : str.split(",")) {
            collect(s);
        }
    }
}

SQL 使用:

SELECT name, word FROM users, LATERAL TABLE(Split(name)) AS T(word);

(3)Flink UDAF

Flink UDAF(User-Defined Aggregate Function)为聚合函数。

特点

  对一组数据进行聚合计算。可以维护中间状态,逐步累积计算结果。编写函数时,继承 Table Function。

使用场景

  • 常见的聚合操作:如求平均值、总和、最大值、最小值等。
  • 自定义的复杂聚合逻辑,比如计算移动平均值等。

实现步骤

  聚合函数需实现 AggregateFunction,其中 OUT 是输出类型,ACC 是累加器类型。核心方法:

  • createAccumulator():初始化累加器。
  • accumulate(ACC acc, …):累加输入数据。
  • getValue(ACC acc):从累加器生成最终结果。

示例:实现求平均值的聚合函数:

public class Avg extends AggregateFunction<Double, AvgAccum> {
    public AvgAccum createAccumulator() {
        return new AvgAccum(0, 0);
    }
    
    public void accumulate(AvgAccum acc, Integer value) {
        acc.sum += value;
        acc.count++;
    }
    
    public Double getValue(AvgAccum acc) {
        return acc.sum / (double) acc.count;
    }
}

public static class AvgAccum {
    public int sum;
    public int count;
    // 构造函数、getter/setter 省略
}

3、Flink UDF 中如何处理状态(State)

UDF:通常无状态,需保证线程安全。

UDAF:通过累加器(Accumulator)隐式管理状态,Flink 自动容错。

手动状态管理:在 RichFunction(如 RichMapFunction)中通过 RuntimeContext 访问状态 API,例如:

public class StatefulUDF extends RichMapFunction<String, String> {
    private ValueState<String> state;
    
    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<String> descriptor = 
            new ValueStateDescriptor<>("state", String.class);
        state = getRuntimeContext().getState(descriptor);
    }
    
    @Override
    public String map(String value) throws Exception {
        String current = state.value();
        // 更新状态逻辑
        state.update(newValue);
        return result;
    }
}

4、性能调优

(1)避免复杂对象创建:在 eval 方法中重用对象,减少 GC 压力。

(2)使用原生类型:优先使用 int、double 而非 Integer、Double。

(3)并行度调优:根据数据量和资源调整 UDF 算子的并行度。

(4)状态后端选择:对状态频繁更新的场景,使用 RocksDB 状态后端。

(5)禁用 Operator Chain:对计算密集型的 UDF,通过 disableChaining() 避免与上下游算子链化,提升资源利用率。

5. 异常处理

  如果 UDF 抛出异常,如何避免整个作业失败呢?有如下几种方法:

(1)Try-Catch 处理:在 eval 方法内部捕获异常,返回默认值或错误标识。
(2)Side Output:将异常数据路由到侧输出流,单独处理。
(3)重启策略:配置 Flink 重启策略(如 fixed-delay),但需谨慎避免无限重启。

版权声明

   站内部分内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供网络资源分享服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请 联系我们 一经核实,立即删除。并对发布账号进行永久封禁处理。在为用户提供最好的产品同时,保证优秀的服务质量。


本站仅提供信息存储空间,不拥有所有权,不承担相关法律责任。

给TA打赏
共{{data.count}}人
人已打赏
网站技巧

网站源码分享搭建 网站源码使用教程

2024-9-19 20:00:19

大数据

大数据应用与服务赛项赛题04(大数据技术与应用竞赛与答案)

2025-2-25 15:19:12

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索