Compare commits

...

3 Commits

Author SHA1 Message Date
Georgy Sychev 1657bde1ed fix: typo 2022-05-03 13:15:30 +04:00
Georgy Sychev a661a26218 fix(shared): nack fix 2022-05-02 20:27:07 +04:00
Georgy Sychev 4d19420100 feat(shared): redispatch to another shared sub, when no ACK received 2022-04-28 17:01:08 +04:00
3 changed files with 144 additions and 96 deletions

View File

@ -561,10 +561,12 @@ deliver_msg(
end, end,
{ok, Session1}; {ok, Session1};
false -> false ->
%% Note that we publish message without shared ack header
%% But add to inflight with ack headers
Publish = {PacketId, maybe_ack(Msg)}, Publish = {PacketId, maybe_ack(Msg)},
Msg2 = mark_begin_deliver(Msg), Msg1 = with_ts(mark_begin_deliver(Msg)),
Session1 = await(PacketId, Msg2, Session), Inflight1 = emqx_inflight:insert(PacketId, Msg1, Inflight),
{ok, [Publish], next_pkt_id(Session1)} {ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})}
end. end.
-spec enqueue( -spec enqueue(
@ -625,14 +627,10 @@ enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs})
enrich_subopts(get_subopts(Topic, Subs), Msg, Session). enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
maybe_ack(Msg) -> maybe_ack(Msg) ->
case emqx_shared_sub:is_ack_required(Msg) of emqx_shared_sub:maybe_ack(Msg).
true -> emqx_shared_sub:maybe_ack(Msg);
false -> Msg
end.
maybe_nack(Msg) -> maybe_nack(Msg) ->
emqx_shared_sub:is_ack_required(Msg) andalso emqx_shared_sub:maybe_nack_dropped(Msg).
(ok == emqx_shared_sub:maybe_nack_dropped(Msg)).
get_subopts(Topic, SubMap) -> get_subopts(Topic, SubMap) ->
case maps:find(Topic, SubMap) of case maps:find(Topic, SubMap) of
@ -673,14 +671,6 @@ enrich_subopts([{subid, SubId} | Opts], Msg, Session) ->
Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg), Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg),
enrich_subopts(Opts, Msg1, Session). enrich_subopts(Opts, Msg1, Session).
%%--------------------------------------------------------------------
%% Awaiting ACK for QoS1/QoS2 Messages
%%--------------------------------------------------------------------
await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight),
Session#session{inflight = Inflight1}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Retry Delivery %% Retry Delivery
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -808,13 +798,38 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
end. end.
-spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok. -spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok.
terminate(ClientInfo, discarded, Session) ->
run_hook('session.discarded', [ClientInfo, info(Session)]);
terminate(ClientInfo, takenover, Session) ->
run_hook('session.takenover', [ClientInfo, info(Session)]);
terminate(ClientInfo, Reason, Session) -> terminate(ClientInfo, Reason, Session) ->
run_terminate_hooks(ClientInfo, Reason, Session),
cleanup_self_from_shared_subs(),
redispatch_shared_messages(Session),
ok.
run_terminate_hooks(ClientInfo, discarded, Session) ->
run_hook('session.discarded', [ClientInfo, info(Session)]);
run_terminate_hooks(ClientInfo, takeovered, Session) ->
run_hook('session.takeovered', [ClientInfo, info(Session)]);
run_terminate_hooks(ClientInfo, Reason, Session) ->
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]). run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
redispatch_shared_messages(#session{inflight = Inflight}) ->
InflightList = emqx_inflight:to_list(Inflight),
lists:map(
fun({_, {#message{topic = Topic} = Msg, _}}) ->
case emqx_shared_sub:get_group(Msg) of
{ok, Group} ->
Delivery = #delivery{sender = self(), message = Msg},
emqx_shared_sub:dispatch(Group, Topic, Delivery);
_ ->
false
end
end,
InflightList
).
cleanup_self_from_shared_subs() ->
emqx_shared_sub:cleanup(self()).
-compile({inline, [run_hook/2]}).
run_hook(Name, Args) -> run_hook(Name, Args) ->
ok = emqx_metrics:inc(Name), ok = emqx_metrics:inc(Name),
emqx_hooks:run(Name, Args). emqx_hooks:run(Name, Args).

View File

@ -30,24 +30,18 @@
%% APIs %% APIs
-export([start_link/0]). -export([start_link/0]).
-export([subscribe/3, unsubscribe/3]).
-export([
subscribe/3,
unsubscribe/3
]).
-export([dispatch/3]). -export([dispatch/3]).
-export([ -export([
maybe_ack/1, maybe_ack/1,
maybe_nack_dropped/1, maybe_nack_dropped/1,
nack_no_connection/1, nack_no_connection/1,
is_ack_required/1 is_ack_required/1,
get_group/1,
cleanup/1
]). ]).
%% for testing %% for testing
-export([subscribers/2]). -export([subscribers/2]).
%% gen_server callbacks %% gen_server callbacks
-export([ -export([
init/1, init/1,
@ -60,27 +54,21 @@
-export_type([strategy/0]). -export_type([strategy/0]).
-type strategy() :: -type strategy() :: random | round_robin | sticky | hash | hash_clientid | hash_topic.
random
| round_robin %% same as hash_clientid, backward compatible
| sticky
%% same as hash_clientid, backward compatible
| hash
| hash_clientid
| hash_topic.
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(TAB, emqx_shared_subscription). -define(TAB, emqx_shared_subscription).
-define(SHARED_SUBS, emqx_shared_subscriber). -define(SHARED_SUBS, emqx_shared_subscriber).
-define(ALIVE_SUBS, emqx_alive_shared_subscribers). -define(ALIVE_SUBS, emqx_alive_shared_subscribers).
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5). -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
-define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())). -define(IS_LOCAL_PID(Pid), is_pid(Pid) andalso node(Pid) =:= node()).
-define(ACK, shared_sub_ack). -define(ACK, shared_sub_ack).
-define(NACK(Reason), {shared_sub_nack, Reason}). -define(NACK(Reason), {shared_sub_nack, Reason}).
-define(NO_ACK, no_ack). -define(NO_ACK, no_ack).
-record(state, {pmon}). -record(state, {pmon}).
-record(emqx_shared_subscription, {group, topic, subpid}). -record(emqx_shared_subscription, {group, topic, subpid}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -88,13 +76,17 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = mria:create_table(?TAB, [ ok =
mria:create_table(
?TAB,
[
{type, bag}, {type, bag},
{rlog_shard, ?SHARED_SUB_SHARD}, {rlog_shard, ?SHARED_SUB_SHARD},
{storage, ram_copies}, {storage, ram_copies},
{record_name, emqx_shared_subscription}, {record_name, emqx_shared_subscription},
{attributes, record_info(fields, emqx_shared_subscription)} {attributes, record_info(fields, emqx_shared_subscription)}
]). ]
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
@ -112,8 +104,16 @@ subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}). gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
-spec cleanup(pid()) -> ok.
cleanup(SubPid) ->
gen_server:call(?SERVER, {cleanup, SubPid}).
record(Group, Topic, SubPid) -> record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. #emqx_shared_subscription{
group = Group,
topic = Topic,
subpid = SubPid
}.
-spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) -> -spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) ->
emqx_types:deliver_result(). emqx_types:deliver_result().
@ -126,7 +126,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
false -> false ->
{error, no_subscribers}; {error, no_subscribers};
{Type, SubPid} -> {Type, SubPid} ->
case do_dispatch(SubPid, Topic, Msg, Type) of case do_dispatch(SubPid, Group, Topic, Msg, Type) of
ok -> ok ->
{ok, 1}; {ok, 1};
{error, _Reason} -> {error, _Reason} ->
@ -143,40 +143,39 @@ strategy() ->
ack_enabled() -> ack_enabled() ->
emqx:get_config([broker, shared_dispatch_ack_enabled]). emqx:get_config([broker, shared_dispatch_ack_enabled]).
do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() -> do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
%% Deadlock otherwise %% Deadlock otherwise
_ = erlang:send(SubPid, {deliver, Topic, Msg}), SubPid ! {deliver, Topic, Msg},
ok; ok;
do_dispatch(SubPid, Topic, Msg, Type) ->
dispatch_per_qos(SubPid, Topic, Msg, Type).
%% return either 'ok' (when everything is fine) or 'error' %% return either 'ok' (when everything is fine) or 'error'
dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) -> do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
%% For QoS 0 message, send it as regular dispatch %% For QoS 0 message, send it as regular dispatch
_ = erlang:send(SubPid, {deliver, Topic, Msg}), SubPid ! {deliver, Topic, Msg},
ok; ok;
dispatch_per_qos(SubPid, Topic, Msg, retry) -> do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
%% Retry implies all subscribers nack:ed, send again without ack %% Retry implies all subscribers nack:ed, send again without ack
_ = erlang:send(SubPid, {deliver, Topic, Msg}), SubPid ! {deliver, Topic, Msg},
ok; ok;
dispatch_per_qos(SubPid, Topic, Msg, fresh) -> do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
case ack_enabled() of case ack_enabled() of
true -> true ->
dispatch_with_ack(SubPid, Topic, Msg); dispatch_with_ack(SubPid, Group, Topic, Msg);
false -> false ->
_ = erlang:send(SubPid, {deliver, Topic, Msg}), SubPid ! {deliver, Topic, Msg},
ok ok
end. end.
dispatch_with_ack(SubPid, Topic, Msg) -> dispatch_with_ack(SubPid, Group, Topic, Msg) ->
%% For QoS 1/2 message, expect an ack %% For QoS 1/2 message, expect an ack
Ref = erlang:monitor(process, SubPid), Ref = erlang:monitor(process, SubPid),
Sender = self(), Sender = self(),
_ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}), SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)},
Timeout = Timeout =
case Msg#message.qos of case Msg#message.qos of
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS); ?QOS_1 ->
?QOS_2 -> infinity timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
?QOS_2 ->
infinity
end, end,
try try
receive receive
@ -194,24 +193,37 @@ dispatch_with_ack(SubPid, Topic, Msg) ->
ok = emqx_pmon:demonitor(Ref) ok = emqx_pmon:demonitor(Ref)
end. end.
with_ack_ref(Msg, SenderRef) -> with_group_ack(Msg, Group, Sender, Ref) ->
emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg). emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg).
without_ack_ref(Msg) -> -spec without_group_ack(emqx_types:message()) -> emqx_types:message().
without_group_ack(Msg) ->
emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg). emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg).
get_ack_ref(Msg) -> get_group_ack(Msg) ->
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK). emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
-spec get_group(emqx_types:message()) -> {ok, any()} | error.
get_group(Msg) ->
case get_group_ack(Msg) of
?NO_ACK ->
error;
{Group, _Sender, _Ref} ->
{ok, Group}
end.
-spec is_ack_required(emqx_types:message()) -> boolean(). -spec is_ack_required(emqx_types:message()) -> boolean().
is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg). is_ack_required(Msg) ->
?NO_ACK =/= get_group_ack(Msg).
%% @doc Negative ack dropped message due to inflight window or message queue being full. %% @doc Negative ack dropped message due to inflight window or message queue being full.
-spec maybe_nack_dropped(emqx_types:message()) -> ok. -spec maybe_nack_dropped(emqx_types:message()) -> boolean().
maybe_nack_dropped(Msg) -> maybe_nack_dropped(Msg) ->
case get_ack_ref(Msg) of case get_group_ack(Msg) of
?NO_ACK -> ok; ?NO_ACK ->
{Sender, Ref} -> nack(Sender, Ref, dropped) false;
{_Group, Sender, Ref} ->
ok == nack(Sender, Ref, dropped)
end. end.
%% @doc Negative ack message due to connection down. %% @doc Negative ack message due to connection down.
@ -219,22 +231,23 @@ maybe_nack_dropped(Msg) ->
%% i.e is_ack_required returned true. %% i.e is_ack_required returned true.
-spec nack_no_connection(emqx_types:message()) -> ok. -spec nack_no_connection(emqx_types:message()) -> ok.
nack_no_connection(Msg) -> nack_no_connection(Msg) ->
{Sender, Ref} = get_ack_ref(Msg), {_Group, Sender, Ref} = get_group_ack(Msg),
nack(Sender, Ref, no_connection). nack(Sender, Ref, no_connection).
-spec nack(pid(), reference(), dropped | no_connection) -> ok. -spec nack(pid(), reference(), dropped | no_connection) -> ok.
nack(Sender, Ref, Reason) -> nack(Sender, Ref, Reason) ->
erlang:send(Sender, {Ref, ?NACK(Reason)}), Sender ! {Ref, ?NACK(Reason)},
ok. ok.
-spec maybe_ack(emqx_types:message()) -> emqx_types:message(). -spec maybe_ack(emqx_types:message()) -> emqx_types:message().
maybe_ack(Msg) -> maybe_ack(Msg) ->
case get_ack_ref(Msg) of case get_group_ack(Msg) of
?NO_ACK -> ?NO_ACK ->
Msg; Msg;
{Sender, Ref} -> {_Group, Sender, Ref} ->
erlang:send(Sender, {Ref, ?ACK}), Sender ! {Ref, ?ACK},
without_ack_ref(Msg) %% without_group_ack(Msg)
without_group_ack(Msg)
end. end.
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) -> pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
@ -286,8 +299,10 @@ do_pick_subscriber(_Group, _Topic, hash_topic, _ClientId, SourceTopic, Count) ->
do_pick_subscriber(Group, Topic, round_robin, _ClientId, _SourceTopic, Count) -> do_pick_subscriber(Group, Topic, round_robin, _ClientId, _SourceTopic, Count) ->
Rem = Rem =
case erlang:get({shared_sub_round_robin, Group, Topic}) of case erlang:get({shared_sub_round_robin, Group, Topic}) of
undefined -> rand:uniform(Count) - 1; undefined ->
N -> (N + 1) rem Count rand:uniform(Count) - 1;
N ->
(N + 1) rem Count
end, end,
_ = erlang:put({shared_sub_round_robin, Group, Topic}, Rem), _ = erlang:put({shared_sub_round_robin, Group, Topic}, Rem),
Rem + 1. Rem + 1.
@ -316,11 +331,16 @@ init_monitors() ->
?TAB ?TAB
). ).
handle_call({cleanup, SubPid}, _From, State) ->
cleanup_down(SubPid),
{reply, ok, State};
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
mria:dirty_write(?TAB, record(Group, Topic, SubPid)), mria:dirty_write(?TAB, record(Group, Topic, SubPid)),
case ets:member(?SHARED_SUBS, {Group, Topic}) of case ets:member(?SHARED_SUBS, {Group, Topic}) of
true -> ok; true ->
false -> ok = emqx_router:do_add_route(Topic, {Group, node()}) ok;
false ->
ok = emqx_router:do_add_route(Topic, {Group, node()})
end, end,
ok = maybe_insert_alive_tab(SubPid), ok = maybe_insert_alive_tab(SubPid),
true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}), true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}),
@ -348,11 +368,17 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P
% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) -> % handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
% #emqx_shared_subscription{subpid = SubPid} = OldRecord, % #emqx_shared_subscription{subpid = SubPid} = OldRecord,
% {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})}; % {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
handle_info({mnesia_table_event, _Event}, State) -> handle_info({mnesia_table_event, _Event}, State) ->
{noreply, State}; {noreply, State};
handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{pmon = PMon}) -> handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{pmon = PMon}) ->
?SLOG(info, #{msg => "shared_subscriber_down", sub_pid => SubPid, reason => Reason}), ?SLOG(
info,
#{
msg => "shared_subscriber_down",
sub_pid => SubPid,
reason => Reason
}
),
cleanup_down(SubPid), cleanup_down(SubPid),
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})}; {noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
handle_info(_Info, State) -> handle_info(_Info, State) ->
@ -369,7 +395,8 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% keep track of alive remote pids %% keep track of alive remote pids
maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok; maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) ->
ok;
maybe_insert_alive_tab(Pid) when is_pid(Pid) -> maybe_insert_alive_tab(Pid) when is_pid(Pid) ->
ets:insert(?ALIVE_SUBS, {Pid}), ets:insert(?ALIVE_SUBS, {Pid}),
ok. ok.
@ -405,6 +432,8 @@ is_alive_sub(Pid) ->
delete_route_if_needed({Group, Topic}) -> delete_route_if_needed({Group, Topic}) ->
case ets:member(?SHARED_SUBS, {Group, Topic}) of case ets:member(?SHARED_SUBS, {Group, Topic}) of
true -> ok; true ->
false -> ok = emqx_router:do_delete_route(Topic, {Group, node()}) ok;
false ->
ok = emqx_router:do_delete_route(Topic, {Group, node()})
end. end.

