Merge pull request #11937 from sstrigler/EMQX-11318-crash-in-get-bridges-v-2-if-a-broken-bridge-is-configured

Emqx 11318 crash in get bridges v 2 if a broken bridge is configured
This commit is contained in:
Stefan Strigler 2023-11-13 17:28:56 +01:00 committed by GitHub
commit fc2891d6de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 131 additions and 31 deletions

View File

@ -360,28 +360,6 @@ uninstall_bridge_v2(
%% Already not installed %% Already not installed
ok; ok;
uninstall_bridge_v2( uninstall_bridge_v2(
BridgeV2Type,
BridgeName,
Config
) ->
uninstall_bridge_v2_helper(
BridgeV2Type,
BridgeName,
combine_connector_and_bridge_v2_config(
BridgeV2Type,
BridgeName,
Config
)
).
uninstall_bridge_v2_helper(
_BridgeV2Type,
_BridgeName,
{error, Reason} = Error
) ->
?SLOG(error, Reason),
Error;
uninstall_bridge_v2_helper(
BridgeV2Type, BridgeV2Type,
BridgeName, BridgeName,
#{connector := ConnectorName} = Config #{connector := ConnectorName} = Config
@ -390,11 +368,16 @@ uninstall_bridge_v2_helper(
CreationOpts = emqx_resource:fetch_creation_opts(Config), CreationOpts = emqx_resource:fetch_creation_opts(Config),
ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts), ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts),
ok = emqx_resource:clear_metrics(BridgeV2Id), ok = emqx_resource:clear_metrics(BridgeV2Id),
%% Deinstall from connector case combine_connector_and_bridge_v2_config(BridgeV2Type, BridgeName, Config) of
ConnectorId = emqx_connector_resource:resource_id( {error, _} ->
connector_type(BridgeV2Type), ConnectorName ok;
), _CombinedConfig ->
emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id). %% Deinstall from connector
ConnectorId = emqx_connector_resource:resource_id(
connector_type(BridgeV2Type), ConnectorName
),
emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id)
end.
combine_connector_and_bridge_v2_config( combine_connector_and_bridge_v2_config(
BridgeV2Type, BridgeV2Type,

View File

@ -718,7 +718,13 @@ node_status(Bridges) ->
aggregate_status(AllStatus) -> aggregate_status(AllStatus) ->
Head = fun([A | _]) -> A end, Head = fun([A | _]) -> A end,
HeadVal = maps:get(status, Head(AllStatus), connecting), HeadVal = maps:get(status, Head(AllStatus), connecting),
AllRes = lists:all(fun(#{status := Val}) -> Val == HeadVal end, AllStatus), AllRes = lists:all(
fun
(#{status := Val}) -> Val == HeadVal;
(_) -> false
end,
AllStatus
),
case AllRes of case AllRes of
true -> HeadVal; true -> HeadVal;
false -> inconsistent false -> inconsistent
@ -795,8 +801,6 @@ do_create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
PreOrPostConfigUpdate =:= pre_config_update; PreOrPostConfigUpdate =:= pre_config_update;
PreOrPostConfigUpdate =:= post_config_update PreOrPostConfigUpdate =:= post_config_update
-> ->
?BAD_REQUEST(map_to_json(redact(Reason)));
{error, Reason} ->
?BAD_REQUEST(map_to_json(redact(Reason))) ?BAD_REQUEST(map_to_json(redact(Reason)))
end. end.

View File

@ -177,7 +177,9 @@ all() ->
groups() -> groups() ->
AllTCs = emqx_common_test_helpers:all(?MODULE), AllTCs = emqx_common_test_helpers:all(?MODULE),
SingleOnlyTests = [ SingleOnlyTests = [
t_bridges_probe t_bridges_probe,
t_broken_bridge_config,
t_fix_broken_bridge_config
], ],
ClusterLaterJoinOnlyTCs = [ ClusterLaterJoinOnlyTCs = [
% t_cluster_later_join_metrics % t_cluster_later_join_metrics
@ -551,6 +553,117 @@ t_bridges_lifecycle(Config) ->
{ok, 400, _} = request(post, uri([?ROOT]), ?KAFKA_BRIDGE(<<"a.b">>), Config), {ok, 400, _} = request(post, uri([?ROOT]), ?KAFKA_BRIDGE(<<"a.b">>), Config),
ok. ok.
t_broken_bridge_config(Config) ->
emqx_cth_suite:stop_apps([emqx_bridge]),
BridgeName = ?BRIDGE_NAME,
StartOps =
#{
config =>
"actions {\n"
" "
?BRIDGE_TYPE_STR
" {\n"
" " ++ binary_to_list(BridgeName) ++
" {\n"
" connector = does_not_exist\n"
" enable = true\n"
" kafka {\n"
" topic = test-topic-one-partition\n"
" }\n"
" local_topic = \"mqtt/local/topic\"\n"
" resource_opts {health_check_interval = 32s}\n"
" }\n"
" }\n"
"}\n"
"\n",
schema_mod => emqx_bridge_v2_schema
},
emqx_cth_suite:start_app(emqx_bridge, StartOps),
?assertMatch(
{ok, 200, [
#{
<<"name">> := BridgeName,
<<"type">> := ?BRIDGE_TYPE,
<<"connector">> := <<"does_not_exist">>,
<<"status">> := <<"disconnected">>,
<<"error">> := <<"Pending installation">>
}
]},
request_json(get, uri([?ROOT]), Config)
),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
?assertEqual(
{ok, 204, <<>>},
request(delete, uri([?ROOT, BridgeID]), Config)
),
?assertEqual(
{ok, 200, []},
request_json(get, uri([?ROOT]), Config)
),
ok.
t_fix_broken_bridge_config(Config) ->
emqx_cth_suite:stop_apps([emqx_bridge]),
BridgeName = ?BRIDGE_NAME,
StartOps =
#{
config =>
"actions {\n"
" "
?BRIDGE_TYPE_STR
" {\n"
" " ++ binary_to_list(BridgeName) ++
" {\n"
" connector = does_not_exist\n"
" enable = true\n"
" kafka {\n"
" topic = test-topic-one-partition\n"
" }\n"
" local_topic = \"mqtt/local/topic\"\n"
" resource_opts {health_check_interval = 32s}\n"
" }\n"
" }\n"
"}\n"
"\n",
schema_mod => emqx_bridge_v2_schema
},
emqx_cth_suite:start_app(emqx_bridge, StartOps),
?assertMatch(
{ok, 200, [
#{
<<"name">> := BridgeName,
<<"type">> := ?BRIDGE_TYPE,
<<"connector">> := <<"does_not_exist">>,
<<"status">> := <<"disconnected">>,
<<"error">> := <<"Pending installation">>
}
]},
request_json(get, uri([?ROOT]), Config)
),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
request_json(
put,
uri([?ROOT, BridgeID]),
?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, ?CONNECTOR_NAME),
Config
),
?assertMatch(
{ok, 200, #{
<<"connector">> := ?CONNECTOR_NAME,
<<"status">> := <<"connected">>
}},
request_json(get, uri([?ROOT, BridgeID]), Config)
),
ok.
t_start_bridge_unknown_node(Config) -> t_start_bridge_unknown_node(Config) ->
{ok, 404, _} = {ok, 404, _} =
request( request(