Merge pull request #7807 from emqx/shared_redispatch43
feat(shared): redispatch to another shared sub, when no ACK received
This commit is contained in:
commit
192c65047e
|
@ -22,6 +22,7 @@ File format:
|
|||
* Add more rule engine date functions: format_date/3, format_date/4, date_to_unix_ts/4 [#7894]
|
||||
* Add proto_name and proto_ver fields for $event/client_disconnected event.
|
||||
* Mnesia auth/acl HTTP API now supports multiple-condition queries.
|
||||
* Inflight QoS1 messages for shared topics are now redispatched to another alive subscriber when the chosen subscriber's session terminates.
|
||||
|
||||
### Bug fixes
|
||||
* List subscription topic (/api/v4/subscriptions): the result does not match when multiple conditions are given.
|
||||
|
|
|
@ -2,15 +2,18 @@
|
|||
%% Unless you know what you are doing, DO NOT edit manually!!
|
||||
{VSN,
|
||||
[{"4.3.15",
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.14",
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_sys,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
|
||||
|
@ -19,7 +22,8 @@
|
|||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.13",
|
||||
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
|
@ -462,12 +466,15 @@
|
|||
{<<".*">>,[]}],
|
||||
[{"4.3.15",
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_frame,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_app,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.14",
|
||||
[{load_module,emqx_misc,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
|
||||
|
@ -478,7 +485,8 @@
|
|||
{load_module,emqx_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.13",
|
||||
[{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
[{load_module,emqx_session,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_access_rule,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_hooks,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx,brutal_purge,soft_purge,[]},
|
||||
|
|
|
@ -454,9 +454,12 @@ deliver_msg(ClientInfo, Msg = #message{qos = QoS}, Session =
|
|||
end,
|
||||
{ok, Session1};
|
||||
false ->
|
||||
%% Note that we publish message without shared ack header
|
||||
%% But add to inflight with ack headers
|
||||
%% This ack header is required for redispatch-on-terminate feature to work
|
||||
Publish = {PacketId, maybe_ack(Msg)},
|
||||
Session1 = await(PacketId, Msg, Session),
|
||||
{ok, [Publish], next_pkt_id(Session1)}
|
||||
Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight),
|
||||
{ok, [Publish], next_pkt_id(Session#session{inflight = Inflight1})}
|
||||
end.
|
||||
|
||||
-spec(enqueue(emqx_types:clientinfo(), list(emqx_types:deliver())|emqx_types:message(),
|
||||
|
@ -491,14 +494,10 @@ enrich_delivers({deliver, Topic, Msg}, Session = #session{subscriptions = Subs})
|
|||
enrich_subopts(get_subopts(Topic, Subs), Msg, Session).
|
||||
|
||||
maybe_ack(Msg) ->
|
||||
case emqx_shared_sub:is_ack_required(Msg) of
|
||||
true -> emqx_shared_sub:maybe_ack(Msg);
|
||||
false -> Msg
|
||||
end.
|
||||
emqx_shared_sub:maybe_ack(Msg).
|
||||
|
||||
maybe_nack(Msg) ->
|
||||
emqx_shared_sub:is_ack_required(Msg)
|
||||
andalso (ok == emqx_shared_sub:maybe_nack_dropped(Msg)).
|
||||
emqx_shared_sub:maybe_nack_dropped(Msg).
|
||||
|
||||
get_subopts(Topic, SubMap) ->
|
||||
case maps:find(Topic, SubMap) of
|
||||
|
@ -531,14 +530,6 @@ enrich_subopts([{subid, SubId}|Opts], Msg, Session) ->
|
|||
Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg),
|
||||
enrich_subopts(Opts, Msg1, Session).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Awaiting ACK for QoS1/QoS2 Messages
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
|
||||
Inflight1 = emqx_inflight:insert(PacketId, with_ts(Msg), Inflight),
|
||||
Session#session{inflight = Inflight1}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Retry Delivery
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -634,16 +625,43 @@ replay(ClientInfo, Session = #session{inflight = Inflight}) ->
|
|||
end.
|
||||
|
||||
-spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok).
|
||||
terminate(ClientInfo, discarded, Session) ->
|
||||
run_hook('session.discarded', [ClientInfo, info(Session)]);
|
||||
terminate(ClientInfo, takeovered, Session) ->
|
||||
run_hook('session.takeovered', [ClientInfo, info(Session)]);
|
||||
terminate(ClientInfo, Reason, Session) ->
|
||||
run_terminate_hooks(ClientInfo, Reason, Session),
|
||||
redispatch_shared_messages(Session),
|
||||
ok.
|
||||
|
||||
run_terminate_hooks(ClientInfo, discarded, Session) ->
|
||||
run_hook('session.discarded', [ClientInfo, info(Session)]);
|
||||
run_terminate_hooks(ClientInfo, takeovered, Session) ->
|
||||
run_hook('session.takeovered', [ClientInfo, info(Session)]);
|
||||
run_terminate_hooks(ClientInfo, Reason, Session) ->
|
||||
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
||||
|
||||
redispatch_shared_messages(#session{inflight = Inflight}) ->
|
||||
InflightList = emqx_inflight:to_list(Inflight),
|
||||
lists:foreach(fun
|
||||
%% Only QoS1 messages get redispatched, because QoS2 messages
|
||||
%% must be sent to the same client, once they're in flight
|
||||
({_, {#message{qos = ?QOS_2} = Msg, _}}) ->
|
||||
?LOG(warning, "Not redispatching qos2 msg: ~s", [emqx_message:format(Msg)]);
|
||||
({_, {#message{topic = Topic, qos = ?QOS_1} = Msg, _}}) ->
|
||||
case emqx_shared_sub:get_group(Msg) of
|
||||
{ok, Group} ->
|
||||
%% Note that dispatch is called with self() in failed subs
|
||||
%% This is done to avoid dispatching back to caller
|
||||
Delivery = #delivery{sender = self(), message = Msg},
|
||||
emqx_shared_sub:dispatch(Group, Topic, Delivery, [self()]);
|
||||
_ ->
|
||||
false
|
||||
end;
|
||||
(_) ->
|
||||
ok
|
||||
end, InflightList).
|
||||
|
||||
-compile({inline, [run_hook/2]}).
|
||||
run_hook(Name, Args) ->
|
||||
ok = emqx_metrics:inc(Name), emqx_hooks:run(Name, Args).
|
||||
ok = emqx_metrics:inc(Name),
|
||||
emqx_hooks:run(Name, Args).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Inc message/delivery expired counter
|
||||
|
|
|
@ -38,12 +38,15 @@
|
|||
, unsubscribe/3
|
||||
]).
|
||||
|
||||
-export([dispatch/3]).
|
||||
-export([ dispatch/3
|
||||
, dispatch/4
|
||||
]).
|
||||
|
||||
-export([ maybe_ack/1
|
||||
, maybe_nack_dropped/1
|
||||
, nack_no_connection/1
|
||||
, is_ack_required/1
|
||||
, get_group/1
|
||||
]).
|
||||
|
||||
%% for testing
|
||||
|
@ -131,7 +134,7 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
|
|||
false ->
|
||||
{error, no_subscribers};
|
||||
{Type, SubPid} ->
|
||||
case do_dispatch(SubPid, Topic, Msg, Type) of
|
||||
case do_dispatch(SubPid, Group, Topic, Msg, Type) of
|
||||
ok -> {ok, 1};
|
||||
{error, _Reason} ->
|
||||
%% Failed to dispatch to this sub, try next.
|
||||
|
@ -153,36 +156,33 @@ strategy(Group) ->
|
|||
ack_enabled() ->
|
||||
emqx:get_env(shared_dispatch_ack_enabled, false).
|
||||
|
||||
do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||
%% Deadlock otherwise
|
||||
_ = erlang:send(SubPid, {deliver, Topic, Msg}),
|
||||
SubPid ! {deliver, Topic, Msg},
|
||||
ok;
|
||||
do_dispatch(SubPid, Topic, Msg, Type) ->
|
||||
dispatch_per_qos(SubPid, Topic, Msg, Type).
|
||||
|
||||
%% return either 'ok' (when everything is fine) or 'error'
|
||||
dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
||||
do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
||||
%% For QoS 0 message, send it as regular dispatch
|
||||
_ = erlang:send(SubPid, {deliver, Topic, Msg}),
|
||||
SubPid ! {deliver, Topic, Msg},
|
||||
ok;
|
||||
dispatch_per_qos(SubPid, Topic, Msg, retry) ->
|
||||
do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
|
||||
%% Retry implies all subscribers nack:ed, send again without ack
|
||||
_ = erlang:send(SubPid, {deliver, Topic, Msg}),
|
||||
SubPid ! {deliver, Topic, Msg},
|
||||
ok;
|
||||
dispatch_per_qos(SubPid, Topic, Msg, fresh) ->
|
||||
do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
|
||||
case ack_enabled() of
|
||||
true ->
|
||||
dispatch_with_ack(SubPid, Topic, Msg);
|
||||
dispatch_with_ack(SubPid, Group, Topic, Msg);
|
||||
false ->
|
||||
_ = erlang:send(SubPid, {deliver, Topic, Msg}),
|
||||
SubPid ! {deliver, Topic, Msg},
|
||||
ok
|
||||
end.
|
||||
|
||||
dispatch_with_ack(SubPid, Topic, Msg) ->
|
||||
dispatch_with_ack(SubPid, Group, Topic, Msg) ->
|
||||
%% For QoS 1/2 message, expect an ack
|
||||
Ref = erlang:monitor(process, SubPid),
|
||||
Sender = self(),
|
||||
_ = erlang:send(SubPid, {deliver, Topic, with_ack_ref(Msg, {Sender, Ref})}),
|
||||
SubPid ! {deliver, Topic, with_group_ack(Msg, Group, Sender, Ref)},
|
||||
Timeout = case Msg#message.qos of
|
||||
?QOS_1 -> timer:seconds(?SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS);
|
||||
?QOS_2 -> infinity
|
||||
|
@ -204,24 +204,32 @@ dispatch_with_ack(SubPid, Topic, Msg) ->
|
|||
_ = erlang:demonitor(Ref, [flush])
|
||||
end.
|
||||
|
||||
with_ack_ref(Msg, SenderRef) ->
|
||||
emqx_message:set_headers(#{shared_dispatch_ack => SenderRef}, Msg).
|
||||
with_group_ack(Msg, Group, Sender, Ref) ->
|
||||
emqx_message:set_headers(#{shared_dispatch_ack => {Group, Sender, Ref}}, Msg).
|
||||
|
||||
without_ack_ref(Msg) ->
|
||||
-spec(without_group_ack(emqx_types:message()) -> emqx_types:message()).
|
||||
without_group_ack(Msg) ->
|
||||
emqx_message:set_headers(#{shared_dispatch_ack => ?NO_ACK}, Msg).
|
||||
|
||||
get_ack_ref(Msg) ->
|
||||
get_group_ack(Msg) ->
|
||||
emqx_message:get_header(shared_dispatch_ack, Msg, ?NO_ACK).
|
||||
|
||||
-spec(get_group(emqx_types:message()) -> {ok, any()} | error).
|
||||
get_group(Msg) ->
|
||||
case get_group_ack(Msg) of
|
||||
?NO_ACK -> error;
|
||||
{Group, _Sender, _Ref} -> {ok, Group}
|
||||
end.
|
||||
|
||||
-spec(is_ack_required(emqx_types:message()) -> boolean()).
|
||||
is_ack_required(Msg) -> ?NO_ACK =/= get_ack_ref(Msg).
|
||||
is_ack_required(Msg) -> ?NO_ACK =/= get_group_ack(Msg).
|
||||
|
||||
%% @doc Negative ack dropped message due to inflight window or message queue being full.
|
||||
-spec(maybe_nack_dropped(emqx_types:message()) -> ok).
|
||||
-spec(maybe_nack_dropped(emqx_types:message()) -> boolean()).
|
||||
maybe_nack_dropped(Msg) ->
|
||||
case get_ack_ref(Msg) of
|
||||
?NO_ACK -> ok;
|
||||
{Sender, Ref} -> nack(Sender, Ref, dropped)
|
||||
case get_group_ack(Msg) of
|
||||
?NO_ACK -> false;
|
||||
{_Group, Sender, Ref} -> ok == nack(Sender, Ref, dropped)
|
||||
end.
|
||||
|
||||
%% @doc Negative ack message due to connection down.
|
||||
|
@ -229,22 +237,22 @@ maybe_nack_dropped(Msg) ->
|
|||
%% i.e is_ack_required returned true.
|
||||
-spec(nack_no_connection(emqx_types:message()) -> ok).
|
||||
nack_no_connection(Msg) ->
|
||||
{Sender, Ref} = get_ack_ref(Msg),
|
||||
{_Group, Sender, Ref} = get_group_ack(Msg),
|
||||
nack(Sender, Ref, no_connection).
|
||||
|
||||
-spec(nack(pid(), reference(), dropped | no_connection) -> ok).
|
||||
nack(Sender, Ref, Reason) ->
|
||||
erlang:send(Sender, {Ref, ?NACK(Reason)}),
|
||||
Sender ! {Ref, ?NACK(Reason)},
|
||||
ok.
|
||||
|
||||
-spec(maybe_ack(emqx_types:message()) -> emqx_types:message()).
|
||||
maybe_ack(Msg) ->
|
||||
case get_ack_ref(Msg) of
|
||||
case get_group_ack(Msg) of
|
||||
?NO_ACK ->
|
||||
Msg;
|
||||
{Sender, Ref} ->
|
||||
erlang:send(Sender, {Ref, ?ACK}),
|
||||
without_ack_ref(Msg)
|
||||
{_Group, Sender, Ref} ->
|
||||
Sender ! {Ref, ?ACK},
|
||||
without_group_ack(Msg)
|
||||
end.
|
||||
|
||||
pick(sticky, ClientId, SourceTopic, Group, Topic, FailedSubs) ->
|
||||
|
|
|
@ -47,20 +47,20 @@ t_is_ack_required(_) ->
|
|||
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
|
||||
|
||||
t_maybe_nack_dropped(_) ->
|
||||
?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
|
||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}},
|
||||
?assertEqual(ok, emqx_shared_sub:maybe_nack_dropped(Msg)),
|
||||
?assertEqual(false, emqx_shared_sub:maybe_nack_dropped(#message{headers = #{}})),
|
||||
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
|
||||
?assertEqual(true, emqx_shared_sub:maybe_nack_dropped(Msg)),
|
||||
?assertEqual(ok,receive {for_test, {shared_sub_nack, dropped}} -> ok after 100 -> timeout end).
|
||||
|
||||
t_nack_no_connection(_) ->
|
||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}},
|
||||
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
|
||||
?assertEqual(ok, emqx_shared_sub:nack_no_connection(Msg)),
|
||||
?assertEqual(ok,receive {for_test, {shared_sub_nack, no_connection}} -> ok
|
||||
after 100 -> timeout end).
|
||||
|
||||
t_maybe_ack(_) ->
|
||||
?assertEqual(#message{headers = #{}}, emqx_shared_sub:maybe_ack(#message{headers = #{}})),
|
||||
Msg = #message{headers = #{shared_dispatch_ack => {self(), for_test}}},
|
||||
Msg = #message{headers = #{shared_dispatch_ack => {<<"group">>, self(), for_test}}},
|
||||
?assertEqual(#message{headers = #{shared_dispatch_ack => ?no_ack}},
|
||||
emqx_shared_sub:maybe_ack(Msg)),
|
||||
?assertEqual(ok,receive {for_test, ?ack} -> ok after 100 -> timeout end).
|
||||
|
@ -284,11 +284,14 @@ test_two_messages(Strategy, Group) ->
|
|||
ok.
|
||||
|
||||
last_message(ExpectedPayload, Pids) ->
|
||||
last_message(ExpectedPayload, Pids, 100).
|
||||
|
||||
last_message(ExpectedPayload, Pids, Timeout) ->
|
||||
receive
|
||||
{publish, #{client_pid := Pid, payload := ExpectedPayload}} ->
|
||||
ct:pal("last_message: ~p ====== ~p, payload=~p", [Pids, Pid, ExpectedPayload]),
|
||||
{true, Pid}
|
||||
after 100 ->
|
||||
after Timeout ->
|
||||
ct:pal("not yet"),
|
||||
<<"not yet?">>
|
||||
end.
|
||||
|
@ -410,6 +413,39 @@ t_local_fallback(_) ->
|
|||
?assertEqual(UsedSubPid1, UsedSubPid2),
|
||||
ok.
|
||||
|
||||
%% This one tests that broker tries to select another shared subscriber
|
||||
%% If the first one doesn't return an ACK
|
||||
t_redispatch(_) ->
|
||||
ok = ensure_config(sticky, true),
|
||||
application:set_env(emqx, shared_dispatch_ack_enabled, true),
|
||||
|
||||
Group = <<"group1">>,
|
||||
|
||||
Topic = <<"foo/bar">>,
|
||||
ClientId1 = <<"ClientId1">>,
|
||||
ClientId2 = <<"ClientId2">>,
|
||||
{ok, ConnPid1} = emqtt:start_link([{clientid, ClientId1}, {auto_ack, false}]),
|
||||
{ok, ConnPid2} = emqtt:start_link([{clientid, ClientId2}, {auto_ack, false}]),
|
||||
{ok, _} = emqtt:connect(ConnPid1),
|
||||
{ok, _} = emqtt:connect(ConnPid2),
|
||||
|
||||
emqtt:subscribe(ConnPid1, {<<"$share/", Group/binary, "/foo/bar">>, 1}),
|
||||
emqtt:subscribe(ConnPid2, {<<"$share/", Group/binary, "/foo/bar">>, 1}),
|
||||
|
||||
Message = emqx_message:make(ClientId1, 1, Topic, <<"hello1">>),
|
||||
|
||||
emqx:publish(Message),
|
||||
|
||||
{true, UsedSubPid1} = last_message(<<"hello1">>, [ConnPid1, ConnPid2]),
|
||||
ok = emqtt:stop(UsedSubPid1),
|
||||
|
||||
Res = last_message(<<"hello1">>, [ConnPid1, ConnPid2], 6000),
|
||||
?assertMatch({true, Pid} when Pid =/= UsedSubPid1, Res),
|
||||
|
||||
{true, UsedSubPid2} = Res,
|
||||
emqtt:stop(UsedSubPid2),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% help functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -481,6 +517,7 @@ setup_node(Node, Port) ->
|
|||
opts => [{zone,internal}],
|
||||
proto => tcp}]),
|
||||
application:set_env(gen_rpc, port_discovery, manual),
|
||||
application:set_env(gen_rpc, tcp_server_port, Port * 2),
|
||||
ok;
|
||||
(_) ->
|
||||
ok
|
||||
|
|
Loading…
Reference in New Issue