Merge pull request #10455 from thalesmg/fix-late-gen-server-replies-buf-worker-v50
fix(buffer_worker): avoid sending late reply messages to callers
This commit is contained in:
commit
3f18c5e2e3
|
@ -1,7 +1,7 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{application, emqx_resource, [
|
||||
{description, "Manager for all external resources"},
|
||||
{vsn, "0.1.13"},
|
||||
{vsn, "0.1.14"},
|
||||
{registered, []},
|
||||
{mod, {emqx_resource_app, []}},
|
||||
{applications, [
|
||||
|
|
|
@ -52,7 +52,7 @@
|
|||
|
||||
-export([queue_item_marshaller/1, estimate_size/1]).
|
||||
|
||||
-export([handle_async_reply/2, handle_async_batch_reply/2]).
|
||||
-export([handle_async_reply/2, handle_async_batch_reply/2, reply_call/2]).
|
||||
|
||||
-export([clear_disk_queue_dir/2]).
|
||||
|
||||
|
@ -293,10 +293,8 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
pick_call(Id, Key, Query, Timeout) ->
|
||||
?PICK(Id, Key, Pid, begin
|
||||
Caller = self(),
|
||||
MRef = erlang:monitor(process, Pid, [{alias, reply_demonitor}]),
|
||||
From = {Caller, MRef},
|
||||
ReplyTo = {fun gen_statem:reply/2, [From]},
|
||||
ReplyTo = {fun ?MODULE:reply_call/2, [MRef]},
|
||||
erlang:send(Pid, ?SEND_REQ(ReplyTo, Query)),
|
||||
receive
|
||||
{MRef, Response} ->
|
||||
|
@ -1703,6 +1701,17 @@ default_resume_interval(_RequestTimeout = infinity, HealthCheckInterval) ->
|
|||
default_resume_interval(RequestTimeout, HealthCheckInterval) ->
|
||||
max(1, min(HealthCheckInterval, RequestTimeout div 3)).
|
||||
|
||||
%% @doc Deliver `Response' to a caller blocked in `pick_call/4'.
%% `Alias' is the monitor reference that `pick_call/4' creates with the
%% `{alias, reply_demonitor}' option; the reply is sent to that alias as
%% `{Alias, Response}', which is the shape `pick_call/4' waits for.
-spec reply_call(reference(), term()) -> ok.
|
||||
reply_call(Alias, Response) ->
|
||||
%% The reference was created with `{alias, reply_demonitor}', so
|
||||
%% once the caller calls `demonitor' on it (for example after a
|
||||
%% timeout) the alias is deactivated and a message sent to it is
|
||||
%% dropped instead of reaching a caller that no longer expects it.
|
||||
%% Using `gen_statem:reply({pid(), reference()}, _)' would address
|
||||
%% the pid directly and still deliver a late reply even after the
|
||||
%% demonitor.
|
||||
erlang:send(Alias, {Alias, Response}),
|
||||
ok.
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
adjust_batch_time_test_() ->
|
||||
|
|
|
@ -144,7 +144,11 @@ on_query(_InstId, {sleep_before_reply, For}, #{pid := Pid}) ->
|
|||
Result
|
||||
after 1000 ->
|
||||
{error, timeout}
|
||||
end.
|
||||
end;
|
||||
on_query(_InstId, {sync_sleep_before_reply, SleepFor}, _State) ->
|
||||
%% This simulates a slow sync call
|
||||
timer:sleep(SleepFor),
|
||||
{ok, slept}.
|
||||
|
||||
on_query_async(_InstId, block, ReplyFun, #{pid := Pid}) ->
|
||||
Pid ! {block, ReplyFun},
|
||||
|
|
|
@ -2751,6 +2751,51 @@ t_volatile_offload_mode(_Config) ->
|
|||
end
|
||||
).
|
||||
|
||||
%% Regression test: a synchronous query that times out on the caller side
%% must not leave a late reply message in the caller's mailbox.
t_late_call_reply(_Config) ->
|
||||
emqx_connector_demo:set_callback_mode(always_sync),
|
||||
RequestTimeout = 500,
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_resource:create(
|
||||
?ID,
|
||||
?DEFAULT_RESOURCE_GROUP,
|
||||
?TEST_RESOURCE,
|
||||
#{name => test_resource},
|
||||
#{
|
||||
buffer_mode => memory_only,
|
||||
request_timeout => RequestTimeout,
|
||||
query_mode => sync
|
||||
}
|
||||
)
|
||||
),
|
||||
?check_trace(
|
||||
begin
|
||||
%% Make the connector sleep for longer than the request timeout: the
|
||||
%% query call below will already have returned (with a timeout) by the
|
||||
%% time the connector finishes and its reply is produced.
|
||||
%% The `sync_sleep_before_reply' handler returns `{ok, slept}' after `SleepFor' ms.
|
||||
SleepFor = RequestTimeout + 500,
|
||||
?assertMatch(
|
||||
{error, {resource_error, #{reason := timeout}}},
|
||||
emqx_resource:query(
|
||||
?ID,
|
||||
{sync_sleep_before_reply, SleepFor},
|
||||
#{timeout => RequestTimeout}
|
||||
)
|
||||
),
|
||||
%% Our process shouldn't receive any late messages: wait `SleepFor' ms more and fail on any message at all.
|
||||
receive
|
||||
LateReply ->
|
||||
ct:fail("received late reply: ~p", [LateReply])
|
||||
after SleepFor ->
|
||||
ok
|
||||
end,
|
||||
ok
|
||||
end,
|
||||
[]
|
||||
),
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%------------------------------------------------------------------------------
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
Fixed an issue that could cause (otherwise harmless) noise in the logs.
|
||||
|
||||
During some particularly slow synchronous calls to bridges, late replies could be sent to connection processes that were no longer expecting a reply, and those processes would then emit an error log like:
|
||||
|
||||
```
|
||||
2023-04-19T18:24:35.350233+00:00 [error] msg: unexpected_info, mfa: emqx_channel:handle_info/2, line: 1278, peername: 172.22.0.1:36384, clientid: caribdis_bench_sub_1137967633_4788, info: {#Ref<0.408802983.1941504010.189402>,{ok,200,[{<<"cache-control">>,<<"max-age=0, ...">>}}
|
||||
```
|
||||
|
||||
Those logs are harmless, but they could flood the log output and worry users unnecessarily.
|
Loading…
Reference in New Issue