diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 450f3e1b0..add33c31f 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -325,19 +325,20 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) -> ok = save_schema_mod_and_names(SchemaMod), HasDeprecatedFile = has_deprecated_file(), RawConf0 = load_config_files(HasDeprecatedFile, Conf), - warning_deprecated_root_key(RawConf0), - RawConf1 = + RawConf1 = emqx_connector_schema:transform_old_style_bridges_to_connector_and_actions(RawConf0), + warning_deprecated_root_key(RawConf1), + RawConf2 = case HasDeprecatedFile of true -> - overlay_v0(SchemaMod, RawConf0); + overlay_v0(SchemaMod, RawConf1); false -> - overlay_v1(SchemaMod, RawConf0) + overlay_v1(SchemaMod, RawConf1) end, - RawConf = fill_defaults_for_all_roots(SchemaMod, RawConf1), + RawConf3 = fill_defaults_for_all_roots(SchemaMod, RawConf2), %% check configs against the schema - {AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf, #{}), + {AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf3, #{}), save_to_app_env(AppEnvs), - ok = save_to_config_map(CheckedConf, RawConf), + ok = save_to_config_map(CheckedConf, RawConf3), maybe_init_default_zone(), ok. diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 0f8f39ca2..b7e990ae2 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -65,6 +65,8 @@ import_config/1 ]). +-export([query_opts/1]). + -define(EGRESS_DIR_BRIDGES(T), T == webhook; T == mysql; diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index d0dd7da2b..c5f291297 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -28,21 +28,29 @@ -define(TOP_LELVE_HDLR_PATH, (emqx_bridge:config_key_path())). -define(LEAF_NODE_HDLR_PATH, (emqx_bridge:config_key_path() ++ ['?', '?'])). +-define(TOP_LELVE_HDLR_PATH_BRIDGE_V2, (emqx_bridge_v2:config_key_path())). +-define(LEAF_NODE_HDLR_PATH_BRIDGE_V2, (emqx_bridge_v2:config_key_path() ++ ['?', '?'])). start(_StartType, _StartArgs) -> {ok, Sup} = emqx_bridge_sup:start_link(), ok = ensure_enterprise_schema_loaded(), ok = emqx_bridge:load(), + ok = emqx_bridge_v2:load(), ok = emqx_bridge:load_hook(), ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE), ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH, emqx_bridge), + ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH_BRIDGE_V2, emqx_bridge_v2), + ok = emqx_config_handler:add_handler(?TOP_LELVE_HDLR_PATH_BRIDGE_V2, emqx_bridge_v2), ?tp(emqx_bridge_app_started, #{}), {ok, Sup}. stop(_State) -> emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH), emqx_conf:remove_handler(?TOP_LELVE_HDLR_PATH), + emqx_conf:remove_handler(emqx_bridge_v2:config_key_path()), + emqx_conf:remove_handler(?LEAF_NODE_HDLR_PATH_BRIDGE_V2), ok = emqx_bridge:unload(), + ok = emqx_bridge_v2:unload(), ok. -if(?EMQX_RELEASE_EDITION == ee). diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl new file mode 100644 index 000000000..3877ea165 --- /dev/null +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -0,0 +1,484 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_bridge_v2). + +-behaviour(emqx_config_handler). +% -behaviour(emqx_config_backup). + +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/emqx_hooks.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-export([ + load/0, + unload/0, + is_bridge_v2_type/1, + id/2, + id/3, + parse_id/1, + send_message/4, + bridge_v2_type_to_connector_type/1, + is_bridge_v2_id/1, + extract_connector_id_from_bridge_v2_id/1, + is_bridge_v2_installed_in_connector_state/2, + get_channels_for_connector/1 +]). + +%% CRUD API + +-export([ + list/0, + lookup/1, + lookup/2, + get_metrics/2, + config_key_path/0, + disable_enable/3, + create/3, + remove/2, + health_check/2 +]). + +%% Config Update Handler API + +-export([ + post_config_update/5 +]). + +-define(ROOT_KEY, bridges_v2). + +get_channels_for_connector(ConnectorId) -> + {ConnectorType, ConnectorName} = emqx_connector_resource:parse_connector_id(ConnectorId), + RootConf = maps:keys(emqx:get_config([?ROOT_KEY], #{})), + RelevantBridgeV2Types = [ + Type + || Type <- RootConf, + bridge_v2_type_to_connector_type(Type) =:= ConnectorType + ], + lists:flatten([ + get_channels_for_connector(ConnectorName, BridgeV2Type) + || BridgeV2Type <- RelevantBridgeV2Types + ]). + +get_channels_for_connector(ConnectorName, BridgeV2Type) -> + BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}), + [ + {id(BridgeV2Type, Name, ConnectorName), Conf} + || {Name, Conf} <- maps:to_list(BridgeV2s), + bin(ConnectorName) =:= maps:get(connector, Conf, no_name) + ]. + +load() -> + % Bridge_V2s = emqx:get_config([?ROOT_KEY], #{}), + % lists:foreach( + % fun({Type, NamedConf}) -> + % lists:foreach( + % fun({Name, Conf}) -> + % install_bridge_v2( + % Type, + % Name, + % Conf + % ) + % end, + % maps:to_list(NamedConf) + % ) + % end, + % maps:to_list(Bridge_V2s) + % ), + ok. + +unload() -> + % Bridge_V2s = emqx:get_config([?ROOT_KEY], #{}), + % lists:foreach( + % fun({Type, NamedConf}) -> + % lists:foreach( + % fun({Name, Conf}) -> + % uninstall_bridge_v2( + % Type, + % Name, + % Conf + % ) + % end, + % maps:to_list(NamedConf) + % ) + % end, + % maps:to_list(Bridge_V2s) + % ), + ok. + +install_bridge_v2( + _BridgeType, + _BridgeName, + #{enable := false} +) -> + ok; +install_bridge_v2( + BridgeV2Type, + BridgeName, + #{connector := ConnectorName} = Config +) -> + CreationOpts = emqx_resource:fetch_creation_opts(Config), + BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), + %% Create metrics for Bridge V2 + ok = emqx_resource:create_metrics(BridgeV2Id), + %% We might need to create buffer workers for Bridge V2 + case get_query_mode(BridgeV2Type, Config) of + %% the Bridge V2 has built-in buffer, so there is no need for resource workers + simple_sync -> + ok; + simple_async -> + ok; + %% The Bridge V2 is a consumer Bridge V2, so there is no need for resource workers + no_queries -> + ok; + _ -> + %% start resource workers as the query type requires them + ok = emqx_resource_buffer_worker_sup:start_workers(BridgeV2Id, CreationOpts) + end, + %% If there is a running connector, we need to install the Bridge V2 in it + ConnectorId = emqx_connector_resource:resource_id( + bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName + ), + emqx_resource_manager:add_channel(ConnectorId, BridgeV2Id, Config), + ok. + +uninstall_bridge_v2( + _BridgeType, + _BridgeName, + #{enable := false} +) -> + %% Already not installed + ok; +uninstall_bridge_v2( + BridgeV2Type, + BridgeName, + #{connector := ConnectorName} = Config +) -> + BridgeV2Id = id(BridgeV2Type, BridgeName, ConnectorName), + CreationOpts = emqx_resource:fetch_creation_opts(Config), + ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts), + ok = emqx_resource:clear_metrics(BridgeV2Id), + %% Deinstall from connector + ConnectorId = emqx_connector_resource:resource_id( + bridge_v2_type_to_connector_type(BridgeV2Type), ConnectorName + ), + emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id). + +get_query_mode(BridgeV2Type, Config) -> + CreationOpts = emqx_resource:fetch_creation_opts(Config), + ResourceType = emqx_bridge_resource:bridge_to_resource_type(BridgeV2Type), + emqx_resource:query_mode(ResourceType, Config, CreationOpts). + +send_message(BridgeType, BridgeName, Message, QueryOpts0) -> + case lookup(BridgeType, BridgeName) of + #{enable := true} = Config -> + do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); + #{enable := false} -> + {error, bridge_stopped}; + Error -> + Error + end. + +health_check(BridgeType, BridgeName) -> + case lookup(BridgeType, BridgeName) of + #{ + enable := true, + connector := ConnectorName + } -> + ConnectorId = emqx_connector_resource:resource_id( + bridge_v2_type_to_connector_type(BridgeType), ConnectorName + ), + emqx_resource_manager:channel_health_check( + ConnectorId, id(BridgeType, BridgeName, ConnectorName) + ); + #{enable := false} -> + {error, bridge_stopped}; + Error -> + Error + end. + +% do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config) -> +% BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName), +% ConnectorResourceId = emqx_bridge_v2:extract_connector_id_from_bridge_v2_id(BridgeV2Id), +% try +% case emqx_resource_manager:maybe_install_bridge_v2(ConnectorResourceId, BridgeV2Id) of +% ok -> +% do_send_msg_after_bridge_v2_installed( +% BridgeType, +% BridgeName, +% BridgeV2Id, +% Message, +% QueryOpts0, +% Config +% ); +% InstallError -> +% throw(InstallError) +% end +% catch +% Error:Reason:Stack -> +% Msg = iolist_to_binary( +% io_lib:format( +% "Failed to install bridge_v2 ~p in connector ~p: ~p", +% [BridgeV2Id, ConnectorResourceId, Reason] +% ) +% ), +% ?SLOG(error, #{ +% msg => Msg, +% error => Error, +% reason => Reason, +% stacktrace => Stack +% }) +% end. + +do_send_msg_with_enabled_config( + BridgeType, BridgeName, Message, QueryOpts0, Config +) -> + QueryMode = get_query_mode(BridgeType, Config), + QueryOpts = maps:merge( + emqx_bridge:query_opts(Config), + QueryOpts0#{ + query_mode => QueryMode + } + ), + BridgeV2Id = emqx_bridge_v2:id(BridgeType, BridgeName), + emqx_resource:query(BridgeV2Id, {BridgeV2Id, Message}, QueryOpts). + +parse_id(Id) -> + case binary:split(Id, <<":">>, [global]) of + [Type, Name] -> + {Type, Name}; + [<<"bridge_v2">>, Type, Name | _] -> + {Type, Name}; + _X -> + error({error, iolist_to_binary(io_lib:format("Invalid id: ~p", [Id]))}) + end. + +id(BridgeType, BridgeName) -> + case lookup(BridgeType, BridgeName) of + #{connector := ConnectorName} -> + id(BridgeType, BridgeName, ConnectorName); + Error -> + error(Error) + end. + +id(BridgeType, BridgeName, ConnectorName) -> + ConnectorType = bin(bridge_v2_type_to_connector_type(BridgeType)), + <<"bridge_v2:", (bin(BridgeType))/binary, ":", (bin(BridgeName))/binary, ":connector:", + (bin(ConnectorType))/binary, ":", (bin(ConnectorName))/binary>>. + +bridge_v2_type_to_connector_type(kafka) -> + kafka. + +is_bridge_v2_type(kafka) -> true; +is_bridge_v2_type(_) -> false. + +is_bridge_v2_id(<<"bridge_v2:", _/binary>>) -> true; +is_bridge_v2_id(_) -> false. + +extract_connector_id_from_bridge_v2_id(Id) -> + case binary:split(Id, <<":">>, [global]) of + [<<"bridge_v2">>, _Type, _Name, <<"connector">>, ConnectorType, ConnecorName] -> + <<"connector:", ConnectorType/binary, ":", ConnecorName/binary>>; + _X -> + error({error, iolist_to_binary(io_lib:format("Invalid bridge V2 ID: ~p", [Id]))}) + end. + +bin(Bin) when is_binary(Bin) -> Bin; +bin(Str) when is_list(Str) -> list_to_binary(Str); +bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). + +%% Basic CRUD Operations + +list() -> + maps:fold( + fun(Type, NameAndConf, Bridges) -> + maps:fold( + fun(Name, RawConf, Acc) -> + [ + #{ + type => Type, + name => Name, + raw_config => RawConf + } + | Acc + ] + end, + Bridges, + NameAndConf + ) + end, + [], + emqx:get_raw_config([?ROOT_KEY], #{}) + ). + +lookup(Id) -> + {Type, Name} = parse_id(Id), + lookup(Type, Name). + +lookup(Type, Name) -> + case emqx:get_config([?ROOT_KEY, Type, Name], not_found) of + not_found -> + {error, bridge_not_found}; + Config -> + Config + end. + +get_metrics(Type, Name) -> + emqx_resource:get_metrics(id(Type, Name)). + +config_key_path() -> + [?ROOT_KEY]. + +disable_enable(Action, BridgeType, BridgeName) when + Action =:= disable; Action =:= enable +-> + emqx_conf:update( + config_key_path() ++ [BridgeType, BridgeName], + {Action, BridgeType, BridgeName}, + #{override_to => cluster} + ). + +create(BridgeType, BridgeName, RawConf) -> + ?SLOG(debug, #{ + brige_action => create, + bridge_version => 2, + bridge_type => BridgeType, + bridge_name => BridgeName, + bridge_raw_config => emqx_utils:redact(RawConf) + }), + emqx_conf:update( + config_key_path() ++ [BridgeType, BridgeName], + RawConf, + #{override_to => cluster} + ). + +remove(BridgeType, BridgeName) -> + ?SLOG(debug, #{ + brige_action => remove, + bridge_version => 2, + bridge_type => BridgeType, + bridge_name => BridgeName + }), + emqx_conf:remove( + config_key_path() ++ [BridgeType, BridgeName], + #{override_to => cluster} + ). + +%% This top level handler will be triggered when the bridges_v2 path is updated +%% with calls to emqx_conf:update([bridges_v2], BridgesConf, #{}). +%% +%% A public API that can trigger this is: +%% bin/emqx ctl conf load data/configs/cluster.hocon +post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> + #{added := Added, removed := Removed, changed := Updated} = + diff_confs(NewConf, OldConf), + %% The config update will be failed if any task in `perform_bridge_changes` failed. + RemoveFun = fun uninstall_bridge_v2/3, + CreateFun = fun install_bridge_v2/3, + UpdateFun = fun(Type, Name, {OldBridgeConf, Conf}) -> + uninstall_bridge_v2(Type, Name, OldBridgeConf), + install_bridge_v2(Type, Name, Conf) + end, + Result = perform_bridge_changes([ + #{action => RemoveFun, data => Removed}, + #{ + action => CreateFun, + data => Added, + on_exception_fn => fun emqx_bridge_resource:remove/4 + }, + #{action => UpdateFun, data => Updated} + ]), + ?tp(bridge_post_config_update_done, #{}), + Result; +post_config_update([?ROOT_KEY, BridgeType, BridgeName], '$remove', _, _OldConf, _AppEnvs) -> + Conf = emqx:get_config([?ROOT_KEY, BridgeType, BridgeName]), + ok = uninstall_bridge_v2(BridgeType, BridgeName, Conf), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, undefined, _AppEnvs) -> + ok = install_bridge_v2(BridgeType, BridgeName, NewConf), + ?tp(bridge_post_config_update_done, #{}), + ok; +post_config_update([?ROOT_KEY, BridgeType, BridgeName], _Req, NewConf, OldConf, _AppEnvs) -> + ok = uninstall_bridge_v2(BridgeType, BridgeName, OldConf), + ok = install_bridge_v2(BridgeType, BridgeName, NewConf), + ?tp(bridge_post_config_update_done, #{}), + ok. + +diff_confs(NewConfs, OldConfs) -> + emqx_utils_maps:diff_maps( + flatten_confs(NewConfs), + flatten_confs(OldConfs) + ). + +flatten_confs(Conf0) -> + maps:from_list( + lists:flatmap( + fun({Type, Conf}) -> + do_flatten_confs(Type, Conf) + end, + maps:to_list(Conf0) + ) + ). + +do_flatten_confs(Type, Conf0) -> + [{{Type, Name}, Conf} || {Name, Conf} <- maps:to_list(Conf0)]. + +perform_bridge_changes(Tasks) -> + perform_bridge_changes(Tasks, ok). + +perform_bridge_changes([], Result) -> + Result; +perform_bridge_changes([#{action := Action, data := MapConfs} = Task | Tasks], Result0) -> + OnException = maps:get(on_exception_fn, Task, fun(_Type, _Name, _Conf, _Opts) -> ok end), + Result = maps:fold( + fun + ({_Type, _Name}, _Conf, {error, Reason}) -> + {error, Reason}; + %% for update + ({Type, Name}, {OldConf, Conf}, _) -> + case Action(Type, Name, {OldConf, Conf}) of + {error, Reason} -> {error, Reason}; + Return -> Return + end; + ({Type, Name}, Conf, _) -> + try Action(Type, Name, Conf) of + {error, Reason} -> {error, Reason}; + Return -> Return + catch + Kind:Error:Stacktrace -> + ?SLOG(error, #{ + msg => "bridge_config_update_exception", + kind => Kind, + error => Error, + type => Type, + name => Name, + stacktrace => Stacktrace + }), + OnException(Type, Name, Conf), + erlang:raise(Kind, Error, Stacktrace) + end + end, + Result0, + MapConfs + ), + perform_bridge_changes(Tasks, Result). + +is_bridge_v2_installed_in_connector_state(Tag, State) when is_map(State) -> + BridgeV2s = maps:get(installed_bridge_v2s, State, #{}), + maps:is_key(Tag, BridgeV2s); +is_bridge_v2_installed_in_connector_state(_Tag, _State) -> + false. diff --git a/apps/emqx_bridge/src/schema/emqx_action_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_action_enterprise.erl new file mode 100644 index 000000000..f19b4302a --- /dev/null +++ b/apps/emqx_bridge/src/schema/emqx_action_enterprise.erl @@ -0,0 +1,32 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_action_enterprise). + +-if(?EMQX_RELEASE_EDITION == ee). + +-include_lib("hocon/include/hoconsc.hrl"). +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + fields/1 +]). + +fields(bridges_v2) -> + kafka_structs(). + +kafka_structs() -> + [ + {kafka, + mk( + hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer_action)), + #{ + desc => <<"Kafka Producer Bridge V2 Config">>, + required => false + } + )} + ]. + +-else. + +-endif. diff --git a/apps/emqx_bridge/src/schema/emqx_action_schema.erl b/apps/emqx_bridge/src/schema/emqx_action_schema.erl new file mode 100644 index 000000000..ce8f5d6a4 --- /dev/null +++ b/apps/emqx_bridge/src/schema/emqx_action_schema.erl @@ -0,0 +1,60 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_action_schema). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-import(hoconsc, [mk/2, ref/2]). + +-export([roots/0, fields/1, desc/1, namespace/0, tags/0]). + +-if(?EMQX_RELEASE_EDITION == ee). + +enterprise_fields_actions() -> + %% We *must* do this to ensure the module is really loaded, especially when we use + %% `call_hocon' from `nodetool' to generate initial configurations. + _ = emqx_action_enterprise:module_info(), + case erlang:function_exported(emqx_action_enterprise, fields, 1) of + true -> + emqx_action_enterprise:fields(bridges_v2); + false -> + [] + end. + +-else. + +enterprise_fields_actions() -> []. + +-endif. + +%%====================================================================================== +%% HOCON Schema Callbacks +%%====================================================================================== + +namespace() -> "bridge_v2". + +tags() -> + [<<"Bridge V2">>]. + +roots() -> [{bridges_v2, ?HOCON(?R_REF(bridges_v2), #{importance => ?IMPORTANCE_LOW})}]. + +fields(bridges_v2) -> + [] ++ enterprise_fields_actions(). + +desc(_) -> + undefined. diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 5972ba323..f2803651f 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -161,6 +161,14 @@ fields("config_consumer") -> fields(kafka_consumer); fields(kafka_producer) -> fields("config") ++ fields(producer_opts); +fields(kafka_producer_action) -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {connector, + mk(binary(), #{ + desc => ?DESC(emqx_connector_schema, "connector_field"), required => true + })} + ] ++ fields(producer_opts); fields(kafka_consumer) -> fields("config") ++ fields(consumer_opts); fields("config") -> @@ -478,6 +486,8 @@ desc("put_" ++ Type) when Type =:= "consumer"; Type =:= "producer" -> ["Configuration for Kafka using `PUT` method."]; desc("post_" ++ Type) when Type =:= "consumer"; Type =:= "producer" -> ["Configuration for Kafka using `POST` method."]; +desc(kafka_producer_action) -> + ?DESC("kafka_producer_action"); desc(Name) -> lists:member(Name, struct_names()) orelse throw({missing_desc, Name}), ?DESC(Name). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 749250306..12198b211 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -16,7 +16,11 @@ on_stop/2, on_query/3, on_query_async/4, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([ @@ -27,9 +31,10 @@ -include_lib("emqx/include/logger.hrl"). %% Allocatable resources --define(kafka_resource_id, kafka_resource_id). +-define(kafka_telementry_id, kafka_telementry_id). -define(kafka_client_id, kafka_client_id). -define(kafka_producers, kafka_producers). +-define(CONNECTOR_TYPE, kafka). query_mode(#{kafka := #{query_mode := sync}}) -> simple_sync_internal_buffer; @@ -38,32 +43,22 @@ query_mode(_) -> callback_mode() -> async_if_possible. -%% @doc Config schema is defined in emqx_bridge_kafka. -on_start(InstId, Config) -> +%% @doc Config schema is defined in emqx_connector_kafka. +on_start(<<"connector:", _/binary>> = InstId, Config) -> #{ authentication := Auth, bootstrap_hosts := Hosts0, - bridge_name := BridgeName, - bridge_type := BridgeType, + connector_name := ConnectorName, connect_timeout := ConnTimeout, - kafka := KafkaConfig = #{ - message := MessageTemplate, - topic := KafkaTopic, - sync_query_timeout := SyncQueryTimeout - }, metadata_request_timeout := MetaReqTimeout, min_metadata_refresh_interval := MinMetaRefreshInterval, socket_opts := SocketOpts, ssl := SSL } = Config, - KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), - KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), - KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), - ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), - ok = emqx_resource:allocate_resource(InstId, ?kafka_resource_id, ResourceId), - _ = maybe_install_wolff_telemetry_handlers(ResourceId), + ConnectorType = ?CONNECTOR_TYPE, + ResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName), Hosts = emqx_bridge_kafka_impl:hosts(Hosts0), - ClientId = emqx_bridge_kafka_impl:make_client_id(BridgeType, BridgeName), + ClientId = emqx_bridge_kafka_impl:make_client_id(?CONNECTOR_TYPE, ConnectorName), ok = emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), ClientConfig = #{ min_metadata_refresh_interval => MinMetaRefreshInterval, @@ -74,12 +69,6 @@ on_start(InstId, Config) -> sasl => emqx_bridge_kafka_impl:sasl(Auth), ssl => ssl(SSL) }, - case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of - unhealthy_target -> - throw(unhealthy_target); - _ -> - ok - end, case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of {ok, _} -> ?SLOG(info, #{ @@ -97,7 +86,51 @@ on_start(InstId, Config) -> throw(failed_to_start_kafka_client) end, %% Check if this is a dry run - TestIdStart = string:find(InstId, ?TEST_ID_PREFIX), + {ok, #{ + client_id => ClientId, + resource_id => ResourceId, + hosts => Hosts, + installed_bridge_v2s => #{} + }}. + +on_add_channel( + InstId, + #{ + client_id := ClientId, + hosts := Hosts, + installed_bridge_v2s := InstalledBridgeV2s + } = OldState, + BridgeV2Id, + BridgeV2Config +) -> + %% The following will throw an exception if the bridge producers fails to start + {ok, BridgeV2State} = create_producers_for_bridge_v2( + InstId, BridgeV2Id, ClientId, Hosts, BridgeV2Config + ), + NewInstalledBridgeV2s = maps:put(BridgeV2Id, BridgeV2State, InstalledBridgeV2s), + %% Update state + NewState = OldState#{installed_bridge_v2s => NewInstalledBridgeV2s}, + {ok, NewState}. + +create_producers_for_bridge_v2( + InstId, + BridgeV2Id, + ClientId, + Hosts, + #{ + bridge_type := BridgeType, + kafka := #{ + message := MessageTemplate, + topic := KafkaTopic, + sync_query_timeout := SyncQueryTimeout + } = KafkaConfig + } +) -> + KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)), + KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])), + KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none), + {_BridgeType, BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id), + TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX), IsDryRun = case TestIdStart of nomatch -> @@ -105,18 +138,24 @@ on_start(InstId, Config) -> _ -> string:equal(TestIdStart, InstId) end, - WolffProducerConfig = producers_config(BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun), + ok = check_topic_status(Hosts, KafkaConfig, KafkaTopic), + ok = check_if_healthy_leaders(ClientId, KafkaTopic), + WolffProducerConfig = producers_config(BridgeType, BridgeName, ClientId, KafkaConfig, IsDryRun, BridgeV2Id), case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of {ok, Producers} -> - ok = emqx_resource:allocate_resource(InstId, ?kafka_producers, Producers), + ok = emqx_resource:allocate_resource(InstId, {?kafka_producers, BridgeV2Id}, Producers), + ok = emqx_resource:allocate_resource( + InstId, {?kafka_telementry_id, BridgeV2Id}, BridgeV2Id + ), + _ = maybe_install_wolff_telemetry_handlers(BridgeV2Id), {ok, #{ message_template => compile_message_template(MessageTemplate), client_id => ClientId, kafka_topic => KafkaTopic, producers => Producers, - resource_id => ResourceId, + resource_id => BridgeV2Id, + connector_resource_id => InstId, sync_query_timeout => SyncQueryTimeout, - hosts => Hosts, kafka_config => KafkaConfig, headers_tokens => KafkaHeadersTokens, ext_headers_tokens => KafkaExtHeadersTokens, @@ -130,20 +169,6 @@ on_start(InstId, Config) -> kafka_topic => KafkaTopic, reason => Reason2 }), - %% Need to stop the already running client; otherwise, the - %% next `on_start' call will try to ensure the client - %% exists and it will be already present and using the old - %% config. This is specially bad if the original crash - %% was due to misconfiguration and we are trying to fix - %% it... - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, - #{ - msg => "failed_to_delete_kafka_client", - client_id => ClientId - } - ), - throw( "Failed to start Kafka client. Please check the logs for errors and check" " the connection parameters." @@ -151,68 +176,94 @@ on_start(InstId, Config) -> end. on_stop(InstanceId, _State) -> - case emqx_resource:get_allocated_resources(InstanceId) of - #{ - ?kafka_client_id := ClientId, - ?kafka_producers := Producers, - ?kafka_resource_id := ResourceId - } -> - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, - #{ - msg => "failed_to_delete_kafka_producer", - client_id => ClientId - } - ), - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, - #{ - msg => "failed_to_delete_kafka_client", - client_id => ClientId - } - ), - _ = with_log_at_error( - fun() -> uninstall_telemetry_handlers(ResourceId) end, - #{ - msg => "failed_to_uninstall_telemetry_handlers", - resource_id => ResourceId - } - ), + AllocatedResources = emqx_resource:get_allocated_resources(InstanceId), + ClientId = maps:get(?kafka_client_id, AllocatedResources, undefined), + case ClientId of + undefined -> ok; - #{?kafka_client_id := ClientId, ?kafka_resource_id := ResourceId} -> - _ = with_log_at_error( - fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, - #{ - msg => "failed_to_delete_kafka_client", - client_id => ClientId - } - ), - _ = with_log_at_error( - fun() -> uninstall_telemetry_handlers(ResourceId) end, - #{ - msg => "failed_to_uninstall_telemetry_handlers", - resource_id => ResourceId - } - ), - ok; - #{?kafka_resource_id := ResourceId} -> - _ = with_log_at_error( - fun() -> uninstall_telemetry_handlers(ResourceId) end, - #{ - msg => "failed_to_uninstall_telemetry_handlers", - resource_id => ResourceId - } - ), - ok; - _ -> - ok + ClientId -> + deallocate_client(ClientId) end, - ?tp(kafka_producer_stopped, #{instance_id => InstanceId}), + maps:foreach( + fun + ({?kafka_producers, _BridgeV2Id}, Producers) -> + deallocate_producers(ClientId, Producers); + ({?kafka_telementry_id, _BridgeV2Id}, TelementryId) -> + deallocate_telementry_handlers(TelementryId); + (_, _) -> + ok + end, + AllocatedResources + ), ok. +deallocate_client(ClientId) -> + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientId) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientId + } + ). + +deallocate_producers(ClientId, Producers) -> + _ = with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, + #{ + msg => "failed_to_delete_kafka_producer", + client_id => ClientId + } + ). + +deallocate_telementry_handlers(TelementryId) -> + _ = with_log_at_error( + fun() -> uninstall_telemetry_handlers(TelementryId) end, + #{ + msg => "failed_to_uninstall_telemetry_handlers", + resource_id => TelementryId + } + ). + +remove_producers_for_bridge_v2( + InstId, BridgeV2Id +) -> + AllocatedResources = emqx_resource:get_allocated_resources(InstId), + ClientId = maps:get(?kafka_client_id, AllocatedResources, no_client_id), + maps:foreach( + fun + ({?kafka_producers, BridgeV2IdCheck}, Producers) when BridgeV2IdCheck =:= BridgeV2Id -> + deallocate_producers(ClientId, Producers); + ({?kafka_telementry_id, BridgeV2IdCheck}, TelementryId) when + BridgeV2IdCheck =:= BridgeV2Id + -> + deallocate_telementry_handlers(TelementryId); + (_, _) -> + ok + end, + AllocatedResources + ), + ok. + +on_remove_channel( + InstId, + #{ + client_id := _ClientId, + hosts := _Hosts, + installed_bridge_v2s := InstalledBridgeV2s + } = OldState, + BridgeV2Id +) -> + ok = remove_producers_for_bridge_v2(InstId, BridgeV2Id), + NewInstalledBridgeV2s = maps:remove(BridgeV2Id, InstalledBridgeV2s), + %% Update state + NewState = OldState#{installed_bridge_v2s => NewInstalledBridgeV2s}, + {ok, NewState}. + on_query( InstId, - {send_message, Message}, + {MessageTag, Message}, + #{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState +) -> #{ message_template := Template, producers := Producers, @@ -220,8 +271,7 @@ on_query( headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, headers_val_encode_mode := KafkaHeadersValEncodeMode - } -) -> + } = maps:get(MessageTag, BridgeV2Configs), KafkaHeaders = #{ headers_tokens => KafkaHeadersTokens, ext_headers_tokens => KafkaExtHeadersTokens, @@ -257,6 +307,9 @@ on_query( {error, {unrecoverable_error, Error}} end. +on_get_channels(ResId) -> + emqx_bridge_v2:get_channels_for_connector(ResId). + %% @doc The callback API for rule-engine (or bridge without rules) %% The input argument `Message' is an enriched format (as a map()) %% of the original #message{} record. @@ -265,16 +318,17 @@ on_query( %% or the direct mapping from an MQTT message. on_query_async( InstId, - {send_message, Message}, + {MessageTag, Message}, AsyncReplyFn, + #{installed_bridge_v2s := BridgeV2Configs} = _ConnectorState +) -> #{ message_template := Template, producers := Producers, headers_tokens := KafkaHeadersTokens, ext_headers_tokens := KafkaExtHeadersTokens, headers_val_encode_mode := KafkaHeadersValEncodeMode - } -) -> + } = maps:get(MessageTag, BridgeV2Configs), KafkaHeaders = #{ headers_tokens => KafkaHeadersTokens, ext_headers_tokens => KafkaExtHeadersTokens, @@ -399,60 +453,116 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> %% Note: since wolff client has its own replayq that is not managed by %% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, %% `emqx_resource_manager' will kill the wolff producers and messages might be lost. -on_get_status(_InstId, #{client_id := ClientId} = State) -> +on_get_status( + <<"connector:", _/binary>> = _InstId, + #{client_id := ClientId} = _State +) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> - case do_get_status(Pid, State) of + case wolff_client:check_connectivity(Pid) of ok -> connected; - unhealthy_target -> {disconnected, State, unhealthy_target}; - error -> connecting + {error, Error} -> {connecting, Error} end; {error, _Reason} -> connecting end. -do_get_status(Client, #{kafka_topic := KafkaTopic, hosts := Hosts, kafka_config := KafkaConfig}) -> - case do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) of - unhealthy_target -> - unhealthy_target; - _ -> - case do_get_healthy_leaders(Client, KafkaTopic) of - [] -> error; - _ -> ok - end +on_get_channel_status( + _ResId, + ChannelId, + #{ + client_id := ClientId, + hosts := Hosts, + installed_bridge_v2s := Channels + } = _State +) -> + ChannelState = maps:get(ChannelId, Channels), + case wolff_client_sup:find_client(ClientId) of + {ok, Pid} -> + case wolff_client:check_connectivity(Pid) of + ok -> + try check_leaders_and_topic(Pid, Hosts, ChannelState) of + ok -> + connected + catch + _ErrorType:Reason -> + {connecting, Reason} + end; + {error, Error} -> + {connecting, Error} + end; + {error, _Reason} -> + connecting end. -do_get_healthy_leaders(Client, KafkaTopic) -> - case wolff_client:get_leader_connections(Client, KafkaTopic) of - {ok, Leaders} -> - %% Kafka is considered healthy as long as any of the partition leader is reachable. - lists:filtermap( - fun({_Partition, Pid}) -> - case is_pid(Pid) andalso erlang:is_process_alive(Pid) of - true -> {true, Pid}; - _ -> false - end - end, - Leaders +check_leaders_and_topic( + Client, + Hosts, + #{ + kafka_config := KafkaConfig, + kafka_topic := KafkaTopic + } = _ChannelState +) -> + check_if_healthy_leaders(Client, KafkaTopic), + check_topic_status(Hosts, KafkaConfig, KafkaTopic). + +check_if_healthy_leaders(Client, KafkaTopic) when is_pid(Client) -> + Leaders = + case wolff_client:get_leader_connections(Client, KafkaTopic) of + {ok, LeadersToCheck} -> + %% Kafka is considered healthy as long as any of the partition leader is reachable. + lists:filtermap( + fun({_Partition, Pid}) -> + case is_pid(Pid) andalso erlang:is_process_alive(Pid) of + true -> {true, Pid}; + _ -> false + end + end, + LeadersToCheck + ); + {error, _} -> + [] + end, + case Leaders of + [] -> + throw( + iolist_to_binary( + io_lib:format("Could not find any healthy partion leader for topic ~s", [ + KafkaTopic + ]) + ) ); - {error, _} -> - [] + _ -> + ok + end; +check_if_healthy_leaders(ClientId, KafkaTopic) -> + case wolff_client_sup:find_client(ClientId) of + {ok, Pid} -> + check_if_healthy_leaders(Pid, KafkaTopic); + {error, _Reason} -> + throw(iolist_to_binary(io_lib:format("Could not find Kafka client: ~p", [ClientId]))) end. -do_get_topic_status(Hosts, KafkaConfig, KafkaTopic) -> +check_topic_status(Hosts, KafkaConfig, KafkaTopic) -> CheckTopicFun = fun() -> wolff_client:check_if_topic_exists(Hosts, KafkaConfig, KafkaTopic) end, try case emqx_utils:nolink_apply(CheckTopicFun, 5_000) of - ok -> ok; - {error, unknown_topic_or_partition} -> unhealthy_target; - _ -> error + ok -> + ok; + {error, unknown_topic_or_partition} -> + throw( + iolist_to_binary(io_lib:format("Unknown topic or partition ~s", [KafkaTopic])) + ); + _ -> + ok end catch - _:_ -> - error + error:_:_ -> + %% Some other error not related to unknown_topic_or_partition + ok end. ssl(#{enable := true} = SSL) -> @@ -460,7 +570,7 @@ ssl(#{enable := true} = SSL) -> ssl(_) -> []. -producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun) -> +producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun, BridgeV2Id) -> #{ max_batch_bytes := MaxBatchBytes, compression := Compression, @@ -486,7 +596,6 @@ producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun) -> disk -> {false, replayq_dir(ClientId)}; hybrid -> {true, replayq_dir(ClientId)} end, - ResourceID = emqx_bridge_resource:resource_id(BridgeType, BridgeName), #{ name => make_producer_name(BridgeType, BridgeName, IsDryRun), partitioner => partitioner(PartitionStrategy), @@ -500,7 +609,7 @@ producers_config(BridgeType, BridgeName, ClientId, Input, IsDryRun) -> max_batch_bytes => MaxBatchBytes, max_send_ahead => MaxInflight - 1, compression => Compression, - telemetry_meta_data => #{bridge_id => ResourceID} + telemetry_meta_data => #{bridge_id => BridgeV2Id} }. %% Wolff API is a batch API. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index b704fc92c..2e5c115cf 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -58,13 +58,16 @@ groups() -> All = emqx_common_test_helpers:all(?MODULE), [{on_query, All}, {on_query_async, All}]. +test_topic_one_partition() -> + "test-topic-one-partition". + wait_until_kafka_is_up() -> wait_until_kafka_is_up(0). wait_until_kafka_is_up(300) -> ct:fail("Kafka is not up even though we have waited for a while"); wait_until_kafka_is_up(Attempts) -> - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), case resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0) of {ok, _} -> ok; @@ -297,7 +300,7 @@ kafka_bridge_rest_api_helper(Config) -> end, false = MyKafkaBridgeExists(), %% Create new Kafka bridge - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), CreateBodyTmp = #{ <<"type">> => <>, <<"name">> => <<"my_kafka_bridge">>, @@ -413,7 +416,7 @@ t_failed_creation_then_fix(Config) -> Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeId = emqx_bridge_resource:bridge_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), WrongConf = config(#{ "authentication" => WrongAuthSettings, "kafka_hosts_string" => HostsString, @@ -478,7 +481,7 @@ t_custom_timestamp(_Config) -> Type = ?BRIDGE_TYPE, Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), ResourceId = emqx_bridge_resource:resource_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), MQTTTopic = <<"t/local/kafka">>, emqx:subscribe(MQTTTopic), Conf0 = config(#{ @@ -555,7 +558,7 @@ t_send_message_with_headers(Config) -> Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeId = emqx_bridge_resource:bridge_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), Conf = config_with_headers(#{ "authentication" => AuthSettings, "kafka_hosts_string" => HostsString, @@ -715,7 +718,7 @@ t_wrong_headers(_Config) -> Type = ?BRIDGE_TYPE, Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), ResourceId = emqx_bridge_resource:resource_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), ?assertThrow( { emqx_bridge_schema, @@ -789,7 +792,7 @@ t_wrong_headers_from_message(Config) -> Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), ResourceId = emqx_bridge_resource:resource_id(Type, Name), BridgeId = emqx_bridge_resource:bridge_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), Conf = config_with_headers(#{ "authentication" => AuthSettings, "kafka_hosts_string" => HostsString, @@ -939,7 +942,7 @@ publish_helper( Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), Type = ?BRIDGE_TYPE, InstId = emqx_bridge_resource:resource_id(Type, Name), - KafkaTopic = "test-topic-one-partition", + KafkaTopic = test_topic_one_partition(), Conf = config( #{ "authentication" => AuthSettings, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl new file mode 100644 index 000000000..b65694a99 --- /dev/null +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -0,0 +1,212 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_v2_kafka_producer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("brod/include/brod.hrl"). +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + _ = application:load(emqx_conf), + ok = emqx_common_test_helpers:start_apps(apps_to_start_and_stop()), + application:ensure_all_started(telemetry), + application:ensure_all_started(wolff), + application:ensure_all_started(brod), + emqx_bridge_kafka_impl_producer_SUITE:wait_until_kafka_is_up(), + Config. + +end_per_suite(_Config) -> + emqx_common_test_helpers:stop_apps(apps_to_start_and_stop()). + +apps_to_start_and_stop() -> + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge, + emqx_rule_engine + ]. + +t_create_remove_list(_) -> + [] = emqx_bridge_v2:list(), + ConnectorConfig = connector_config(), + {ok, _} = emqx_connector:create(kafka, test_connector, ConnectorConfig), + Config = bridge_v2_config(<<"test_connector">>), + {ok, _Config} = emqx_bridge_v2:create(kafka, test_bridge_v2, Config), + [BridgeV2Info] = emqx_bridge_v2:list(), + #{ + name := <<"test_bridge_v2">>, + type := <<"kafka">>, + raw_config := _RawConfig + } = BridgeV2Info, + {ok, _Config2} = emqx_bridge_v2:create(kafka, test_bridge_v2_2, Config), + 2 = length(emqx_bridge_v2:list()), + {ok, _} = emqx_bridge_v2:remove(kafka, test_bridge_v2), + 1 = length(emqx_bridge_v2:list()), + {ok, _} = emqx_bridge_v2:remove(kafka, test_bridge_v2_2), + [] = emqx_bridge_v2:list(), + emqx_connector:remove(kafka, test_connector), + ok. + +%% Test sending a message to a bridge V2 +t_send_message(_) -> + BridgeV2Config = bridge_v2_config(<<"test_connector2">>), + ConnectorConfig = connector_config(), + {ok, _} = emqx_connector:create(kafka, test_connector2, ConnectorConfig), + {ok, _} = emqx_bridge_v2:create(kafka, test_bridge_v2_1, BridgeV2Config), + %% Use the bridge to send a message + check_send_message_with_bridge(test_bridge_v2_1), + %% Create a few more bridges with the same connector and test them + BridgeNames1 = [ + list_to_atom("test_bridge_v2_" ++ integer_to_list(I)) + || I <- lists:seq(2, 10) + ], + lists:foreach( + fun(BridgeName) -> + {ok, _} = emqx_bridge_v2:create(kafka, BridgeName, BridgeV2Config), + check_send_message_with_bridge(BridgeName) + end, + BridgeNames1 + ), + BridgeNames = [test_bridge_v2_1 | BridgeNames1], + %% Send more messages to the bridges + lists:foreach( + fun(BridgeName) -> + lists:foreach( + fun(_) -> + check_send_message_with_bridge(BridgeName) + end, + lists:seq(1, 10) + ) + end, + BridgeNames + ), + %% Remove all the bridges + lists:foreach( + fun(BridgeName) -> + {ok, _} = emqx_bridge_v2:remove(kafka, BridgeName) + end, + BridgeNames + ), + emqx_connector:remove(kafka, test_connector2), + ok. + +%% Test that we can get the status of the bridge V2 +t_health_check(_) -> + BridgeV2Config = bridge_v2_config(<<"test_connector3">>), + ConnectorConfig = connector_config(), + {ok, _} = emqx_connector:create(kafka, test_connector3, ConnectorConfig), + {ok, _} = emqx_bridge_v2:create(kafka, test_bridge_v2, BridgeV2Config), + connected = emqx_bridge_v2:health_check(kafka, test_bridge_v2), + {ok, _} = emqx_bridge_v2:remove(kafka, test_bridge_v2), + %% Check behaviour when bridge does not exist + {error, bridge_not_found} = emqx_bridge_v2:health_check(kafka, test_bridge_v2), + {ok, _} = emqx_connector:remove(kafka, test_connector3), + ok. + +check_send_message_with_bridge(BridgeName) -> + %% ###################################### + %% Create Kafka message + %% ###################################### + KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + Partition = 0, + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( + Hosts, KafkaTopic, Partition + ), + %% ###################################### + %% Send message + %% ###################################### + emqx_bridge_v2:send_message(kafka, BridgeName, Msg, #{}), + %% ###################################### + %% Check if message is sent to Kafka + %% ###################################### + {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset0), + ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg0). + +bridge_v2_config(ConnectorName) -> + #{ + <<"connector">> => ConnectorName, + <<"enable">> => true, + <<"kafka">> => #{ + <<"buffer">> => #{ + <<"memory_overload_protection">> => false, + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"2GB">>, + <<"segment_bytes">> => <<"100MB">> + }, + <<"compression">> => <<"no_compression">>, + <<"kafka_header_value_encode_mode">> => <<"none">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_inflight">> => 10, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"timestamp">> => <<"${.timestamp}">>, + <<"value">> => <<"${.}">> + }, + <<"partition_count_refresh_interval">> => <<"60s">>, + <<"partition_strategy">> => <<"random">>, + <<"query_mode">> => <<"sync">>, + <<"required_acks">> => <<"all_isr">>, + <<"sync_query_timeout">> => <<"5s">>, + <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() + }, + <<"local_topic">> => <<"kafka_t/#">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">> + } + }. + +connector_config() -> + #{ + <<"authentication">> => <<"none">>, + <<"bootstrap_hosts">> => <<"127.0.0.1:9092">>, + <<"connect_timeout">> => <<"5s">>, + <<"enable">> => true, + <<"metadata_request_timeout">> => <<"5s">>, + <<"min_metadata_refresh_interval">> => <<"3s">>, + <<"socket_opts">> => + #{ + <<"recbuf">> => <<"1024KB">>, + <<"sndbuf">> => <<"1024KB">>, + <<"tcp_keepalive">> => <<"none">> + }, + <<"ssl">> => + #{ + <<"ciphers">> => [], + <<"depth">> => 10, + <<"enable">> => false, + <<"hibernate_after">> => <<"5s">>, + <<"log_level">> => <<"notice">>, + <<"reuse_sessions">> => true, + <<"secure_renegotiate">> => true, + <<"verify">> => <<"verify_peer">>, + <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] + } + }. diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 4eab257b8..4e4cf5fa7 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -53,6 +53,8 @@ %% by nodetool to generate app.