From 8730a03ab8d0160c9e77bd62828c736a203366f8 Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 15 Sep 2021 10:15:21 +0800 Subject: [PATCH] feat(bridges): add start/stop/restart HTTP APIs for bridges --- apps/emqx_bridge/src/emqx_bridge_api.erl | 36 +++++++++++++++++-- apps/emqx_bridge/src/emqx_bridge_schema.erl | 5 +-- .../src/emqx_connector_mqtt.erl | 27 +++++++------- 3 files changed, 49 insertions(+), 19 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 56c421e0c..e4805d7eb 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -23,6 +23,7 @@ , list_local_bridges/1 , crud_bridges_cluster/2 , crud_bridges/3 + , manage_bridges/2 ]). -define(TYPES, [mqtt]). @@ -113,14 +114,24 @@ crud_bridges_apis() -> operation_apis() -> Metadata = #{ post => #{ - description => <<"Restart bridges on all nodes in the cluster">>, + description => <<"Start/Stop/Restart bridges on a specific node">>, parameters => [ + param_path_node(), param_path_id(), param_path_operation()], responses => #{ <<"500">> => emqx_mgmt_util:error_schema(<<"Operation Failed">>, ['INTERNAL_ERROR']), <<"200">> => emqx_mgmt_util:schema(<<"Operation success">>)}}}, - {"/bridges/:id/operation/:operation", Metadata, manage_bridges}. + {"/nodes/:node/bridges/:id/operation/:operation", Metadata, manage_bridges}. + +param_path_node() -> + #{ + name => node, + in => path, + schema => #{type => string}, + required => true, + example => node() + }. param_path_id() -> #{ @@ -192,6 +203,20 @@ crud_bridges(_, delete, #{bindings := #{id := Id}}) -> {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} end). +manage_bridges(post, #{bindings := #{node := Node, id := Id, operation := Op}}) -> + OperFun = + fun (<<"start">>) -> start_bridge; + (<<"stop">>) -> stop_bridge; + (<<"restart">>) -> restart_bridge + end, + ?TRY_PARSE_ID(Id, + case rpc_call(binary_to_atom(Node, latin1), emqx_bridge, OperFun(Op), + [BridgeType, BridgeName]) of + ok -> {200}; + {error, Reason} -> + {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} + end). + format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, status := Status}}) -> IsConnected = fun(started) -> true; (_) -> false end, RawConf#{ @@ -202,7 +227,12 @@ format_resp(#{id := Id, raw_config := RawConf, resource_data := #{mod := Mod, st }. rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of + rpc_call(Node, ?MODULE, Fun, Args). + +rpc_call(Node, Mod, Fun, Args) when Node =:= node() -> + apply(Mod, Fun, Args); +rpc_call(Node, Mod, Fun, Args) -> + case rpc:call(Node, Mod, Fun, Args) of {badrpc, Reason} -> {error, Reason}; Res -> Res end. diff --git a/apps/emqx_bridge/src/emqx_bridge_schema.erl b/apps/emqx_bridge/src/emqx_bridge_schema.erl index beb0f282c..94cdaa30b 100644 --- a/apps/emqx_bridge/src/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/emqx_bridge_schema.erl @@ -8,10 +8,7 @@ roots() -> ["bridges"]. fields("bridges") -> - [{mqtt, hoconsc:ref(?MODULE, "mqtt")}]; - -fields("mqtt") -> - [{"$name", hoconsc:ref(?MODULE, "mqtt_bridge")}]; + [{mqtt, hoconsc:mk(hoconsc:map(name, hoconsc:ref(?MODULE, "mqtt_bridge")))}]; fields("mqtt_bridge") -> emqx_connector_mqtt:fields("config"). diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 708bcdeb9..55a4dde25 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -101,20 +101,21 @@ on_start(InstId, Conf) -> end end, InitRes, InOutConfigs). -on_stop(InstId, #{}) -> +on_stop(InstId, #{sub_bridges := NameList}) -> logger:info("stopping mqtt connector: ~p", [InstId]), - case ?MODULE:drop_bridge(InstId) of - ok -> ok; - {error, not_found} -> ok; - {error, Reason} -> - logger:error("stop bridge failed, error: ~p", [Reason]) - end. + lists:foreach(fun(Name) -> + case ?MODULE:drop_bridge(Name) of + ok -> ok; + {error, not_found} -> ok; + {error, Reason} -> + logger:error("stop channel ~p failed, error: ~p", [Name, Reason]) + end + end, NameList). %% TODO: let the emqx_resource trigger on_query/4 automatically according to the %% `message_in` and `message_out` config -on_query(InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix, +on_query(_InstId, {create_channel, Conf}, _AfterQuery, #{name_prefix := Prefix, baisc_conf := BasicConf}) -> - logger:debug("create channel to connector: ~p, conf: ~p", [InstId, Conf]), create_channel(Conf, Prefix, BasicConf); on_query(InstId, {publish_to_local, Msg}, _AfterQuery, _State) -> logger:debug("publish to local node, connector: ~p, msg: ~p", [InstId, Msg]); @@ -139,14 +140,16 @@ check_channel_id_dup(Confs) -> %% this is an `message_in` bridge create_channel(#{subscribe_remote_topic := _, id := Id} = InConf, NamePrefix, BasicConf) -> - logger:info("creating 'message_in' channel for: ~p", [Id]), + Name = bridge_name(NamePrefix, Id), + logger:info("creating 'message_in' channel ~p", [Name]), create_sub_bridge(BasicConf#{ - name => bridge_name(NamePrefix, Id), + name => Name, clientid => clientid(Id), subscriptions => InConf, forwards => undefined}); %% this is an `message_out` bridge create_channel(#{subscribe_local_topic := _, id := Id} = OutConf, NamePrefix, BasicConf) -> - logger:info("creating 'message_out' channel for: ~p", [Id]), + Name = bridge_name(NamePrefix, Id), + logger:info("creating 'message_out' channel ~p", [Name]), create_sub_bridge(BasicConf#{ name => bridge_name(NamePrefix, Id), clientid => clientid(Id),