From 35c429ef1d43c02ac5315fc3e453db5abbc567e9 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 30 Jan 2023 14:49:56 +0300 Subject: [PATCH 1/5] refactor: drop a couple of unused macros / includes --- apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl | 6 ------ 1 file changed, 6 deletions(-) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index 870f9acfc..afe173985 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -38,12 +38,6 @@ ]). -include_lib("emqx/include/logger.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). - --define(ACK_REF(ClientPid, PktId), {ClientPid, PktId}). - -%% Messages towards ack collector process --define(REF_IDS(Ref, Ids), {Ref, Ids}). %%-------------------------------------------------------------------- %% emqx_bridge_connect callbacks From 4d146c521b76a284a672a5605f18dd89e5554572 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 30 Jan 2023 14:51:09 +0300 Subject: [PATCH 2/5] fix(mqtt-bridge): ensure proper feedback on async forwards So that buffer worker would notice a connection loss in time, and recycle inflight messages subsequently. --- .../src/emqx_connector_mqtt.erl | 11 +--------- .../src/mqtt/emqx_connector_mqtt_mod.erl | 5 +++++ .../src/mqtt/emqx_connector_mqtt_worker.erl | 21 +++++++++++-------- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 71ed81dda..585122539 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -198,10 +198,7 @@ on_query_async( #{name := InstanceId} ) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - %% this is a cast, currently. - ok = emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}), - WorkerPid = get_worker_pid(InstanceId), - {ok, WorkerPid}. + emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}). on_get_status(_InstId, #{name := InstanceId}) -> case emqx_connector_mqtt_worker:status(InstanceId) of @@ -215,12 +212,6 @@ ensure_mqtt_worker_started(InstanceId, BridgeConf) -> {error, Reason} -> {error, Reason} end. -%% mqtt workers, when created and called via bridge callbacks, are -%% registered. --spec get_worker_pid(atom()) -> pid(). -get_worker_pid(InstanceId) -> - whereis(InstanceId). - make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 -> undefined; make_sub_confs(undefined, _Conf, _) -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl index afe173985..6acbe3bb4 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl @@ -26,6 +26,8 @@ ping/1 ]). +-export([info/2]). + -export([ ensure_subscribed/3, ensure_unsubscribed/2 @@ -90,6 +92,9 @@ ping(undefined) -> ping(#{client_pid := Pid}) -> emqtt:ping(Pid). +info(pid, #{client_pid := Pid}) -> + Pid. + ensure_subscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic, QoS) when is_pid(Pid) -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 00b45789e..776d2d8d9 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -168,9 +168,9 @@ send_to_remote(Name, Msg) -> gen_statem:call(name(Name), {send_to_remote, Msg}). send_to_remote_async(Pid, Msg, Callback) when is_pid(Pid) -> - gen_statem:cast(Pid, {send_to_remote_async, Msg, Callback}); + gen_statem:call(Pid, {send_to_remote_async, Msg, Callback}); send_to_remote_async(Name, Msg, Callback) -> - gen_statem:cast(name(Name), {send_to_remote_async, Msg, Callback}). + gen_statem:call(name(Name), {send_to_remote_async, Msg, Callback}). %% @doc Return all forwards (local subscriptions). -spec get_forwards(id()) -> [topic()]. @@ -270,12 +270,14 @@ maybe_destroy_session(_State) -> idle({call, From}, ensure_started, State) -> case do_connect(State) of {ok, State1} -> - {next_state, connected, State1, [{reply, From, ok}, {state_timeout, 0, connected}]}; + {next_state, connected, State1, {reply, From, ok}}; {error, Reason, _State} -> {keep_state_and_data, {reply, From, {error, Reason}}} end; idle({call, From}, {send_to_remote, _}, _State) -> {keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}}; +idle({call, From}, {send_to_remote_async, _, _}, _State) -> + {keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}}; %% @doc Standing by for manual start. idle(info, idle, #{start_type := manual}) -> keep_state_and_data; @@ -290,14 +292,11 @@ idle(Type, Content, State) -> connecting(#{reconnect_interval := ReconnectDelayMs} = State) -> case do_connect(State) of {ok, State1} -> - {next_state, connected, State1, {state_timeout, 0, connected}}; + {next_state, connected, State1}; _ -> {keep_state_and_data, {state_timeout, ReconnectDelayMs, reconnect}} end. -connected(state_timeout, connected, State) -> - %% nothing to do - {keep_state, State}; connected({call, From}, {send_to_remote, Msg}, State) -> case do_send(State, Msg) of {ok, NState} -> @@ -305,9 +304,13 @@ connected({call, From}, {send_to_remote, Msg}, State) -> {error, Reason} -> {keep_state_and_data, {reply, From, {error, Reason}}} end; -connected(cast, {send_to_remote_async, Msg, Callback}, State) -> +connected( + {call, From}, + {send_to_remote_async, Msg, Callback}, + State = #{connection := Connection} +) -> _ = do_send_async(State, Msg, Callback), - {keep_state, State}; + {keep_state, State, {reply, From, {ok, emqx_connector_mqtt_mod:info(pid, Connection)}}}; connected( info, {disconnected, Conn, Reason}, From d0c10b59aa1527dfa537cef34559232f97ceaaa1 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Mon, 30 Jan 2023 18:35:17 +0300 Subject: [PATCH 3/5] feat(mqtt-bridge): avoid middleman process Instead, supervise `emqtt` client process directly. --- .../test/emqx_bridge_api_SUITE.erl | 2 +- .../test/emqx_bridge_mqtt_SUITE.erl | 8 +- .../src/emqx_connector_mqtt.erl | 78 ++- .../src/mqtt/emqx_connector_mqtt_mod.erl | 235 ------- .../src/mqtt/emqx_connector_mqtt_worker.erl | 627 ++++++++---------- .../test/emqx_connector_mqtt_tests.erl | 60 -- .../test/emqx_connector_mqtt_worker_tests.erl | 101 --- 7 files changed, 338 insertions(+), 773 deletions(-) delete mode 100644 apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl delete mode 100644 apps/emqx_connector/test/emqx_connector_mqtt_tests.erl delete mode 100644 apps/emqx_connector/test/emqx_connector_mqtt_worker_tests.erl diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 6b0b3a941..5cb78d3ba 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -640,7 +640,7 @@ t_bridges_probe(Config) -> ?assertMatch( #{ <<"code">> := <<"TEST_FAILED">>, - <<"message">> := <<"#{reason => econnrefused", _/binary>> + <<"message">> := <<"econnrefused">> }, jsx:decode(ConnRefused) ), diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl index cd5a17184..6e3bf77ee 100644 --- a/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_SUITE.erl @@ -825,15 +825,15 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> <<"type">> => ?TYPE_MQTT, <<"name">> => ?BRIDGE_NAME_EGRESS, <<"egress">> => ?EGRESS_CONF, - %% to make it reconnect quickly - <<"reconnect_interval">> => <<"1s">>, <<"resource_opts">> => #{ <<"worker_pool_size">> => 2, <<"query_mode">> => <<"sync">>, %% using a long time so we can test recovery <<"request_timeout">> => <<"15s">>, %% to make it check the healthy quickly - <<"health_check_interval">> => <<"0.5s">> + <<"health_check_interval">> => <<"0.5s">>, + %% to make it reconnect quickly + <<"auto_restart_interval">> => <<"1s">> } } ), @@ -911,7 +911,7 @@ t_mqtt_conn_bridge_egress_reconnect(_) -> Decoded1 = jsx:decode(BridgeStr1), DecodedMetrics1 = jsx:decode(BridgeMetricsStr1), ?assertMatch( - Status when (Status == <<"connected">> orelse Status == <<"connecting">>), + Status when (Status == <<"connecting">> orelse Status == <<"disconnected">>), maps:get(<<"status">>, Decoded1) ), %% matched >= 3 because of possible retries. diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 585122539..462bac0b8 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -105,16 +105,15 @@ init([]) -> {ok, {SupFlag, []}}. bridge_spec(Config) -> + {Name, NConfig} = maps:take(name, Config), #{ - id => maps:get(name, Config), - start => {emqx_connector_mqtt_worker, start_link, [Config]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_connector_mqtt_worker] + id => Name, + start => {emqx_connector_mqtt_worker, start_link, [Name, NConfig]}, + restart => temporary, + shutdown => 5000 }. --spec bridges() -> [{node(), map()}]. +-spec bridges() -> [{_Name, _Status}]. bridges() -> [ {Name, emqx_connector_mqtt_worker:status(Name)} @@ -144,8 +143,7 @@ on_message_received(Msg, HookPoint, ResId) -> %% =================================================================== callback_mode() -> async_if_possible. -on_start(InstId, Conf) -> - InstanceId = binary_to_atom(InstId, utf8), +on_start(InstanceId, Conf) -> ?SLOG(info, #{ msg => "starting_mqtt_connector", connector => InstanceId, @@ -154,8 +152,8 @@ on_start(InstId, Conf) -> BasicConf = basic_config(Conf), BridgeConf = BasicConf#{ name => InstanceId, - clientid => clientid(InstId, Conf), - subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstId), + clientid => clientid(InstanceId, Conf), + subscriptions => make_sub_confs(maps:get(ingress, Conf, undefined), Conf, InstanceId), forwards => make_forward_confs(maps:get(egress, Conf, undefined)) }, case ?MODULE:create_bridge(BridgeConf) of @@ -189,35 +187,49 @@ on_stop(_InstId, #{name := InstanceId}) -> on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> ?TRACE("QUERY", "send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg). - -on_query_async( - _InstId, - {send_message, Msg}, - {ReplyFun, Args}, - #{name := InstanceId} -) -> - ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}). - -on_get_status(_InstId, #{name := InstanceId}) -> - case emqx_connector_mqtt_worker:status(InstanceId) of - connected -> connected; - _ -> connecting + case emqx_connector_mqtt_worker:send_to_remote(InstanceId, Msg) of + ok -> + ok; + {error, Reason} -> + classify_error(Reason) end. +on_query_async(_InstId, {send_message, Msg}, Callback, #{name := InstanceId}) -> + ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), + case emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, Callback) of + ok -> + % TODO this is racy + {ok, emqx_connector_mqtt_worker:pid(InstanceId)}; + {error, Reason} -> + classify_error(Reason) + end. + +on_get_status(_InstId, #{name := InstanceId}) -> + emqx_connector_mqtt_worker:status(InstanceId). + +classify_error(disconnected = Reason) -> + {error, {recoverable_error, Reason}}; +classify_error({disconnected, _RC, _} = Reason) -> + {error, {recoverable_error, Reason}}; +classify_error({shutdown, _} = Reason) -> + {error, {recoverable_error, Reason}}; +classify_error(Reason) -> + {error, {unrecoverable_error, Reason}}. + ensure_mqtt_worker_started(InstanceId, BridgeConf) -> - case emqx_connector_mqtt_worker:ensure_started(InstanceId) of - ok -> {ok, #{name => InstanceId, bridge_conf => BridgeConf}}; - {error, Reason} -> {error, Reason} + case emqx_connector_mqtt_worker:connect(InstanceId) of + {ok, Properties} -> + {ok, #{name => InstanceId, config => BridgeConf, props => Properties}}; + {error, Reason} -> + {error, Reason} end. make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 -> undefined; make_sub_confs(undefined, _Conf, _) -> undefined; -make_sub_confs(SubRemoteConf, Conf, InstId) -> - ResId = emqx_resource_manager:manager_id_to_resource_id(InstId), +make_sub_confs(SubRemoteConf, Conf, InstanceId) -> + ResId = emqx_resource_manager:manager_id_to_resource_id(InstanceId), case maps:find(hookpoint, Conf) of error -> error({no_hookpoint_provided, Conf}); @@ -251,7 +263,6 @@ basic_config( %% 30s connect_timeout => 30, auto_reconnect => true, - reconnect_interval => ?AUTO_RECONNECT_INTERVAL, proto_ver => ProtoVer, %% Opening bridge_mode will form a non-standard mqtt connection message. %% A load balancing server (such as haproxy) is often set up before the emqx broker server. @@ -264,8 +275,7 @@ basic_config( retry_interval => RetryIntv, max_inflight => MaxInflight, ssl => EnableSsl, - ssl_opts => maps:to_list(maps:remove(enable, Ssl)), - if_record_metrics => true + ssl_opts => maps:to_list(maps:remove(enable, Ssl)) }, maybe_put_fields([username, password], Conf, BasicConf). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl deleted file mode 100644 index 6acbe3bb4..000000000 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_mod.erl +++ /dev/null @@ -1,235 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc This module implements EMQX Bridge transport layer on top of MQTT protocol - --module(emqx_connector_mqtt_mod). - --export([ - start/1, - send/2, - send_async/3, - stop/1, - ping/1 -]). - --export([info/2]). - --export([ - ensure_subscribed/3, - ensure_unsubscribed/2 -]). - -%% callbacks for emqtt --export([ - handle_publish/3, - handle_disconnected/2 -]). - --include_lib("emqx/include/logger.hrl"). - -%%-------------------------------------------------------------------- -%% emqx_bridge_connect callbacks -%%-------------------------------------------------------------------- - -start(Config) -> - Parent = self(), - ServerStr = iolist_to_binary(maps:get(server, Config)), - {Server, Port} = emqx_connector_mqtt_schema:parse_server(ServerStr), - Mountpoint = maps:get(receive_mountpoint, Config, undefined), - Subscriptions = maps:get(subscriptions, Config, undefined), - Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions), - Handlers = make_hdlr(Parent, Vars, #{server => ServerStr}), - Config1 = Config#{ - msg_handler => Handlers, - host => Server, - port => Port, - force_ping => true, - proto_ver => maps:get(proto_ver, Config, v4) - }, - case emqtt:start_link(process_config(Config1)) of - {ok, Pid} -> - case emqtt:connect(Pid) of - {ok, _} -> - try - ok = sub_remote_topics(Pid, Subscriptions), - {ok, #{client_pid => Pid, subscriptions => Subscriptions}} - catch - throw:Reason -> - ok = stop(#{client_pid => Pid}), - {error, error_reason(Reason, ServerStr)} - end; - {error, Reason} -> - ok = stop(#{client_pid => Pid}), - {error, error_reason(Reason, ServerStr)} - end; - {error, Reason} -> - {error, error_reason(Reason, ServerStr)} - end. - -error_reason(Reason, ServerStr) -> - #{reason => Reason, server => ServerStr}. - -stop(#{client_pid := Pid}) -> - safe_stop(Pid, fun() -> emqtt:stop(Pid) end, 1000), - ok. - -ping(undefined) -> - pang; -ping(#{client_pid := Pid}) -> - emqtt:ping(Pid). - -info(pid, #{client_pid := Pid}) -> - Pid. - -ensure_subscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic, QoS) when - is_pid(Pid) --> - case emqtt:subscribe(Pid, Topic, QoS) of - {ok, _, _} -> Conn#{subscriptions => [{Topic, QoS} | Subs]}; - Error -> {error, Error} - end; -ensure_subscribed(_Conn, _Topic, _QoS) -> - %% return ok for now - %% next re-connect should should call start with new topic added to config - ok. - -ensure_unsubscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic) when is_pid(Pid) -> - case emqtt:unsubscribe(Pid, Topic) of - {ok, _, _} -> Conn#{subscriptions => lists:keydelete(Topic, 1, Subs)}; - Error -> {error, Error} - end; -ensure_unsubscribed(Conn, _) -> - %% return ok for now - %% next re-connect should should call start with this topic deleted from config - Conn. - -safe_stop(Pid, StopF, Timeout) -> - MRef = monitor(process, Pid), - unlink(Pid), - try - StopF() - catch - _:_ -> - ok - end, - receive - {'DOWN', MRef, _, _, _} -> - ok - after Timeout -> - exit(Pid, kill) - end. - -send(#{client_pid := ClientPid}, Msg) -> - emqtt:publish(ClientPid, Msg). - -send_async(#{client_pid := ClientPid}, Msg, Callback) -> - emqtt:publish_async(ClientPid, Msg, infinity, Callback). - -handle_publish(Msg, undefined, _Opts) -> - ?SLOG(error, #{ - msg => - "cannot_publish_to_local_broker_as" - "_'ingress'_is_not_configured", - message => Msg - }); -handle_publish(#{properties := Props} = Msg0, Vars, Opts) -> - Msg = format_msg_received(Msg0, Opts), - ?SLOG(debug, #{ - msg => "publish_to_local_broker", - message => Msg, - vars => Vars - }), - case Vars of - #{on_message_received := {Mod, Func, Args}} -> - _ = erlang:apply(Mod, Func, [Msg | Args]); - _ -> - ok - end, - maybe_publish_to_local_broker(Msg, Vars, Props). - -handle_disconnected(Reason, Parent) -> - Parent ! {disconnected, self(), Reason}. - -make_hdlr(Parent, Vars, Opts) -> - #{ - publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]}, - disconnected => {fun ?MODULE:handle_disconnected/2, [Parent]} - }. - -sub_remote_topics(_ClientPid, undefined) -> - ok; -sub_remote_topics(ClientPid, #{remote := #{topic := FromTopic, qos := QoS}}) -> - case emqtt:subscribe(ClientPid, FromTopic, QoS) of - {ok, _, _} -> ok; - Error -> throw(Error) - end. - -process_config(Config) -> - maps:without([conn_type, address, receive_mountpoint, subscriptions, name], Config). - -maybe_publish_to_local_broker(Msg, Vars, Props) -> - case emqx_map_lib:deep_get([local, topic], Vars, undefined) of - %% local topic is not set, discard it - undefined -> ok; - _ -> emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)) - end. - -format_msg_received( - #{ - dup := Dup, - payload := Payload, - properties := Props, - qos := QoS, - retain := Retain, - topic := Topic - }, - #{server := Server} -) -> - #{ - id => emqx_guid:to_hexstr(emqx_guid:gen()), - server => Server, - payload => Payload, - topic => Topic, - qos => QoS, - dup => Dup, - retain => Retain, - pub_props => printable_maps(Props), - message_received_at => erlang:system_time(millisecond) - }. - -printable_maps(undefined) -> - #{}; -printable_maps(Headers) -> - maps:fold( - fun - ('User-Property', V0, AccIn) when is_list(V0) -> - AccIn#{ - 'User-Property' => maps:from_list(V0), - 'User-Property-Pairs' => [ - #{ - key => Key, - value => Value - } - || {Key, Value} <- V0 - ] - }; - (K, V0, AccIn) -> - AccIn#{K => V0} - end, - #{}, - Headers - ). diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 776d2d8d9..85261a063 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -60,172 +60,252 @@ %% * Local messages are all normalised to QoS-1 when exporting to remote -module(emqx_connector_mqtt_worker). --behaviour(gen_statem). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/logger.hrl"). %% APIs -export([ - start_link/1, - stop/1 -]). - -%% gen_statem callbacks --export([ - terminate/3, - code_change/4, - init/1, - callback_mode/0 -]). - -%% state functions --export([ - idle/3, - connected/3 + start_link/2, + stop/1, + pid/1 ]). %% management APIs -export([ - ensure_started/1, - ensure_stopped/1, + connect/1, status/1, ping/1, send_to_remote/2, send_to_remote_async/3 ]). --export([get_forwards/1]). - --export([get_subscriptions/1]). +-export([handle_publish/3]). +-export([handle_disconnect/1]). -export_type([ config/0, ack_ref/0 ]). --type id() :: atom() | string() | pid(). --type qos() :: emqx_types:qos(). +-type name() :: term(). +% -type qos() :: emqx_types:qos(). -type config() :: map(). -type ack_ref() :: term(). --type topic() :: emqx_types:topic(). +% -type topic() :: emqx_types:topic(). -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -%% same as default in-flight limit for emqtt --define(DEFAULT_INFLIGHT_SIZE, 32). --define(DEFAULT_RECONNECT_DELAY_MS, timer:seconds(5)). +-define(REF(Name), {via, gproc, ?NAME(Name)}). +-define(NAME(Name), {n, l, Name}). %% @doc Start a bridge worker. Supported configs: -%% start_type: 'manual' (default) or 'auto', when manual, bridge will stay -%% at 'idle' state until a manual call to start it. -%% connect_module: The module which implements emqx_bridge_connect behaviour -%% and work as message batch transport layer -%% reconnect_interval: Delay in milli-seconds for the bridge worker to retry -%% in case of transportation failure. -%% max_inflight: Max number of batches allowed to send-ahead before receiving -%% confirmation from remote node/cluster %% mountpoint: The topic mount point for messages sent to remote node/cluster %% `undefined', `<<>>' or `""' to disable %% forwards: Local topics to subscribe. %% %% Find more connection specific configs in the callback modules %% of emqx_bridge_connect behaviour. -start_link(Opts) when is_list(Opts) -> - start_link(maps:from_list(Opts)); -start_link(Opts) -> - case maps:get(name, Opts, undefined) of - undefined -> - gen_statem:start_link(?MODULE, Opts, []); - Name -> - Name1 = name(Name), - gen_statem:start_link({local, Name1}, ?MODULE, Opts#{name => Name1}, []) +-spec start_link(name(), map()) -> + {ok, pid()} | {error, _Reason}. +start_link(Name, BridgeOpts) -> + ?SLOG(debug, #{ + msg => "client_starting", + name => Name, + options => BridgeOpts + }), + Conf = init_config(BridgeOpts), + Options = mk_client_options(Conf, BridgeOpts), + case emqtt:start_link(Options) of + {ok, Pid} -> + true = gproc:reg_other(?NAME(Name), Pid, Conf), + {ok, Pid}; + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "client_start_failed", + config => emqx_misc:redact(BridgeOpts), + reason => Reason + }), + Error end. -ensure_started(Name) -> - gen_statem:call(name(Name), ensure_started). - -%% @doc Manually stop bridge worker. State idempotency ensured. -ensure_stopped(Name) -> - gen_statem:call(name(Name), ensure_stopped, 5000). - -stop(Pid) -> gen_statem:stop(Pid). - -status(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, status); -status(Name) -> - gen_statem:call(name(Name), status). - -ping(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, ping); -ping(Name) -> - gen_statem:call(name(Name), ping). - -send_to_remote(Pid, Msg) when is_pid(Pid) -> - gen_statem:call(Pid, {send_to_remote, Msg}); -send_to_remote(Name, Msg) -> - gen_statem:call(name(Name), {send_to_remote, Msg}). - -send_to_remote_async(Pid, Msg, Callback) when is_pid(Pid) -> - gen_statem:call(Pid, {send_to_remote_async, Msg, Callback}); -send_to_remote_async(Name, Msg, Callback) -> - gen_statem:call(name(Name), {send_to_remote_async, Msg, Callback}). - -%% @doc Return all forwards (local subscriptions). --spec get_forwards(id()) -> [topic()]. -get_forwards(Name) -> gen_statem:call(name(Name), get_forwards, timer:seconds(1000)). - -%% @doc Return all subscriptions (subscription over mqtt connection to remote broker). --spec get_subscriptions(id()) -> [{emqx_types:topic(), qos()}]. -get_subscriptions(Name) -> gen_statem:call(name(Name), get_subscriptions). - -callback_mode() -> [state_functions]. - -%% @doc Config should be a map(). -init(#{name := Name} = ConnectOpts) -> - ?SLOG(debug, #{ - msg => "starting_bridge_worker", - name => Name - }), - erlang:process_flag(trap_exit, true), - State = init_state(ConnectOpts), - self() ! idle, - {ok, idle, State#{ - connect_opts => pre_process_opts(ConnectOpts) - }}. - -init_state(Opts) -> - ReconnDelayMs = maps:get(reconnect_interval, Opts, ?DEFAULT_RECONNECT_DELAY_MS), - StartType = maps:get(start_type, Opts, manual), +init_config(Opts) -> Mountpoint = maps:get(forward_mountpoint, Opts, undefined), - MaxInflightSize = maps:get(max_inflight, Opts, ?DEFAULT_INFLIGHT_SIZE), - Name = maps:get(name, Opts, undefined), + Subscriptions = maps:get(subscriptions, Opts, undefined), + Forwards = maps:get(forwards, Opts, undefined), #{ - start_type => StartType, - reconnect_interval => ReconnDelayMs, mountpoint => format_mountpoint(Mountpoint), - max_inflight => MaxInflightSize, - connection => undefined, - name => Name + subscriptions => pre_process_subscriptions(Subscriptions), + forwards => pre_process_forwards(Forwards) }. -pre_process_opts(#{subscriptions := InConf, forwards := OutConf} = ConnectOpts) -> - ConnectOpts#{ - subscriptions => pre_process_in_out(in, InConf), - forwards => pre_process_in_out(out, OutConf) +mk_client_options(Conf, BridgeOpts) -> + Server = iolist_to_binary(maps:get(server, BridgeOpts)), + HostPort = emqx_connector_mqtt_schema:parse_server(Server), + Mountpoint = maps:get(receive_mountpoint, BridgeOpts, undefined), + Subscriptions = maps:get(subscriptions, Conf), + Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Subscriptions), + Opts = maps:without( + [ + address, + auto_reconnect, + conn_type, + mountpoint, + forwards, + receive_mountpoint, + subscriptions + ], + BridgeOpts + ), + Opts#{ + msg_handler => mk_client_event_handler(Vars, #{server => Server}), + hosts => [HostPort], + force_ping => true, + proto_ver => maps:get(proto_ver, BridgeOpts, v4) }. -pre_process_in_out(_, undefined) -> +mk_client_event_handler(Vars, Opts) when Vars /= undefined -> + #{ + publish => {fun ?MODULE:handle_publish/3, [Vars, Opts]}, + disconnected => {fun ?MODULE:handle_disconnect/1, []} + }; +mk_client_event_handler(undefined, _Opts) -> + undefined. + +connect(Name) -> + #{subscriptions := Subscriptions} = get_config(Name), + case emqtt:connect(pid(Name)) of + {ok, Properties} -> + case subscribe_remote_topics(Name, Subscriptions) of + ok -> + {ok, Properties}; + {ok, _, _RCs} -> + {ok, Properties}; + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "client_subscribe_failed", + subscriptions => Subscriptions, + reason => Reason + }), + Error + end; + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "client_connect_failed", + reason => Reason + }), + Error + end. + +subscribe_remote_topics(Ref, #{remote := #{topic := FromTopic, qos := QoS}}) -> + emqtt:subscribe(ref(Ref), FromTopic, QoS); +subscribe_remote_topics(_Ref, undefined) -> + ok. + +stop(Ref) -> + emqtt:stop(ref(Ref)). + +pid(Name) -> + gproc:lookup_pid(?NAME(Name)). + +status(Ref) -> + trycall( + fun() -> + Info = emqtt:info(ref(Ref)), + case proplists:get_value(socket, Info) of + Socket when Socket /= undefined -> + connected; + undefined -> + connecting + end + end, + #{noproc => disconnected} + ). + +ping(Ref) -> + emqtt:ping(ref(Ref)). + +send_to_remote(Name, MsgIn) -> + trycall( + fun() -> do_send(Name, export_msg(Name, MsgIn)) end, + #{ + badarg => {error, disconnected}, + noproc => {error, disconnected} + } + ). + +do_send(Name, {true, Msg}) -> + case emqtt:publish(pid(Name), Msg) of + ok -> + ok; + {ok, #{reason_code := RC}} when + RC =:= ?RC_SUCCESS; + RC =:= ?RC_NO_MATCHING_SUBSCRIBERS + -> + ok; + {ok, #{reason_code := RC, reason_code_name := Reason}} -> + ?SLOG(warning, #{ + msg => "remote_publish_failed", + message => Msg, + reason_code => RC, + reason_code_name => Reason + }), + {error, Reason}; + {error, Reason} -> + ?SLOG(info, #{ + msg => "client_failed", + reason => Reason + }), + {error, Reason} + end; +do_send(_Name, false) -> + ok. + +send_to_remote_async(Name, MsgIn, Callback) -> + trycall( + fun() -> do_send_async(Name, export_msg(Name, MsgIn), Callback) end, + #{badarg => {error, disconnected}} + ). + +do_send_async(Name, {true, Msg}, Callback) -> + emqtt:publish_async(pid(Name), Msg, _Timeout = infinity, Callback); +do_send_async(_Name, false, _Callback) -> + ok. + +ref(Pid) when is_pid(Pid) -> + Pid; +ref(Term) -> + ?REF(Term). + +trycall(Fun, Else) -> + try + Fun() + catch + error:badarg -> + maps:get(badarg, Else); + exit:{noproc, _} -> + maps:get(noproc, Else) + end. + +format_mountpoint(undefined) -> undefined; -pre_process_in_out(in, #{local := LC} = Conf) when is_map(Conf) -> +format_mountpoint(Prefix) -> + binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). + +pre_process_subscriptions(undefined) -> + undefined; +pre_process_subscriptions(#{local := LC} = Conf) when is_map(Conf) -> Conf#{local => pre_process_in_out_common(LC)}; -pre_process_in_out(in, Conf) when is_map(Conf) -> +pre_process_subscriptions(Conf) when is_map(Conf) -> %% have no 'local' field in the config + undefined. + +pre_process_forwards(undefined) -> undefined; -pre_process_in_out(out, #{remote := RC} = Conf) when is_map(Conf) -> +pre_process_forwards(#{remote := RC} = Conf) when is_map(Conf) -> Conf#{remote => pre_process_in_out_common(RC)}; -pre_process_in_out(out, Conf) when is_map(Conf) -> +pre_process_forwards(Conf) when is_map(Conf) -> %% have no 'remote' field in the config undefined. @@ -245,241 +325,112 @@ pre_process_conf(Key, Conf) -> Conf#{Key => Val} end. -code_change(_Vsn, State, Data, _Extra) -> - {ok, State, Data}. +get_config(Name) -> + gproc:lookup_value(?NAME(Name)). -terminate(_Reason, _StateName, State) -> - _ = disconnect(State), - maybe_destroy_session(State). +export_msg(Name, Msg) -> + case get_config(Name) of + #{forwards := Forwards = #{}, mountpoint := Mountpoint} -> + {true, export_msg(Mountpoint, Forwards, Msg)}; + #{forwards := undefined} -> + ?SLOG(error, #{ + msg => "forwarding_unavailable", + message => Msg, + reason => "egress is not configured" + }), + false + end. -maybe_destroy_session(#{connect_opts := ConnectOpts = #{clean_start := false}} = State) -> - try - %% Destroy session if clean_start is not set. - %% Ignore any crashes, just refresh the clean_start = true. - _ = do_connect(State#{connect_opts => ConnectOpts#{clean_start => true}}), - _ = disconnect(State), - ok - catch - _:_ -> +export_msg(Mountpoint, Forwards, Msg) -> + Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), + emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars). + +%% + +handle_publish(#{properties := Props} = MsgIn, Vars, Opts) -> + Msg = import_msg(MsgIn, Opts), + ?SLOG(debug, #{ + msg => "publish_local", + message => Msg, + vars => Vars + }), + case Vars of + #{on_message_received := {Mod, Func, Args}} -> + _ = erlang:apply(Mod, Func, [Msg | Args]); + _ -> ok - end; -maybe_destroy_session(_State) -> + end, + maybe_publish_local(Msg, Vars, Props). + +handle_disconnect(_Reason) -> ok. -%% ensure_started will be deprecated in the future -idle({call, From}, ensure_started, State) -> - case do_connect(State) of - {ok, State1} -> - {next_state, connected, State1, {reply, From, ok}}; - {error, Reason, _State} -> - {keep_state_and_data, {reply, From, {error, Reason}}} - end; -idle({call, From}, {send_to_remote, _}, _State) -> - {keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}}; -idle({call, From}, {send_to_remote_async, _, _}, _State) -> - {keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}}; -%% @doc Standing by for manual start. -idle(info, idle, #{start_type := manual}) -> - keep_state_and_data; -%% @doc Standing by for auto start. -idle(info, idle, #{start_type := auto} = State) -> - connecting(State); -idle(state_timeout, reconnect, State) -> - connecting(State); -idle(Type, Content, State) -> - common(idle, Type, Content, State). - -connecting(#{reconnect_interval := ReconnectDelayMs} = State) -> - case do_connect(State) of - {ok, State1} -> - {next_state, connected, State1}; +maybe_publish_local(Msg, Vars, Props) -> + case emqx_map_lib:deep_get([local, topic], Vars, undefined) of + %% local topic is not set, discard it + undefined -> + ok; _ -> - {keep_state_and_data, {state_timeout, ReconnectDelayMs, reconnect}} + emqx_broker:publish(emqx_connector_mqtt_msg:to_broker_msg(Msg, Vars, Props)) end. -connected({call, From}, {send_to_remote, Msg}, State) -> - case do_send(State, Msg) of - {ok, NState} -> - {keep_state, NState, {reply, From, ok}}; - {error, Reason} -> - {keep_state_and_data, {reply, From, {error, Reason}}} - end; -connected( - {call, From}, - {send_to_remote_async, Msg, Callback}, - State = #{connection := Connection} -) -> - _ = do_send_async(State, Msg, Callback), - {keep_state, State, {reply, From, {ok, emqx_connector_mqtt_mod:info(pid, Connection)}}}; -connected( - info, - {disconnected, Conn, Reason}, - #{connection := Connection, name := Name, reconnect_interval := ReconnectDelayMs} = State -) -> - ?tp(info, disconnected, #{name => Name, reason => Reason}), - case Conn =:= maps:get(client_pid, Connection, undefined) of - true -> - {next_state, idle, State#{connection => undefined}, - {state_timeout, ReconnectDelayMs, reconnect}}; - false -> - keep_state_and_data - end; -connected(Type, Content, State) -> - common(connected, Type, Content, State). - -%% Common handlers -common(StateName, {call, From}, status, _State) -> - {keep_state_and_data, {reply, From, StateName}}; -common(_StateName, {call, From}, ping, #{connection := Conn} = _State) -> - Reply = emqx_connector_mqtt_mod:ping(Conn), - {keep_state_and_data, {reply, From, Reply}}; -common(_StateName, {call, From}, ensure_stopped, #{connection := undefined} = _State) -> - {keep_state_and_data, {reply, From, ok}}; -common(_StateName, {call, From}, ensure_stopped, #{connection := Conn} = State) -> - Reply = emqx_connector_mqtt_mod:stop(Conn), - {next_state, idle, State#{connection => undefined}, {reply, From, Reply}}; -common(_StateName, {call, From}, get_forwards, #{connect_opts := #{forwards := Forwards}}) -> - {keep_state_and_data, {reply, From, Forwards}}; -common(_StateName, {call, From}, get_subscriptions, #{connection := Connection}) -> - {keep_state_and_data, {reply, From, maps:get(subscriptions, Connection, #{})}}; -common(_StateName, {call, From}, Req, _State) -> - {keep_state_and_data, {reply, From, {error, {unsupported_request, Req}}}}; -common(_StateName, info, {'EXIT', _, _}, State) -> - {keep_state, State}; -common(StateName, Type, Content, #{name := Name} = State) -> - ?SLOG(error, #{ - msg => "bridge_discarded_event", - name => Name, - type => Type, - state_name => StateName, - content => Content - }), - {keep_state, State}. - -do_connect( +import_msg( #{ - connect_opts := ConnectOpts, - name := Name - } = State -) -> - case emqx_connector_mqtt_mod:start(ConnectOpts) of - {ok, Conn} -> - ?tp(info, connected, #{name => Name}), - {ok, State#{connection => Conn}}; - {error, Reason} -> - ConnectOpts1 = obfuscate(ConnectOpts), - ?SLOG(error, #{ - msg => "failed_to_connect", - config => ConnectOpts1, - reason => Reason - }), - {error, Reason, State} - end. - -do_send(#{connect_opts := #{forwards := undefined}}, Msg) -> - ?SLOG(error, #{ - msg => - "cannot_forward_messages_to_remote_broker" - "_as_'egress'_is_not_configured", - messages => Msg - }); -do_send( - #{ - connection := Connection, - mountpoint := Mountpoint, - connect_opts := #{forwards := Forwards} - } = State, - Msg -) -> - Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), - ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars), - ?SLOG(debug, #{ - msg => "publish_to_remote_broker", - message => Msg, - vars => Vars - }), - case emqx_connector_mqtt_mod:send(Connection, ExportMsg) of - ok -> - {ok, State}; - {ok, #{reason_code := RC}} when - RC =:= ?RC_SUCCESS; - RC =:= ?RC_NO_MATCHING_SUBSCRIBERS - -> - {ok, State}; - {ok, #{reason_code := RC, reason_code_name := RCN}} -> - ?SLOG(warning, #{ - msg => "publish_to_remote_node_falied", - message => Msg, - reason_code => RC, - reason_code_name => RCN - }), - {error, RCN}; - {error, Reason} -> - ?SLOG(info, #{ - msg => "mqtt_bridge_produce_failed", - reason => Reason - }), - {error, Reason} - end. - -do_send_async(#{connect_opts := #{forwards := undefined}}, Msg, _Callback) -> - %% TODO: eval callback with undefined error - ?SLOG(error, #{ - msg => - "cannot_forward_messages_to_remote_broker" - "_as_'egress'_is_not_configured", - messages => Msg - }); -do_send_async( - #{ - connection := Connection, - mountpoint := Mountpoint, - connect_opts := #{forwards := Forwards} + dup := Dup, + payload := Payload, + properties := Props, + qos := QoS, + retain := Retain, + topic := Topic }, - Msg, - Callback + #{server := Server} ) -> - Vars = emqx_connector_mqtt_msg:make_pub_vars(Mountpoint, Forwards), - ExportMsg = emqx_connector_mqtt_msg:to_remote_msg(Msg, Vars), - ?SLOG(debug, #{ - msg => "publish_to_remote_broker", - message => Msg, - vars => Vars - }), - emqx_connector_mqtt_mod:send_async(Connection, ExportMsg, Callback). + #{ + id => emqx_guid:to_hexstr(emqx_guid:gen()), + server => Server, + payload => Payload, + topic => Topic, + qos => QoS, + dup => Dup, + retain => Retain, + pub_props => printable_maps(Props), + message_received_at => erlang:system_time(millisecond) + }. -disconnect(#{connection := Conn} = State) when Conn =/= undefined -> - emqx_connector_mqtt_mod:stop(Conn), - State#{connection => undefined}; -disconnect(State) -> - State. - -format_mountpoint(undefined) -> - undefined; -format_mountpoint(Prefix) -> - binary:replace(iolist_to_binary(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). - -name(Id) -> list_to_atom(str(Id)). - -obfuscate(Map) -> +printable_maps(undefined) -> + #{}; +printable_maps(Headers) -> maps:fold( - fun(K, V, Acc) -> - case is_sensitive(K) of - true -> [{K, '***'} | Acc]; - false -> [{K, V} | Acc] - end + fun + ('User-Property', V0, AccIn) when is_list(V0) -> + AccIn#{ + 'User-Property' => maps:from_list(V0), + 'User-Property-Pairs' => [ + #{ + key => Key, + value => Value + } + || {Key, Value} <- V0 + ] + }; + (K, V0, AccIn) -> + AccIn#{K => V0} end, - [], - Map + #{}, + Headers ). -is_sensitive(password) -> true; -is_sensitive(ssl_opts) -> true; -is_sensitive(_) -> false. - -str(A) when is_atom(A) -> - atom_to_list(A); -str(B) when is_binary(B) -> - binary_to_list(B); -str(S) when is_list(S) -> - S. +%% TODO +% maybe_destroy_session(#{connect_opts := ConnectOpts = #{clean_start := false}} = State) -> +% try +% %% Destroy session if clean_start is not set. +% %% Ignore any crashes, just refresh the clean_start = true. +% _ = do_connect(State#{connect_opts => ConnectOpts#{clean_start => true}}), +% _ = disconnect(State), +% ok +% catch +% _:_ -> +% ok +% end; +% maybe_destroy_session(_State) -> +% ok. diff --git a/apps/emqx_connector/test/emqx_connector_mqtt_tests.erl b/apps/emqx_connector/test/emqx_connector_mqtt_tests.erl deleted file mode 100644 index 88c8b5218..000000000 --- a/apps/emqx_connector/test/emqx_connector_mqtt_tests.erl +++ /dev/null @@ -1,60 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_connector_mqtt_tests). - --include_lib("eunit/include/eunit.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). - -send_and_ack_test() -> - %% delegate from gen_rpc to rpc for unit test - meck:new(emqtt, [passthrough, no_history]), - meck:expect( - emqtt, - start_link, - 1, - fun(_) -> - {ok, spawn_link(fun() -> ok end)} - end - ), - meck:expect(emqtt, connect, 1, {ok, dummy}), - meck:expect( - emqtt, - stop, - 1, - fun(Pid) -> Pid ! stop end - ), - meck:expect( - emqtt, - publish, - 2, - fun(Client, Msg) -> - Client ! {publish, Msg}, - %% as packet id - {ok, Msg} - end - ), - try - Max = 1, - Batch = lists:seq(1, Max), - {ok, Conn} = emqx_connector_mqtt_mod:start(#{server => "127.0.0.1:1883"}), - %% return last packet id as batch reference - {ok, _AckRef} = emqx_connector_mqtt_mod:send(Conn, Batch), - - ok = emqx_connector_mqtt_mod:stop(Conn) - after - meck:unload(emqtt) - end. diff --git a/apps/emqx_connector/test/emqx_connector_mqtt_worker_tests.erl b/apps/emqx_connector/test/emqx_connector_mqtt_worker_tests.erl deleted file mode 100644 index 49bff7bbc..000000000 --- a/apps/emqx_connector/test/emqx_connector_mqtt_worker_tests.erl +++ /dev/null @@ -1,101 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_connector_mqtt_worker_tests). - --include_lib("eunit/include/eunit.hrl"). --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/emqx_mqtt.hrl"). - --define(BRIDGE_NAME, test). --define(BRIDGE_REG_NAME, emqx_connector_mqtt_worker_test). --define(WAIT(PATTERN, TIMEOUT), - receive - PATTERN -> - ok - after TIMEOUT -> - error(timeout) - end -). - --export([start/1, send/2, stop/1]). - -start(#{connect_result := Result, test_pid := Pid, test_ref := Ref}) -> - case is_pid(Pid) of - true -> Pid ! {connection_start_attempt, Ref}; - false -> ok - end, - Result. - -send(SendFun, Batch) when is_function(SendFun, 2) -> - SendFun(Batch). - -stop(_Pid) -> ok. - -%% connect first, disconnect, then connect again -disturbance_test() -> - meck:new(emqx_connector_mqtt_mod, [passthrough, no_history]), - meck:expect(emqx_connector_mqtt_mod, start, 1, fun(Conf) -> start(Conf) end), - meck:expect(emqx_connector_mqtt_mod, send, 2, fun(SendFun, Batch) -> send(SendFun, Batch) end), - meck:expect(emqx_connector_mqtt_mod, stop, 1, fun(Pid) -> stop(Pid) end), - try - emqx_metrics:start_link(), - Ref = make_ref(), - TestPid = self(), - Config = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}), - {ok, Pid} = emqx_connector_mqtt_worker:start_link(Config#{name => bridge_disturbance}), - ?assertEqual(Pid, whereis(bridge_disturbance)), - ?WAIT({connection_start_attempt, Ref}, 1000), - Pid ! {disconnected, TestPid, test}, - ?WAIT({connection_start_attempt, Ref}, 1000), - emqx_metrics:stop(), - ok = emqx_connector_mqtt_worker:stop(Pid) - after - meck:unload(emqx_connector_mqtt_mod) - end. - -manual_start_stop_test() -> - meck:new(emqx_connector_mqtt_mod, [passthrough, no_history]), - meck:expect(emqx_connector_mqtt_mod, start, 1, fun(Conf) -> start(Conf) end), - meck:expect(emqx_connector_mqtt_mod, send, 2, fun(SendFun, Batch) -> send(SendFun, Batch) end), - meck:expect(emqx_connector_mqtt_mod, stop, 1, fun(Pid) -> stop(Pid) end), - try - emqx_metrics:start_link(), - Ref = make_ref(), - TestPid = self(), - BridgeName = manual_start_stop, - Config0 = make_config(Ref, TestPid, {ok, #{client_pid => TestPid}}), - Config = Config0#{start_type := manual}, - {ok, Pid} = emqx_connector_mqtt_worker:start_link(Config#{name => BridgeName}), - %% call ensure_started again should yield the same result - ok = emqx_connector_mqtt_worker:ensure_started(BridgeName), - emqx_connector_mqtt_worker:ensure_stopped(BridgeName), - emqx_metrics:stop(), - ok = emqx_connector_mqtt_worker:stop(Pid) - after - meck:unload(emqx_connector_mqtt_mod) - end. - -make_config(Ref, TestPid, Result) -> - #{ - start_type => auto, - subscriptions => undefined, - forwards => undefined, - reconnect_interval => 50, - test_pid => TestPid, - test_ref => Ref, - connect_result => Result - }. From c76311c9c306c8ede80b54691caedb52a9d5442c Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 31 Jan 2023 14:17:45 +0300 Subject: [PATCH 4/5] fix(buffer): count inflight batches properly --- .../src/emqx_resource_buffer_worker.erl | 9 +++--- .../test/emqx_resource_SUITE.erl | 32 +++++++++++-------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 50534df4f..c5395c8df 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1121,6 +1121,10 @@ append_queue(Id, Index, Q, Queries) -> -define(INITIAL_TIME_REF, initial_time). -define(INITIAL_MONOTONIC_TIME_REF, initial_monotonic_time). +%% NOTE +%% There are 4 metadata rows in an inflight table, keyed by atoms declared above. ☝ +-define(INFLIGHT_META_ROWS, 4). + inflight_new(InfltWinSZ, Id, Index) -> TableId = ets:new( emqx_resource_buffer_worker_inflight_tab, @@ -1181,12 +1185,9 @@ is_inflight_full(InflightTID) -> Size >= MaxSize. inflight_num_batches(InflightTID) -> - %% Note: we subtract 2 because there're 2 metadata rows that hold - %% the maximum size value and the number of messages. - MetadataRowCount = 2, case ets:info(InflightTID, size) of undefined -> 0; - Size -> max(0, Size - MetadataRowCount) + Size -> max(0, Size - ?INFLIGHT_META_ROWS) end. inflight_num_msgs(InflightTID) -> diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 227b6fedc..27101d1cc 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -411,7 +411,8 @@ t_query_counter_async_inflight(_) -> ?check_trace( {_, {ok, _}} = ?wait_async_action( - inc_counter_in_parallel(WindowSize, ReqOpts), + %% one more so that inflight would be already full upon last query + inc_counter_in_parallel(WindowSize + 1, ReqOpts), #{?snk_kind := buffer_worker_flush_but_inflight_full}, 1_000 ), @@ -445,9 +446,9 @@ t_query_counter_async_inflight(_) -> %% all responses should be received after the resource is resumed. {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), - %% +1 because the tmp_query above will be retried and succeed + %% +2 because the tmp_query above will be retried and succeed %% this time. - WindowSize + 1, + WindowSize + 2, _Timeout0 = 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), @@ -475,7 +476,7 @@ t_query_counter_async_inflight(_) -> fun(Trace) -> QueryTrace = ?of_kind(call_query_async, Trace), ?assertMatch([#{query := {query, _, {inc_counter, _}, _, _}} | _], QueryTrace), - ?assertEqual(WindowSize + Num, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), + ?assertEqual(WindowSize + Num + 1, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), tap_metrics(?LINE), ok end @@ -487,7 +488,8 @@ t_query_counter_async_inflight(_) -> ?check_trace( {_, {ok, _}} = ?wait_async_action( - inc_counter_in_parallel(WindowSize, ReqOpts), + %% one more so that inflight would be already full upon last query + inc_counter_in_parallel(WindowSize + 1, ReqOpts), #{?snk_kind := buffer_worker_flush_but_inflight_full}, 1_000 ), @@ -500,10 +502,10 @@ t_query_counter_async_inflight(_) -> %% this will block the resource_worker ok = emqx_resource:query(?ID, {inc_counter, 4}), - Sent = WindowSize + Num + WindowSize, + Sent = WindowSize + 1 + Num + WindowSize + 1, {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), - WindowSize, + WindowSize + 1, _Timeout0 = 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), @@ -593,7 +595,8 @@ t_query_counter_async_inflight_batch(_) -> ?check_trace( {_, {ok, _}} = ?wait_async_action( - inc_counter_in_parallel(NumMsgs, ReqOpts), + %% a batch more so that inflight would be already full upon last query + inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts), #{?snk_kind := buffer_worker_flush_but_inflight_full}, 5_000 ), @@ -652,9 +655,9 @@ t_query_counter_async_inflight_batch(_) -> %% all responses should be received after the resource is resumed. {ok, SRef0} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), - %% +1 because the tmp_query above will be retried and succeed + %% +2 because the tmp_query above will be retried and succeed %% this time. - WindowSize + 1, + WindowSize + 2, 10_000 ), ?assertMatch(ok, emqx_resource:simple_sync_query(?ID, resume)), @@ -664,7 +667,7 @@ t_query_counter_async_inflight_batch(_) -> %% take it again from the table; this time, it should have %% succeeded. ?assertMatch([{tmp_query, ok}], ets:take(Tab0, tmp_query)), - ?assertEqual(NumMsgs, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), + ?assertEqual(NumMsgs + BatchSize, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)}), tap_metrics(?LINE), %% send async query, this time everything should be ok. @@ -691,7 +694,7 @@ t_query_counter_async_inflight_batch(_) -> end ), ?assertEqual( - NumMsgs + NumMsgs1, + NumMsgs + BatchSize + NumMsgs1, ets:info(Tab0, size), #{tab => ets:tab2list(Tab0)} ), @@ -703,7 +706,8 @@ t_query_counter_async_inflight_batch(_) -> ?check_trace( {_, {ok, _}} = ?wait_async_action( - inc_counter_in_parallel(NumMsgs, ReqOpts), + %% a batch more so that inflight would be already full upon last query + inc_counter_in_parallel(NumMsgs + BatchSize, ReqOpts), #{?snk_kind := buffer_worker_flush_but_inflight_full}, 5_000 ), @@ -719,7 +723,7 @@ t_query_counter_async_inflight_batch(_) -> %% this will block the resource_worker ok = emqx_resource:query(?ID, {inc_counter, 1}), - Sent = NumMsgs + NumMsgs1 + NumMsgs, + Sent = NumMsgs + BatchSize + NumMsgs1 + NumMsgs, {ok, SRef1} = snabbkaffe:subscribe( ?match_event(#{?snk_kind := connector_demo_inc_counter_async}), WindowSize, From c5a7cd5acd38440d1be63dd5690ea688bbe5232f Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 31 Jan 2023 15:20:46 +0300 Subject: [PATCH 5/5] fix(mqtt-bridge): drop unused configuration parameter --- .../src/schema/emqx_bridge_compatible_config.erl | 1 - .../test/emqx_bridge_compatible_config_tests.erl | 2 -- apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl | 6 ------ 3 files changed, 9 deletions(-) diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl index 862b5e188..1e55d0c0e 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_compatible_config.erl @@ -72,7 +72,6 @@ up(#{<<"connector">> := Connector} = Config) -> Cn(proto_ver, <<"v4">>), Cn(server, undefined), Cn(retry_interval, <<"15s">>), - Cn(reconnect_interval, <<"15s">>), Cn(ssl, default_ssl()), {enable, Enable}, {resource_opts, default_resource_opts()}, diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index a2671a40e..36dd6324a 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -224,7 +224,6 @@ bridges { mode = \"cluster_shareload\" password = \"\" proto_ver = \"v5\" - reconnect_interval = \"15s\" replayq {offload = false, seg_bytes = \"100MB\"} retry_interval = \"12s\" server = \"localhost:1883\" @@ -257,7 +256,6 @@ bridges { mode = \"cluster_shareload\" password = \"\" proto_ver = \"v4\" - reconnect_interval = \"15s\" replayq {offload = false, seg_bytes = \"100MB\"} retry_interval = \"44s\" server = \"localhost:1883\" diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index be462fcc1..6ea609cc6 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -72,12 +72,6 @@ fields("server_configs") -> )}, {server, emqx_schema:servers_sc(#{desc => ?DESC("server")}, ?MQTT_HOST_OPTS)}, {clientid_prefix, mk(binary(), #{required => false, desc => ?DESC("clientid_prefix")})}, - {reconnect_interval, - mk_duration( - "Reconnect interval. Delay for the MQTT bridge to retry establishing the connection " - "in case of transportation failure.", - #{default => "15s"} - )}, {proto_ver, mk( hoconsc:enum([v3, v4, v5]),