From 451c8ecaf53f828d504daae1a64e2013ffa430b3 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Tue, 30 Jan 2024 18:07:36 +0800 Subject: [PATCH 1/4] ci: add env vars to run clickhouse tests locally --- .../test/emqx_bridge_clickhouse_SUITE.erl | 21 ++++++++------- ...emqx_bridge_clickhouse_connector_SUITE.erl | 27 ++++--------------- 2 files changed, 17 insertions(+), 31 deletions(-) diff --git a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl index 8cfc24882..d83321d27 100644 --- a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl +++ b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_SUITE.erl @@ -9,6 +9,7 @@ -define(APP, emqx_bridge_clickhouse). -define(CLICKHOUSE_HOST, "clickhouse"). +-define(CLICKHOUSE_PORT, "8123"). -include_lib("emqx_connector/include/emqx_connector.hrl"). %% See comment in @@ -20,9 +21,9 @@ %%------------------------------------------------------------------------------ init_per_suite(Config) -> - case - emqx_common_test_helpers:is_tcp_server_available(?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT) - of + Host = clickhouse_host(), + Port = list_to_integer(clickhouse_port()), + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of true -> emqx_common_test_helpers:render_and_load_app_config(emqx_conf), ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), @@ -114,13 +115,15 @@ sql_drop_table() -> sql_create_database() -> "CREATE DATABASE IF NOT EXISTS mqtt". +clickhouse_host() -> + os:getenv("CLICKHOUSE_HOST", ?CLICKHOUSE_HOST). +clickhouse_port() -> + os:getenv("CLICKHOUSE_PORT", ?CLICKHOUSE_PORT). + clickhouse_url() -> - erlang:iolist_to_binary([ - <<"http://">>, - ?CLICKHOUSE_HOST, - ":", - erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT) - ]). + Host = clickhouse_host(), + Port = clickhouse_port(), + erlang:iolist_to_binary(["http://", Host, ":", Port]). clickhouse_config(Config) -> SQL = maps:get(sql, Config, sql_insert_template_for_bridge()), diff --git a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl index e1d3149db..e9eb6c7a2 100644 --- a/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl +++ b/apps/emqx_bridge_clickhouse/test/emqx_bridge_clickhouse_connector_SUITE.erl @@ -13,7 +13,6 @@ -include_lib("common_test/include/ct.hrl"). -define(APP, emqx_bridge_clickhouse). --define(CLICKHOUSE_HOST, "clickhouse"). -define(CLICKHOUSE_RESOURCE_MOD, emqx_bridge_clickhouse_connector). -define(CLICKHOUSE_PASSWORD, "public"). @@ -39,25 +38,17 @@ all() -> groups() -> []. -clickhouse_url() -> - erlang:iolist_to_binary([ - <<"http://">>, - ?CLICKHOUSE_HOST, - ":", - erlang:integer_to_list(?CLICKHOUSE_DEFAULT_PORT) - ]). - init_per_suite(Config) -> - case - emqx_common_test_helpers:is_tcp_server_available(?CLICKHOUSE_HOST, ?CLICKHOUSE_DEFAULT_PORT) - of + Host = emqx_bridge_clickhouse_SUITE:clickhouse_host(), + Port = list_to_integer(emqx_bridge_clickhouse_SUITE:clickhouse_port()), + case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of true -> ok = emqx_common_test_helpers:start_apps([emqx_conf]), ok = emqx_connector_test_helpers:start_apps([emqx_resource, ?APP]), %% Create the db table {ok, Conn} = clickhouse:start_link([ - {url, clickhouse_url()}, + {url, emqx_bridge_clickhouse_SUITE:clickhouse_url()}, {user, <<"default">>}, {key, ?CLICKHOUSE_PASSWORD}, {pool, tmp_pool} @@ -205,15 +196,7 @@ clickhouse_config(Overrides) -> username => <<"default">>, password => <>, pool_size => 8, - url => iolist_to_binary( - io_lib:format( - "http://~s:~b", - [ - ?CLICKHOUSE_HOST, - ?CLICKHOUSE_DEFAULT_PORT - ] - ) - ), + url => emqx_bridge_clickhouse_SUITE:clickhouse_url(), connect_timeout => <<"10s">> }, #{<<"config">> => maps:merge(Config, Overrides)}. From 8ae0e787863bd37f32f00225f1407af52c490f4f Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Fri, 2 Feb 2024 17:58:18 +0800 Subject: [PATCH 2/4] refactor: split clickhouse bridges to actions and connectors --- apps/emqx_bridge/src/emqx_action_info.erl | 1 + .../src/emqx_bridge_cassandra_connector.erl | 5 - .../src/emqx_bridge_clickhouse.app.src | 4 +- .../src/emqx_bridge_clickhouse.erl | 121 +++++++++++++++--- .../emqx_bridge_clickhouse_action_info.erl | 62 +++++++++ .../src/emqx_bridge_clickhouse_connector.erl | 82 +++++++----- .../src/schema/emqx_connector_ee_schema.erl | 12 ++ .../src/schema/emqx_connector_schema.erl | 2 + changes/ee/feat-19999.en.md | 1 + rel/i18n/emqx_bridge_clickhouse.hocon | 10 ++ 10 files changed, 247 insertions(+), 53 deletions(-) create mode 100644 apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl create mode 100644 changes/ee/feat-19999.en.md diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 3a9e13f94..e54ef6124 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -97,6 +97,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_rocketmq_action_info, emqx_bridge_influxdb_action_info, emqx_bridge_cassandra_action_info, + emqx_bridge_clickhouse_action_info, emqx_bridge_mysql_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index a84d3912b..c9d3246d3 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -32,8 +32,6 @@ on_get_status/2 ]). --export([transform_bridge_v1_config_to_connector_config/1]). - %% callbacks of ecpool -export([ connect/1, @@ -448,9 +446,6 @@ handle_result({error, Error}) -> handle_result(Res) -> Res. -transform_bridge_v1_config_to_connector_config(_) -> - ok. - %%-------------------------------------------------------------------- %% utils diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index 3288b83fd..d96d06375 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_clickhouse, [ {description, "EMQX Enterprise ClickHouse Bridge"}, - {vsn, "0.2.5"}, + {vsn, "0.3.0"}, {registered, []}, {applications, [ kernel, @@ -8,7 +8,7 @@ emqx_resource, clickhouse ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_clickhouse_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl index deca42154..11f5fff5a 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl @@ -9,8 +9,11 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). +%% Examples -export([ - conn_bridge_examples/1 + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 ]). -export([ @@ -20,12 +23,10 @@ desc/1 ]). --define(DEFAULT_SQL, - <<"INSERT INTO mqtt_test(payload, arrived) VALUES ('${payload}', ${timestamp})">> -). - +-define(DEFAULT_SQL, <<"INSERT INTO messages(data, arrived) VALUES ('${payload}', ${timestamp})">>). -define(DEFAULT_BATCH_VALUE_SEPARATOR, <<", ">>). - +-define(CONNECTOR_TYPE, clickhouse). +-define(ACTION_TYPE, clickhouse). %% ------------------------------------------------------------------------------------------------- %% Callback used by HTTP API %% ------------------------------------------------------------------------------------------------- @@ -40,6 +41,42 @@ conn_bridge_examples(Method) -> } ]. +bridge_v2_examples(Method) -> + ParamsExample = #{ + parameters => #{ + batch_value_separator => ?DEFAULT_BATCH_VALUE_SEPARATOR, + sql => ?DEFAULT_SQL + } + }, + [ + #{ + <<"clickhouse">> => #{ + summary => <<"ClickHouse Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, clickhouse, clickhouse, ParamsExample + ) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"clickhouse">> => #{ + summary => <<"ClickHouse Connector">>, + value => emqx_connector_schema:connector_values( + Method, clickhouse, #{ + server => <<"127.0.0.1:8123">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"default">>, + password => <<"******">> + } + ) + } + } + ]. + values(_Method, Type) -> #{ enable => true, @@ -71,19 +108,49 @@ namespace() -> "bridge_clickhouse". roots() -> []. +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + emqx_bridge_clickhouse_connector:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(action) -> + {clickhouse, + mk( + hoconsc:map(name, ref(?MODULE, clickhouse_action)), + #{desc => <<"ClickHouse Action Config">>, required => false} + )}; +fields(clickhouse_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk(ref(?MODULE, action_parameters), #{ + required => true, desc => ?DESC(action_parameters) + }) + ); +fields(action_parameters) -> + [ + sql_field(), + batch_value_separator_field() + ]; +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + emqx_bridge_clickhouse_connector:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(clickhouse_action)); fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {sql, - mk( - binary(), - #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} - )}, - {batch_value_separator, - mk( - binary(), - #{desc => ?DESC("batch_value_separator"), default => ?DEFAULT_BATCH_VALUE_SEPARATOR} - )}, + sql_field(), + batch_value_separator_field(), {local_topic, mk( binary(), @@ -112,6 +179,28 @@ fields("get") -> fields("post", Type) -> [type_field(Type), name_field() | fields("config")]. +sql_field() -> + {sql, + mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )}. + +batch_value_separator_field() -> + {batch_value_separator, + mk( + binary(), + #{desc => ?DESC("batch_value_separator"), default => ?DEFAULT_BATCH_VALUE_SEPARATOR} + )}. + +desc(clickhouse_action) -> + ?DESC(clickhouse_action); +desc(action_parameters) -> + ?DESC(action_parameters); +desc("config_connector") -> + ?DESC("desc_config"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl new file mode 100644 index 000000000..066569f7e --- /dev/null +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl @@ -0,0 +1,62 @@ +-module(emqx_bridge_clickhouse_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(SCHEMA_MODULE, emqx_bridge_clickhouse). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(clickhouse_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ActionTopLevelKeys = schema_keys(clickhouse_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ConnectorTopLevelKeys = schema_keys("config_connector"), + ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys), + ConnConfig0 = maps:with(ConnectorKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnConfig0 + ). + +connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) -> + RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorRawConf, ActionRawConf + ), + maps:without([<<"clickhouse_type">>], RawConf). + +bridge_v1_type_name() -> clickhouse. + +action_type_name() -> clickhouse. + +connector_type_name() -> clickhouse. + +schema_module() -> ?SCHEMA_MODULE. + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))]. diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 0a6c504c7..e7327b56c 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -32,6 +32,10 @@ callback_mode/0, on_start/2, on_stop/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channel_status/3, + on_get_channels/1, on_query/3, on_batch_query/3, on_get_status/2 @@ -62,6 +66,7 @@ -type state() :: #{ + channels => #{binary() => templates()}, templates := templates(), pool_name := binary(), connect_timeout := pos_integer() @@ -155,10 +160,9 @@ on_start( {pool, InstanceID} ], try - Templates = prepare_sql_templates(Config), State = #{ + channels => #{}, pool_name => InstanceID, - templates => Templates, connect_timeout => ConnectTimeout }, case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of @@ -195,10 +199,8 @@ prepare_sql_templates(#{ sql := Template, batch_value_separator := Separator }) -> - InsertTemplate = - emqx_placeholder:preproc_tmpl(Template), - BulkExtendInsertTemplate = - prepare_sql_bulk_extend_template(Template, Separator), + InsertTemplate = emqx_placeholder:preproc_tmpl(Template), + BulkExtendInsertTemplate = prepare_sql_bulk_extend_template(Template, Separator), #{ send_message_template => InsertTemplate, extend_send_message_template => BulkExtendInsertTemplate @@ -285,6 +287,27 @@ on_stop(InstanceID, _State) -> }), emqx_resource_pool:stop(InstanceID). +%% ------------------------------------------------------------------- +%% channel related emqx_resouce callbacks +%% ------------------------------------------------------------------- +on_add_channel(_InstId, #{channels := Channs} = OldState, ChannId, ChannConf0) -> + #{parameters := ParamConf} = ChannConf0, + NewChanns = Channs#{ChannId => #{templates => prepare_sql_templates(ParamConf)}}, + {ok, OldState#{channels => NewChanns}}. + +on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannId) -> + NewState = State#{channels => maps:remove(ChannId, Channels)}, + {ok, NewState}. + +on_get_channel_status(InstanceId, _ChannId, State) -> + case on_get_status(InstanceId, State) of + {connected, _} -> connected; + {disconnected, _, _} -> disconnected + end. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + %% ------------------------------------------------------------------- %% on_get_status emqx_resouce callback and related functions %% ------------------------------------------------------------------- @@ -339,8 +362,8 @@ do_get_status(PoolName, Timeout) -> -spec on_query (resource_id(), Request, resource_state()) -> query_result() when - Request :: {RequestType, Data}, - RequestType :: send_message, + Request :: {ChannId, Data}, + ChannId :: binary(), Data :: map(); (resource_id(), Request, resource_state()) -> query_result() when Request :: {RequestType, SQL}, @@ -361,12 +384,20 @@ on_query( }), %% Have we got a query or data to fit into an SQL template? SimplifiedRequestType = query_type(RequestType), - #{templates := Templates} = State, + Templates = get_templates(RequestType, State), SQL = get_sql(SimplifiedRequestType, Templates, DataOrSQL), ClickhouseResult = execute_sql_in_clickhouse_server(PoolName, SQL), transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL). -get_sql(send_message, #{send_message_template := PreparedSQL}, Data) -> +get_templates(ChannId, State) -> + case maps:find(channels, State) of + {ok, Channels} -> + maps:get(templates, maps:get(ChannId, Channels, #{}), #{}); + error -> + #{} + end. + +get_sql(channel_message, #{send_message_template := PreparedSQL}, Data) -> emqx_placeholder:proc_tmpl(PreparedSQL, Data); get_sql(_, _, SQL) -> SQL. @@ -376,24 +407,21 @@ query_type(sql) -> query_type(query) -> query; %% Data that goes to bridges use the prepared template -query_type(send_message) -> - send_message. +query_type(ChannId) when is_binary(ChannId) -> + channel_message. %% ------------------------------------------------------------------- %% on_batch_query emqx_resouce callback and related functions %% ------------------------------------------------------------------- -spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when - BatchReq :: nonempty_list({'send_message', map()}). + BatchReq :: nonempty_list({binary(), map()}). -on_batch_query( - ResourceID, - BatchReq, - #{pool_name := PoolName, templates := Templates} = _State -) -> - %% Currently we only support batch requests with the send_message key - {Keys, ObjectsToInsert} = lists:unzip(BatchReq), - ensure_keys_are_of_type_send_message(Keys), +on_batch_query(ResourceID, BatchReq, #{pool_name := PoolName} = State) -> + %% Currently we only support batch requests with a binary ChannId + {[ChannId | _] = Keys, ObjectsToInsert} = lists:unzip(BatchReq), + ensure_channel_messages(Keys), + Templates = get_templates(ChannId, State), %% Create batch insert SQL statement SQL = objects_to_sql(ObjectsToInsert, Templates), %% Do the actual query in the database @@ -401,22 +429,16 @@ on_batch_query( %% Transform the result to a better format transform_and_log_clickhouse_result(ResultFromClickhouse, ResourceID, SQL). -ensure_keys_are_of_type_send_message(Keys) -> - case lists:all(fun is_send_message_atom/1, Keys) of +ensure_channel_messages(Keys) -> + case lists:all(fun is_binary/1, Keys) of true -> ok; false -> erlang:error( - {unrecoverable_error, - <<"Unexpected type for batch message (Expected send_message)">>} + {unrecoverable_error, <<"Unexpected type for batch message (Expected channel-id)">>} ) end. -is_send_message_atom(send_message) -> - true; -is_send_message_atom(_) -> - false. - objects_to_sql( [FirstObject | RemainingObjects] = _ObjectsToInsert, #{ diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index c0b0c365a..4679e1bc4 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -42,6 +42,8 @@ resource_type(influxdb) -> emqx_bridge_influxdb_connector; resource_type(cassandra) -> emqx_bridge_cassandra_connector; +resource_type(clickhouse) -> + emqx_bridge_clickhouse_connector; resource_type(mysql) -> emqx_bridge_mysql_connector; resource_type(pgsql) -> @@ -181,6 +183,14 @@ connector_structs() -> required => false } )}, + {clickhouse, + mk( + hoconsc:map(name, ref(emqx_bridge_clickhouse, "config_connector")), + #{ + desc => <<"ClickHouse Connector Config">>, + required => false + } + )}, {mysql, mk( hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")), @@ -307,6 +317,7 @@ schema_modules() -> emqx_bridge_oracle, emqx_bridge_influxdb, emqx_bridge_cassandra, + emqx_bridge_clickhouse, emqx_bridge_mysql, emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, @@ -345,6 +356,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_oracle, <<"oracle">>, Method ++ "_connector"), api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"), api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method ++ "_connector"), + api_ref(emqx_bridge_clickhouse, <<"clickhouse">>, Method ++ "_connector"), api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 751efa3d9..27d7f6379 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -142,6 +142,8 @@ connector_type_to_bridge_types(influxdb) -> [influxdb, influxdb_api_v1, influxdb_api_v2]; connector_type_to_bridge_types(cassandra) -> [cassandra]; +connector_type_to_bridge_types(clickhouse) -> + [clickhouse]; connector_type_to_bridge_types(mysql) -> [mysql]; connector_type_to_bridge_types(mqtt) -> diff --git a/changes/ee/feat-19999.en.md b/changes/ee/feat-19999.en.md new file mode 100644 index 000000000..943bf8442 --- /dev/null +++ b/changes/ee/feat-19999.en.md @@ -0,0 +1 @@ +The bridges for ClickHouse have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API. diff --git a/rel/i18n/emqx_bridge_clickhouse.hocon b/rel/i18n/emqx_bridge_clickhouse.hocon index 7d1961f98..929780fbe 100644 --- a/rel/i18n/emqx_bridge_clickhouse.hocon +++ b/rel/i18n/emqx_bridge_clickhouse.hocon @@ -6,6 +6,16 @@ batch_value_separator.desc: batch_value_separator.label: """Batch Value Separator""" +action_parameters.desc: +"""Action specific configs.""" +action_parameters.label: +"""Action""" + +cassandra_action.desc: +"""Action configs.""" +cassandra_action.label: +"""Action""" + config_enable.desc: """Enable or disable this bridge""" From ec888bc87f65a76dbc616613aed90a5e752af239 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 18 Feb 2024 09:41:43 +0800 Subject: [PATCH 3/4] ci: fix test cases for clickhouse --- changes/ee/{feat-19999.en.md => feat-12425.en.md} | 0 rel/i18n/emqx_bridge_clickhouse.hocon | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename changes/ee/{feat-19999.en.md => feat-12425.en.md} (100%) diff --git a/changes/ee/feat-19999.en.md b/changes/ee/feat-12425.en.md similarity index 100% rename from changes/ee/feat-19999.en.md rename to changes/ee/feat-12425.en.md diff --git a/rel/i18n/emqx_bridge_clickhouse.hocon b/rel/i18n/emqx_bridge_clickhouse.hocon index 929780fbe..2a77bba3e 100644 --- a/rel/i18n/emqx_bridge_clickhouse.hocon +++ b/rel/i18n/emqx_bridge_clickhouse.hocon @@ -11,9 +11,9 @@ action_parameters.desc: action_parameters.label: """Action""" -cassandra_action.desc: +clickhouse_action.desc: """Action configs.""" -cassandra_action.label: +clickhouse_action.label: """Action""" config_enable.desc: From ce0c27e80276d0c4dde12f4e28473b69b9eefcea Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Sun, 18 Feb 2024 14:49:58 +0800 Subject: [PATCH 4/4] fix: correct the swagger example for clickhouse connectors --- apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl index 11f5fff5a..1c7e786d8 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl @@ -66,7 +66,7 @@ connector_examples(Method) -> summary => <<"ClickHouse Connector">>, value => emqx_connector_schema:connector_values( Method, clickhouse, #{ - server => <<"127.0.0.1:8123">>, + url => <<"http://localhost:8123">>, database => <<"mqtt">>, pool_size => 8, username => <<"default">>,