diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 9879fe1e6..059f9ac9f 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -177,7 +177,9 @@ all() -> groups() -> AllTCs = emqx_common_test_helpers:all(?MODULE), SingleOnlyTests = [ - t_bridges_probe + t_bridges_probe, + t_broken_bridge_config, + t_fix_broken_bridge_config ], ClusterLaterJoinOnlyTCs = [ % t_cluster_later_join_metrics @@ -551,6 +553,117 @@ t_bridges_lifecycle(Config) -> {ok, 400, _} = request(post, uri([?ROOT]), ?KAFKA_BRIDGE(<<"a.b">>), Config), ok. +t_broken_bridge_config(Config) -> + emqx_cth_suite:stop_apps([emqx_bridge]), + BridgeName = ?BRIDGE_NAME, + StartOps = + #{ + config => + "actions {\n" + " " + ?BRIDGE_TYPE_STR + " {\n" + " " ++ binary_to_list(BridgeName) ++ + " {\n" + " connector = does_not_exist\n" + " enable = true\n" + " kafka {\n" + " topic = test-topic-one-partition\n" + " }\n" + " local_topic = \"mqtt/local/topic\"\n" + " resource_opts {health_check_interval = 32s}\n" + " }\n" + " }\n" + "}\n" + "\n", + schema_mod => emqx_bridge_v2_schema + }, + emqx_cth_suite:start_app(emqx_bridge, StartOps), + + ?assertMatch( + {ok, 200, [ + #{ + <<"name">> := BridgeName, + <<"type">> := ?BRIDGE_TYPE, + <<"connector">> := <<"does_not_exist">>, + <<"status">> := <<"disconnected">>, + <<"error">> := <<"Pending installation">> + } + ]}, + request_json(get, uri([?ROOT]), Config) + ), + + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + ?assertEqual( + {ok, 204, <<>>}, + request(delete, uri([?ROOT, BridgeID]), Config) + ), + + ?assertEqual( + {ok, 200, []}, + request_json(get, uri([?ROOT]), Config) + ), + + ok. + +t_fix_broken_bridge_config(Config) -> + emqx_cth_suite:stop_apps([emqx_bridge]), + BridgeName = ?BRIDGE_NAME, + StartOps = + #{ + config => + "actions {\n" + " " + ?BRIDGE_TYPE_STR + " {\n" + " " ++ binary_to_list(BridgeName) ++ + " {\n" + " connector = does_not_exist\n" + " enable = true\n" + " kafka {\n" + " topic = test-topic-one-partition\n" + " }\n" + " local_topic = \"mqtt/local/topic\"\n" + " resource_opts {health_check_interval = 32s}\n" + " }\n" + " }\n" + "}\n" + "\n", + schema_mod => emqx_bridge_v2_schema + }, + emqx_cth_suite:start_app(emqx_bridge, StartOps), + + ?assertMatch( + {ok, 200, [ + #{ + <<"name">> := BridgeName, + <<"type">> := ?BRIDGE_TYPE, + <<"connector">> := <<"does_not_exist">>, + <<"status">> := <<"disconnected">>, + <<"error">> := <<"Pending installation">> + } + ]}, + request_json(get, uri([?ROOT]), Config) + ), + + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), + request_json( + put, + uri([?ROOT, BridgeID]), + ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, ?CONNECTOR_NAME), + Config + ), + + ?assertMatch( + {ok, 200, #{ + <<"connector">> := ?CONNECTOR_NAME, + <<"status">> := <<"connected">> + }}, + request_json(get, uri([?ROOT, BridgeID]), Config) + ), + + ok. + t_start_bridge_unknown_node(Config) -> {ok, 404, _} = request(