feat(bridge): add APIs for restart/stop bridges on one node
This commit is contained in:
parent
e0557551aa
commit
36e068d00d
|
@ -50,7 +50,6 @@
|
|||
, remove/2
|
||||
, update/2
|
||||
, update/3
|
||||
, start/2
|
||||
, stop/2
|
||||
, restart/2
|
||||
]).
|
||||
|
@ -208,12 +207,10 @@ lookup(Type, Name, RawConf) ->
|
|||
raw_config => RawConf}}
|
||||
end.
|
||||
|
||||
start(Type, Name) ->
|
||||
restart(Type, Name).
|
||||
|
||||
stop(Type, Name) ->
|
||||
emqx_resource:stop(resource_id(Type, Name)).
|
||||
|
||||
%% we don't provide 'start', as we want an already started bridge to be restarted.
|
||||
restart(Type, Name) ->
|
||||
emqx_resource:restart(resource_id(Type, Name)).
|
||||
|
||||
|
@ -263,8 +260,8 @@ update(Type, Name, {OldConf, Conf}) ->
|
|||
%% we don't need to recreate the bridge if this config change is only to
|
||||
%% toggole the config 'bridge.{type}.{name}.enable'
|
||||
case maps:get(enable, Conf, true) of
|
||||
false -> stop(Type, Name);
|
||||
true -> start(Type, Name)
|
||||
true -> restart(Type, Name);
|
||||
false -> stop(Type, Name)
|
||||
end
|
||||
end.
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
-export([ '/bridges'/2
|
||||
, '/bridges/:id'/2
|
||||
, '/bridges/:id/operation/:operation'/2
|
||||
, '/nodes/:node/bridges/:id/operation/:operation'/2
|
||||
]).
|
||||
|
||||
-export([ lookup_from_local_node/2
|
||||
|
@ -74,7 +75,8 @@ namespace() -> "bridge".
|
|||
api_spec() ->
|
||||
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}).
|
||||
|
||||
paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation"].
|
||||
paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation",
|
||||
"/nodes/:node/bridges/:id/operation/:operation"].
|
||||
|
||||
error_schema(Code, Message) when is_atom(Code) ->
|
||||
error_schema([Code], Message);
|
||||
|
@ -87,11 +89,28 @@ get_response_body_schema() ->
|
|||
emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(),
|
||||
bridge_info_examples(get)).
|
||||
|
||||
param_path_operation() ->
|
||||
{operation, mk(enum([start, stop, restart]),
|
||||
param_path_operation_cluster() ->
|
||||
{operation, mk(enum([enable, disable, stop, restart]),
|
||||
#{ in => path
|
||||
, required => true
|
||||
, example => <<"start">>
|
||||
, desc => <<"Operations can be one of: enable, disable, start, stop, restart">>
|
||||
})}.
|
||||
|
||||
param_path_operation_on_node() ->
|
||||
{operation, mk(enum([stop, restart]),
|
||||
#{ in => path
|
||||
, required => true
|
||||
, example => <<"start">>
|
||||
, desc => <<"Operations can be one of: start, stop, restart">>
|
||||
})}.
|
||||
|
||||
param_path_node() ->
|
||||
{node, mk(binary(),
|
||||
#{ in => path
|
||||
, required => true
|
||||
, example => <<"emqx@127.0.0.1">>
|
||||
, desc => <<"The bridge Id. Must be of format {type}:{name}">>
|
||||
})}.
|
||||
|
||||
param_path_id() ->
|
||||
|
@ -219,7 +238,7 @@ schema("/bridges") ->
|
|||
bridge_info_examples(post)),
|
||||
responses => #{
|
||||
201 => get_response_body_schema(),
|
||||
400 => error_schema('BAD_REQUEST', "Create bridge failed")
|
||||
400 => error_schema('ALREADY_EXISTS', "Bridge already exists")
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -267,11 +286,32 @@ schema("/bridges/:id/operation/:operation") ->
|
|||
'operationId' => '/bridges/:id/operation/:operation',
|
||||
post => #{
|
||||
tags => [<<"bridges">>],
|
||||
summary => <<"Start/Stop/Restart Bridge">>,
|
||||
description => <<"Start/Stop/Restart bridges on a specific node.">>,
|
||||
summary => <<"Enable/Disable/Stop/Restart Bridge">>,
|
||||
description => <<"Enable/Disable/Stop/Restart bridges on all nodes"
|
||||
" in the cluster.">>,
|
||||
parameters => [
|
||||
param_path_id(),
|
||||
param_path_operation()
|
||||
param_path_operation_cluster()
|
||||
],
|
||||
responses => #{
|
||||
500 => error_schema('INTERNAL_ERROR', "Operation Failed"),
|
||||
200 => <<"Operation success">>
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
schema("/nodes/:node/bridges/:id/operation/:operation") ->
|
||||
#{
|
||||
'operationId' => '/nodes/:node/bridges/:id/operation/:operation',
|
||||
post => #{
|
||||
tags => [<<"bridges">>],
|
||||
summary => <<"Stop/Restart Bridge">>,
|
||||
description => <<"Stop/Restart bridges on a specific node.\n"
|
||||
"NOTE: It's not allowed to disable/enable bridges on a single node.">>,
|
||||
parameters => [
|
||||
param_path_node(),
|
||||
param_path_id(),
|
||||
param_path_operation_on_node()
|
||||
],
|
||||
responses => #{
|
||||
500 => error_schema('INTERNAL_ERROR', "Operation Failed"),
|
||||
|
@ -341,23 +381,51 @@ lookup_from_local_node(BridgeType, BridgeName) ->
|
|||
|
||||
'/bridges/:id/operation/:operation'(post, #{bindings :=
|
||||
#{id := Id, operation := Op}}) ->
|
||||
?TRY_PARSE_ID(Id, case operation_to_conf_req(Op) of
|
||||
?TRY_PARSE_ID(Id, case operation_func(Op) of
|
||||
invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
|
||||
UpReq ->
|
||||
OperFunc when OperFunc == enable; OperFunc == disable ->
|
||||
case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
|
||||
{UpReq, BridgeType, BridgeName}, #{override_to => cluster}) of
|
||||
{OperFunc, BridgeType, BridgeName}, #{override_to => cluster}) of
|
||||
{ok, _} -> {200};
|
||||
{error, {pre_config_update, _, bridge_not_found}} ->
|
||||
{404, error_msg('NOT_FOUND', <<"bridge not found">>)};
|
||||
{error, Reason} ->
|
||||
{500, error_msg('INTERNAL_ERROR', Reason)}
|
||||
end;
|
||||
OperFunc ->
|
||||
Nodes = mria_mnesia:running_nodes(),
|
||||
operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName)
|
||||
end).
|
||||
|
||||
'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings :=
|
||||
#{id := Id, operation := Op}}) ->
|
||||
?TRY_PARSE_ID(Id, case operation_func(Op) of
|
||||
invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)};
|
||||
OperFunc when OperFunc == restart; OperFunc == stop ->
|
||||
case emqx_bridge:OperFunc(BridgeType, BridgeName) of
|
||||
ok -> {200};
|
||||
{error, Reason} ->
|
||||
{500, error_msg('INTERNAL_ERROR', Reason)}
|
||||
end
|
||||
end).
|
||||
|
||||
operation_to_conf_req(<<"start">>) -> start;
|
||||
operation_to_conf_req(<<"stop">>) -> stop;
|
||||
operation_to_conf_req(<<"restart">>) -> restart;
|
||||
operation_to_conf_req(_) -> invalid.
|
||||
operation_func(<<"stop">>) -> stop;
|
||||
operation_func(<<"restart">>) -> restart;
|
||||
operation_func(<<"enable">>) -> enable;
|
||||
operation_func(<<"disable">>) -> disable;
|
||||
operation_func(_) -> invalid.
|
||||
|
||||
operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) ->
|
||||
RpcFunc = case OperFunc of
|
||||
restart -> restart_bridges_to_all_nodes;
|
||||
stop -> stop_bridges_to_all_nodes
|
||||
end,
|
||||
case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of
|
||||
{ok, _} ->
|
||||
{200};
|
||||
{error, ErrL} ->
|
||||
{500, error_msg('INTERNAL_ERROR', ErrL)}
|
||||
end.
|
||||
|
||||
ensure_bridge_created(BridgeType, BridgeName, Conf) ->
|
||||
case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],
|
||||
|
@ -437,7 +505,7 @@ format_metrics(#{
|
|||
|
||||
|
||||
is_ok(ResL) ->
|
||||
case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of
|
||||
case lists:filter(fun({ok, _}) -> false; (ok) -> false; (_) -> true end, ResL) of
|
||||
[] -> {ok, [Res || {ok, Res} <- ResL]};
|
||||
ErrL -> {error, ErrL}
|
||||
end.
|
||||
|
|
|
@ -39,24 +39,16 @@ stop(_State) ->
|
|||
ok = emqx_bridge:unload_hook(),
|
||||
ok.
|
||||
|
||||
-define(IS_OPER(O), when Oper == start; Oper == stop; Oper == restart).
|
||||
pre_config_update(_, {Oper, _, _}, undefined) ?IS_OPER(Oper) ->
|
||||
%% NOTE: We depends on the `emqx_bridge:pre_config_update/3` to restart/stop the
|
||||
%% underlying resources.
|
||||
pre_config_update(_, {_Oper, _, _}, undefined) ->
|
||||
{error, bridge_not_found};
|
||||
pre_config_update(_, {Oper, Type, Name}, OldConfig) ?IS_OPER(Oper) ->
|
||||
case perform_operation(Oper, Type, Name) of
|
||||
ok ->
|
||||
%% we also need to save the 'enable' to the config files
|
||||
{ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
|
||||
{error, _} = Err -> Err
|
||||
end;
|
||||
pre_config_update(_, Conf, _OldConfig) ->
|
||||
pre_config_update(_, {Oper, _Type, _Name}, OldConfig) ->
|
||||
%% to save the 'enable' to the config files
|
||||
{ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}};
|
||||
pre_config_update(_, Conf, _OldConfig) when is_map(Conf) ->
|
||||
{ok, Conf}.
|
||||
|
||||
%% internal functions
|
||||
operation_to_enable(start) -> true;
|
||||
operation_to_enable(stop) -> false;
|
||||
operation_to_enable(restart) -> true.
|
||||
|
||||
perform_operation(start, Type, Name) -> emqx_bridge:restart(Type, Name);
|
||||
perform_operation(restart, Type, Name) -> emqx_bridge:restart(Type, Name);
|
||||
perform_operation(stop, Type, Name) -> emqx_bridge:stop(Type, Name).
|
||||
operation_to_enable(disable) -> false;
|
||||
operation_to_enable(enable) -> true.
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
|
||||
, list_bridges/1
|
||||
, lookup_from_all_nodes/3
|
||||
, restart_bridges_to_all_nodes/3
|
||||
, stop_bridges_to_all_nodes/3
|
||||
]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
@ -37,7 +39,20 @@ list_bridges(Node) ->
|
|||
|
||||
-type key() :: atom() | binary() | [byte()].
|
||||
|
||||
-spec restart_bridges_to_all_nodes([node()], key(), key()) ->
|
||||
emqx_rpc:erpc_multicall().
|
||||
restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||
erpc:multicall(Nodes, emqx_bridge, restart,
|
||||
[BridgeType, BridgeName], ?TIMEOUT).
|
||||
|
||||
-spec stop_bridges_to_all_nodes([node()], key(), key()) ->
|
||||
emqx_rpc:erpc_multicall().
|
||||
stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||
erpc:multicall(Nodes, emqx_bridge, stop,
|
||||
[BridgeType, BridgeName], ?TIMEOUT).
|
||||
|
||||
-spec lookup_from_all_nodes([node()], key(), key()) ->
|
||||
emqx_rpc:erpc_multicall().
|
||||
emqx_rpc:erpc_multicall().
|
||||
lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
|
||||
erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node, [BridgeType, BridgeName], ?TIMEOUT).
|
||||
erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node,
|
||||
[BridgeType, BridgeName], ?TIMEOUT).
|
||||
|
|
|
@ -239,6 +239,11 @@ t_http_crud_apis(_) ->
|
|||
ok.
|
||||
|
||||
t_start_stop_bridges(_) ->
|
||||
lists:foreach(fun(Type) ->
|
||||
do_start_stop_bridges(Type)
|
||||
end, [node, cluster]).
|
||||
|
||||
do_start_stop_bridges(Type) ->
|
||||
%% assert we there's no bridges at first
|
||||
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
|
||||
|
||||
|
@ -249,7 +254,7 @@ t_start_stop_bridges(_) ->
|
|||
%ct:pal("the bridge ==== ~p", [Bridge]),
|
||||
#{ <<"type">> := ?BRIDGE_TYPE
|
||||
, <<"name">> := ?BRIDGE_NAME
|
||||
, <<"status">> := _
|
||||
, <<"status">> := <<"connected">>
|
||||
, <<"node_status">> := [_|_]
|
||||
, <<"metrics">> := _
|
||||
, <<"node_metrics">> := [_|_]
|
||||
|
@ -257,24 +262,24 @@ t_start_stop_bridges(_) ->
|
|||
} = jsx:decode(Bridge),
|
||||
BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME),
|
||||
%% stop it
|
||||
{ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
|
||||
{ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{ <<"status">> := <<"disconnected">>
|
||||
}, jsx:decode(Bridge2)),
|
||||
%% start again
|
||||
{ok, 200, <<>>} = request(post, operation_path(start, BridgeID), <<"">>),
|
||||
{ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{ <<"status">> := <<"connected">>
|
||||
}, jsx:decode(Bridge3)),
|
||||
%% restart an already started bridge
|
||||
{ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
|
||||
{ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{ <<"status">> := <<"connected">>
|
||||
}, jsx:decode(Bridge3)),
|
||||
%% stop it again
|
||||
{ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>),
|
||||
{ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>),
|
||||
%% restart a stopped bridge
|
||||
{ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>),
|
||||
{ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>),
|
||||
{ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []),
|
||||
?assertMatch(#{ <<"status">> := <<"connected">>
|
||||
}, jsx:decode(Bridge4)),
|
||||
|
@ -307,7 +312,7 @@ request(Method, Url, Body) ->
|
|||
uri() -> uri([]).
|
||||
uri(Parts) when is_list(Parts) ->
|
||||
NParts = [E || E <- Parts],
|
||||
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]).
|
||||
?HOST ++ str(filename:join([?BASE_PATH, ?API_VERSION | NParts])).
|
||||
|
||||
auth_header_() ->
|
||||
Username = <<"bridge_admin">>,
|
||||
|
@ -315,5 +320,10 @@ auth_header_() ->
|
|||
{ok, Token} = emqx_dashboard_admin:sign_token(Username, Password),
|
||||
{"Authorization", "Bearer " ++ binary_to_list(Token)}.
|
||||
|
||||
operation_path(Oper, BridgeID) ->
|
||||
operation_path(node, Oper, BridgeID) ->
|
||||
uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]);
|
||||
operation_path(cluster, Oper, BridgeID) ->
|
||||
uri(["bridges", BridgeID, "operation", Oper]).
|
||||
|
||||
str(S) when is_list(S) -> S;
|
||||
str(S) when is_binary(S) -> binary_to_list(S).
|
||||
|
|
Loading…
Reference in New Issue