diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 3a9e13f94..e54ef6124 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -97,6 +97,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_rocketmq_action_info, emqx_bridge_influxdb_action_info, emqx_bridge_cassandra_action_info, + emqx_bridge_clickhouse_action_info, emqx_bridge_mysql_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index a84d3912b..c9d3246d3 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -32,8 +32,6 @@ on_get_status/2 ]). --export([transform_bridge_v1_config_to_connector_config/1]). - %% callbacks of ecpool -export([ connect/1, @@ -448,9 +446,6 @@ handle_result({error, Error}) -> handle_result(Res) -> Res. -transform_bridge_v1_config_to_connector_config(_) -> - ok. - %%-------------------------------------------------------------------- %% utils diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src index 3288b83fd..d96d06375 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_clickhouse, [ {description, "EMQX Enterprise ClickHouse Bridge"}, - {vsn, "0.2.5"}, + {vsn, "0.3.0"}, {registered, []}, {applications, [ kernel, @@ -8,7 +8,7 @@ emqx_resource, clickhouse ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_clickhouse_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl index deca42154..11f5fff5a 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse.erl @@ -9,8 +9,11 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). +%% Examples -export([ - conn_bridge_examples/1 + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 ]). -export([ @@ -20,12 +23,10 @@ desc/1 ]). --define(DEFAULT_SQL, - <<"INSERT INTO mqtt_test(payload, arrived) VALUES ('${payload}', ${timestamp})">> -). - +-define(DEFAULT_SQL, <<"INSERT INTO messages(data, arrived) VALUES ('${payload}', ${timestamp})">>). -define(DEFAULT_BATCH_VALUE_SEPARATOR, <<", ">>). - +-define(CONNECTOR_TYPE, clickhouse). +-define(ACTION_TYPE, clickhouse). %% ------------------------------------------------------------------------------------------------- %% Callback used by HTTP API %% ------------------------------------------------------------------------------------------------- @@ -40,6 +41,42 @@ conn_bridge_examples(Method) -> } ]. +bridge_v2_examples(Method) -> + ParamsExample = #{ + parameters => #{ + batch_value_separator => ?DEFAULT_BATCH_VALUE_SEPARATOR, + sql => ?DEFAULT_SQL + } + }, + [ + #{ + <<"clickhouse">> => #{ + summary => <<"ClickHouse Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, clickhouse, clickhouse, ParamsExample + ) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"clickhouse">> => #{ + summary => <<"ClickHouse Connector">>, + value => emqx_connector_schema:connector_values( + Method, clickhouse, #{ + server => <<"127.0.0.1:8123">>, + database => <<"mqtt">>, + pool_size => 8, + username => <<"default">>, + password => <<"******">> + } + ) + } + } + ]. + values(_Method, Type) -> #{ enable => true, @@ -71,19 +108,49 @@ namespace() -> "bridge_clickhouse". roots() -> []. +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + emqx_bridge_clickhouse_connector:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); +fields(action) -> + {clickhouse, + mk( + hoconsc:map(name, ref(?MODULE, clickhouse_action)), + #{desc => <<"ClickHouse Action Config">>, required => false} + )}; +fields(clickhouse_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk(ref(?MODULE, action_parameters), #{ + required => true, desc => ?DESC(action_parameters) + }) + ); +fields(action_parameters) -> + [ + sql_field(), + batch_value_separator_field() + ]; +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + emqx_bridge_clickhouse_connector:fields(config) ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(clickhouse_action)); fields("config") -> [ {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, - {sql, - mk( - binary(), - #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} - )}, - {batch_value_separator, - mk( - binary(), - #{desc => ?DESC("batch_value_separator"), default => ?DEFAULT_BATCH_VALUE_SEPARATOR} - )}, + sql_field(), + batch_value_separator_field(), {local_topic, mk( binary(), @@ -112,6 +179,28 @@ fields("get") -> fields("post", Type) -> [type_field(Type), name_field() | fields("config")]. +sql_field() -> + {sql, + mk( + binary(), + #{desc => ?DESC("sql_template"), default => ?DEFAULT_SQL, format => <<"sql">>} + )}. + +batch_value_separator_field() -> + {batch_value_separator, + mk( + binary(), + #{desc => ?DESC("batch_value_separator"), default => ?DEFAULT_BATCH_VALUE_SEPARATOR} + )}. + +desc(clickhouse_action) -> + ?DESC(clickhouse_action); +desc(action_parameters) -> + ?DESC(action_parameters); +desc("config_connector") -> + ?DESC("desc_config"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl new file mode 100644 index 000000000..066569f7e --- /dev/null +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_action_info.erl @@ -0,0 +1,62 @@ +-module(emqx_bridge_clickhouse_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(SCHEMA_MODULE, emqx_bridge_clickhouse). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(clickhouse_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ActionTopLevelKeys = schema_keys(clickhouse_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ConnectorTopLevelKeys = schema_keys("config_connector"), + ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys), + ConnConfig0 = maps:with(ConnectorKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnConfig0 + ). + +connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) -> + RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorRawConf, ActionRawConf + ), + maps:without([<<"clickhouse_type">>], RawConf). + +bridge_v1_type_name() -> clickhouse. + +action_type_name() -> clickhouse. + +connector_type_name() -> clickhouse. + +schema_module() -> ?SCHEMA_MODULE. + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))]. diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 0a6c504c7..e7327b56c 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -32,6 +32,10 @@ callback_mode/0, on_start/2, on_stop/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channel_status/3, + on_get_channels/1, on_query/3, on_batch_query/3, on_get_status/2 @@ -62,6 +66,7 @@ -type state() :: #{ + channels => #{binary() => templates()}, templates := templates(), pool_name := binary(), connect_timeout := pos_integer() @@ -155,10 +160,9 @@ on_start( {pool, InstanceID} ], try - Templates = prepare_sql_templates(Config), State = #{ + channels => #{}, pool_name => InstanceID, - templates => Templates, connect_timeout => ConnectTimeout }, case emqx_resource_pool:start(InstanceID, ?MODULE, Options) of @@ -195,10 +199,8 @@ prepare_sql_templates(#{ sql := Template, batch_value_separator := Separator }) -> - InsertTemplate = - emqx_placeholder:preproc_tmpl(Template), - BulkExtendInsertTemplate = - prepare_sql_bulk_extend_template(Template, Separator), + InsertTemplate = emqx_placeholder:preproc_tmpl(Template), + BulkExtendInsertTemplate = prepare_sql_bulk_extend_template(Template, Separator), #{ send_message_template => InsertTemplate, extend_send_message_template => BulkExtendInsertTemplate @@ -285,6 +287,27 @@ on_stop(InstanceID, _State) -> }), emqx_resource_pool:stop(InstanceID). +%% ------------------------------------------------------------------- +%% channel related emqx_resouce callbacks +%% ------------------------------------------------------------------- +on_add_channel(_InstId, #{channels := Channs} = OldState, ChannId, ChannConf0) -> + #{parameters := ParamConf} = ChannConf0, + NewChanns = Channs#{ChannId => #{templates => prepare_sql_templates(ParamConf)}}, + {ok, OldState#{channels => NewChanns}}. + +on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannId) -> + NewState = State#{channels => maps:remove(ChannId, Channels)}, + {ok, NewState}. + +on_get_channel_status(InstanceId, _ChannId, State) -> + case on_get_status(InstanceId, State) of + {connected, _} -> connected; + {disconnected, _, _} -> disconnected + end. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + %% ------------------------------------------------------------------- %% on_get_status emqx_resouce callback and related functions %% ------------------------------------------------------------------- @@ -339,8 +362,8 @@ do_get_status(PoolName, Timeout) -> -spec on_query (resource_id(), Request, resource_state()) -> query_result() when - Request :: {RequestType, Data}, - RequestType :: send_message, + Request :: {ChannId, Data}, + ChannId :: binary(), Data :: map(); (resource_id(), Request, resource_state()) -> query_result() when Request :: {RequestType, SQL}, @@ -361,12 +384,20 @@ on_query( }), %% Have we got a query or data to fit into an SQL template? SimplifiedRequestType = query_type(RequestType), - #{templates := Templates} = State, + Templates = get_templates(RequestType, State), SQL = get_sql(SimplifiedRequestType, Templates, DataOrSQL), ClickhouseResult = execute_sql_in_clickhouse_server(PoolName, SQL), transform_and_log_clickhouse_result(ClickhouseResult, ResourceID, SQL). -get_sql(send_message, #{send_message_template := PreparedSQL}, Data) -> +get_templates(ChannId, State) -> + case maps:find(channels, State) of + {ok, Channels} -> + maps:get(templates, maps:get(ChannId, Channels, #{}), #{}); + error -> + #{} + end. + +get_sql(channel_message, #{send_message_template := PreparedSQL}, Data) -> emqx_placeholder:proc_tmpl(PreparedSQL, Data); get_sql(_, _, SQL) -> SQL. @@ -376,24 +407,21 @@ query_type(sql) -> query_type(query) -> query; %% Data that goes to bridges use the prepared template -query_type(send_message) -> - send_message. +query_type(ChannId) when is_binary(ChannId) -> + channel_message. %% ------------------------------------------------------------------- %% on_batch_query emqx_resouce callback and related functions %% ------------------------------------------------------------------- -spec on_batch_query(resource_id(), BatchReq, resource_state()) -> query_result() when - BatchReq :: nonempty_list({'send_message', map()}). + BatchReq :: nonempty_list({binary(), map()}). -on_batch_query( - ResourceID, - BatchReq, - #{pool_name := PoolName, templates := Templates} = _State -) -> - %% Currently we only support batch requests with the send_message key - {Keys, ObjectsToInsert} = lists:unzip(BatchReq), - ensure_keys_are_of_type_send_message(Keys), +on_batch_query(ResourceID, BatchReq, #{pool_name := PoolName} = State) -> + %% Currently we only support batch requests with a binary ChannId + {[ChannId | _] = Keys, ObjectsToInsert} = lists:unzip(BatchReq), + ensure_channel_messages(Keys), + Templates = get_templates(ChannId, State), %% Create batch insert SQL statement SQL = objects_to_sql(ObjectsToInsert, Templates), %% Do the actual query in the database @@ -401,22 +429,16 @@ on_batch_query( %% Transform the result to a better format transform_and_log_clickhouse_result(ResultFromClickhouse, ResourceID, SQL). -ensure_keys_are_of_type_send_message(Keys) -> - case lists:all(fun is_send_message_atom/1, Keys) of +ensure_channel_messages(Keys) -> + case lists:all(fun is_binary/1, Keys) of true -> ok; false -> erlang:error( - {unrecoverable_error, - <<"Unexpected type for batch message (Expected send_message)">>} + {unrecoverable_error, <<"Unexpected type for batch message (Expected channel-id)">>} ) end. -is_send_message_atom(send_message) -> - true; -is_send_message_atom(_) -> - false. - objects_to_sql( [FirstObject | RemainingObjects] = _ObjectsToInsert, #{ diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index c0b0c365a..4679e1bc4 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -42,6 +42,8 @@ resource_type(influxdb) -> emqx_bridge_influxdb_connector; resource_type(cassandra) -> emqx_bridge_cassandra_connector; +resource_type(clickhouse) -> + emqx_bridge_clickhouse_connector; resource_type(mysql) -> emqx_bridge_mysql_connector; resource_type(pgsql) -> @@ -181,6 +183,14 @@ connector_structs() -> required => false } )}, + {clickhouse, + mk( + hoconsc:map(name, ref(emqx_bridge_clickhouse, "config_connector")), + #{ + desc => <<"ClickHouse Connector Config">>, + required => false + } + )}, {mysql, mk( hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")), @@ -307,6 +317,7 @@ schema_modules() -> emqx_bridge_oracle, emqx_bridge_influxdb, emqx_bridge_cassandra, + emqx_bridge_clickhouse, emqx_bridge_mysql, emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, @@ -345,6 +356,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_oracle, <<"oracle">>, Method ++ "_connector"), api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"), api_ref(emqx_bridge_cassandra, <<"cassandra">>, Method ++ "_connector"), + api_ref(emqx_bridge_clickhouse, <<"clickhouse">>, Method ++ "_connector"), api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 751efa3d9..27d7f6379 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -142,6 +142,8 @@ connector_type_to_bridge_types(influxdb) -> [influxdb, influxdb_api_v1, influxdb_api_v2]; connector_type_to_bridge_types(cassandra) -> [cassandra]; +connector_type_to_bridge_types(clickhouse) -> + [clickhouse]; connector_type_to_bridge_types(mysql) -> [mysql]; connector_type_to_bridge_types(mqtt) -> diff --git a/changes/ee/feat-19999.en.md b/changes/ee/feat-19999.en.md new file mode 100644 index 000000000..943bf8442 --- /dev/null +++ b/changes/ee/feat-19999.en.md @@ -0,0 +1 @@ +The bridges for ClickHouse have been split so they are available via the connectors and actions APIs. They are still backwards compatible with the old bridge API. diff --git a/rel/i18n/emqx_bridge_clickhouse.hocon b/rel/i18n/emqx_bridge_clickhouse.hocon index 7d1961f98..929780fbe 100644 --- a/rel/i18n/emqx_bridge_clickhouse.hocon +++ b/rel/i18n/emqx_bridge_clickhouse.hocon @@ -6,6 +6,16 @@ batch_value_separator.desc: batch_value_separator.label: """Batch Value Separator""" +action_parameters.desc: +"""Action specific configs.""" +action_parameters.label: +"""Action""" + +cassandra_action.desc: +"""Action configs.""" +cassandra_action.label: +"""Action""" + config_enable.desc: """Enable or disable this bridge"""