Merge pull request #9578 from zmstone/1219-fix-shared-sub-sticky
1219 fix shared sub sticky
Commit: b5dbfcf280
@@ -1078,11 +1078,13 @@ Supported configurations are the following:
 broker_shared_dispatch_ack_enabled {
     desc {
-        en: """Enable/disable shared dispatch acknowledgement for QoS 1 and QoS 2 messages.
+        en: """Deprecated, will be removed in 5.1.
+Enable/disable shared dispatch acknowledgement for QoS 1 and QoS 2 messages.
 This should allow messages to be dispatched to a different subscriber in the group in case the picked (based on `shared_subscription_strategy`) subscriber is offline.
 """
 
-        zh: """启用/禁用 QoS 1 和 QoS 2 消息的共享派发确认。
+        zh: """该配置项已废弃,会在 5.1 中移除。
+启用/禁用 QoS 1 和 QoS 2 消息的共享派发确认。
 开启后,允许将消息从未及时回复 ACK 的订阅者 (例如,客户端离线)重新派发给另外一个订阅者。
 """
     }
@@ -1198,6 +1198,10 @@ fields("broker") ->
             sc(
                 boolean(),
                 #{
+                    %% TODO: deprecated => {since, "5.1.0"}
+                    %% in favor of session message re-dispatch at termination
+                    %% we will stop supporting dispatch acks for shared
+                    %% subscriptions.
                     default => false,
                     desc => ?DESC(broker_shared_dispatch_ack_enabled)
                 }
@@ -81,8 +81,6 @@
     | round_robin_per_group
     | sticky
     | local
-    %% same as hash_clientid, backward compatible
-    | hash
     | hash_clientid
     | hash_topic.
@@ -97,6 +95,7 @@
 -define(NACK(Reason), {shared_sub_nack, Reason}).
 -define(NO_ACK, no_ack).
 -define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
+-define(SUBSCRIBER_DOWN, noproc).
 
 -type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
@@ -139,7 +138,7 @@ record(Group, Topic, SubPid) ->
 -spec dispatch(emqx_types:group(), emqx_types:topic(), emqx_types:delivery()) ->
     emqx_types:deliver_result().
 dispatch(Group, Topic, Delivery) ->
-    dispatch(Group, Topic, Delivery, _FailedSubs = []).
+    dispatch(Group, Topic, Delivery, _FailedSubs = #{}).
 
 dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
     #message{from = ClientId, topic = SourceTopic} = Msg,
@@ -151,9 +150,9 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
             case do_dispatch(SubPid, Group, Topic, Msg1, Type) of
                 ok ->
                     {ok, 1};
-                {error, _Reason} ->
+                {error, Reason} ->
                     %% Failed to dispatch to this sub, try next.
-                    dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])
+                    dispatch(Group, Topic, Delivery, FailedSubs#{SubPid => Reason})
             end
     end.
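This bookkeeping change is the core of the fix: failed subscribers are now collected in a map of pid to failure reason rather than a plain list, so later steps can tell a crashed subscriber apart from one that merely nacked. A minimal standalone sketch of that idea (module, pids, and the nack reason are made up for illustration; this is not the PR's code):

-module(failed_subs_sketch).
-export([demo/0]).

demo() ->
    %% old style: only remember *who* failed
    OldStyle = [sub_a, sub_b],
    true = lists:member(sub_a, OldStyle),

    %% new style: remember who failed *and why*
    Failed0 = #{},
    Failed1 = Failed0#{sub_a => noproc},             %% subscriber process is down
    Failed2 = Failed1#{sub_b => {nack, placeholder}}, %% stand-in nack reason
    noproc = maps:get(sub_a, Failed2),
    2 = maps:size(Failed2),
    ok.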
@@ -262,7 +261,9 @@ redispatch_shared_message(#message{} = Msg) ->
             %% Note that dispatch is called with self() in failed subs
             %% This is done to avoid dispatching back to caller
             Delivery = #delivery{sender = self(), message = Msg},
-            dispatch(Group, Topic, Delivery, [self()]).
+            %% Self is terminating, it makes no sense to loop-back the dispatch
+            FailedSubs = #{self() => ?SUBSCRIBER_DOWN},
+            dispatch(Group, Topic, Delivery, FailedSubs).
 
 %% @hidden Return the `redispatch_to` group-topic in the message header.
 %% `false` is returned if the message is not a shared dispatch.
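Seeding the failed-subs map with the caller's own pid marked as ?SUBSCRIBER_DOWN makes the terminating session fall out of the candidate set like any other dead subscriber, with no special-case branch in dispatch/4. A tiny illustrative sketch (not the real module; the second candidate is a stand-in atom):

-module(redispatch_sketch).
-export([demo/0]).

demo() ->
    Self = self(),
    FailedSubs = #{Self => noproc},
    Candidates = [Self, other_subscriber],
    %% the caller is filtered out by the same "is it down?" predicate
    [other_subscriber] =
        [P || P <- Candidates, noproc =/= maps:get(P, FailedSubs, false)],
    ok.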
@@ -307,27 +308,33 @@ maybe_ack(Msg) ->
 
 pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
     Sub0 = erlang:get({shared_sub_sticky, Group, Topic}),
-    case is_active_sub(Sub0, FailedSubs) of
+    All = subscribers(Group, Topic, FailedSubs),
+    case is_active_sub(Sub0, FailedSubs, All) of
         true ->
             %% the old subscriber is still alive
             %% keep using it for sticky strategy
             {fresh, Sub0};
         false ->
            %% randomly pick one for the first message
-            {Type, Sub} = do_pick(random, ClientId, SourceTopic, Group, Topic, [Sub0 | FailedSubs]),
-            %% stick to whatever pick result
-            erlang:put({shared_sub_sticky, Group, Topic}, Sub),
-            {Type, Sub}
+            FailedSubs1 = FailedSubs#{Sub0 => ?SUBSCRIBER_DOWN},
+            Res = do_pick(All, random, ClientId, SourceTopic, Group, Topic, FailedSubs1),
+            case Res of
+                {_, Sub} ->
+                    %% stick to whatever pick result
+                    erlang:put({shared_sub_sticky, Group, Topic}, Sub);
+                _ ->
+                    ok
+            end,
+            Res
     end;
 pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
-    do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs).
+    All = subscribers(Group, Topic, FailedSubs),
+    do_pick(All, Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs).
 
-do_pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
-    All = subscribers(Group, Topic),
-    case All -- FailedSubs of
-        [] when All =:= [] ->
-            %% Genuinely no subscriber
-            false;
+do_pick([], _Strategy, _ClientId, _SourceTopic, _Group, _Topic, _FailedSubs) ->
+    false;
+do_pick(All, Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
+    case lists:filter(fun(Sub) -> not maps:is_key(Sub, FailedSubs) end, All) of
         [] ->
             %% All offline? pick one anyway
             {retry, pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, All)};
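Condensed, the sticky path now works on a pre-fetched subscriber list and only re-uses the remembered pid when it is still present in that list and not marked as failed. The sketch below mirrors that decision flow in a self-contained form; module, function, and the pick_random helper are invented for illustration, and the real code additionally stores the picked pid in the process dictionary so the next message sticks to it:

-module(sticky_pick_sketch).
-export([pick_sticky/3]).

%% Sticky :: pid() | undefined, the previously remembered subscriber
%% All    :: [pid()], current subscribers for the group/topic (down ones excluded)
%% Failed :: #{pid() => term()}, subscribers that already failed in this round
pick_sticky(Sticky, All, Failed) ->
    StillGood = lists:member(Sticky, All) andalso not maps:is_key(Sticky, Failed),
    case StillGood of
        true ->
            {fresh, Sticky};
        false when All =:= [] ->
            false;                                    %% genuinely no subscriber
        false ->
            case [P || P <- All, not maps:is_key(P, Failed)] of
                []      -> {retry, pick_random(All)}; %% all failed: retry one anyway
                Healthy -> {fresh, pick_random(Healthy)}
            end
    end.

pick_random(Pids) ->
    lists:nth(rand:uniform(length(Pids)), Pids).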
@@ -351,9 +358,6 @@ pick_subscriber(Group, Topic, Strategy, ClientId, SourceTopic, Subs) ->
 
 do_pick_subscriber(_Group, _Topic, random, _ClientId, _SourceTopic, Count) ->
     rand:uniform(Count);
-do_pick_subscriber(Group, Topic, hash, ClientId, SourceTopic, Count) ->
-    %% backward compatible
-    do_pick_subscriber(Group, Topic, hash_clientid, ClientId, SourceTopic, Count);
 do_pick_subscriber(_Group, _Topic, hash_clientid, ClientId, _SourceTopic, Count) ->
     1 + erlang:phash2(ClientId) rem Count;
 do_pick_subscriber(_Group, _Topic, hash_topic, _ClientId, SourceTopic, Count) ->
@@ -374,6 +378,16 @@ do_pick_subscriber(Group, Topic, round_robin_per_group, _ClientId, _SourceTopic,
         {Group, Topic}, 0
     }).
 
+%% Select ETS table to get all subscriber pids which are not down.
+subscribers(Group, Topic, FailedSubs) ->
+    lists:filter(
+        fun(P) ->
+            ?SUBSCRIBER_DOWN =/= maps:get(P, FailedSubs, false)
+        end,
+        subscribers(Group, Topic)
+    ).
+
+%% Select ETS table to get all subscriber pids.
 subscribers(Group, Topic) ->
     ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
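The new subscribers/3 drops only the pids recorded as ?SUBSCRIBER_DOWN (noproc); a subscriber that failed for any other reason, for example a nack, stays in the returned list, so the sticky check above still counts it as present. A throw-away illustration of that predicate (pids and the nack reason are stand-ins, not values from this PR):

-module(subscribers_filter_sketch).
-export([demo/0]).

demo() ->
    FailedSubs = #{down_sub => noproc,
                   nacked_sub => {shared_sub_nack, placeholder_reason}},
    All = [down_sub, nacked_sub, fresh_sub],
    %% same shape as the new subscribers/3: keep everything that is not down
    [nacked_sub, fresh_sub] =
        lists:filter(fun(P) -> noproc =/= maps:get(P, FailedSubs, false) end, All),
    ok.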
@@ -427,6 +441,7 @@ handle_info(
     {mnesia_table_event, {write, #emqx_shared_subscription{subpid = SubPid}, _}},
     State = #state{pmon = PMon}
 ) ->
+    ok = maybe_insert_alive_tab(SubPid),
     {noreply, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
 %% The subscriber may have subscribed multiple topics, so we need to keep monitoring the PID until
 %% it `unsubscribed` the last topic.
@@ -512,8 +527,10 @@ update_stats(State) ->
     State.
 
 %% Return 'true' if the subscriber process is alive AND not in the failed list
-is_active_sub(Pid, FailedSubs) ->
-    is_alive_sub(Pid) andalso not lists:member(Pid, FailedSubs).
+is_active_sub(Pid, FailedSubs, All) ->
+    lists:member(Pid, All) andalso
+        (not maps:is_key(Pid, FailedSubs)) andalso
+        is_alive_sub(Pid).
 
 %% erlang:is_process_alive/1 does not work with remote pid.
 is_alive_sub(Pid) when ?IS_LOCAL_PID(Pid) ->
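This is where the reported bug is actually closed: an unsubscribed client is still an alive process and is not in the failed map, so the old two-argument check kept sticking to it; requiring membership in the current subscriber list rules it out. A small self-contained demonstration of that difference (the "subscriber" is just a spawned stub process, not emqx code):

-module(active_sub_sketch).
-export([demo/0]).

demo() ->
    %% a stub process standing in for a client that unsubscribed but stayed connected
    Sticky = spawn(fun() -> receive stop -> ok end end),
    FailedSubs = #{},
    All = [],  %% no longer present in the shared-subscription table
    OldCheck = is_process_alive(Sticky) andalso not maps:is_key(Sticky, FailedSubs),
    NewCheck = lists:member(Sticky, All) andalso
        (not maps:is_key(Sticky, FailedSubs)) andalso
        is_process_alive(Sticky),
    {true, false} = {OldCheck, NewCheck},
    Sticky ! stop,
    ok.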
@@ -190,7 +190,7 @@ t_publish_qos2_with_error_return(_) ->
 
     begin
         Msg2 = emqx_message:make(clientid, ?QOS_2, <<"t">>, <<"payload2">>),
-        {ok, [], Session1} = emqx_session:publish(clientinfo(), PacketId2 = 2, Msg2, Session),
+        {ok, [], Session1} = emqx_session:publish(clientinfo(), _PacketId2 = 2, Msg2, Session),
         ?assertEqual(2, emqx_session:info(awaiting_rel_cnt, Session1)),
         {error, RC2 = ?RC_RECEIVE_MAXIMUM_EXCEEDED} = emqx_session:publish(
             clientinfo(), _PacketId3 = 3, Msg2, Session1
@@ -344,9 +344,42 @@ t_sticky(Config) when is_list(Config) ->
     ok = ensure_config(sticky, true),
     test_two_messages(sticky).
 
+%% two subscribers in one shared group
+%% one unsubscribe after receiving a message
+%% the other one in the group should receive the next message
+t_sticky_unsubscribe(Config) when is_list(Config) ->
+    ok = ensure_config(sticky, false),
+    Topic = <<"foo/bar/sticky-unsub">>,
+    ClientId1 = <<"c1-sticky-unsub">>,
+    ClientId2 = <<"c2-sticky-unsub">>,
+    Group = <<"gsu">>,
+    {ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
+    {ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
+    {ok, _} = emqtt:connect(ConnPid1),
+    {ok, _} = emqtt:connect(ConnPid2),
+
+    ShareTopic = <<"$share/", Group/binary, "/", Topic/binary>>,
+    emqtt:subscribe(ConnPid1, {ShareTopic, 0}),
+    emqtt:subscribe(ConnPid2, {ShareTopic, 0}),
+
+    Message1 = emqx_message:make(ClientId1, 0, Topic, <<"hello1">>),
+    Message2 = emqx_message:make(ClientId2, 0, Topic, <<"hello2">>),
+    ct:sleep(100),
+
+    emqx:publish(Message1),
+    {true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
+    emqtt:unsubscribe(UsedSubPid1, ShareTopic),
+    emqx:publish(Message2),
+    {true, UsedSubPid2} = last_message(<<"hello2">>, [ConnPid1, ConnPid2]),
+    ?assertNotEqual(UsedSubPid1, UsedSubPid2),
+
+    kill_process(ConnPid1, fun(_) -> emqtt:stop(ConnPid1) end),
+    kill_process(ConnPid2, fun(_) -> emqtt:stop(ConnPid2) end),
+    ok.
+
 t_hash(Config) when is_list(Config) ->
-    ok = ensure_config(hash, false),
-    test_two_messages(hash).
+    ok = ensure_config(hash_clientid, false),
+    test_two_messages(hash_clientid).
 
 t_hash_clinetid(Config) when is_list(Config) ->
     ok = ensure_config(hash_clientid, false),
@@ -453,7 +486,7 @@ test_two_messages(Strategy, Group) ->
         sticky -> ?assertEqual(UsedSubPid1, UsedSubPid2);
         round_robin -> ?assertNotEqual(UsedSubPid1, UsedSubPid2);
         round_robin_per_group -> ?assertNotEqual(UsedSubPid1, UsedSubPid2);
-        hash -> ?assertEqual(UsedSubPid1, UsedSubPid2);
+        hash_clientid -> ?assertEqual(UsedSubPid1, UsedSubPid2);
         _ -> ok
     end,
     ok.
@@ -16,7 +16,7 @@
 
 -module(emqx_connector_jwt).
 
--include("emqx_connector_tables.hrl").
+-include_lib("emqx_connector/include/emqx_connector_tables.hrl").
 -include_lib("emqx_resource/include/emqx_resource.hrl").
 
 %% API
@@ -18,7 +18,7 @@
 
 -behaviour(supervisor).
 
--include("emqx_connector_tables.hrl").
+-include_lib("emqx_connector/include/emqx_connector_tables.hrl").
 
 -export([
     start_link/0,
@@ -17,3 +17,6 @@
 ## Bug fixes
 
 - Trigger `message.dropped` hook when QoS2 message is resend by client with a same packet id, or 'awaiting_rel' queue is full [#9487](https://github.com/emqx/emqx/pull/9487).
+
+- Fix shared subscription 'sticky' strategy [#9578](https://github.com/emqx/emqx/pull/9578).
+  Prior to this change, a 'sticky' subscriber may continue to receive messages after unsubscribing.
@@ -17,3 +17,6 @@
 ## 修复
 
 - 当 QoS2 消息被重发(使用相同 Packet ID),或当 'awaiting_rel' 队列已满时,触发消息丢弃钩子(`message.dropped`)及计数器 [#9487](https://github.com/emqx/emqx/pull/9487)。
+
+- 修复共享订阅的 'sticky' 策略 [#9578](https://github.com/emqx/emqx/pull/9578)。
+  在此修复前,使用 'sticky' 策略的订阅客户端可能在取消订阅后继续收到消息。