Merge remote-tracking branch 'emqx/release-v43' into 1020-sync-release-v43-back-to-main

This commit is contained in:
JimMoen 2022-10-20 10:23:08 +08:00
commit 8adddb018f
2 changed files with 139 additions and 9 deletions

View File

@ -625,10 +625,11 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
end.
-spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok).
terminate(ClientInfo, {shutdown, Reason}, Session) ->
terminate(ClientInfo, Reason, Session);
terminate(ClientInfo, Reason, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session),
Reason =/= takeovered andalso
redispatch_shared_messages(Session),
maybe_redispatch_shared_messages(Reason, Session),
ok.
run_terminate_hooks(ClientInfo, discarded, Session) ->
@ -638,6 +639,13 @@ run_terminate_hooks(ClientInfo, takeovered, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
maybe_redispatch_shared_messages(takeovered, _Session) ->
ok;
maybe_redispatch_shared_messages(kicked, _Session) ->
ok;
maybe_redispatch_shared_messages(_Reason, Session) ->
redispatch_shared_messages(Session).
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
AllInflights = emqx_inflight:to_list(sort_fun(), Inflight),
F = fun({_, {Msg, _Ts}}) ->

View File

@ -290,7 +290,7 @@ last_message(ExpectedPayload, Pids) ->
last_message(ExpectedPayload, Pids, Timeout) ->
receive
{publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]),
?assert(lists:member(Pid, Pids)),
{true, Pid}
after Timeout ->
ct:pal("not yet"),
@ -587,11 +587,16 @@ t_dispatch_qos2(Config) when is_list(Config) ->
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
%% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending
%% on if it's picked as the first one for round_robin
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec2 > MsgRec1),
case MsgRec2 of
<<"hello3">> ->
?assertEqual(<<"hello1">>, MsgRec1);
<<"hello4">> ->
?assertEqual(<<"hello2">>, MsgRec1)
end,
sys:resume(ConnPid1),
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
@ -604,8 +609,14 @@ t_dispatch_qos2(Config) when is_list(Config) ->
kill_process(ConnPid1),
%% client 2 should receive the message
MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
%% assert hello2 > hello1 or hello4 > hello3
?assert(MsgRec4 > MsgRec3),
case MsgRec2 of
<<"hello3">> ->
?assertEqual(<<"hello2">>, MsgRec3),
?assertEqual(<<"hello4">>, MsgRec4);
<<"hello4">> ->
?assertEqual(<<"hello1">>, MsgRec3),
?assertEqual(<<"hello3">>, MsgRec4)
end,
emqtt:stop(ConnPid2),
ok.
@ -654,17 +665,128 @@ t_dispatch_qos0(Config) when is_list(Config) ->
emqtt:stop(ConnPid2),
ok.
t_session_takeover({init, Config}) when is_list(Config) ->
Config;
t_session_takeover({'end', Config}) when is_list(Config) ->
ok;
t_session_takeover(Config) when is_list(Config) ->
Topic = <<"t1/a">>,
ClientId = iolist_to_binary("c" ++ integer_to_list(erlang:system_time())),
Opts = [{clientid, ClientId},
{auto_ack, true},
{proto_ver, v5},
{clean_start, false},
{properties, #{'Session-Expiry-Interval' => 60}}
],
{ok, ConnPid1} = emqtt:start_link(Opts),
%% with the same client ID, start another client
{ok, ConnPid2} = emqtt:start_link(Opts),
{ok, _} = emqtt:connect(ConnPid1),
emqtt:subscribe(ConnPid1, {<<"$share/t1/", Topic/binary>>, _QoS = 1}),
Message1 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(<<"dummypub">>, 2, Topic, <<"hello4">>),
%% Make sure client1 is functioning
?assertMatch([_], emqx:publish(Message1)),
{true, _} = last_message(<<"hello1">>, [ConnPid1]),
%% Kill client1
emqtt:stop(ConnPid1),
%% publish another message (should end up in client1's session)
?assertMatch([_], emqx:publish(Message2)),
%% connect client2 (with the same clientid)
{ok, _} = emqtt:connect(ConnPid2), %% should trigger session take over
?assertMatch([_], emqx:publish(Message3)),
?assertMatch([_], emqx:publish(Message4)),
{true, _} = last_message(<<"hello2">>, [ConnPid2]),
{true, _} = last_message(<<"hello3">>, [ConnPid2]),
{true, _} = last_message(<<"hello4">>, [ConnPid2]),
?assertEqual([], collect_msgs(timer:seconds(2))),
emqtt:stop(ConnPid2),
ok.
t_session_kicked({init, Config}) when is_list(Config) ->
meck:new(emqx_zone, [passthrough, no_history]),
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
Config;
t_session_kicked({'end', Config}) when is_list(Config) ->
meck:unload(emqx_zone);
t_session_kicked(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
{ok, _} = emqtt:connect(ConnPid1),
{ok, _} = emqtt:connect(ConnPid2),
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
ct:sleep(100),
ok = sys:suspend(ConnPid1),
%% One message is inflight
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
%% assert client 2 receives two messages, they are eiter 1,3 or 2,4 depending
%% on if it's picked as the first one for round_robin
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
case MsgRec2 of
<<"hello3">> ->
?assertEqual(<<"hello1">>, MsgRec1);
<<"hello4">> ->
?assertEqual(<<"hello2">>, MsgRec1)
end,
sys:resume(ConnPid1),
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
%% the 4th message yet since max_inflight is 1.
MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
case MsgRec2 of
<<"hello3">> ->
?assertEqual(<<"hello2">>, MsgRec3);
<<"hello4">> ->
?assertEqual(<<"hello1">>, MsgRec3)
end,
%% no message expected
?assertEqual([], collect_msgs(0)),
%% now kick client 1
kill_process(ConnPid1, fun(_Pid) -> emqx_cm:kick_session(ClientId1) end),
%% client 2 should NOT receive the message
?assertEqual([], collect_msgs(1000)),
emqtt:stop(ConnPid2),
?assertEqual([], collect_msgs(0)),
ok.
%%--------------------------------------------------------------------
%% help functions
%%--------------------------------------------------------------------
kill_process(Pid) ->
kill_process(Pid, fun(_) -> erlang:exit(Pid, kill) end).
kill_process(Pid, WithFun) ->
_ = unlink(Pid),
_ = monitor(process, Pid),
erlang:exit(Pid, kill),
_ = WithFun(Pid),
receive
{'DOWN', _, process, Pid, _} ->
ok
after 10_000 ->
error(timeout)
end.
collect_msgs(Timeout) ->