fix(buffer_worker): avoid sending late reply messages to callers

Fixes https://emqx.atlassian.net/browse/EMQX-9635

During a sync call from process `A` to a buffer worker `B`, the call
that `B` makes to the underlying resource `C` can be very slow.  In those cases, `A`
will receive a timeout response and expect no more messages from `B`
nor `C`.  However, prior to this fix, if `B` is stuck in a long sync
call to `C` and then gets its response after `A` timed out, `B` would
still send the late response to `A`, polluting its mailbox.
This commit is contained in:
Thales Macedo Garitezi 2023-04-19 17:42:27 -03:00
parent 8ccfbe9e16
commit cb995e2033
5 changed files with 73 additions and 6 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_resource, [ {application, emqx_resource, [
{description, "Manager for all external resources"}, {description, "Manager for all external resources"},
{vsn, "0.1.13"}, {vsn, "0.1.14"},
{registered, []}, {registered, []},
{mod, {emqx_resource_app, []}}, {mod, {emqx_resource_app, []}},
{applications, [ {applications, [

View File

@ -52,7 +52,7 @@
-export([queue_item_marshaller/1, estimate_size/1]). -export([queue_item_marshaller/1, estimate_size/1]).
-export([handle_async_reply/2, handle_async_batch_reply/2]). -export([handle_async_reply/2, handle_async_batch_reply/2, reply_call/2]).
-export([clear_disk_queue_dir/2]). -export([clear_disk_queue_dir/2]).
@ -293,10 +293,8 @@ code_change(_OldVsn, State, _Extra) ->
pick_call(Id, Key, Query, Timeout) -> pick_call(Id, Key, Query, Timeout) ->
?PICK(Id, Key, Pid, begin ?PICK(Id, Key, Pid, begin
Caller = self(),
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]), MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
From = {Caller, MRef}, ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
ReplyTo = {fun gen_statem:reply/2, [From]},
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)), erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
receive receive
{MRef, Response} -> {MRef, Response} ->
@ -1703,6 +1701,17 @@ default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) ->
default_resume_interval(RequestTimeout, HealthCheckInterval) -> default_resume_interval(RequestTimeout, HealthCheckInterval) ->
max(1, min(HealthCheckInterval, RequestTimeout div 3)). max(1, min(HealthCheckInterval, RequestTimeout div 3)).
%% Delivers `Response' to a caller blocked in a sync call, addressing
%% the message to the caller's monitor alias rather than to its pid.
%%
%% The alias is created by the caller with `{alias, reply_demonitor}'.
%% Once the caller gives up (timeout) and demonitors, the alias is no
%% longer active, so a reply sent here afterwards is dropped instead
%% of landing in the caller's mailbox.  `gen_statem:reply({pid(),
%% reference()}, _)' addresses the pid directly and would therefore
%% still deliver such a late reply.
-spec reply_call(reference(), term()) -> ok.
reply_call(Alias, Response) ->
    Message = {Alias, Response},
    Alias ! Message,
    ok.
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
adjust_batch_time_test_() -> adjust_batch_time_test_() ->

View File

@ -144,7 +144,11 @@ on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
Result Result
after 1000 -> after 1000 ->
{error, timeout} {error, timeout}
end. end;
on_query(_InstId, {sync_sleep_before_reply, SleepFor}, _State) ->
%% This simulates a slow sync call
timer:sleep(SleepFor),
{ok, slept}.
on_query_async(_InstId, block, ReplyFun, #{pid := Pid}) -> on_query_async(_InstId, block, ReplyFun, #{pid := Pid}) ->
Pid ! {block, ReplyFun}, Pid ! {block, ReplyFun},

View File

@ -2751,6 +2751,51 @@ t_volatile_offload_mode(_Config) ->
end end
). ).
%% Checks that a caller whose sync query has already timed out does not
%% get a late reply message in its mailbox once the slow resource call
%% eventually completes.
t_late_call_reply(_Config) ->
emqx_connector_demo:set_callback_mode(always_sync),
RequestTimeout = 500,
?assertMatch(
{ok, _},
emqx_resource:create(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource},
#{
buffer_mode => memory_only,
request_timeout => RequestTimeout,
query_mode => sync
}
)
),
?check_trace(
begin
%% Sleep for longer than the request timeout: by the time the
%% resource call finishes, the caller has already been handed a
%% timeout response.
%% The demo connector sleeps for `SleepFor' ms and only then
%% returns `{ok, slept}', i.e. after the caller has given up.
SleepFor = RequestTimeout + 500,
?assertMatch(
{error, {resource_error, #{reason := timeout}}},
emqx_resource:query(
?ID,
{sync_sleep_before_reply, SleepFor},
#{timeout => RequestTimeout}
)
),
%% Our process shouldn't receive any late messages.  Waiting
%% another `SleepFor' ms gives the slow call time to complete
%% before we conclude that nothing was delivered.
receive
LateReply ->
ct:fail("received late reply: ~p", [LateReply])
after SleepFor ->
ok
end,
ok
end,
[]
),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helpers %% Helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -0,0 +1,9 @@
Fixed an issue that could cause (otherwise harmless) noise in the logs.
During some particularly slow synchronous calls to bridges, some late replies could be sent to connection processes that were no longer expecting a reply, which would then emit an error log like:
```
2023-04-19T18:24:35.350233+00:00 [error] msg: unexpected_info, mfa: emqx_channel:handle_info/2, line: 1278, peername: 172.22.0.1:36384, clientid: caribdis_bench_sub_1137967633_4788, info: {#Ref<0.408802983.1941504010.189402>,{ok,200,[{<<"cache-control">>,<<"max-age=0, ...">>}}
```
Those logs are harmless, but they could flood the log output and worry users needlessly.