View File

@ -48,9 +48,9 @@ t_is_ack_required(_) ->
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})). ?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
t_maybe_nack_dropped(_) -> t_maybe_nack_dropped(_) ->
?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})), ?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)), ?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)),
?assertEqual( ?assertEqual(
ok, ok,
receive receive
@ -60,7 +60,7 @@ t_maybe_nack_dropped(_) ->
). ).
t_nack_no_connection(_) -> t_nack_no_connection(_) ->
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)), ?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
?assertEqual( ?assertEqual(
ok, ok,
@ -72,7 +72,7 @@ t_nack_no_connection(_) ->
t_maybe_ack(_) -> t_maybe_ack(_) ->
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})), ?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}}, Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
?assertEqual( ?assertEqual(
#message{headers = #{shared_dispatch_ack => ?no_ack}}, #message{headers = #{shared_dispatch_ack => ?no_ack}},
emqx_shared_sub:maybe_ack(Msg) emqx_shared_sub:maybe_ack(Msg)
@ -321,11 +321,15 @@ test_two_messages(Strategy, WithAck) ->
ok. ok.
last_message(ExpectedPayload, Pids) -> last_message(ExpectedPayload, Pids) ->
last_message(ExpectedPayload, Pids, 100).
last_message(ExpectedPayload, Pids, Timeout) ->
receive receive
{publish, #{client_pid := Pid, payload := ExpectedPayload}} -> {publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
ct:pal("~p ====== ~p", [Pids, Pid]), ct:pal("~p ====== ~p", [Pids, Pid]),
{true, Pid} {true, Pid}
after 100 -> after Timeout ->
ct:pal("not yet"),
<<"not yet?">> <<"not yet?">>
end. end.