From e3bc8e4f0ace2066455a51dcb448f33aae2051df Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 22 Feb 2019 09:07:50 +0800 Subject: [PATCH] Support batch delivery - Upgrade the emqx_session module to support batch delivery - Update emqx_protocol:deliver/2 to support batch delivery - Update some test cases --- src/emqx_protocol.erl | 12 +- src/emqx_session.erl | 309 +++++++++++++++++++++------------ test/emqx_session_SUITE.erl | 5 +- test/emqx_shared_sub_SUITE.erl | 10 +- 4 files changed, 219 insertions(+), 117 deletions(-) diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 2ee5ff7fc..a75fc25bf 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -586,7 +586,17 @@ puback(?QOS_2, PacketId, {error, ReasonCode}, PState) -> %% Deliver Packet -> Client %%------------------------------------------------------------------------------ --spec(deliver(tuple(), state()) -> {ok, state()} | {error, term()}). +-spec(deliver(list(tuple()) | tuple(), state()) -> {ok, state()} | {error, term()}). +deliver([], PState) -> + {ok, PState}; +deliver([Pub|More], PState) -> + case deliver(Pub, PState) of + {ok, PState1} -> + deliver(More, PState1); + {error, _} = Error -> + Error + end; + deliver({connack, ReasonCode}, PState) -> send(?CONNACK_PACKET(ReasonCode), PState); diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 388e78032..04cae7f4f 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -71,8 +71,11 @@ %% Clean Start Flag clean_start = false :: boolean(), - %% Client Binding: local | remote - binding = local :: local | remote, + %% Conn Binding: local | remote + %% binding = local :: local | remote, + + %% Deliver fun + deliver_fun :: function(), %% ClientId: Identifier of Session client_id :: binary(), @@ -157,6 +160,8 @@ -export_type([attr/0]). +-define(DEFAULT_BATCH_N, 1000). + %% @doc Start a session proc. -spec(start_link(SessAttrs :: map()) -> {ok, pid()}). start_link(SessAttrs) -> @@ -196,13 +201,13 @@ attrs(SPid) when is_pid(SPid) -> gen_server:call(SPid, attrs, infinity); attrs(#state{clean_start = CleanStart, - binding = Binding, client_id = ClientId, + conn_pid = ConnPid, username = Username, expiry_interval = ExpiryInterval, created_at = CreatedAt}) -> [{clean_start, CleanStart}, - {binding, Binding}, + {binding, binding(ConnPid)}, {client_id, ClientId}, {username, Username}, {expiry_interval, ExpiryInterval div 1000}, @@ -342,7 +347,7 @@ init([Parent, #{zone := Zone, IdleTimout = get_env(Zone, idle_timeout, 30000), State = #state{idle_timeout = IdleTimout, clean_start = CleanStart, - binding = binding(ConnPid), + deliver_fun = deliver_fun(ConnPid), client_id = ClientId, username = Username, conn_pid = ConnPid, @@ -376,9 +381,18 @@ init_mqueue(Zone) -> default_priority => get_env(Zone, mqueue_default_priority) }). +binding(undefined) -> undefined; binding(ConnPid) -> case node(ConnPid) =:= node() of true -> local; false -> remote end. +deliver_fun(ConnPid) when node(ConnPid) == node() -> + fun(Packet) -> ConnPid ! {deliver, Packet}, ok end; +deliver_fun(ConnPid) -> + Node = node(ConnPid), + fun(Packet) -> + emqx_rpc:cast(Node, erlang, send, [ConnPid, {deliver, Packet}]) + end. + handle_call(info, _From, State) -> reply(info(State), State); @@ -539,7 +553,7 @@ handle_cast({resume, #{conn_pid := ConnPid, true = link(ConnPid), State1 = State#state{conn_pid = ConnPid, - binding = binding(ConnPid), + deliver_fun = deliver_fun(ConnPid), old_conn_pid = OldConnPid, clean_start = false, retry_timer = undefined, @@ -566,25 +580,11 @@ handle_cast(Msg, State) -> emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), {noreply, State}. -%% Batch dispatch -handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> - noreply(lists:foldl( - fun(Msg, St) -> - element(2, handle_info({dispatch, Topic, Msg}, St)) - end, State, Msgs)); +handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) -> + handle_dispatch([{Topic, Msg}], State); -%% Dispatch message -handle_info({dispatch, Topic, Msg = #message{}}, State) -> - case emqx_shared_sub:is_ack_required(Msg) andalso not has_connection(State) of - true -> - %% Require ack, but we do not have connection - %% negative ack the message so it can try the next subscriber in the group - ok = emqx_shared_sub:nack_no_connection(Msg), - {noreply, State}; - false -> - NewState = handle_dispatch(Topic, Msg, State), - noreply(ensure_stats_timer(maybe_gc({1, msg_size(Msg)}, NewState))) - end; +handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> + handle_dispatch([{Topic, Msg} || Msg <- Msgs], State); %% Do nothing if the client has been disconnected. handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) -> @@ -684,18 +684,11 @@ maybe_shutdown(Pid, Reason) -> %% Internal functions %%------------------------------------------------------------------------------ -has_connection(#state{conn_pid = Pid}) -> +is_connection_alive(#state{conn_pid = Pid}) -> is_pid(Pid) andalso is_process_alive(Pid). -handle_dispatch(Topic, Msg, State = #state{subscriptions = SubMap}) -> - case maps:find(Topic, SubMap) of - {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> - run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State); - {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> - run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State); - error -> - dispatch(emqx_message:unset_flag(dup, Msg), State) - end. +%%------------------------------------------------------------------------------ +%% Suback and unsuback suback(_From, undefined, _ReasonCodes) -> ignore; @@ -722,7 +715,6 @@ kick(ClientId, OldConnPid, ConnPid) -> %%------------------------------------------------------------------------------ %% Replay or Retry Delivery -%%------------------------------------------------------------------------------ %% Redeliver at once if force is true retry_delivery(Force, State = #state{inflight = Inflight}) -> @@ -766,6 +758,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, %%------------------------------------------------------------------------------ %% Send Will Message %%------------------------------------------------------------------------------ + send_willmsg(undefined) -> ignore; send_willmsg(WillMsg) -> @@ -801,64 +794,156 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now, is_awaiting_full(#state{max_awaiting_rel = 0}) -> false; -is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen}) -> +is_awaiting_full(#state{awaiting_rel = AwaitingRel, + max_awaiting_rel = MaxLen}) -> maps:size(AwaitingRel) >= MaxLen. %%------------------------------------------------------------------------------ -%% Dispatch Messages +%% Dispatch messages %%------------------------------------------------------------------------------ -run_dispatch_steps([], Msg, State) -> - dispatch(Msg, State); -run_dispatch_steps([{nl, 1}|_Steps], #message{from = ClientId}, State = #state{client_id = ClientId}) -> - State; -run_dispatch_steps([{nl, _}|Steps], Msg, State) -> - run_dispatch_steps(Steps, Msg, State); -run_dispatch_steps([{qos, SubQoS}|Steps], Msg0 = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) -> - %% Ack immediately if a shared dispatch QoS is downgraded to 0 - Msg = case SubQoS =:= ?QOS_0 of - true -> emqx_shared_sub:maybe_ack(Msg0); - false -> Msg0 - end, - run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State); -run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) -> - run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State); -run_dispatch_steps([{rap, _Rap}|Steps], Msg = #message{flags = Flags, headers = #{retained := true}}, State = #state{}) -> - run_dispatch_steps(Steps, Msg#message{flags = maps:put(retain, true, Flags)}, State); -run_dispatch_steps([{rap, 0}|Steps], Msg = #message{flags = Flags}, State = #state{}) -> - run_dispatch_steps(Steps, Msg#message{flags = maps:put(retain, false, Flags)}, State); -run_dispatch_steps([{rap, _}|Steps], Msg, State) -> - run_dispatch_steps(Steps, Msg, State); -run_dispatch_steps([{subid, SubId}|Steps], Msg, State) -> - run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State). +handle_dispatch(Msgs, State = #state{inflight = Inflight, subscriptions = SubMap}) -> + %% Drain the mailbox and batch deliver + Msgs1 = drain_m(batch_n(Inflight), Msgs), + %% Ack the messages for shared subscription + Msgs2 = maybe_ack_shared(Msgs1, State), + %% Process suboptions + Msgs3 = lists:foldr( + fun({Topic, Msg}, Acc) -> + SubOpts = find_subopts(Topic, SubMap), + case process_subopts(SubOpts, Msg, State) of + {ok, Msg1} -> [Msg1|Acc]; + ignore -> Acc + end + end, [], Msgs2), + NState = batch_process(Msgs3, State), + noreply(ensure_stats_timer(NState)). + +batch_n(Inflight) -> + case emqx_inflight:max_size(Inflight) of + 0 -> ?DEFAULT_BATCH_N; + Sz -> Sz - emqx_inflight:size(Inflight) + end. + +drain_m(Cnt, Msgs) when Cnt =< 0 -> + lists:reverse(Msgs); +drain_m(Cnt, Msgs) -> + receive + {dispatch, Topic, Msg} -> + drain_m(Cnt-1, [{Topic, Msg}|Msgs]) + after 0 -> + lists:reverse(Msgs) + end. + +%% Ack or nack the messages of shared subscription? +maybe_ack_shared(Msgs, State) when is_list(Msgs) -> + lists:foldr( + fun({Topic, Msg}, Acc) -> + case maybe_ack_shared(Msg, State) of + ok -> Acc; + Msg1 -> [{Topic, Msg1}|Acc] + end + end, [], Msgs); + +maybe_ack_shared(Msg, State) -> + case emqx_shared_sub:is_ack_required(Msg) of + true -> do_ack_shared(Msg, State); + false -> Msg + end. + +do_ack_shared(Msg, State = #state{inflight = Inflight}) -> + case {is_connection_alive(State), + emqx_inflight:is_full(Inflight)} of + {false, _} -> + %% Require ack, but we do not have connection + %% negative ack the message so it can try the next subscriber in the group + emqx_shared_sub:nack_no_connection(Msg); + {_, true} -> + emqx_shared_sub:maybe_nack_dropped(Msg); + _ -> + %% Ack QoS1/QoS2 messages when message is delivered to connection. + %% NOTE: NOT to wait for PUBACK because: + %% The sender is monitoring this session process, + %% if the message is delivered to client but connection or session crashes, + %% sender will try to dispatch the message to the next shared subscriber. + %% This violates spec as QoS2 messages are not allowed to be sent to more + %% than one member in the group. + emqx_shared_sub:maybe_ack(Msg) + end. + +process_subopts([], Msg, _State) -> + {ok, Msg}; +process_subopts([{nl, 1}|_Opts], #message{from = ClientId}, #state{client_id = ClientId}) -> + ignore; +process_subopts([{nl, _}|Opts], Msg, State) -> + process_subopts(Opts, Msg, State); +process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) -> + process_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, State); +process_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) -> + process_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, State); +process_subopts([{rap, _Rap}|Opts], Msg = #message{flags = Flags, headers = #{retained := true}}, State = #state{}) -> + process_subopts(Opts, Msg#message{flags = maps:put(retain, true, Flags)}, State); +process_subopts([{rap, 0}|Opts], Msg = #message{flags = Flags}, State = #state{}) -> + process_subopts(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, State); +process_subopts([{rap, _}|Opts], Msg, State) -> + process_subopts(Opts, Msg, State); +process_subopts([{subid, SubId}|Opts], Msg, State) -> + process_subopts(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State). + +find_subopts(Topic, SubMap) -> + case maps:find(Topic, SubMap) of + {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> + [{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}]; + {ok, #{nl := Nl, qos := QoS, rap := Rap}} -> + [{nl, Nl}, {qos, QoS}, {rap, Rap}]; + error -> [] + end. + +batch_process(Msgs, State) -> + {ok, Publishes, NState} = process_msgs(Msgs, [], State), + ok = batch_deliver(Publishes, NState), + maybe_gc(msg_cnt(Msgs), NState). + +process_msgs([], Publishes, State) -> + {ok, lists:reverse(Publishes), State}; + +process_msgs([Msg|Msgs], Publishes, State) -> + case process_msg(Msg, State) of + {ok, Publish, NState} -> + process_msgs(Msgs, [Publish|Publishes], NState); + {ignore, NState} -> + process_msgs(Msgs, Publishes, NState) + end. %% Enqueue message if the client has been disconnected -dispatch(Msg, State = #state{client_id = ClientId, username = Username, conn_pid = undefined}) -> - case emqx_hooks:run('message.dropped', [#{client_id => ClientId, username => Username}, Msg]) of - ok -> enqueue_msg(Msg, State); - stop -> State - end; +process_msg(Msg, State = #state{conn_pid = undefined}) -> + {ignore, enqueue_msg(Msg, State)}; -%% Deliver qos0 message directly to client -dispatch(Msg = #message{qos = ?QOS_0} = Msg, State) -> - ok = deliver(undefined, Msg, State), - State; +%% Prepare the qos0 message delivery +process_msg(Msg = #message{qos = ?QOS_0}, State) -> + {ok, {publish, undefined, Msg}, State}; -dispatch(Msg = #message{qos = QoS} = Msg, - State = #state{next_pkt_id = PacketId, inflight = Inflight}) +process_msg(Msg = #message{qos = QoS}, + State = #state{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> - enqueue_msg(Msg, State); + {ignore, enqueue_msg(Msg, State)}; false -> - ok = deliver(PacketId, Msg, State), - await(PacketId, Msg, next_pkt_id(State)) + Publish = {publish, PacketId, Msg}, + NState = await(PacketId, Msg, State), + {ok, Publish, next_pkt_id(NState)} end. -enqueue_msg(Msg, State = #state{mqueue = Q}) -> +enqueue_msg(Msg, State = #state{mqueue = Q, client_id = ClientId, username = Username}) -> emqx_pd:update_counter(enqueue_stats, 1), {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), - Dropped =/= undefined andalso emqx_shared_sub:maybe_nack_dropped(Dropped), + if + Dropped =/= undefined -> + SessProps = #{client_id => ClientId, username => Username}, + emqx_hooks:run('message.dropped', [SessProps, Msg]); + true -> ok + end, State#state{mqueue = NewQ}. %%------------------------------------------------------------------------------ @@ -866,28 +951,22 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) -> %%------------------------------------------------------------------------------ redeliver({PacketId, Msg = #message{qos = QoS}}, State) -> - deliver(PacketId, if QoS =:= ?QOS_2 -> Msg; - true -> emqx_message:set_flag(dup, Msg) - end, State); + Msg1 = if + QoS =:= ?QOS_2 -> Msg; + true -> emqx_message:set_flag(dup, Msg) + end, + do_deliver(PacketId, Msg1, State); -redeliver({pubrel, PacketId}, #state{conn_pid = ConnPid}) -> - ConnPid ! {deliver, {pubrel, PacketId}}. +redeliver({pubrel, PacketId}, #state{deliver_fun = DeliverFun}) -> + DeliverFun({pubrel, PacketId}). -deliver(PacketId, Msg, State) -> +do_deliver(PacketId, Msg, #state{deliver_fun = DeliverFun}) -> emqx_pd:update_counter(deliver_stats, 1), - %% Ack QoS1/QoS2 messages when message is delivered to connection. - %% NOTE: NOT to wait for PUBACK because: - %% The sender is monitoring this session process, - %% if the message is delivered to client but connection or session crashes, - %% sender will try to dispatch the message to the next shared subscriber. - %% This violates spec as QoS2 messages are not allowed to be sent to more - %% than one member in the group. - do_deliver(PacketId, emqx_shared_sub:maybe_ack(Msg), State). + DeliverFun({publish, PacketId, Msg}). -do_deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = local}) -> - ConnPid ! {deliver, {publish, PacketId, Msg}}, ok; -do_deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = remote}) -> - emqx_rpc:cast(node(ConnPid), erlang, send, [ConnPid, {deliver, {publish, PacketId, Msg}}]). +batch_deliver(Publishes, #state{deliver_fun = DeliverFun}) -> + emqx_pd:update_counter(deliver_stats, length(Publishes)), + DeliverFun(Publishes). %%------------------------------------------------------------------------------ %% Awaiting ACK for QoS1/QoS2 Messages @@ -932,24 +1011,31 @@ acked(pubcomp, PacketId, State = #state{inflight = Inflight}) -> dequeue(State = #state{conn_pid = undefined}) -> State; -dequeue(State = #state{inflight = Inflight}) -> - case emqx_inflight:is_full(Inflight) of - true -> State; - false -> dequeue2(State) +dequeue(State = #state{inflight = Inflight, mqueue = Q}) -> + case emqx_mqueue:is_empty(Q) + orelse emqx_inflight:is_full(Inflight) of + true -> State; + false -> + {Msgs, Q1} = drain_q(batch_n(Inflight), [], Q), + batch_process(lists:reverse(Msgs), State#state{mqueue = Q1}) end. -dequeue2(State = #state{mqueue = Q}) -> +drain_q(Cnt, Msgs, Q) when Cnt =< 0 -> + {Msgs, Q}; + +drain_q(Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of - {empty, _Q} -> State; + {empty, _Q} -> {Msgs, Q}; {{value, Msg}, Q1} -> - %% Dequeue more - dequeue(dispatch(Msg, State#state{mqueue = Q1})) + io:format("Drain Msg: ~p~n", [Msg]), + drain_q(Cnt-1, [Msg|Msgs], Q1) end. %%------------------------------------------------------------------------------ %% Ensure timers -ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) -> +ensure_await_rel_timer(State = #state{await_rel_timer = undefined, + await_rel_timeout = Timeout}) -> ensure_await_rel_timer(Timeout, State); ensure_await_rel_timer(State) -> State. @@ -959,7 +1045,8 @@ ensure_await_rel_timer(Timeout, State = #state{await_rel_timer = undefined}) -> ensure_await_rel_timer(_Timeout, State) -> State. -ensure_retry_timer(State = #state{retry_timer = undefined, retry_interval = Interval}) -> +ensure_retry_timer(State = #state{retry_timer = undefined, + retry_interval = Interval}) -> ensure_retry_timer(Interval, State); ensure_retry_timer(State) -> State. @@ -969,7 +1056,8 @@ ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) -> ensure_retry_timer(_Timeout, State) -> State. -ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 andalso Interval =/= 16#ffffffff -> +ensure_expire_timer(State = #state{expiry_interval = Interval}) + when Interval > 0 andalso Interval =/= 16#ffffffff -> State#state{expiry_timer = emqx_misc:start_timer(Interval * 1000, expired)}; ensure_expire_timer(State) -> State. @@ -996,15 +1084,20 @@ next_pkt_id(State = #state{next_pkt_id = 16#FFFF}) -> next_pkt_id(State = #state{next_pkt_id = Id}) -> State#state{next_pkt_id = Id + 1}. +%%------------------------------------------------------------------------------ +%% Maybe GC + +msg_cnt(Msgs) -> + lists:foldl(fun(Msg, {Cnt, Oct}) -> + {Cnt+1, Oct+msg_size(Msg)} + end, {0, 0}, Msgs). + %% Take only the payload size into account, add other fields if necessary msg_size(#message{payload = Payload}) -> payload_size(Payload). %% Payload should be binary(), but not 100% sure. Need dialyzer! payload_size(Payload) -> erlang:iolist_size(Payload). -%%------------------------------------------------------------------------------ -%% Maybe GC - maybe_gc(_, State = #state{gc_state = undefined}) -> State; maybe_gc({Cnt, Oct}, State = #state{gc_state = GCSt}) -> diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 54b2e8579..37ce34be9 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -1,4 +1,3 @@ - %% Copyright (c) 2013-2019 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); @@ -45,7 +44,7 @@ ignore_loop(_Config) -> application:set_env(emqx, mqtt_ignore_loop_deliver, false). t_session_all(_) -> - emqx_zone:set_env(internal, idle_timeout, 100), + emqx_zone:set_env(internal, idle_timeout, 1000), ClientId = <<"ClientId">>, {ok, ConnPid} = emqx_mock_client:start_link(ClientId), {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), @@ -56,7 +55,7 @@ t_session_all(_) -> [{<<"topic">>, _}] = emqx:subscriptions(SPid), emqx_session:publish(SPid, 1, Message1), timer:sleep(200), - {publish, 1, _} = emqx_mock_client:get_last_message(ConnPid), + [{publish, 1, _}] = emqx_mock_client:get_last_message(ConnPid), Attrs = emqx_session:attrs(SPid), Info = emqx_session:info(SPid), Stats = emqx_session:stats(SPid), diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 1ee059812..1fd7bef9b 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -59,7 +59,7 @@ t_random_basic(_) -> PacketId = 1, emqx_session:publish(SPid, PacketId, Message1), ?wait(case emqx_mock_client:get_last_message(ConnPid) of - {publish, 1, _} -> true; + [{publish, 1, _}] -> true; Other -> Other end, 1000), emqx_session:pubrec(SPid, PacketId, reasoncode), @@ -105,7 +105,7 @@ t_no_connection_nack(_) -> fun(PacketId, ConnPid) -> Payload = MkPayload(PacketId), case emqx_mock_client:get_last_message(ConnPid) of - {publish, _, #message{payload = Payload}} -> + [{publish, _, #message{payload = Payload}}] -> CasePid ! {Ref, PacketId, ConnPid}, true; _Other -> @@ -176,7 +176,7 @@ t_not_so_sticky(_) -> ?wait(subscribed(<<"group1">>, <<"foo/bar">>, SPid1), 1000), emqx_session:publish(SPid1, 1, Message1), ?wait(case emqx_mock_client:get_last_message(ConnPid1) of - {publish, _, #message{payload = <<"hello1">>}} -> true; + [{publish, _, #message{payload = <<"hello1">>}}] -> true; Other -> Other end, 1000), emqx_mock_client:close_session(ConnPid1), @@ -185,7 +185,7 @@ t_not_so_sticky(_) -> ?wait(subscribed(<<"group1">>, <<"foo/#">>, SPid2), 1000), emqx_session:publish(SPid2, 2, Message2), ?wait(case emqx_mock_client:get_last_message(ConnPid2) of - {publish, _, #message{payload = <<"hello2">>}} -> true; + [{publish, _, #message{payload = <<"hello2">>}}] -> true; Other -> Other end, 1000), emqx_mock_client:close_session(ConnPid2), @@ -240,7 +240,7 @@ test_two_messages(Strategy, WithAck) -> last_message(_ExpectedPayload, []) -> <<"not yet?">>; last_message(ExpectedPayload, [Pid | Pids]) -> case emqx_mock_client:get_last_message(Pid) of - {publish, _, #message{payload = ExpectedPayload}} -> {true, Pid}; + [{publish, _, #message{payload = ExpectedPayload}}] -> {true, Pid}; _Other -> last_message(ExpectedPayload, Pids) end.