From f7984be9464ef0bf8cc666c2ec07e5a8fa12555e Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 8 Sep 2023 17:00:02 +0200 Subject: [PATCH] feat: add connector schema scaffold and break out Kafka conector This commit is the beginning of an effort to split bridges into a connector part and a bridge part. Several bridges should be able to share a connector pool defined by a single connector. The connectors should be possible to enable and disable similar to how one can disable and enable bridges. There should also be an API for checking the status of a connector and for add/edit/delete connectors similar to the current bridge API. Issues: https://emqx.atlassian.net/browse/EMQX-10805 --- apps/emqx/src/emqx_config.erl | 15 +- apps/emqx_bridge/src/emqx_bridge.erl | 2 + apps/emqx_bridge/src/emqx_bridge_app.erl | 8 + apps/emqx_bridge/src/emqx_bridge_v2.erl | 484 +++++++++++++++ .../src/schema/emqx_action_enterprise.erl | 32 + .../src/schema/emqx_action_schema.erl | 60 ++ .../src/emqx_bridge_kafka.erl | 10 + .../src/emqx_bridge_kafka_impl_producer.erl | 389 +++++++----- .../emqx_bridge_kafka_impl_producer_SUITE.erl | 19 +- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 212 +++++++ apps/emqx_conf/src/emqx_conf_schema.erl | 2 + apps/emqx_connector/src/emqx_connector.erl | 580 ++++++++++++++++++ .../emqx_connector/src/emqx_connector_app.erl | 6 + .../src/emqx_connector_resource.erl | 429 +++++++++++++ .../src/schema/emqx_connector_enterprise.erl | 46 ++ .../src/schema/emqx_connector_schema.erl | 223 +++++++ apps/emqx_resource/include/emqx_resource.hrl | 1 + apps/emqx_resource/src/emqx_resource.erl | 203 +++++- .../src/emqx_resource_buffer_worker.erl | 137 ++++- .../src/emqx_resource_manager.erl | 269 +++++++- .../src/emqx_rule_actions.erl | 8 + .../emqx_rule_engine/src/emqx_rule_engine.erl | 7 +- .../src/emqx_rule_engine_api.erl | 2 + .../src/emqx_rule_runtime.erl | 27 + rel/i18n/emqx_bridge_kafka.hocon | 14 + rel/i18n/emqx_connector_schema.hocon | 9 + 26 files changed, 2984 insertions(+), 210 deletions(-) create mode 100644 apps/emqx_bridge/src/emqx_bridge_v2.erl create mode 100644 apps/emqx_bridge/src/schema/emqx_action_enterprise.erl create mode 100644 apps/emqx_bridge/src/schema/emqx_action_schema.erl create mode 100644 apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl create mode 100644 apps/emqx_connector/src/emqx_connector.erl create mode 100644 apps/emqx_connector/src/emqx_connector_resource.erl create mode 100644 apps/emqx_connector/src/schema/emqx_connector_enterprise.erl create mode 100644 apps/emqx_connector/src/schema/emqx_connector_schema.erl create mode 100644 rel/i18n/emqx_connector_schema.hocon diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 450f3e1b0..add33c31f 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -325,19 +325,20 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) -> ok = save_schema_mod_and_names(SchemaMod), HasDeprecatedFile = has_deprecated_file(), RawConf0 = load_config_files(HasDeprecatedFile, Conf), - warning_deprecated_root_key(RawConf0), - RawConf1 = + RawConf1 = emqx_connector_schema:transform_old_style_bridges_to_connector_and_actions(RawConf0), + warning_deprecated_root_key(RawConf1), + RawConf2 = case HasDeprecatedFile of true -> - overlay_v0(SchemaMod, RawConf0); + overlay_v0(SchemaMod, RawConf1); false -> - overlay_v1(SchemaMod, RawConf0) + overlay_v1(SchemaMod, RawConf1) end, - RawConf = fill_defaults_for_all_roots(SchemaMod, RawConf1), + RawConf3 = fill_defaults_for_all_roots(SchemaMod, RawConf2), %% check configs against the schema - {AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf, #{}), + {AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf3, #{}), save_to_app_env(AppEnvs), - ok = save_to_config_map(CheckedConf, RawConf), + ok = save_to_config_map(CheckedConf, RawConf3), maybe_init_default_zone(), ok. diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 0f8f39ca2..b7e990ae2 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -65,6 +65,8 @@ import_config/1 ]). +-export([query_opts/1]). + -define(EGRESS_DIR_BRIDGES(T), T == webhook; T == mysql; diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index d0dd7da2b..c5f291297 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -28,21 +28,29 @@ -define(TOP_LELVE_HDLR_PATH, (emqx_bridge:config_key_path())). -define(LEAF_NODE_HDLR_PATH, (emqx_bridge:config_key_path() ++ ['?', '?'])). +-define(TOP_LELVE_HDLR_PATH_BRIDGE_V2, (emqx_bridge_v2:config_key_path())). +-define(LEAF_NODE_HDLR_PATH_BRIDGE_V2, (emqx_bridge_v2:config_key_path() ++ ['?', '?'])). start(_StartType, _StartArgs) -> {ok, Sup} = emqx_bridge_sup:start_link(), ok = ensure_enterprise_schema_loaded(), ok = emqx_bridge:load(), + ok = emqx_bridge_v2:load(), ok = emqx_bridge:load_hook(), ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE), ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge), + ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH_BRIDGE_V2, emqx_bridge_v2), + ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH_BRIDGE_V2, emqx_bridge_v2), ?tp(emqx_bridge_app_started, #{}), {ok, Sup}. stop(_State) -> emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH), emqx_conf:remove_handler(?TOP_LELVE_HDLR_PATH), + emqx_conf:remove_handler(emqx_bridge_v2:config_key_path()), + emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH_BRIDGE_V2), ok = emqx_bridge:unload(), + ok = emqx_bridge_v2:unload(), ok. -if(?EMQX_RELEASE_EDITION == ee). diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl new file mode 100644 index 000000000..3877ea165 --- /dev/null +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -0,0 +1,484 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_v2). + +-behaviour(emqx_config_handler). +% -behaviour(emqx_config_backup). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-export([ + load/0, + unload/0, + is_bridge_v2_type/1, + id/2, + id/3, + parse_id/1, + send_message/4, + bridge_v2_type_to_connector_type/1, + is_bridge_v2_id/1, + extract_connector_id_from_bridge_v2_id/1, + is_bridge_v2_installed_in_connector_state/2, + get_channels_for_connector/1 +]). + +%% CRUD API + +-export([ + list/0, + lookup/1, + lookup/2, + get_metrics/2, + config_key_path/0, + disable_enable/3, + create/3, + remove/2, + health_check/2 +]). + +%% Config Update Handler API + +-export([ + post_config_update/5 +]). + +-define(ROOT_KEY, bridges_v2). + +get_channels_for_connector(ConnectorId) -> + {ConnectorType, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), + RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})), + RelevantBridgeV2Types = [ + Type + || Type <- RootConf, + bridge_v2_type_to_connector_type(Type) =:= ConnectorType + ], + lists:flatten([ + get_channels_for_connector(ConnectorName, BridgeV2Type) + || BridgeV2Type <- RelevantBridgeV2Types + ]). + +get_channels_for_connector(ConnectorName, BridgeV2Type) -> + BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}), + [ + {id(BridgeV2Type, Name, ConnectorName), Conf} + || {Name, Conf} <- maps:to_list(BridgeV2s), + bin(ConnectorName) =:= maps:get(connector, Conf, no_name) + ]. + +load() -> + % Bridge_V2s = emqx:get_config([?ROOT_KEY], #{}), + % lists:foreach( + % fun({Type, NamedConf}) -> + % lists:foreach( + % fun({Name, Conf}) -> + % install_bridge_v2( + % Type, + % Name, + % Conf + % ) + % end, + % maps:to_list(NamedConf) + % ) + % end, + % maps:to_list(Bridge_V2s) + % ), + ok. + +unload() -> + % Bridge_V2s = emqx:get_config([?ROOT_KEY], #{}), + % lists:foreach( + % fun({Type, NamedConf}) -> + % lists:foreach( + % fun({Name, Conf}) -> + % uninstall_bridge_v2( + % Type, + % Name, + % Conf + % ) + % end, + % maps:to_list(NamedConf) + % ) + % end, + % maps:to_list(Bridge_V2s) + % ), + ok. + +install_bridge_v2( + _BridgeType, + _BridgeName, + #{enable := false} +) -> + ok; +install_bridge_v2( + BridgeV2Type, + BridgeName, + #{connector := ConnectorName} = Config +) -> + CreationOpts = emqx_resource:fetch_creation_opts(Config), + BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), + %% Create metrics for Bridge V2 + ok = emqx_resource:create_metrics(BridgeV2Id), + %% We might need to create buffer workers for Bridge V2 + case get_query_mode(BridgeV2Type, Config) of + %% the Bridge V2 has built-in buffer, so there is no need for resource workers + simple_sync -> + ok; + simple_async -> + ok; + %% The Bridge V2 is a consumer Bridge V2, so there is no need for resource workers + no_queries -> + ok; + _ -> + %% start resource workers as the query type requires them + ok = emqx_resource_buffer_worker_sup:start_workers(BridgeV2Id, CreationOpts) + end, + %% If there is a running connector, we need to install the Bridge V2 in it + ConnectorId = emqx_connector_resource:resource_id( + bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName + ), + emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config), + ok. + +uninstall_bridge_v2( + _BridgeType, + _BridgeName, + #{enable := false} +) -> + %% Already not installed + ok; +uninstall_bridge_v2( + BridgeV2Type, + BridgeName, + #{connector := ConnectorName} = Config +) -> + BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), + CreationOpts = emqx_resource:fetch_creation_opts(Config), + ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts), + ok = emqx_resource:clear_metrics(BridgeV2Id), + %% Deinstall from connector + ConnectorId = emqx_connector_resource:resource_id( + bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName + ), + emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id). + +get_query_mode(BridgeV2Type, Config) -> + CreationOpts = emqx_resource:fetch_creation_opts(Config), + ResourceType = emqx_bridge_resource:bridge_to_resource_type(BridgeV2Type), + emqx_resource:query_mode(ResourceType, Config, CreationOpts). + +send_message(BridgeType, BridgeName, Message, QueryOpts0) -> + case lookup(BridgeType, BridgeName) of + #{enable := true} = Config -> + do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); + #{enable := false} -> + {error, bridge_stopped}; + Error -> + Error + end. + +health_check(BridgeType, BridgeName) -> + case lookup(BridgeType, BridgeName) of + #{ + enable := true, + connector := ConnectorName + } -> + ConnectorId = emqx_connector_resource:resource_id( + bridge_v2_type_to_connector_type(BridgeType), ConnectorName + ), + emqx_resource_manager:channel_health_check( + ConnectorId, id(BridgeType, BridgeName, ConnectorName) + ); + #{enable := false} -> + {error, bridge_stopped}; + Error -> + Error + end. + +% do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config) -> +% BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName), +% ConnectorResourceId = emqx_bridge_v2:extract_connector_id_from_bridge_v2_id(BridgeV2Id), +% try +% case emqx_resource_manager:maybe_install_bridge_v2(ConnectorResourceId, BridgeV2Id) of +% ok -> +% do_send_msg_after_bridge_v2_installed( +% BridgeType, +% BridgeName, +% BridgeV2Id, +% Message, +% QueryOpts0, +% Config +% ); +% InstallError -> +% throw(InstallError) +% end +% catch +% Error:Reason:Stack -> +% Msg = iolist_to_binary( +% io_lib:format( +% "Failed to install bridge_v2 ~p in connector ~p: ~p", +% [BridgeV2Id, ConnectorResourceId, Reason] +% ) +% ), +% ?SLOG(error, #{ +% msg => Msg, +% error => Error, +% reason => Reason, +% stacktrace => Stack +% }) +% end. + +do_send_msg_with_enabled_config( + BridgeType, BridgeName, Message, QueryOpts0, Config +) -> + QueryMode = get_query_mode(BridgeType, Config), + QueryOpts = maps:merge( + emqx_bridge:query_opts(Config), + QueryOpts0#{ + query_mode => QueryMode + } + ), + BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName), + emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). + +parse_id(Id) -> + case binary:split(Id, <<":">>, [global]) of + [Type, Name] -> + {Type, Name}; + [<<"bridge_v2">>, Type, Name | _] -> + {Type, Name}; + _X -> + error({error, iolist_to_binary(io_lib:format("Invalid id: ~p", [Id]))}) + end. + +id(BridgeType, BridgeName) -> + case lookup(BridgeType, BridgeName) of + #{connector := ConnectorName} -> + id(BridgeType, BridgeName, ConnectorName); + Error -> + error(Error) + end. + +id(BridgeType, BridgeName, ConnectorName) -> + ConnectorType = bin(bridge_v2_type_to_connector_type(BridgeType)), + <<"bridge_v2:", (bin(BridgeType))/binary, ":", (bin(BridgeName))/binary, ":connector:", + (bin(ConnectorType))/binary, ":", (bin(ConnectorName))/binary>>. + +bridge_v2_type_to_connector_type(kafka) -> + kafka. + +is_bridge_v2_type(kafka) -> true; +is_bridge_v2_type(_) -> false. + +is_bridge_v2_id(<<"bridge_v2:", _/binary>>) -> true; +is_bridge_v2_id(_) -> false. + +extract_connector_id_from_bridge_v2_id(Id) -> + case binary:split(Id, <<":">>, [global]) of + [<<"bridge_v2">>, _Type, _Name, <<"connector">>, ConnectorType, ConnecorName] -> + <<"connector:", ConnectorType/binary, ":", ConnecorName/binary>>; + _X -> + error({error, iolist_to_binary(io_lib:format("Invalid bridge V2 ID: ~p", [Id]))}) + end. + +bin(Bin) when is_binary(Bin) -> Bin; +bin(Str) when is_list(Str) -> list_to_binary(Str); +bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). + +%% Basic CRUD Operations + +list() -> + maps:fold( + fun(Type, NameAndConf, Bridges) -> + maps:fold( + fun(Name, RawConf, Acc) -> + [ + #{ + type => Type, + name => Name, + raw_config => RawConf + } + | Acc + ] + end, + Bridges, + NameAndConf + ) + end, + [], + emqx:get_raw_config([?ROOT_KEY], #{}) + ). + +lookup(Id) -> + {Type, Name} = parse_id(Id), + lookup(Type, Name). + +lookup(Type, Name) -> + case emqx:get_config([?ROOT_KEY, Type, Name], not_found) of + not_found -> + {error, bridge_not_found}; + Config -> + Config + end. + +get_metrics(Type, Name) -> + emqx_resource:get_metrics(id(Type, Name)). + +config_key_path() -> + [?ROOT_KEY]. + +disable_enable(Action, BridgeType, BridgeName) when + Action =:= disable; Action =:= enable +-> + emqx_conf:update( + config_key_path() ++ [BridgeType, BridgeName], + {Action, BridgeType, BridgeName}, + #{override_to => cluster} + ). + +create(BridgeType, BridgeName, RawConf) -> + ?SLOG(debug, #{ + brige_action => create, + bridge_version => 2, + bridge_type => BridgeType, + bridge_name => BridgeName, + bridge_raw_config => emqx_utils:redact(RawConf) + }), + emqx_conf:update( + config_key_path() ++ [BridgeType, BridgeName], + RawConf, + #{override_to => cluster} + ). + +remove(BridgeType, BridgeName) -> + ?SLOG(debug, #{ + brige_action => remove, + bridge_version => 2, + bridge_type => BridgeType, + bridge_name => BridgeName + }), + emqx_conf:remove( + config_key_path() ++ [BridgeType, BridgeName], + #{override_to => cluster} + ). + +%% This top level handler will be triggered when the bridges_v2 path is updated +%% with calls to emqx_conf:update([bridges_v2], BridgesConf, #{}). +%% +%% A public API that can trigger this is: +%% bin/emqx ctl conf load data/configs/cluster.hocon +post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> + #{added := Added, removed := Removed, changed := Updated} = + diff_confs(NewConf, OldConf), + %% The config update will be failed if any task in `perform_bridge_changes` failed. + RemoveFun = fun uninstall_bridge_v2/3, + CreateFun = fun install_bridge_v2/3, + UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> + uninstall_bridge_v2(Type, Name, OldBridgeConf), + install_bridge_v2(Type, Name, Conf) + end, + Result = perform_bridge_changes([ + #{action => RemoveFun, data => Removed}, + #{ + action => CreateFun, + data => Added, + on_exception_fn => fun emqx_bridge_resource:remove/4 + }, + #{action => UpdateFun, data => Updated} + ]), + ?tp(bridge_post_config_update_done, #{}), + Result; +post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) -> + Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]), + ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) -> + ok = install_bridge_v2(BridgeType, BridgeName, NewConf), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) -> + ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf), + ok = install_bridge_v2(BridgeType, BridgeName, NewConf), + ?tp(bridge_post_config_update_done, #{}), + ok. + +diff_confs(NewConfs, OldConfs) -> + emqx_utils_maps:diff_maps( + flatten_confs(NewConfs), + flatten_confs(OldConfs) + ). + +flatten_confs(Conf0) -> + maps:from_list( + lists:flatmap( + fun({Type, Conf}) -> + do_flatten_confs(Type, Conf) + end, + maps:to_list(Conf0) + ) + ). + +do_flatten_confs(Type, Conf0) -> + [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. + +perform_bridge_changes(Tasks) -> + perform_bridge_changes(Tasks, ok). + +perform_bridge_changes([], Result) -> + Result; +perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> + OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), + Result = maps:fold( + fun + ({_Type, _Name}, _Conf, {error, Reason}) -> + {error, Reason}; + %% for update + ({Type, Name}, {OldConf, Conf}, _) -> + case Action(Type, Name, {OldConf, Conf}) of + {error, Reason} -> {error, Reason}; + Return -> Return + end; + ({Type, Name}, Conf, _) -> + try Action(Type, Name, Conf) of + {error, Reason} -> {error, Reason}; + Return -> Return + catch + Kind:Error:Stacktrace -> + ?SLOG(error, #{ + msg => "bridge_config_update_exception", + kind => Kind, + error => Error, + type => Type, + name => Name, + stacktrace => Stacktrace + }), + OnException(Type, Name, Conf), + erlang:raise(Kind, Error, Stacktrace) + end + end, + Result0, + MapConfs + ), + perform_bridge_changes(Tasks, Result). + +is_bridge_v2_installed_in_connector_state(Tag, State) when is_map(State) -> + BridgeV2s = maps:get(installed_bridge_v2s, State, #{}), + maps:is_key(Tag, BridgeV2s); +is_bridge_v2_installed_in_connector_state(_Tag, _State) -> + false. diff --git a/apps/emqx_bridge/src/schema/emqx_action_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_action_enterprise.erl new file mode 100644 index 000000000..f19b4302a --- /dev/null +++ b/apps/emqx_bridge/src/schema/emqx_action_enterprise.erl @@ -0,0 +1,32 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_action_enterprise). + +-if(?EMQX_RELEASE_EDITION == ee). + +-include_lib("hocon/include/hoconsc.hrl"). +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + fields/1 +]). + +fields(bridges_v2) -> + kafka_structs(). + +kafka_structs() -> + [ + {kafka, + mk( + hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)), + #{ + desc => <<"Kafka Producer Bridge V2 Config">>, + required => false + } + )} + ]. + +-else. + +-endif. diff --git a/apps/emqx_bridge/src/schema/emqx_action_schema.erl b/apps/emqx_bridge/src/schema/emqx_action_schema.erl new file mode 100644 index 000000000..ce8f5d6a4 --- /dev/null +++ b/apps/emqx_bridge/src/schema/emqx_action_schema.erl @@ -0,0 +1,60 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_action_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-import(hoconsc, [mk/2, ref/2]). + +-export([roots/0, fields/1, desc/1, namespace/0, tags/0]). + +-if(?EMQX_RELEASE_EDITION == ee). + +enterprise_fields_actions() -> + %% We *must* do this to ensure the module is really loaded, especially when we use + %% `call_hocon' from `nodetool' to generate initial configurations. + _ = emqx_action_enterprise:module_info(), + case erlang:function_exported(emqx_action_enterprise, fields, 1) of + true -> + emqx_action_enterprise:fields(bridges_v2); + false -> + [] + end. + +-else. + +enterprise_fields_actions() -> []. + +-endif. + +%%====================================================================================== +%% HOCON Schema Callbacks +%%====================================================================================== + +namespace() -> "bridge_v2". + +tags() -> + [<<"Bridge V2">>]. + +roots() -> [{bridges_v2, ?HOCON(?R_REF(bridges_v2), #{importance => ?IMPORTANCE_LOW})}]. + +fields(bridges_v2) -> + [] ++ enterprise_fields_actions(). + +desc(_) -> + undefined. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 5972ba323..f2803651f 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -161,6 +161,14 @@ fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> fields("config") ++ fields(producer_opts); +fields(kafka_producer_action) -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {connector, + mk(binary(), #{ + desc => ?DESC(emqx_connector_schema, "connector_field"), required => true + })} + ] ++ fields(producer_opts); fields(kafka_consumer) -> fields("config") ++ fields(consumer_opts); fields("config") -> @@ -478,6 +486,8 @@ desc("put_" ++ Type) when Type =:= "consumer"; Type =:= "producer" -> ["Configuration for Kafka using `PUT` method."]; desc("post_" ++ Type) when Type =:= "consumer"; Type =:= "producer" -> ["Configuration for Kafka using `POST` method."]; +desc(kafka_producer_action) -> + ?DESC("kafka_producer_action"); desc(Name) -> lists:member(Name, struct_names()) orelse throw({missing_desc, Name}), ?DESC(Name). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 749250306..12198b211 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -16,7 +16,11 @@ on_stop/2, on_query/3, on_query_async/4, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([ @@ -27,9 +31,10 @@ -include_lib("emqx/include/logger.hrl"). %% Allocatable resources --define(kafka_resource_id, kafka_resource_id). +-define(kafka_telementry_id, kafka_telementry_id). -define(kafka_client_id, kafka_client_id). -define(kafka_producers, kafka_producers). +-define(CONNECTOR_TYPE, kafka). query_mode(#{kafka := #{query_mode := sync}}) -> simple_sync_internal_buffer; @@ -38,32 +43,22 @@ query_mode(_) -> callback_mode() -> async_if_possible. -%% @doc Config schema is defined in emqx_bridge_kafka. -on_start(InstId, Config) -> +%% @doc Config schema is defined in emqx_connector_kafka. +on_start(<<"connector:", _/binary>> = InstId, Config) -> #{ authentication := Auth, bootstrap_hosts := Hosts0, - bridge_name := BridgeName, - bridge_type := BridgeType, + connector_name := ConnectorName, connect_timeout := ConnTimeout, - kafka := KafkaConfig = #{ - message := MessageTemplate, - topic := KafkaTopic, - sync_query_timeout := SyncQueryTimeout - }, metadata_request_timeout := MetaReqTimeout, min_metadata_refresh_interval := MinMetaRefreshInterval, socket_opts := SocketOpts, ssl := SSL } = Config, - KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), - KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), - KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), - ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), - ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId), - _ = maybe_install_wolff_telemetry_handlers(ResourceId), + ConnectorType = ?CONNECTOR_TYPE, + ResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName), Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), - ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), + ClientId = emqx_bridge_kafka_impl:make_client_id(?CONNECTOR_TYPE, ConnectorName), ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), ClientConfig = #{ min_metadata_refresh_interval => MinMetaRefreshInterval, @@ -74,12 +69,6 @@ on_start(InstId, Config) -> sasl => emqx_bridge_kafka_impl:sasl(Auth), ssl => ssl(SSL) }, - case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of - unhealthy_target -> - throw(unhealthy_target); - _ -> - ok - end, case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of {ok, _} -> ?SLOG(info, #{ @@ -97,7 +86,51 @@ on_start(InstId, Config) -> throw(failed_to_start_kafka_client) end, %% Check if this is a dry run - TestIdStart = string:find(InstId, ?TEST_ID_PREFIX), + {ok, #{ + client_id => ClientId, + resource_id => ResourceId, + hosts => Hosts, + installed_bridge_v2s => #{} + }}. + +on_add_channel( + InstId, + #{ + client_id := ClientId, + hosts := Hosts, + installed_bridge_v2s := InstalledBridgeV2s + } = OldState, + BridgeV2Id, + BridgeV2Config +) -> + %% The following will throw an exception if the bridge producers fails to start + {ok, BridgeV2State} = create_producers_for_bridge_v2( + InstId, BridgeV2Id, ClientId, Hosts, BridgeV2Config + ), + NewInstalledBridgeV2s = maps:put(BridgeV2Id, BridgeV2State, InstalledBridgeV2s), + %% Update state + NewState = OldState#{installed_bridge_v2s => NewInstalledBridgeV2s}, + {ok, NewState}. + +create_producers_for_bridge_v2( + InstId, + BridgeV2Id, + ClientId, + Hosts, + #{ + bridge_type := BridgeType, + kafka := #{ + message := MessageTemplate, + topic := KafkaTopic, + sync_query_timeout := SyncQueryTimeout + } = KafkaConfig + } +) -> + KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), + KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), + KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), + {_BridgeType, BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), + TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX), IsDryRun = case TestIdStart of nomatch -> @@ -105,18 +138,24 @@ on_start(InstId, Config) -> _ -> string:equal(TestIdStart, InstId) end, - WolffProducerConfig = producers_config(BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun), + ok = check_topic_status(Hosts, KafkaConfig, KafkaTopic), + ok = check_if_healthy_leaders(ClientId, KafkaTopic), + WolffProducerConfig = producers_config(BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id), case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of {ok, Producers} -> - ok = emqx_resource:allocate_resource(InstId, ?kafka_producers, Producers), + ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers), + ok = emqx_resource:allocate_resource( + InstId, {?kafka_telementry_id, BridgeV2Id}, BridgeV2Id + ), + _ = maybe_install_wolff_telemetry_handlers(BridgeV2Id), {ok, #{ message_template => compile_message_template(MessageTemplate), client_id => ClientId, kafka_topic => KafkaTopic, producers => Producers, - resource_id => ResourceId, + resource_id => BridgeV2Id, + connector_resource_id => InstId, sync_query_timeout => SyncQueryTimeout, - hosts => Hosts, kafka_config => KafkaConfig, headers_tokens => KafkaHeadersTokens, ext_headers_tokens => KafkaExtHeadersTokens, @@ -130,20 +169,6 @@ on_start(InstId, Config) -> kafka_topic => KafkaTopic, reason => Reason2 }), - %% Need to stop the already running client; otherwise, the - %% next `on_start' call will try to ensure the client - %% exists and it will be already present and using the old - %% config. This is specially bad if the original crash - %% was due to misconfiguration and we are trying to fix - %% it... - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, - #{ - msg => "failed_to_delete_kafka_client", - client_id => ClientId - } - ), - throw( "Failed to start Kafka client. Please check the logs for errors and check" " the connection parameters." @@ -151,68 +176,94 @@ on_start(InstId, Config) -> end. on_stop(InstanceId, _State) -> - case emqx_resource:get_allocated_resources(InstanceId) of - #{ - ?kafka_client_id := ClientId, - ?kafka_producers := Producers, - ?kafka_resource_id := ResourceId - } -> - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, - #{ - msg => "failed_to_delete_kafka_producer", - client_id => ClientId - } - ), - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, - #{ - msg => "failed_to_delete_kafka_client", - client_id => ClientId - } - ), - _ = with_log_at_error( - fun() -> uninstall_telemetry_handlers(ResourceId) end, - #{ - msg => "failed_to_uninstall_telemetry_handlers", - resource_id => ResourceId - } - ), + AllocatedResources = emqx_resource:get_allocated_resources(InstanceId), + ClientId = maps:get(?kafka_client_id, AllocatedResources, undefined), + case ClientId of + undefined -> ok; - #{?kafka_client_id := ClientId, ?kafka_resource_id := ResourceId} -> - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, - #{ - msg => "failed_to_delete_kafka_client", - client_id => ClientId - } - ), - _ = with_log_at_error( - fun() -> uninstall_telemetry_handlers(ResourceId) end, - #{ - msg => "failed_to_uninstall_telemetry_handlers", - resource_id => ResourceId - } - ), - ok; - #{?kafka_resource_id := ResourceId} -> - _ = with_log_at_error( - fun() -> uninstall_telemetry_handlers(ResourceId) end, - #{ - msg => "failed_to_uninstall_telemetry_handlers", - resource_id => ResourceId - } - ), - ok; - _ -> - ok + ClientId -> + deallocate_client(ClientId) end, - ?tp(kafka_producer_stopped, #{instance_id => InstanceId}), + maps:foreach( + fun + ({?kafka_producers, _BridgeV2Id}, Producers) -> + deallocate_producers(ClientId, Producers); + ({?kafka_telementry_id, _BridgeV2Id}, TelementryId) -> + deallocate_telementry_handlers(TelementryId); + (_, _) -> + ok + end, + AllocatedResources + ), ok. +deallocate_client(ClientId) -> + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientId + } + ). + +deallocate_producers(ClientId, Producers) -> + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, + #{ + msg => "failed_to_delete_kafka_producer", + client_id => ClientId + } + ). + +deallocate_telementry_handlers(TelementryId) -> + _ = with_log_at_error( + fun() -> uninstall_telemetry_handlers(TelementryId) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + resource_id => TelementryId + } + ). + +remove_producers_for_bridge_v2( + InstId, BridgeV2Id +) -> + AllocatedResources = emqx_resource:get_allocated_resources(InstId), + ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id), + maps:foreach( + fun + ({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id -> + deallocate_producers(ClientId, Producers); + ({?kafka_telementry_id, BridgeV2IdCheck}, TelementryId) when + BridgeV2IdCheck =:= BridgeV2Id + -> + deallocate_telementry_handlers(TelementryId); + (_, _) -> + ok + end, + AllocatedResources + ), + ok. + +on_remove_channel( + InstId, + #{ + client_id := _ClientId, + hosts := _Hosts, + installed_bridge_v2s := InstalledBridgeV2s + } = OldState, + BridgeV2Id +) -> + ok = remove_producers_for_bridge_v2(InstId, BridgeV2Id), + NewInstalledBridgeV2s = maps:remove(BridgeV2Id, InstalledBridgeV2s), + %% Update state + NewState = OldState#{installed_bridge_v2s => NewInstalledBridgeV2s}, + {ok, NewState}. + on_query( InstId, - {send_message, Message}, + {MessageTag, Message}, + #{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState +) -> #{ message_template := Template, producers := Producers, @@ -220,8 +271,7 @@ on_query( headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, headers_val_encode_mode := KafkaHeadersValEncodeMode - } -) -> + } = maps:get(MessageTag, BridgeV2Configs), KafkaHeaders = #{ headers_tokens => KafkaHeadersTokens, ext_headers_tokens => KafkaExtHeadersTokens, @@ -257,6 +307,9 @@ on_query( {error, {unrecoverable_error, Error}} end. +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + %% @doc The callback API for rule-engine (or bridge without rules) %% The input argument `Message' is an enriched format (as a map()) %% of the original #message{} record. @@ -265,16 +318,17 @@ on_query( %% or the direct mapping from an MQTT message. on_query_async( InstId, - {send_message, Message}, + {MessageTag, Message}, AsyncReplyFn, + #{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState +) -> #{ message_template := Template, producers := Producers, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, headers_val_encode_mode := KafkaHeadersValEncodeMode - } -) -> + } = maps:get(MessageTag, BridgeV2Configs), KafkaHeaders = #{ headers_tokens => KafkaHeadersTokens, ext_headers_tokens => KafkaExtHeadersTokens, @@ -399,60 +453,116 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> %% Note: since wolff client has its own replayq that is not managed by %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, %% `emqx_resource_manager' will kill the wolff producers and messages might be lost. -on_get_status(_InstId, #{client_id := ClientId} = State) -> +on_get_status( + <<"connector:", _/binary>> = _InstId, + #{client_id := ClientId} = _State +) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> - case do_get_status(Pid, State) of + case wolff_client:check_connectivity(Pid) of ok -> connected; - unhealthy_target -> {disconnected, State, unhealthy_target}; - error -> connecting + {error, Error} -> {connecting, Error} end; {error, _Reason} -> connecting end. -do_get_status(Client, #{kafka_topic := KafkaTopic, hosts := Hosts, kafka_config := KafkaConfig}) -> - case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of - unhealthy_target -> - unhealthy_target; - _ -> - case do_get_healthy_leaders(Client, KafkaTopic) of - [] -> error; - _ -> ok - end +on_get_channel_status( + _ResId, + ChannelId, + #{ + client_id := ClientId, + hosts := Hosts, + installed_bridge_v2s := Channels + } = _State +) -> + ChannelState = maps:get(ChannelId, Channels), + case wolff_client_sup:find_client(ClientId) of + {ok, Pid} -> + case wolff_client:check_connectivity(Pid) of + ok -> + try check_leaders_and_topic(Pid, Hosts, ChannelState) of + ok -> + connected + catch + _ErrorType:Reason -> + {connecting, Reason} + end; + {error, Error} -> + {connecting, Error} + end; + {error, _Reason} -> + connecting end. -do_get_healthy_leaders(Client, KafkaTopic) -> - case wolff_client:get_leader_connections(Client, KafkaTopic) of - {ok, Leaders} -> - %% Kafka is considered healthy as long as any of the partition leader is reachable. - lists:filtermap( - fun({_Partition, Pid}) -> - case is_pid(Pid) andalso erlang:is_process_alive(Pid) of - true -> {true, Pid}; - _ -> false - end - end, - Leaders +check_leaders_and_topic( + Client, + Hosts, + #{ + kafka_config := KafkaConfig, + kafka_topic := KafkaTopic + } = _ChannelState +) -> + check_if_healthy_leaders(Client, KafkaTopic), + check_topic_status(Hosts, KafkaConfig, KafkaTopic). + +check_if_healthy_leaders(Client, KafkaTopic) when is_pid(Client) -> + Leaders = + case wolff_client:get_leader_connections(Client, KafkaTopic) of + {ok, LeadersToCheck} -> + %% Kafka is considered healthy as long as any of the partition leader is reachable. + lists:filtermap( + fun({_Partition, Pid}) -> + case is_pid(Pid) andalso erlang:is_process_alive(Pid) of + true -> {true, Pid}; + _ -> false + end + end, + LeadersToCheck + ); + {error, _} -> + [] + end, + case Leaders of + [] -> + throw( + iolist_to_binary( + io_lib:format("Could not find any healthy partion leader for topic ~s", [ + KafkaTopic + ]) + ) ); - {error, _} -> - [] + _ -> + ok + end; +check_if_healthy_leaders(ClientId, KafkaTopic) -> + case wolff_client_sup:find_client(ClientId) of + {ok, Pid} -> + check_if_healthy_leaders(Pid, KafkaTopic); + {error, _Reason} -> + throw(iolist_to_binary(io_lib:format("Could not find Kafka client: ~p", [ClientId]))) end. -do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) -> +check_topic_status(Hosts, KafkaConfig, KafkaTopic) -> CheckTopicFun = fun() -> wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic) end, try case emqx_utils:nolink_apply(CheckTopicFun, 5_000) of - ok -> ok; - {error, unknown_topic_or_partition} -> unhealthy_target; - _ -> error + ok -> + ok; + {error, unknown_topic_or_partition} -> + throw( + iolist_to_binary(io_lib:format("Unknown topic or partition ~s", [KafkaTopic])) + ); + _ -> + ok end catch - _:_ -> - error + error:_:_ -> + %% Some other error not related to unknown_topic_or_partition + ok end. ssl(#{enable := true} = SSL) -> @@ -460,7 +570,7 @@ ssl(#{enable := true} = SSL) -> ssl(_) -> []. -producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun) -> +producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id) -> #{ max_batch_bytes := MaxBatchBytes, compression := Compression, @@ -486,7 +596,6 @@ producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun) -> disk -> {false, replayq_dir(ClientId)}; hybrid -> {true, replayq_dir(ClientId)} end, - ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), #{ name => make_producer_name(BridgeType, BridgeName, IsDryRun), partitioner => partitioner(PartitionStrategy), @@ -500,7 +609,7 @@ producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun) -> max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, compression => Compression, - telemetry_meta_data => #{bridge_id => ResourceID} + telemetry_meta_data => #{bridge_id => BridgeV2Id} }. %% Wolff API is a batch API. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index b704fc92c..2e5c115cf 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -58,13 +58,16 @@ groups() -> All = emqx_common_test_helpers:all(?MODULE), [{on_query, All}, {on_query_async, All}]. +test_topic_one_partition() -> + "test-topic-one-partition". + wait_until_kafka_is_up() -> wait_until_kafka_is_up(0). wait_until_kafka_is_up(300) -> ct:fail("Kafka is not up even though we have waited for a while"); wait_until_kafka_is_up(Attempts) -> - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), case resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0) of {ok, _} -> ok; @@ -297,7 +300,7 @@ kafka_bridge_rest_api_helper(Config) -> end, false = MyKafkaBridgeExists(), %% Create new Kafka bridge - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), CreateBodyTmp = #{ <<"type">> => <>, <<"name">> => <<"my_kafka_bridge">>, @@ -413,7 +416,7 @@ t_failed_creation_then_fix(Config) -> Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeId = emqx_bridge_resource:bridge_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), WrongConf = config(#{ "authentication" => WrongAuthSettings, "kafka_hosts_string" => HostsString, @@ -478,7 +481,7 @@ t_custom_timestamp(_Config) -> Type = ?BRIDGE_TYPE, Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), ResourceId = emqx_bridge_resource:resource_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), MQTTTopic = <<"t/local/kafka">>, emqx:subscribe(MQTTTopic), Conf0 = config(#{ @@ -555,7 +558,7 @@ t_send_message_with_headers(Config) -> Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeId = emqx_bridge_resource:bridge_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), Conf = config_with_headers(#{ "authentication" => AuthSettings, "kafka_hosts_string" => HostsString, @@ -715,7 +718,7 @@ t_wrong_headers(_Config) -> Type = ?BRIDGE_TYPE, Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), ResourceId = emqx_bridge_resource:resource_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), ?assertThrow( { emqx_bridge_schema, @@ -789,7 +792,7 @@ t_wrong_headers_from_message(Config) -> Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeId = emqx_bridge_resource:bridge_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), Conf = config_with_headers(#{ "authentication" => AuthSettings, "kafka_hosts_string" => HostsString, @@ -939,7 +942,7 @@ publish_helper( Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), Type = ?BRIDGE_TYPE, InstId = emqx_bridge_resource:resource_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), Conf = config( #{ "authentication" => AuthSettings, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl new file mode 100644 index 000000000..b65694a99 --- /dev/null +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -0,0 +1,212 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_v2_kafka_producer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("brod/include/brod.hrl"). +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + _ = application:load(emqx_conf), + ok = emqx_common_test_helpers:start_apps(apps_to_start_and_stop()), + application:ensure_all_started(telemetry), + application:ensure_all_started(wolff), + application:ensure_all_started(brod), + emqx_bridge_kafka_impl_producer_SUITE:wait_until_kafka_is_up(), + Config. + +end_per_suite(_Config) -> + emqx_common_test_helpers:stop_apps(apps_to_start_and_stop()). + +apps_to_start_and_stop() -> + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge, + emqx_rule_engine + ]. + +t_create_remove_list(_) -> + [] = emqx_bridge_v2:list(), + ConnectorConfig = connector_config(), + {ok, _} = emqx_connector:create(kafka, test_connector, ConnectorConfig), + Config = bridge_v2_config(<<"test_connector">>), + {ok, _Config} = emqx_bridge_v2:create(kafka, test_bridge_v2, Config), + [BridgeV2Info] = emqx_bridge_v2:list(), + #{ + name := <<"test_bridge_v2">>, + type := <<"kafka">>, + raw_config := _RawConfig + } = BridgeV2Info, + {ok, _Config2} = emqx_bridge_v2:create(kafka, test_bridge_v2_2, Config), + 2 = length(emqx_bridge_v2:list()), + {ok, _} = emqx_bridge_v2:remove(kafka, test_bridge_v2), + 1 = length(emqx_bridge_v2:list()), + {ok, _} = emqx_bridge_v2:remove(kafka, test_bridge_v2_2), + [] = emqx_bridge_v2:list(), + emqx_connector:remove(kafka, test_connector), + ok. + +%% Test sending a message to a bridge V2 +t_send_message(_) -> + BridgeV2Config = bridge_v2_config(<<"test_connector2">>), + ConnectorConfig = connector_config(), + {ok, _} = emqx_connector:create(kafka, test_connector2, ConnectorConfig), + {ok, _} = emqx_bridge_v2:create(kafka, test_bridge_v2_1, BridgeV2Config), + %% Use the bridge to send a message + check_send_message_with_bridge(test_bridge_v2_1), + %% Create a few more bridges with the same connector and test them + BridgeNames1 = [ + list_to_atom("test_bridge_v2_" ++ integer_to_list(I)) + || I <- lists:seq(2, 10) + ], + lists:foreach( + fun(BridgeName) -> + {ok, _} = emqx_bridge_v2:create(kafka, BridgeName, BridgeV2Config), + check_send_message_with_bridge(BridgeName) + end, + BridgeNames1 + ), + BridgeNames = [test_bridge_v2_1 | BridgeNames1], + %% Send more messages to the bridges + lists:foreach( + fun(BridgeName) -> + lists:foreach( + fun(_) -> + check_send_message_with_bridge(BridgeName) + end, + lists:seq(1, 10) + ) + end, + BridgeNames + ), + %% Remove all the bridges + lists:foreach( + fun(BridgeName) -> + {ok, _} = emqx_bridge_v2:remove(kafka, BridgeName) + end, + BridgeNames + ), + emqx_connector:remove(kafka, test_connector2), + ok. + +%% Test that we can get the status of the bridge V2 +t_health_check(_) -> + BridgeV2Config = bridge_v2_config(<<"test_connector3">>), + ConnectorConfig = connector_config(), + {ok, _} = emqx_connector:create(kafka, test_connector3, ConnectorConfig), + {ok, _} = emqx_bridge_v2:create(kafka, test_bridge_v2, BridgeV2Config), + connected = emqx_bridge_v2:health_check(kafka, test_bridge_v2), + {ok, _} = emqx_bridge_v2:remove(kafka, test_bridge_v2), + %% Check behaviour when bridge does not exist + {error, bridge_not_found} = emqx_bridge_v2:health_check(kafka, test_bridge_v2), + {ok, _} = emqx_connector:remove(kafka, test_connector3), + ok. + +check_send_message_with_bridge(BridgeName) -> + %% ###################################### + %% Create Kafka message + %% ###################################### + KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + Partition = 0, + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( + Hosts, KafkaTopic, Partition + ), + %% ###################################### + %% Send message + %% ###################################### + emqx_bridge_v2:send_message(kafka, BridgeName, Msg, #{}), + %% ###################################### + %% Check if message is sent to Kafka + %% ###################################### + {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset0), + ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0). + +bridge_v2_config(ConnectorName) -> + #{ + <<"connector">> => ConnectorName, + <<"enable">> => true, + <<"kafka">> => #{ + <<"buffer">> => #{ + <<"memory_overload_protection">> => false, + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"2GB">>, + <<"segment_bytes">> => <<"100MB">> + }, + <<"compression">> => <<"no_compression">>, + <<"kafka_header_value_encode_mode">> => <<"none">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_inflight">> => 10, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"timestamp">> => <<"${.timestamp}">>, + <<"value">> => <<"${.}">> + }, + <<"partition_count_refresh_interval">> => <<"60s">>, + <<"partition_strategy">> => <<"random">>, + <<"query_mode">> => <<"sync">>, + <<"required_acks">> => <<"all_isr">>, + <<"sync_query_timeout">> => <<"5s">>, + <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() + }, + <<"local_topic">> => <<"kafka_t/#">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">> + } + }. + +connector_config() -> + #{ + <<"authentication">> => <<"none">>, + <<"bootstrap_hosts">> => <<"127.0.0.1:9092">>, + <<"connect_timeout">> => <<"5s">>, + <<"enable">> => true, + <<"metadata_request_timeout">> => <<"5s">>, + <<"min_metadata_refresh_interval">> => <<"3s">>, + <<"socket_opts">> => + #{ + <<"recbuf">> => <<"1024KB">>, + <<"sndbuf">> => <<"1024KB">>, + <<"tcp_keepalive">> => <<"none">> + }, + <<"ssl">> => + #{ + <<"ciphers">> => [], + <<"depth">> => 10, + <<"enable">> => false, + <<"hibernate_after">> => <<"5s">>, + <<"log_level">> => <<"notice">>, + <<"reuse_sessions">> => true, + <<"secure_renegotiate">> => true, + <<"verify">> => <<"verify_peer">>, + <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] + } + }. diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 4eab257b8..4e4cf5fa7 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -53,6 +53,8 @@ %% by nodetool to generate app.