重庆思庄Oracle、KingBase、PostgreSQL、Redhat认证学习论坛

标题: PostgreSQL pg_clickhouse 0.1.0 插件的安装和使用 [打印本页]

作者: mahan    时间: 2025-12-28 12:23
标题: PostgreSQL pg_clickhouse 0.1.0 插件的安装和使用
clickhouse公司开发的pg_clickhouse可以由PostgreSQL数据库调用clickhouse数据库的能力,实现快速的查询。

https://github.com/ClickHouse/pg ... oc/pg_clickhouse.md

该库包含一个 PostgreSQL 扩展,支持在 ClickHouse 数据库上执行远程查询,包括一个外部数据包装器(foreign data wrapper)。其兼容 PostgreSQL 13 及以上版本,以及 ClickHouse 23 及以上版本。

版本控制策略
pg_clickhouse 的公开版本遵循语义化版本控制(Semantic Versioning):

主版本号(Major):当 API 发生不兼容变更时递增
次版本号(Minor):当新增向后兼容的 SQL 功能时递增
修订版本号(Patch):仅发生二进制层面的兼容变更时递增
安装后,PostgreSQL 会跟踪该扩展的两个版本变体:

库版本(Library Version):在 PostgreSQL 18 及以上版本中由 PG_MODULE_MAGIC 定义,包含完整的语义化版本号,可通过 pg_get_loaded_modules() 函数查看
扩展版本(Extension Version):在控制文件中定义,仅包含主版本号和次版本号,可通过 pg_catalog.pg_extension 表、pg_available_extension_versions() 函数或 \dx pg_clickhouse 命令查看
实际使用中:

仅修订版本号递增的更新(如从 v0.1.0 到 v0.1.1):所有已加载 v0.1 版本的数据库无需执行 ALTER EXTENSION 即可享受更新带来的优化
次版本号或主版本号递增的更新:会附带 SQL 升级脚本,所有已安装该扩展的数据库必须执行 ALTER EXTENSION pg_clickhouse UPDATE 才能完成升级
安装
快速开始
体验 pg_clickhouse 最简单的方式是使用 Docker 镜像,该镜像基于标准 PostgreSQL Docker 镜像并集成了 pg_clickhouse 扩展:

docker run --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \
       -d ghcr.io/clickhouse/pg_clickhouse:18
docker exec -it pg_clickhouse psql -U postgres
使用

CREATE EXTENSION pg_clickhouse;
CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
       OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');
CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
       OPTIONS (user 'default');
CREATE SCHEMA taxi;
IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi;
从源代码编译
PostgreSQL 和 curl 开发包会在环境变量中包含 pg_config 和 curl_config,因此您只需运行 make(或 gmake),然后执行 make install,最后在数据库中运行 CREATE EXTENSION pg_clickhouse 即可。

RedHat / CentOS / Yum 系统

sudo yum install \
  postgresql-server \
  libcurl-devel \
  libuuid-devel \
  openssl-libs \
  automake \
  cmake \
  gcc
从 PGXN 安装

满足上述依赖后,使用 PGXN 客户端(可通过 Homebrew、Apt、Yum 安装,包名均为 pgxnclient)下载、编译并安装 pg_clickhouse:

pgxn install pg_clickhouse
编译与安装
要构建并安装 ClickHouse 库和 pg_clickhouse 扩展,运行以下命令:

make
sudo make install
特殊情况处理
若主机存在多个 PostgreSQL 安装版本,需指定对应的 pg_config 路径:

export PG_CONFIG=/usr/lib/postgresql/18/bin/pg_config
make
sudo make install
若 curl_config 未在环境变量中,可显式指定路径:

export CURL_CONFIG=/opt/homebrew/opt/curl/bin/curl-config
make
sudo make install
若遇到如下错误:

"Makefile", line 8: Need an operator
需使用 GNU make(系统中可能以 gmake 命名):

gmake
gmake install
gmake installcheck
若遇到如下错误:

make: pg_config: Command not found
需确保 pg_config 已安装且在环境变量中。若通过 RPM 等包管理工具安装 PostgreSQL,需同时安装 -devel 开发包。必要时显式指定 pg_config 路径:

export PG_CONFIG=/path/to/pg_config
make
sudo make install
若需在 PostgreSQL 18 及以上版本中安装到自定义路径,可在 install 目标中指定 prefix 参数(其他 make 目标无需指定):

sudo make install prefix=/usr/local/extras
然后在 postgresql.conf 中配置以下参数,确保自定义路径被识别:

extension_control_path = '/usr/local/extras/postgresql/share:$system'
dynamic_library_path   = '/usr/local/extras/postgresql/lib:$libdir'
测试
安装扩展后,运行以下命令执行测试套件:

make installcheck
若遇到如下错误:

ERROR:  must be owner of database regression
需使用超级用户(如默认的 postgres 超级用户)运行测试:

make installcheck PGUSER=postgres
加载扩展
pg_clickhouse 安装完成后,可通过以下步骤加载到数据库:

以超级用户身份连接数据库

执行以下 SQL 语句:

CREATE EXTENSION pg_clickhouse;
若需将 pg_clickhouse 及其所有支持对象安装到指定 schema,使用 SCHEMA 子句指定:

CREATE SCHEMA env;
CREATE EXTENSION pg_clickhouse SCHEMA env;
依赖项

运行依赖:PostgreSQL 13+、libcurl、libuuid
构建依赖:C/C++ 编译器、libSSL、GNU make、CMake
使用 pg_clickhouse
以下 SQL 语句用于操作 pg_clickhouse 扩展。

