Merge pull request #9180 from zmstone/1019-no-shared-message-redispatch-for-takeover
fix(shared): do not redispatch shared messages for certain shutdown
This commit is contained in:
commit
6bedc72bd2
|
@ -625,10 +625,11 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
|
|||
end.
|
||||
|
||||
-spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok).
|
||||
terminate(ClientInfo, {shutdown, Reason}, Session) ->
|
||||
terminate(ClientInfo, Reason, Session);
|
||||
terminate(ClientInfo, Reason, Session) ->
|
||||
run_terminate_hooks(ClientInfo, Reason, Session),
|
||||
Reason =/= takeovered andalso
|
||||
redispatch_shared_messages(Session),
|
||||
maybe_redispatch_shared_messages(Reason, Session),
|
||||
ok.
|
||||
|
||||
run_terminate_hooks(ClientInfo, discarded, Session) ->
|
||||
|
@ -638,6 +639,13 @@ run_terminate_hooks(ClientInfo, takeovered, Session) ->
|
|||
run_terminate_hooks(ClientInfo, Reason, Session) ->
|
||||
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
||||
|
||||
maybe_redispatch_shared_messages(takeovered, _Session) ->
|
||||
ok;
|
||||
maybe_redispatch_shared_messages(kicked, _Session) ->
|
||||
ok;
|
||||
maybe_redispatch_shared_messages(_Reason, Session) ->
|
||||
redispatch_shared_messages(Session).
|
||||
|
||||
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
|
||||
AllInflights = emqx_inflight:to_list(sort_fun(), Inflight),
|
||||
F = fun({_, {Msg, _Ts}}) ->
|
||||
|
|
|
@ -290,7 +290,7 @@ last_message(ExpectedPayload, Pids) ->
|
|||
last_message(ExpectedPayload, Pids, Timeout) ->
|
||||
receive
|
||||
{publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
|
||||
ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]),
|
||||
?assert(lists:member(Pid, Pids)),
|
||||
{true, Pid}
|
||||
after Timeout ->
|
||||
ct:pal("not yet"),
|
||||
|
@ -587,11 +587,16 @@ t_dispatch_qos2(Config) when is_list(Config) ->
|
|||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
|
||||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
|
||||
|
||||
%% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending
|
||||
%% on if it's picked as the first one for round_robin
|
||||
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
|
||||
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
|
||||
%% assert hello2 > hello1 or hello4 > hello3
|
||||
?assert(MsgRec2 > MsgRec1),
|
||||
|
||||
case MsgRec2 of
|
||||
<<"hello3">> ->
|
||||
?assertEqual(<<"hello1">>, MsgRec1);
|
||||
<<"hello4">> ->
|
||||
?assertEqual(<<"hello2">>, MsgRec1)
|
||||
end,
|
||||
sys:resume(ConnPid1),
|
||||
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
|
||||
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
|
||||
|
@ -604,8 +609,14 @@ t_dispatch_qos2(Config) when is_list(Config) ->
|
|||
kill_process(ConnPid1),
|
||||
%% client 2 should receive the message
|
||||
MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
|
||||
%% assert hello2 > hello1 or hello4 > hello3
|
||||
?assert(MsgRec4 > MsgRec3),
|
||||
case MsgRec2 of
|
||||
<<"hello3">> ->
|
||||
?assertEqual(<<"hello2">>, MsgRec3),
|
||||
?assertEqual(<<"hello4">>, MsgRec4);
|
||||
<<"hello4">> ->
|
||||
?assertEqual(<<"hello1">>, MsgRec3),
|
||||
?assertEqual(<<"hello3">>, MsgRec4)
|
||||
end,
|
||||
emqtt:stop(ConnPid2),
|
||||
ok.
|
||||
|
||||
|
@ -654,17 +665,128 @@ t_dispatch_qos0(Config) when is_list(Config) ->
|
|||
emqtt:stop(ConnPid2),
|
||||
ok.
|
||||
|
||||
t_session_takeover({init, Config}) when is_list(Config) ->
|
||||
Config;
|
||||
t_session_takeover({'end', Config}) when is_list(Config) ->
|
||||
ok;
|
||||
t_session_takeover(Config) when is_list(Config) ->
|
||||
Topic = <<"t1/a">>,
|
||||
ClientId = iolist_to_binary("c" ++ integer_to_list(erlang:system_time())),
|
||||
Opts = [{clientid, ClientId},
|
||||
{auto_ack, true},
|
||||
{proto_ver, v5},
|
||||
{clean_start, false},
|
||||
{properties, #{'Session-Expiry-Interval' => 60}}
|
||||
],
|
||||
{ok, ConnPid1} = emqtt:start_link(Opts),
|
||||
%% with the same client ID, start another client
|
||||
{ok, ConnPid2} = emqtt:start_link(Opts),
|
||||
{ok, _} = emqtt:connect(ConnPid1),
|
||||
emqtt:subscribe(ConnPid1, {<<"$share/t1/", Topic/binary>>, _QoS = 1}),
|
||||
Message1 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello1">>),
|
||||
Message2 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello2">>),
|
||||
Message3 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello3">>),
|
||||
Message4 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello4">>),
|
||||
%% Make sure client1 is functioning
|
||||
?assertMatch([_], emqx:publish(Message1)),
|
||||
{true, _} = last_message(<<"hello1">>, [ConnPid1]),
|
||||
%% Kill client1
|
||||
emqtt:stop(ConnPid1),
|
||||
%% publish another message (should end up in client1's session)
|
||||
?assertMatch([_], emqx:publish(Message2)),
|
||||
%% connect client2 (with the same clientid)
|
||||
{ok, _} = emqtt:connect(ConnPid2), %% should trigger session take over
|
||||
?assertMatch([_], emqx:publish(Message3)),
|
||||
?assertMatch([_], emqx:publish(Message4)),
|
||||
{true, _} = last_message(<<"hello2">>, [ConnPid2]),
|
||||
{true, _} = last_message(<<"hello3">>, [ConnPid2]),
|
||||
{true, _} = last_message(<<"hello4">>, [ConnPid2]),
|
||||
?assertEqual([], collect_msgs(timer:seconds(2))),
|
||||
emqtt:stop(ConnPid2),
|
||||
ok.
|
||||
|
||||
|
||||
t_session_kicked({init, Config}) when is_list(Config) ->
|
||||
meck:new(emqx_zone, [passthrough, no_history]),
|
||||
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
|
||||
Config;
|
||||
t_session_kicked({'end', Config}) when is_list(Config) ->
|
||||
meck:unload(emqx_zone);
|
||||
t_session_kicked(Config) when is_list(Config) ->
|
||||
ok = ensure_config(round_robin, _AckEnabled = false),
|
||||
Topic = <<"foo/bar/1">>,
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
|
||||
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
|
||||
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
|
||||
{ok, _} = emqtt:connect(ConnPid1),
|
||||
{ok, _} = emqtt:connect(ConnPid2),
|
||||
|
||||
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
|
||||
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
|
||||
|
||||
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
|
||||
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
|
||||
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
|
||||
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
|
||||
ct:sleep(100),
|
||||
|
||||
ok = sys:suspend(ConnPid1),
|
||||
|
||||
%% One message is inflight
|
||||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
|
||||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
|
||||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
|
||||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
|
||||
|
||||
%% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending
|
||||
%% on if it's picked as the first one for round_robin
|
||||
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
|
||||
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
|
||||
case MsgRec2 of
|
||||
<<"hello3">> ->
|
||||
?assertEqual(<<"hello1">>, MsgRec1);
|
||||
<<"hello4">> ->
|
||||
?assertEqual(<<"hello2">>, MsgRec1)
|
||||
end,
|
||||
sys:resume(ConnPid1),
|
||||
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
|
||||
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
|
||||
%% the 4th message yet since max_inflight is 1.
|
||||
MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
|
||||
case MsgRec2 of
|
||||
<<"hello3">> ->
|
||||
?assertEqual(<<"hello2">>, MsgRec3);
|
||||
<<"hello4">> ->
|
||||
?assertEqual(<<"hello1">>, MsgRec3)
|
||||
end,
|
||||
%% no message expected
|
||||
?assertEqual([], collect_msgs(0)),
|
||||
%% now kick client 1
|
||||
kill_process(ConnPid1, fun(_Pid) -> emqx_cm:kick_session(ClientId1) end),
|
||||
%% client 2 should NOT receive the message
|
||||
?assertEqual([], collect_msgs(1000)),
|
||||
emqtt:stop(ConnPid2),
|
||||
?assertEqual([], collect_msgs(0)),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% help functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
kill_process(Pid) ->
|
||||
kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end).
|
||||
|
||||
kill_process(Pid, WithFun) ->
|
||||
_ = unlink(Pid),
|
||||
_ = monitor(process, Pid),
|
||||
erlang:exit(Pid, kill),
|
||||
_ = WithFun(Pid),
|
||||
receive
|
||||
{'DOWN', _, process, Pid, _} ->
|
||||
ok
|
||||
after 10_000 ->
|
||||
error(timeout)
|
||||
end.
|
||||
|
||||
collect_msgs(Timeout) ->
|
||||
|
|
Loading…
Reference in New Issue