🔥🔥 AllData大数据产品是可定义数据中台,以数据平台为底座,以数据中台为桥梁,以机器学习平台为中层框架,以大模型应用为上游产品,提供全链路数字化解决方案。
✨奥零数据科技官网:http://www.aolingdata.com
✨AllData开源项目:https://github.com/alldatacenter/alldata
✨AllData官方文档:https://alldata-document.readthedocs.io
✨AllData社区文档:https://docs.qq.com/doc/DVHlkSEtvVXVCdEFo
「 AllData数据中台 – 主页 」
「 湖仓一体化平台 – 功能描述 」
1、AllData数据同步平台基于开源项目kyuubi核心技术建设。
2、在数据同步与处理方面,湖仓一体化平台展现了其强大的实力。它能够实时捕获并同步各类数据源的变化,确保数据的及时性和准确性。同时,平台融合了Flink的流处理能力和Spark等计算引擎的批处理能力,实现了数据湖上的批流一体处理。这种处理方式不仅降低了数据处理的复杂性和成本,还提高了数据处理的效率和灵活性。
3、在生态集成方面,湖仓一体化平台与Hive、Trino、Presto等大数据主流计算引擎深度整合,为用户提供了统一的数据存储和访问接口。这种深度整合不仅简化了数据访问的复杂性,还提高了数据的可用性和共享性。此外,平台还支持多种数据源的无缝接入和统一管理,满足了企业多样化的数据需求。
4、在存储与查询性能方面,湖仓一体化平台采用了先进的存储架构和技术,如LSM树等,确保了高效的数据写入和查询性能。同时,平台还通过数据压缩和优化技术,进一步提升了存储效率和查询速度。
5、kyuubi湖仓一体化平台以其强大的功能、高效的处理能力、良好的生态集成以及卓越的存储与查询性能,成为了适用于各种大数据场景的存储解决方案。它不仅为企业提供了坚实可靠的数据基础,还为数据分析和实时计算提供了有力的支持。
「 湖仓一体化平台 – 模块功能汇总 」
「 湖仓一体化平台 – 功能点展示 」
「 湖仓查询 」统计概览
「 湖仓查询 」管理中心-会话中心
「 湖仓查询 」管理中心-操作中心
「 湖仓查询 」管理中心-引擎中心
「 湖仓查询 」管理中心-服务端
「 湖仓查询 」接口文档
「 湖仓查询 」查询中心
「 湖仓查询 」查询中心-SparkSQL
from pyspark.sql import SparkSession
# 创建支持 Hive 的 SparkSession 对象
spark = SparkSession.builder \
.appName("SparkSQLHiveQuery") \
.enableHiveSupport() \
.getOrCreate()
# 编写 Spark SQL 查询语句,假设存在一个名为 'your_hive_table' 的 Hive 表
query = "SELECT * FROM your_hive_table WHERE some_column = 'some_value'"
# 执行 SQL 查询,结果存储在一个新的 DataFrame 中
result = spark.sql(query)
# 显示查询结果
result.show()
# 停止 SparkSession,释放资源
spark.stop()
「 湖仓查询 」查询中心-FlinkSQ
from pyflink.table import EnvironmentSettings, TableEnvironment
# 创建基于 Blink 计划器的批处理环境设置
env_settings = EnvironmentSettings.new_instance() \
.in_batch_mode() \
.use_blink_planner() \
.build()
# 创建表环境
table_env = TableEnvironment.create(env_settings)
# 配置 Hive 连接
hive_catalog_ddl = """
CREATE CATALOG my_hive_catalog WITH (
'type' = 'hive',
'hive-conf-dir' = '/path/to/your/hive/conf'
)
"""
# 执行创建 Hive 目录的 SQL 语句
table_env.execute_sql(hive_catalog_ddl)
# 使用 Hive 目录
table_env.execute_sql("USE CATALOG my_hive_catalog")
# 编写 Flink SQL 查询语句,假设 Hive 中有一个名为 'your_hive_table' 的表
query = "SELECT * FROM your_hive_table WHERE some_column = 'some_value'"
# 执行查询
result_table = table_env.sql_query(query)
# 将结果转换为 Pandas DataFrame 并显示
result_pandas_df = result_table.to_pandas()
print(result_pandas_df)
「 湖仓查询 」查询中心-Trino
-- 首先使用 CTE(公共表表达式) subquery 计算每个客户的总订单金额
WITH subquery AS (
SELECT
c.customer_city,
c.customer_name,
SUM(o.order_amount) AS total_order_amount
FROM
hive.<your_database>.customers c
-- 通过客户 ID 关联 customers 表和 orders 表
JOIN
hive.<your_database>.orders o ON c.customer_id = o.customer_id
-- 按客户所在城市和客户姓名分组
GROUP BY
c.customer_city, c.customer_name
),
-- 接着使用 CTE rank_subquery 对每个城市的客户按总订单金额进行排名
rank_subquery AS (
SELECT
customer_city,
customer_name,
total_order_amount,
-- 使用 RANK() 函数为每个城市内的客户按总订单金额降序排名
RANK() OVER (PARTITION BY customer_city ORDER BY total_order_amount DESC) as ranking
FROM
subquery
)
-- 从 rank_subquery 中筛选出排名为 1 的记录,即每个城市中总订单金额最高的客户
SELECT
customer_city,
customer_name,
total_order_amount
FROM
rank_subquery
WHERE
ranking = 1;
「 湖仓查询 」查询中心-HiveSQL
-- 使用 CTE 对员工按部门进行薪资排名
WITH ranked_employees AS (
SELECT
employee_id,
employee_name,
department_id,
salary,
-- 使用 ROW_NUMBER() 窗口函数为每个部门内的员工按薪资降序排名
ROW_NUMBER() OVER (PARTITION BY department_id ORDER BY salary DESC) as ranking
FROM
employees
)
-- 从 ranked_employees 中筛选出排名在前三的员工信息
SELECT
employee_id,
employee_name,
department_id,
salary
FROM
ranked_employees
WHERE
ranking <= 3;
「 湖仓查询 」查询中心-JDBC
SELECT
c.customer_name,
SUM(o.quantity) AS total_quantity,
SUM(o.quantity * p.price) AS total_amount
FROM
customers c
JOIN
orders o ON c.customer_id = o.customer_id
JOIN
products p ON o.product_id = p.product_id
GROUP BY
c.customer_name
HAVING
SUM(o.quantity * p.price) > 1000;