refactor: less code duplication

This commit is contained in:
Stefan Strigler 2023-02-08 13:39:45 +01:00
parent 502f62e18d
commit a3fd0897bc
2 changed files with 89 additions and 77 deletions

View File

@ -609,12 +609,12 @@ lookup_from_local_node(BridgeType, BridgeName) ->
}) -> }) ->
?TRY_PARSE_ID( ?TRY_PARSE_ID(
Id, Id,
case operation_func(Op) of case operation_to_all_func(Op) of
invalid -> invalid ->
{400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; {400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
OperFunc -> OperFunc ->
Nodes = mria_mnesia:running_nodes(), Nodes = mria_mnesia:running_nodes(),
operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) call_operation(all, OperFunc, [Nodes, BridgeType, BridgeName])
end end
). ).
@ -637,7 +637,14 @@ lookup_from_local_node(BridgeType, BridgeName) ->
<<"forbidden operation: bridge disabled">> <<"forbidden operation: bridge disabled">>
)}; )};
true -> true ->
call_operation(Node, OperFunc, BridgeType, BridgeName) case emqx_misc:safe_to_existing_atom(Node, utf8) of
{ok, TargetNode} ->
call_operation(TargetNode, OperFunc, [
TargetNode, BridgeType, BridgeName
]);
{error, _} ->
{400, error_msg('INVALID_NODE', <<"invalid node">>)}
end
end end
end end
). ).
@ -647,40 +654,15 @@ node_operation_func(<<"start">>) -> start_bridge_to_node;
node_operation_func(<<"stop">>) -> stop_bridge_to_node; node_operation_func(<<"stop">>) -> stop_bridge_to_node;
node_operation_func(_) -> invalid. node_operation_func(_) -> invalid.
operation_func(<<"restart">>) -> restart; operation_to_all_func(<<"restart">>) -> restart_bridges_to_all_nodes;
operation_func(<<"start">>) -> start; operation_to_all_func(<<"start">>) -> start_bridges_to_all_nodes;
operation_func(<<"stop">>) -> stop; operation_to_all_func(<<"stop">>) -> stop_bridges_to_all_nodes;
operation_func(_) -> invalid. operation_to_all_func(_) -> invalid.
enable_func(<<"true">>) -> enable; enable_func(<<"true">>) -> enable;
enable_func(<<"false">>) -> disable; enable_func(<<"false">>) -> disable;
enable_func(_) -> invalid. enable_func(_) -> invalid.
operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) ->
RpcFunc =
case OperFunc of
restart -> restart_bridges_to_all_nodes;
start -> start_bridges_to_all_nodes;
stop -> stop_bridges_to_all_nodes
end,
case is_ok(do_bpapi_call(RpcFunc, [Nodes, BridgeType, BridgeName])) of
{ok, _} ->
{204};
{error, not_implemented} ->
%% As of now this can only happen when we call 'start' on nodes
%% that run on an older proto version.
maybe_try_restart(Nodes, OperFunc, BridgeType, BridgeName);
{error, [timeout | _]} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
{error, ErrL} ->
{500, error_msg('INTERNAL_ERROR', ErrL)}
end.
maybe_try_restart(Nodes, start, BridgeType, BridgeName) ->
operation_to_all_nodes(Nodes, restart, BridgeType, BridgeName);
maybe_try_restart(_, _, _, _) ->
{501}.
ensure_bridge_created(BridgeType, BridgeName, Conf) -> ensure_bridge_created(BridgeType, BridgeName, Conf) ->
case emqx_bridge:create(BridgeType, BridgeName, Conf) of case emqx_bridge:create(BridgeType, BridgeName, Conf) of
{ok, _} -> ok; {ok, _} -> ok;
@ -872,6 +854,10 @@ unpack_bridge_conf(Type, PackedConf) ->
#{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges), #{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges),
RawConf. RawConf.
is_ok(ok) ->
ok;
is_ok({ok, _} = OkResult) ->
OkResult;
is_ok(Error = {error, _}) -> is_ok(Error = {error, _}) ->
Error; Error;
is_ok(ResL) -> is_ok(ResL) ->
@ -912,44 +898,42 @@ bin(S) when is_atom(S) ->
bin(S) when is_binary(S) -> bin(S) when is_binary(S) ->
S. S.
call_operation(Node, OperFunc, BridgeType, BridgeName) -> call_operation(NodeOrAll, OperFunc, Args) ->
case emqx_misc:safe_to_existing_atom(Node, utf8) of case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of
{ok, TargetNode} -> ok ->
case do_bpapi_call(TargetNode, OperFunc, [TargetNode, BridgeType, BridgeName]) of {204};
ok -> {ok, _} ->
{204}; {204};
{error, not_implemented} -> {error, not_implemented} ->
%% Should only happen if we call `start` on a node that is %% Should only happen if we call `start` on a node that is
%% still on an older bpapi version that doesn't support it. %% still on an older bpapi version that doesn't support it.
maybe_try_restart_node(Node, OperFunc, BridgeType, BridgeName); maybe_try_restart(NodeOrAll, OperFunc, Args);
{error, timeout} -> {error, timeout} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)}; {503, error_msg('SERVICE_UNAVAILABLE', <<"request timeout">>)};
{error, {start_pool_failed, Name, Reason}} -> {error, {start_pool_failed, Name, Reason}} ->
{503, {503,
error_msg( error_msg(
'SERVICE_UNAVAILABLE', 'SERVICE_UNAVAILABLE',
bin( bin(
io_lib:format( io_lib:format(
"failed to start ~p pool for reason ~p", "failed to start ~p pool for reason ~p",
[Name, Reason] [Name, Reason]
) )
) )
)}; )};
{error, Reason} -> {error, Reason} ->
{500, error_msg('INTERNAL_ERROR', Reason)} {500, error_msg('INTERNAL_ERROR', Reason)}
end;
{error, _} ->
{400, error_msg('INVALID_NODE', <<"invalid node">>)}
end. end.
maybe_try_restart_node(Node, start_bridge_to_node, BridgeType, BridgeName) -> maybe_try_restart(all, start_bridges_to_all_nodes, Args) ->
call_operation(Node, restart_bridge_to_node, BridgeType, BridgeName); call_operation(all, restart_bridges_to_all_nodes, Args);
maybe_try_restart_node(_, _, _, _) -> maybe_try_restart(Node, start_bridge_to_node, Args) ->
call_operation(Node, restart_bridge_to_node, Args);
maybe_try_restart(_, _, _) ->
{501}. {501}.
do_bpapi_call(Call, Args) -> do_bpapi_call(all, Call, Args) ->
do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args). do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args);
do_bpapi_call(Node, Call, Args) -> do_bpapi_call(Node, Call, Args) ->
do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args). do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args).

