chore: dialyzer warning
This commit is contained in:
parent
50041fdddf
commit
946de4a303
|
@ -171,6 +171,10 @@ strategy(Group) ->
|
||||||
get_default_shared_subscription_strategy()
|
get_default_shared_subscription_strategy()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec ack_enabled() -> boolean().
|
||||||
|
ack_enabled() ->
|
||||||
|
emqx:get_config([broker, shared_dispatch_ack_enabled], false).
|
||||||
|
|
||||||
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
do_dispatch(SubPid, _Group, Topic, Msg, _Type) when SubPid =:= self() ->
|
||||||
%% Deadlock otherwise
|
%% Deadlock otherwise
|
||||||
SubPid ! {deliver, Topic, Msg},
|
SubPid ! {deliver, Topic, Msg},
|
||||||
|
@ -182,8 +186,14 @@ do_dispatch(SubPid, _Group, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
|
||||||
do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
|
do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
|
||||||
%% Retry implies all subscribers nack:ed, send again without ack
|
%% Retry implies all subscribers nack:ed, send again without ack
|
||||||
send(SubPid, Topic, {deliver, Topic, Msg});
|
send(SubPid, Topic, {deliver, Topic, Msg});
|
||||||
do_dispatch(SubPid, _Group, Topic, Msg, fresh) ->
|
do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
|
||||||
send(SubPid, Topic, {deliver, Topic, Msg}).
|
case ack_enabled() of
|
||||||
|
true ->
|
||||||
|
%% TODO: delete this case after 5.1.0
|
||||||
|
do_dispatch_with_ack(SubPid, Group, Topic, Msg);
|
||||||
|
false ->
|
||||||
|
send(SubPid, Topic, {deliver, Topic, Msg})
|
||||||
|
end.
|
||||||
|
|
||||||
-spec do_dispatch_with_ack(pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message()) ->
|
-spec do_dispatch_with_ack(pid(), emqx_types:group(), emqx_types:topic(), emqx_types:message()) ->
|
||||||
ok | {error, _}.
|
ok | {error, _}.
|
||||||
|
|
|
@ -440,6 +440,7 @@ zone_global_defaults() ->
|
||||||
server_keepalive => disabled,
|
server_keepalive => disabled,
|
||||||
session_expiry_interval => 7200000,
|
session_expiry_interval => 7200000,
|
||||||
shared_subscription => true,
|
shared_subscription => true,
|
||||||
|
shared_subscription_strategy => round_robin,
|
||||||
strict_mode => false,
|
strict_mode => false,
|
||||||
upgrade_qos => false,
|
upgrade_qos => false,
|
||||||
use_username_as_clientid => false,
|
use_username_as_clientid => false,
|
||||||
|
|
|
@ -769,12 +769,12 @@ t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->
|
||||||
%% Expected behaviour:
|
%% Expected behaviour:
|
||||||
%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
|
%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
|
||||||
t_dispatch_qos2({init, Config}) when is_list(Config) ->
|
t_dispatch_qos2({init, Config}) when is_list(Config) ->
|
||||||
|
ok = ensure_config(round_robin, _AckEnabled = false),
|
||||||
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
|
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
|
||||||
Config;
|
Config;
|
||||||
t_dispatch_qos2({'end', Config}) when is_list(Config) ->
|
t_dispatch_qos2({'end', Config}) when is_list(Config) ->
|
||||||
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
|
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
|
||||||
t_dispatch_qos2(Config) when is_list(Config) ->
|
t_dispatch_qos2(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(round_robin, _AckEnabled = false),
|
|
||||||
Topic = <<"foo/bar/1">>,
|
Topic = <<"foo/bar/1">>,
|
||||||
ClientId1 = <<"ClientId1">>,
|
ClientId1 = <<"ClientId1">>,
|
||||||
ClientId2 = <<"ClientId2">>,
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
@ -923,12 +923,12 @@ t_session_takeover(Config) when is_list(Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_session_kicked({init, Config}) when is_list(Config) ->
|
t_session_kicked({init, Config}) when is_list(Config) ->
|
||||||
|
ok = ensure_config(round_robin, _AckEnabled = false),
|
||||||
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
|
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
|
||||||
Config;
|
Config;
|
||||||
t_session_kicked({'end', Config}) when is_list(Config) ->
|
t_session_kicked({'end', Config}) when is_list(Config) ->
|
||||||
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
|
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
|
||||||
t_session_kicked(Config) when is_list(Config) ->
|
t_session_kicked(Config) when is_list(Config) ->
|
||||||
ok = ensure_config(round_robin, _AckEnabled = false),
|
|
||||||
Topic = <<"foo/bar/1">>,
|
Topic = <<"foo/bar/1">>,
|
||||||
ClientId1 = <<"ClientId1">>,
|
ClientId1 = <<"ClientId1">>,
|
||||||
ClientId2 = <<"ClientId2">>,
|
ClientId2 = <<"ClientId2">>,
|
||||||
|
@ -1019,12 +1019,12 @@ ensure_config(Strategy) ->
|
||||||
ensure_config(Strategy, _AckEnabled = true).
|
ensure_config(Strategy, _AckEnabled = true).
|
||||||
|
|
||||||
ensure_config(Strategy, AckEnabled) ->
|
ensure_config(Strategy, AckEnabled) ->
|
||||||
emqx_config:put([broker, shared_subscription_strategy], Strategy),
|
emqx_config:put([mqtt, shared_subscription_strategy], Strategy),
|
||||||
emqx_config:put([broker, shared_dispatch_ack_enabled], AckEnabled),
|
emqx_config:put([broker, shared_dispatch_ack_enabled], AckEnabled),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
ensure_node_config(Node, Strategy) ->
|
ensure_node_config(Node, Strategy) ->
|
||||||
rpc:call(Node, emqx_config, force_put, [[broker, shared_subscription_strategy], Strategy]).
|
rpc:call(Node, emqx_config, force_put, [[mqtt, shared_subscription_strategy], Strategy]).
|
||||||
|
|
||||||
ensure_group_config(Group2Strategy) ->
|
ensure_group_config(Group2Strategy) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
|
|
Loading…
Reference in New Issue