diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 67e9286bf..7aa9f4b7b 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -50,7 +50,6 @@ , remove/2 , update/2 , update/3 - , start/2 , stop/2 , restart/2 ]). @@ -208,12 +207,10 @@ lookup(Type, Name, RawConf) -> raw_config => RawConf}} end. -start(Type, Name) -> - restart(Type, Name). - stop(Type, Name) -> emqx_resource:stop(resource_id(Type, Name)). +%% we don't provide 'start', as we want an already started bridge to be restarted. restart(Type, Name) -> emqx_resource:restart(resource_id(Type, Name)). @@ -263,8 +260,8 @@ update(Type, Name, {OldConf, Conf}) -> %% we don't need to recreate the bridge if this config change is only to %% toggole the config 'bridge.{type}.{name}.enable' case maps:get(enable, Conf, true) of - false -> stop(Type, Name); - true -> start(Type, Name) + true -> restart(Type, Name); + false -> stop(Type, Name) end end. diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 28420b268..5326342b5 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -33,6 +33,7 @@ -export([ '/bridges'/2 , '/bridges/:id'/2 , '/bridges/:id/operation/:operation'/2 + , '/nodes/:node/bridges/:id/operation/:operation'/2 ]). -export([ lookup_from_local_node/2 @@ -74,7 +75,8 @@ namespace() -> "bridge". api_spec() -> emqx_dashboard_swagger:spec(?MODULE, #{check_schema => false}). -paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation"]. +paths() -> ["/bridges", "/bridges/:id", "/bridges/:id/operation/:operation", + "/nodes/:node/bridges/:id/operation/:operation"]. error_schema(Code, Message) when is_atom(Code) -> error_schema([Code], Message); @@ -87,11 +89,28 @@ get_response_body_schema() -> emqx_dashboard_swagger:schema_with_examples(emqx_bridge_schema:get_response(), bridge_info_examples(get)). -param_path_operation() -> - {operation, mk(enum([start, stop, restart]), +param_path_operation_cluster() -> + {operation, mk(enum([enable, disable, stop, restart]), #{ in => path , required => true , example => <<"start">> + , desc => <<"Operations can be one of: enable, disable, start, stop, restart">> + })}. + +param_path_operation_on_node() -> + {operation, mk(enum([stop, restart]), + #{ in => path + , required => true + , example => <<"start">> + , desc => <<"Operations can be one of: start, stop, restart">> + })}. + +param_path_node() -> + {node, mk(binary(), + #{ in => path + , required => true + , example => <<"emqx@127.0.0.1">> + , desc => <<"The bridge Id. Must be of format {type}:{name}">> })}. param_path_id() -> @@ -219,7 +238,7 @@ schema("/bridges") -> bridge_info_examples(post)), responses => #{ 201 => get_response_body_schema(), - 400 => error_schema('BAD_REQUEST', "Create bridge failed") + 400 => error_schema('ALREADY_EXISTS', "Bridge already exists") } } }; @@ -267,11 +286,32 @@ schema("/bridges/:id/operation/:operation") -> 'operationId' => '/bridges/:id/operation/:operation', post => #{ tags => [<<"bridges">>], - summary => <<"Start/Stop/Restart Bridge">>, - description => <<"Start/Stop/Restart bridges on a specific node.">>, + summary => <<"Enable/Disable/Stop/Restart Bridge">>, + description => <<"Enable/Disable/Stop/Restart bridges on all nodes" + " in the cluster.">>, parameters => [ param_path_id(), - param_path_operation() + param_path_operation_cluster() + ], + responses => #{ + 500 => error_schema('INTERNAL_ERROR', "Operation Failed"), + 200 => <<"Operation success">> + } + } + }; + +schema("/nodes/:node/bridges/:id/operation/:operation") -> + #{ + 'operationId' => '/nodes/:node/bridges/:id/operation/:operation', + post => #{ + tags => [<<"bridges">>], + summary => <<"Stop/Restart Bridge">>, + description => <<"Stop/Restart bridges on a specific node.\n" + "NOTE: It's not allowed to disable/enable bridges on a single node.">>, + parameters => [ + param_path_node(), + param_path_id(), + param_path_operation_on_node() ], responses => #{ 500 => error_schema('INTERNAL_ERROR', "Operation Failed"), @@ -341,23 +381,51 @@ lookup_from_local_node(BridgeType, BridgeName) -> '/bridges/:id/operation/:operation'(post, #{bindings := #{id := Id, operation := Op}}) -> - ?TRY_PARSE_ID(Id, case operation_to_conf_req(Op) of + ?TRY_PARSE_ID(Id, case operation_func(Op) of invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; - UpReq -> + OperFunc when OperFunc == enable; OperFunc == disable -> case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], - {UpReq, BridgeType, BridgeName}, #{override_to => cluster}) of + {OperFunc, BridgeType, BridgeName}, #{override_to => cluster}) of {ok, _} -> {200}; {error, {pre_config_update, _, bridge_not_found}} -> {404, error_msg('NOT_FOUND', <<"bridge not found">>)}; {error, Reason} -> {500, error_msg('INTERNAL_ERROR', Reason)} + end; + OperFunc -> + Nodes = mria_mnesia:running_nodes(), + operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) + end). + +'/nodes/:node/bridges/:id/operation/:operation'(post, #{bindings := + #{id := Id, operation := Op}}) -> + ?TRY_PARSE_ID(Id, case operation_func(Op) of + invalid -> {400, error_msg('BAD_REQUEST', <<"invalid operation">>)}; + OperFunc when OperFunc == restart; OperFunc == stop -> + case emqx_bridge:OperFunc(BridgeType, BridgeName) of + ok -> {200}; + {error, Reason} -> + {500, error_msg('INTERNAL_ERROR', Reason)} end end). -operation_to_conf_req(<<"start">>) -> start; -operation_to_conf_req(<<"stop">>) -> stop; -operation_to_conf_req(<<"restart">>) -> restart; -operation_to_conf_req(_) -> invalid. +operation_func(<<"stop">>) -> stop; +operation_func(<<"restart">>) -> restart; +operation_func(<<"enable">>) -> enable; +operation_func(<<"disable">>) -> disable; +operation_func(_) -> invalid. + +operation_to_all_nodes(Nodes, OperFunc, BridgeType, BridgeName) -> + RpcFunc = case OperFunc of + restart -> restart_bridges_to_all_nodes; + stop -> stop_bridges_to_all_nodes + end, + case is_ok(emqx_bridge_proto_v1:RpcFunc(Nodes, BridgeType, BridgeName)) of + {ok, _} -> + {200}; + {error, ErrL} -> + {500, error_msg('INTERNAL_ERROR', ErrL)} + end. ensure_bridge_created(BridgeType, BridgeName, Conf) -> case emqx_conf:update(emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], @@ -437,7 +505,7 @@ format_metrics(#{ is_ok(ResL) -> - case lists:filter(fun({ok, _}) -> false; (_) -> true end, ResL) of + case lists:filter(fun({ok, _}) -> false; (ok) -> false; (_) -> true end, ResL) of [] -> {ok, [Res || {ok, Res} <- ResL]}; ErrL -> {error, ErrL} end. diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index f192cf73c..b02fe2a9c 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -39,24 +39,16 @@ stop(_State) -> ok = emqx_bridge:unload_hook(), ok. --define(IS_OPER(O), when Oper == start; Oper == stop; Oper == restart). -pre_config_update(_, {Oper, _, _}, undefined) ?IS_OPER(Oper) -> +%% NOTE: We depends on the `emqx_bridge:pre_config_update/3` to restart/stop the +%% underlying resources. +pre_config_update(_, {_Oper, _, _}, undefined) -> {error, bridge_not_found}; -pre_config_update(_, {Oper, Type, Name}, OldConfig) ?IS_OPER(Oper) -> - case perform_operation(Oper, Type, Name) of - ok -> - %% we also need to save the 'enable' to the config files - {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; - {error, _} = Err -> Err - end; -pre_config_update(_, Conf, _OldConfig) -> +pre_config_update(_, {Oper, _Type, _Name}, OldConfig) -> + %% to save the 'enable' to the config files + {ok, OldConfig#{<<"enable">> => operation_to_enable(Oper)}}; +pre_config_update(_, Conf, _OldConfig) when is_map(Conf) -> {ok, Conf}. %% internal functions -operation_to_enable(start) -> true; -operation_to_enable(stop) -> false; -operation_to_enable(restart) -> true. - -perform_operation(start, Type, Name) -> emqx_bridge:restart(Type, Name); -perform_operation(restart, Type, Name) -> emqx_bridge:restart(Type, Name); -perform_operation(stop, Type, Name) -> emqx_bridge:stop(Type, Name). +operation_to_enable(disable) -> false; +operation_to_enable(enable) -> true. diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl index 71ea1d2dc..021074a1c 100644 --- a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v1.erl @@ -22,6 +22,8 @@ , list_bridges/1 , lookup_from_all_nodes/3 + , restart_bridges_to_all_nodes/3 + , stop_bridges_to_all_nodes/3 ]). -include_lib("emqx/include/bpapi.hrl"). @@ -37,7 +39,20 @@ list_bridges(Node) -> -type key() :: atom() | binary() | [byte()]. +-spec restart_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +restart_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall(Nodes, emqx_bridge, restart, + [BridgeType, BridgeName], ?TIMEOUT). + +-spec stop_bridges_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +stop_bridges_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall(Nodes, emqx_bridge, stop, + [BridgeType, BridgeName], ?TIMEOUT). + -spec lookup_from_all_nodes([node()], key(), key()) -> - emqx_rpc:erpc_multicall(). + emqx_rpc:erpc_multicall(). lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> - erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node, [BridgeType, BridgeName], ?TIMEOUT). + erpc:multicall(Nodes, emqx_bridge_api, lookup_from_local_node, + [BridgeType, BridgeName], ?TIMEOUT). diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 47dc55f6d..fd0d04f9e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -239,6 +239,11 @@ t_http_crud_apis(_) -> ok. t_start_stop_bridges(_) -> + lists:foreach(fun(Type) -> + do_start_stop_bridges(Type) + end, [node, cluster]). + +do_start_stop_bridges(Type) -> %% assert we there's no bridges at first {ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []), @@ -249,7 +254,7 @@ t_start_stop_bridges(_) -> %ct:pal("the bridge ==== ~p", [Bridge]), #{ <<"type">> := ?BRIDGE_TYPE , <<"name">> := ?BRIDGE_NAME - , <<"status">> := _ + , <<"status">> := <<"connected">> , <<"node_status">> := [_|_] , <<"metrics">> := _ , <<"node_metrics">> := [_|_] @@ -257,24 +262,24 @@ t_start_stop_bridges(_) -> } = jsx:decode(Bridge), BridgeID = emqx_bridge:bridge_id(?BRIDGE_TYPE, ?BRIDGE_NAME), %% stop it - {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), {ok, 200, Bridge2} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{ <<"status">> := <<"disconnected">> }, jsx:decode(Bridge2)), %% start again - {ok, 200, <<>>} = request(post, operation_path(start, BridgeID), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{ <<"status">> := <<"connected">> }, jsx:decode(Bridge3)), %% restart an already started bridge - {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge3} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{ <<"status">> := <<"connected">> }, jsx:decode(Bridge3)), %% stop it again - {ok, 200, <<>>} = request(post, operation_path(stop, BridgeID), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(Type, stop, BridgeID), <<"">>), %% restart a stopped bridge - {ok, 200, <<>>} = request(post, operation_path(restart, BridgeID), <<"">>), + {ok, 200, <<>>} = request(post, operation_path(Type, restart, BridgeID), <<"">>), {ok, 200, Bridge4} = request(get, uri(["bridges", BridgeID]), []), ?assertMatch(#{ <<"status">> := <<"connected">> }, jsx:decode(Bridge4)), @@ -307,7 +312,7 @@ request(Method, Url, Body) -> uri() -> uri([]). uri(Parts) when is_list(Parts) -> NParts = [E || E <- Parts], - ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION | NParts]). + ?HOST ++ str(filename:join([?BASE_PATH, ?API_VERSION | NParts])). auth_header_() -> Username = <<"bridge_admin">>, @@ -315,5 +320,10 @@ auth_header_() -> {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), {"Authorization", "Bearer " ++ binary_to_list(Token)}. -operation_path(Oper, BridgeID) -> +operation_path(node, Oper, BridgeID) -> + uri(["nodes", node(), "bridges", BridgeID, "operation", Oper]); +operation_path(cluster, Oper, BridgeID) -> uri(["bridges", BridgeID, "operation", Oper]). + +str(S) when is_list(S) -> S; +str(S) when is_binary(S) -> binary_to_list(S).