View File

@ -82,19 +82,27 @@ end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge]), emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge]),
ok. ok.
init_per_testcase(t_bad_bpapi_vsn, Config) -> init_per_testcase(t_broken_bpapi_vsn, Config) ->
meck:new(emqx_bpapi, [passthrough]),
meck:expect(emqx_bpapi, supported_version, 1, -1),
meck:expect(emqx_bpapi, supported_version, 2, -1),
init_per_testcase(commong, Config);
init_per_testcase(t_old_bpapi_vsn, Config) ->
meck:new(emqx_bpapi, [passthrough]), meck:new(emqx_bpapi, [passthrough]),
meck:expect(emqx_bpapi, supported_version, 1, 1), meck:expect(emqx_bpapi, supported_version, 1, 1),
meck:expect(emqx_bpapi, supported_version, 2, 1), meck:expect(emqx_bpapi, supported_version, 2, 1),
init_per_testcase(commong, Config); init_per_testcase(common, Config);
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
{Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2), {Port, Sock, Acceptor} = start_http_server(fun handle_fun_200_ok/2),
[{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config]. [{port, Port}, {sock, Sock}, {acceptor, Acceptor} | Config].
end_per_testcase(t_bad_bpapi_vsn, Config) -> end_per_testcase(t_broken_bpapi_vsn, Config) ->
meck:unload([emqx_bpapi]), meck:unload([emqx_bpapi]),
end_per_testcase(commong, Config); end_per_testcase(common, Config);
end_per_testcase(t_old_bpapi_vsn, Config) ->
meck:unload([emqx_bpapi]),
end_per_testcase(common, Config);
end_per_testcase(_, Config) -> end_per_testcase(_, Config) ->
Sock = ?config(sock, Config), Sock = ?config(sock, Config),
Acceptor = ?config(acceptor, Config), Acceptor = ?config(acceptor, Config),
@ -442,13 +450,7 @@ t_cascade_delete_actions(Config) ->
{ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []), {ok, 204, <<>>} = request(delete, uri(["rules", RuleId]), []),
ok. ok.
t_start_stop_bridges_node(Config) -> t_broken_bpapi_vsn(Config) ->
do_start_stop_bridges(node, Config).
t_start_stop_bridges_cluster(Config) ->
do_start_stop_bridges(cluster, Config).
t_bad_bpapi_vsn(Config) ->
Port = ?config(port, Config), Port = ?config(port, Config),
URL1 = ?URL(Port, "abc"), URL1 = ?URL(Port, "abc"),
Name = <<"t_bad_bpapi_vsn">>, Name = <<"t_bad_bpapi_vsn">>,
@ -458,10 +460,36 @@ t_bad_bpapi_vsn(Config) ->
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name) ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
), ),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name), BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
%% still works since we redirect to 'restart'
{ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>), {ok, 501, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>),
{ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>), {ok, 501, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>),
ok. ok.
t_old_bpapi_vsn(Config) ->
Port = ?config(port, Config),
URL1 = ?URL(Port, "abc"),
Name = <<"t_bad_bpapi_vsn">>,
{ok, 201, _Bridge} = request(
post,
uri(["bridges"]),
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name)
),
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, Name),
{ok, 204, <<>>} = request(post, operation_path(cluster, stop, BridgeID), <<"">>),
{ok, 204, <<>>} = request(post, operation_path(node, stop, BridgeID), <<"">>),
%% still works since we redirect to 'restart'
{ok, 204, <<>>} = request(post, operation_path(cluster, start, BridgeID), <<"">>),
{ok, 204, <<>>} = request(post, operation_path(node, start, BridgeID), <<"">>),
{ok, 204, <<>>} = request(post, operation_path(cluster, restart, BridgeID), <<"">>),
{ok, 204, <<>>} = request(post, operation_path(node, restart, BridgeID), <<"">>),
ok.
t_start_stop_bridges_node(Config) ->
do_start_stop_bridges(node, Config).
t_start_stop_bridges_cluster(Config) ->
do_start_stop_bridges(cluster, Config).
do_start_stop_bridges(Type, Config) -> do_start_stop_bridges(Type, Config) ->
%% assert we there's no bridges at first %% assert we there's no bridges at first
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),