CREATE EXTENSION
使用 CREATE EXTENSION 将 pg_clickhouse 添加到数据库:

CREATE EXTENSION pg_clickhouse;
推荐使用 WITH SCHEMA 指定扩展安装的 schema:

CREATE SCHEMA ch;
CREATE EXTENSION pg_clickhouse WITH SCHEMA ch;
ALTER EXTENSION
使用 ALTER EXTENSION 修改 pg_clickhouse 扩展配置:

安装新版本后,使用 UPDATE 子句升级:

ALTER EXTENSION pg_clickhouse UPDATE;
使用 SET SCHEMA 将扩展迁移到新的 schema:

CREATE SCHEMA ch;
ALTER EXTENSION pg_clickhouse SET SCHEMA ch;
DROP EXTENSION
使用 DROP EXTENSION 从数据库中移除 pg_clickhouse:

DROP EXTENSION pg_clickhouse;
若存在依赖该扩展的对象,上述命令会执行失败。可使用 CASCADE 子句同时删除依赖对象:

DROP EXTENSION pg_clickhouse CASCADE;
CREATE SERVER
使用 CREATE SERVER 创建连接 ClickHouse 服务器的外部服务器:

CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
       OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');
支持的选项:

driver:ClickHouse 连接驱动,可选值为 “binary” 或 “http”,必填项
dbname:连接时使用的 ClickHouse 数据库,默认值为 “default”
host:ClickHouse 服务器主机名,默认值为 “localhost”
port:连接端口,默认值如下:
若 driver 为 “binary” 且 host 是 ClickHouse Cloud 主机:9440
若 driver 为 “binary” 且 host 非 ClickHouse Cloud 主机:9004
若 driver 为 “http” 且 host 是 ClickHouse Cloud 主机:8443
若 driver 为 “http” 且 host 非 ClickHouse Cloud 主机:8123
ALTER SERVER
使用 ALTER SERVER 修改外部服务器配置:

ALTER SERVER taxi_srv OPTIONS (SET driver 'http');
支持的选项与 CREATE SERVER 一致。

DROP SERVER
使用 DROP SERVER 移除外部服务器:

DROP SERVER taxi_srv;
若存在依赖该服务器的对象,上述命令会执行失败。可使用 CASCADE 子句同时删除依赖对象:

DROP SERVER taxi_srv CASCADE;
CREATE USER MAPPING
使用 CREATE USER MAPPING 将 PostgreSQL 用户映射到 ClickHouse 用户。例如,将当前 PostgreSQL 用户映射到 taxi_srv 外部服务器对应的 ClickHouse 用户:

CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
       OPTIONS (user 'demo');
支持的选项:

user:ClickHouse 用户名,默认值为 “default”
password:ClickHouse 用户密码
ALTER USER MAPPING
使用 ALTER USER MAPPING 修改用户映射配置:

ALTER USER MAPPING FOR CURRENT_USER SERVER taxi_srv
       OPTIONS (SET user 'default');
支持的选项与 CREATE USER MAPPING 一致。

DROP USER MAPPING
使用 DROP USER MAPPING 移除用户映射:

DROP USER MAPPING FOR CURRENT_USER SERVER taxi_srv;
IMPORT FOREIGN SCHEMA
使用 IMPORT FOREIGN SCHEMA 将 ClickHouse 数据库中的所有表导入为 PostgreSQL 中的外部表:

CREATE SCHEMA taxi;
IMPORT FOREIGN SCHEMA demo FROM SERVER taxi_srv INTO taxi;
使用 LIMIT TO 限制仅导入指定表:

IMPORT FOREIGN SCHEMA demo LIMIT TO (trips) FROM SERVER taxi_srv INTO taxi;
使用 EXCEPT 排除指定表:

IMPORT FOREIGN SCHEMA demo EXCEPT (users) FROM SERVER taxi_srv INTO taxi;
pg_clickhouse 会执行以下操作:

获取指定 ClickHouse 数据库(上述示例中为 “demo”)的所有表列表
获取每个表的列定义
执行 CREATE FOREIGN TABLE 命令创建外部表,列会使用支持的数据类型,并在可检测的情况下包含 CREATE FOREIGN TABLE 支持的选项
⚠️ 导入标识符的大小写保留规则

IMPORT FOREIGN SCHEMA 会对导入的表名和列名执行 quote_identifier() 函数,这意味着包含大写字符或空格的标识符会被双引号包裹。因此,在 PostgreSQL 查询中必须对这类表名和列名使用双引号。全小写且无空格的标识符无需引号。

示例:

ClickHouse 中的表定义:

CREATE OR REPLACE TABLE test
(
    id UInt64,
    Name TEXT,
    updatedAt DateTime DEFAULT now()
)
ENGINE = MergeTree
ORDER BY id;
通过 IMPORT FOREIGN SCHEMA 创建的外部表:

CREATE TABLE test
(
    id          BIGINT      NOT NULL,
    "Name"      TEXT        NOT NULL,
    "updatedAt" TIMESTAMPTZ NOT NULL
);
查询时需正确使用双引号:

SELECT id, "Name", "updatedAt" FROM test;
若需创建名称不同或全小写(大小写不敏感)的对象,请使用 CREATE FOREIGN TABLE。

CREATE FOREIGN TABLE
使用 CREATE FOREIGN TABLE 创建可查询 ClickHouse 数据的外部表:

CREATE FOREIGN TABLE uact (
    user_id    bigint NOT NULL,
    page_views int,
    duration   smallint,
    sign       smallint
) SERVER taxi_srv OPTIONS(
    table_name 'uact',
    engine 'CollapsingMergeTree'
);
支持的表选项:

database:远程 ClickHouse 数据库名,默认值为外部服务器定义的数据库
table_name:远程 ClickHouse 表名,默认值为外部表的名称
engine:ClickHouse 表使用的引擎。对于 CollapsingMergeTree() 和 AggregatingMergeTree(),pg_clickhouse 会自动将参数应用到该表上执行的函数表达式中
列数据类型需与远程 ClickHouse 表的列类型匹配。对于 AggregateFunction 类型和 SimpleAggregateFunction 类型的列,需将数据类型映射到函数对应的 ClickHouse 类型,并通过列选项指定聚合函数名称:

AggregateFunction:应用于 AggregateFunction 类型列的聚合函数名称
SimpleAggregateFunction:应用于 SimpleAggregateFunction 类型列的聚合函数名称
示例:

CREATE FOREIGN TABLE test (
    column1 bigint  OPTIONS(AggregateFunction 'uniq'),
    column2 integer OPTIONS(AggregateFunction 'anyIf'),
    column3 bigint  OPTIONS(AggregateFunction 'quantiles(0.5, 0.9)')
) SERVER clickhouse_srv;
对于带有 AggregateFunction 函数的列,pg_clickhouse 会自动在评估该列的聚合函数后追加 Merge。

ALTER FOREIGN TABLE
使用 ALTER FOREIGN TABLE 修改外部表定义:

ALTER TABLE table ALTER COLUMN b OPTIONS (SET AggregateFunction 'count');
支持的表选项和列选项与 CREATE FOREIGN TABLE 一致。

DROP FOREIGN TABLE
使用 DROP FOREIGN TABLE 移除外部表:

DROP FOREIGN TABLE uact;
若存在依赖该外部表的对象,上述命令会执行失败。可使用 CASCADE 子句同时删除依赖对象:

DROP FOREIGN TABLE uact CASCADE;
函数和运算符参考
数据类型映射
pg_clickhouse 将 ClickHouse 数据类型映射为以下 PostgreSQL 数据类型:

ClickHouse 类型        PostgreSQL 类型        备注
Bool        boolean       
Date        date       
DateTime        timestamp       
Decimal        numeric       
Float32        real       
Float64        double precision       
IPv4        inet       
IPv6        inet       
Int16        smallint       
Int32        integer       
Int64        bigint       
Int8        smallint       
JSON        jsonb        仅支持 HTTP 引擎
String        text       
UInt16        integer       
UInt32        bigint       
UInt64        bigint        当值超过 BIGINT 最大值时会报错
UInt8        smallint       
UUID        uuid       
函数
以下函数提供查询 ClickHouse 数据库的接口。

clickhouse_raw_query
SELECT clickhouse_raw_query(
    'CREATE TABLE t1 (x String) ENGINE = Memory',
    'host=localhost port=8123'
);
通过 HTTP 接口连接 ClickHouse 服务,执行单个查询后断开连接。可选的第二个参数为连接字符串,默认值为 host=localhost port=8123。支持的连接参数:

host:连接的主机名,必填项
port:HTTP 端口,默认值为 8123;若 host 是 ClickHouse Cloud 主机,默认值为 8443
dbname:连接的数据库名
username:连接使用的用户名,默认值为 “default”
password:认证密码,默认值为空
该函数适用于无返回结果的查询;若查询有返回值,会以单个文本值形式返回:

SELECT clickhouse_raw_query(
    'SELECT schema_name, schema_owner from information_schema.schemata',
    'host=localhost port=8123'
);
返回结果:

      clickhouse_raw_query      
---------------------------------
INFORMATION_SCHEMA      default+
default default                +
git     default                +
information_schema      default+
system  default                +

(1 row)
下推函数(Pushdown Functions)
所有在查询 ClickHouse 外部表的条件子句(HAVING 和 WHERE)中使用的 PostgreSQL 内置函数,都会自动以相同名称和签名下推到 ClickHouse 执行。但部分函数的名称或签名不同,需映射为对应的 ClickHouse 函数。pg_clickhouse 支持以下函数映射:

date_part:
date_part('day') → toDayOfMonth
date_part('doy') → toDayOfYear
date_part('dow') → toDayOfWeek
date_part('year') → toYear
date_part('month') → toMonth
date_part('hour') → toHour
date_part('minute') → toMinute
date_part('second') → toSecond
date_part('quarter') → toQuarter
date_part('isoyear') → toISOYear
date_part('week') → toISOYear
date_part('epoch') → toISOYear
date_trunc:
date_trunc('week') → toMonday
date_trunc('second') → toStartOfSecond
date_trunc('minute') → toStartOfMinute
date_trunc('hour') → toStartOfHour
date_trunc('day') → toStartOfDay
date_trunc('month') → toStartOfMonth
date_trunc('quarter') → toStartOfQuarter
date_trunc('year') → toStartOfYear
array_position → indexOf
btrim → trimBoth
strpos → position
regexp_like → match
自定义函数(Custom Functions)
pg_clickhouse 提供以下自定义函数,用于为无 PostgreSQL 等效函数的 ClickHouse 函数提供外部查询下推支持。若这些函数无法下推,会抛出异常:

dictGet
下推类型转换(Pushdown Casts)
pg_clickhouse 支持将 CAST(x AS bigint) 等类型转换操作下推到 ClickHouse(仅适用于兼容的数据类型)。若类型不兼容,下推会失败(例如,若 x 是 ClickHouse 的 UInt64 类型,ClickHouse 会拒绝该转换)。

为支持将类型转换下推到不兼容的数据类型,pg_clickhouse 提供以下函数。若这些函数无法下推,会在 PostgreSQL 中抛出异常:

toUInt8
toUInt16
toUInt32
toUInt64
toUInt128
下推聚合函数(Pushdown Aggregates)
以下 PostgreSQL 聚合函数支持下推到 ClickHouse 执行:

count
自定义聚合函数(Custom Aggregates)
pg_clickhouse 提供以下自定义聚合函数,用于为无 PostgreSQL 等效函数的 ClickHouse 聚合函数提供外部查询下推支持。若这些函数无法下推,会抛出异常:

argMax
argMin
uniq
uniqCombined
uniqCombined64
uniqExact
uniqHLL12
uniqTheta
quantile
quantileExact
下推有序集合聚合函数(Pushdown Ordered Set Aggregates)
以下有序集合聚合函数会映射到 ClickHouse 的参数化聚合函数,将直接参数作为参数传递,将 ORDER BY 表达式作为参数传入。例如:

PostgreSQL 查询:

SELECT percentile_cont(0.25) WITHIN GROUP (ORDER BY a) FROM t1;
映射为 ClickHouse 查询:

SELECT quantile(0.25)(a) FROM t1;
注意:不支持非默认的 ORDER BY 后缀(DESC 和 NULLS FIRST),否则会抛出错误。

支持的函数:

percentile_cont(double) → quantile
quantile(double) → quantile
quantileExact(double) → quantileExact
会话设置
通过设置 pg_clickhouse.session_settings 运行时参数,可配置后续查询中 ClickHouse 的会话设置。示例:

SET pg_clickhouse.session_settings = 'join_use_nulls 1, final 1';
默认值为 join_use_nulls 1。若需使用 ClickHouse 服务器的默认设置,可将其设为空字符串:

SET pg_clickhouse.session_settings = '';
语法规则:

以逗号分隔的键值对列表,键和值之间用一个或多个空格分隔

键必须是 ClickHouse 支持的设置项

若值中包含空格、逗号或反斜杠,需用反斜杠转义:

SET pg_clickhouse.session_settings = 'join_algorithm grace_hash\,hash';
也可使用单引号包裹值以避免转义空格和逗号;若需避免双引号转义,可使用美元引号(dollar quoting):

SET pg_clickhouse.session_settings = $$join_algorithm 'grace_hash,hash'$$;
若需设置多个配置项以提高可读性,可使用多行格式:

SET pg_clickhouse.session_settings TO $$
    connect_timeout 2,
    count_distinct_implementation uniq,
    final 1,
    group_by_use_nulls 1,
    join_algorithm 'prefer_partial_merge',
    join_use_nulls 1,
    log_queries_min_type QUERY_FINISH,
    max_block_size 32768,
    max_execution_time 45,
    max_result_rows 1024,
    metrics_perf_events_list 'this,that',
    network_compression_method ZSTD,
    poll_interval 5,
    totals_mode after_having_auto
$$;
pg_clickhouse 不会验证这些设置项,仅会在每个查询中传递给 ClickHouse。因此,该参数支持各个 ClickHouse 版本的所有设置项。

注意:设置 pg_clickhouse.session_settings 前需确保 pg_clickhouse 已加载,可通过库预加载(shared library preloading)或使用扩展中的任意对象来确保加载。

测试用例
启动 ClickHouse
首先,若尚未部署 ClickHouse 数据库,可通过 Docker 快速启动:

docker run -d --network host --name clickhouse -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse

docker exec -it clickhouse clickhouse-client
执行:

[root@bigdata ~]# docker run -d --network host --name clickhouse -p 8123:8123 -p 9000:9000 --ulimit nofile=262144:262144 clickhouse
Unable to find image 'clickhouse:latest' locally
latest: Pulling from library/clickhouse
7e49dc6156b0: Pull complete
66a9f27340b0: Pull complete
4ea1943fd65c: Pull complete
c4e91a1791f4: Pull complete
b95838ab0602: Pull complete
e70e29afad90: Pull complete
596aaa9e4cb7: Pull complete
077cf31b66a6: Pull complete
Digest: sha256:0c1acee29c905829331544ec71342ed4346a6383da60f0c488f68b6b45009010
Status: Downloaded newer image for clickhouse:latest
WARNING: Published ports are discarded when using host network mode
22ec632560f4d7ddafc04610623ceb80e9588ac5fa702c13eff57b15ad8578ac
[root@bigdata ~]# docker exec -it clickhouse clickhouse-client
ClickHouse client version 25.11.2.24 (official build).
Connecting to localhost:9000 as user default.
Connected to ClickHouse server version 25.11.2.

Warnings:
* Delay accounting is not enabled, OSIOWaitMicroseconds will not be gathered. You can enable it using `sudo sh -c 'echo 1 > /proc/sys/kernel/task_delayacct'` or by using sysctl.
* Linux transparent hugepages are set to "always". Check /sys/kernel/mm/transparent_hugepage/enabled
* Available disk space for data at server startup is too low (1GiB): /var/lib/clickhouse/
* Available disk space for logs at server startup is too low (1GiB): /var/log/clickhouse-server

bigdata :)
创建表
参考 ClickHouse 官方教程,创建存储纽约市出租车数据集的数据库和表:

CREATE DATABASE taxi;
CREATE TABLE taxi.trips
(
    trip_id UInt32,
    vendor_id Enum8(
        '1'      =  1, '2'      =  2, '3'      =  3, '4'      =  4,
        'CMT'    =  5, 'VTS'    =  6, 'DDS'    =  7, 'B02512' = 10,
        'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
        ''       = 15
    ),
    pickup_date Date,
    pickup_datetime DateTime,
    dropoff_date Date,
    dropoff_datetime DateTime,
    store_and_fwd_flag UInt8,
    rate_code_id UInt8,
    pickup_longitude Float64,
    pickup_latitude Float64,
    dropoff_longitude Float64,
    dropoff_latitude Float64,
    passenger_count UInt8,
    trip_distance Float64,
    fare_amount Decimal(10, 2),
    extra Decimal(10, 2),
    mta_tax Decimal(10, 2),
    tip_amount Decimal(10, 2),
    tolls_amount Decimal(10, 2),
    ehail_fee Decimal(10, 2),
    improvement_surcharge Decimal(10, 2),
    total_amount Decimal(10, 2),
    payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
    trip_type UInt8,
    pickup FixedString(25),
    dropoff FixedString(25),
    cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
    pickup_nyct2010_gid Int8,
    pickup_ctlabel Float32,
    pickup_borocode Int8,
    pickup_ct2010 String,
    pickup_boroct2010 String,
    pickup_cdeligibil String,
    pickup_ntacode FixedString(4),
    pickup_ntaname String,
    pickup_puma UInt16,
    dropoff_nyct2010_gid UInt8,
    dropoff_ctlabel Float32,
    dropoff_borocode UInt8,
    dropoff_ct2010 String,
    dropoff_boroct2010 String,
    dropoff_cdeligibil String,
    dropoff_ntacode FixedString(4),
    dropoff_ntaname String,
    dropoff_puma UInt16
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(pickup_date)
ORDER BY pickup_datetime;
导入数据集
执行以下语句导入数据(数据来源于 S3 存储的纽约市出租车数据集):

INSERT INTO taxi.trips
SELECT * FROM s3(
    'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/trips_{1..2}.gz',
    'TabSeparatedWithNames', "
    trip_id UInt32,
    vendor_id Enum8(
        '1'      =  1, '2'      =  2, '3'      =  3, '4'      =  4,
        'CMT'    =  5, 'VTS'    =  6, 'DDS'    =  7, 'B02512' = 10,
        'B02598' = 11, 'B02617' = 12, 'B02682' = 13, 'B02764' = 14,
        ''       = 15
    ),
    pickup_date Date,
    pickup_datetime DateTime,
    dropoff_date Date,
    dropoff_datetime DateTime,
    store_and_fwd_flag UInt8,
    rate_code_id UInt8,
    pickup_longitude Float64,
    pickup_latitude Float64,
    dropoff_longitude Float64,
    dropoff_latitude Float64,
    passenger_count UInt8,
    trip_distance Float64,
    fare_amount Decimal(10, 2),
    extra Decimal(10, 2),
    mta_tax Decimal(10, 2),
    tip_amount Decimal(10, 2),
    tolls_amount Decimal(10, 2),
    ehail_fee Decimal(10, 2),
    improvement_surcharge Decimal(10, 2),
    total_amount Decimal(10, 2),
    payment_type Enum8('UNK' = 0, 'CSH' = 1, 'CRE' = 2, 'NOC' = 3, 'DIS' = 4),
    trip_type UInt8,
    pickup FixedString(25),
    dropoff FixedString(25),
    cab_type Enum8('yellow' = 1, 'green' = 2, 'uber' = 3),
    pickup_nyct2010_gid Int8,
    pickup_ctlabel Float32,
    pickup_borocode Int8,
    pickup_ct2010 String,
    pickup_boroct2010 String,
    pickup_cdeligibil String,
    pickup_ntacode FixedString(4),
    pickup_ntaname String,
    pickup_puma UInt16,
    dropoff_nyct2010_gid UInt8,
    dropoff_ctlabel Float32,
    dropoff_borocode UInt8,
    dropoff_ct2010 String,
    dropoff_boroct2010 String,
    dropoff_cdeligibil String,
    dropoff_ntacode FixedString(4),
    dropoff_ntaname String,
    dropoff_puma UInt16
") SETTINGS input_format_try_infer_datetimes = 0
验证数据导入成功后退出 ClickHouse 客户端:

SELECT count() FROM taxi.trips; -- 查看数据总行数
quit -- 退出客户端
安装 pg_clickhouse
可通过以下方式安装 pg_clickhouse:

从 PGXN 或 GitHub 编译安装
直接使用 Docker 镜像(基于标准 PostgreSQL 镜像,已预装 pg_clickhouse):
docker run -d --network host --name pg_clickhouse -e POSTGRES_PASSWORD=my_pass \
       ghcr.io/clickhouse/pg_clickhouse:18
连接 pg_clickhouse
首先连接到 PostgreSQL 数据库:

docker exec -it pg_clickhouse psql -U postgres
启用 pg_clickhouse 扩展:

CREATE EXTENSION pg_clickhouse;
创建外部服务器
创建连接 ClickHouse 的外部服务器,需指定 ClickHouse 的主机、端口和数据库:

CREATE SERVER taxi_srv FOREIGN DATA WRAPPER clickhouse_fdw
       OPTIONS(driver 'binary', host 'localhost', dbname 'taxi');
driver:连接驱动,可选 binary(使用 ClickHouse 二进制协议)或 http(使用 HTTP 接口)
host:ClickHouse 主机名(此处为本地)
dbname:目标 ClickHouse 数据库名(即前文创建的 taxi)
创建用户映射
将 PostgreSQL 用户映射到 ClickHouse 用户,最简单的方式是映射当前 PostgreSQL 用户:

CREATE USER MAPPING FOR CURRENT_USER SERVER taxi_srv
       OPTIONS (user 'default'); -- ClickHouse 默认用户名
如需认证,可添加 password 选项(例如 OPTIONS (user 'default', password 'your_pass'))。

导入外部表
创建 PostgreSQL 模式(schema),并将 ClickHouse 的 taxi 数据库中所有表导入该模式:

CREATE SCHEMA taxi; -- 创建本地模式
IMPORT FOREIGN SCHEMA taxi FROM SERVER taxi_srv INTO taxi; -- 导入外部表
验证导入结果
在 psql 中使用以下命令验证外部表是否导入成功:

查看外部表列表(\det+):
taxi=# \det+ taxi.*
                                       外部表列表
模式  |  表名  |   服务器    |                   FDW 选项                    |  描述  
-------+--------+-------------+-----------------------------------------------+--------
taxi  | trips  | taxi_srv    | (database 'taxi', table_name 'trips', engine 'MergeTree') | [null]
(1 行记录)
查看表结构(\d):
taxi=# \d taxi.trips
                                     外部表 "taxi.trips"
        列名         |           类型           | 排序规则 | 可为空 | 默认值 | FDW 选项
---------------------+--------------------------+----------+--------+---------+----------
trip_id             | bigint                   |          | 否     |         |
vendor_id           | text                     |          | 否     |         |
pickup_date         | date                     |          | 否     |         |
pickup_datetime     | timestamp without time zone |          | 否     |         |
dropoff_date        | date                     |          | 否     |         |
dropoff_datetime    | timestamp without time zone |          | 否     |         |
store_and_fwd_flag  | smallint                 |          | 否     |         |
rate_code_id        | smallint                 |          | 否     |         |
pickup_longitude    | double precision         |          | 否     |         |
pickup_latitude     | double precision         |          | 否     |         |
dropoff_longitude   | double precision         |          | 否     |         |
dropoff_latitude    | double precision         |          | 否     |         |
passenger_count     | smallint                 |          | 否     |         |
trip_distance       | double precision         |          | 否     |         |
fare_amount         | numeric(10,2)            |          | 否     |         |
extra               | numeric(10,2)            |          | 否     |         |
mta_tax             | numeric(10,2)            |          | 否     |         |
tip_amount          | numeric(10,2)            |          | 否     |         |
tolls_amount        | numeric(10,2)            |          | 否     |         |
ehail_fee           | numeric(10,2)            |          | 否     |         |
improvement_surcharge | numeric(10,2)            |          | 否     |         |
total_amount        | numeric(10,2)            |          | 否     |         |
payment_type        | text                     |          | 否     |         |
trip_type           | smallint                 |          | 否     |         |
pickup              | character varying(25)    |          | 否     |         |
dropoff             | character varying(25)    |          | 否     |         |
cab_type            | text                     |          | 否     |         |
pickup_nyct2010_gid | smallint                 |          | 否     |         |
pickup_ctlabel      | real                     |          | 否     |         |
pickup_borocode     | smallint                 |          | 否     |         |
pickup_ct2010       | text                     |          | 否     |         |
pickup_boroct2010   | text                     |          | 否     |         |
pickup_cdeligibil   | text                     |          | 否     |         |
pickup_ntacode      | character varying(4)     |          | 否     |         |
pickup_ntaname      | text                     |          | 否     |         |
pickup_puma         | integer                  |          | 否     |         |
dropoff_nyct2010_gid | smallint                 |          | 否     |         |
dropoff_ctlabel     | real                     |          | 否     |         |
dropoff_borocode    | smallint                 |          | 否     |         |
dropoff_ct2010      | text                     |          | 否     |         |
dropoff_boroct2010  | text                     |          | 否     |         |
dropoff_cdeligibil  | text                     |          | 否     |         |
dropoff_ntacode     | character varying(4)     |          | 否     |         |
dropoff_ntaname     | text                     |          | 否     |         |
dropoff_puma        | integer                  |          | 否     |         |
服务器:taxi_srv
FDW 选项:(database 'taxi', table_name 'trips', engine 'MergeTree')
执行查询
查询数据总行数,验证查询下推功能:

SELECT count(*) FROM taxi.trips;
  count  
---------
1999657
(1 行记录)
使用 EXPLAIN 查看查询计划(确认查询是否下推到 ClickHouse):

EXPLAIN select count(*) from taxi.trips;
                   查询计划                    
-------------------------------------------------
外部扫描  (cost=1.00..-0.90 rows=1 width=8)
   Relations: 聚合 on (trips)
(2 行记录)
若 “外部扫描(Foreign Scan)” 位于计划根节点,说明整个查询已下推至 ClickHouse 执行,仅返回结果给 PostgreSQL,性能最优。
数据分析
以下示例演示如何通过 pg_clickhouse 分析出租车数据,可直接执行或自定义 SQL 查询。

1. 计算平均小费金额
taxi=# \timing -- 开启计时功能
计时已开启。
taxi=# SELECT round(avg(tip_amount), 2) FROM taxi.trips;
round
-------
  1.68
(1 行记录)

时间:9.438 毫秒
2. 按乘客数量分组计算平均总费用
taxi=# SELECT
        passenger_count,
        avg(total_amount)::NUMERIC(10, 2) AS average_total_amount
    FROM taxi.trips
    GROUP BY passenger_count;
passenger_count | average_total_amount
-----------------+----------------------
               0 |                22.68
               1 |                15.96
               2 |                17.14
               3 |                16.75
               4 |                17.32
               5 |                16.34
               6 |                16.03
               7 |                59.79
               8 |                36.40
               9 |                 9.79
(10 行记录)

时间:27.266 毫秒
3. 按日期和社区分组统计每日接单量
taxi=# SELECT
    pickup_date,
    pickup_ntaname,
    SUM(1) AS number_of_trips
FROM taxi.trips
GROUP BY pickup_date, pickup_ntaname
ORDER BY pickup_date ASC LIMIT 10;
pickup_date |         pickup_ntaname         | number_of_trips
-------------+--------------------------------+-----------------
2015-07-01  | Williamsburg                   |               1
2015-07-01  | park-cemetery-etc-Queens       |               6
2015-07-01  | Maspeth                        |               1
2015-07-01  | Stuyvesant Town-Cooper Village |              44
2015-07-01  | Rego Park                      |               1
2015-07-01  | Greenpoint                     |               7
2015-07-01  | Highbridge                     |               1
2015-07-01  | Briarwood-Jamaica Hills        |               3
2015-07-01  | Airport                        |             550
2015-07-01  | East Harlem North              |              32
(10 行记录)

时间:30.978 毫秒
4. 按行程时长分组分析费用和乘客量
taxi=# SELECT
    avg(tip_amount) AS avg_tip,
    avg(fare_amount) AS avg_fare,
    avg(passenger_count) AS avg_passenger,
    count(*) AS count,
    round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) as trip_minutes
FROM taxi.trips
WHERE round((date_part('epoch', dropoff_datetime) - date_part('epoch', pickup_datetime)) / 60) > 0
GROUP BY trip_minutes
ORDER BY trip_minutes DESC
LIMIT 5;
      avg_tip      |     avg_fare     |  avg_passenger   | count | trip_minutes
-------------------+------------------+------------------+-------+--------------
              1.96 |                8 |                1 |     1 |        27512
                 0 |               12 |                2 |     1 |        27500
0.562727272727273 | 17.4545454545455 | 2.45454545454545 |    11 |         1440
0.716564885496183 | 14.2786259541985 | 1.94656488549618 |   131 |         1439
  1.00945205479452 | 12.8787671232877 | 1.98630136986301 |   146 |         1438
(5 行记录)

时间:45.477 毫秒
注:date_part('epoch', datetime) 用于将时间转换为 Unix 时间戳(秒数),再计算行程时长(分钟)。
5. 按社区和小时分组统计接单量
taxi=# SELECT
    pickup_ntaname,
    date_part('hour', pickup_datetime) as pickup_hour,
    SUM(1) AS pickups
FROM taxi.trips
WHERE pickup_ntaname != ''
GROUP BY pickup_ntaname, pickup_hour
ORDER BY pickup_ntaname, date_part('hour', pickup_datetime)
LIMIT 5;
pickup_ntaname | pickup_hour | pickups
----------------+-------------+---------
Airport        |           0 |    3509
Airport        |           1 |    1184
Airport        |           2 |     401
Airport        |           3 |     152
Airport        |           4 |     213
(5 行记录)

时间:36.895 毫秒
6. 查询前往拉瓜迪亚 / 肯尼迪机场的行程
taxi=# SELECT
    pickup_datetime,
    dropoff_datetime,
    total_amount,
    pickup_nyct2010_gid,
    dropoff_nyct2010_gid,
    CASE
        WHEN dropoff_nyct2010_gid = 138 THEN 'LGA' -- 拉瓜迪亚机场
        WHEN dropoff_nyct2010_gid = 132 THEN 'JFK' -- 肯尼迪机场
    END AS airport_code,
    EXTRACT(YEAR FROM pickup_datetime) AS year,
    EXTRACT(DAY FROM pickup_datetime) AS day,
    EXTRACT(HOUR FROM pickup_datetime) AS hour
FROM taxi.trips
WHERE dropoff_nyct2010_gid IN (132, 138)
ORDER BY pickup_datetime
LIMIT 5;
   pickup_datetime   |  dropoff_datetime   | total_amount | pickup_nyct2010_gid | dropoff_nyct2010_gid | airport_code | year | day | hour
---------------------+---------------------+--------------+---------------------+----------------------+--------------+------+-----+------
2015-07-01 00:04:14 | 2015-07-01 00:15:29 |        13.30 |                 -34 |                  132 | JFK          | 2015 |   1 |    0
2015-07-01 00:09:42 | 2015-07-01 00:12:55 |         6.80 |                  50 |                  138 | LGA          | 2015 |   1 |    0
2015-07-01 00:23:04 | 2015-07-01 00:24:39 |         4.80 |                -125 |                  132 | JFK          | 2015 |   1 |    0
2015-07-01 00:27:51 | 2015-07-01 00:39:02 |        14.72 |                -101 |                  138 | LGA          | 2015 |   1 |    0
2015-07-01 00:32:03 | 2015-07-01 00:55:39 |        39.34 |                  48 |                  138 | LGA          | 2015 |   1 |    0
(5 行记录)

时间:17.450 毫秒
创建字典
字典(Dictionary)是 ClickHouse 的特殊对象,用于存储维度数据(如地区映射表)。以下创建纽约市社区 - 行政区映射字典,并通过 pg_clickhouse 访问。

1. 创建 ClickHouse 字典
通过 clickhouse_raw_query 函数在 ClickHouse 中创建字典(数据来源于 S3 存储的 CSV 文件):

SELECT clickhouse_raw_query($$
    CREATE DICTIONARY taxi.taxi_zone_dictionary (
        LocationID Int64 DEFAULT 0,
        Borough String, -- 行政区(如 Manhattan、Brooklyn)
        zone String, -- 社区名称
        service_zone String -- 服务区域
    )
    PRIMARY KEY LocationID -- 主键(映射 trips 表的 gid 字段)
    SOURCE(HTTP(URL 'https://datasets-documentation.s3.eu-west-3.amazonaws.com/nyc-taxi/taxi_zone_lookup.csv' FORMAT 'CSVWithNames'))
    LIFETIME(MIN 0 MAX 0) -- 禁用自动更新(避免频繁访问 S3)
    LAYOUT(HASHED_ARRAY()) -- 字典存储结构
$$, 'host=localhost dbname=taxi'); -- 连接参数
字典数据说明:CSV 文件包含纽约市社区与行政区的映射关系,LocationID 对应 trips 表的 pickup_nyct2010_gid 和 dropoff_nyct2010_gid 字段。
2. 导入字典到 PostgreSQL
IMPORT FOREIGN SCHEMA taxi LIMIT TO (taxi_zone_dictionary)
FROM SERVER taxi_srv INTO taxi;
3. 验证字典查询
taxi=# SELECT * FROM taxi.taxi_zone_dictionary limit 3;
LocationID |  Borough  |                     Zone                      | service_zone
------------+-----------+-----------------------------------------------+--------------
         77 | Brooklyn  | East New York/Pennsylvania Avenue             | Boro Zone
        106 | Brooklyn  | Gowanus                                       | Boro Zone
        103 | Manhattan | Governor's Island/Ellis Island/Liberty Island | Yellow Zone
(3 行记录)
4. 使用字典查询(dictGet 函数)
通过 dictGet 函数(pg_clickhouse 自定义函数)关联字典数据,统计前往两大机场的行程按出发行政区分组的数量:

taxi=# SELECT
        count(1) AS total,
        COALESCE(NULLIF(dictGet(
            'taxi.taxi_zone_dictionary', 'Borough',
            toUInt64(pickup_nyct2010_gid) -- 转换为字典主键类型
        ), ''), 'Unknown') AS borough_name
    FROM taxi.trips
    WHERE dropoff_nyct2010_gid = 132 OR dropoff_nyct2010_gid = 138 -- 目的地为 JFK/LGA 机场
    GROUP BY borough_name
    ORDER BY total DESC;
total | borough_name  
-------+---------------
23683 | Unknown
  7053 | Manhattan
  6828 | Brooklyn
  4458 | Queens
  2670 | Bronx
   554 | Staten Island
    53 | EWR
(7 行记录)

时间:66.245 毫秒
dictGet(dict_name, col_name, key):从字典中根据主键 key 查询指定列 col_name 的值。
执行关联查询(JOIN)
将 trips 表与字典表关联,实现更复杂的分析。

1. 简单关联查询(替代 dictGet)
taxi=# SELECT
        count(1) AS total,
        "Borough" AS borough_name
    FROM taxi.trips
    JOIN taxi.taxi_zone_dictionary
      ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
    WHERE pickup_nyct2010_gid > 0 -- 过滤无效 gid
      AND dropoff_nyct2010_gid IN (132, 138) -- 目的地为机场
    GROUP BY "Borough"
    ORDER BY total DESC;
total | borough_name  
-------+---------------
  7053 | Manhattan
  6828 | Brooklyn
  4458 | Queens
  2670 | Bronx
   554 | Staten Island
    53 | EWR
(6 行记录)

时间:48.449 毫秒
说明:该查询结果与前文 dictGet 示例一致(不含 Unknown 行),底层 ClickHouse 会自动优化为字典查询,JOIN 语法更符合 SQL 开发者习惯。
2. 查看关联查询计划
taxi=# explain SELECT
        count(1) AS total,
        "Borough"
    FROM taxi.trips
    JOIN taxi.taxi_zone_dictionary
      ON trips.pickup_nyct2010_gid = toUInt64(taxi.taxi_zone_dictionary."LocationID")
    WHERE pickup_nyct2010_gid > 0
      AND dropoff_nyct2010_gid IN (132, 138)
    GROUP BY "Borough"
    ORDER BY total DESC;
                              查询计划                              
-----------------------------------------------------------------------
外部扫描  (cost=1.00..5.10 rows=1000 width=40)
   Relations: 聚合 on ((trips) INNER JOIN (taxi_zone_dictionary))
(2 行记录)
时间:2.012 毫秒
结果表明:关联查询已完全下推至 ClickHouse 执行,PostgreSQL 仅接收最终结果。
3. 复杂关联查询(筛选高小费行程)
查询小费金额前 1000 的行程,并关联字典获取下车点行政区信息:

taxi=# SELECT *
FROM taxi.trips
JOIN taxi.taxi_zone_dictionary
    ON trips.dropoff_nyct2010_gid = taxi.taxi_zone_dictionary."LocationID"
WHERE tip_amount > 0
ORDER BY tip_amount DESC
LIMIT 1000;
注意事项
避免使用 SELECT *:查询时应明确指定所需列,减少数据传输量,提升性能。
数据类型兼容性:关联查询时需确保关联字段类型一致(如使用 toUInt64 转换类型),避免类型不匹配导致查询失败。
字典与表关联:ClickHouse 字典本质是内存中的维度表,关联查询性能优于普通表 JOIN,适合高频访问的维度数据。






欢迎光临 重庆思庄Oracle、KingBase、PostgreSQL、Redhat认证学习论坛 (http://bbs.cqsztech.com/) Powered by Discuz! X3.2