From 67acdf088832be4f535b0b91bef7df14c68deed6 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 17 Jan 2023 14:07:56 +0100 Subject: [PATCH] feat: add clickhouse database bridge This commit adds a Clickhouse bridge to EMQX 5. The bridge is similar to the Clickhouse bridge in the 4.4, but adds the possibility to use different formats (such as JSON) for values to be inserted. --- .ci/docker-compose-file/clickhouse/config.xml | 678 ++++++++++++++++++ .ci/docker-compose-file/clickhouse/users.xml | 110 +++ .../docker-compose-clickhouse.yaml | 16 + apps/emqx_bridge/src/emqx_bridge.app.src | 2 +- apps/emqx_bridge/src/emqx_bridge.erl | 3 +- .../emqx_connector/include/emqx_connector.hrl | 1 + .../src/emqx_resource_buffer_worker.erl | 9 + lib-ee/emqx_ee_bridge/docker-ct | 1 + .../i18n/emqx_ee_bridge_clickhouse.conf | 109 +++ lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 27 +- .../src/emqx_ee_bridge_clickhouse.erl | 143 ++++ .../test/emqx_ee_bridge_clickhouse_SUITE.erl | 325 +++++++++ lib-ee/emqx_ee_connector/docker-ct | 1 + .../i18n/emqx_ee_connector_clickhouse.conf | 15 + lib-ee/emqx_ee_connector/rebar.config | 1 + .../src/emqx_ee_connector.app.src | 3 +- .../src/emqx_ee_connector_clickhouse.erl | 444 ++++++++++++ .../test/ee_connector_clickhouse_SUITE.erl | 198 +++++ scripts/ct/run.sh | 3 + 19 files changed, 2082 insertions(+), 7 deletions(-) create mode 100644 .ci/docker-compose-file/clickhouse/config.xml create mode 100644 .ci/docker-compose-file/clickhouse/users.xml create mode 100644 .ci/docker-compose-file/docker-compose-clickhouse.yaml create mode 100644 lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf create mode 100644 lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl create mode 100644 lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_clickhouse_SUITE.erl create mode 100644 lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf create mode 100644 lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl create mode 100644 
lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl diff --git a/.ci/docker-compose-file/clickhouse/config.xml b/.ci/docker-compose-file/clickhouse/config.xml new file mode 100644 index 000000000..085f92a12 --- /dev/null +++ b/.ci/docker-compose-file/clickhouse/config.xml @@ -0,0 +1,678 @@ + + + + + + trace + /var/log/clickhouse-server/clickhouse-server.log + /var/log/clickhouse-server/clickhouse-server.err.log + 1000M + 10 + + + + + + + + + + + + + + false + + false + + + https://6f33034cfe684dd7a3ab9875e57b1c8d@o388870.ingest.sentry.io/5226277 + + + + 8123 + 9000 + 9004 + + + + + + + /etc/clickhouse-server/server.crt + /etc/clickhouse-server/server.key + + /etc/clickhouse-server/dhparam.pem + none + true + true + sslv2,sslv3 + true + + + + true + true + sslv2,sslv3 + true + + + + RejectCertificateHandler + + + + + + + + + 9009 + + + + + + + + + + + + + + + + + + + + 4096 + 3 + + + 100 + + + 0 + + + + 10000 + + + 10 + + + 4194304 + + + 0 + + + + + + 8589934592 + + + 5368709120 + + + + /var/lib/clickhouse/ + + + /var/lib/clickhouse/tmp/ + + + + + + /var/lib/clickhouse/user_files/ + + + /var/lib/clickhouse/access/ + + + /etc/clickhouse-server/users.xml + + + default + + + + + + default + + + + + + + + + true + + + + + + + + + + + + localhost + 9000 + + + + + + + + + localhost + 9000 + + + + + localhost + 9000 + + + + + + + 127.0.0.1 + 9000 + + + + + 127.0.0.2 + 9000 + + + + + + + localhost + 9440 + 1 + + + + + + + localhost + 9000 + + + + + localhost + 1 + + + + + + + + + + + + + + + + + + + + + + + + 3600 + + + + 3600 + + + 60 + + + + + + + + + + + + + system + query_log
+ + toYYYYMM(event_date) + + + + + 7500 +
+ + + + system + trace_log
+ + toYYYYMM(event_date) + 7500 +
+ + + + system + query_thread_log
+ toYYYYMM(event_date) + 7500 +
+ + + + + + + + system + metric_log
+ 7500 + 1000 +
+ + + + system + asynchronous_metric_log
+ + 60000 +
+ + + + + + + + + + + + *_dictionary.xml + + + + + + + + + + /clickhouse/task_queue/ddl + + + + + + + + + + + + + + + + click_cost + any + + 0 + 3600 + + + 86400 + 60 + + + + max + + 0 + 60 + + + 3600 + 300 + + + 86400 + 3600 + + + + + + /var/lib/clickhouse/format_schemas/ + + + + + + + +
diff --git a/.ci/docker-compose-file/clickhouse/users.xml b/.ci/docker-compose-file/clickhouse/users.xml new file mode 100644 index 000000000..ced773355 --- /dev/null +++ b/.ci/docker-compose-file/clickhouse/users.xml @@ -0,0 +1,110 @@ + + + + + + + + 10000000000 + + + 0 + + + random + + + + + 1 + + + + + + + + + public + + + + ::/0 + + + + default + + + default + + + + + + + + + + + + + + 3600 + + + 0 + 0 + 0 + 0 + 0 + + + + diff --git a/.ci/docker-compose-file/docker-compose-clickhouse.yaml b/.ci/docker-compose-file/docker-compose-clickhouse.yaml new file mode 100644 index 000000000..118f83dc1 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-clickhouse.yaml @@ -0,0 +1,16 @@ +version: '3.9' + +services: + clickhouse: + container_name: clickhouse + image: clickhouse/clickhouse-server:23.1.2.9-alpine + restart: always + volumes: + - ./clickhouse/users.xml:/etc/clickhouse-server/users.xml + - ./clickhouse/config.xml:/etc/clickhouse-server/config.d/config.xml + expose: + - "8123" + ports: + - "8123:8123" + networks: + - emqx_bridge diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index 0d4b552ee..0ec246320 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.11"}, + {vsn, "0.1.12"}, {registered, []}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index dc0a96690..196338336 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -57,7 +57,8 @@ T == influxdb_api_v2; T == redis_single; T == redis_sentinel; - T == redis_cluster + T == redis_cluster; + T == clickhouse ). 
load() -> diff --git a/apps/emqx_connector/include/emqx_connector.hrl b/apps/emqx_connector/include/emqx_connector.hrl index 82c946cfc..cdb6ddd92 100644 --- a/apps/emqx_connector/include/emqx_connector.hrl +++ b/apps/emqx_connector/include/emqx_connector.hrl @@ -23,6 +23,7 @@ -define(MONGO_DEFAULT_PORT, 27017). -define(REDIS_DEFAULT_PORT, 6379). -define(PGSQL_DEFAULT_PORT, 5432). +-define(CLICKHOUSE_DEFAULT_PORT, 8123). -define(AUTO_RECONNECT_INTERVAL, 2). diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index a8ae4454d..6ddfb5af2 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -917,6 +917,15 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) -> %% return `{error, {recoverable_error, Reason}}` EXPR catch + %% For convenience, and to make the code in the callbacks cleaner, an + %% error exception in one of the two following formats is translated to the + %% corresponding return value. The receiver of the return value + %% recognizes these special return formats and uses them to decide whether a + %% request should be retried.
+ error:{unrecoverable_error, Msg} -> + {error, {unrecoverable_error, Msg}}; + error:{recoverable_error, Msg} -> + {error, {recoverable_error, Msg}}; ERR:REASON:STACKTRACE -> ?RESOURCE_ERROR(exception, #{ name => NAME, diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct index 967faa343..91a937b5c 100644 --- a/lib-ee/emqx_ee_bridge/docker-ct +++ b/lib-ee/emqx_ee_bridge/docker-ct @@ -8,3 +8,4 @@ redis redis_cluster pgsql tdengine +clickhouse diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf new file mode 100644 index 000000000..c38365d81 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_clickhouse.conf @@ -0,0 +1,109 @@ +emqx_ee_bridge_clickhouse { + + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to Clickhouse. All MQTT 'PUBLISH' messages with the topic +matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """发送到 'local_topic' 的消息都会转发到 Clickhouse。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。 +""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + sql_template { + desc { + en: """SQL Template. The template string can contain placeholders +for message metadata and payload fields. The placeholders are inserted +without any checking or special formatting, so it is important to +ensure that the inserted values are formatted and escaped correctly.""" + zh: + """SQL 模板。模板字符串可以包含 +消息元数据和有效载荷字段的占位符。 +这些占位符在插入时 +不会经过任何检查和特殊格式化, +因此必须确保 +插入的值已被正确地格式化和转义。""" + } + label { + en: "SQL Template" + zh: "SQL 模板" + } + } + batch_value_separator { + desc { + en: """The bridge repeats what comes after the VALUES or FORMAT FormatType in the +SQL template to form a batch request. The value specified with +this parameter will be inserted between the values. The default +value ',' works for the VALUES format but other values +might be needed if you specify some other format with the +clickhouse FORMAT syntax.
+ +See https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ and +https://clickhouse.com/docs/en/interfaces/formats#formats for more information about +the format syntax and the available formats.""" + zh: """桥接会重复 SQL 模板中 VALUES 或 +FORMAT FormatType 后面的内容,以形成一个批处理请求。 +用这个参数指定的值将被插入到这些值之间。 +默认值 ',' 适用于 VALUES 格式,但是如果你用 +Clickhouse 的 FORMAT 语法指定了其他格式,则可能需要其他的值。 + +参见 https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ 和 +https://clickhouse.com/docs/en/interfaces/formats#formats 了解更多关于 +格式语法和可用格式的信息。""" + } + label { + en: "Batch Value Separator" + zh: "批量值分隔符" + } + } + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用桥接""" + } + label { + en: "Enable Or Disable Bridge" + zh: "启用/禁用桥接" + } + } + + desc_config { + desc { + en: """Configuration for a Clickhouse bridge.""" + zh: """Clickhouse 桥接配置""" + } + label: { + en: "Clickhouse Bridge Configuration" + zh: "Clickhouse 桥接配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + + desc_name { + desc { + en: """Bridge name.""" + zh: """桥接名字""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 1a358fdfe..dc7ee2be6 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -29,7 +29,8 @@ api_schemas(Method) -> ref(emqx_ee_bridge_redis, Method ++ "_cluster"), ref(emqx_ee_bridge_timescale, Method), ref(emqx_ee_bridge_matrix, Method), - ref(emqx_ee_bridge_tdengine, Method) + ref(emqx_ee_bridge_tdengine, Method), + ref(emqx_ee_bridge_clickhouse, Method) ].
schema_modules() -> @@ -44,7 +45,8 @@ schema_modules() -> emqx_ee_bridge_pgsql, emqx_ee_bridge_timescale, emqx_ee_bridge_matrix, - emqx_ee_bridge_tdengine + emqx_ee_bridge_tdengine, + emqx_ee_bridge_clickhouse ]. examples(Method) -> @@ -75,7 +77,8 @@ resource_type(redis_cluster) -> emqx_ee_connector_redis; resource_type(pgsql) -> emqx_connector_pgsql; resource_type(timescale) -> emqx_connector_pgsql; resource_type(matrix) -> emqx_connector_pgsql; -resource_type(tdengine) -> emqx_ee_connector_tdengine. +resource_type(tdengine) -> emqx_ee_connector_tdengine; +resource_type(clickhouse) -> emqx_ee_connector_clickhouse. fields(bridges) -> [ @@ -119,7 +122,8 @@ fields(bridges) -> required => false } )} - ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs(). + ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs() ++ pgsql_structs() ++ + clickhouse_structs(). mongodb_structs() -> [ @@ -183,3 +187,18 @@ pgsql_structs() -> {matrix, <<"Matrix">>} ] ]. + +clickhouse_structs() -> + [ + {Type, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_clickhouse, "config")), + #{ + desc => <<Name/binary, " Bridge Config">>, + required => false + } + )} + || {Type, Name} <- [ + {clickhouse, <<"Clickhouse">>} + ] + ]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl new file mode 100644 index 000000000..9e03aca4a --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_clickhouse.erl @@ -0,0 +1,143 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_clickhouse). + +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]).
+ +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +-define(DEFAULT_SQL, + <<"INSERT INTO mqtt_test(payload, arrived) VALUES ('${payload}', ${timestamp})">> +). + +-define(DEFAULT_BATCH_VALUE_SEPARATOR, <<", ">>). + +%% ------------------------------------------------------------------------------------------------- +%% Callback used by HTTP API +%% ------------------------------------------------------------------------------------------------- + +conn_bridge_examples(Method) -> + [ + #{ + <<"clickhouse">> => #{ + summary => <<"Clickhouse Bridge">>, + value => values(Method, "clickhouse") + } + } + ]. + +values(get, Type) -> + maps:merge(values(post, Type), ?METRICS_EXAMPLE); +values(post, Type) -> + #{ + enable => true, + type => Type, + name => <<"foo">>, + server => <<"127.0.0.1:8123">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"default">>, + password => <<"public">>, + sql => ?DEFAULT_SQL, + batch_value_separator => ?DEFAULT_BATCH_VALUE_SEPARATOR, + local_topic => <<"local/topic/#">>, + resource_opts => #{ + worker_pool_size => 8, + health_check_interval => ?HEALTHCHECK_INTERVAL_RAW, + auto_restart_interval => ?AUTO_RESTART_INTERVAL_RAW, + batch_size => ?DEFAULT_BATCH_SIZE, + batch_time => ?DEFAULT_BATCH_TIME, + query_mode => async, + max_queue_bytes => ?DEFAULT_QUEUE_SIZE + } + }; +values(put, Type) -> + values(post, Type). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +%% ------------------------------------------------------------------------------------------------- + +namespace() -> "bridge_clickhouse". + +roots() -> []. 
+ +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {sql, + mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )}, + {batch_value_separator, + mk( + binary(), + #{desc => ?DESC("batch_value_separator"), default => ?DEFAULT_BATCH_VALUE_SEPARATOR} + )}, + {local_topic, + mk( + binary(), + #{desc => ?DESC("local_topic"), default => undefined} + )}, + {resource_opts, + mk( + ref(?MODULE, "creation_opts"), + #{ + required => false, + default => #{}, + desc => ?DESC(emqx_resource_schema, <<"resource_opts">>) + } + )} + ] ++ + emqx_ee_connector_clickhouse:fields(config); +fields("creation_opts") -> + Opts = emqx_resource_schema:fields("creation_opts"), + [O || {Field, _} = O <- Opts, not is_hidden_opts(Field)]; +fields("post") -> + fields("post", clickhouse); +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:status_fields() ++ fields("post"). + +fields("post", Type) -> + [type_field(Type), name_field() | fields("config")]. + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Clickhouse using `", string:to_upper(Method), "` method."]; +desc("creation_opts" = Name) -> + emqx_resource_schema:desc(Name); +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- +%% internal +%% ------------------------------------------------------------------------------------------------- +is_hidden_opts(Field) -> + lists:member(Field, [ + async_inflight_window + ]). + +type_field(Type) -> + {type, mk(enum([Type]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. 
diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_clickhouse_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_clickhouse_SUITE.erl new file mode 100644 index 000000000..6d4762882 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_clickhouse_SUITE.erl @@ -0,0 +1,325 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ee_bridge_clickhouse_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-define(CLICKHOUSE_HOST, "clickhouse"). +-define(CLICKHOUSE_RESOURCE_MOD, emqx_ee_connector_clickhouse). +-include_lib("emqx_connector/include/emqx_connector.hrl"). + +%% See comment in +%% lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl for how to +%% run this without bringing up the whole CI infrastucture + +%%------------------------------------------------------------------------------ +%% Common Test Setup, Teardown and Testcase List +%%------------------------------------------------------------------------------ + +init_per_suite(Config) -> + case + emqx_common_test_helpers:is_tcp_server_available(?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT) + of + true -> + emqx_common_test_helpers:render_and_load_app_config(emqx_conf), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource]), + {ok, _} = application:ensure_all_started(emqx_connector), + {ok, _} = application:ensure_all_started(emqx_ee_connector), + {ok, _} = application:ensure_all_started(emqx_ee_bridge), + snabbkaffe:fix_ct_logging(), + %% Create the db table + Conn = start_clickhouse_connection(), + % erlang:monitor,sb + {ok, _, _} = clickhouse:query(Conn, sql_create_database(), #{}), + {ok, _, _} = clickhouse:query(Conn, sql_create_table(), []), + clickhouse:query(Conn, sql_find_key(42), []), + 
[{clickhouse_connection, Conn} | Config]; + false -> + case os:getenv("IS_CI") of + "yes" -> + throw(no_clickhouse); + _ -> + {skip, no_clickhouse} + end + end. + +start_clickhouse_connection() -> + %% Start clickhouse connector in sub process so that it does not go + %% down with the process that is calling init_per_suite + InitPerSuiteProcess = self(), + erlang:spawn( + fun() -> + {ok, Conn} = + clickhouse:start_link([ + {url, clickhouse_url()}, + {user, <<"default">>}, + {key, "public"}, + {pool, tmp_pool} + ]), + InitPerSuiteProcess ! {clickhouse_connection, Conn}, + Ref = erlang:monitor(process, Conn), + receive + {'DOWN', Ref, process, _, _} -> + erlang:display(helper_down), + ok + end + end + ), + receive + {clickhouse_connection, C} -> C + end. + +end_per_suite(Config) -> + ClickhouseConnection = proplists:get_value(clickhouse_connection, Config), + clickhouse:stop(ClickhouseConnection), + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), + _ = application:stop(emqx_connector), + _ = application:stop(emqx_ee_connector), + _ = application:stop(emqx_bridge). + +init_per_testcase(_, Config) -> + reset_table(Config), + Config. + +end_per_testcase(_, Config) -> + reset_table(Config), + ok. + +all() -> + emqx_common_test_helpers:all(?MODULE). + +%%------------------------------------------------------------------------------ +%% Helper functions for test cases +%%------------------------------------------------------------------------------ + +sql_insert_template_for_bridge() -> + "INSERT INTO mqtt_test(key, data, arrived) VALUES " + "(${key}, '${data}', ${timestamp})". + +sql_insert_template_for_bridge_json() -> + "INSERT INTO mqtt_test(key, data, arrived) FORMAT JSONCompactEachRow " + "[${key}, \\\"${data}\\\", ${timestamp}]". + +sql_create_table() -> + "CREATE TABLE IF NOT EXISTS mqtt.mqtt_test (key BIGINT, data String, arrived BIGINT) ENGINE = Memory". 
+ +sql_find_key(Key) -> + io_lib:format("SELECT key FROM mqtt.mqtt_test WHERE key = ~p", [Key]). + +sql_find_all_keys() -> + "SELECT key FROM mqtt.mqtt_test". + +sql_drop_table() -> + "DROP TABLE IF EXISTS mqtt.mqtt_test". + +sql_create_database() -> + "CREATE DATABASE IF NOT EXISTS mqtt". + +clickhouse_url() -> + erlang:iolist_to_binary([ + <<"http://">>, + ?CLICKHOUSE_HOST, + ":", + erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT) + ]). + +clickhouse_config(Config) -> + SQL = maps:get(sql, Config, sql_insert_template_for_bridge()), + BatchSeparator = maps:get(batch_value_separator, Config, <<", ">>), + BatchSize = maps:get(batch_size, Config, 1), + BatchTime = maps:get(batch_time_ms, Config, 0), + EnableBatch = maps:get(enable_batch, Config, true), + Name = atom_to_binary(?MODULE), + URL = clickhouse_url(), + ConfigString = + io_lib:format( + "bridges.clickhouse.~s {\n" + " enable = true\n" + " url = \"~s\"\n" + " database = \"mqtt\"\n" + " sql = \"~s\"\n" + " batch_value_separator = \"~s\"" + " resource_opts = {\n" + " enable_batch = ~w\n" + " batch_size = ~b\n" + " batch_time = ~bms\n" + " }\n" + "}\n", + [ + Name, + URL, + SQL, + BatchSeparator, + EnableBatch, + BatchSize, + BatchTime + ] + ), + ct:pal(ConfigString), + parse_and_check(ConfigString, <<"clickhouse">>, Name). + +parse_and_check(ConfigString, BridgeType, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf, + RetConfig. + +make_bridge(Config) -> + Type = <<"clickhouse">>, + Name = atom_to_binary(?MODULE), + BridgeConfig = clickhouse_config(Config), + {ok, _} = emqx_bridge:create( + Type, + Name, + BridgeConfig + ), + emqx_bridge_resource:bridge_id(Type, Name). + +delete_bridge() -> + Type = <<"clickhouse">>, + Name = atom_to_binary(?MODULE), + {ok, _} = emqx_bridge:remove(Type, Name), + ok. 
+ +reset_table(Config) -> + ClickhouseConnection = proplists:get_value(clickhouse_connection, Config), + {ok, _, _} = clickhouse:query(ClickhouseConnection, sql_drop_table(), []), + {ok, _, _} = clickhouse:query(ClickhouseConnection, sql_create_table(), []), + ok. + +check_key_in_clickhouse(AttempsLeft, Key, Config) -> + ClickhouseConnection = proplists:get_value(clickhouse_connection, Config), + check_key_in_clickhouse(AttempsLeft, Key, none, ClickhouseConnection). + +check_key_in_clickhouse(Key, Config) -> + ClickhouseConnection = proplists:get_value(clickhouse_connection, Config), + check_key_in_clickhouse(30, Key, none, ClickhouseConnection). + +check_key_in_clickhouse(0, Key, PrevResult, _) -> + ct:fail("Expected ~p in database but got ~s", [Key, PrevResult]); +check_key_in_clickhouse(AttempsLeft, Key, _, ClickhouseConnection) -> + {ok, 200, ResultString} = clickhouse:query(ClickhouseConnection, sql_find_key(Key), []), + Expected = erlang:integer_to_binary(Key), + case iolist_to_binary(string:trim(ResultString)) of + Expected -> + ok; + SomethingElse -> + timer:sleep(100), + check_key_in_clickhouse(AttempsLeft - 1, Key, SomethingElse, ClickhouseConnection) + end. + +%%------------------------------------------------------------------------------ +%% Test Cases +%%------------------------------------------------------------------------------ + +t_make_delete_bridge(_Config) -> + make_bridge(#{}), + %% Check that the new brige is in the list of bridges + Bridges = emqx_bridge:list(), + Name = atom_to_binary(?MODULE), + IsRightName = + fun + (#{name := BName}) when BName =:= Name -> + true; + (_) -> + false + end, + true = lists:any(IsRightName, Bridges), + delete_bridge(), + BridgesAfterDelete = emqx_bridge:list(), + false = lists:any(IsRightName, BridgesAfterDelete), + ok. 
+ +t_send_message_query(Config) -> + BridgeID = make_bridge(#{enable_batch => false}), + Key = 42, + Payload = #{key => Key, data => <<"clickhouse_data">>, timestamp => 10000}, + %% This will use the SQL template included in the bridge + emqx_bridge:send_message(BridgeID, Payload), + %% Check that the data got to the database + check_key_in_clickhouse(Key, Config), + delete_bridge(), + ok. + +t_send_simple_batch(Config) -> + send_simple_batch_helper(Config, #{}). + +t_send_simple_batch_alternative_format(Config) -> + send_simple_batch_helper( + Config, + #{ + sql => sql_insert_template_for_bridge_json(), + batch_value_separator => <<"">> + } + ). + +send_simple_batch_helper(Config, BridgeConfigExt) -> + BridgeConf = maps:merge( + #{ + batch_size => 100, + enable_batch => true + }, + BridgeConfigExt + ), + BridgeID = make_bridge(BridgeConf), + Key = 42, + Payload = #{key => Key, data => <<"clickhouse_data">>, timestamp => 10000}, + %% This will use the SQL template included in the bridge + emqx_bridge:send_message(BridgeID, Payload), + check_key_in_clickhouse(Key, Config), + delete_bridge(), + ok. + +t_heavy_batching(Config) -> + heavy_batching_helper(Config, #{}). + +t_heavy_batching_alternative_format(Config) -> + heavy_batching_helper( + Config, + #{ + sql => sql_insert_template_for_bridge_json(), + batch_value_separator => <<"">> + } + ). 
+ +heavy_batching_helper(Config, BridgeConfigExt) -> + ClickhouseConnection = proplists:get_value(clickhouse_connection, Config), + NumberOfMessages = 10000, + BridgeConf = maps:merge( + #{ + batch_size => 743, + batch_time_ms => 50, + enable_batch => true + }, + BridgeConfigExt + ), + BridgeID = make_bridge(BridgeConf), + SendMessageKey = fun(Key) -> + Payload = #{ + key => Key, + data => <<"clickhouse_data">>, + timestamp => 10000 + }, + emqx_bridge:send_message(BridgeID, Payload) + end, + [SendMessageKey(Key) || Key <- lists:seq(1, NumberOfMessages)], + %% Wait until the last message is in clickhouse + %% The delay between attempts is 100ms so 150 attempts means 15 seconds + check_key_in_clickhouse(_AttemptsToFindKey = 150, NumberOfMessages, Config), + %% In case the messages are not sent in order (could happen with multiple buffer workers) + timer:sleep(1000), + {ok, 200, ResultString1} = clickhouse:query(ClickhouseConnection, sql_find_all_keys(), []), + ResultString2 = iolist_to_binary(string:trim(ResultString1)), + KeyStrings = string:lexemes(ResultString2, "\n"), + Keys = [erlang:binary_to_integer(iolist_to_binary(K)) || K <- KeyStrings], + KeySet = maps:from_keys(Keys, true), + NumberOfMessages = maps:size(KeySet), + CheckKey = fun(Key) -> maps:get(Key, KeySet, false) end, + true = lists:all(CheckKey, lists:seq(1, NumberOfMessages)), + delete_bridge(), + ok.
diff --git a/lib-ee/emqx_ee_connector/docker-ct b/lib-ee/emqx_ee_connector/docker-ct index ef579c036..3db090939 100644 --- a/lib-ee/emqx_ee_connector/docker-ct +++ b/lib-ee/emqx_ee_connector/docker-ct @@ -1,2 +1,3 @@ toxiproxy influxdb +clickhouse diff --git a/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf new file mode 100644 index 000000000..1e07c29b4 --- /dev/null +++ b/lib-ee/emqx_ee_connector/i18n/emqx_ee_connector_clickhouse.conf @@ -0,0 +1,15 @@ + +emqx_ee_connector_clickhouse { + + base_url { + desc { + en: """The HTTP URL to the Clickhouse server that you want to connect to (for example http://myhostname:8123)""" + zh: """你想连接到的Clickhouse服务器的HTTP URL(例如http://myhostname:8123)。""" + } + label: { + en: "URL to clickhouse server" + zh: "到clickhouse服务器的URL" + } + } + +} diff --git a/lib-ee/emqx_ee_connector/rebar.config b/lib-ee/emqx_ee_connector/rebar.config index d758e1424..bcf9508bf 100644 --- a/lib-ee/emqx_ee_connector/rebar.config +++ b/lib-ee/emqx_ee_connector/rebar.config @@ -3,6 +3,7 @@ {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}}, {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}}, {tdengine, {git, "https://github.com/emqx/tdengine-client-erl", {tag, "0.1.5"}}}, + {clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.2"}}}, {emqx, {path, "../../apps/emqx"}} ]}. 
diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 5017abd21..6c9d83bc7 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -9,7 +9,8 @@ influxdb, tdengine, wolff, - brod + brod, + clickhouse ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl new file mode 100644 index 000000000..b1ad6c787 --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl @@ -0,0 +1,444 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ee_connector_clickhouse). + +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-behaviour(emqx_resource). + +-import(hoconsc, [mk/2, enum/1, ref/2]). 
+ +%%===================================================================== +%% Exports +%%===================================================================== + +%% Hocon config schema exports +-export([ + roots/0, + fields/1, + values/1 +]). + +%% callbacks for behaviour emqx_resource +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2 +]). + +%% callbacks for ecpool +-export([connect/1]). + +%% Internal exports used to execute code with ecpool worker +-export([ + check_database_status/1, + execute_sql_in_clickhouse_server_using_connection/2 +]). + +%%===================================================================== +%% Types +%%===================================================================== + +-type url() :: emqx_http_lib:uri_map(). +-reflect_type([url/0]). +-typerefl_from_string({url/0, emqx_http_lib, uri_parse}). + +-type templates() :: + #{} + | #{ + send_message_template := term(), + extend_send_message_template := term() + }. + +-type state() :: + #{ + templates := templates(), + poolname := atom() + }. + +-type clickhouse_config() :: map(). + +%%===================================================================== +%% Configuration and default values +%%===================================================================== + +roots() -> + [{config, #{type => hoconsc:ref(?MODULE, config)}}]. + +fields(config) -> + [ + {url, + hoconsc:mk( + url(), + #{ + required => true, + validator => fun + (#{query := _Query}) -> + {error, "There must be no query in the url"}; + (_) -> + ok + end, + desc => ?DESC("base_url") + } + )} + ] ++ emqx_connector_schema_lib:relational_db_fields(). + +values(post) -> + maps:merge(values(put), #{name => <<"connector">>}); +values(get) -> + values(post); +values(put) -> + #{ + database => <<"mqtt">>, + enable => true, + pool_size => 8, + type => clickhouse, + url => <<"http://127.0.0.1:8123">> + }; +values(_) -> + #{}. 
+
+%% ===================================================================
+%% Callbacks defined in emqx_resource
+%% ===================================================================
+
+%% Reports the callback mode of this connector to emqx_resource.
+callback_mode() -> always_sync.
+
+%% -------------------------------------------------------------------
+%% on_start callback and related functions
+%% -------------------------------------------------------------------
+
+-spec on_start(resource_id(), clickhouse_config()) -> {ok, state()} | {error, _}.
+
+%% Prepares the SQL templates (if the config has any) and starts the
+%% connection pool for this resource instance.
+%%
+%% Returns `{ok, State}` when the pool was started. A pool start failure, or
+%% any exception raised while preparing the templates or starting the pool,
+%% is logged and returned as `{error, Reason}`.
+on_start(
+    InstanceID,
+    #{
+        url := URL,
+        database := DB,
+        pool_size := PoolSize
+    } = Config
+) ->
+    ?SLOG(info, #{
+        msg => "starting_clickhouse_connector",
+        connector => InstanceID,
+        config => emqx_misc:redact(Config)
+    }),
+    PoolName = emqx_plugin_libs_pool:pool_name(InstanceID),
+    Options = [
+        {url, URL},
+        {user, maps:get(username, Config, "default")},
+        %% The password stays wrapped until connect/1 hands it to the driver
+        {key, emqx_secret:wrap(maps:get(password, Config, "public"))},
+        {database, DB},
+        {auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
+        {pool_size, PoolSize},
+        {pool, PoolName}
+    ],
+    InitState = #{poolname => PoolName},
+    try
+        %% An invalid SQL template raises here and is reported by the catch
+        Templates = prepare_sql_templates(Config),
+        State = maps:merge(InitState, #{templates => Templates}),
+        case emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Options) of
+            ok ->
+                {ok, State};
+            {error, Reason} ->
+                log_start_error(Config, Reason, none),
+                {error, Reason}
+        end
+    catch
+        _:CatchReason:Stacktrace ->
+            log_start_error(Config, CatchReason, Stacktrace),
+            {error, CatchReason}
+    end.
+
+%% Logs a failed connector start and emits the
+%% `clickhouse_connector_start_failed` trace point. `Stacktrace` is either
+%% the atom `none` or a stack trace that is added to the log entry.
+log_start_error(Config, Reason, Stacktrace) ->
+    StacktraceMap =
+        case Stacktrace of
+            none -> #{};
+            _ -> #{stacktrace => Stacktrace}
+        end,
+    LogMessage =
+        #{
+            msg => "clickhouse_connector_start_failed",
+            error_reason => Reason,
+            config => emqx_misc:redact(Config)
+        },
+    %% A failed start is an error condition: log it at `error` (like failed
+    %% queries are) so it is not filtered out together with info messages.
+    ?SLOG(error, maps:merge(LogMessage, StacktraceMap)),
+    ?tp(
+        clickhouse_connector_start_failed,
+        #{error => Reason}
+    ).
+
+%% Helper functions to prepare SQL templates
+
+%% Builds the templates stored in the resource state.
+%%
+%% When the config has both `sql` and `batch_value_separator`, returns the
+%% preprocessed insert template together with a template for appending one
+%% more row to an insert statement. Otherwise returns an empty map.
+prepare_sql_templates(#{
+    sql := Template,
+    batch_value_separator := Separator
+}) ->
+    InsertTemplate =
+        emqx_plugin_libs_rule:preproc_tmpl(Template),
+    BulkExtendInsertTemplate =
+        prepare_sql_bulk_extend_template(Template, Separator),
+    #{
+        send_message_template => InsertTemplate,
+        extend_send_message_template => BulkExtendInsertTemplate
+    };
+prepare_sql_templates(_) ->
+    %% We don't create any templates if this is a non-bridge connector
+    #{}.
+
+%% Builds the template that renders one additional row of a batch insert:
+%% the separator followed by the value part of the insert template.
+prepare_sql_bulk_extend_template(Template, Separator) ->
+    ValuesTemplate = split_clickhouse_insert_sql(Template),
+    %% The value part has been extracted
+    %% Add separator before ValuesTemplate so that one can append it
+    %% to an insert template
+    ExtendParamTemplate = iolist_to_binary([Separator, ValuesTemplate]),
+    emqx_plugin_libs_rule:preproc_tmpl(ExtendParamTemplate).
+
+%% This function is similar to emqx_plugin_libs_rule:split_insert_sql/1 but can
+%% also handle Clickhouse's SQL extension for INSERT statements that allows the
+%% user to specify different formats:
+%%
+%% https://clickhouse.com/docs/en/sql-reference/statements/insert-into/
+%%
+%% Returns the part of the statement that follows the `VALUES` keyword or the
+%% `FORMAT <name>` clause. Raises an error (with a binary message) when the
+%% statement does not split into exactly one head and one value part, or when
+%% the head does not start with the INSERT keyword.
+split_clickhouse_insert_sql(SQL) ->
+    ErrorMsg = <<"The SQL template should be an SQL INSERT statement but it is something else.">>,
+    case
+        re:split(SQL, "(\\s+(?i:values)|(?i:format\\s+(?:[A-Za-z0-9_])+)\\s+)", [{return, binary}])
+    of
+        [Part1, _, Part3] ->
+            %% SQL keywords are case-insensitive and the regular expression
+            %% above already treats VALUES/FORMAT that way, so the INSERT
+            %% keyword must be accepted in any letter case as well
+            %% (e.g. "Insert into ...").
+            Keyword = string:slice(string:trim(Part1, leading), 0, 6),
+            case string:lowercase(Keyword) of
+                <<"insert">> ->
+                    Part3;
+                _ ->
+                    erlang:error(ErrorMsg)
+            end;
+        _ ->
+            erlang:error(ErrorMsg)
+    end.
+
+% This is a callback for ecpool which is triggered by the call to
+% emqx_plugin_libs_pool:start_pool in on_start/2
+%
+% It normalizes the URL, unwraps the password that on_start/2 wrapped with
+% emqx_secret, and starts one clickhouse client process with these options.
+
+connect(Options) ->
+    URL = iolist_to_binary(emqx_http_lib:normalize(proplists:get_value(url, Options))),
+    User = proplists:get_value(user, Options),
+    Database = proplists:get_value(database, Options),
+    Key = emqx_secret:unwrap(proplists:get_value(key, Options)),
+    Pool = proplists:get_value(pool, Options),
+    PoolSize = proplists:get_value(pool_size, Options),
+    FixedOptions = [
+        {url, URL},
+        {database, Database},
+        {user, User},
+        {key, Key},
+        {pool, Pool},
+        {pool_size, PoolSize}
+    ],
+    case clickhouse:start_link(FixedOptions) of
+        {ok, _Conn} = Ok ->
+            Ok;
+        {error, Reason} ->
+            {error, Reason}
+    end.
+
+%% -------------------------------------------------------------------
+%% on_stop emqx_resource callback
+%% -------------------------------------------------------------------
+
+-spec on_stop(resource_id(), resource_state()) -> term().
+
+%% Stops the connection pool that on_start/2 created for this resource.
+on_stop(ResourceID, #{poolname := PoolName}) ->
+    ?SLOG(info, #{
+        %% Spelling fixed ("clickouse") so that searching the logs for
+        %% "clickhouse" also finds the stop event
+        msg => "stopping clickhouse connector",
+        connector => ResourceID
+    }),
+    emqx_plugin_libs_pool:stop_pool(PoolName).
+
+%% -------------------------------------------------------------------
+%% on_get_status emqx_resource callback and related functions
+%% -------------------------------------------------------------------
+
+%% Maps the result of the pool health check to a resource status:
+%% `connected` when the check returns true, `connecting` when it returns false.
+on_get_status(_ResourceID, #{poolname := Pool} = _State) ->
+    case
+        emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:check_database_status/1)
+    of
+        true ->
+            connected;
+        false ->
+            connecting
+    end.
+
+%% Health check executed with the connection of an ecpool worker.
+check_database_status(Connection) ->
+    clickhouse:status(Connection).
+
+%% -------------------------------------------------------------------
+%% on_query emqx_resource callback and related functions
+%% -------------------------------------------------------------------
+
+-spec on_query
+    (resource_id(), Request, resource_state()) -> query_result() when
+        Request :: {RequestType, Data},
+        RequestType :: send_message,
+        Data :: map();
+    (resource_id(), Request, resource_state()) -> query_result() when
+        Request :: {RequestType, SQL},
+        RequestType :: sql | query,
+        SQL :: binary().
+
+%% Executes a single request. A `send_message` request carries a data map
+%% that is rendered with the prepared insert template; `sql` and `query`
+%% requests carry the SQL statement itself.
+on_query(ResourceID, {RequestType, Payload}, State = #{poolname := Pool}) ->
+    ?SLOG(debug, #{
+        msg => "clickhouse connector received sql query",
+        connector => ResourceID,
+        type => RequestType,
+        sql => Payload,
+        state => State
+    }),
+    %% Decide whether the payload is SQL or data for the template
+    Kind = query_type(RequestType),
+    #{templates := Templates} = State,
+    Statement = get_sql(Kind, Templates, Payload),
+    transform_and_log_clickhouse_result(
+        execute_sql_in_clickhouse_server(Pool, Statement),
+        ResourceID,
+        Statement
+    ).
+
+%% Renders the insert template for `send_message` requests when such a
+%% template exists; in every other case the third argument is returned as is.
+get_sql(send_message, #{send_message_template := Prepared}, Data) ->
+    emqx_plugin_libs_rule:proc_tmpl(Prepared, Data);
+get_sql(_Kind, _Templates, Statement) ->
+    Statement.
+
+%% Normalizes the request type: `sql` and `query` both mean a plain query,
+%% `send_message` (data coming from a bridge) uses the prepared template.
+query_type(Type) when Type =:= sql; Type =:= query ->
+    query;
+query_type(send_message) ->
+    send_message.
+
+%% -------------------------------------------------------------------
+%% on_batch_query emqx_resource callback and related functions
+%% -------------------------------------------------------------------
+
+-spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when
+    BatchReq :: nonempty_list({'send_message', map()}).
+
+%% Executes a batch of `send_message` requests as one INSERT statement.
+on_batch_query(ResourceID, BatchReq, State) ->
+    %% Only batches made up of send_message requests are supported
+    {RequestTypes, Rows} = lists:unzip(BatchReq),
+    ensure_keys_are_of_type_send_message(RequestTypes),
+    #{
+        templates := Templates,
+        poolname := Pool
+    } = State,
+    %% One statement containing all rows of the batch
+    Statement = objects_to_sql(Rows, Templates),
+    transform_and_log_clickhouse_result(
+        execute_sql_in_clickhouse_server(Pool, Statement),
+        ResourceID,
+        Statement
+    ).
+
+%% Returns `ok` when every key is the atom `send_message`; otherwise raises
+%% an `unrecoverable_error`.
+ensure_keys_are_of_type_send_message(Keys) ->
+    lists:all(fun is_send_message_atom/1, Keys) orelse
+        erlang:error(
+            {unrecoverable_error,
+                <<"Unexpected type for batch message (Expected send_message)">>}
+        ),
+    ok.
+
+%% True only for the atom `send_message`.
+is_send_message_atom(Key) ->
+    Key =:= send_message.
+
+%% Renders the complete insert statement for a non-empty list of rows: the
+%% first row goes through the full insert template, every further row is
+%% appended with the extend template. Raises an error when the row list is
+%% empty or one of the two templates is missing.
+objects_to_sql(
+    [First | Rest],
+    #{
+        send_message_template := InsertTemplate,
+        extend_send_message_template := ExtendTemplate
+    }
+) ->
+    Head = emqx_plugin_libs_rule:proc_tmpl(InsertTemplate, First),
+    Tail = [emqx_plugin_libs_rule:proc_tmpl(ExtendTemplate, Row) || Row <- Rest],
+    erlang:iolist_to_binary([Head, Tail]);
+objects_to_sql(_, _) ->
+    erlang:error(<<"Templates for bulk insert missing.">>).
+
+%% -------------------------------------------------------------------
+%% Helper functions that are used by both on_query/3 and on_batch_query/3
+%% -------------------------------------------------------------------
+
+%% Sends the statement to the database server through a worker of the given
+%% pool and returns what the worker returned. Used by on_query/3 and
+%% on_batch_query/3.
+execute_sql_in_clickhouse_server(PoolName, SQL) ->
+    Action = {?MODULE, execute_sql_in_clickhouse_server_using_connection, [SQL]},
+    ecpool:pick_and_do(PoolName, Action, no_handover).
+
+%% Invoked through ecpool with the connection of the selected worker.
+execute_sql_in_clickhouse_server_using_connection(Connection, SQL) ->
+    clickhouse:query(Connection, SQL, []).
+
+%% Turns the raw result from clickhouse into something more readable and
+%% creates the appropriate log messages:
+%%   * `{ok, 200, <<>>}`  becomes `ok`
+%%   * `{ok, 200, Data}`  becomes `{ok, Data}`
+%%   * anything else is logged as an error and becomes `{error, RawResult}`
+transform_and_log_clickhouse_result({ok, 200, Body}, _ResourceID, _SQL) ->
+    Result =
+        case Body of
+            <<>> -> ok;
+            _ -> {ok, Body}
+        end,
+    snabbkaffe_log_return(Result),
+    Result;
+transform_and_log_clickhouse_result(Failure, ResourceID, SQL) ->
+    ?SLOG(error, #{
+        msg => "clickhouse connector do sql query failed",
+        connector => ResourceID,
+        sql => SQL,
+        reason => Failure
+    }),
+    {error, Failure}.
+
+%% Emits the `clickhouse_connector_query_return` trace point.
+snabbkaffe_log_return(_Result) ->
+    ?tp(
+        clickhouse_connector_query_return,
+        #{result => _Result}
+    ).
diff --git a/lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl b/lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl
new file mode 100644
index 000000000..eab1aa054
--- /dev/null
+++ b/lib-ee/emqx_ee_connector/test/ee_connector_clickhouse_SUITE.erl
@@ -0,0 +1,198 @@
+% %%--------------------------------------------------------------------
+% %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+% %%
+% %% Licensed under the Apache License, Version 2.0 (the "License");
+% %% you may not use this file except in compliance with the License.
+% %% You may obtain a copy of the License at
+% %% http://www.apache.org/licenses/LICENSE-2.0
+% %%
+% %% Unless required by applicable law or agreed to in writing, software
+% %% distributed under the License is distributed on an "AS IS" BASIS,
+% %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% %% See the License for the specific language governing permissions and
+% %% limitations under the License.
+% %%--------------------------------------------------------------------
+
+-module(ee_connector_clickhouse_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include("emqx_connector.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("stdlib/include/assert.hrl").
+
+%% Host name of the Clickhouse server used by this suite
+-define(CLICKHOUSE_HOST, "clickhouse").
+%% Resource module under test
+-define(CLICKHOUSE_RESOURCE_MOD, emqx_ee_connector_clickhouse).
+
+%% This test SUITE requires a running clickhouse instance. If you don't want to
+%% bring up the whole CI infrastructure with the `scripts/ct/run.sh` script
+%% you can create a clickhouse instance with the following command (execute it
+%% from the root of the EMQX directory). You also need to make ?CLICKHOUSE_HOST
+%% and the port point at that instance. The port used by this suite is
+%% ?CLICKHOUSE_DEFAULT_PORT, which is not defined in this file (presumably it
+%% comes from emqx_connector.hrl -- TODO confirm).
+%%
+%% docker run -d -p 18123:8123 -p19000:9000 --name some-clickhouse-server --ulimit nofile=262144:262144 -v "`pwd`/.ci/docker-compose-file/clickhouse/users.xml:/etc/clickhouse-server/users.xml" -v "`pwd`/.ci/docker-compose-file/clickhouse/config.xml:/etc/clickhouse-server/config.xml" clickhouse/clickhouse-server
+
+%% Test cases of this suite, as collected by emqx_common_test_helpers:all/1.
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+%% This suite defines no test groups.
+groups() ->
+    [].
+
+%% Base URL of the Clickhouse server as a binary: "http://", the host, ":"
+%% and the port.
+clickhouse_url() ->
+    erlang:iolist_to_binary([
+        <<"http://">>,
+        ?CLICKHOUSE_HOST,
+        ":",
+        erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT)
+    ]).
+
+%% Starts the applications needed by the suite and makes sure that the `mqtt`
+%% database exists. When no Clickhouse server is reachable the suite is
+%% skipped, except when the IS_CI environment variable is "yes": then the
+%% missing server is treated as a failure.
+init_per_suite(Config) ->
+    case
+        emqx_common_test_helpers:is_tcp_server_available(?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT)
+    of
+        true ->
+            ok = emqx_common_test_helpers:start_apps([emqx_conf]),
+            ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
+            {ok, _} = application:ensure_all_started(emqx_connector),
+            {ok, _} = application:ensure_all_started(emqx_ee_connector),
+            %% Create the database with a temporary connection
+            {ok, Conn} =
+                clickhouse:start_link([
+                    {url, clickhouse_url()},
+                    {user, <<"default">>},
+                    {key, "public"},
+                    {pool, tmp_pool}
+                ]),
+            {ok, _, _} = clickhouse:query(Conn, <<"CREATE DATABASE IF NOT EXISTS mqtt">>, #{}),
+            clickhouse:stop(Conn),
+            Config;
+        false ->
+            case os:getenv("IS_CI") of
+                "yes" ->
+                    throw(no_clickhouse);
+                _ ->
+                    {skip, no_clickhouse}
+            end
+    end.
+
+%% Stops the applications started by init_per_suite/1.
+end_per_suite(_Config) ->
+    ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
+    ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
+    %% init_per_suite/1 also started emqx_ee_connector; stop it too so that it
+    %% does not keep running into the suites executed after this one. The
+    %% results are ignored because an application that is not running is fine.
+    _ = application:stop(emqx_ee_connector),
+    _ = application:stop(emqx_connector).
+
+init_per_testcase(_, Config) ->
+    Config.
+
+end_per_testcase(_, _Config) ->
+    ok.
+
+% %%------------------------------------------------------------------------------
+% %% Testcases
+% %%------------------------------------------------------------------------------
+
+%% Runs the complete create / query / stop / restart / remove cycle.
+t_lifecycle(_Config) ->
+    perform_lifecycle_check(
+        <<"emqx_connector_clickhouse_SUITE">>,
+        clickhouse_config()
+    ).
+
+%% Debugging helper: prints the term and returns it unchanged.
+show(X) ->
+    erlang:display(X),
+    X.
+
+%% Debugging helper: prints the term with a label and returns the term.
+show(Label, What) ->
+    erlang:display({Label, What}),
+    What.
+
+%% Creates a Clickhouse resource named `PoolName` from `InitialConfig` and
+%% checks its status, health check and query results while it is created,
+%% stopped, restarted and finally removed.
+perform_lifecycle_check(PoolName, InitialConfig) ->
+    {ok, #{config := CheckedConfig}} =
+        emqx_resource:check_config(?CLICKHOUSE_RESOURCE_MOD, InitialConfig),
+    {ok, #{
+        state := #{poolname := ReturnedPoolName} = State,
+        status := InitialStatus
+    }} =
+        emqx_resource:create_local(
+            PoolName,
+            ?CONNECTOR_RESOURCE_GROUP,
+            ?CLICKHOUSE_RESOURCE_MOD,
+            CheckedConfig,
+            #{}
+        ),
+    %% Expected value first, so that a failure report labels the values correctly
+    ?assertEqual(connected, InitialStatus),
+    % Instance should match the state and status of the just started resource
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{
+        state := State,
+        status := InitialStatus
+    }} =
+        emqx_resource:get_instance(PoolName),
+    ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
+    % Perform query as further check that the resource is working as expected
+    assert_select_one_works(PoolName),
+    ?assertEqual(ok, emqx_resource:stop(PoolName)),
+    % Resource will be listed still, but state will be changed and healthcheck will fail
+    % as the worker no longer exists.
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{
+        state := State,
+        status := StoppedStatus
+    }} =
+        emqx_resource:get_instance(PoolName),
+    ?assertEqual(stopped, StoppedStatus),
+    ?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
+    % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
+    ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
+    % Can call stop/1 again on an already stopped instance
+    ?assertEqual(ok, emqx_resource:stop(PoolName)),
+    % Make sure it can be restarted and the healthchecks and queries work properly
+    ?assertEqual(ok, emqx_resource:restart(PoolName)),
+    % async restart, need to wait resource
+    timer:sleep(500),
+    {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
+        emqx_resource:get_instance(PoolName),
+    ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
+    assert_select_one_works(PoolName),
+    % Stop and remove the resource in one go.
+    ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
+    ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
+    % Should not even be able to get the resource data out of ets now unlike just stopping.
+    ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
+
+% %%------------------------------------------------------------------------------
+% %% Helpers
+% %%------------------------------------------------------------------------------
+
+%% Runs `SELECT 1` through the resource and asserts that the reply is
+%% `{ok, Data}` where Data, with surrounding whitespace removed, is <<"1">>.
+assert_select_one_works(ResourceID) ->
+    QueryResult = emqx_resource:query(ResourceID, test_query_no_params()),
+    ?assertMatch({ok, _}, QueryResult),
+    {ok, Data} = QueryResult,
+    ?assertMatch(<<"1">>, string:trim(Data)).
+
+%% Raw (unchecked) connector configuration used by the lifecycle test.
+clickhouse_config() ->
+    Config =
+        #{
+            auto_reconnect => true,
+            database => <<"mqtt">>,
+            username => <<"default">>,
+            password => <<"public">>,
+            pool_size => 8,
+            %% Same URL that init_per_suite/1 uses for its temporary connection
+            url => clickhouse_url()
+        },
+    #{<<"config">> => Config}.
+
+%% A query that needs no parameters and no existing table.
+test_query_no_params() ->
+    {query, <<"SELECT 1">>}.
diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 612bda77a..40857c46b 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -161,6 +161,9 @@ for dep in ${CT_DEPS}; do ;; tdengine) FILES+=( '.ci/docker-compose-file/docker-compose-tdengine-restful.yaml' ) + ;; + clickhouse) + FILES+=( '.ci/docker-compose-file/docker-compose-clickhouse.yaml' ) ;; *) echo "unknown_ct_dependency $dep"