From 4fe1d332b30d5766ca5a7655b19e5a6a5b1bbf6e Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 2 Nov 2023 16:49:50 +0100 Subject: [PATCH 1/4] fix(emqx_bridge): don't crash if there's no status field --- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 1935a1b5a..0472be443 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -718,7 +718,13 @@ node_status(Bridges) -> aggregate_status(AllStatus) -> Head = fun([A | _]) -> A end, HeadVal = maps:get(status, Head(AllStatus), connecting), - AllRes = lists:all(fun(#{status := Val}) -> Val == HeadVal end, AllStatus), + AllRes = lists:all( + fun + (#{status := Val}) -> Val == HeadVal; + (_) -> false + end, + AllStatus + ), case AllRes of true -> HeadVal; false -> inconsistent From 0d332846725595788759f7b8a8bf71969d7e13ad Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 13 Nov 2023 14:05:46 +0100 Subject: [PATCH 2/4] fix(emqx_bridge): don't crash uninstallling broken action --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 37 +++++++------------------ 1 file changed, 10 insertions(+), 27 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 5e42b4881..d8646f952 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -360,28 +360,6 @@ uninstall_bridge_v2( %% Already not installed ok; uninstall_bridge_v2( - BridgeV2Type, - BridgeName, - Config -) -> - uninstall_bridge_v2_helper( - BridgeV2Type, - BridgeName, - combine_connector_and_bridge_v2_config( - BridgeV2Type, - BridgeName, - Config - ) - ). - -uninstall_bridge_v2_helper( - _BridgeV2Type, - _BridgeName, - {error, Reason} = Error -) -> - ?SLOG(error, Reason), - Error; -uninstall_bridge_v2_helper( BridgeV2Type, BridgeName, #{connector := ConnectorName} = Config @@ -390,11 +368,16 @@ uninstall_bridge_v2_helper( CreationOpts = emqx_resource:fetch_creation_opts(Config), ok = emqx_resource_buffer_worker_sup:stop_workers(BridgeV2Id, CreationOpts), ok = emqx_resource:clear_metrics(BridgeV2Id), - %% Deinstall from connector - ConnectorId = emqx_connector_resource:resource_id( - connector_type(BridgeV2Type), ConnectorName - ), - emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id). + case combine_connector_and_bridge_v2_config(BridgeV2Type, BridgeName, Config) of + {error, _} -> + ok; + _CombinedConfig -> + %% Deinstall from connector + ConnectorId = emqx_connector_resource:resource_id( + connector_type(BridgeV2Type), ConnectorName + ), + emqx_resource_manager:remove_channel(ConnectorId, BridgeV2Id) + end. combine_connector_and_bridge_v2_config( BridgeV2Type, From 0b3645057dc21b4ae4c75c04089c8e9965e79d2b Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 13 Nov 2023 14:06:31 +0100 Subject: [PATCH 3/4] fix(emqx_bridge): don't try to handle what we don't understand --- apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 2 -- 1 file changed, 2 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 0472be443..634337d99 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -801,8 +801,6 @@ do_create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) -> PreOrPostConfigUpdate =:= pre_config_update; PreOrPostConfigUpdate =:= post_config_update -> - ?BAD_REQUEST(map_to_json(redact(Reason))); - {error, Reason} -> ?BAD_REQUEST(map_to_json(redact(Reason))) end. From 17b18849de07bfcad4e808df3f8c648e8218a5a3 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Sun, 5 Nov 2023 15:25:13 +0100 Subject: [PATCH 4/4] test(emqx_bridge): test broken config --- .../test/emqx_bridge_v2_api_SUITE.erl | 115 +++++++++++++++++- 1 file changed, 114 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 9879fe1e6..059f9ac9f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -177,7 +177,9 @@ all() -> groups() -> AllTCs = emqx_common_test_helpers:all(?MODULE), SingleOnlyTests = [ - t_bridges_probe + t_bridges_probe, + t_broken_bridge_config, + t_fix_broken_bridge_config ], ClusterLaterJoinOnlyTCs = [ % t_cluster_later_join_metrics @@ -551,6 +553,117 @@ t_bridges_lifecycle(Config) -> {ok, 400, _} = request(post, uri([?ROOT]), ?KAFKA_BRIDGE(<<"a.b">>), Config), ok. +t_broken_bridge_config(Config) -> + emqx_cth_suite:stop_apps([emqx_bridge]), + BridgeName = ?BRIDGE_NAME, + StartOps = + #{ + config => + "actions {\n" + " " + ?BRIDGE_TYPE_STR + " {\n" + " " ++ binary_to_list(BridgeName) ++ + " {\n" + " connector = does_not_exist\n" + " enable = true\n" + " kafka {\n" + " topic = test-topic-one-partition\n" + " }\n" + " local_topic = \"mqtt/local/topic\"\n" + " resource_opts {health_check_interval = 32s}\n" + " }\n" + " }\n" + "}\n" + "\n", + schema_mod => emqx_bridge_v2_schema + }, + emqx_cth_suite:start_app(emqx_bridge, StartOps), + + ?assertMatch( + {ok, 200, [ + #{ + <<"name">> := BridgeName, + <<"type">> := ?BRIDGE_TYPE, + <<"connector">> := <<"does_not_exist">>, + <<"status">> := <<"disconnected">>, + <<"error">> := <<"Pending installation">> + } + ]}, + request_json(get, uri([?ROOT]), Config) + ), + + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + ?assertEqual( + {ok, 204, <<>>}, + request(delete, uri([?ROOT, BridgeID]), Config) + ), + + ?assertEqual( + {ok, 200, []}, + request_json(get, uri([?ROOT]), Config) + ), + + ok. + +t_fix_broken_bridge_config(Config) -> + emqx_cth_suite:stop_apps([emqx_bridge]), + BridgeName = ?BRIDGE_NAME, + StartOps = + #{ + config => + "actions {\n" + " " + ?BRIDGE_TYPE_STR + " {\n" + " " ++ binary_to_list(BridgeName) ++ + " {\n" + " connector = does_not_exist\n" + " enable = true\n" + " kafka {\n" + " topic = test-topic-one-partition\n" + " }\n" + " local_topic = \"mqtt/local/topic\"\n" + " resource_opts {health_check_interval = 32s}\n" + " }\n" + " }\n" + "}\n" + "\n", + schema_mod => emqx_bridge_v2_schema + }, + emqx_cth_suite:start_app(emqx_bridge, StartOps), + + ?assertMatch( + {ok, 200, [ + #{ + <<"name">> := BridgeName, + <<"type">> := ?BRIDGE_TYPE, + <<"connector">> := <<"does_not_exist">>, + <<"status">> := <<"disconnected">>, + <<"error">> := <<"Pending installation">> + } + ]}, + request_json(get, uri([?ROOT]), Config) + ), + + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + request_json( + put, + uri([?ROOT, BridgeID]), + ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, ?CONNECTOR_NAME), + Config + ), + + ?assertMatch( + {ok, 200, #{ + <<"connector">> := ?CONNECTOR_NAME, + <<"status">> := <<"connected">> + }}, + request_json(get, uri([?ROOT, BridgeID]), Config) + ), + + ok. + t_start_bridge_unknown_node(Config) -> {ok, 404, _} = request(