Merge pull request #9094 from emqx/1004-fix-wildcard-redispatch-for-shared-subs
chore: fix shared subscription redispatch
This commit is contained in:
commit
cf5f1fd78c
|
@ -32,6 +32,11 @@ File format:
|
||||||
- Added a test to prevent a last will testament message to be
|
- Added a test to prevent a last will testament message to be
|
||||||
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
|
published when a client is denied connection. [#8894](https://github.com/emqx/emqx/pull/8894)
|
||||||
|
|
||||||
|
- QoS1 and QoS2 messages in session's buffer are re-dispatched to other members in the group
|
||||||
|
when the session terminates [#9094](https://github.com/emqx/emqx/pull/9094).
|
||||||
|
Prior to this enhancement, one would have to set `broker.shared_dispatch_ack_enabled` to true
|
||||||
|
to prevent sessions from buffering messages, however this acknowledgement comes with a cost.
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
|
|
||||||
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
|
- Fix delayed publish inaccurate caused by os time change. [#8908](https://github.com/emqx/emqx/pull/8908)
|
||||||
|
@ -48,6 +53,11 @@ File format:
|
||||||
Same `format_status` callback is added here too for `gen_server`s which hold password in
|
Same `format_status` callback is added here too for `gen_server`s which hold password in
|
||||||
their state.
|
their state.
|
||||||
|
|
||||||
|
- Fix shared subscription message re-dispatches [#9094](https://github.com/emqx/emqx/pull/9094).
|
||||||
|
- When discarding QoS 2 inflight messages, there were excessive logs
|
||||||
|
- For wildcard deliveries, the re-dispatch used the wrong topic (the publishing topic,
|
||||||
|
but not the subscribing topic), caused messages to be lost when dispatching.
|
||||||
|
|
||||||
## v4.3.20
|
## v4.3.20
|
||||||
|
|
||||||
### Bug fixes
|
### Bug fixes
|
||||||
|
|
|
@ -2,7 +2,9 @@
|
||||||
%% Unless you know what you are doing, DO NOT edit manually!!
|
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||||
{VSN,
|
{VSN,
|
||||||
[{"4.3.21",
|
[{"4.3.21",
|
||||||
[{add_module,emqx_secret},
|
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_secret},
|
||||||
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
|
@ -17,7 +19,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.20",
|
{"4.3.20",
|
||||||
[{add_module,emqx_secret},
|
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_secret},
|
||||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||||
|
@ -34,7 +37,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.19",
|
{"4.3.19",
|
||||||
[{add_module,emqx_secret},
|
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_secret},
|
||||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||||
|
@ -52,7 +56,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.18",
|
{"4.3.18",
|
||||||
[{add_module,emqx_secret},
|
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
|
{add_module,emqx_secret},
|
||||||
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||||
|
@ -857,9 +862,10 @@
|
||||||
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
{load_module,emqx_message,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[
|
[{"4.3.21",
|
||||||
{"4.3.21",
|
[{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||||
[{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||||
|
@ -873,7 +879,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.20",
|
{"4.3.20",
|
||||||
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},
|
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},
|
||||||
|
@ -889,7 +896,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.19",
|
{"4.3.19",
|
||||||
[{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},
|
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},
|
||||||
|
@ -906,7 +914,8 @@
|
||||||
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
|
||||||
{"4.3.18",
|
{"4.3.18",
|
||||||
[{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_listeners,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router_helper,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
{load_module,emqx_router,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},
|
{load_module,emqx_tracer,brutal_purge,soft_purge,[]},
|
||||||
|
|
|
@ -638,26 +638,35 @@ run_terminate_hooks(ClientInfo, takeovered, Session) ->
|
||||||
run_terminate_hooks(ClientInfo, Reason, Session) ->
|
run_terminate_hooks(ClientInfo, Reason, Session) ->
|
||||||
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
||||||
|
|
||||||
redispatch_shared_messages(#session{inflight = Inflight}) ->
|
redispatch_shared_messages(#session{inflight = Inflight, mqueue = Q}) ->
|
||||||
InflightList = emqx_inflight:to_list(Inflight),
|
AllInflights = emqx_inflight:to_list(sort_fun(), Inflight),
|
||||||
lists:foreach(fun
|
F = fun({_, {Msg, _Ts}}) ->
|
||||||
%% Only QoS1 messages get redispatched, because QoS2 messages
|
case Msg of
|
||||||
%% must be sent to the same client, once they're in flight
|
#message{qos = ?QOS_1} ->
|
||||||
({_, {#message{qos = ?QOS_2} = Msg, _}}) ->
|
%% For QoS 2, here is what the spec says:
|
||||||
?LOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]);
|
%% If the Client's Session terminates before the Client reconnects,
|
||||||
({_, {#message{topic = Topic, qos = ?QOS_1} = Msg, _}}) ->
|
%% the Server MUST NOT send the Application Message to any other
|
||||||
case emqx_shared_sub:get_group(Msg) of
|
%% subscribed Client [MQTT-4.8.2-5].
|
||||||
{ok, Group} ->
|
{true, Msg};
|
||||||
%% Note that dispatch is called with self() in failed subs
|
|
||||||
%% This is done to avoid dispatching back to caller
|
|
||||||
Delivery = #delivery{sender = self(), message = Msg},
|
|
||||||
emqx_shared_sub:dispatch_to_non_self(Group, Topic, Delivery);
|
|
||||||
_ ->
|
_ ->
|
||||||
|
%% QoS 2, after pubrec is received
|
||||||
|
%% the inflight record is updated to an atom
|
||||||
false
|
false
|
||||||
end;
|
end
|
||||||
(_) ->
|
end,
|
||||||
ok
|
InflightList = lists:filtermap(F, AllInflights),
|
||||||
end, InflightList).
|
MqList = mqueue_to_list(Q, []),
|
||||||
|
emqx_shared_sub:redispatch(InflightList ++ MqList).
|
||||||
|
|
||||||
|
%% convert mqueue to a list
|
||||||
|
%% the messages at the head of the list is to be dispatched before the tail
|
||||||
|
mqueue_to_list(Q, Acc) ->
|
||||||
|
case emqx_mqueue:out(Q) of
|
||||||
|
{empty, _Q} ->
|
||||||
|
lists:reverse(Acc);
|
||||||
|
{{value, Msg}, Q1} ->
|
||||||
|
mqueue_to_list(Q1, [Msg | Acc])
|
||||||
|
end.
|
||||||
|
|
||||||
-compile({inline, [run_hook/2]}).
|
-compile({inline, [run_hook/2]}).
|
||||||
run_hook(Name, Args) ->
|
run_hook(Name, Args) ->
|
||||||
|
|
|
@ -39,7 +39,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ dispatch/3
|
-export([ dispatch/3
|
||||||
, dispatch_to_non_self/3
|
, redispatch/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ maybe_ack/1
|
-export([ maybe_ack/1
|
||||||
|
@ -47,7 +47,6 @@
|
||||||
, nack_no_connection/1
|
, nack_no_connection/1
|
||||||
, is_ack_required/1
|
, is_ack_required/1
|
||||||
, is_retry_dispatch/1
|
, is_retry_dispatch/1
|
||||||
, get_group/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% for testing
|
%% for testing
|
||||||
|
@ -84,6 +83,9 @@
|
||||||
-define(ACK, shared_sub_ack).
|
-define(ACK, shared_sub_ack).
|
||||||
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
-define(NACK(Reason), {shared_sub_nack, Reason}).
|
||||||
-define(NO_ACK, no_ack).
|
-define(NO_ACK, no_ack).
|
||||||
|
-define(REDISPATCH_TO(GROUP, TOPIC), {GROUP, TOPIC}).
|
||||||
|
|
||||||
|
-type redispatch_to() :: ?REDISPATCH_TO(emqx_topic:group(), emqx_topic:topic()).
|
||||||
|
|
||||||
-record(state, {pmon}).
|
-record(state, {pmon}).
|
||||||
|
|
||||||
|
@ -134,11 +136,12 @@ dispatch(Group, Topic, Delivery) ->
|
||||||
Strategy = strategy(Group),
|
Strategy = strategy(Group),
|
||||||
dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}).
|
dispatch(Strategy, Group, Topic, Delivery, _FailedSubs = #{}).
|
||||||
|
|
||||||
dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
dispatch(Strategy, Group, Topic, Delivery = #delivery{message = Msg0}, FailedSubs) ->
|
||||||
#message{from = ClientId, topic = SourceTopic} = Msg,
|
#message{from = ClientId, topic = SourceTopic} = Msg0,
|
||||||
case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of
|
case pick(Strategy, ClientId, SourceTopic, Group, Topic, FailedSubs) of
|
||||||
false -> {error, no_subscribers};
|
false -> {error, no_subscribers};
|
||||||
{Type, SubPid} ->
|
{Type, SubPid} ->
|
||||||
|
Msg = with_redispatch_to(Msg0, Group, Topic),
|
||||||
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
|
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
|
||||||
ok -> {ok, 1};
|
ok -> {ok, 1};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
@ -162,7 +165,7 @@ ack_enabled() ->
|
||||||
emqx:get_env(shared_dispatch_ack_enabled, false).
|
emqx:get_env(shared_dispatch_ack_enabled, false).
|
||||||
|
|
||||||
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||||
%% Deadlock otherwise
|
%% dispatch without ack, deadlock otherwise
|
||||||
send(SubPid, Topic, {deliver, Topic, Msg});
|
send(SubPid, Topic, {deliver, Topic, Msg});
|
||||||
%% return either 'ok' (when everything is fine) or 'error'
|
%% return either 'ok' (when everything is fine) or 'error'
|
||||||
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
||||||
|
@ -176,6 +179,10 @@ do_dispatch(SubPid, Group, Topic, Msg, Type) ->
|
||||||
send(SubPid, Topic, {deliver, Topic, Msg})
|
send(SubPid, Topic, {deliver, Topic, Msg})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) -> Msg;
|
||||||
|
with_redispatch_to(Msg, Group, Topic) ->
|
||||||
|
emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
|
||||||
|
|
||||||
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
dispatch_with_ack(SubPid, Group, Topic, Msg, Type) ->
|
||||||
%% For QoS 1/2 message, expect an ack
|
%% For QoS 1/2 message, expect an ack
|
||||||
Ref = erlang:monitor(process, SubPid),
|
Ref = erlang:monitor(process, SubPid),
|
||||||
|
@ -228,13 +235,22 @@ without_group_ack(Msg) ->
|
||||||
get_group_ack(Msg) ->
|
get_group_ack(Msg) ->
|
||||||
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
||||||
|
|
||||||
-spec(get_group(emqx_types:message()) -> {ok, any()} | error).
|
%% @hidden Redispatch is neede only for the messages with redispatch_to header added.
|
||||||
get_group(Msg) ->
|
is_redispatch_needed(#message{} = Msg) ->
|
||||||
case get_group_ack(Msg) of
|
case get_redispatch_to(Msg) of
|
||||||
{_Sender, {_Type, Group, _Ref}} -> {ok, Group};
|
?REDISPATCH_TO(_, _) ->
|
||||||
_ -> error
|
true;
|
||||||
|
_ ->
|
||||||
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @hidden Return the `redispatch_to` group-topic in the message header.
|
||||||
|
%% `false` is returned if the message is not a shared dispatch.
|
||||||
|
%% or when it's a QoS 0 message.
|
||||||
|
-spec(get_redispatch_to(emqx_types:message()) -> redispatch_to() | false).
|
||||||
|
get_redispatch_to(Msg) ->
|
||||||
|
emqx_message:get_header(redispatch_to, Msg, false).
|
||||||
|
|
||||||
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
||||||
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
|
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
|
||||||
|
|
||||||
|
@ -245,6 +261,26 @@ is_retry_dispatch(Msg) ->
|
||||||
_ -> false
|
_ -> false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Redispatch shared deliveries to other members in the group.
|
||||||
|
redispatch(Messages0) ->
|
||||||
|
Messages = lists:filter(fun is_redispatch_needed/1, Messages0),
|
||||||
|
case length(Messages) of
|
||||||
|
L when L > 0 ->
|
||||||
|
?LOG(info, "Redispatching ~p shared subscription message(s)", [L]),
|
||||||
|
lists:foreach(fun redispatch_shared_message/1, Messages);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
redispatch_shared_message(#message{} = Msg) ->
|
||||||
|
%% As long as it's still a #message{} record in inflight,
|
||||||
|
%% we should try to re-dispatch
|
||||||
|
?REDISPATCH_TO(Group, Topic) = get_redispatch_to(Msg),
|
||||||
|
%% Note that dispatch is called with self() in failed subs
|
||||||
|
%% This is done to avoid dispatching back to caller
|
||||||
|
Delivery = #delivery{sender = self(), message = Msg},
|
||||||
|
dispatch_to_non_self(Group, Topic, Delivery).
|
||||||
|
|
||||||
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
||||||
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
|
-spec(maybe_nack_dropped(emqx_types:message()) -> store | drop).
|
||||||
maybe_nack_dropped(Msg) ->
|
maybe_nack_dropped(Msg) ->
|
||||||
|
|
|
@ -25,13 +25,24 @@
|
||||||
|
|
||||||
-define(SUITE, ?MODULE).
|
-define(SUITE, ?MODULE).
|
||||||
|
|
||||||
-define(wait(For, Timeout),
|
|
||||||
emqx_ct_helpers:wait_for(
|
|
||||||
?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
|
|
||||||
|
|
||||||
-define(ack, shared_sub_ack).
|
-define(ack, shared_sub_ack).
|
||||||
-define(no_ack, no_ack).
|
-define(no_ack, no_ack).
|
||||||
|
|
||||||
|
-define(WAIT(TIMEOUT, PATTERN, Res),
|
||||||
|
(fun() ->
|
||||||
|
receive
|
||||||
|
PATTERN ->
|
||||||
|
Res;
|
||||||
|
Other ->
|
||||||
|
ct:fail(#{expected => ??PATTERN,
|
||||||
|
got => Other
|
||||||
|
})
|
||||||
|
after
|
||||||
|
TIMEOUT ->
|
||||||
|
ct:fail({timeout, ??PATTERN})
|
||||||
|
end
|
||||||
|
end)()).
|
||||||
|
|
||||||
all() -> emqx_ct:all(?SUITE).
|
all() -> emqx_ct:all(?SUITE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
|
@ -151,40 +162,7 @@ t_no_connection_nack(Config) when is_list(Config) ->
|
||||||
SendF(1),
|
SendF(1),
|
||||||
ct:sleep(200),
|
ct:sleep(200),
|
||||||
%% This is the connection which was picked by broker to dispatch (sticky) for 1st message
|
%% This is the connection which was picked by broker to dispatch (sticky) for 1st message
|
||||||
|
|
||||||
?assertMatch([#{packet_id := 1}], recv_msgs(1)),
|
?assertMatch([#{packet_id := 1}], recv_msgs(1)),
|
||||||
%% Now kill the connection, expect all following messages to be delivered to the other
|
|
||||||
%% subscriber.
|
|
||||||
%emqx_mock_client:stop(ConnPid),
|
|
||||||
%% sleep then make synced calls to session processes to ensure that
|
|
||||||
%% the connection pid's 'EXIT' message is propagated to the session process
|
|
||||||
%% also to be sure sessions are still alive
|
|
||||||
% timer:sleep(2),
|
|
||||||
% _ = emqx_session:info(SPid1),
|
|
||||||
% _ = emqx_session:info(SPid2),
|
|
||||||
% %% Now we know what is the other still alive connection
|
|
||||||
% [TheOtherConnPid] = [SubConnPid1, SubConnPid2] -- [ConnPid],
|
|
||||||
% %% Send some more messages
|
|
||||||
% PacketIdList = lists:seq(2, 10),
|
|
||||||
% lists:foreach(fun(Id) ->
|
|
||||||
% SendF(Id),
|
|
||||||
% ?wait(Received(Id, TheOtherConnPid), 1000)
|
|
||||||
% end, PacketIdList),
|
|
||||||
% %% Now close the 2nd (last connection)
|
|
||||||
% emqx_mock_client:stop(TheOtherConnPid),
|
|
||||||
% timer:sleep(2),
|
|
||||||
% %% both sessions should have conn_pid = undefined
|
|
||||||
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
|
|
||||||
% ?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
|
|
||||||
% %% send more messages, but all should be queued in session state
|
|
||||||
% lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
|
|
||||||
% {_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
|
|
||||||
% {_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
|
|
||||||
% ?assertEqual(length(PacketIdList), L1 + L2),
|
|
||||||
% %% clean up
|
|
||||||
% emqx_mock_client:close_session(PubConnPid),
|
|
||||||
% emqx_sm:close_session(SPid1),
|
|
||||||
% emqx_sm:close_session(SPid2),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_random(Config) when is_list(Config) ->
|
t_random(Config) when is_list(Config) ->
|
||||||
|
@ -443,8 +421,14 @@ t_local_fallback(Config) when is_list(Config) ->
|
||||||
|
|
||||||
%% This one tests that broker tries to select another shared subscriber
|
%% This one tests that broker tries to select another shared subscriber
|
||||||
%% If the first one doesn't return an ACK
|
%% If the first one doesn't return an ACK
|
||||||
t_redispatch(Config) when is_list(Config) ->
|
t_redispatch_with_ack(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(sticky, true),
|
test_redispatch(Config, true).
|
||||||
|
|
||||||
|
t_redispatch_no_ack(Config) when is_list(Config) ->
|
||||||
|
test_redispatch(Config, false).
|
||||||
|
|
||||||
|
test_redispatch(_Config, AckEnabled) ->
|
||||||
|
ok = ensure_config(sticky, AckEnabled),
|
||||||
application:set_env(emqx, shared_dispatch_ack_enabled, true),
|
application:set_env(emqx, shared_dispatch_ack_enabled, true),
|
||||||
|
|
||||||
Group = <<"group1">>,
|
Group = <<"group1">>,
|
||||||
|
@ -474,15 +458,59 @@ t_redispatch(Config) when is_list(Config) ->
|
||||||
emqtt:stop(UsedSubPid2),
|
emqtt:stop(UsedSubPid2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_redispatch_wildcard_with_ack(Config) when is_list(Config)->
|
||||||
|
redispatch_wildcard(Config, true).
|
||||||
|
|
||||||
|
t_redispatch_wildcard_no_ack(Config) when is_list(Config) ->
|
||||||
|
redispatch_wildcard(Config, false).
|
||||||
|
|
||||||
|
%% This one tests that broker tries to redispatch to another member in the group
|
||||||
|
%% if the first one disconnected before acking (auto_ack set to false)
|
||||||
|
redispatch_wildcard(_Config, AckEnabled) ->
|
||||||
|
ok = ensure_config(sticky, AckEnabled),
|
||||||
|
|
||||||
|
Group = <<"group1">>,
|
||||||
|
|
||||||
|
Topic = <<"foo/bar/1">>,
|
||||||
|
ClientId1 = <<"ClientId1">>,
|
||||||
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
|
||||||
|
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid1),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid2),
|
||||||
|
|
||||||
|
emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}),
|
||||||
|
emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar/#">>, 1}),
|
||||||
|
|
||||||
|
Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
|
||||||
|
|
||||||
|
emqx:publish(Message),
|
||||||
|
|
||||||
|
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
|
||||||
|
ok = emqtt:stop(UsedSubPid1),
|
||||||
|
|
||||||
|
Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000),
|
||||||
|
?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res),
|
||||||
|
|
||||||
|
{true, UsedSubPid2} = Res,
|
||||||
|
emqtt:stop(UsedSubPid2),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_dispatch_when_inflights_are_full({init, Config}) ->
|
||||||
|
%% make sure broker does not push more than one inflight
|
||||||
|
meck:new(emqx_zone, [passthrough, no_history]),
|
||||||
|
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
|
||||||
|
Config;
|
||||||
|
t_dispatch_when_inflights_are_full({'end', _Config}) ->
|
||||||
|
meck:unload(emqx_zone);
|
||||||
t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
|
t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(round_robin, true),
|
ok = ensure_config(round_robin, _AckEnabled = true),
|
||||||
Topic = <<"foo/bar">>,
|
Topic = <<"foo/bar">>,
|
||||||
ClientId1 = <<"ClientId1">>,
|
ClientId1 = <<"ClientId1">>,
|
||||||
ClientId2 = <<"ClientId2">>,
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
|
||||||
%% Note that max_inflight is 1
|
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}]),
|
||||||
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {max_inflight, 1}]),
|
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}]),
|
||||||
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {max_inflight, 1}]),
|
|
||||||
{ok, _} = emqtt:connect(ConnPid1),
|
{ok, _} = emqtt:connect(ConnPid1),
|
||||||
{ok, _} = emqtt:connect(ConnPid2),
|
{ok, _} = emqtt:connect(ConnPid2),
|
||||||
|
|
||||||
|
@ -505,8 +533,7 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
|
||||||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
|
||||||
|
|
||||||
%% Now kill any client
|
%% Now kill any client
|
||||||
erlang:exit(ConnPid1, normal),
|
ok = kill_process(ConnPid1),
|
||||||
ct:sleep(100),
|
|
||||||
|
|
||||||
%% And try to send the message
|
%% And try to send the message
|
||||||
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
|
||||||
|
@ -521,10 +548,137 @@ t_dispatch_when_inflights_are_full(Config) when is_list(Config) ->
|
||||||
emqtt:stop(ConnPid2),
|
emqtt:stop(ConnPid2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% No ack, QoS 2 subscriptions,
|
||||||
|
%% client1 receives one message, send pubrec, then suspend
|
||||||
|
%% client2 acts normal (auto_ack=true)
|
||||||
|
%% Expected behaviour:
|
||||||
|
%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
|
||||||
|
t_dispatch_qos2({init, Config}) when is_list(Config) ->
|
||||||
|
meck:new(emqx_zone, [passthrough, no_history]),
|
||||||
|
meck:expect(emqx_zone, max_inflight, fun(_Zone) -> 1 end),
|
||||||
|
Config;
|
||||||
|
t_dispatch_qos2({'end', Config}) when is_list(Config) ->
|
||||||
|
meck:unload(emqx_zone);
|
||||||
|
t_dispatch_qos2(Config) when is_list(Config) ->
|
||||||
|
ok = ensure_config(round_robin, _AckEnabled = false),
|
||||||
|
Topic = <<"foo/bar/1">>,
|
||||||
|
ClientId1 = <<"ClientId1">>,
|
||||||
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
|
||||||
|
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
|
||||||
|
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid1),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid2),
|
||||||
|
|
||||||
|
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 2}),
|
||||||
|
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 2}),
|
||||||
|
|
||||||
|
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
|
||||||
|
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
|
||||||
|
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
|
||||||
|
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
|
||||||
|
ct:sleep(100),
|
||||||
|
|
||||||
|
ok = sys:suspend(ConnPid1),
|
||||||
|
|
||||||
|
%% One message is inflight
|
||||||
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message1)),
|
||||||
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message2)),
|
||||||
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message3)),
|
||||||
|
?assertMatch([{_, _, {ok, 1}}], emqx:publish(Message4)),
|
||||||
|
|
||||||
|
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
|
||||||
|
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
|
||||||
|
%% assert hello2 > hello1 or hello4 > hello3
|
||||||
|
?assert(MsgRec2 > MsgRec1),
|
||||||
|
|
||||||
|
sys:resume(ConnPid1),
|
||||||
|
%% emqtt subscriber automatically sends PUBREC, but since auto_ack is set to false
|
||||||
|
%% so it will never send PUBCOMP, hence EMQX should not attempt to send
|
||||||
|
%% the 4th message yet since max_inflight is 1.
|
||||||
|
MsgRec3 = ?WAIT(2000, {publish, #{client_pid := ConnPid1, payload := P3}}, P3),
|
||||||
|
ct:sleep(100),
|
||||||
|
%% no message expected
|
||||||
|
?assertEqual([], collect_msgs(0)),
|
||||||
|
%% now kill client 1
|
||||||
|
kill_process(ConnPid1),
|
||||||
|
%% client 2 should receive the message
|
||||||
|
MsgRec4 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P4}}, P4),
|
||||||
|
%% assert hello2 > hello1 or hello4 > hello3
|
||||||
|
?assert(MsgRec4 > MsgRec3),
|
||||||
|
emqtt:stop(ConnPid2),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_dispatch_qos0({init, Config}) when is_list(Config) ->
|
||||||
|
Config;
|
||||||
|
t_dispatch_qos0({'end', Config}) when is_list(Config) ->
|
||||||
|
ok;
|
||||||
|
t_dispatch_qos0(Config) when is_list(Config) ->
|
||||||
|
ok = ensure_config(round_robin, _AckEnabled = false),
|
||||||
|
Topic = <<"foo/bar/1">>,
|
||||||
|
ClientId1 = <<"ClientId1">>,
|
||||||
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
|
||||||
|
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
|
||||||
|
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, true}]),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid1),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid2),
|
||||||
|
|
||||||
|
%% subscribe with QoS 0
|
||||||
|
emqtt:subscribe(ConnPid1, {<<"$share/group/foo/bar/#">>, 0}),
|
||||||
|
emqtt:subscribe(ConnPid2, {<<"$share/group/foo/bar/#">>, 0}),
|
||||||
|
|
||||||
|
%% publish with QoS 2, but should be downgraded to 0 as the subscribers
|
||||||
|
%% subscribe with QoS 0
|
||||||
|
Message1 = emqx_message:make(ClientId1, 2, Topic, <<"hello1">>),
|
||||||
|
Message2 = emqx_message:make(ClientId1, 2, Topic, <<"hello2">>),
|
||||||
|
Message3 = emqx_message:make(ClientId1, 2, Topic, <<"hello3">>),
|
||||||
|
Message4 = emqx_message:make(ClientId1, 2, Topic, <<"hello4">>),
|
||||||
|
ct:sleep(100),
|
||||||
|
|
||||||
|
ok = sys:suspend(ConnPid1),
|
||||||
|
|
||||||
|
?assertMatch([_], emqx:publish(Message1)),
|
||||||
|
?assertMatch([_], emqx:publish(Message2)),
|
||||||
|
?assertMatch([_], emqx:publish(Message3)),
|
||||||
|
?assertMatch([_], emqx:publish(Message4)),
|
||||||
|
|
||||||
|
MsgRec1 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P1}}, P1),
|
||||||
|
MsgRec2 = ?WAIT(2000, {publish, #{client_pid := ConnPid2, payload := P2}}, P2),
|
||||||
|
%% assert hello2 > hello1 or hello4 > hello3
|
||||||
|
?assert(MsgRec2 > MsgRec1),
|
||||||
|
|
||||||
|
kill_process(ConnPid1),
|
||||||
|
%% expect no redispatch
|
||||||
|
?assertEqual([], collect_msgs(timer:seconds(2))),
|
||||||
|
emqtt:stop(ConnPid2),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% help functions
|
%% help functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
kill_process(Pid) ->
|
||||||
|
_ = unlink(Pid),
|
||||||
|
_ = monitor(process, Pid),
|
||||||
|
erlang:exit(Pid, kill),
|
||||||
|
receive
|
||||||
|
{'DOWN', _, process, Pid, _} ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
collect_msgs(Timeout) ->
|
||||||
|
collect_msgs([], Timeout).
|
||||||
|
|
||||||
|
collect_msgs(Acc, Timeout) ->
|
||||||
|
receive
|
||||||
|
Msg ->
|
||||||
|
collect_msgs([Msg | Acc], Timeout)
|
||||||
|
after
|
||||||
|
Timeout ->
|
||||||
|
lists:reverse(Acc)
|
||||||
|
end.
|
||||||
|
|
||||||
ensure_config(Strategy) ->
|
ensure_config(Strategy) ->
|
||||||
ensure_config(Strategy, _AckEnabled = true).
|
ensure_config(Strategy, _AckEnabled = true).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue