Compare commits
3 Commits
master
...
shared_red
Author | SHA1 | Date |
---|---|---|
![]() |
1657bde1ed | |
![]() |
a661a26218 | |
![]() |
4d19420100 |
|
@ -561,10 +561,12 @@ deliver_msg(
|
||||||
end,
|
end,
|
||||||
{ok, Session1};
|
{ok, Session1};
|
||||||
false ->
|
false ->
|
||||||
|
%% Note that we publish message without shared ack header
|
||||||
|
%% But add to inflight with ack headers
|
||||||
Publish = {PacketId, maybe_ack(Msg)},
|
Publish = {PacketId, maybe_ack(Msg)},
|
||||||
Msg2 = mark_begin_deliver(Msg),
|
Msg1 = with_ts(mark_begin_deliver(Msg)),
|
||||||
Session1 = await(PacketId, Msg2, Session),
|
Inflight1 = emqx_inflight:insert(PacketId, Msg1, Inflight),
|
||||||
{ok, [Publish], next_pkt_id(Session1)}
|
{ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec enqueue(
|
-spec enqueue(
|
||||||
|
@ -625,14 +627,10 @@ enrich_deliver({deliver, Topic, Msg}, Session = #session{subscriptions = Subs})
|
||||||
enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
|
enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
|
||||||
|
|
||||||
maybe_ack(Msg) ->
|
maybe_ack(Msg) ->
|
||||||
case emqx_shared_sub:is_ack_required(Msg) of
|
emqx_shared_sub:maybe_ack(Msg).
|
||||||
true -> emqx_shared_sub:maybe_ack(Msg);
|
|
||||||
false -> Msg
|
|
||||||
end.
|
|
||||||
|
|
||||||
maybe_nack(Msg) ->
|
maybe_nack(Msg) ->
|
||||||
emqx_shared_sub:is_ack_required(Msg) andalso
|
emqx_shared_sub:maybe_nack_dropped(Msg).
|
||||||
(ok == emqx_shared_sub:maybe_nack_dropped(Msg)).
|
|
||||||
|
|
||||||
get_subopts(Topic, SubMap) ->
|
get_subopts(Topic, SubMap) ->
|
||||||
case maps:find(Topic, SubMap) of
|
case maps:find(Topic, SubMap) of
|
||||||
|
@ -673,14 +671,6 @@ enrich_subopts([{subid, SubId} | Opts], Msg, Session) ->
|
||||||
Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg),
|
Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg),
|
||||||
enrich_subopts(Opts, Msg1, Session).
|
enrich_subopts(Opts, Msg1, Session).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Awaiting ACK for QoS1/QoS2 Messages
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
|
|
||||||
Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight),
|
|
||||||
Session#session{inflight = Inflight1}.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Retry Delivery
|
%% Retry Delivery
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -808,13 +798,38 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok.
|
-spec terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok.
|
||||||
terminate(ClientInfo, discarded, Session) ->
|
|
||||||
run_hook('session.discarded', [ClientInfo, info(Session)]);
|
|
||||||
terminate(ClientInfo, takenover, Session) ->
|
|
||||||
run_hook('session.takenover', [ClientInfo, info(Session)]);
|
|
||||||
terminate(ClientInfo, Reason, Session) ->
|
terminate(ClientInfo, Reason, Session) ->
|
||||||
|
run_terminate_hooks(ClientInfo, Reason, Session),
|
||||||
|
cleanup_self_from_shared_subs(),
|
||||||
|
redispatch_shared_messages(Session),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
run_terminate_hooks(ClientInfo, discarded, Session) ->
|
||||||
|
run_hook('session.discarded', [ClientInfo, info(Session)]);
|
||||||
|
run_terminate_hooks(ClientInfo, takeovered, Session) ->
|
||||||
|
run_hook('session.takeovered', [ClientInfo, info(Session)]);
|
||||||
|
run_terminate_hooks(ClientInfo, Reason, Session) ->
|
||||||
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
||||||
|
|
||||||
|
redispatch_shared_messages(#session{inflight = Inflight}) ->
|
||||||
|
InflightList = emqx_inflight:to_list(Inflight),
|
||||||
|
lists:map(
|
||||||
|
fun({_, {#message{topic = Topic} = Msg, _}}) ->
|
||||||
|
case emqx_shared_sub:get_group(Msg) of
|
||||||
|
{ok, Group} ->
|
||||||
|
Delivery = #delivery{sender = self(), message = Msg},
|
||||||
|
emqx_shared_sub:dispatch(Group, Topic, Delivery);
|
||||||
|
_ ->
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
InflightList
|
||||||
|
).
|
||||||
|
|
||||||
|
cleanup_self_from_shared_subs() ->
|
||||||
|
emqx_shared_sub:cleanup(self()).
|
||||||
|
|
||||||
|
-compile({inline, [run_hook/2]}).
|
||||||
run_hook(Name, Args) ->
|
run_hook(Name, Args) ->
|
||||||
ok = emqx_metrics:inc(Name),
|
ok = emqx_metrics:inc(Name),
|
||||||
emqx_hooks:run(Name, Args).
|
emqx_hooks:run(Name, Args).
|
||||||
|
|
|
@ -30,24 +30,18 @@
|
||||||
|
|
||||||
%% APIs
|
%% APIs
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
-export([subscribe/3, unsubscribe/3]).
|
||||||
-export([
|
|
||||||
subscribe/3,
|
|
||||||
unsubscribe/3
|
|
||||||
]).
|
|
||||||
|
|
||||||
-export([dispatch/3]).
|
-export([dispatch/3]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
maybe_ack/1,
|
maybe_ack/1,
|
||||||
maybe_nack_dropped/1,
|
maybe_nack_dropped/1,
|
||||||
nack_no_connection/1,
|
nack_no_connection/1,
|
||||||
is_ack_required/1
|
is_ack_required/1,
|
||||||
|
get_group/1,
|
||||||
|
cleanup/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% for testing
|
%% for testing
|
||||||
-export([subscribers/2]).
|
-export([subscribers/2]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([
|
-export([
|
||||||
init/1,
|
init/1,
|
||||||
|
@ -60,27 +54,21 @@
|
||||||
|
|
||||||
-export_type([strategy/0]).
|
-export_type([strategy/0]).
|
||||||
|
|
||||||
-type strategy() ::
|
-type strategy() :: random | round_robin | sticky | hash | hash_clientid | hash_topic.
|
||||||
random
|
|
||||||
| round_robin
|
%% same as hash_clientid, backward compatible
|
||||||
| sticky
|
|
||||||
%% same as hash_clientid, backward compatible
|
|
||||||
| hash
|
|
||||||
| hash_clientid
|
|
||||||
| hash_topic.
|
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
-define(TAB, emqx_shared_subscription).
|
-define(TAB, emqx_shared_subscription).
|
||||||
-define(SHARED_SUBS, emqx_shared_subscriber).
|
-define(SHARED_SUBS, emqx_shared_subscriber).
|
||||||
-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
|
-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
|
||||||
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
|
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
|
||||||
-define(IS_LOCAL_PID(Pid), (is_pid(Pid) andalso node(Pid) =:= node())).
|
-define(IS_LOCAL_PID(Pid), is_pid(Pid) andalso node(Pid) =:= node()).
|
||||||
-define(ACK, shared_sub_ack).
|
-define(ACK, shared_sub_ack).
|
||||||
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
||||||
-define(NO_ACK, no_ack).
|
-define(NO_ACK, no_ack).
|
||||||
|
|
||||||
-record(state, {pmon}).
|
-record(state, {pmon}).
|
||||||
|
|
||||||
-record(emqx_shared_subscription, {group, topic, subpid}).
|
-record(emqx_shared_subscription, {group, topic, subpid}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -88,13 +76,17 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
mnesia(boot) ->
|
mnesia(boot) ->
|
||||||
ok = mria:create_table(?TAB, [
|
ok =
|
||||||
|
mria:create_table(
|
||||||
|
?TAB,
|
||||||
|
[
|
||||||
{type, bag},
|
{type, bag},
|
||||||
{rlog_shard, ?SHARED_SUB_SHARD},
|
{rlog_shard, ?SHARED_SUB_SHARD},
|
||||||
{storage, ram_copies},
|
{storage, ram_copies},
|
||||||
{record_name, emqx_shared_subscription},
|
{record_name, emqx_shared_subscription},
|
||||||
{attributes, record_info(fields, emqx_shared_subscription)}
|
{attributes, record_info(fields, emqx_shared_subscription)}
|
||||||
]).
|
]
|
||||||
|
).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
|
@ -112,8 +104,16 @@ subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
||||||
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
||||||
gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
|
gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
|
||||||
|
|
||||||
|
-spec cleanup(pid()) -> ok.
|
||||||
|
cleanup(SubPid) ->
|
||||||
|
gen_server:call(?SERVER, {cleanup, SubPid}).
|
||||||
|
|
||||||
record(Group, Topic, SubPid) ->
|
record(Group, Topic, SubPid) ->
|
||||||
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
|
#emqx_shared_subscription{
|
||||||
|
group = Group,
|
||||||
|
topic = Topic,
|
||||||
|
subpid = SubPid
|
||||||
|
}.
|
||||||
|
|
||||||
-spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) ->
|
-spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) ->
|
||||||
emqx_types:deliver_result().
|
emqx_types:deliver_result().
|
||||||
|
@ -126,7 +126,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
||||||
false ->
|
false ->
|
||||||
{error, no_subscribers};
|
{error, no_subscribers};
|
||||||
{Type, SubPid} ->
|
{Type, SubPid} ->
|
||||||
case do_dispatch(SubPid, Topic, Msg, Type) of
|
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
|
||||||
ok ->
|
ok ->
|
||||||
{ok, 1};
|
{ok, 1};
|
||||||
{error, _Reason} ->
|
{error, _Reason} ->
|
||||||
|
@ -143,40 +143,39 @@ strategy() ->
|
||||||
ack_enabled() ->
|
ack_enabled() ->
|
||||||
emqx:get_config([broker, shared_dispatch_ack_enabled]).
|
emqx:get_config([broker, shared_dispatch_ack_enabled]).
|
||||||
|
|
||||||
do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
|
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||||
%% Deadlock otherwise
|
%% Deadlock otherwise
|
||||||
_ = erlang:send(SubPid, {deliver, Topic, Msg}),
|
SubPid ! {deliver, Topic, Msg},
|
||||||
ok;
|
ok;
|
||||||
do_dispatch(SubPid, Topic, Msg, Type) ->
|
|
||||||
dispatch_per_qos(SubPid, Topic, Msg, Type).
|
|
||||||
|
|
||||||
%% return either 'ok' (when everything is fine) or 'error'
|
%% return either 'ok' (when everything is fine) or 'error'
|
||||||
dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
||||||
%% For QoS 0 message, send it as regular dispatch
|
%% For QoS 0 message, send it as regular dispatch
|
||||||
_ = erlang:send(SubPid, {deliver, Topic, Msg}),
|
SubPid ! {deliver, Topic, Msg},
|
||||||
ok;
|
ok;
|
||||||
dispatch_per_qos(SubPid, Topic, Msg, retry) ->
|
do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
|
||||||
%% Retry implies all subscribers nack:ed, send again without ack
|
%% Retry implies all subscribers nack:ed, send again without ack
|
||||||
_ = erlang:send(SubPid, {deliver, Topic, Msg}),
|
SubPid ! {deliver, Topic, Msg},
|
||||||
ok;
|
ok;
|
||||||
dispatch_per_qos(SubPid, Topic, Msg, fresh) ->
|
do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
|
||||||
case ack_enabled() of
|
case ack_enabled() of
|
||||||
true ->
|
true ->
|
||||||
dispatch_with_ack(SubPid, Topic, Msg);
|
dispatch_with_ack(SubPid, Group, Topic, Msg);
|
||||||
false ->
|
false ->
|
||||||
_ = erlang:send(SubPid, {deliver, Topic, Msg}),
|
SubPid ! {deliver, Topic, Msg},
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
dispatch_with_ack(SubPid, Topic, Msg) ->
|
dispatch_with_ack(SubPid, Group, Topic, Msg) ->
|
||||||
%% For QoS 1/2 message, expect an ack
|
%% For QoS 1/2 message, expect an ack
|
||||||
Ref = erlang:monitor(process, SubPid),
|
Ref = erlang:monitor(process, SubPid),
|
||||||
Sender = self(),
|
Sender = self(),
|
||||||
_ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}),
|
SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)},
|
||||||
Timeout =
|
Timeout =
|
||||||
case Msg#message.qos of
|
case Msg#message.qos of
|
||||||
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
|
?QOS_1 ->
|
||||||
?QOS_2 -> infinity
|
timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
|
||||||
|
?QOS_2 ->
|
||||||
|
infinity
|
||||||
end,
|
end,
|
||||||
try
|
try
|
||||||
receive
|
receive
|
||||||
|
@ -194,24 +193,37 @@ dispatch_with_ack(SubPid, Topic, Msg) ->
|
||||||
ok = emqx_pmon:demonitor(Ref)
|
ok = emqx_pmon:demonitor(Ref)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
with_ack_ref(Msg, SenderRef) ->
|
with_group_ack(Msg, Group, Sender, Ref) ->
|
||||||
emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg).
|
emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg).
|
||||||
|
|
||||||
without_ack_ref(Msg) ->
|
-spec without_group_ack(emqx_types:message()) -> emqx_types:message().
|
||||||
|
without_group_ack(Msg) ->
|
||||||
emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg).
|
emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg).
|
||||||
|
|
||||||
get_ack_ref(Msg) ->
|
get_group_ack(Msg) ->
|
||||||
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
||||||
|
|
||||||
|
-spec get_group(emqx_types:message()) -> {ok, any()} | error.
|
||||||
|
get_group(Msg) ->
|
||||||
|
case get_group_ack(Msg) of
|
||||||
|
?NO_ACK ->
|
||||||
|
error;
|
||||||
|
{Group, _Sender, _Ref} ->
|
||||||
|
{ok, Group}
|
||||||
|
end.
|
||||||
|
|
||||||
-spec is_ack_required(emqx_types:message()) -> boolean().
|
-spec is_ack_required(emqx_types:message()) -> boolean().
|
||||||
is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg).
|
is_ack_required(Msg) ->
|
||||||
|
?NO_ACK =/= get_group_ack(Msg).
|
||||||
|
|
||||||
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
||||||
-spec maybe_nack_dropped(emqx_types:message()) -> ok.
|
-spec maybe_nack_dropped(emqx_types:message()) -> boolean().
|
||||||
maybe_nack_dropped(Msg) ->
|
maybe_nack_dropped(Msg) ->
|
||||||
case get_ack_ref(Msg) of
|
case get_group_ack(Msg) of
|
||||||
?NO_ACK -> ok;
|
?NO_ACK ->
|
||||||
{Sender, Ref} -> nack(Sender, Ref, dropped)
|
false;
|
||||||
|
{_Group, Sender, Ref} ->
|
||||||
|
ok == nack(Sender, Ref, dropped)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Negative ack message due to connection down.
|
%% @doc Negative ack message due to connection down.
|
||||||
|
@ -219,22 +231,23 @@ maybe_nack_dropped(Msg) ->
|
||||||
%% i.e is_ack_required returned true.
|
%% i.e is_ack_required returned true.
|
||||||
-spec nack_no_connection(emqx_types:message()) -> ok.
|
-spec nack_no_connection(emqx_types:message()) -> ok.
|
||||||
nack_no_connection(Msg) ->
|
nack_no_connection(Msg) ->
|
||||||
{Sender, Ref} = get_ack_ref(Msg),
|
{_Group, Sender, Ref} = get_group_ack(Msg),
|
||||||
nack(Sender, Ref, no_connection).
|
nack(Sender, Ref, no_connection).
|
||||||
|
|
||||||
-spec nack(pid(), reference(), dropped | no_connection) -> ok.
|
-spec nack(pid(), reference(), dropped | no_connection) -> ok.
|
||||||
nack(Sender, Ref, Reason) ->
|
nack(Sender, Ref, Reason) ->
|
||||||
erlang:send(Sender, {Ref, ?NACK(Reason)}),
|
Sender ! {Ref, ?NACK(Reason)},
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec maybe_ack(emqx_types:message()) -> emqx_types:message().
|
-spec maybe_ack(emqx_types:message()) -> emqx_types:message().
|
||||||
maybe_ack(Msg) ->
|
maybe_ack(Msg) ->
|
||||||
case get_ack_ref(Msg) of
|
case get_group_ack(Msg) of
|
||||||
?NO_ACK ->
|
?NO_ACK ->
|
||||||
Msg;
|
Msg;
|
||||||
{Sender, Ref} ->
|
{_Group, Sender, Ref} ->
|
||||||
erlang:send(Sender, {Ref, ?ACK}),
|
Sender ! {Ref, ?ACK},
|
||||||
without_ack_ref(Msg)
|
%% without_group_ack(Msg)
|
||||||
|
without_group_ack(Msg)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
||||||
|
@ -286,8 +299,10 @@ do_pick_subscriber(_Group, _Topic, hash_topic, _ClientId, SourceTopic, Count) ->
|
||||||
do_pick_subscriber(Group, Topic, round_robin, _ClientId, _SourceTopic, Count) ->
|
do_pick_subscriber(Group, Topic, round_robin, _ClientId, _SourceTopic, Count) ->
|
||||||
Rem =
|
Rem =
|
||||||
case erlang:get({shared_sub_round_robin, Group, Topic}) of
|
case erlang:get({shared_sub_round_robin, Group, Topic}) of
|
||||||
undefined -> rand:uniform(Count) - 1;
|
undefined ->
|
||||||
N -> (N + 1) rem Count
|
rand:uniform(Count) - 1;
|
||||||
|
N ->
|
||||||
|
(N + 1) rem Count
|
||||||
end,
|
end,
|
||||||
_ = erlang:put({shared_sub_round_robin, Group, Topic}, Rem),
|
_ = erlang:put({shared_sub_round_robin, Group, Topic}, Rem),
|
||||||
Rem + 1.
|
Rem + 1.
|
||||||
|
@ -316,11 +331,16 @@ init_monitors() ->
|
||||||
?TAB
|
?TAB
|
||||||
).
|
).
|
||||||
|
|
||||||
|
handle_call({cleanup, SubPid}, _From, State) ->
|
||||||
|
cleanup_down(SubPid),
|
||||||
|
{reply, ok, State};
|
||||||
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
|
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
|
||||||
mria:dirty_write(?TAB, record(Group, Topic, SubPid)),
|
mria:dirty_write(?TAB, record(Group, Topic, SubPid)),
|
||||||
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
||||||
true -> ok;
|
true ->
|
||||||
false -> ok = emqx_router:do_add_route(Topic, {Group, node()})
|
ok;
|
||||||
|
false ->
|
||||||
|
ok = emqx_router:do_add_route(Topic, {Group, node()})
|
||||||
end,
|
end,
|
||||||
ok = maybe_insert_alive_tab(SubPid),
|
ok = maybe_insert_alive_tab(SubPid),
|
||||||
true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||||
|
@ -348,11 +368,17 @@ handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = P
|
||||||
% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
% handle_info({mnesia_table_event, {delete_object, OldRecord, _}}, State = #state{pmon = PMon}) ->
|
||||||
% #emqx_shared_subscription{subpid = SubPid} = OldRecord,
|
% #emqx_shared_subscription{subpid = SubPid} = OldRecord,
|
||||||
% {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
% {noreply, update_stats(State#state{pmon = emqx_pmon:demonitor(SubPid, PMon)})};
|
||||||
|
|
||||||
handle_info({mnesia_table_event, _Event}, State) ->
|
handle_info({mnesia_table_event, _Event}, State) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{pmon = PMon}) ->
|
handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{pmon = PMon}) ->
|
||||||
?SLOG(info, #{msg => "shared_subscriber_down", sub_pid => SubPid, reason => Reason}),
|
?SLOG(
|
||||||
|
info,
|
||||||
|
#{
|
||||||
|
msg => "shared_subscriber_down",
|
||||||
|
sub_pid => SubPid,
|
||||||
|
reason => Reason
|
||||||
|
}
|
||||||
|
),
|
||||||
cleanup_down(SubPid),
|
cleanup_down(SubPid),
|
||||||
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
|
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
|
||||||
handle_info(_Info, State) ->
|
handle_info(_Info, State) ->
|
||||||
|
@ -369,7 +395,8 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% keep track of alive remote pids
|
%% keep track of alive remote pids
|
||||||
maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) -> ok;
|
maybe_insert_alive_tab(Pid) when ?IS_LOCAL_PID(Pid) ->
|
||||||
|
ok;
|
||||||
maybe_insert_alive_tab(Pid) when is_pid(Pid) ->
|
maybe_insert_alive_tab(Pid) when is_pid(Pid) ->
|
||||||
ets:insert(?ALIVE_SUBS, {Pid}),
|
ets:insert(?ALIVE_SUBS, {Pid}),
|
||||||
ok.
|
ok.
|
||||||
|
@ -405,6 +432,8 @@ is_alive_sub(Pid) ->
|
||||||
|
|
||||||
delete_route_if_needed({Group, Topic}) ->
|
delete_route_if_needed({Group, Topic}) ->
|
||||||
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
||||||
true -> ok;
|
true ->
|
||||||
false -> ok = emqx_router:do_delete_route(Topic, {Group, node()})
|
ok;
|
||||||
|
false ->
|
||||||
|
ok = emqx_router:do_delete_route(Topic, {Group, node()})
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -48,9 +48,9 @@ t_is_ack_required(_) ->
|
||||||
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
|
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
|
||||||
|
|
||||||
t_maybe_nack_dropped(_) ->
|
t_maybe_nack_dropped(_) ->
|
||||||
?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
|
?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
|
||||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}},
|
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
|
||||||
?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)),
|
?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
ok,
|
ok,
|
||||||
receive
|
receive
|
||||||
|
@ -60,7 +60,7 @@ t_maybe_nack_dropped(_) ->
|
||||||
).
|
).
|
||||||
|
|
||||||
t_nack_no_connection(_) ->
|
t_nack_no_connection(_) ->
|
||||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}},
|
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
|
||||||
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
|
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
ok,
|
ok,
|
||||||
|
@ -72,7 +72,7 @@ t_nack_no_connection(_) ->
|
||||||
|
|
||||||
t_maybe_ack(_) ->
|
t_maybe_ack(_) ->
|
||||||
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
|
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
|
||||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}},
|
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
#message{headers = #{shared_dispatch_ack => ?no_ack}},
|
#message{headers = #{shared_dispatch_ack => ?no_ack}},
|
||||||
emqx_shared_sub:maybe_ack(Msg)
|
emqx_shared_sub:maybe_ack(Msg)
|
||||||
|
@ -321,11 +321,15 @@ test_two_messages(Strategy, WithAck) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
last_message(ExpectedPayload, Pids) ->
|
last_message(ExpectedPayload, Pids) ->
|
||||||
|
last_message(ExpectedPayload, Pids, 100).
|
||||||
|
|
||||||
|
last_message(ExpectedPayload, Pids, Timeout) ->
|
||||||
receive
|
receive
|
||||||
{publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
|
{publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
|
||||||
ct:pal("~p ====== ~p", [Pids, Pid]),
|
ct:pal("~p ====== ~p", [Pids, Pid]),
|
||||||
{true, Pid}
|
{true, Pid}
|
||||||
after 100 ->
|
after Timeout ->
|
||||||
|
ct:pal("not yet"),
|
||||||
<<"not yet?">>
|
<<"not yet?">>
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue