From e31f4d609162e983911e539c046a1f151d7d5593 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 8 Mar 2023 15:37:23 +0100 Subject: [PATCH 01/12] refactor(emqx_bridge): add BAD_REQUEST macro and minor cleanups --- apps/emqx_bridge/src/emqx_bridge_api.erl | 52 ++++++++++-------------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index ff55976d0..a65990443 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -46,6 +46,8 @@ -export([lookup_from_local_node/2]). +-define(BAD_REQUEST(Reason), {400, error_msg('BAD_REQUEST', Reason)}). + -define(NOT_FOUND(Reason), {404, error_msg('NOT_FOUND', Reason)}). -define(BRIDGE_NOT_FOUND(Type, Name), @@ -477,9 +479,11 @@ schema("/bridges_probe") -> {ok, _} -> {400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)}; {error, not_found} -> - case ensure_bridge_created(BridgeType, BridgeName, Conf) of - ok -> lookup_from_all_nodes(BridgeType, BridgeName, 201); - {error, Error} -> {400, Error} + case emqx_bridge:create(BridgeType, BridgeName, Conf) of + {ok, _} -> + lookup_from_all_nodes(BridgeType, BridgeName, 201); + {error, Reason} -> + ?BAD_REQUEST(Reason) end end; '/bridges'(get, _Params) -> @@ -499,11 +503,11 @@ schema("/bridges_probe") -> {ok, _} -> RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}), Conf = deobfuscate(Conf1, RawConf), - case ensure_bridge_created(BridgeType, BridgeName, Conf) of - ok -> + case emqx_bridge:create(BridgeType, BridgeName, Conf) of + {ok, _} -> lookup_from_all_nodes(BridgeType, BridgeName, 200); - {error, Error} -> - {400, Error} + {error, Reason} -> + ?BAD_REQUEST(Reason) end; {error, not_found} -> ?BRIDGE_NOT_FOUND(BridgeType, BridgeName) @@ -524,12 +528,10 @@ schema("/bridges_probe") -> {ok, _} -> 204; {error, {rules_deps_on_this_bridge, RuleIds}} -> - {400, - error_msg( - 'BAD_REQUEST', - {<<"Can not delete bridge while active rules defined for this bridge">>, - RuleIds} - )}; + ?BAD_REQUEST( + {<<"Can not delete bridge while active rules defined for this bridge">>, + RuleIds} + ); {error, timeout} -> {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {error, Reason} -> @@ -561,7 +563,7 @@ schema("/bridges_probe") -> Params1 = maybe_deobfuscate_bridge_probe(Params), case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of ok -> - {204}; + 204; {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> {400, error_msg('TEST_FAILED', to_hr_reason(Reason))} end; @@ -615,7 +617,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> OperFunc -> case emqx_bridge:disable_enable(OperFunc, BridgeType, BridgeName) of {ok, _} -> - {204}; + 204; {error, {pre_config_update, _, bridge_not_found}} -> ?BRIDGE_NOT_FOUND(BridgeType, BridgeName); {error, {_, _, timeout}} -> @@ -656,11 +658,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]), case maps:get(enable, ConfMap, false) of false -> - {400, - error_msg( - 'BAD_REQUEST', - <<"Forbidden operation, bridge not enabled">> - )}; + ?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>); true -> case emqx_misc:safe_to_existing_atom(Node, utf8) of {ok, TargetNode} -> @@ -688,12 +686,6 @@ enable_func(<<"true">>) -> enable; enable_func(<<"false">>) -> disable; enable_func(_) -> invalid. -ensure_bridge_created(BridgeType, BridgeName, Conf) -> - case emqx_bridge:create(BridgeType, BridgeName, Conf) of - {ok, _} -> ok; - {error, Reason} -> {error, error_msg('BAD_REQUEST', Reason)} - end. - zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) -> lists:foldl( fun(#{type := Type, name := Name}, Acc) -> @@ -932,10 +924,8 @@ bin(S) when is_binary(S) -> call_operation(NodeOrAll, OperFunc, Args) -> case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of - ok -> - {204}; - {ok, _} -> - {204}; + Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok -> + 204; {error, not_implemented} -> %% Should only happen if we call `start` on a node that is %% still on an older bpapi version that doesn't support it. @@ -954,7 +944,7 @@ call_operation(NodeOrAll, OperFunc, Args) -> ) )}; {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> - {400, error_msg('BAD_REQUEST', to_hr_reason(Reason))} + ?BAD_REQUEST(to_hr_reason(Reason)) end. maybe_try_restart(all, start_bridges_to_all_nodes, Args) -> From fb3d101b3a4815b36d63fed4d31a225c785d5b43 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 8 Mar 2023 16:32:30 +0100 Subject: [PATCH 02/12] refactor(emqx_bridge): fix var names --- apps/emqx_bridge/src/emqx_bridge_api.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index a65990443..779f9e93d 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -50,10 +50,10 @@ -define(NOT_FOUND(Reason), {404, error_msg('NOT_FOUND', Reason)}). --define(BRIDGE_NOT_FOUND(Type, Name), +-define(BRIDGE_NOT_FOUND(BridgeType, BridgeName), ?NOT_FOUND( - <<"Bridge lookup failed: bridge named '", Name/binary, "' of type ", - (atom_to_binary(Type))/binary, " does not exist.">> + <<"Bridge lookup failed: bridge named '", BridgeName/binary, "' of type ", + (atom_to_binary(BridgeType))/binary, " does not exist.">> ) ). From 80b81748dfcca1b862874971bd958d2d142474ae Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 8 Mar 2023 16:34:19 +0100 Subject: [PATCH 03/12] fix(emqx_bridge): handle bridge not found in call_operation --- apps/emqx_bridge/src/emqx_bridge_api.erl | 6 +- .../test/emqx_bridge_api_SUITE.erl | 75 ++++++++++--------- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 779f9e93d..6ad116cd3 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -922,7 +922,7 @@ bin(S) when is_atom(S) -> bin(S) when is_binary(S) -> S. -call_operation(NodeOrAll, OperFunc, Args) -> +call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok -> 204; @@ -943,6 +943,8 @@ call_operation(NodeOrAll, OperFunc, Args) -> ) ) )}; + {error, not_found} -> + ?BRIDGE_NOT_FOUND(BridgeType, BridgeName); {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> ?BAD_REQUEST(to_hr_reason(Reason)) end. @@ -952,7 +954,7 @@ maybe_try_restart(all, start_bridges_to_all_nodes, Args) -> maybe_try_restart(Node, start_bridge_to_node, Args) -> call_operation(Node, restart_bridge_to_node, Args); maybe_try_restart(_, _, _) -> - {501}. + 501. do_bpapi_call(all, Call, Args) -> maybe_unwrap( diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index d30a9bff8..8feb2bcc6 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -214,7 +214,7 @@ t_http_crud_apis(Config) -> <<"status">> := _, <<"node_status">> := [_ | _], <<"url">> := URL1 - } = jsx:decode(Bridge), + } = emqx_json:decode(Bridge, [return_maps]), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), %% send an message to emqx and the message should be forwarded to the HTTP server @@ -251,7 +251,7 @@ t_http_crud_apis(Config) -> <<"node_status">> := [_ | _], <<"url">> := URL2 }, - jsx:decode(Bridge2) + emqx_json:decode(Bridge2, [return_maps]) ), %% list all bridges again, assert Bridge2 is in it @@ -269,7 +269,7 @@ t_http_crud_apis(Config) -> <<"url">> := URL2 } ], - jsx:decode(Bridge2Str) + emqx_json:decode(Bridge2Str, [return_maps]) ), %% get the bridge by id @@ -283,7 +283,7 @@ t_http_crud_apis(Config) -> <<"node_status">> := [_ | _], <<"url">> := URL2 }, - jsx:decode(Bridge3Str) + emqx_json:decode(Bridge3Str, [return_maps]) ), %% send an message to emqx again, check the path has been changed @@ -315,7 +315,7 @@ t_http_crud_apis(Config) -> <<"code">> := <<"NOT_FOUND">>, <<"message">> := _ }, - jsx:decode(ErrMsg2) + emqx_json:decode(ErrMsg2, [return_maps]) ), %% Deleting a non-existing bridge should result in an error {ok, 404, ErrMsg3} = request(delete, uri(["bridges", BridgeID]), []), @@ -324,7 +324,7 @@ t_http_crud_apis(Config) -> <<"code">> := <<"NOT_FOUND">>, <<"message">> := _ }, - jsx:decode(ErrMsg3) + emqx_json:decode(ErrMsg3, [return_maps]) ), ok. @@ -402,7 +402,7 @@ t_check_dependent_actions_on_delete(Config) -> <<"sql">> => <<"SELECT * from \"t\"">> } ), - #{<<"id">> := RuleId} = jsx:decode(Rule), + #{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]), %% delete the bridge should fail because there is a rule depenents on it {ok, 400, _} = request(delete, uri(["bridges", BridgeID]), []), %% delete the rule first @@ -437,7 +437,7 @@ t_cascade_delete_actions(Config) -> <<"sql">> => <<"SELECT * from \"t\"">> } ), - #{<<"id">> := RuleId} = jsx:decode(Rule), + #{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]), %% delete the bridge will also delete the actions from the rules {ok, 204, _} = request(delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -446,7 +446,7 @@ t_cascade_delete_actions(Config) -> #{ <<"actions">> := [] }, - jsx:decode(Rule1) + emqx_json:decode(Rule1, [return_maps]) ), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), ok. @@ -511,34 +511,39 @@ do_start_stop_bridges(Type, Config) -> <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], <<"url">> := URL1 - } = jsx:decode(Bridge), + } = emqx_json:decode(Bridge, [return_maps]), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), %% stop it {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)), + ?assertMatch(#{<<"status">> := <<"stopped">>}, emqx_json:decode(Bridge2, [return_maps])), %% start again {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), + ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])), %% start a started bridge {ok, 204, <<>>} = request(post, operation_path(Type, start, BridgeID), <<"">>), {ok, 200, Bridge3_1} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3_1)), + ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3_1, [return_maps])), %% restart an already started bridge {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), + ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])), %% stop it again {ok, 204, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), %% restart a stopped bridge {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)), + ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge4, [return_maps])), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + %% Fail parse-id check + {ok, 404, _} = request(post, operation_path(Type, start, <<"wreckbook_fugazi">>), <<"">>), + %% Looks ok but doesn't exist + {ok, 404, _} = request(post, operation_path(Type, start, <<"webhook:cptn_hook">>), <<"">>), + %% Create broken bridge {ListenPort, Sock} = listen_on_random_port(), %% Connecting to this endpoint should always timeout @@ -556,7 +561,7 @@ do_start_stop_bridges(Type, Config) -> <<"server">> := BadServer, <<"status">> := <<"connecting">>, <<"node_status">> := [_ | _] - } = jsx:decode(BadBridge1), + } = emqx_json:decode(BadBridge1, [return_maps]), BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName), ?assertMatch( {ok, SC, _} when SC == 500 orelse SC == 503, @@ -585,24 +590,24 @@ t_enable_disable_bridges(Config) -> <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], <<"url">> := URL1 - } = jsx:decode(Bridge), + } = emqx_json:decode(Bridge, [return_maps]), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), %% disable it {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"stopped">>}, jsx:decode(Bridge2)), + ?assertMatch(#{<<"status">> := <<"stopped">>}, emqx_json:decode(Bridge2, [return_maps])), %% enable again {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), + ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])), %% enable an already started bridge {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge3)), + ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge3, [return_maps])), %% disable it again {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), - {ok, 400, Res} = request(post, operation_path(node, restart, BridgeID), <<"">>), + {ok, 400, Res} = request(post, operation_path(node, start, BridgeID), <<"">>), ?assertEqual( <<"{\"code\":\"BAD_REQUEST\",\"message\":\"Forbidden operation, bridge not enabled\"}">>, Res @@ -611,7 +616,7 @@ t_enable_disable_bridges(Config) -> %% enable a stopped bridge {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), - ?assertMatch(#{<<"status">> := <<"connected">>}, jsx:decode(Bridge4)), + ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge4, [return_maps])), %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []). @@ -636,7 +641,7 @@ t_reset_bridges(Config) -> <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], <<"url">> := URL1 - } = jsx:decode(Bridge), + } = emqx_json:decode(Bridge, [return_maps]), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), {ok, 204, <<>>} = request(put, uri(["bridges", BridgeID, "metrics/reset"]), []), @@ -704,7 +709,7 @@ t_bridges_probe(Config) -> <<"code">> := <<"TEST_FAILED">>, <<"message">> := _ }, - jsx:decode(NxDomain) + emqx_json:decode(NxDomain, [return_maps]) ), {ok, 204, _} = request( @@ -723,7 +728,7 @@ t_bridges_probe(Config) -> <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Connection refused">> }, - jsx:decode(ConnRefused) + emqx_json:decode(ConnRefused, [return_maps]) ), {ok, 400, HostNotFound} = request( @@ -736,7 +741,7 @@ t_bridges_probe(Config) -> <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Host not found">> }, - jsx:decode(HostNotFound) + emqx_json:decode(HostNotFound, [return_maps]) ), AuthnConfig = #{ @@ -767,7 +772,7 @@ t_bridges_probe(Config) -> <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Unauthorized client">> }, - jsx:decode(Unauthorized) + emqx_json:decode(Unauthorized, [return_maps]) ), {ok, 400, Malformed} = request( @@ -782,7 +787,7 @@ t_bridges_probe(Config) -> <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Malformed username or password">> }, - jsx:decode(Malformed) + emqx_json:decode(Malformed, [return_maps]) ), {ok, 400, NotAuthorized} = request( @@ -795,7 +800,7 @@ t_bridges_probe(Config) -> <<"code">> := <<"TEST_FAILED">>, <<"message">> := <<"Not authorized">> }, - jsx:decode(NotAuthorized) + emqx_json:decode(NotAuthorized, [return_maps]) ), {ok, 400, BadReq} = request( @@ -803,7 +808,7 @@ t_bridges_probe(Config) -> uri(["bridges_probe"]), ?BRIDGE(<<"bad_bridge">>, <<"unknown_type">>) ), - ?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, jsx:decode(BadReq)), + ?assertMatch(#{<<"code">> := <<"BAD_REQUEST">>}, emqx_json:decode(BadReq, [return_maps])), ok. t_metrics(Config) -> @@ -829,7 +834,7 @@ t_metrics(Config) -> <<"status">> := _, <<"node_status">> := [_ | _], <<"url">> := URL1 - } = jsx:decode(Bridge), + } = emqx_json:decode(Bridge, [return_maps]), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), @@ -840,12 +845,12 @@ t_metrics(Config) -> <<"metrics">> := #{<<"success">> := 0}, <<"node_metrics">> := [_ | _] }, - jsx:decode(Bridge1Str) + emqx_json:decode(Bridge1Str, [return_maps]) ), %% check that the bridge doesn't contain metrics anymore {ok, 200, Bridge2Str} = request(get, uri(["bridges", BridgeID]), []), - Decoded = jsx:decode(Bridge2Str), + Decoded = emqx_json:decode(Bridge2Str, [return_maps]), ?assertNot(maps:is_key(<<"metrics">>, Decoded)), ?assertNot(maps:is_key(<<"node_metrics">>, Decoded)), @@ -875,7 +880,7 @@ t_metrics(Config) -> <<"metrics">> := #{<<"success">> := _}, <<"node_metrics">> := [_ | _] }, - jsx:decode(Bridge3Str) + emqx_json:decode(Bridge3Str, [return_maps]) ), %% check for non-empty metrics when listing all bridges @@ -887,7 +892,7 @@ t_metrics(Config) -> <<"node_metrics">> := [_ | _] } ], - jsx:decode(BridgesStr) + emqx_json:decode(BridgesStr, [return_maps]) ), ok. From a325133391e8662d9f4f4c52b3cecd04ad1ad4f5 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 8 Mar 2023 16:33:09 +0100 Subject: [PATCH 04/12] fix(emqx_bridge): don't crash checking if bridge enabled --- apps/emqx_bridge/src/emqx_bridge_api.erl | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 6ad116cd3..1ae984ea4 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -655,8 +655,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> invalid -> ?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); OperFunc -> - ConfMap = emqx:get_config([bridges, BridgeType, BridgeName]), - case maps:get(enable, ConfMap, false) of + try is_enabled_bridge(BridgeType, BridgeName) of false -> ?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>); true -> @@ -668,10 +667,22 @@ lookup_from_local_node(BridgeType, BridgeName) -> {error, _} -> ?NOT_FOUND(<<"Invalid node name: ", Node/binary>>) end + catch + throw:not_found -> + ?BRIDGE_NOT_FOUND(BridgeType, BridgeName) end end ). +is_enabled_bridge(BridgeType, BridgeName) -> + try emqx:get_config([bridges, BridgeType, BridgeName]) of + ConfMap -> + maps:get(enable, ConfMap, false) + catch + error:{config_not_found, _} -> + throw(not_found) + end. + node_operation_func(<<"restart">>) -> restart_bridge_to_node; node_operation_func(<<"start">>) -> start_bridge_to_node; node_operation_func(<<"stop">>) -> stop_bridge_to_node; From 1bcc5623ed8c65b95f0072020901ce53c534b137 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 8 Mar 2023 16:50:41 +0100 Subject: [PATCH 05/12] fix(emqx_bridge): check if bridge enabled before calling op --- apps/emqx_bridge/src/emqx_bridge_api.erl | 18 +++++++++++++++--- .../emqx_bridge/test/emqx_bridge_api_SUITE.erl | 1 + 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 1ae984ea4..54e8dea07 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -48,6 +48,10 @@ -define(BAD_REQUEST(Reason), {400, error_msg('BAD_REQUEST', Reason)}). +-define(BRIDGE_NOT_ENABLED, + ?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>) +). + -define(NOT_FOUND(Reason), {404, error_msg('NOT_FOUND', Reason)}). -define(BRIDGE_NOT_FOUND(BridgeType, BridgeName), @@ -640,8 +644,16 @@ lookup_from_local_node(BridgeType, BridgeName) -> invalid -> ?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); OperFunc -> - Nodes = mria:running_nodes(), - call_operation(all, OperFunc, [Nodes, BridgeType, BridgeName]) + try is_enabled_bridge(BridgeType, BridgeName) of + false -> + ?BRIDGE_NOT_ENABLED; + true -> + Nodes = mria:running_nodes(), + call_operation(all, OperFunc, [Nodes, BridgeType, BridgeName]) + catch + throw:not_found -> + ?BRIDGE_NOT_FOUND(BridgeType, BridgeName) + end end ). @@ -657,7 +669,7 @@ lookup_from_local_node(BridgeType, BridgeName) -> OperFunc -> try is_enabled_bridge(BridgeType, BridgeName) of false -> - ?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>); + ?BRIDGE_NOT_ENABLED; true -> case emqx_misc:safe_to_existing_atom(Node, utf8) of {ok, TargetNode} -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 8feb2bcc6..68e612cb3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -612,6 +612,7 @@ t_enable_disable_bridges(Config) -> <<"{\"code\":\"BAD_REQUEST\",\"message\":\"Forbidden operation, bridge not enabled\"}">>, Res ), + {ok, 400, Res} = request(post, operation_path(cluster, start, BridgeID), <<"">>), %% enable a stopped bridge {ok, 204, <<>>} = request(put, enable_path(true, BridgeID), <<"">>), From cf73aacd7bff083c19bf8e86245a743910fce06f Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 9 Mar 2023 09:52:07 +0100 Subject: [PATCH 06/12] chore: add changelog --- changes/ce/fix-10107.en.md | 9 +++++++++ changes/ce/fix-10107.zh.md | 8 ++++++++ 2 files changed, 17 insertions(+) create mode 100644 changes/ce/fix-10107.en.md create mode 100644 changes/ce/fix-10107.zh.md diff --git a/changes/ce/fix-10107.en.md b/changes/ce/fix-10107.en.md new file mode 100644 index 000000000..ee972faf3 --- /dev/null +++ b/changes/ce/fix-10107.en.md @@ -0,0 +1,9 @@ +For operations on `bridges API` if `bridge-id` is unknown we now return `404` +instead of `400`. Also a bug was fixed that caused a crash if that was a node +operation. Additionally we now also check if the given bridge is enabled when +doing the cluster operation `start` . Affected endpoints: + * [cluster] `/bridges/:id/:operation`, + * [node] `/nodes/:node/bridges/:id/:operation`, where `operation` is one of +`[start|stop|restart]`. +Moreover, for a node operation, we check if node name is in our cluster and +return `404` instead of `501`. diff --git a/changes/ce/fix-10107.zh.md b/changes/ce/fix-10107.zh.md new file mode 100644 index 000000000..71695096e --- /dev/null +++ b/changes/ce/fix-10107.zh.md @@ -0,0 +1,8 @@ +现在对桥接的 API 进行调用时,如果 `bridge-id` 不存在,将会返回 `404`,而不再是`400`。 +然后,还修复了这种情况下,在节点级别上进行 API 调用时,可能导致崩溃的问题。 +另外,在启动某个桥接时,会先检查指定桥接是否已启用。 +受影响的接口有: + * [cluster] `/bridges/:id/:operation`, + * [node] `/nodes/:node/bridges/:id/:operation`, +其中 `operation` 是 `[start|stop|restart]` 之一。 +此外,对于节点操作,我们将检查节点是否存在于集群中,如果不在,则会返回`404`,而不再是`501`。 From 7124600a71c78514ca599e0a96841ac6914f54f8 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 9 Mar 2023 10:22:21 +0100 Subject: [PATCH 07/12] docs(emqx_bridge): add `start` as operation to bridge --- apps/emqx_bridge/src/emqx_bridge_api.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 54e8dea07..525cec0b8 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -104,11 +104,11 @@ get_response_body_schema() -> param_path_operation_cluster() -> {operation, mk( - enum([stop, restart]), + enum([start, stop, restart]), #{ in => path, required => true, - example => <<"restart">>, + example => <<"start">>, desc => ?DESC("desc_param_path_operation_cluster") } )}. @@ -116,11 +116,11 @@ param_path_operation_cluster() -> param_path_operation_on_node() -> {operation, mk( - enum([stop, restart]), + enum([start, stop, restart]), #{ in => path, required => true, - example => <<"stop">>, + example => <<"start">>, desc => ?DESC("desc_param_path_operation_on_node") } )}. From 71ec77a2f27bafa3110cc6d819b56ab418305001 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 9 Mar 2023 15:12:45 +0100 Subject: [PATCH 08/12] fix(emqx_bridge): for node operation check if node is part of cluster This fixes the case where we returned `501 NOT IMPLEMENTED` in the past. --- apps/emqx_bridge/src/emqx_bridge_api.erl | 11 +++++++++-- apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl | 12 ++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 525cec0b8..fed24d660 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -453,7 +453,7 @@ schema("/nodes/:node/bridges/:id/:operation") -> 'BAD_REQUEST', "Problem with configuration of external service or bridge not enabled" ), - 404 => error_schema('NOT_FOUND', "Bridge not found or invalid operation"), + 404 => error_schema('NOT_FOUND', "Bridge or node not found or invalid operation"), 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") } @@ -968,6 +968,8 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> )}; {error, not_found} -> ?BRIDGE_NOT_FOUND(BridgeType, BridgeName); + {error, {node_not_found, Node}} -> + ?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>); {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> ?BAD_REQUEST(to_hr_reason(Reason)) end. @@ -984,7 +986,12 @@ do_bpapi_call(all, Call, Args) -> do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args) ); do_bpapi_call(Node, Call, Args) -> - do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args). + case lists:member(Node, mria:running_nodes()) of + true -> + do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args); + false -> + {error, {node_not_found, Node}} + end. do_bpapi_call_vsn(SupportedVersion, Call, Args) -> case lists:member(SupportedVersion, supported_versions(Call)) of diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 68e612cb3..f56bbec8e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -486,6 +486,18 @@ t_old_bpapi_vsn(Config) -> ok. t_start_stop_bridges_node(Config) -> + {ok, 404, _} = + request( + post, + uri(["nodes", "thisbetterbenotanatomyet", "bridges", "webhook:foo", start]), + <<"">> + ), + {ok, 404, _} = + request( + post, + uri(["nodes", "undefined", "bridges", "webhook:foo", start]), + <<"">> + ), do_start_stop_bridges(node, Config). t_start_stop_bridges_cluster(Config) -> From dba95ec0facdc3dfa2af9502d6fc6fe42da5bb1a Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 9 Mar 2023 15:20:11 +0100 Subject: [PATCH 09/12] style(emqx_bridge): fix wording --- apps/emqx_bridge/src/emqx_bridge_api.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index fed24d660..5cf92582a 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -361,7 +361,7 @@ schema("/bridges/:id") -> 204 => <<"Bridge deleted">>, 400 => error_schema( 'BAD_REQUEST', - "Can not delete bridge while active rules defined for this bridge" + "Cannot delete bridge while active rules are defined for this bridge" ), 404 => error_schema('NOT_FOUND', "Bridge not found"), 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") @@ -533,7 +533,7 @@ schema("/bridges_probe") -> 204; {error, {rules_deps_on_this_bridge, RuleIds}} -> ?BAD_REQUEST( - {<<"Can not delete bridge while active rules defined for this bridge">>, + {<<"Cannot delete bridge while active rules are defined for this bridge">>, RuleIds} ); {error, timeout} -> From 478601ee4113e8f661fddaa7172eb488aea5650b Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 9 Mar 2023 15:17:32 +0100 Subject: [PATCH 10/12] fix(emqx_bridge): remove unreachable code paths --- apps/emqx_bridge/src/emqx_bridge_api.erl | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 5cf92582a..30671b2bb 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -478,17 +478,13 @@ schema("/bridges_probe") -> }. '/bridges'(post, #{body := #{<<"type">> := BridgeType, <<"name">> := BridgeName} = Conf0}) -> - Conf = filter_out_request_body(Conf0), case emqx_bridge:lookup(BridgeType, BridgeName) of {ok, _} -> {400, error_msg('ALREADY_EXISTS', <<"bridge already exists">>)}; {error, not_found} -> - case emqx_bridge:create(BridgeType, BridgeName, Conf) of - {ok, _} -> - lookup_from_all_nodes(BridgeType, BridgeName, 201); - {error, Reason} -> - ?BAD_REQUEST(Reason) - end + Conf = filter_out_request_body(Conf0), + {ok, _} = emqx_bridge:create(BridgeType, BridgeName, Conf), + lookup_from_all_nodes(BridgeType, BridgeName, 201) end; '/bridges'(get, _Params) -> {200, @@ -507,12 +503,8 @@ schema("/bridges_probe") -> {ok, _} -> RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}), Conf = deobfuscate(Conf1, RawConf), - case emqx_bridge:create(BridgeType, BridgeName, Conf) of - {ok, _} -> - lookup_from_all_nodes(BridgeType, BridgeName, 200); - {error, Reason} -> - ?BAD_REQUEST(Reason) - end; + {ok, _} = emqx_bridge:create(BridgeType, BridgeName, Conf), + lookup_from_all_nodes(BridgeType, BridgeName, 200); {error, not_found} -> ?BRIDGE_NOT_FOUND(BridgeType, BridgeName) end From c1adf0de1f99fc797710497e6db892597148369a Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 9 Mar 2023 15:17:47 +0100 Subject: [PATCH 11/12] test(emqx_bridge): increase coverage by adding common edge cases --- .../test/emqx_bridge_api_SUITE.erl | 52 +++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index f56bbec8e..8b388a771 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -196,6 +196,9 @@ t_http_crud_apis(Config) -> %% assert we there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + {ok, 404, _} = request(get, uri(["bridges", "foo"]), []), + {ok, 404, _} = request(get, uri(["bridges", "webhook:foo"]), []), + %% then we add a webhook bridge, using POST %% POST /bridges/ will create a bridge URL1 = ?URL(Port, "path1"), @@ -317,6 +320,17 @@ t_http_crud_apis(Config) -> }, emqx_json:decode(ErrMsg2, [return_maps]) ), + + %% try delete bad bridge id + {ok, 404, BadId} = request(delete, uri(["bridges", "foo"]), []), + ?assertMatch( + #{ + <<"code">> := <<"NOT_FOUND">>, + <<"message">> := <<"Invalid bridge ID", _/binary>> + }, + emqx_json:decode(BadId, [return_maps]) + ), + %% Deleting a non-existing bridge should result in an error {ok, 404, ErrMsg3} = request(delete, uri(["bridges", BridgeID]), []), ?assertMatch( @@ -403,13 +417,16 @@ t_check_dependent_actions_on_delete(Config) -> } ), #{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]), - %% delete the bridge should fail because there is a rule depenents on it - {ok, 400, _} = request(delete, uri(["bridges", BridgeID]), []), + %% deleting the bridge should fail because there is a rule that depends on it + {ok, 400, _} = request( + delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=false", [] + ), %% delete the rule first {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), %% then delete the bridge is OK {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), + ok. t_cascade_delete_actions(Config) -> @@ -439,7 +456,9 @@ t_cascade_delete_actions(Config) -> ), #{<<"id">> := RuleId} = emqx_json:decode(Rule, [return_maps]), %% delete the bridge will also delete the actions from the rules - {ok, 204, _} = request(delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", []), + {ok, 204, _} = request( + delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions=true", [] + ), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, Rule1} = request(get, uri(["rules", RuleId]), []), ?assertMatch( @@ -449,6 +468,25 @@ t_cascade_delete_actions(Config) -> emqx_json:decode(Rule1, [return_maps]) ), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), + + {ok, 201, _} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + {ok, 201, _} = request( + post, + uri(["rules"]), + #{ + <<"name">> => <<"t_http_crud_apis">>, + <<"enable">> => true, + <<"actions">> => [BridgeID], + <<"sql">> => <<"SELECT * from \"t\"">> + } + ), + + {ok, 204, _} = request(delete, uri(["bridges", BridgeID]) ++ "?also_delete_dep_actions", []), + {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), ok. t_broken_bpapi_vsn(Config) -> @@ -547,6 +585,9 @@ do_start_stop_bridges(Type, Config) -> {ok, 204, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{<<"status">> := <<"connected">>}, emqx_json:decode(Bridge4, [return_maps])), + + {ok, 404, _} = request(post, operation_path(Type, invalidop, BridgeID), <<"">>), + %% delete the bridge {ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -619,6 +660,11 @@ t_enable_disable_bridges(Config) -> %% disable it again {ok, 204, <<>>} = request(put, enable_path(false, BridgeID), <<"">>), + %% bad param + {ok, 404, _} = request(put, enable_path(foo, BridgeID), <<"">>), + {ok, 404, _} = request(put, enable_path(true, "foo"), <<"">>), + {ok, 404, _} = request(put, enable_path(true, "webhook:foo"), <<"">>), + {ok, 400, Res} = request(post, operation_path(node, start, BridgeID), <<"">>), ?assertEqual( <<"{\"code\":\"BAD_REQUEST\",\"message\":\"Forbidden operation, bridge not enabled\"}">>, From 18c3b9bb1cd21720f61807b68662fe22138250e2 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Mon, 13 Mar 2023 11:14:59 +0100 Subject: [PATCH 12/12] style: fix wording --- changes/ce/fix-10107.en.md | 2 +- changes/ce/fix-10107.zh.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/changes/ce/fix-10107.en.md b/changes/ce/fix-10107.en.md index ee972faf3..1bcbbad60 100644 --- a/changes/ce/fix-10107.en.md +++ b/changes/ce/fix-10107.en.md @@ -5,5 +5,5 @@ doing the cluster operation `start` . Affected endpoints: * [cluster] `/bridges/:id/:operation`, * [node] `/nodes/:node/bridges/:id/:operation`, where `operation` is one of `[start|stop|restart]`. -Moreover, for a node operation, we check if node name is in our cluster and +Moreover, for a node operation, EMQX checks if node name is in our cluster and return `404` instead of `501`. diff --git a/changes/ce/fix-10107.zh.md b/changes/ce/fix-10107.zh.md index 71695096e..e541a834f 100644 --- a/changes/ce/fix-10107.zh.md +++ b/changes/ce/fix-10107.zh.md @@ -5,4 +5,4 @@ * [cluster] `/bridges/:id/:operation`, * [node] `/nodes/:node/bridges/:id/:operation`, 其中 `operation` 是 `[start|stop|restart]` 之一。 -此外,对于节点操作,我们将检查节点是否存在于集群中,如果不在,则会返回`404`,而不再是`501`。 +此外,对于节点操作,EMQX 将检查节点是否存在于集群中,如果不在,则会返回`404`,而不再是`501`。