From e2b4fb3bda08a2277f540cf21504b96d60d7cbb9 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Tue, 17 Oct 2023 11:44:04 +0200 Subject: [PATCH] fix: support 'start' operation --- apps/emqx/priv/bpapi.versions | 1 + apps/emqx_bridge/src/emqx_bridge_v2_api.erl | 276 ++++++++++++++---- .../src/proto/emqx_bridge_proto_v5.erl | 25 ++ rel/i18n/emqx_bridge_v2_api.hocon | 4 +- 4 files changed, 252 insertions(+), 54 deletions(-) diff --git a/apps/emqx/priv/bpapi.versions b/apps/emqx/priv/bpapi.versions index 91b01a4ee..47967cb1e 100644 --- a/apps/emqx/priv/bpapi.versions +++ b/apps/emqx/priv/bpapi.versions @@ -7,6 +7,7 @@ {emqx_bridge,2}. {emqx_bridge,3}. {emqx_bridge,4}. +{emqx_bridge,5}. {emqx_broker,1}. {emqx_cm,1}. {emqx_cm,2}. diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index 7dfaf25b4..d48135a00 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -38,8 +38,8 @@ '/bridges_v2'/2, '/bridges_v2/:id'/2, '/bridges_v2/:id/enable/:enable'/2, - %% '/bridges_v2/:id/:operation'/2, - %% '/nodes/:node/bridges_v2/:id/:operation'/2, + '/bridges_v2/:id/:operation'/2, + '/nodes/:node/bridges_v2/:id/:operation'/2, '/bridges_v2_probe'/2 ]). @@ -53,6 +53,10 @@ ) ). +-define(BRIDGE_NOT_ENABLED, + ?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>) +). + -define(TRY_PARSE_ID(ID, EXPR), try emqx_bridge_resource:parse_bridge_id(Id, #{atom_name => false}) of {BridgeType, BridgeName} -> @@ -73,8 +77,8 @@ paths() -> "/bridges_v2", "/bridges_v2/:id", "/bridges_v2/:id/enable/:enable", - %% "/bridges_v2/:id/:operation", - %% "/nodes/:node/bridges_v2/:id/:operation", + "/bridges_v2/:id/:operation", + "/nodes/:node/bridges_v2/:id/:operation", "/bridges_v2_probe" ]. @@ -119,6 +123,42 @@ param_path_id() -> } )}. +param_path_operation_cluster() -> + {operation, + mk( + enum([start]), + #{ + in => path, + required => true, + example => <<"start">>, + desc => ?DESC("desc_param_path_operation_cluster") + } + )}. + +param_path_operation_on_node() -> + {operation, + mk( + enum([start]), + #{ + in => path, + required => true, + example => <<"start">>, + desc => ?DESC("desc_param_path_operation_on_node") + } + )}. + +param_path_node() -> + {node, + mk( + binary(), + #{ + in => path, + required => true, + example => <<"emqx@127.0.0.1">>, + desc => ?DESC("desc_param_path_node") + } + )}. + param_path_enable() -> {enable, mk( @@ -222,54 +262,54 @@ schema("/bridges_v2/:id/enable/:enable") -> } } }; -%% schema("/bridges/:id/:operation") -> -%% #{ -%% 'operationId' => '/bridges/:id/:operation', -%% post => #{ -%% tags => [<<"bridges">>], -%% summary => <<"Stop, start or restart bridge">>, -%% description => ?DESC("desc_api7"), -%% parameters => [ -%% param_path_id(), -%% param_path_operation_cluster() -%% ], -%% responses => #{ -%% 204 => <<"Operation success">>, -%% 400 => error_schema( -%% 'BAD_REQUEST', "Problem with configuration of external service" -%% ), -%% 404 => error_schema('NOT_FOUND', "Bridge not found or invalid operation"), -%% 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), -%% 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") -%% } -%% } -%% }; -%% schema("/nodes/:node/bridges/:id/:operation") -> -%% #{ -%% 'operationId' => '/nodes/:node/bridges/:id/:operation', -%% post => #{ -%% tags => [<<"bridges">>], -%% summary => <<"Stop, start or restart bridge">>, -%% description => ?DESC("desc_api8"), -%% parameters => [ -%% param_path_node(), -%% param_path_id(), -%% param_path_operation_on_node() -%% ], -%% responses => #{ -%% 204 => <<"Operation success">>, -%% 400 => error_schema( -%% 'BAD_REQUEST', -%% "Problem with configuration of external service or bridge not enabled" -%% ), -%% 404 => error_schema( -%% 'NOT_FOUND', "Bridge or node not found or invalid operation" -%% ), -%% 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), -%% 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") -%% } -%% } -%% }; +schema("/bridges_v2/:id/:operation") -> + #{ + 'operationId' => '/bridges_v2/:id/:operation', + post => #{ + tags => [<<"bridges">>], + summary => <<"Manually start a bridge">>, + description => ?DESC("desc_api7"), + parameters => [ + param_path_id(), + param_path_operation_cluster() + ], + responses => #{ + 204 => <<"Operation success">>, + 400 => error_schema( + 'BAD_REQUEST', "Problem with configuration of external service" + ), + 404 => error_schema('NOT_FOUND', "Bridge not found or invalid operation"), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; +schema("/nodes/:node/bridges_v2/:id/:operation") -> + #{ + 'operationId' => '/nodes/:node/bridges_v2/:id/:operation', + post => #{ + tags => [<<"bridges">>], + summary => <<"Manually start a bridge">>, + description => ?DESC("desc_api8"), + parameters => [ + param_path_node(), + param_path_id(), + param_path_operation_on_node() + ], + responses => #{ + 204 => <<"Operation success">>, + 400 => error_schema( + 'BAD_REQUEST', + "Problem with configuration of external service or bridge not enabled" + ), + 404 => error_schema( + 'NOT_FOUND', "Bridge or node not found or invalid operation" + ), + 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"), + 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable") + } + } + }; schema("/bridges_v2_probe") -> #{ 'operationId' => '/bridges_v2_probe', @@ -378,6 +418,58 @@ schema("/bridges_v2_probe") -> '/bridges_v2/:id/enable/:enable'(_, _) -> ?METHOD_NOT_ALLOWED. +'/bridges_v2/:id/:operation'(post, #{ + bindings := + #{id := Id, operation := Op} +}) -> + ?TRY_PARSE_ID( + Id, + try is_enabled_bridge(BridgeType, BridgeName) of + false -> + ?BRIDGE_NOT_ENABLED; + true -> + case operation_to_all_func(Op) of + invalid -> + ?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); + OperFunc -> + Nodes = mria:running_nodes(), + call_operation(all, OperFunc, [Nodes, BridgeType, BridgeName]) + end + catch + throw:not_found -> + ?BRIDGE_NOT_FOUND(BridgeType, BridgeName) + end + ). + +'/nodes/:node/bridges_v2/:id/:operation'(post, #{ + bindings := + #{id := Id, operation := Op, node := Node} +}) -> + ?TRY_PARSE_ID( + Id, + case emqx_utils:safe_to_existing_atom(Node, utf8) of + {ok, TargetNode} -> + try is_enabled_bridge(BridgeType, BridgeName) of + false -> + ?BRIDGE_NOT_ENABLED; + true -> + case node_operation_func(Op) of + invalid -> + ?NOT_FOUND(<<"Invalid operation: ", Op/binary>>); + OperFunc -> + call_operation(TargetNode, OperFunc, [ + TargetNode, BridgeType, BridgeName + ]) + end + catch + throw:not_found -> + ?BRIDGE_NOT_FOUND(BridgeType, BridgeName) + end; + {error, _} -> + ?NOT_FOUND(<<"Invalid node name: ", Node/binary>>) + end + ). + '/bridges_v2_probe'(post, Request) -> RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_v2_probe"}, case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of @@ -471,6 +563,86 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) -> ?INTERNAL_ERROR(Reason) end. +is_enabled_bridge(BridgeType, BridgeName) -> + try emqx_bridge_v2:lookup(BridgeType, binary_to_existing_atom(BridgeName)) of + {ok, #{raw_config := ConfMap}} -> + maps:get(<<"enable">>, ConfMap, false); + {error, not_found} -> + throw(not_found) + catch + error:badarg -> + %% catch non-existing atom, + %% none-existing atom means it is not available in config PT storage. + throw(not_found) + end. + +node_operation_func(<<"start">>) -> v2_start_bridge_to_node; +node_operation_func(_) -> invalid. + +operation_to_all_func(<<"start">>) -> v2_start_bridge_to_all_nodes; +operation_to_all_func(_) -> invalid. + +call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> + case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of + Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok -> + ?NO_CONTENT; + {error, not_implemented} -> + ?NOT_IMPLEMENTED; + {error, timeout} -> + ?BAD_REQUEST(<<"Request timeout">>); + {error, {start_pool_failed, Name, Reason}} -> + Msg = bin( + io_lib:format("Failed to start ~p pool for reason ~p", [Name, redact(Reason)]) + ), + ?BAD_REQUEST(Msg); + {error, not_found} -> + BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + ?SLOG(warning, #{ + msg => "bridge_inconsistent_in_cluster_for_call_operation", + reason => not_found, + type => BridgeType, + name => BridgeName, + bridge => BridgeId + }), + ?SERVICE_UNAVAILABLE(<<"Bridge not found on remote node: ", BridgeId/binary>>); + {error, {node_not_found, Node}} -> + ?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>); + {error, {unhealthy_target, Message}} -> + ?BAD_REQUEST(Message); + {error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' -> + ?BAD_REQUEST(redact(Reason)) + end. + +do_bpapi_call(all, Call, Args) -> + maybe_unwrap( + do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args) + ); +do_bpapi_call(Node, Call, Args) -> + case lists:member(Node, mria:running_nodes()) of + true -> + do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args); + false -> + {error, {node_not_found, Node}} + end. + +do_bpapi_call_vsn(Version, Call, Args) -> + case is_supported_version(Version, Call) of + true -> + apply(emqx_bridge_proto_v5, Call, Args); + false -> + {error, not_implemented} + end. + +is_supported_version(Version, Call) -> + lists:member(Version, supported_versions(Call)). + +supported_versions(_Call) -> [5]. + +maybe_unwrap({error, not_implemented}) -> + {error, not_implemented}; +maybe_unwrap(RpcMulticallResult) -> + emqx_rpc:unwrap_erpc(RpcMulticallResult). + zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) -> lists:foldl( fun(#{type := Type, name := Name}, Acc) -> diff --git a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v5.erl b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v5.erl index 135ca3f99..1417615a7 100644 --- a/apps/emqx_bridge/src/proto/emqx_bridge_proto_v5.erl +++ b/apps/emqx_bridge/src/proto/emqx_bridge_proto_v5.erl @@ -31,6 +31,8 @@ start_bridges_to_all_nodes/3, stop_bridges_to_all_nodes/3, + v2_start_bridge_to_node/3, + v2_start_bridge_to_all_nodes/3, v2_list_bridges_on_nodes/1, v2_lookup_from_all_nodes/3 ]). @@ -137,6 +139,7 @@ get_metrics_from_all_nodes(Nodes, BridgeType, BridgeName) -> ?TIMEOUT ). +%% V2 Calls -spec v2_list_bridges_on_nodes([node()]) -> emqx_rpc:erpc_multicall([emqx_resource:resource_data()]). v2_list_bridges_on_nodes(Nodes) -> @@ -152,3 +155,25 @@ v2_lookup_from_all_nodes(Nodes, BridgeType, BridgeName) -> [BridgeType, BridgeName], ?TIMEOUT ). + +-spec v2_start_bridge_to_all_nodes([node()], key(), key()) -> + emqx_rpc:erpc_multicall(). +v2_start_bridge_to_all_nodes(Nodes, BridgeType, BridgeName) -> + erpc:multicall( + Nodes, + emqx_bridge_v2, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). + +-spec v2_start_bridge_to_node(node(), key(), key()) -> + term(). +v2_start_bridge_to_node(Node, BridgeType, BridgeName) -> + rpc:call( + Node, + emqx_bridge_v2, + start, + [BridgeType, BridgeName], + ?TIMEOUT + ). diff --git a/rel/i18n/emqx_bridge_v2_api.hocon b/rel/i18n/emqx_bridge_v2_api.hocon index f64e7cdff..1ee4419f6 100644 --- a/rel/i18n/emqx_bridge_v2_api.hocon +++ b/rel/i18n/emqx_bridge_v2_api.hocon @@ -86,13 +86,13 @@ desc_param_path_node.label: """The node name""" desc_param_path_operation_cluster.desc: -"""Operations can be one of: 'stop' or 'restart'.""" +"""Operations can be one of: 'start'.""" desc_param_path_operation_cluster.label: """Cluster Operation""" desc_param_path_operation_on_node.desc: -"""Operations can be one of: 'stop' or 'restart'.""" +"""Operations can be one of: 'start'.""" desc_param_path_operation_on_node.label: """Node Operation """