diff --git a/.ci/docker-compose-file/clickhouse/config.xml b/.ci/docker-compose-file/clickhouse/config.xml
new file mode 100644
index 000000000..085f92a12
--- /dev/null
+++ b/.ci/docker-compose-file/clickhouse/config.xml
@@ -0,0 +1,678 @@
+
+
+
+
+
+ trace
+ /var/log/clickhouse-server/clickhouse-server.log
+ /var/log/clickhouse-server/clickhouse-server.err.log
+ 1000M
+ 10
+
+
+
+
+
+
+
+
+
+
+
+
+
+ false
+
+ false
+
+
+ https://6f33034cfe684dd7a3ab9875e57b1c8d@o388870.ingest.sentry.io/5226277
+
+
+
+ 8123
+ 9000
+ 9004
+
+
+
+
+
+
+ /etc/clickhouse-server/server.crt
+ /etc/clickhouse-server/server.key
+
+ /etc/clickhouse-server/dhparam.pem
+ none
+ true
+ true
+ sslv2,sslv3
+ true
+
+
+
+ true
+ true
+ sslv2,sslv3
+ true
+
+
+
+ RejectCertificateHandler
+
+
+
+
+
+
+
+
+ 9009
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 4096
+ 3
+
+
+ 100
+
+
+ 0
+
+
+
+ 10000
+
+
+ 10
+
+
+ 4194304
+
+
+ 0
+
+
+
+
+
+ 8589934592
+
+
+ 5368709120
+
+
+
+ /var/lib/clickhouse/
+
+
+ /var/lib/clickhouse/tmp/
+
+
+
+
+
+ /var/lib/clickhouse/user_files/
+
+
+ /var/lib/clickhouse/access/
+
+
+ /etc/clickhouse-server/users.xml
+
+
+ default
+
+
+
+
+
+ default
+
+
+
+
+
+
+
+
+ true
+
+
+
+
+
+
+
+
+
+
+
+ localhost
+ 9000
+
+
+
+
+
+
+
+
+ localhost
+ 9000
+
+
+
+
+ localhost
+ 9000
+
+
+
+
+
+
+ 127.0.0.1
+ 9000
+
+
+
+
+ 127.0.0.2
+ 9000
+
+
+
+
+
+
+ localhost
+ 9440
+ 1
+
+
+
+
+
+
+ localhost
+ 9000
+
+
+
+
+ localhost
+ 1
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 3600
+
+
+
+ 3600
+
+
+ 60
+
+
+
+
+
+
+
+
+
+
+
+
+ system
+
+
+ toYYYYMM(event_date)
+
+
+
+
+ 7500
+
+
+
+
+ system
+
+
+ toYYYYMM(event_date)
+ 7500
+
+
+
+
+ system
+
+ toYYYYMM(event_date)
+ 7500
+
+
+
+
+
+
+
+
+ system
+
+ 7500
+ 1000
+
+
+
+
+ system
+
+
+ 60000
+
+
+
+
+
+
+
+
+
+
+
+
+ *_dictionary.xml
+
+
+
+
+
+
+
+
+
+ /clickhouse/task_queue/ddl
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ click_cost
+ any
+
+ 0
+ 3600
+
+
+ 86400
+ 60
+
+
+
+ max
+
+ 0
+ 60
+
+
+ 3600
+ 300
+
+
+ 86400
+ 3600
+
+
+
+
+
+ /var/lib/clickhouse/format_schemas/
+
+
+
+
+
+
+
+
diff --git a/.ci/docker-compose-file/clickhouse/users.xml b/.ci/docker-compose-file/clickhouse/users.xml
new file mode 100644
index 000000000..ced773355
--- /dev/null
+++ b/.ci/docker-compose-file/clickhouse/users.xml
@@ -0,0 +1,110 @@
+
+
+
+
+
+
+
+ 10000000000
+
+
+ 0
+
+
+ random
+
+
+
+
+ 1
+
+
+
+
+
+
+
+
+ public
+
+
+
+ ::/0
+
+
+
+ default
+
+
+ default
+
+
+
+
+
+
+
+
+
+
+
+
+
+ 3600
+
+
+ 0
+ 0
+ 0
+ 0
+ 0
+
+
+
+
diff --git a/.ci/docker-compose-file/docker-compose-clickhouse.yaml b/.ci/docker-compose-file/docker-compose-clickhouse.yaml
new file mode 100644
index 000000000..118f83dc1
--- /dev/null
+++ b/.ci/docker-compose-file/docker-compose-clickhouse.yaml
@@ -0,0 +1,16 @@
+version: '3.9'
+
+services:
+ clickhouse:
+ container_name: clickhouse
+ image: clickhouse/clickhouse-server:23.1.2.9-alpine
+ restart: always
+ volumes:
+ - ./clickhouse/users.xml:/etc/clickhouse-server/users.xml
+ - ./clickhouse/config.xml:/etc/clickhouse-server/config.d/config.xml
+ expose:
+ - "8123"
+ ports:
+ - "8123:8123"
+ networks:
+ - emqx_bridge
diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src
index 0d4b552ee..0ec246320 100644
--- a/apps/emqx_bridge/src/emqx_bridge.app.src
+++ b/apps/emqx_bridge/src/emqx_bridge.app.src
@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
- {vsn, "0.1.11"},
+ {vsn, "0.1.12"},
{registered, []},
{mod, {emqx_bridge_app, []}},
{applications, [
diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl
index dc0a96690..196338336 100644
--- a/apps/emqx_bridge/src/emqx_bridge.erl
+++ b/apps/emqx_bridge/src/emqx_bridge.erl
@@ -57,7 +57,8 @@
T == influxdb_api_v2;
T == redis_single;
T == redis_sentinel;
- T == redis_cluster
+ T == redis_cluster;
+ T == clickhouse
).
load() ->
diff --git a/apps/emqx_connector/include/emqx_connector.hrl b/apps/emqx_connector/include/emqx_connector.hrl
index 82c946cfc..cdb6ddd92 100644
--- a/apps/emqx_connector/include/emqx_connector.hrl
+++ b/apps/emqx_connector/include/emqx_connector.hrl
@@ -23,6 +23,7 @@
-define(MONGO_DEFAULT_PORT, 27017).
-define(REDIS_DEFAULT_PORT, 6379).
-define(PGSQL_DEFAULT_PORT, 5432).
+-define(CLICKHOUSE_DEFAULT_PORT, 8123).
-define(AUTO_RECONNECT_INTERVAL, 2).
diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
index a8ae4454d..6ddfb5af2 100644
--- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
+++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl
@@ -917,6 +917,15 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
%% return `{error, {recoverable_error, Reason}}`
EXPR
catch
+ %% For convenience and to make the code in the callbacks cleaner an
+ %% error exception with the two following formats are translated to the
+ %% corresponding return values. The receiver of the return values
+ %% recognizes these special return formats and use them to decided if a
+ %% request should be retried.
+ error:{unrecoverable_error, Msg} ->
+ {error, {unrecoverable_error, Msg}};
+ error:{recoverable_error, Msg} ->
+ {error, {recoverable_error, Msg}};
ERR:REASON:STACKTRACE ->
?RESOURCE_ERROR(exception, #{
name => NAME,
diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct
index 967faa343..91a937b5c 100644
--- a/lib-ee/emqx_ee_bridge/docker-ct
+++ b/lib-ee/emqx_ee_bridge/docker-ct
@@ -8,3 +8,4 @@ redis
redis_cluster
pgsql
tdengine
+clickhouse
diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf
new file mode 100644
index 000000000..c38365d81
--- /dev/null
+++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf
@@ -0,0 +1,109 @@
+emqx_ee_bridge_clickhouse {
+
+ local_topic {
+ desc {
+ en: """The MQTT topic filter to be forwarded to Clickhouse. All MQTT 'PUBLISH' messages with the topic
+matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
+configured, then both the data got from the rule and the MQTT messages that match local_topic
+will be forwarded.
+"""
+ zh: """发送到 'local_topic' 的消息都会转发到 Clickhouse。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。
+"""
+ }
+ label {
+ en: "Local Topic"
+ zh: "本地 Topic"
+ }
+ }
+ sql_template {
+ desc {
+ en: """SQL Template. The template string can contain placeholders
+for message metadata and payload field. The placeholders are inserted
+without any checking and special formatting so it is important to
+ensure that the inserted values are formatted and escaped correctly."""
+ zh:
+ """SQL模板。模板字符串可以包含消息元数据和有效载荷字段的占位符。占位符
+的插入不需要任何检查和特殊格式化,因此必须确保插入的数值格式化和转义正确。模板字符串可以包含占位符
+模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入
+所以必须确保插入的值的格式正确。因此,确保插入的值格式化和转义正确是非常重要的。模板字符串可以包含占位符
+模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入
+所以必须确保插入的值的格式正确。确保插入的值被正确地格式化和转义。"""
+ }
+ label {
+ en: "SQL Template"
+ zh: "SQL 模板"
+ }
+ }
+ batch_value_separator {
+ desc {
+ en: """The bridge repeats what comes after the VALUES or FORMAT FormatType in the
+SQL template to form a batch request. The value specified with
+this parameter will be inserted between the values. The default
+value ',' works for the VALUES format but other values
+might be needed if you specify some other format with the
+clickhouse FORMAT syntax.
+
+See https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ and
+https://clickhouse.com/docs/en/interfaces/formats#formats for more information about
+the format syntax and the available formats."""
+ zh: """桥接会重复 VALUES 或 FORMAT 格式类型之后的内容。中 VALUES 或
+FORMAT FormatType 后面的内容,以形成一个批处理请求。用这个参数指定的值
+这个参数指定的值将被插入到这些值之间。默认的
+默认值','适用于VALUES格式,但是如果你指定了其他的格式,可能需要其他的值。可能需要其他值,如果你用
+"clickhouse FORMAT "语法指定其他格式。语法指定其他格式。
+
+参见https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ 和
+https://clickhouse.com/docs/en/interfaces/formats#formats 了解更多关于
+格式语法和可用的格式。"""
+ }
+ label {
+ en: "Batch Value Separator"
+ zh: "批量值分离器"
+ }
+ }
+ config_enable {
+ desc {
+ en: """Enable or disable this bridge"""
+ zh: """启用/禁用桥接"""
+ }
+ label {
+ en: "Enable Or Disable Bridge"
+ zh: "启用/禁用桥接"
+ }
+ }
+
+ desc_config {
+ desc {
+ en: """Configuration for an Clickhouse bridge."""
+ zh: """Clickhouse 桥接配置"""
+ }
+ label: {
+ en: "Clickhouse Bridge Configuration"
+ zh: "Clickhouse 桥接配置"
+ }
+ }
+
+ desc_type {
+ desc {
+ en: """The Bridge Type"""
+ zh: """Bridge 类型"""
+ }
+ label {
+ en: "Bridge Type"
+ zh: "桥接类型"
+ }
+ }
+
+ desc_name {
+ desc {
+ en: """Bridge name."""
+ zh: """桥接名字"""
+ }
+ label {
+ en: "Bridge Name"
+ zh: "桥接名字"
+ }
+ }
+}
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
index 1a358fdfe..dc7ee2be6 100644
--- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl
@@ -29,7 +29,8 @@ api_schemas(Method) ->
ref(emqx_ee_bridge_redis, Method ++ "_cluster"),
ref(emqx_ee_bridge_timescale, Method),
ref(emqx_ee_bridge_matrix, Method),
- ref(emqx_ee_bridge_tdengine, Method)
+ ref(emqx_ee_bridge_tdengine, Method),
+ ref(emqx_ee_bridge_clickhouse, Method)
].
schema_modules() ->
@@ -44,7 +45,8 @@ schema_modules() ->
emqx_ee_bridge_pgsql,
emqx_ee_bridge_timescale,
emqx_ee_bridge_matrix,
- emqx_ee_bridge_tdengine
+ emqx_ee_bridge_tdengine,
+ emqx_ee_bridge_clickhouse
].
examples(Method) ->
@@ -75,7 +77,8 @@ resource_type(redis_cluster) -> emqx_ee_connector_redis;
resource_type(pgsql) -> emqx_connector_pgsql;
resource_type(timescale) -> emqx_connector_pgsql;
resource_type(matrix) -> emqx_connector_pgsql;
-resource_type(tdengine) -> emqx_ee_connector_tdengine.
+resource_type(tdengine) -> emqx_ee_connector_tdengine;
+resource_type(clickhouse) -> emqx_ee_connector_clickhouse.
fields(bridges) ->
[
@@ -119,7 +122,8 @@ fields(bridges) ->
required => false
}
)}
- ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs().
+ ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++
+ clickhouse_structs().
mongodb_structs() ->
[
@@ -183,3 +187,18 @@ pgsql_structs() ->
{matrix, <<"Matrix">>}
]
].
+
+clickhouse_structs() ->
+ [
+ {Type,
+ mk(
+ hoconsc:map(name, ref(emqx_ee_bridge_clickhouse, "config")),
+ #{
+ desc => <>,
+ required => false
+ }
+ )}
+ || {Type, Name} <- [
+ {clickhouse, <<"Clickhouse">>}
+ ]
+ ].
diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl
new file mode 100644
index 000000000..9e03aca4a
--- /dev/null
+++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl
@@ -0,0 +1,143 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+-module(emqx_ee_bridge_clickhouse).
+
+-include_lib("emqx_bridge/include/emqx_bridge.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+-export([
+ conn_bridge_examples/1
+]).
+
+-export([
+ namespace/0,
+ roots/0,
+ fields/1,
+ desc/1
+]).
+
+-define(DEFAULT_SQL,
+ <<"INSERT INTO mqtt_test(payload, arrived) VALUES ('${payload}', ${timestamp})">>
+).
+
+-define(DEFAULT_BATCH_VALUE_SEPARATOR, <<", ">>).
+
+%% -------------------------------------------------------------------------------------------------
+%% Callback used by HTTP API
+%% -------------------------------------------------------------------------------------------------
+
+conn_bridge_examples(Method) ->
+ [
+ #{
+ <<"clickhouse">> => #{
+ summary => <<"Clickhouse Bridge">>,
+ value => values(Method, "clickhouse")
+ }
+ }
+ ].
+
+values(get, Type) ->
+ maps:merge(values(post, Type), ?METRICS_EXAMPLE);
+values(post, Type) ->
+ #{
+ enable => true,
+ type => Type,
+ name => <<"foo">>,
+ server => <<"127.0.0.1:8123">>,
+ database => <<"mqtt">>,
+ pool_size => 8,
+ username => <<"default">>,
+ password => <<"public">>,
+ sql => ?DEFAULT_SQL,
+ batch_value_separator => ?DEFAULT_BATCH_VALUE_SEPARATOR,
+ local_topic => <<"local/topic/#">>,
+ resource_opts => #{
+ worker_pool_size => 8,
+ health_check_interval => ?HEALTHCHECK_INTERVAL_RAW,
+ auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW,
+ batch_size => ?DEFAULT_BATCH_SIZE,
+ batch_time => ?DEFAULT_BATCH_TIME,
+ query_mode => async,
+ max_queue_bytes => ?DEFAULT_QUEUE_SIZE
+ }
+ };
+values(put, Type) ->
+ values(post, Type).
+
+%% -------------------------------------------------------------------------------------------------
+%% Hocon Schema Definitions
+%% -------------------------------------------------------------------------------------------------
+
+namespace() -> "bridge_clickhouse".
+
+roots() -> [].
+
+fields("config") ->
+ [
+ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
+ {sql,
+ mk(
+ binary(),
+ #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>}
+ )},
+ {batch_value_separator,
+ mk(
+ binary(),
+ #{desc => ?DESC("batch_value_separator"), default => ?DEFAULT_BATCH_VALUE_SEPARATOR}
+ )},
+ {local_topic,
+ mk(
+ binary(),
+ #{desc => ?DESC("local_topic"), default => undefined}
+ )},
+ {resource_opts,
+ mk(
+ ref(?MODULE, "creation_opts"),
+ #{
+ required => false,
+ default => #{},
+ desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
+ }
+ )}
+ ] ++
+ emqx_ee_connector_clickhouse:fields(config);
+fields("creation_opts") ->
+ Opts = emqx_resource_schema:fields("creation_opts"),
+ [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)];
+fields("post") ->
+ fields("post", clickhouse);
+fields("put") ->
+ fields("config");
+fields("get") ->
+ emqx_bridge_schema:status_fields() ++ fields("post").
+
+fields("post", Type) ->
+ [type_field(Type), name_field() | fields("config")].
+
+desc("config") ->
+ ?DESC("desc_config");
+desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
+ ["Configuration for Clickhouse using `", string:to_upper(Method), "` method."];
+desc("creation_opts" = Name) ->
+ emqx_resource_schema:desc(Name);
+desc(_) ->
+ undefined.
+
+%% -------------------------------------------------------------------------------------------------
+%% internal
+%% -------------------------------------------------------------------------------------------------
+is_hidden_opts(Field) ->
+ lists:member(Field, [
+ async_inflight_window
+ ]).
+
+type_field(Type) ->
+ {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}.
+
+name_field() ->
+ {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_clickhouse_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_clickhouse_SUITE.erl
new file mode 100644
index 000000000..6d4762882
--- /dev/null
+++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_clickhouse_SUITE.erl
@@ -0,0 +1,325 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_bridge_clickhouse_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-define(CLICKHOUSE_HOST, "clickhouse").
+-define(CLICKHOUSE_RESOURCE_MOD, emqx_ee_connector_clickhouse).
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+
+%% See comment in
+%% lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl for how to
+%% run this without bringing up the whole CI infrastucture
+
+%%------------------------------------------------------------------------------
+%% Common Test Setup, Teardown and Testcase List
+%%------------------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ case
+ emqx_common_test_helpers:is_tcp_server_available(?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT)
+ of
+ true ->
+ emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
+ ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
+ ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
+ {ok, _} = application:ensure_all_started(emqx_connector),
+ {ok, _} = application:ensure_all_started(emqx_ee_connector),
+ {ok, _} = application:ensure_all_started(emqx_ee_bridge),
+ snabbkaffe:fix_ct_logging(),
+ %% Create the db table
+ Conn = start_clickhouse_connection(),
+ % erlang:monitor,sb
+ {ok, _, _} = clickhouse:query(Conn, sql_create_database(), #{}),
+ {ok, _, _} = clickhouse:query(Conn, sql_create_table(), []),
+ clickhouse:query(Conn, sql_find_key(42), []),
+ [{clickhouse_connection, Conn} | Config];
+ false ->
+ case os:getenv("IS_CI") of
+ "yes" ->
+ throw(no_clickhouse);
+ _ ->
+ {skip, no_clickhouse}
+ end
+ end.
+
+start_clickhouse_connection() ->
+ %% Start clickhouse connector in sub process so that it does not go
+ %% down with the process that is calling init_per_suite
+ InitPerSuiteProcess = self(),
+ erlang:spawn(
+ fun() ->
+ {ok, Conn} =
+ clickhouse:start_link([
+ {url, clickhouse_url()},
+ {user, <<"default">>},
+ {key, "public"},
+ {pool, tmp_pool}
+ ]),
+ InitPerSuiteProcess ! {clickhouse_connection, Conn},
+ Ref = erlang:monitor(process, Conn),
+ receive
+ {'DOWN', Ref, process, _, _} ->
+ erlang:display(helper_down),
+ ok
+ end
+ end
+ ),
+ receive
+ {clickhouse_connection, C} -> C
+ end.
+
+end_per_suite(Config) ->
+ ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
+ clickhouse:stop(ClickhouseConnection),
+ ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+ ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
+ _ = application:stop(emqx_connector),
+ _ = application:stop(emqx_ee_connector),
+ _ = application:stop(emqx_bridge).
+
+init_per_testcase(_, Config) ->
+ reset_table(Config),
+ Config.
+
+end_per_testcase(_, Config) ->
+ reset_table(Config),
+ ok.
+
+all() ->
+ emqx_common_test_helpers:all(?MODULE).
+
+%%------------------------------------------------------------------------------
+%% Helper functions for test cases
+%%------------------------------------------------------------------------------
+
+sql_insert_template_for_bridge() ->
+ "INSERT INTO mqtt_test(key, data, arrived) VALUES "
+ "(${key}, '${data}', ${timestamp})".
+
+sql_insert_template_for_bridge_json() ->
+ "INSERT INTO mqtt_test(key, data, arrived) FORMAT JSONCompactEachRow "
+ "[${key}, \\\"${data}\\\", ${timestamp}]".
+
+sql_create_table() ->
+ "CREATE TABLE IF NOT EXISTS mqtt.mqtt_test (key BIGINT, data String, arrived BIGINT) ENGINE = Memory".
+
+sql_find_key(Key) ->
+ io_lib:format("SELECT key FROM mqtt.mqtt_test WHERE key = ~p", [Key]).
+
+sql_find_all_keys() ->
+ "SELECT key FROM mqtt.mqtt_test".
+
+sql_drop_table() ->
+ "DROP TABLE IF EXISTS mqtt.mqtt_test".
+
+sql_create_database() ->
+ "CREATE DATABASE IF NOT EXISTS mqtt".
+
+clickhouse_url() ->
+ erlang:iolist_to_binary([
+ <<"http://">>,
+ ?CLICKHOUSE_HOST,
+ ":",
+ erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT)
+ ]).
+
+clickhouse_config(Config) ->
+ SQL = maps:get(sql, Config, sql_insert_template_for_bridge()),
+ BatchSeparator = maps:get(batch_value_separator, Config, <<", ">>),
+ BatchSize = maps:get(batch_size, Config, 1),
+ BatchTime = maps:get(batch_time_ms, Config, 0),
+ EnableBatch = maps:get(enable_batch, Config, true),
+ Name = atom_to_binary(?MODULE),
+ URL = clickhouse_url(),
+ ConfigString =
+ io_lib:format(
+ "bridges.clickhouse.~s {\n"
+ " enable = true\n"
+ " url = \"~s\"\n"
+ " database = \"mqtt\"\n"
+ " sql = \"~s\"\n"
+ " batch_value_separator = \"~s\""
+ " resource_opts = {\n"
+ " enable_batch = ~w\n"
+ " batch_size = ~b\n"
+ " batch_time = ~bms\n"
+ " }\n"
+ "}\n",
+ [
+ Name,
+ URL,
+ SQL,
+ BatchSeparator,
+ EnableBatch,
+ BatchSize,
+ BatchTime
+ ]
+ ),
+ ct:pal(ConfigString),
+ parse_and_check(ConfigString, <<"clickhouse">>, Name).
+
+parse_and_check(ConfigString, BridgeType, Name) ->
+ {ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
+ hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
+ #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
+ RetConfig.
+
+make_bridge(Config) ->
+ Type = <<"clickhouse">>,
+ Name = atom_to_binary(?MODULE),
+ BridgeConfig = clickhouse_config(Config),
+ {ok, _} = emqx_bridge:create(
+ Type,
+ Name,
+ BridgeConfig
+ ),
+ emqx_bridge_resource:bridge_id(Type, Name).
+
+delete_bridge() ->
+ Type = <<"clickhouse">>,
+ Name = atom_to_binary(?MODULE),
+ {ok, _} = emqx_bridge:remove(Type, Name),
+ ok.
+
+reset_table(Config) ->
+ ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
+ {ok, _, _} = clickhouse:query(ClickhouseConnection, sql_drop_table(), []),
+ {ok, _, _} = clickhouse:query(ClickhouseConnection, sql_create_table(), []),
+ ok.
+
+check_key_in_clickhouse(AttempsLeft, Key, Config) ->
+ ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
+ check_key_in_clickhouse(AttempsLeft, Key, none, ClickhouseConnection).
+
+check_key_in_clickhouse(Key, Config) ->
+ ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
+ check_key_in_clickhouse(30, Key, none, ClickhouseConnection).
+
+check_key_in_clickhouse(0, Key, PrevResult, _) ->
+ ct:fail("Expected ~p in database but got ~s", [Key, PrevResult]);
+check_key_in_clickhouse(AttempsLeft, Key, _, ClickhouseConnection) ->
+ {ok, 200, ResultString} = clickhouse:query(ClickhouseConnection, sql_find_key(Key), []),
+ Expected = erlang:integer_to_binary(Key),
+ case iolist_to_binary(string:trim(ResultString)) of
+ Expected ->
+ ok;
+ SomethingElse ->
+ timer:sleep(100),
+ check_key_in_clickhouse(AttempsLeft - 1, Key, SomethingElse, ClickhouseConnection)
+ end.
+
+%%------------------------------------------------------------------------------
+%% Test Cases
+%%------------------------------------------------------------------------------
+
+t_make_delete_bridge(_Config) ->
+ make_bridge(#{}),
+ %% Check that the new brige is in the list of bridges
+ Bridges = emqx_bridge:list(),
+ Name = atom_to_binary(?MODULE),
+ IsRightName =
+ fun
+ (#{name := BName}) when BName =:= Name ->
+ true;
+ (_) ->
+ false
+ end,
+ true = lists:any(IsRightName, Bridges),
+ delete_bridge(),
+ BridgesAfterDelete = emqx_bridge:list(),
+ false = lists:any(IsRightName, BridgesAfterDelete),
+ ok.
+
+t_send_message_query(Config) ->
+ BridgeID = make_bridge(#{enable_batch => false}),
+ Key = 42,
+ Payload = #{key => Key, data => <<"clickhouse_data">>, timestamp => 10000},
+ %% This will use the SQL template included in the bridge
+ emqx_bridge:send_message(BridgeID, Payload),
+ %% Check that the data got to the database
+ check_key_in_clickhouse(Key, Config),
+ delete_bridge(),
+ ok.
+
+t_send_simple_batch(Config) ->
+ send_simple_batch_helper(Config, #{}).
+
+t_send_simple_batch_alternative_format(Config) ->
+ send_simple_batch_helper(
+ Config,
+ #{
+ sql => sql_insert_template_for_bridge_json(),
+ batch_value_separator => <<"">>
+ }
+ ).
+
+send_simple_batch_helper(Config, BridgeConfigExt) ->
+ BridgeConf = maps:merge(
+ #{
+ batch_size => 100,
+ enable_batch => true
+ },
+ BridgeConfigExt
+ ),
+ BridgeID = make_bridge(BridgeConf),
+ Key = 42,
+ Payload = #{key => Key, data => <<"clickhouse_data">>, timestamp => 10000},
+ %% This will use the SQL template included in the bridge
+ emqx_bridge:send_message(BridgeID, Payload),
+ check_key_in_clickhouse(Key, Config),
+ delete_bridge(),
+ ok.
+
+t_heavy_batching(Config) ->
+ heavy_batching_helper(Config, #{}).
+
+t_heavy_batching_alternative_format(Config) ->
+ heavy_batching_helper(
+ Config,
+ #{
+ sql => sql_insert_template_for_bridge_json(),
+ batch_value_separator => <<"">>
+ }
+ ).
+
+heavy_batching_helper(Config, BridgeConfigExt) ->
+ ClickhouseConnection = proplists:get_value(clickhouse_connection, Config),
+ NumberOfMessages = 10000,
+ BridgeConf = maps:merge(
+ #{
+ batch_size => 743,
+ batch_time_ms => 50,
+ enable_batch => true
+ },
+ BridgeConfigExt
+ ),
+ BridgeID = make_bridge(BridgeConf),
+ SendMessageKey = fun(Key) ->
+ Payload = #{
+ key => Key,
+ data => <<"clickhouse_data">>,
+ timestamp => 10000
+ },
+ emqx_bridge:send_message(BridgeID, Payload)
+ end,
+ [SendMessageKey(Key) || Key <- lists:seq(1, NumberOfMessages)],
+ % Wait until the last message is in clickhouse
+ %% The delay between attempts is 100ms so 150 attempts means 15 seconds
+ check_key_in_clickhouse(_AttemptsToFindKey = 150, NumberOfMessages, Config),
+ %% In case the messages are not sent in order (could happend with multiple buffer workers)
+ timer:sleep(1000),
+ {ok, 200, ResultString1} = clickhouse:query(ClickhouseConnection, sql_find_all_keys(), []),
+ ResultString2 = iolist_to_binary(string:trim(ResultString1)),
+ KeyStrings = string:lexemes(ResultString2, "\n"),
+ Keys = [erlang:binary_to_integer(iolist_to_binary(K)) || K <- KeyStrings],
+ KeySet = maps:from_keys(Keys, true),
+ NumberOfMessages = maps:size(KeySet),
+ CheckKey = fun(Key) -> maps:get(Key, KeySet, false) end,
+ true = lists:all(CheckKey, lists:seq(1, NumberOfMessages)),
+ delete_bridge(),
+ ok.
diff --git a/lib-ee/emqx_ee_connector/docker-ct b/lib-ee/emqx_ee_connector/docker-ct
index ef579c036..3db090939 100644
--- a/lib-ee/emqx_ee_connector/docker-ct
+++ b/lib-ee/emqx_ee_connector/docker-ct
@@ -1,2 +1,3 @@
toxiproxy
influxdb
+clickhouse
diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf
new file mode 100644
index 000000000..1e07c29b4
--- /dev/null
+++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf
@@ -0,0 +1,15 @@
+
+emqx_ee_connector_clickhouse {
+
+ base_url {
+ desc {
+ en: """The HTTP URL to the Clickhouse server that you want to connect to (for example http://myhostname:8123)"""
+ zh: """你想连接到的Clickhouse服务器的HTTP URL(例如http://myhostname:8123)。"""
+ }
+ label: {
+ en: "URL to clickhouse server"
+ zh: "到clickhouse服务器的URL"
+ }
+ }
+
+}
diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config
index d758e1424..bcf9508bf 100644
--- a/lib-ee/emqx_ee_connector/rebar.config
+++ b/lib-ee/emqx_ee_connector/rebar.config
@@ -3,6 +3,7 @@
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
{tdengine, {git, "https://github.com/emqx/tdengine-client-erl", {tag, "0.1.5"}}},
+ {clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.2"}}},
{emqx, {path, "../../apps/emqx"}}
]}.
diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src
index 5017abd21..6c9d83bc7 100644
--- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src
+++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src
@@ -9,7 +9,8 @@
influxdb,
tdengine,
wolff,
- brod
+ brod,
+ clickhouse
]},
{env, []},
{modules, []},
diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl
new file mode 100644
index 000000000..b1ad6c787
--- /dev/null
+++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl
@@ -0,0 +1,444 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ee_connector_clickhouse).
+
+-include_lib("emqx_connector/include/emqx_connector.hrl").
+-include_lib("emqx_resource/include/emqx_resource.hrl").
+-include_lib("typerefl/include/types.hrl").
+-include_lib("emqx/include/logger.hrl").
+-include_lib("hocon/include/hoconsc.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
+
+-behaviour(emqx_resource).
+
+-import(hoconsc, [mk/2, enum/1, ref/2]).
+
+%%=====================================================================
+%% Exports
+%%=====================================================================
+
+%% Hocon config schema exports
+-export([
+ roots/0,
+ fields/1,
+ values/1
+]).
+
+%% callbacks for behaviour emqx_resource
+-export([
+ callback_mode/0,
+ on_start/2,
+ on_stop/2,
+ on_query/3,
+ on_batch_query/3,
+ on_get_status/2
+]).
+
+%% callbacks for ecpool
+-export([connect/1]).
+
+%% Internal exports used to execute code with ecpool worker
+-export([
+ check_database_status/1,
+ execute_sql_in_clickhouse_server_using_connection/2
+]).
+
+%%=====================================================================
+%% Types
+%%=====================================================================
+
+-type url() :: emqx_http_lib:uri_map().
+-reflect_type([url/0]).
+-typerefl_from_string({url/0, emqx_http_lib, uri_parse}).
+
+-type templates() ::
+ #{}
+ | #{
+ send_message_template := term(),
+ extend_send_message_template := term()
+ }.
+
+-type state() ::
+ #{
+ templates := templates(),
+ poolname := atom()
+ }.
+
+-type clickhouse_config() :: map().
+
+%%=====================================================================
+%% Configuration and default values
+%%=====================================================================
+
+roots() ->
+ [{config, #{type => hoconsc:ref(?MODULE, config)}}].
+
+fields(config) ->
+ [
+ {url,
+ hoconsc:mk(
+ url(),
+ #{
+ required => true,
+ validator => fun
+ (#{query := _Query}) ->
+ {error, "There must be no query in the url"};
+ (_) ->
+ ok
+ end,
+ desc => ?DESC("base_url")
+ }
+ )}
+ ] ++ emqx_connector_schema_lib:relational_db_fields().
+
+values(post) ->
+ maps:merge(values(put), #{name => <<"connector">>});
+values(get) ->
+ values(post);
+values(put) ->
+ #{
+ database => <<"mqtt">>,
+ enable => true,
+ pool_size => 8,
+ type => clickhouse,
+ url => <<"http://127.0.0.1:8123">>
+ };
+values(_) ->
+ #{}.
+
+%% ===================================================================
+%% Callbacks defined in emqx_resource
+%% ===================================================================
+
+callback_mode() -> always_sync.
+
+%% -------------------------------------------------------------------
+%% on_start callback and related functions
+%% -------------------------------------------------------------------
+
+-spec on_start(resource_id(), clickhouse_config()) -> {ok, state()} | {error, _}.
+
+on_start(
+ InstanceID,
+ #{
+ url := URL,
+ database := DB,
+ pool_size := PoolSize
+ } = Config
+) ->
+ ?SLOG(info, #{
+ msg => "starting_clickhouse_connector",
+ connector => InstanceID,
+ config => emqx_misc:redact(Config)
+ }),
+ PoolName = emqx_plugin_libs_pool:pool_name(InstanceID),
+ Options = [
+ {url, URL},
+ {user, maps:get(username, Config, "default")},
+ {key, emqx_secret:wrap(maps:get(password, Config, "public"))},
+ {database, DB},
+ {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
+ {pool_size, PoolSize},
+ {pool, PoolName}
+ ],
+ InitState = #{poolname => PoolName},
+ try
+ Templates = prepare_sql_templates(Config),
+ State = maps:merge(InitState, #{templates => Templates}),
+ case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options) of
+ ok ->
+ {ok, State};
+ {error, Reason} ->
+ log_start_error(Config, Reason, none),
+ {error, Reason}
+ end
+ catch
+ _:CatchReason:Stacktrace ->
+ log_start_error(Config, CatchReason, Stacktrace),
+ {error, CatchReason}
+ end.
+
+log_start_error(Config, Reason, Stacktrace) ->
+ StacktraceMap =
+ case Stacktrace of
+ none -> #{};
+ _ -> #{stacktrace => Stacktrace}
+ end,
+ LogMessage =
+ #{
+ msg => "clickhouse_connector_start_failed",
+ error_reason => Reason,
+ config => emqx_misc:redact(Config)
+ },
+ ?SLOG(info, maps:merge(LogMessage, StacktraceMap)),
+ ?tp(
+ clickhouse_connector_start_failed,
+ #{error => Reason}
+ ).
+
+%% Helper functions to prepare SQL tempaltes
+
+prepare_sql_templates(#{
+ sql := Template,
+ batch_value_separator := Separator
+}) ->
+ InsertTemplate =
+ emqx_plugin_libs_rule:preproc_tmpl(Template),
+ BulkExtendInsertTemplate =
+ prepare_sql_bulk_extend_template(Template, Separator),
+ #{
+ send_message_template => InsertTemplate,
+ extend_send_message_template => BulkExtendInsertTemplate
+ };
+prepare_sql_templates(_) ->
+ %% We don't create any templates if this is a non-bridge connector
+ #{}.
+
+prepare_sql_bulk_extend_template(Template, Separator) ->
+ ValuesTemplate = split_clickhouse_insert_sql(Template),
+ %% The value part has been extracted
+ %% Add separator before ValuesTemplate so that one can append it
+ %% to an insert template
+ ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]),
+ emqx_plugin_libs_rule:preproc_tmpl(ExtendParamTemplate).
+
+%% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can
+%% also handle Clickhouse's SQL extension for INSERT statments that allows the
+%% user to specify different formats:
+%%
+%% https://clickhouse.com/docs/en/sql-reference/statements/insert-into/
+%%
+split_clickhouse_insert_sql(SQL) ->
+ ErrorMsg = <<"The SQL template should be an SQL INSERT statement but it is something else.">>,
+ case
+ re:split(SQL, "(\\s+(?i:values)|(?i:format\\s+(?:[A-Za-z0-9_])+)\\s+)", [{return, binary}])
+ of
+ [Part1, _, Part3] ->
+ case string:trim(Part1, leading) of
+ <<"insert", _/binary>> ->
+ Part3;
+ <<"INSERT", _/binary>> ->
+ Part3;
+ _ ->
+ erlang:error(ErrorMsg)
+ end;
+ _ ->
+ erlang:error(ErrorMsg)
+ end.
+
+% This is a callback for ecpool which is triggered by the call to
+% emqx_plugin_libs_pool:start_pool in on_start/2
+
+connect(Options) ->
+ URL = iolist_to_binary(emqx_http_lib:normalize(proplists:get_value(url, Options))),
+ User = proplists:get_value(user, Options),
+ Database = proplists:get_value(database, Options),
+ Key = emqx_secret:unwrap(proplists:get_value(key, Options)),
+ Pool = proplists:get_value(pool, Options),
+ PoolSize = proplists:get_value(pool_size, Options),
+ FixedOptions = [
+ {url, URL},
+ {database, Database},
+ {user, User},
+ {key, Key},
+ {pool, Pool},
+ {pool_size, PoolSize}
+ ],
+ case clickhouse:start_link(FixedOptions) of
+ {ok, _Conn} = Ok ->
+ Ok;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+%% -------------------------------------------------------------------
+%% on_stop emqx_resouce callback
+%% -------------------------------------------------------------------
+
+-spec on_stop(resource_id(), resource_state()) -> term().
+
+on_stop(ResourceID, #{poolname := PoolName}) ->
+ ?SLOG(info, #{
+ msg => "stopping clickouse connector",
+ connector => ResourceID
+ }),
+ emqx_plugin_libs_pool:stop_pool(PoolName).
+
+%% -------------------------------------------------------------------
+%% on_get_status emqx_resouce callback and related functions
+%% -------------------------------------------------------------------
+
+on_get_status(_ResourceID, #{poolname := Pool} = _State) ->
+ case
+ emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:check_database_status/1)
+ of
+ true ->
+ connected;
+ false ->
+ connecting
+ end.
+
+check_database_status(Connection) ->
+ clickhouse:status(Connection).
+
+%% -------------------------------------------------------------------
+%% on_query emqx_resouce callback and related functions
+%% -------------------------------------------------------------------
+
+-spec on_query
+ (resource_id(), Request, resource_state()) -> query_result() when
+ Request :: {RequestType, Data},
+ RequestType :: send_message,
+ Data :: map();
+ (resource_id(), Request, resource_state()) -> query_result() when
+ Request :: {RequestType, SQL},
+ RequestType :: sql | query,
+ SQL :: binary().
+
+on_query(
+ ResourceID,
+ {RequestType, DataOrSQL},
+ #{poolname := PoolName} = State
+) ->
+ ?SLOG(debug, #{
+ msg => "clickhouse connector received sql query",
+ connector => ResourceID,
+ type => RequestType,
+ sql => DataOrSQL,
+ state => State
+ }),
+ %% Have we got a query or data to fit into an SQL template?
+ SimplifiedRequestType = query_type(RequestType),
+ #{templates := Templates} = State,
+ SQL = get_sql(SimplifiedRequestType, Templates, DataOrSQL),
+ ClickhouseResult = execute_sql_in_clickhouse_server(PoolName, SQL),
+ transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL).
+
+get_sql(send_message, #{send_message_template := PreparedSQL}, Data) ->
+ emqx_plugin_libs_rule:proc_tmpl(PreparedSQL, Data);
+get_sql(_, _, SQL) ->
+ SQL.
+
+query_type(sql) ->
+ query;
+query_type(query) ->
+ query;
+%% Data that goes to bridges use the prepared template
+query_type(send_message) ->
+ send_message.
+
+%% -------------------------------------------------------------------
+%% on_batch_query emqx_resouce callback and related functions
+%% -------------------------------------------------------------------
+
+-spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when
+ BatchReq :: nonempty_list({'send_message', map()}).
+
+on_batch_query(
+ ResourceID,
+ BatchReq,
+ State
+) ->
+ %% Currently we only support batch requests with the send_message key
+ {Keys, ObjectsToInsert} = lists:unzip(BatchReq),
+ ensure_keys_are_of_type_send_message(Keys),
+ %% Pick out the SQL template
+ #{
+ templates := Templates,
+ poolname := PoolName
+ } = State,
+ %% Create batch insert SQL statement
+ SQL = objects_to_sql(ObjectsToInsert, Templates),
+ %% Do the actual query in the database
+ ResultFromClickhouse = execute_sql_in_clickhouse_server(PoolName, SQL),
+ %% Transform the result to a better format
+ transform_and_log_clickhouse_result(ResultFromClickhouse, ResourceID, SQL).
+
+ensure_keys_are_of_type_send_message(Keys) ->
+ case lists:all(fun is_send_message_atom/1, Keys) of
+ true ->
+ ok;
+ false ->
+ erlang:error(
+ {unrecoverable_error,
+ <<"Unexpected type for batch message (Expected send_message)">>}
+ )
+ end.
+
+is_send_message_atom(send_message) ->
+ true;
+is_send_message_atom(_) ->
+ false.
+
+objects_to_sql(
+ [FirstObject | RemainingObjects] = _ObjectsToInsert,
+ #{
+ send_message_template := InsertTemplate,
+ extend_send_message_template := BulkExtendInsertTemplate
+ }
+) ->
+ %% Prepare INSERT-statement and the first row after VALUES
+ InsertStatementHead = emqx_plugin_libs_rule:proc_tmpl(InsertTemplate, FirstObject),
+ FormatObjectDataFunction =
+ fun(Object) ->
+ emqx_plugin_libs_rule:proc_tmpl(BulkExtendInsertTemplate, Object)
+ end,
+ InsertStatementTail = lists:map(FormatObjectDataFunction, RemainingObjects),
+ CompleteStatement = erlang:iolist_to_binary([InsertStatementHead, InsertStatementTail]),
+ CompleteStatement;
+objects_to_sql(_, _) ->
+ erlang:error(<<"Templates for bulk insert missing.">>).
+
+%% -------------------------------------------------------------------
+%% Helper functions that are used by both on_query/3 and on_batch_query/3
+%% -------------------------------------------------------------------
+
+%% This function is used by on_query/3 and on_batch_query/3 to send a query to
+%% the database server and receive a result
+execute_sql_in_clickhouse_server(PoolName, SQL) ->
+ ecpool:pick_and_do(
+ PoolName,
+ {?MODULE, execute_sql_in_clickhouse_server_using_connection, [SQL]},
+ no_handover
+ ).
+
+execute_sql_in_clickhouse_server_using_connection(Connection, SQL) ->
+ clickhouse:query(Connection, SQL, []).
+
+%% This function transforms the result received from clickhouse to something
+%% that is a little bit more readable and creates approprieate log messages
+transform_and_log_clickhouse_result({ok, 200, <<"">>} = _ClickhouseResult, _, _) ->
+ snabbkaffe_log_return(ok),
+ ok;
+transform_and_log_clickhouse_result({ok, 200, Data}, _, _) ->
+ Result = {ok, Data},
+ snabbkaffe_log_return(Result),
+ Result;
+transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) ->
+ ?SLOG(error, #{
+ msg => "clickhouse connector do sql query failed",
+ connector => ResourceID,
+ sql => SQL,
+ reason => ClickhouseErrorResult
+ }),
+ {error, ClickhouseErrorResult}.
+
+snabbkaffe_log_return(_Result) ->
+ ?tp(
+ clickhouse_connector_query_return,
+ #{result => _Result}
+ ).
diff --git a/lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl b/lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl
new file mode 100644
index 000000000..eab1aa054
--- /dev/null
+++ b/lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl
@@ -0,0 +1,198 @@
+% %%--------------------------------------------------------------------
+% %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+% %%
+% %% Licensed under the Apache License, Version 2.0 (the "License");
+% %% you may not use this file except in compliance with the License.
+% %% You may obtain a copy of the License at
+% %% http://www.apache.org/licenses/LICENSE-2.0
+% %%
+% %% Unless required by applicable law or agreed to in writing, software
+% %% distributed under the License is distributed on an "AS IS" BASIS,
+% %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% %% See the License for the specific language governing permissions and
+% %% limitations under the License.
+% %%--------------------------------------------------------------------
+
+-module(ee_connector_clickhouse_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include("emqx_connector.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+-define(CLICKHOUSE_HOST, "clickhouse").
+-define(CLICKHOUSE_RESOURCE_MOD, emqx_ee_connector_clickhouse).
+
+%% This test SUITE requires a running clickhouse instance. If you don't want to
+%% bring up the whole CI infrastuctucture with the `scripts/ct/run.sh` script
+%% you can create a clickhouse instance with the following command (execute it
+%% from root of the EMQX directory.). You also need to set ?CLICKHOUSE_HOST and
+%% ?CLICKHOUSE_PORT to appropriate values.
+%%
+%% docker run -d -p 18123:8123 -p19000:9000 --name some-clickhouse-server --ulimit nofile=262144:262144 -v "`pwd`/.ci/docker-compose-file/clickhouse/users.xml:/etc/clickhouse-server/users.xml" -v "`pwd`/.ci/docker-compose-file/clickhouse/config.xml:/etc/clickhouse-server/config.xml" clickhouse/clickhouse-server
+
+all() ->
+ emqx_common_test_helpers:all(?MODULE).
+
+groups() ->
+ [].
+
+clickhouse_url() ->
+ erlang:iolist_to_binary([
+ <<"http://">>,
+ ?CLICKHOUSE_HOST,
+ ":",
+ erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT)
+ ]).
+
+init_per_suite(Config) ->
+ case
+ emqx_common_test_helpers:is_tcp_server_available(?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT)
+ of
+ true ->
+ ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+ ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
+ {ok, _} = application:ensure_all_started(emqx_connector),
+ {ok, _} = application:ensure_all_started(emqx_ee_connector),
+ %% Create the db table
+ {ok, Conn} =
+ clickhouse:start_link([
+ {url, clickhouse_url()},
+ {user, <<"default">>},
+ {key, "public"},
+ {pool, tmp_pool}
+ ]),
+ {ok, _, _} = clickhouse:query(Conn, <<"CREATE DATABASE IF NOT EXISTS mqtt">>, #{}),
+ clickhouse:stop(Conn),
+ Config;
+ false ->
+ case os:getenv("IS_CI") of
+ "yes" ->
+ throw(no_clickhouse);
+ _ ->
+ {skip, no_clickhouse}
+ end
+ end.
+
+end_per_suite(_Config) ->
+ ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+ ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
+ _ = application:stop(emqx_connector).
+
+init_per_testcase(_, Config) ->
+ Config.
+
+end_per_testcase(_, _Config) ->
+ ok.
+
+% %%------------------------------------------------------------------------------
+% %% Testcases
+% %%------------------------------------------------------------------------------
+
+t_lifecycle(_Config) ->
+ perform_lifecycle_check(
+ <<"emqx_connector_clickhouse_SUITE">>,
+ clickhouse_config()
+ ).
+
+show(X) ->
+ erlang:display(X),
+ X.
+
+show(Label, What) ->
+ erlang:display({Label, What}),
+ What.
+
+perform_lifecycle_check(PoolName, InitialConfig) ->
+ {ok, #{config := CheckedConfig}} =
+ emqx_resource:check_config(?CLICKHOUSE_RESOURCE_MOD, InitialConfig),
+ {ok, #{
+ state := #{poolname := ReturnedPoolName} = State,
+ status := InitialStatus
+ }} =
+ emqx_resource:create_local(
+ PoolName,
+ ?CONNECTOR_RESOURCE_GROUP,
+ ?CLICKHOUSE_RESOURCE_MOD,
+ CheckedConfig,
+ #{}
+ ),
+ ?assertEqual(InitialStatus, connected),
+ % Instance should match the state and status of the just started resource
+ {ok, ?CONNECTOR_RESOURCE_GROUP, #{
+ state := State,
+ status := InitialStatus
+ }} =
+ emqx_resource:get_instance(PoolName),
+ ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
+ % % Perform query as further check that the resource is working as expected
+ (fun() ->
+ erlang:display({pool_name, PoolName}),
+ QueryNoParamsResWrapper = emqx_resource:query(PoolName, test_query_no_params()),
+ ?assertMatch({ok, _}, QueryNoParamsResWrapper),
+ {_, QueryNoParamsRes} = QueryNoParamsResWrapper,
+ ?assertMatch(<<"1">>, string:trim(QueryNoParamsRes))
+ end)(),
+ ?assertEqual(ok, emqx_resource:stop(PoolName)),
+ % Resource will be listed still, but state will be changed and healthcheck will fail
+ % as the worker no longer exists.
+ {ok, ?CONNECTOR_RESOURCE_GROUP, #{
+ state := State,
+ status := StoppedStatus
+ }} =
+ emqx_resource:get_instance(PoolName),
+ ?assertEqual(stopped, StoppedStatus),
+ ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
+ % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
+ ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
+ % Can call stop/1 again on an already stopped instance
+ ?assertEqual(ok, emqx_resource:stop(PoolName)),
+ % Make sure it can be restarted and the healthchecks and queries work properly
+ ?assertEqual(ok, emqx_resource:restart(PoolName)),
+ % async restart, need to wait resource
+ timer:sleep(500),
+ {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
+ emqx_resource:get_instance(PoolName),
+ ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
+ (fun() ->
+ QueryNoParamsResWrapper =
+ emqx_resource:query(PoolName, test_query_no_params()),
+ ?assertMatch({ok, _}, QueryNoParamsResWrapper),
+ {_, QueryNoParamsRes} = QueryNoParamsResWrapper,
+ ?assertMatch(<<"1">>, string:trim(QueryNoParamsRes))
+ end)(),
+ % Stop and remove the resource in one go.
+ ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
+ ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
+ % Should not even be able to get the resource data out of ets now unlike just stopping.
+ ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
+
+% %%------------------------------------------------------------------------------
+% %% Helpers
+% %%------------------------------------------------------------------------------
+
+clickhouse_config() ->
+ Config =
+ #{
+ auto_reconnect => true,
+ database => <<"mqtt">>,
+ username => <<"default">>,
+ password => <<"public">>,
+ pool_size => 8,
+ url => iolist_to_binary(
+ io_lib:format(
+ "http://~s:~b",
+ [
+ ?CLICKHOUSE_HOST,
+ ?CLICKHOUSE_DEFAULT_PORT
+ ]
+ )
+ )
+ },
+ #{<<"config">> => Config}.
+
+test_query_no_params() ->
+ {query, <<"SELECT 1">>}.
diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh
index 612bda77a..40857c46b 100755
--- a/scripts/ct/run.sh
+++ b/scripts/ct/run.sh
@@ -161,6 +161,9 @@ for dep in ${CT_DEPS}; do
;;
tdengine)
FILES+=( '.ci/docker-compose-file/docker-compose-tdengine-restful.yaml' )
+ ;;
+ clickhouse)
+ FILES+=( '.ci/docker-compose-file/docker-compose-clickhouse.yaml' )
;;
*)
echo "unknown_ct_dependency $dep"