From 3880862c810b386b35ebcd83fe0c596cb10b3809 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Tue, 21 Mar 2023 14:22:04 +0100 Subject: [PATCH] fix(emqx_bridge): return 503 for inconsistency in bridge setup --- apps/emqx_bridge/src/emqx_bridge_api.erl | 10 ++++++- .../test/emqx_bridge_api_SUITE.erl | 30 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 917b44096..f7a1bb345 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -980,7 +980,15 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> bin(io_lib:format("Failed to start ~p pool for reason ~p", [Name, Reason])) ); {error, not_found} -> - ?BRIDGE_NOT_FOUND(BridgeType, BridgeName); + BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + ?SLOG(warning, #{ + msg => "bridge_inconsistent_in_cluster_for_call_operation", + reason => not_found, + type => BridgeType, + name => BridgeName, + bridge => BridgeId + }), + ?SERVICE_UNAVAILABLE(<<"Bridge not found on remote node: ", BridgeId/binary>>); {error, {node_not_found, Node}} -> ?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>); {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 986c0d29d..45ab2b623 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -98,6 +98,20 @@ init_per_testcase(t_old_bpapi_vsn, Config) -> meck:expect(emqx_bpapi, supported_version, 1, 1), meck:expect(emqx_bpapi, supported_version, 2, 1), init_per_testcase(common, Config); +init_per_testcase(StartStop, Config) when + StartStop == t_start_stop_bridges_cluster; + StartStop == t_start_stop_bridges_node +-> + meck:new(emqx_bridge_resource, [passthrough]), + meck:expect( + emqx_bridge_resource, + stop, + fun + (_, <<"bridge_not_found">>) -> {error, not_found}; + (Type, Name) -> meck:passthrough([Type, Name]) + end + ), + init_per_testcase(common, Config); init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2), @@ -109,6 +123,12 @@ end_per_testcase(t_broken_bpapi_vsn, Config) -> end_per_testcase(t_old_bpapi_vsn, Config) -> meck:unload([emqx_bpapi]), end_per_testcase(common, Config); +end_per_testcase(StartStop, Config) when + StartStop == t_start_stop_bridges_cluster; + StartStop == t_start_stop_bridges_node +-> + meck:unload([emqx_bridge_resource]), + end_per_testcase(common, Config); end_per_testcase(_, Config) -> Sock = ?config(sock, Config), Acceptor = ?config(acceptor, Config), @@ -626,6 +646,16 @@ do_start_stop_bridges(Type, Config) -> %% Looks ok but doesn't exist {ok, 404, _} = request(post, operation_path(Type, start, <<"webhook:cptn_hook">>), <<"">>), + %% + {ok, 201, _Bridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, <<"bridge_not_found">>) + ), + {ok, 503, _} = request( + post, operation_path(Type, stop, <<"webhook:bridge_not_found">>), <<"">> + ), + %% Create broken bridge {ListenPort, Sock} = listen_on_random_port(), %% Connecting to this endpoint should always timeout