From a3fd0897bc23ea1ef06b9d904ffc657f250fd38e Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 8 Feb 2023 13:39:45 +0100 Subject: [PATCH] refactor: less code duplication --- apps/emqx_bridge/src/emqx_bridge_api.erl | 116 ++++++++---------- .../test/emqx_bridge_api_SUITE.erl | 50 ++++++-- 2 files changed, 89 insertions(+), 77 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index ebb529904..04760f021 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -609,12 +609,12 @@ lookup_from_local_node(BridgeType, BridgeName) -> }) -> ?TRY_PARSE_ID( Id, - case operation_func(Op) of + case operation_to_all_func(Op) of invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; OperFunc -> Nodes = mria_mnesia:running_nodes(), - operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) + call_operation(all, OperFunc, [Nodes, BridgeType, BridgeName]) end ). @@ -637,7 +637,14 @@ lookup_from_local_node(BridgeType, BridgeName) -> <<"forbidden operation: bridge disabled">> )}; true -> - call_operation(Node, OperFunc, BridgeType, BridgeName) + case emqx_misc:safe_to_existing_atom(Node, utf8) of + {ok, TargetNode} -> + call_operation(TargetNode, OperFunc, [ + TargetNode, BridgeType, BridgeName + ]); + {error, _} -> + {400, error_msg('INVALID_NODE', <<"invalid node">>)} + end end end ). @@ -647,40 +654,15 @@ node_operation_func(<<"start">>) -> start_bridge_to_node; node_operation_func(<<"stop">>) -> stop_bridge_to_node; node_operation_func(_) -> invalid. -operation_func(<<"restart">>) -> restart; -operation_func(<<"start">>) -> start; -operation_func(<<"stop">>) -> stop; -operation_func(_) -> invalid. +operation_to_all_func(<<"restart">>) -> restart_bridges_to_all_nodes; +operation_to_all_func(<<"start">>) -> start_bridges_to_all_nodes; +operation_to_all_func(<<"stop">>) -> stop_bridges_to_all_nodes; +operation_to_all_func(_) -> invalid. enable_func(<<"true">>) -> enable; enable_func(<<"false">>) -> disable; enable_func(_) -> invalid. -operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> - RpcFunc = - case OperFunc of - restart -> restart_bridges_to_all_nodes; - start -> start_bridges_to_all_nodes; - stop -> stop_bridges_to_all_nodes - end, - case is_ok(do_bpapi_call(RpcFunc, [Nodes, BridgeType, BridgeName])) of - {ok, _} -> - {204}; - {error, not_implemented} -> - %% As of now this can only happen when we call 'start' on nodes - %% that run on an older proto version. - maybe_try_restart(Nodes, OperFunc, BridgeType, BridgeName); - {error, [timeout | _]} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, ErrL} -> - {500, error_msg('INTERNAL_ERROR', ErrL)} - end. - -maybe_try_restart(Nodes, start, BridgeType, BridgeName) -> - operation_to_all_nodes(Nodes, restart, BridgeType, BridgeName); -maybe_try_restart(_, _, _, _) -> - {501}. - ensure_bridge_created(BridgeType, BridgeName, Conf) -> case emqx_bridge:create(BridgeType, BridgeName, Conf) of {ok, _} -> ok; @@ -872,6 +854,10 @@ unpack_bridge_conf(Type, PackedConf) -> #{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges), RawConf. +is_ok(ok) -> + ok; +is_ok({ok, _} = OkResult) -> + OkResult; is_ok(Error = {error, _}) -> Error; is_ok(ResL) -> @@ -912,44 +898,42 @@ bin(S) when is_atom(S) -> bin(S) when is_binary(S) -> S. -call_operation(Node, OperFunc, BridgeType, BridgeName) -> - case emqx_misc:safe_to_existing_atom(Node, utf8) of - {ok, TargetNode} -> - case do_bpapi_call(TargetNode, OperFunc, [TargetNode, BridgeType, BridgeName]) of - ok -> - {204}; - {error, not_implemented} -> - %% Should only happen if we call `start` on a node that is - %% still on an older bpapi version that doesn't support it. - maybe_try_restart_node(Node, OperFunc, BridgeType, BridgeName); - {error, timeout} -> - {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; - {error, {start_pool_failed, Name, Reason}} -> - {503, - error_msg( - 'SERVICE_UNAVAILABLE', - bin( - io_lib:format( - "failed to start ~p pool for reason ~p", - [Name, Reason] - ) - ) - )}; - {error, Reason} -> - {500, error_msg('INTERNAL_ERROR', Reason)} - end; - {error, _} -> - {400, error_msg('INVALID_NODE', <<"invalid node">>)} +call_operation(NodeOrAll, OperFunc, Args) -> + case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of + ok -> + {204}; + {ok, _} -> + {204}; + {error, not_implemented} -> + %% Should only happen if we call `start` on a node that is + %% still on an older bpapi version that doesn't support it. + maybe_try_restart(NodeOrAll, OperFunc, Args); + {error, timeout} -> + {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; + {error, {start_pool_failed, Name, Reason}} -> + {503, + error_msg( + 'SERVICE_UNAVAILABLE', + bin( + io_lib:format( + "failed to start ~p pool for reason ~p", + [Name, Reason] + ) + ) + )}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} end. -maybe_try_restart_node(Node, start_bridge_to_node, BridgeType, BridgeName) -> - call_operation(Node, restart_bridge_to_node, BridgeType, BridgeName); -maybe_try_restart_node(_, _, _, _) -> +maybe_try_restart(all, start_bridges_to_all_nodes, Args) -> + call_operation(all, restart_bridges_to_all_nodes, Args); +maybe_try_restart(Node, start_bridge_to_node, Args) -> + call_operation(Node, restart_bridge_to_node, Args); +maybe_try_restart(_, _, _) -> {501}. -do_bpapi_call(Call, Args) -> - do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args). - +do_bpapi_call(all, Call, Args) -> + do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args); do_bpapi_call(Node, Call, Args) -> do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index bf09cf7f3..d6e8708ff 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -82,19 +82,27 @@ end_per_suite(_Config) -> emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge]), ok. -init_per_testcase(t_bad_bpapi_vsn, Config) -> +init_per_testcase(t_broken_bpapi_vsn, Config) -> + meck:new(emqx_bpapi, [passthrough]), + meck:expect(emqx_bpapi, supported_version, 1, -1), + meck:expect(emqx_bpapi, supported_version, 2, -1), + init_per_testcase(commong, Config); +init_per_testcase(t_old_bpapi_vsn, Config) -> meck:new(emqx_bpapi, [passthrough]), meck:expect(emqx_bpapi, supported_version, 1, 1), meck:expect(emqx_bpapi, supported_version, 2, 1), - init_per_testcase(commong, Config); + init_per_testcase(common, Config); init_per_testcase(_, Config) -> {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2), [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config]. -end_per_testcase(t_bad_bpapi_vsn, Config) -> +end_per_testcase(t_broken_bpapi_vsn, Config) -> meck:unload([emqx_bpapi]), - end_per_testcase(commong, Config); + end_per_testcase(common, Config); +end_per_testcase(t_old_bpapi_vsn, Config) -> + meck:unload([emqx_bpapi]), + end_per_testcase(common, Config); end_per_testcase(_, Config) -> Sock = ?config(sock, Config), Acceptor = ?config(acceptor, Config), @@ -442,13 +450,7 @@ t_cascade_delete_actions(Config) -> {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), ok. -t_start_stop_bridges_node(Config) -> - do_start_stop_bridges(node, Config). - -t_start_stop_bridges_cluster(Config) -> - do_start_stop_bridges(cluster, Config). - -t_bad_bpapi_vsn(Config) -> +t_broken_bpapi_vsn(Config) -> Port = ?config(port, Config), URL1 = ?URL(Port, "abc"), Name = <<"t_bad_bpapi_vsn">>, @@ -458,10 +460,36 @@ t_bad_bpapi_vsn(Config) -> ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) ), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + %% still works since we redirect to 'restart' {ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), {ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), ok. +t_old_bpapi_vsn(Config) -> + Port = ?config(port, Config), + URL1 = ?URL(Port, "abc"), + Name = <<"t_bad_bpapi_vsn">>, + {ok, 201, _Bridge} = request( + post, + uri(["bridges"]), + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) + ), + BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), + {ok, 204, <<>>} = request(post, operation_path(cluster, stop, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(node, stop, BridgeID), <<"">>), + %% still works since we redirect to 'restart' + {ok, 204, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(cluster, restart, BridgeID), <<"">>), + {ok, 204, <<>>} = request(post, operation_path(node, restart, BridgeID), <<"">>), + ok. + +t_start_stop_bridges_node(Config) -> + do_start_stop_bridges(node, Config). + +t_start_stop_bridges_cluster(Config) -> + do_start_stop_bridges(cluster, Config). + do_start_stop_bridges(Type, Config) -> %% assert we there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),