diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index f0de07da2..c00eb6b14 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -256,11 +256,11 @@ t_mqtt_egress_bridge_ignores_clean_start(_) -> } ), - {ok, _, #{state := #{name := WorkerName}}} = + {ok, _, #{state := #{worker := WorkerPid}}} = emqx_resource:get_instance(emqx_bridge_resource:resource_id(BridgeID)), ?assertMatch( #{clean_start := true}, - maps:from_list(emqx_connector_mqtt_worker:info(WorkerName)) + maps:from_list(emqx_connector_mqtt_worker:info(WorkerPid)) ), %% delete the bridge diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 1efe6f3e0..bd4bf6eb1 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -25,8 +25,8 @@ callback_mode/0, start_link/0, init/1, - create_bridge/1, - drop_bridge/1, + create_bridge/2, + remove_bridge/1, bridges/0 ]). @@ -56,11 +56,10 @@ init([]) -> }, {ok, {SupFlag, []}}. -bridge_spec(Config) -> - {Name, NConfig} = maps:take(name, Config), +bridge_spec(Name, Options) -> #{ id => Name, - start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]}, + start => {emqx_connector_mqtt_worker, start_link, [Name, Options]}, restart => temporary, shutdown => 1000 }. @@ -72,10 +71,10 @@ bridges() -> || {Name, _Pid, _, _} <- supervisor:which_children(?MODULE) ]. -create_bridge(Config) -> - supervisor:start_child(?MODULE, bridge_spec(Config)). +create_bridge(Name, Options) -> + supervisor:start_child(?MODULE, bridge_spec(Name, Options)). -drop_bridge(Name) -> +remove_bridge(Name) -> case supervisor:terminate_child(?MODULE, Name) of ok -> supervisor:delete_child(?MODULE, Name); @@ -95,36 +94,39 @@ on_message_received(Msg, HookPoint, ResId) -> %% =================================================================== callback_mode() -> async_if_possible. -on_start(InstanceId, Conf) -> +on_start(ResourceId, Conf) -> ?SLOG(info, #{ msg => "starting_mqtt_connector", - connector => InstanceId, + connector => ResourceId, config => emqx_utils:redact(Conf) }), BasicConf = basic_config(Conf), - BridgeConf = BasicConf#{ - name => InstanceId, - clientid => clientid(InstanceId, Conf), - subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstanceId), - forwards => make_forward_confs(maps:get(egress, Conf, undefined)) + BridgeOpts = BasicConf#{ + clientid => clientid(ResourceId, Conf), + subscriptions => make_sub_confs(maps:get(ingress, Conf, #{}), Conf, ResourceId), + forwards => maps:get(egress, Conf, #{}) }, - case ?MODULE:create_bridge(BridgeConf) of - {ok, _Pid} -> - ensure_mqtt_worker_started(InstanceId, BridgeConf); + case create_bridge(ResourceId, BridgeOpts) of + {ok, Pid, {ConnProps, WorkerConf}} -> + {ok, #{ + name => ResourceId, + worker => Pid, + config => WorkerConf, + props => ConnProps + }}; {error, {already_started, _Pid}} -> - ok = ?MODULE:drop_bridge(InstanceId), - {ok, _} = ?MODULE:create_bridge(BridgeConf), - ensure_mqtt_worker_started(InstanceId, BridgeConf); + ok = remove_bridge(ResourceId), + on_start(ResourceId, Conf); {error, Reason} -> {error, Reason} end. -on_stop(_InstId, #{name := InstanceId}) -> +on_stop(ResourceId, #{}) -> ?SLOG(info, #{ msg => "stopping_mqtt_connector", - connector => InstanceId + connector => ResourceId }), - case ?MODULE:drop_bridge(InstanceId) of + case remove_bridge(ResourceId) of ok -> ok; {error, not_found} -> @@ -132,24 +134,24 @@ on_stop(_InstId, #{name := InstanceId}) -> {error, Reason} -> ?SLOG(error, #{ msg => "stop_mqtt_connector_error", - connector => InstanceId, + connector => ResourceId, reason => Reason }) end. -on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> - ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - case emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg) of +on_query(ResourceId, {send_message, Msg}, #{worker := Pid, config := Config}) -> + ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), + case emqx_connector_mqtt_worker:send_to_remote(Pid, Msg, Config) of ok -> ok; {error, Reason} -> classify_error(Reason) end. -on_query_async(_InstId, {send_message, Msg}, CallbackIn, #{name := InstanceId}) -> - ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), +on_query_async(ResourceId, {send_message, Msg}, CallbackIn, #{worker := Pid, config := Config}) -> + ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => ResourceId}), Callback = {fun on_async_result/2, [CallbackIn]}, - case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of + case emqx_connector_mqtt_worker:send_to_remote_async(Pid, Msg, Callback, Config) of ok -> ok; {ok, Pid} -> @@ -172,8 +174,8 @@ apply_callback_function({F, A}, Result) when is_function(F), is_list(A) -> apply_callback_function({M, F, A}, Result) when is_atom(M), is_atom(F), is_list(A) -> erlang:apply(M, F, A ++ [Result]). -on_get_status(_InstId, #{name := InstanceId}) -> - emqx_connector_mqtt_worker:status(InstanceId). +on_get_status(_ResourceId, #{worker := Pid}) -> + emqx_connector_mqtt_worker:status(Pid). classify_error(disconnected = Reason) -> {error, {recoverable_error, Reason}}; @@ -186,33 +188,13 @@ classify_error(shutdown = Reason) -> classify_error(Reason) -> {error, {unrecoverable_error, Reason}}. -ensure_mqtt_worker_started(InstanceId, BridgeConf) -> - case emqx_connector_mqtt_worker:connect(InstanceId) of - {ok, Properties} -> - {ok, #{name => InstanceId, config => BridgeConf, props => Properties}}; - {error, Reason} -> - {error, Reason} - end. - -make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 -> - undefined; -make_sub_confs(undefined, _Conf, _) -> - undefined; -make_sub_confs(SubRemoteConf, Conf, ResourceId) -> - case maps:find(hookpoint, Conf) of - error -> - error({no_hookpoint_provided, Conf}); - {ok, HookPoint} -> - MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]}, - SubRemoteConf#{on_message_received => MFA} - end. - -make_forward_confs(EmptyMap) when map_size(EmptyMap) == 0 -> - undefined; -make_forward_confs(undefined) -> - undefined; -make_forward_confs(FrowardConf) -> - FrowardConf. +make_sub_confs(Subscriptions, _Conf, _) when map_size(Subscriptions) == 0 -> + Subscriptions; +make_sub_confs(Subscriptions, #{hookpoint := HookPoint}, ResourceId) -> + MFA = {?MODULE, on_message_received, [HookPoint, ResourceId]}, + Subscriptions#{on_message_received => MFA}; +make_sub_confs(_SubRemoteConf, Conf, ResourceId) -> + error({no_hookpoint_provided, ResourceId, Conf}). basic_config( #{ @@ -227,17 +209,14 @@ basic_config( } = Conf ) -> BasicConf = #{ - %% connection opts server => Server, %% 30s connect_timeout => 30, - auto_reconnect => true, proto_ver => ProtoVer, - %% Opening bridge_mode will form a non-standard mqtt connection message. + %% Opening a connection in bridge mode will form a non-standard mqtt connection message. %% A load balancing server (such as haproxy) is often set up before the emqx broker server. %% When the load balancing server enables mqtt connection packet inspection, - %% non-standard mqtt connection packets will be filtered out by LB. - %% So let's disable bridge_mode. + %% non-standard mqtt connection packets might be filtered out by LB. bridge_mode => BridgeMode, keepalive => ms_to_s(KeepAlive), clean_start => CleanStart, @@ -246,18 +225,9 @@ basic_config( ssl => EnableSsl, ssl_opts => maps:to_list(maps:remove(enable, Ssl)) }, - maybe_put_fields([username, password], Conf, BasicConf). - -maybe_put_fields(Fields, Conf, Acc0) -> - lists:foldl( - fun(Key, Acc) -> - case maps:find(Key, Conf) of - error -> Acc; - {ok, Val} -> Acc#{Key => Val} - end - end, - Acc0, - Fields + maps:merge( + BasicConf, + maps:with([username, password], Conf) ). ms_to_s(Ms) -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 8fc70405f..004819678 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -17,7 +17,6 @@ -module(emqx_connector_mqtt_msg). -export([ - make_pub_vars/2, to_remote_msg/2, to_broker_msg/3 ]). @@ -46,11 +45,6 @@ remote := remote_config() }. -make_pub_vars(_, undefined) -> - undefined; -make_pub_vars(Mountpoint, Conf) when is_map(Conf) -> - Conf#{mountpoint => Mountpoint}. - %% @doc Make export format: %% 1. Mount topic to a prefix %% 2. Fix QoS to 1 @@ -70,8 +64,7 @@ to_remote_msg(MapMsg, #{ topic := TopicToken, qos := QoSToken, retain := RetainToken - } = Remote, - mountpoint := Mountpoint + } = Remote }) when is_map(MapMsg) -> Topic = replace_vars_in_str(TopicToken, MapMsg), Payload = process_payload(Remote, MapMsg), @@ -81,12 +74,10 @@ to_remote_msg(MapMsg, #{ #mqtt_msg{ qos = QoS, retain = Retain, - topic = topic(Mountpoint, Topic), + topic = Topic, props = emqx_utils:pub_props_to_packet(PubProps), payload = Payload - }; -to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> - Msg#message{topic = topic(Mountpoint, Topic)}. + }. %% published from remote node over a MQTT connection to_broker_msg(Msg, Vars, undefined) -> @@ -98,8 +89,7 @@ to_broker_msg( topic := TopicToken, qos := QoSToken, retain := RetainToken - } = Local, - mountpoint := Mountpoint + } = Local }, Props ) -> @@ -112,7 +102,7 @@ to_broker_msg( Props#{properties => emqx_utils:pub_props_to_packet(PubProps)}, emqx_message:set_flags( #{dup => Dup, retain => Retain}, - emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload) + emqx_message:make(bridge, QoS, Topic, Payload) ) ). @@ -142,5 +132,3 @@ replace_simple_var(Val, _Data) -> set_headers(Val, Msg) -> emqx_message:set_headers(Val, Msg). -topic(undefined, Topic) -> Topic; -topic(Prefix, Topic) -> emqx_topic:prepend(Prefix, Topic). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index e49603e51..c49d5b180 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -72,34 +72,75 @@ %% management APIs -export([ - connect/1, status/1, ping/1, info/1, - send_to_remote/2, - send_to_remote_async/3 + send_to_remote/3, + send_to_remote_async/4 ]). -export([handle_publish/3]). -export([handle_disconnect/1]). --export_type([ - config/0, - ack_ref/0 -]). +-export_type([config/0]). + +-type template() :: emqx_plugin_libs_rule:tmpl_token(). -type name() :: term(). -% -type qos() :: emqx_types:qos(). --type config() :: map(). --type ack_ref() :: term(). -% -type topic() :: emqx_types:topic(). +-type options() :: #{ + % endpoint + server := iodata(), + % emqtt client options + proto_ver := v3 | v4 | v5, + username := binary(), + password := binary(), + clientid := binary(), + clean_start := boolean(), + max_inflight := pos_integer(), + connect_timeout := pos_integer(), + retry_interval := timeout(), + bridge_mode := boolean(), + ssl := boolean(), + ssl_opts := proplists:proplist(), + % bridge options + subscriptions := map(), + forwards := map() +}. + +-type config() :: #{ + subscriptions := subscriptions() | undefined, + forwards := forwards() | undefined +}. + +-type subscriptions() :: #{ + remote := #{ + topic := emqx_topic:topic(), + qos => emqx_types:qos() + }, + local := #{ + topic => template(), + qos => template() | emqx_types:qos(), + retain => template() | boolean(), + payload => template() | undefined + }, + on_message_received := {module(), atom(), [term()]} +}. + +-type forwards() :: #{ + local => #{ + topic => emqx_topic:topic() + }, + remote := #{ + topic := template(), + qos => template() | emqx_types:qos(), + retain => template() | boolean(), + payload => template() | undefined + } +}. -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). --define(REF(Name), {via, gproc, ?NAME(Name)}). --define(NAME(Name), {n, l, Name}). - %% @doc Start a bridge worker. Supported configs: %% mountpoint: The topic mount point for messages sent to remote node/cluster %% `undefined', `<<>>' or `""' to disable @@ -107,20 +148,19 @@ %% %% Find more connection specific configs in the callback modules %% of emqx_bridge_connect behaviour. --spec start_link(name(), map()) -> - {ok, pid()} | {error, _Reason}. +-spec start_link(name(), options()) -> + {ok, pid(), {emqtt:properties(), config()}} | {error, _Reason}. start_link(Name, BridgeOpts) -> ?SLOG(debug, #{ msg => "client_starting", name => Name, options => BridgeOpts }), - Conf = init_config(Name, BridgeOpts), - Options = mk_client_options(Conf, BridgeOpts), + Config = init_config(Name, BridgeOpts), + Options = mk_client_options(Config, BridgeOpts), case emqtt:start_link(Options) of {ok, Pid} -> - true = gproc:reg_other(?NAME(Name), Pid, Conf), - {ok, Pid}; + connect(Pid, Name, Config); {error, Reason} = Error -> ?SLOG(error, #{ msg => "client_start_failed", @@ -130,22 +170,50 @@ start_link(Name, BridgeOpts) -> Error end. +connect(Pid, Name, Config = #{subscriptions := Subscriptions}) -> + case emqtt:connect(Pid) of + {ok, Props} -> + case subscribe_remote_topics(Pid, Subscriptions) of + ok -> + {ok, Pid, {Props, Config}}; + {ok, _, _RCs} -> + {ok, Pid, {Props, Config}}; + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "client_subscribe_failed", + subscriptions => Subscriptions, + reason => Reason + }), + _ = emqtt:stop(Pid), + Error + end; + {error, Reason} = Error -> + ?SLOG(warning, #{ + msg => "client_connect_failed", + reason => Reason, + name => Name + }), + _ = emqtt:stop(Pid), + Error + end. + +subscribe_remote_topics(Pid, #{remote := #{topic := RemoteTopic, qos := QoS}}) -> + emqtt:subscribe(Pid, RemoteTopic, QoS); +subscribe_remote_topics(_Ref, undefined) -> + ok. + init_config(Name, Opts) -> - Mountpoint = maps:get(forward_mountpoint, Opts, undefined), Subscriptions = maps:get(subscriptions, Opts, undefined), Forwards = maps:get(forwards, Opts, undefined), #{ - mountpoint => format_mountpoint(Mountpoint), subscriptions => pre_process_subscriptions(Subscriptions, Name, Opts), forwards => pre_process_forwards(Forwards) }. -mk_client_options(Conf, BridgeOpts) -> +mk_client_options(Config, BridgeOpts) -> Server = iolist_to_binary(maps:get(server, BridgeOpts)), HostPort = emqx_connector_mqtt_schema:parse_server(Server), - Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined), - Subscriptions = maps:get(subscriptions, Conf), - Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions), + Subscriptions = maps:get(subscriptions, Config), CleanStart = case Subscriptions of #{remote := _} -> @@ -156,24 +224,26 @@ mk_client_options(Conf, BridgeOpts) -> %% to ensure proper session recovery according to the MQTT spec. true end, - Opts = maps:without( + Opts = maps:with( [ - address, - auto_reconnect, - conn_type, - mountpoint, - forwards, - receive_mountpoint, - subscriptions + proto_ver, + username, + password, + clientid, + max_inflight, + connect_timeout, + retry_interval, + bridge_mode, + ssl, + ssl_opts ], BridgeOpts ), Opts#{ - msg_handler => mk_client_event_handler(Vars, #{server => Server}), + msg_handler => mk_client_event_handler(Subscriptions, #{server => Server}), hosts => [HostPort], clean_start => CleanStart, - force_ping => true, - proto_ver => maps:get(proto_ver, BridgeOpts, v4) + force_ping => true }. mk_client_event_handler(Vars, Opts) when Vars /= undefined -> @@ -184,45 +254,15 @@ mk_client_event_handler(Vars, Opts) when Vars /= undefined -> mk_client_event_handler(undefined, _Opts) -> undefined. -connect(Name) -> - #{subscriptions := Subscriptions} = get_config(Name), - case emqtt:connect(get_pid(Name)) of - {ok, Properties} -> - case subscribe_remote_topics(Name, Subscriptions) of - ok -> - {ok, Properties}; - {ok, _, _RCs} -> - {ok, Properties}; - {error, Reason} = Error -> - ?SLOG(error, #{ - msg => "client_subscribe_failed", - subscriptions => Subscriptions, - reason => Reason - }), - Error - end; - {error, Reason} = Error -> - ?SLOG(warning, #{ - msg => "client_connect_failed", - reason => Reason, - name => Name - }), - Error - end. -subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) -> - emqtt:subscribe(ref(Ref), FromTopic, QoS); -subscribe_remote_topics(_Ref, undefined) -> - ok. +stop(Pid) -> + emqtt:stop(Pid). -stop(Ref) -> - emqtt:stop(ref(Ref)). +info(Pid) -> + emqtt:info(Pid). -info(Ref) -> - emqtt:info(ref(Ref)). - -status(Ref) -> +status(Pid) -> try - case proplists:get_value(socket, info(Ref)) of + case proplists:get_value(socket, info(Pid)) of Socket when Socket /= undefined -> connected; undefined -> @@ -233,14 +273,14 @@ status(Ref) -> disconnected end. -ping(Ref) -> - emqtt:ping(ref(Ref)). +ping(Pid) -> + emqtt:ping(Pid). -send_to_remote(Name, MsgIn) -> - trycall(fun() -> do_send(Name, export_msg(Name, MsgIn)) end). +send_to_remote(Pid, MsgIn, Conf) -> + do_send(Pid, export_msg(MsgIn, Conf)). -do_send(Name, {true, Msg}) -> - case emqtt:publish(get_pid(Name), Msg) of +do_send(Pid, {true, Msg}) -> + case emqtt:publish(Pid, Msg) of ok -> ok; {ok, #{reason_code := RC}} when @@ -266,36 +306,15 @@ do_send(Name, {true, Msg}) -> do_send(_Name, false) -> ok. -send_to_remote_async(Name, MsgIn, Callback) -> - trycall(fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end). +send_to_remote_async(Pid, MsgIn, Callback, Conf) -> + do_send_async(Pid, export_msg(MsgIn, Conf), Callback). -do_send_async(Name, {true, Msg}, Callback) -> - Pid = get_pid(Name), +do_send_async(Pid, {true, Msg}, Callback) -> ok = emqtt:publish_async(Pid, Msg, _Timeout = infinity, Callback), {ok, Pid}; -do_send_async(_Name, false, _Callback) -> +do_send_async(_Pid, false, _Callback) -> ok. -ref(Pid) when is_pid(Pid) -> - Pid; -ref(Term) -> - ?REF(Term). - -trycall(Fun) -> - try - Fun() - catch - throw:noproc -> - {error, disconnected}; - exit:{noproc, _} -> - {error, disconnected} - end. - -format_mountpoint(undefined) -> - undefined; -format_mountpoint(Prefix) -> - binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). - pre_process_subscriptions(undefined, _, _) -> undefined; pre_process_subscriptions( @@ -356,38 +375,15 @@ downgrade_ingress_qos(2) -> downgrade_ingress_qos(QoS) -> QoS. -get_pid(Name) -> - case gproc:where(?NAME(Name)) of - Pid when is_pid(Pid) -> - Pid; - undefined -> - throw(noproc) - end. - -get_config(Name) -> - try - gproc:lookup_value(?NAME(Name)) - catch - error:badarg -> - throw(noproc) - end. - -export_msg(Name, Msg) -> - case get_config(Name) of - #{forwards := Forwards = #{}, mountpoint := Mountpoint} -> - {true, export_msg(Mountpoint, Forwards, Msg)}; - #{forwards := undefined} -> - ?SLOG(error, #{ - msg => "forwarding_unavailable", - message => Msg, - reason => "egress is not configured" - }), - false - end. - -export_msg(Mountpoint, Forwards, Msg) -> - Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), - emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars). +export_msg(Msg, #{forwards := Forwards = #{}}) -> + {true, emqx_connector_mqtt_msg:to_remote_msg(Msg, Forwards)}; +export_msg(Msg, #{forwards := undefined}) -> + ?SLOG(error, #{ + msg => "forwarding_unavailable", + message => Msg, + reason => "egress is not configured" + }), + false. %%