fix(shared_subs): rename redispatch function name and simply test case

This commit is contained in:
firest 2022-10-10 18:06:55 +08:00
parent f0769cb765
commit b4eb0f18f9
2 changed files with 13 additions and 9 deletions

View File

@ -47,6 +47,7 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("logger.hrl"). -include("logger.hrl").
-include("types.hrl"). -include("types.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
@ -802,7 +803,7 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
-spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok. -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok.
terminate(ClientInfo, Reason, Session) -> terminate(ClientInfo, Reason, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session), run_terminate_hooks(ClientInfo, Reason, Session),
redispatch_shared_messages(Reason, Session), maybe_redispatch_shared_messages(Reason, Session),
ok. ok.
run_terminate_hooks(ClientInfo, discarded, Session) -> run_terminate_hooks(ClientInfo, discarded, Session) ->
@ -812,11 +813,13 @@ run_terminate_hooks(ClientInfo, takenover, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session) -> run_terminate_hooks(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
redispatch_shared_messages(takenover, _Session) -> maybe_redispatch_shared_messages(takenover, _Session) ->
?tp(debug, ignore_redispatch_shared_messages, #{reason => takenover}),
ok; ok;
redispatch_shared_messages(kicked, _Session) -> maybe_redispatch_shared_messages(kicked, _Session) ->
?tp(debug, ignore_redispatch_shared_messages, #{reason => kicked}),
ok; ok;
redispatch_shared_messages(_Reason, #session{inflight = Inflight, mqueue = Q}) -> maybe_redispatch_shared_messages(_Reason, #session{inflight = Inflight, mqueue = Q}) ->
AllInflights = emqx_inflight:to_list(fun sort_fun/2, Inflight), AllInflights = emqx_inflight:to_list(fun sort_fun/2, Inflight),
F = fun F = fun
({_PacketId, #inflight_data{message = #message{qos = ?QOS_1} = Msg}}) -> ({_PacketId, #inflight_data{message = #message{qos = ?QOS_1} = Msg}}) ->

View File

@ -22,6 +22,7 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(SUITE, ?MODULE). -define(SUITE, ?MODULE).
@ -869,23 +870,23 @@ t_redispatch_when_kicked(_) ->
Subs = [erlang:spawn(fun() -> MkSub(ClientId) end) || ClientId <- [ClientId1, ClientId2]], Subs = [erlang:spawn(fun() -> MkSub(ClientId) end) || ClientId <- [ClientId1, ClientId2]],
timer:sleep(500), timer:sleep(500),
ok = snabbkaffe:start_trace(),
Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>), Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
emqx:publish(Message), emqx:publish(Message),
[Sub ! wait || Sub <- Subs], [Sub ! wait || Sub <- Subs],
timer:sleep(200),
Res = Receive(), Res = Receive(),
?assertMatch({got, _}, Res), ?assertMatch({got, _}, Res),
{got, ClientId} = Res, {got, ClientId} = Res,
emqx_cm:kick_session(ClientId), emqx_cm:kick_session(ClientId),
[Sub ! wait || Sub <- Subs], Trace = snabbkaffe:collect_trace(500),
timer:sleep(200), ?assertMatch([#{reason := kicked}], ?of_kind(ignore_redispatch_shared_messages, Trace)),
Res1 = Receive(),
?assertMatch(nothing, Res1),
[Sub ! stop || Sub <- Subs], [Sub ! stop || Sub <- Subs],
snabbkaffe:stop(),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------