fix: support 'start' operation

This commit is contained in:
Stefan Strigler 2023-10-17 11:44:04 +02:00 committed by Zaiming (Stone) Shi
parent d5ac3d0fd7
commit e2b4fb3bda
4 changed files with 252 additions and 54 deletions

View File

@ -7,6 +7,7 @@
{emqx_bridge,2}.
{emqx_bridge,3}.
{emqx_bridge,4}.
{emqx_bridge,5}.
{emqx_broker,1}.
{emqx_cm,1}.
{emqx_cm,2}.

View File

@ -38,8 +38,8 @@
'/bridges_v2'/2,
'/bridges_v2/:id'/2,
'/bridges_v2/:id/enable/:enable'/2,
%% '/bridges_v2/:id/:operation'/2,
%% '/nodes/:node/bridges_v2/:id/:operation'/2,
'/bridges_v2/:id/:operation'/2,
'/nodes/:node/bridges_v2/:id/:operation'/2,
'/bridges_v2_probe'/2
]).
@ -53,6 +53,10 @@
)
).
-define(BRIDGE_NOT_ENABLED,
?BAD_REQUEST(<<"Forbidden operation, bridge not enabled">>)
).
-define(TRY_PARSE_ID(ID, EXPR),
try emqx_bridge_resource:parse_bridge_id(Id, #{atom_name => false}) of
{BridgeType, BridgeName} ->
@ -73,8 +77,8 @@ paths() ->
"/bridges_v2",
"/bridges_v2/:id",
"/bridges_v2/:id/enable/:enable",
%% "/bridges_v2/:id/:operation",
%% "/nodes/:node/bridges_v2/:id/:operation",
"/bridges_v2/:id/:operation",
"/nodes/:node/bridges_v2/:id/:operation",
"/bridges_v2_probe"
].
@ -119,6 +123,42 @@ param_path_id() ->
}
)}.
param_path_operation_cluster() ->
{operation,
mk(
enum([start]),
#{
in => path,
required => true,
example => <<"start">>,
desc => ?DESC("desc_param_path_operation_cluster")
}
)}.
param_path_operation_on_node() ->
{operation,
mk(
enum([start]),
#{
in => path,
required => true,
example => <<"start">>,
desc => ?DESC("desc_param_path_operation_on_node")
}
)}.
param_path_node() ->
{node,
mk(
binary(),
#{
in => path,
required => true,
example => <<"emqx@127.0.0.1">>,
desc => ?DESC("desc_param_path_node")
}
)}.
param_path_enable() ->
{enable,
mk(
@ -222,54 +262,54 @@ schema("/bridges_v2/:id/enable/:enable") ->
}
}
};
%% schema("/bridges/:id/:operation") ->
%% #{
%% 'operationId' => '/bridges/:id/:operation',
%% post => #{
%% tags => [<<"bridges">>],
%% summary => <<"Stop, start or restart bridge">>,
%% description => ?DESC("desc_api7"),
%% parameters => [
%% param_path_id(),
%% param_path_operation_cluster()
%% ],
%% responses => #{
%% 204 => <<"Operation success">>,
%% 400 => error_schema(
%% 'BAD_REQUEST', "Problem with configuration of external service"
%% ),
%% 404 => error_schema('NOT_FOUND', "Bridge not found or invalid operation"),
%% 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
%% 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
%% }
%% }
%% };
%% schema("/nodes/:node/bridges/:id/:operation") ->
%% #{
%% 'operationId' => '/nodes/:node/bridges/:id/:operation',
%% post => #{
%% tags => [<<"bridges">>],
%% summary => <<"Stop, start or restart bridge">>,
%% description => ?DESC("desc_api8"),
%% parameters => [
%% param_path_node(),
%% param_path_id(),
%% param_path_operation_on_node()
%% ],
%% responses => #{
%% 204 => <<"Operation success">>,
%% 400 => error_schema(
%% 'BAD_REQUEST',
%% "Problem with configuration of external service or bridge not enabled"
%% ),
%% 404 => error_schema(
%% 'NOT_FOUND', "Bridge or node not found or invalid operation"
%% ),
%% 501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
%% 503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
%% }
%% }
%% };
schema("/bridges_v2/:id/:operation") ->
#{
'operationId' => '/bridges_v2/:id/:operation',
post => #{
tags => [<<"bridges">>],
summary => <<"Manually start a bridge">>,
description => ?DESC("desc_api7"),
parameters => [
param_path_id(),
param_path_operation_cluster()
],
responses => #{
204 => <<"Operation success">>,
400 => error_schema(
'BAD_REQUEST', "Problem with configuration of external service"
),
404 => error_schema('NOT_FOUND', "Bridge not found or invalid operation"),
501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
}
}
};
schema("/nodes/:node/bridges_v2/:id/:operation") ->
#{
'operationId' => '/nodes/:node/bridges_v2/:id/:operation',
post => #{
tags => [<<"bridges">>],
summary => <<"Manually start a bridge">>,
description => ?DESC("desc_api8"),
parameters => [
param_path_node(),
param_path_id(),
param_path_operation_on_node()
],
responses => #{
204 => <<"Operation success">>,
400 => error_schema(
'BAD_REQUEST',
"Problem with configuration of external service or bridge not enabled"
),
404 => error_schema(
'NOT_FOUND', "Bridge or node not found or invalid operation"
),
501 => error_schema('NOT_IMPLEMENTED', "Not Implemented"),
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
}
}
};
schema("/bridges_v2_probe") ->
#{
'operationId' => '/bridges_v2_probe',
@ -378,6 +418,58 @@ schema("/bridges_v2_probe") ->
'/bridges_v2/:id/enable/:enable'(_, _) ->
?METHOD_NOT_ALLOWED.
'/bridges_v2/:id/:operation'(post, #{
bindings :=
#{id := Id, operation := Op}
}) ->
?TRY_PARSE_ID(
Id,
try is_enabled_bridge(BridgeType, BridgeName) of
false ->
?BRIDGE_NOT_ENABLED;
true ->
case operation_to_all_func(Op) of
invalid ->
?NOT_FOUND(<<"Invalid operation: ", Op/binary>>);
OperFunc ->
Nodes = mria:running_nodes(),
call_operation(all, OperFunc, [Nodes, BridgeType, BridgeName])
end
catch
throw:not_found ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
end
).
'/nodes/:node/bridges_v2/:id/:operation'(post, #{
bindings :=
#{id := Id, operation := Op, node := Node}
}) ->
?TRY_PARSE_ID(
Id,
case emqx_utils:safe_to_existing_atom(Node, utf8) of
{ok, TargetNode} ->
try is_enabled_bridge(BridgeType, BridgeName) of
false ->
?BRIDGE_NOT_ENABLED;
true ->
case node_operation_func(Op) of
invalid ->
?NOT_FOUND(<<"Invalid operation: ", Op/binary>>);
OperFunc ->
call_operation(TargetNode, OperFunc, [
TargetNode, BridgeType, BridgeName
])
end
catch
throw:not_found ->
?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
end;
{error, _} ->
?NOT_FOUND(<<"Invalid node name: ", Node/binary>>)
end
).
'/bridges_v2_probe'(post, Request) ->
RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_v2_probe"},
case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of
@ -471,6 +563,86 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
?INTERNAL_ERROR(Reason)
end.
is_enabled_bridge(BridgeType, BridgeName) ->
try emqx_bridge_v2:lookup(BridgeType, binary_to_existing_atom(BridgeName)) of
{ok, #{raw_config := ConfMap}} ->
maps:get(<<"enable">>, ConfMap, false);
{error, not_found} ->
throw(not_found)
catch
error:badarg ->
%% catch non-existing atom,
%% none-existing atom means it is not available in config PT storage.
throw(not_found)
end.
node_operation_func(<<"start">>) -> v2_start_bridge_to_node;
node_operation_func(_) -> invalid.
operation_to_all_func(<<"start">>) -> v2_start_bridge_to_all_nodes;
operation_to_all_func(_) -> invalid.
call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
case is_ok(do_bpapi_call(NodeOrAll, OperFunc, Args)) of
Ok when Ok =:= ok; is_tuple(Ok), element(1, Ok) =:= ok ->
?NO_CONTENT;
{error, not_implemented} ->
?NOT_IMPLEMENTED;
{error, timeout} ->
?BAD_REQUEST(<<"Request timeout">>);
{error, {start_pool_failed, Name, Reason}} ->
Msg = bin(
io_lib:format("Failed to start ~p pool for reason ~p", [Name, redact(Reason)])
),
?BAD_REQUEST(Msg);
{error, not_found} ->
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
?SLOG(warning, #{
msg => "bridge_inconsistent_in_cluster_for_call_operation",
reason => not_found,
type => BridgeType,
name => BridgeName,
bridge => BridgeId
}),
?SERVICE_UNAVAILABLE(<<"Bridge not found on remote node: ", BridgeId/binary>>);
{error, {node_not_found, Node}} ->
?NOT_FOUND(<<"Node not found: ", (atom_to_binary(Node))/binary>>);
{error, {unhealthy_target, Message}} ->
?BAD_REQUEST(Message);
{error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
?BAD_REQUEST(redact(Reason))
end.
do_bpapi_call(all, Call, Args) ->
maybe_unwrap(
do_bpapi_call_vsn(emqx_bpapi:supported_version(emqx_bridge), Call, Args)
);
do_bpapi_call(Node, Call, Args) ->
case lists:member(Node, mria:running_nodes()) of
true ->
do_bpapi_call_vsn(emqx_bpapi:supported_version(Node, emqx_bridge), Call, Args);
false ->
{error, {node_not_found, Node}}
end.
do_bpapi_call_vsn(Version, Call, Args) ->
case is_supported_version(Version, Call) of
true ->
apply(emqx_bridge_proto_v5, Call, Args);
false ->
{error, not_implemented}
end.
is_supported_version(Version, Call) ->
lists:member(Version, supported_versions(Call)).
supported_versions(_Call) -> [5].
maybe_unwrap({error, not_implemented}) ->
{error, not_implemented};
maybe_unwrap(RpcMulticallResult) ->
emqx_rpc:unwrap_erpc(RpcMulticallResult).
zip_bridges([BridgesFirstNode | _] = BridgesAllNodes) ->
lists:foldl(
fun(#{type := Type, name := Name}, Acc) ->

View File

@ -31,6 +31,8 @@
start_bridges_to_all_nodes/3,
stop_bridges_to_all_nodes/3,
v2_start_bridge_to_node/3,
v2_start_bridge_to_all_nodes/3,
v2_list_bridges_on_nodes/1,
v2_lookup_from_all_nodes/3
]).
@ -137,6 +139,7 @@ get_metrics_from_all_nodes(Nodes, BridgeType, BridgeName) ->
?TIMEOUT
).
%% V2 Calls
-spec v2_list_bridges_on_nodes([node()]) ->
emqx_rpc:erpc_multicall([emqx_resource:resource_data()]).
v2_list_bridges_on_nodes(Nodes) ->
@ -152,3 +155,25 @@ v2_lookup_from_all_nodes(Nodes, BridgeType, BridgeName) ->
[BridgeType, BridgeName],
?TIMEOUT
).
-spec v2_start_bridge_to_all_nodes([node()], key(), key()) ->
emqx_rpc:erpc_multicall().
v2_start_bridge_to_all_nodes(Nodes, BridgeType, BridgeName) ->
erpc:multicall(
Nodes,
emqx_bridge_v2,
start,
[BridgeType, BridgeName],
?TIMEOUT
).
-spec v2_start_bridge_to_node(node(), key(), key()) ->
term().
v2_start_bridge_to_node(Node, BridgeType, BridgeName) ->
rpc:call(
Node,
emqx_bridge_v2,
start,
[BridgeType, BridgeName],
?TIMEOUT
).

View File

@ -86,13 +86,13 @@ desc_param_path_node.label:
"""The node name"""
desc_param_path_operation_cluster.desc:
"""Operations can be one of: 'stop' or 'restart'."""
"""Operations can be one of: 'start'."""
desc_param_path_operation_cluster.label:
"""Cluster Operation"""
desc_param_path_operation_on_node.desc:
"""Operations can be one of: 'stop' or 'restart'."""
"""Operations can be one of: 'start'."""
desc_param_path_operation_on_node.label:
"""Node Operation """