From d6e9bbb95c5406d88f67cb25819f226b943b1e92 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 13 Nov 2023 15:56:46 -0300 Subject: [PATCH 1/7] fix(connector): validate connector name before converting ssl certs Fixes https://emqx.atlassian.net/browse/EMQX-11336 See also: https://github.com/emqx/emqx/pull/11540 --- apps/emqx_bridge/src/emqx_bridge.erl | 54 ++++++++++---- apps/emqx_bridge/src/emqx_bridge_app.erl | 18 ++++- apps/emqx_bridge/test/emqx_bridge_SUITE.erl | 30 +++++++- .../test/emqx_bridge_api_SUITE.erl | 8 ++- ...qx_bridge_v1_compatibility_layer_SUITE.erl | 31 +++++++- .../emqx_connector/src/emqx_connector.app.src | 2 +- apps/emqx_connector/src/emqx_connector.erl | 70 +++++++++++++++++-- .../test/emqx_connector_SUITE.erl | 65 +++++++++++++++++ .../test/emqx_connector_api_SUITE.erl | 22 ++++++ apps/emqx_resource/src/emqx_resource.erl | 28 +++----- 10 files changed, 283 insertions(+), 45 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 51bdfb084..64bec3a4e 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -55,7 +55,6 @@ ]). -export([config_key_path/0]). --export([validate_bridge_name/1]). %% exported for `emqx_telemetry' -export([get_basic_usage_info/0]). @@ -268,7 +267,12 @@ config_key_path() -> pre_config_update([?ROOT_KEY], RawConf, RawConf) -> {ok, RawConf}; pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> - {ok, convert_certs(NewConf)}. + case multi_validate_bridge_names(NewConf) of + ok -> + {ok, convert_certs(NewConf)}; + Error -> + Error + end. post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) -> #{added := Added, removed := Removed, changed := Updated} = @@ -657,17 +661,13 @@ get_basic_usage_info() -> InitialAcc end. -validate_bridge_name(BridgeName0) -> - BridgeName = to_bin(BridgeName0), - case re:run(BridgeName, ?MAP_KEY_RE, [{capture, none}]) of - match -> - ok; - nomatch -> - {error, #{ - kind => validation_error, - reason => bad_bridge_name, - value => BridgeName - }} +validate_bridge_name(BridgeName) -> + try + _ = emqx_resource:validate_name(to_bin(BridgeName)), + ok + catch + throw:Error -> + {error, Error} end. to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8); @@ -675,3 +675,31 @@ to_bin(B) when is_binary(B) -> B. upgrade_type(Type) -> emqx_bridge_lib:upgrade_type(Type). + +multi_validate_bridge_names(Conf) -> + BridgeTypeAndNames = + [ + {Type, Name} + || {Type, NameToConf} <- maps:to_list(Conf), + {Name, _Conf} <- maps:to_list(NameToConf) + ], + BadBridges = + lists:filtermap( + fun({Type, Name}) -> + case validate_bridge_name(Name) of + ok -> false; + _Error -> {true, #{type => Type, name => Name}} + end + end, + BridgeTypeAndNames + ), + case BadBridges of + [] -> + ok; + [_ | _] -> + {error, #{ + kind => validation_error, + reason => bad_bridge_names, + bad_bridges => BadBridges + }} + end. diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index cd54d31e7..321f59f28 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -63,7 +63,7 @@ pre_config_update(_, {Oper, _Type, _Name}, OldConfig) -> %% to save the 'enable' to the config files {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> - case validate_bridge_name(Path) of + case validate_bridge_name_in_config(Path) of ok -> case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of {error, Reason} -> @@ -104,11 +104,23 @@ post_config_update([bridges, BridgeType, BridgeName], _Req, NewConf, OldConf, _A operation_to_enable(disable) -> false; operation_to_enable(enable) -> true. -validate_bridge_name(Path) -> +validate_bridge_name_in_config(Path) -> [RootKey] = emqx_bridge:config_key_path(), case Path of [RootKey, _BridgeType, BridgeName] -> - emqx_bridge:validate_bridge_name(BridgeName); + validate_bridge_name(BridgeName); _ -> ok end. + +to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8); +to_bin(B) when is_binary(B) -> B. + +validate_bridge_name(BridgeName) -> + try + _ = emqx_resource:validate_name(to_bin(BridgeName)), + ok + catch + throw:Error -> + {error, Error} + end. diff --git a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl index 96c3c29ca..b29ba154e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_SUITE.erl @@ -199,13 +199,41 @@ t_create_with_bad_name(_Config) -> ?assertMatch( {error, {pre_config_update, emqx_bridge_app, #{ - reason := bad_bridge_name, + reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>>, kind := validation_error }}}, emqx:update_config(Path, Conf) ), ok. +t_create_with_bad_name_root(_Config) -> + BadBridgeName = <<"test_哈哈">>, + BridgeConf = #{ + <<"bridge_mode">> => false, + <<"clean_start">> => true, + <<"keepalive">> => <<"60s">>, + <<"proto_ver">> => <<"v4">>, + <<"server">> => <<"127.0.0.1:1883">>, + <<"ssl">> => + #{ + %% needed to trigger pre_config_update + <<"certfile">> => cert_file("certfile"), + <<"enable">> => true + } + }, + Conf = #{<<"mqtt">> => #{BadBridgeName => BridgeConf}}, + Path = [bridges], + ?assertMatch( + {error, + {pre_config_update, _ConfigHandlerMod, #{ + kind := validation_error, + reason := bad_bridge_names, + bad_bridges := [#{type := <<"mqtt">>, name := BadBridgeName}] + }}}, + emqx:update_config(Path, Conf) + ), + ok. + data_file(Name) -> Dir = code:lib_dir(emqx_bridge, test), {ok, Bin} = file:read_file(filename:join([Dir, "data", Name])), diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index c0339660e..99a2bc8cd 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -1362,7 +1362,13 @@ t_create_with_bad_name(Config) -> Config ), Msg = emqx_utils_json:decode(Msg0, [return_maps]), - ?assertMatch(#{<<"reason">> := <<"bad_bridge_name">>}, Msg), + ?assertMatch( + #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>> + }, + Msg + ), ok. validate_resource_request_ttl(single, Timeout, Name) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl index f3b7fb685..c714b858a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v1_compatibility_layer_SUITE.erl @@ -150,7 +150,8 @@ con_schema() -> fields("connector") -> [ {enable, hoconsc:mk(any(), #{})}, - {resource_opts, hoconsc:mk(map(), #{})} + {resource_opts, hoconsc:mk(map(), #{})}, + {ssl, hoconsc:ref(ssl)} ]; fields("api_post") -> [ @@ -159,7 +160,9 @@ fields("api_post") -> {type, hoconsc:mk(bridge_type(), #{})}, {send_to, hoconsc:mk(atom(), #{})} | fields("connector") - ]. + ]; +fields(ssl) -> + emqx_schema:client_ssl_opts_schema(#{required => false}). con_config() -> #{ @@ -806,3 +809,27 @@ t_scenario_2(Config) -> ?assert(is_rule_enabled(RuleId2)), ok. + +t_create_with_bad_name(_Config) -> + BadBridgeName = <<"test_哈哈">>, + %% Note: must contain SSL options to trigger bug. + Cacertfile = emqx_common_test_helpers:app_path( + emqx, + filename:join(["etc", "certs", "cacert.pem"]) + ), + Opts = #{ + name => BadBridgeName, + overrides => #{ + <<"ssl">> => + #{<<"cacertfile">> => Cacertfile} + } + }, + {error, + {{_, 400, _}, _, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := #{ + <<"kind">> := <<"validation_error">>, + <<"reason">> := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>> + } + }}} = create_bridge_http_api_v1(Opts), + ok. diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 7ecabb0ff..d23a36e49 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.1.33"}, + {vsn, "0.1.34"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index f07e038d2..30654bb13 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -108,18 +108,28 @@ config_key_path() -> pre_config_update([?ROOT_KEY], RawConf, RawConf) -> {ok, RawConf}; pre_config_update([?ROOT_KEY], NewConf, _RawConf) -> - {ok, convert_certs(NewConf)}; + case multi_validate_connector_names(NewConf) of + ok -> + {ok, convert_certs(NewConf)}; + Error -> + Error + end; pre_config_update(_, {_Oper, _, _}, undefined) -> {error, connector_not_found}; pre_config_update(_, {Oper, _Type, _Name}, OldConfig) -> %% to save the 'enable' to the config files {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; pre_config_update(Path, Conf, _OldConfig) when is_map(Conf) -> - case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of - {error, Reason} -> - {error, Reason}; - {ok, ConfNew} -> - {ok, ConfNew} + case validate_connector_name_in_config(Path) of + ok -> + case emqx_connector_ssl:convert_certs(filename:join(Path), Conf) of + {error, Reason} -> + {error, Reason}; + {ok, ConfNew} -> + {ok, ConfNew} + end; + Error -> + Error end. operation_to_enable(disable) -> false; @@ -458,3 +468,51 @@ ensure_no_channels(Configs) -> {error, Reason, _State} -> {error, Reason} end. + +to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8); +to_bin(B) when is_binary(B) -> B. + +validate_connector_name(ConnectorName) -> + try + _ = emqx_resource:validate_name(to_bin(ConnectorName)), + ok + catch + throw:Error -> + {error, Error} + end. + +validate_connector_name_in_config(Path) -> + case Path of + [?ROOT_KEY, _ConnectorType, ConnectorName] -> + validate_connector_name(ConnectorName); + _ -> + ok + end. + +multi_validate_connector_names(Conf) -> + ConnectorTypeAndNames = + [ + {Type, Name} + || {Type, NameToConf} <- maps:to_list(Conf), + {Name, _Conf} <- maps:to_list(NameToConf) + ], + BadConnectors = + lists:filtermap( + fun({Type, Name}) -> + case validate_connector_name(Name) of + ok -> false; + _Error -> {true, #{type => Type, name => Name}} + end + end, + ConnectorTypeAndNames + ), + case BadConnectors of + [] -> + ok; + [_ | _] -> + {error, #{ + kind => validation_error, + reason => bad_connector_names, + bad_connectors => BadConnectors + }} + end. diff --git a/apps/emqx_connector/test/emqx_connector_SUITE.erl b/apps/emqx_connector/test/emqx_connector_SUITE.erl index a62b5ed95..ee7e29741 100644 --- a/apps/emqx_connector/test/emqx_connector_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_SUITE.erl @@ -204,6 +204,71 @@ t_remove_fail(_Config) -> ), ok. +t_create_with_bad_name_direct_path({init, Config}) -> + meck:new(emqx_connector_ee_schema, [passthrough]), + meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR), + meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), + meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), + meck:expect(?CONNECTOR, on_stop, 2, ok), + meck:expect(?CONNECTOR, on_get_status, 2, connected), + Config; +t_create_with_bad_name_direct_path({'end', _Config}) -> + meck:unload(), + ok; +t_create_with_bad_name_direct_path(_Config) -> + Path = [connectors, kafka_producer, 'test_哈哈'], + ConnConfig0 = connector_config(), + %% Note: must contain SSL options to trigger original bug. + Cacertfile = emqx_common_test_helpers:app_path( + emqx, + filename:join(["etc", "certs", "cacert.pem"]) + ), + ConnConfig = ConnConfig0#{<<"ssl">> => #{<<"cacertfile">> => Cacertfile}}, + ?assertMatch( + {error, + {pre_config_update, _ConfigHandlerMod, #{ + kind := validation_error, + reason := <<"only 0-9a-zA-Z_- is allowed in resource name", _/binary>> + }}}, + emqx:update_config(Path, ConnConfig) + ), + ok. + +t_create_with_bad_name_root_path({init, Config}) -> + meck:new(emqx_connector_ee_schema, [passthrough]), + meck:expect(emqx_connector_ee_schema, resource_type, 1, ?CONNECTOR), + meck:new(?CONNECTOR, [non_strict]), + meck:expect(?CONNECTOR, callback_mode, 0, async_if_possible), + meck:expect(?CONNECTOR, on_start, 2, {ok, connector_state}), + meck:expect(?CONNECTOR, on_stop, 2, ok), + meck:expect(?CONNECTOR, on_get_status, 2, connected), + Config; +t_create_with_bad_name_root_path({'end', _Config}) -> + meck:unload(), + ok; +t_create_with_bad_name_root_path(_Config) -> + Path = [connectors], + BadConnectorName = <<"test_哈哈">>, + ConnConfig0 = connector_config(), + %% Note: must contain SSL options to trigger original bug. + Cacertfile = emqx_common_test_helpers:app_path( + emqx, + filename:join(["etc", "certs", "cacert.pem"]) + ), + ConnConfig = ConnConfig0#{<<"ssl">> => #{<<"cacertfile">> => Cacertfile}}, + Conf = #{<<"kafka_producer">> => #{BadConnectorName => ConnConfig}}, + ?assertMatch( + {error, + {pre_config_update, _ConfigHandlerMod, #{ + kind := validation_error, + reason := bad_connector_names, + bad_connectors := [#{type := <<"kafka_producer">>, name := BadConnectorName}] + }}}, + emqx:update_config(Path, Conf) + ), + ok. + %% helpers connector_config() -> diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index becbc8791..f6609808f 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -652,6 +652,28 @@ t_connectors_probe(Config) -> ), ok. +t_create_with_bad_name(Config) -> + ConnectorName = <<"test_哈哈">>, + Conf0 = ?KAFKA_CONNECTOR(ConnectorName), + %% Note: must contain SSL options to trigger original bug. + Cacertfile = emqx_common_test_helpers:app_path( + emqx, + filename:join(["etc", "certs", "cacert.pem"]) + ), + Conf = Conf0#{<<"ssl">> => #{<<"cacertfile">> => Cacertfile}}, + {ok, 400, #{ + <<"code">> := <<"BAD_REQUEST">>, + <<"message">> := Msg0 + }} = request_json( + post, + uri(["connectors"]), + Conf, + Config + ), + Msg = emqx_utils_json:decode(Msg0, [return_maps]), + ?assertMatch(#{<<"kind">> := <<"validation_error">>}, Msg), + ok. + %%% helpers listen_on_random_port() -> SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}], diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index f5bf65c0f..90df229e4 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -815,29 +815,21 @@ validate_name(<<>>, _Opts) -> invalid_data("name cannot be empty string"); validate_name(Name, _Opts) when size(Name) >= 255 -> invalid_data("name length must be less than 255"); -validate_name(Name0, Opts) -> - Name = unicode:characters_to_list(Name0, utf8), - case lists:all(fun is_id_char/1, Name) of - true -> +validate_name(Name, Opts) -> + case re:run(Name, <<"^[-0-9a-zA-Z_]+$">>, [{capture, none}]) of + match -> case maps:get(atom_name, Opts, true) of - % NOTE - % Rule may be created before bridge, thus not `list_to_existing_atom/1`, - % also it is infrequent user input anyway. - true -> list_to_atom(Name); - false -> Name0 + %% NOTE + %% Rule may be created before bridge, thus not `list_to_existing_atom/1`, + %% also it is infrequent user input anyway. + true -> binary_to_atom(Name, utf8); + false -> Name end; - false -> + nomatch -> invalid_data( - <<"only 0-9a-zA-Z_- is allowed in resource name, got: ", Name0/binary>> + <<"only 0-9a-zA-Z_- is allowed in resource name, got: ", Name/binary>> ) end. -spec invalid_data(binary()) -> no_return(). invalid_data(Reason) -> throw(#{kind => validation_error, reason => Reason}). - -is_id_char(C) when C >= $0 andalso C =< $9 -> true; -is_id_char(C) when C >= $a andalso C =< $z -> true; -is_id_char(C) when C >= $A andalso C =< $Z -> true; -is_id_char($_) -> true; -is_id_char($-) -> true; -is_id_char(_) -> false. From 2f1d88d4140abc40ff6e7c1b127dc6075974985a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 14 Nov 2023 13:56:50 -0300 Subject: [PATCH 2/7] fix(bridges_v1): avoid merging action examples for non-v1 bridges Since some new bridges might not have a V1 equivalent (i.e. they are not registered in `emqx_bridge_enterprise`), we should avoid displaying their examples in the V1 API spec. --- apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl index 9456575d4..ca5ad74b6 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_enterprise.erl @@ -82,9 +82,7 @@ schema_modules() -> ]. examples(Method) -> - ActionExamples = emqx_bridge_v2_schema:examples(Method), - RegisteredExamples = registered_examples(Method), - maps:merge(ActionExamples, RegisteredExamples). + registered_examples(Method). registered_examples(Method) -> MergeFun = From 98f947f4f3eb92cff8b0cf8604a992e51c8d194a Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Wed, 15 Nov 2023 22:28:52 +0700 Subject: [PATCH 3/7] ci(router): fix flaky testcase --- apps/emqx/test/emqx_routing_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/test/emqx_routing_SUITE.erl b/apps/emqx/test/emqx_routing_SUITE.erl index 6966ac56a..a54e1b4dd 100644 --- a/apps/emqx/test/emqx_routing_SUITE.erl +++ b/apps/emqx/test/emqx_routing_SUITE.erl @@ -100,7 +100,7 @@ mk_config_listeners(N) -> t_cluster_routing(Config) -> Cluster = ?config(cluster, Config), - Clients = [C1, C2, C3] = [start_client(N) || N <- Cluster], + Clients = [C1, C2, C3] = lists:sort([start_client(N) || N <- Cluster]), Commands = [ {fun publish/3, [C1, <<"a/b/c">>, <<"wontsee">>]}, {fun publish/3, [C2, <<"a/b/d">>, <<"wontsee">>]}, From 90571b7d8eb05b7badcec810cce2d56d569d1c08 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 14 Nov 2023 13:33:07 -0300 Subject: [PATCH 4/7] test: fix noise about undefined unofficial callbacks --- apps/emqx/test/emqx_cth_suite.erl | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 5a59238de..401d4f59d 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -74,6 +74,9 @@ -export([merge_appspec/2]). +%% "Unofficial" `emqx_config_handler' and `emqx_conf' APIs +-export([schema_module/0, upgrade_raw_conf/1]). + -export_type([appspec/0]). -export_type([appspec_opts/0]). @@ -477,3 +480,18 @@ render_config(Config = #{}) -> unicode:characters_to_binary(hocon_pp:do(Config, #{})); render_config(Config) -> unicode:characters_to_binary(Config). + +%% + +%% "Unofficial" `emqx_config_handler' API +schema_module() -> + ?MODULE. + +%% "Unofficial" `emqx_conf' API +upgrade_raw_conf(Conf) -> + case emqx_release:edition() of + ee -> + emqx_enterprise_schema:upgrade_raw_conf(Conf); + ce -> + emqx_conf_schema:upgrade_raw_conf(Conf) + end. From 36b5d58957050790ebbc9d3d3068c8e5d76c71be Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 16 Nov 2023 09:17:54 -0300 Subject: [PATCH 5/7] test: reorganize test suite a bit --- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 220 +++++++++--------- 1 file changed, 116 insertions(+), 104 deletions(-) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 6adb66357..fba72a1d7 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -25,6 +25,10 @@ -define(TYPE, kafka_producer). +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + all() -> emqx_common_test_helpers:all(?MODULE). @@ -51,6 +55,118 @@ end_per_suite(Config) -> emqx_cth_suite:stop(Apps), ok. +%%------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------- + +check_send_message_with_bridge(BridgeName) -> + %% ###################################### + %% Create Kafka message + %% ###################################### + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Payload = list_to_binary("payload" ++ integer_to_list(Time)), + Msg = #{ + clientid => BinTime, + payload => Payload, + timestamp => Time + }, + Offset = resolve_kafka_offset(), + %% ###################################### + %% Send message + %% ###################################### + emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}), + %% ###################################### + %% Check if message is sent to Kafka + %% ###################################### + check_kafka_message_payload(Offset, Payload). + +resolve_kafka_offset() -> + KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + Partition = 0, + Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( + Hosts, KafkaTopic, Partition + ), + Offset0. + +check_kafka_message_payload(Offset, ExpectedPayload) -> + KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + Partition = 0, + Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), + ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). + +bridge_v2_config(ConnectorName) -> + #{ + <<"connector">> => ConnectorName, + <<"enable">> => true, + <<"kafka">> => #{ + <<"buffer">> => #{ + <<"memory_overload_protection">> => false, + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"2GB">>, + <<"segment_bytes">> => <<"100MB">> + }, + <<"compression">> => <<"no_compression">>, + <<"kafka_header_value_encode_mode">> => <<"none">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_inflight">> => 10, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"timestamp">> => <<"${.timestamp}">>, + <<"value">> => <<"${.payload}">> + }, + <<"partition_count_refresh_interval">> => <<"60s">>, + <<"partition_strategy">> => <<"random">>, + <<"query_mode">> => <<"sync">>, + <<"required_acks">> => <<"all_isr">>, + <<"sync_query_timeout">> => <<"5s">>, + <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() + }, + <<"local_topic">> => <<"kafka_t/#">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">> + } + }. + +connector_config() -> + #{ + <<"authentication">> => <<"none">>, + <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()), + <<"connect_timeout">> => <<"5s">>, + <<"enable">> => true, + <<"metadata_request_timeout">> => <<"5s">>, + <<"min_metadata_refresh_interval">> => <<"3s">>, + <<"socket_opts">> => + #{ + <<"recbuf">> => <<"1024KB">>, + <<"sndbuf">> => <<"1024KB">>, + <<"tcp_keepalive">> => <<"none">> + }, + <<"ssl">> => + #{ + <<"ciphers">> => [], + <<"depth">> => 10, + <<"enable">> => false, + <<"hibernate_after">> => <<"5s">>, + <<"log_level">> => <<"notice">>, + <<"reuse_sessions">> => true, + <<"secure_renegotiate">> => true, + <<"verify">> => <<"verify_peer">>, + <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] + } + }. + +kafka_hosts_string() -> + KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"), + KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"), + KafkaHost ++ ":" ++ KafkaPort. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + t_create_remove_list(_) -> [] = emqx_bridge_v2:list(), ConnectorConfig = connector_config(), @@ -186,107 +302,3 @@ t_unknown_topic(_Config) -> emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName) ), ok. - -check_send_message_with_bridge(BridgeName) -> - %% ###################################### - %% Create Kafka message - %% ###################################### - Time = erlang:unique_integer(), - BinTime = integer_to_binary(Time), - Payload = list_to_binary("payload" ++ integer_to_list(Time)), - Msg = #{ - clientid => BinTime, - payload => Payload, - timestamp => Time - }, - Offset = resolve_kafka_offset(), - %% ###################################### - %% Send message - %% ###################################### - emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}), - %% ###################################### - %% Check if message is sent to Kafka - %% ###################################### - check_kafka_message_payload(Offset, Payload). - -resolve_kafka_offset() -> - KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), - Partition = 0, - Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), - {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( - Hosts, KafkaTopic, Partition - ), - Offset0. - -check_kafka_message_payload(Offset, ExpectedPayload) -> - KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), - Partition = 0, - Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), - {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), - ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). - -bridge_v2_config(ConnectorName) -> - #{ - <<"connector">> => ConnectorName, - <<"enable">> => true, - <<"kafka">> => #{ - <<"buffer">> => #{ - <<"memory_overload_protection">> => false, - <<"mode">> => <<"memory">>, - <<"per_partition_limit">> => <<"2GB">>, - <<"segment_bytes">> => <<"100MB">> - }, - <<"compression">> => <<"no_compression">>, - <<"kafka_header_value_encode_mode">> => <<"none">>, - <<"max_batch_bytes">> => <<"896KB">>, - <<"max_inflight">> => 10, - <<"message">> => #{ - <<"key">> => <<"${.clientid}">>, - <<"timestamp">> => <<"${.timestamp}">>, - <<"value">> => <<"${.payload}">> - }, - <<"partition_count_refresh_interval">> => <<"60s">>, - <<"partition_strategy">> => <<"random">>, - <<"query_mode">> => <<"sync">>, - <<"required_acks">> => <<"all_isr">>, - <<"sync_query_timeout">> => <<"5s">>, - <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() - }, - <<"local_topic">> => <<"kafka_t/#">>, - <<"resource_opts">> => #{ - <<"health_check_interval">> => <<"15s">> - } - }. - -connector_config() -> - #{ - <<"authentication">> => <<"none">>, - <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()), - <<"connect_timeout">> => <<"5s">>, - <<"enable">> => true, - <<"metadata_request_timeout">> => <<"5s">>, - <<"min_metadata_refresh_interval">> => <<"3s">>, - <<"socket_opts">> => - #{ - <<"recbuf">> => <<"1024KB">>, - <<"sndbuf">> => <<"1024KB">>, - <<"tcp_keepalive">> => <<"none">> - }, - <<"ssl">> => - #{ - <<"ciphers">> => [], - <<"depth">> => 10, - <<"enable">> => false, - <<"hibernate_after">> => <<"5s">>, - <<"log_level">> => <<"notice">>, - <<"reuse_sessions">> => true, - <<"secure_renegotiate">> => true, - <<"verify">> => <<"verify_peer">>, - <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] - } - }. - -kafka_hosts_string() -> - KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"), - KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"), - KafkaHost ++ ":" ++ KafkaPort. From b92821188b43466f449f73572613ce1d9865dc1f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 16 Nov 2023 10:08:41 -0300 Subject: [PATCH 6/7] fix(kafka_producer): make status `connecting` while the client fails to connect Fixes https://emqx.atlassian.net/browse/EMQX-11408 To make it consistent with the previous bridge behavior. Also, introduces macros for resource status to avoid problems with typos. --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 25 ++++--- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 68 ++++++++++++++++--- .../test/emqx_bridge_v2_api_SUITE.erl | 4 +- .../test/emqx_bridge_v2_test_connector.erl | 4 +- .../src/emqx_bridge_kafka_impl_producer.erl | 10 +-- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 40 +++++++++++ apps/emqx_resource/include/emqx_resource.hrl | 18 ++++- 7 files changed, 137 insertions(+), 32 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 70e248e56..7ce266922 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -202,33 +202,36 @@ lookup(Type, Name) -> %% The connector should always exist %% ... but, in theory, there might be no channels associated to it when we try %% to delete the connector, and then this reference will become dangling... - InstanceData = + ConnectorData = case emqx_resource:get_instance(ConnectorId) of {ok, _, Data} -> Data; {error, not_found} -> #{} end, - %% Find the Bridge V2 status from the InstanceData - Channels = maps:get(added_channels, InstanceData, #{}), + %% Find the Bridge V2 status from the ConnectorData + ConnectorStatus = maps:get(status, ConnectorData, undefined), + Channels = maps:get(added_channels, ConnectorData, #{}), BridgeV2Id = id(Type, Name, BridgeConnector), ChannelStatus = maps:get(BridgeV2Id, Channels, undefined), {DisplayBridgeV2Status, ErrorMsg} = - case ChannelStatus of - #{status := connected} -> - {connected, <<"">>}; - #{status := Status, error := undefined} -> + case {ChannelStatus, ConnectorStatus} of + {#{status := ?status_connected}, _} -> + {?status_connected, <<"">>}; + {#{error := resource_not_operational}, ?status_connecting} -> + {?status_connecting, <<"Not installed">>}; + {#{status := Status, error := undefined}, _} -> {Status, <<"Unknown reason">>}; - #{status := Status, error := Error} -> + {#{status := Status, error := Error}, _} -> {Status, emqx_utils:readable_error_msg(Error)}; - undefined -> - {disconnected, <<"Pending installation">>} + {undefined, _} -> + {?status_disconnected, <<"Not installed">>} end, {ok, #{ type => bin(Type), name => bin(Name), raw_config => RawConf, - resource_data => InstanceData, + resource_data => ConnectorData, status => DisplayBridgeV2Status, error => ErrorMsg }} diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 2766088a1..791997fc3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -20,6 +20,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -43,7 +44,7 @@ con_schema() -> { con_type(), hoconsc:mk( - hoconsc:map(name, typerefl:map()), + hoconsc:map(name, hoconsc:ref(?MODULE, connector_config)), #{ desc => <<"Test Connector Config">>, required => false @@ -52,6 +53,15 @@ con_schema() -> } ]. +fields(connector_config) -> + [ + {enable, hoconsc:mk(typerefl:boolean(), #{})}, + {resource_opts, hoconsc:mk(typerefl:map(), #{})}, + {on_start_fun, hoconsc:mk(typerefl:binary(), #{})}, + {on_get_status_fun, hoconsc:mk(typerefl:binary(), #{})}, + {on_add_channel_fun, hoconsc:mk(typerefl:binary(), #{})} + ]. + con_config() -> #{ <<"enable">> => true, @@ -112,6 +122,7 @@ setup_mocks() -> catch meck:new(emqx_connector_schema, MeckOpts), meck:expect(emqx_connector_schema, fields, 1, con_schema()), + meck:expect(emqx_connector_schema, connector_type_to_bridge_types, 1, [con_type()]), catch meck:new(emqx_connector_resource, MeckOpts), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()), @@ -159,15 +170,7 @@ init_per_testcase(_TestCase, Config) -> ets:new(fun_table_name(), [named_table, public]), %% Create a fake connector {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()), - [ - {mocked_mods, [ - emqx_connector_schema, - emqx_connector_resource, - - emqx_bridge_v2 - ]} - | Config - ]. + Config. end_per_testcase(_TestCase, _Config) -> ets:delete(fun_table_name()), @@ -846,6 +849,51 @@ t_start_operation_when_on_add_channel_gives_error(_Config) -> ), ok. +t_lookup_status_when_connecting(_Config) -> + ResponseETS = ets:new(response_ets, [public]), + ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}), + OnGetStatusFun = wrap_fun(fun() -> + ets:lookup_element(ResponseETS, on_get_status_value, 2) + end), + + ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{ + <<"on_get_status_fun">> => OnGetStatusFun, + <<"resource_opts">> => #{<<"start_timeout">> => 100} + }), + ConnectorName = ?FUNCTION_NAME, + ct:pal("connector config:\n ~p", [ConnectorConfig]), + {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig), + + ActionName = my_test_action, + ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end), + ActionConfig = (bridge_config())#{ + <<"on_get_channel_status_fun">> => ChanStatusFun, + <<"connector">> => atom_to_binary(ConnectorName) + }, + ct:pal("action config:\n ~p", [ActionConfig]), + {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), + + %% Top-level status is connecting if the connector status is connecting, but the + %% channel is not yet installed. `resource_data.added_channels.$channel_id.status' + %% contains true internal status. + {ok, Res} = emqx_bridge_v2:lookup(bridge_type(), ActionName), + ?assertMatch( + #{ + %% This is the action's public status + status := ?status_connecting, + resource_data := + #{ + %% This is the connector's status + status := ?status_connecting + } + }, + Res + ), + #{resource_data := #{added_channels := Channels}} = Res, + [{_Id, ChannelData}] = maps:to_list(Channels), + ?assertMatch(#{status := ?status_disconnected}, ChannelData), + ok. + %% Helper Functions wait_until(Fun) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 059f9ac9f..b99a462b4 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -587,7 +587,7 @@ t_broken_bridge_config(Config) -> <<"type">> := ?BRIDGE_TYPE, <<"connector">> := <<"does_not_exist">>, <<"status">> := <<"disconnected">>, - <<"error">> := <<"Pending installation">> + <<"error">> := <<"Not installed">> } ]}, request_json(get, uri([?ROOT]), Config) @@ -640,7 +640,7 @@ t_fix_broken_bridge_config(Config) -> <<"type">> := ?BRIDGE_TYPE, <<"connector">> := <<"does_not_exist">>, <<"status">> := <<"disconnected">>, - <<"error">> := <<"Pending installation">> + <<"error">> := <<"Not installed">> } ]}, request_json(get, uri([?ROOT]), Config) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index 0138832a0..3c5204ea1 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -43,8 +43,8 @@ on_start( ) -> Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), Fun(Conf); -on_start(_InstId, _Config) -> - {ok, #{}}. +on_start(_InstId, Config) -> + {ok, Config}. on_add_channel( _InstId, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 4422d8dd5..84401aaa6 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -481,11 +481,11 @@ on_get_status( case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> case wolff_client:check_connectivity(Pid) of - ok -> connected; - {error, Error} -> {connecting, State, Error} + ok -> ?status_connected; + {error, Error} -> {?status_connecting, State, Error} end; {error, _Reason} -> - connecting + ?status_connecting end. on_get_channel_status( @@ -499,10 +499,10 @@ on_get_channel_status( #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels), try ok = check_topic_and_leader_connections(ClientId, KafkaTopic), - connected + ?status_connected catch throw:#{reason := restarting} -> - conneting + ?status_connecting end. check_topic_and_leader_connections(ClientId, KafkaTopic) -> diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index fba72a1d7..6c48146cd 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -23,6 +23,8 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + -define(TYPE, kafka_producer). %%------------------------------------------------------------------------------ @@ -55,6 +57,13 @@ end_per_suite(Config) -> emqx_cth_suite:stop(Apps), ok. +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(60_000), + ok. + %%------------------------------------------------------------------------------------- %% Helper fns %%------------------------------------------------------------------------------------- @@ -163,6 +172,16 @@ kafka_hosts_string() -> KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"), KafkaHost ++ ":" ++ KafkaPort. +create_connector(Name, Config) -> + Res = emqx_connector:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end), + Res. + +create_action(Name, Config) -> + Res = emqx_bridge_v2:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end), + Res. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -302,3 +321,24 @@ t_unknown_topic(_Config) -> emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName) ), ok. + +t_bad_url(_Config) -> + ConnectorName = <<"test_connector">>, + ActionName = <<"test_action">>, + ActionConfig = bridge_v2_config(<<"test_connector">>), + ConnectorConfig0 = connector_config(), + ConnectorConfig = ConnectorConfig0#{<<"bootstrap_hosts">> := <<"bad_host:9092">>}, + ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)), + ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)), + ?assertMatch( + {ok, #{ + resource_data := + #{ + status := connecting, + error := [#{reason := unresolvable_hostname}] + } + }}, + emqx_connector:lookup(?TYPE, ConnectorName) + ), + ?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)), + ok. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index fa86e68c9..b34da9a63 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -13,6 +13,16 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- + +%% bridge/connector/action status +-define(status_connected, connected). +-define(status_connecting, connecting). +-define(status_disconnected, disconnected). +%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'... Modules +%% implementing `emqx_resource' behavior should not return it. The `rm_' prefix is to +%% remind us of that. +-define(rm_status_stopped, stopped). + -type resource_type() :: module(). -type resource_id() :: binary(). -type channel_id() :: binary(). @@ -21,8 +31,12 @@ -type resource_config() :: term(). -type resource_spec() :: map(). -type resource_state() :: term(). --type resource_status() :: connected | disconnected | connecting | stopped. --type channel_status() :: connected | connecting | disconnected. +%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'... Modules +%% implementing `emqx_resource' behavior should not return it. +-type resource_status() :: + ?status_connected | ?status_disconnected | ?status_connecting | ?rm_status_stopped. +-type health_check_status() :: ?status_connected | ?status_disconnected | ?status_connecting. +-type channel_status() :: ?status_connected | ?status_connecting | ?status_disconnected. -type callback_mode() :: always_sync | async_if_possible. -type query_mode() :: simple_sync From 58437cd35ac8f4eae945ea2e54bee44088e23a59 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 17 Nov 2023 12:34:36 +0300 Subject: [PATCH 7/7] fix(mongodb): fix deadlock while stopping mongodb resource --- apps/emqx_mongodb/rebar.config | 2 +- changes/ce/fix-11955.en.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changes/ce/fix-11955.en.md diff --git a/apps/emqx_mongodb/rebar.config b/apps/emqx_mongodb/rebar.config index 577dee8b8..5be42ef17 100644 --- a/apps/emqx_mongodb/rebar.config +++ b/apps/emqx_mongodb/rebar.config @@ -3,5 +3,5 @@ {erl_opts, [debug_info]}. {deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} - , {mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.21"}}} + , {mongodb, {git, "https://github.com/emqx/mongodb-erlang", {tag, "v3.0.22"}}} ]}. diff --git a/changes/ce/fix-11955.en.md b/changes/ce/fix-11955.en.md new file mode 100644 index 000000000..aae3f0602 --- /dev/null +++ b/changes/ce/fix-11955.en.md @@ -0,0 +1 @@ +Fix EMQX graceful stop when there is an unavailable MongoDB resource present.