fix(mqtt-bridge): ensure proper feedback on async forwards

So that buffer worker would notice a connection loss in time, and
recycle inflight messages subsequently.
This commit is contained in:
Andrew Mayorov 2023-01-30 14:51:09 +03:00
parent 35c429ef1d
commit 4d146c521b
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
3 changed files with 18 additions and 19 deletions

View File

@ -198,10 +198,7 @@ on_query_async(
#{name := InstanceId}
) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
%% this is a cast, currently.
ok = emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}),
WorkerPid = get_worker_pid(InstanceId),
{ok, WorkerPid}.
emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}).
on_get_status(_InstId, #{name := InstanceId}) ->
case emqx_connector_mqtt_worker:status(InstanceId) of
@ -215,12 +212,6 @@ ensure_mqtt_worker_started(InstanceId, BridgeConf) ->
{error, Reason} -> {error, Reason}
end.
%% mqtt workers, when created and called via bridge callbacks, are
%% registered.
-spec get_worker_pid(atom()) -> pid().
get_worker_pid(InstanceId) ->
whereis(InstanceId).
make_sub_confs(EmptyMap, _Conf, _) when map_size(EmptyMap) == 0 ->
undefined;
make_sub_confs(undefined, _Conf, _) ->

View File

@ -26,6 +26,8 @@
ping/1
]).
-export([info/2]).
-export([
ensure_subscribed/3,
ensure_unsubscribed/2
@ -90,6 +92,9 @@ ping(undefined) ->
ping(#{client_pid := Pid}) ->
emqtt:ping(Pid).
info(pid, #{client_pid := Pid}) ->
Pid.
ensure_subscribed(#{client_pid := Pid, subscriptions := Subs} = Conn, Topic, QoS) when
is_pid(Pid)
->

View File

@ -168,9 +168,9 @@ send_to_remote(Name, Msg) ->
gen_statem:call(name(Name), {send_to_remote, Msg}).
send_to_remote_async(Pid, Msg, Callback) when is_pid(Pid) ->
gen_statem:cast(Pid, {send_to_remote_async, Msg, Callback});
gen_statem:call(Pid, {send_to_remote_async, Msg, Callback});
send_to_remote_async(Name, Msg, Callback) ->
gen_statem:cast(name(Name), {send_to_remote_async, Msg, Callback}).
gen_statem:call(name(Name), {send_to_remote_async, Msg, Callback}).
%% @doc Return all forwards (local subscriptions).
-spec get_forwards(id()) -> [topic()].
@ -270,12 +270,14 @@ maybe_destroy_session(_State) ->
idle({call, From}, ensure_started, State) ->
case do_connect(State) of
{ok, State1} ->
{next_state, connected, State1, [{reply, From, ok}, {state_timeout, 0, connected}]};
{next_state, connected, State1, {reply, From, ok}};
{error, Reason, _State} ->
{keep_state_and_data, {reply, From, {error, Reason}}}
end;
idle({call, From}, {send_to_remote, _}, _State) ->
{keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}};
idle({call, From}, {send_to_remote_async, _, _}, _State) ->
{keep_state_and_data, {reply, From, {error, {recoverable_error, not_connected}}}};
%% @doc Standing by for manual start.
idle(info, idle, #{start_type := manual}) ->
keep_state_and_data;
@ -290,14 +292,11 @@ idle(Type, Content, State) ->
connecting(#{reconnect_interval := ReconnectDelayMs} = State) ->
case do_connect(State) of
{ok, State1} ->
{next_state, connected, State1, {state_timeout, 0, connected}};
{next_state, connected, State1};
_ ->
{keep_state_and_data, {state_timeout, ReconnectDelayMs, reconnect}}
end.
connected(state_timeout, connected, State) ->
%% nothing to do
{keep_state, State};
connected({call, From}, {send_to_remote, Msg}, State) ->
case do_send(State, Msg) of
{ok, NState} ->
@ -305,9 +304,13 @@ connected({call, From}, {send_to_remote, Msg}, State) ->
{error, Reason} ->
{keep_state_and_data, {reply, From, {error, Reason}}}
end;
connected(cast, {send_to_remote_async, Msg, Callback}, State) ->
connected(
{call, From},
{send_to_remote_async, Msg, Callback},
State = #{connection := Connection}
) ->
_ = do_send_async(State, Msg, Callback),
{keep_state, State};
{keep_state, State, {reply, From, {ok, emqx_connector_mqtt_mod:info(pid, Connection)}}};
connected(
info,
{disconnected, Conn, Reason},