feat(bridges): add start/stop/restart HTTP APIs for bridges

This commit is contained in:
Shawn 2021-09-15 10:15:21 +08:00
parent cb8dabe579
commit 8730a03ab8
3 changed files with 49 additions and 19 deletions

View File

@ -23,6 +23,7 @@
, list_local_bridges/1
, crud_bridges_cluster/2
, crud_bridges/3
, manage_bridges/2
]).
-define(TYPES, [mqtt]).
@ -113,14 +114,24 @@ crud_bridges_apis() ->
operation_apis() ->
Metadata = #{
post => #{
description => <<"Restart bridges on all nodes in the cluster">>,
description => <<"Start/Stop/Restart bridges on a specific node">>,
parameters => [
param_path_node(),
param_path_id(),
param_path_operation()],
responses => #{
<<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']),
<<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}},
{"/bridges/:id/operation/:operation", Metadata, manage_bridges}.
{"/nodes/:node/bridges/:id/operation/:operation", Metadata, manage_bridges}.
param_path_node() ->
#{
name => node,
in => path,
schema => #{type => string},
required => true,
example => node()
}.
param_path_id() ->
#{
@ -192,6 +203,20 @@ crud_bridges(_, delete, #{bindings := #{id := Id}}) ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end).
manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) ->
OperFun =
fun (<<"start">>) -> start_bridge;
(<<"stop">>) -> stop_bridge;
(<<"restart">>) -> restart_bridge
end,
?TRY_PARSE_ID(Id,
case rpc_call(binary_to_atom(Node, latin1), emqx_bridge, OperFun(Op),
[BridgeType, BridgeName]) of
ok -> {200};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end).
format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, status := Status}}) ->
IsConnected = fun(started) -> true; (_) -> false end,
RawConf#{
@ -202,7 +227,12 @@ format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, st
}.
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
rpc_call(Node, ?MODULE, Fun, Args).
rpc_call(Node, Mod, Fun, Args) when Node =:= node() ->
apply(Mod, Fun, Args);
rpc_call(Node, Mod, Fun, Args) ->
case rpc:call(Node, Mod, Fun, Args) of
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.

View File

@ -8,10 +8,7 @@
roots() -> ["bridges"].
fields("bridges") ->
[{mqtt, hoconsc:ref(?MODULE, "mqtt")}];
fields("mqtt") ->
[{"$name", hoconsc:ref(?MODULE, "mqtt_bridge")}];
[{mqtt, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "mqtt_bridge")))}];
fields("mqtt_bridge") ->
emqx_connector_mqtt:fields("config").

View File

@ -101,20 +101,21 @@ on_start(InstId, Conf) ->
end
end, InitRes, InOutConfigs).
on_stop(InstId, #{}) ->
on_stop(InstId, #{sub_bridges := NameList}) ->
logger:info("stopping mqtt connector: ~p", [InstId]),
case ?MODULE:drop_bridge(InstId) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
logger:error("stop bridge failed, error: ~p", [Reason])
end.
lists:foreach(fun(Name) ->
case ?MODULE:drop_bridge(Name) of
ok -> ok;
{error, not_found} -> ok;
{error, Reason} ->
logger:error("stop channel ~p failed, error: ~p", [Name, Reason])
end
end, NameList).
%% TODO: let the emqx_resource trigger on_query/4 automatically according to the
%% `message_in` and `message_out` config
on_query(InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix,
baisc_conf := BasicConf}) ->
logger:debug("create channel to connector: ~p, conf: ~p", [InstId, Conf]),
create_channel(Conf, Prefix, BasicConf);
on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) ->
logger:debug("publish to local node, connector: ~p, msg: ~p", [InstId, Msg]);
@ -139,14 +140,16 @@ check_channel_id_dup(Confs) ->
%% this is an `message_in` bridge
create_channel(#{subscribe_remote_topic := _, id := Id} = InConf, NamePrefix, BasicConf) ->
logger:info("creating 'message_in' channel for: ~p", [Id]),
Name = bridge_name(NamePrefix, Id),
logger:info("creating 'message_in' channel ~p", [Name]),
create_sub_bridge(BasicConf#{
name => bridge_name(NamePrefix, Id),
name => Name,
clientid => clientid(Id),
subscriptions => InConf, forwards => undefined});
%% this is an `message_out` bridge
create_channel(#{subscribe_local_topic := _, id := Id} = OutConf, NamePrefix, BasicConf) ->
logger:info("creating 'message_out' channel for: ~p", [Id]),
Name = bridge_name(NamePrefix, Id),
logger:info("creating 'message_out' channel ~p", [Name]),
create_sub_bridge(BasicConf#{
name => bridge_name(NamePrefix, Id),
clientid => clientid(Id),