chore: fix elvis warnings
This commit is contained in:
parent
12cc9065f8
commit
b1959086d9
|
@ -94,7 +94,8 @@ fields(client_bucket) ->
|
||||||
, {initial, sc(initial(), #{default => "0"})}
|
, {initial, sc(initial(), #{default => "0"})}
|
||||||
%% low_water_mark add for emqx_channel and emqx_session
|
%% low_water_mark add for emqx_channel and emqx_session
|
||||||
%% both modules consume first and then check
|
%% both modules consume first and then check
|
||||||
%% so we need to use this value to prevent excessive consumption (e.g, consumption from an empty bucket)
|
%% so we need to use this value to prevent excessive consumption
|
||||||
|
%% (e.g, consumption from an empty bucket)
|
||||||
, {low_water_mark, sc(initial(),
|
, {low_water_mark, sc(initial(),
|
||||||
#{desc => "if the remaining tokens are lower than this value,
|
#{desc => "if the remaining tokens are lower than this value,
|
||||||
the check/consume will succeed, but it will be forced to hang for a short period of time",
|
the check/consume will succeed, but it will be forced to hang for a short period of time",
|
||||||
|
|
|
@ -89,6 +89,8 @@
|
||||||
-export_type([index/0]).
|
-export_type([index/0]).
|
||||||
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
|
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
|
||||||
|
|
||||||
|
-elvis([{elvis_style, no_if_expression, disable}]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -337,8 +339,9 @@ longitudinal(#{id := Id,
|
||||||
|
|
||||||
case lists:min([ShouldAlloc, Flow, Capacity]) of
|
case lists:min([ShouldAlloc, Flow, Capacity]) of
|
||||||
Avaiable when Avaiable > 0 ->
|
Avaiable when Avaiable > 0 ->
|
||||||
%% XXX if capacity is infinity, and flow always > 0, the value in counter
|
%% XXX if capacity is infinity, and flow always > 0, the value in
|
||||||
%% will be overflow at some point in the future, do we need to deal with this situation???
|
%% counter will be overflow at some point in the future, do we need
|
||||||
|
%% to deal with this situation???
|
||||||
{Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node),
|
{Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node),
|
||||||
counters:add(Counter, Index, Inc),
|
counters:add(Counter, Index, Inc),
|
||||||
|
|
||||||
|
|
|
@ -923,7 +923,12 @@ t_ws_cookie_init(_) ->
|
||||||
conn_mod => emqx_ws_connection,
|
conn_mod => emqx_ws_connection,
|
||||||
ws_cookie => WsCookie
|
ws_cookie => WsCookie
|
||||||
},
|
},
|
||||||
Channel = emqx_channel:init(ConnInfo, #{zone => default, limiter => limiter_cfg(), listener => {tcp, default}}),
|
Channel = emqx_channel:init(
|
||||||
|
ConnInfo,
|
||||||
|
#{zone => default,
|
||||||
|
limiter => limiter_cfg(),
|
||||||
|
listener => {tcp, default}
|
||||||
|
}),
|
||||||
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
|
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -948,7 +953,12 @@ channel(InitFields) ->
|
||||||
maps:fold(fun(Field, Value, Channel) ->
|
maps:fold(fun(Field, Value, Channel) ->
|
||||||
emqx_channel:set_field(Field, Value, Channel)
|
emqx_channel:set_field(Field, Value, Channel)
|
||||||
end,
|
end,
|
||||||
emqx_channel:init(ConnInfo, #{zone => default, limiter => limiter_cfg(), listener => {tcp, default}}),
|
emqx_channel:init(
|
||||||
|
ConnInfo,
|
||||||
|
#{zone => default,
|
||||||
|
limiter => limiter_cfg(),
|
||||||
|
listener => {tcp, default}
|
||||||
|
}),
|
||||||
maps:merge(#{clientinfo => clientinfo(),
|
maps:merge(#{clientinfo => clientinfo(),
|
||||||
session => session(),
|
session => session(),
|
||||||
conn_state => connected
|
conn_state => connected
|
||||||
|
|
|
@ -77,7 +77,8 @@ schema("/exhooks/:name") ->
|
||||||
description => <<"Delete the server">>,
|
description => <<"Delete the server">>,
|
||||||
parameters => params_server_name_in_path(),
|
parameters => params_server_name_in_path(),
|
||||||
responses => #{204 => <<>>,
|
responses => #{204 => <<>>,
|
||||||
500 => error_codes([?BAD_RPC], <<"Bad RPC">>) }
|
500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -176,7 +177,8 @@ action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
|
||||||
}};
|
}};
|
||||||
{ok, {error, Reason}} ->
|
{ok, {error, Reason}} ->
|
||||||
{400, #{code => <<"BAD_REQUEST">>,
|
{400, #{code => <<"BAD_REQUEST">>,
|
||||||
message => unicode:characters_to_binary(io_lib:format("Error Reason:~p~n", [Reason]))
|
message => unicode:characters_to_binary(
|
||||||
|
io_lib:format("Error Reason:~p~n", [Reason]))
|
||||||
}};
|
}};
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
{200};
|
{200};
|
||||||
|
|
|
@ -461,11 +461,9 @@ check_timer(Timer, _, _) ->
|
||||||
|
|
||||||
-spec get_backend_module() -> backend().
|
-spec get_backend_module() -> backend().
|
||||||
get_backend_module() ->
|
get_backend_module() ->
|
||||||
#{type := Backend} = emqx:get_config([retainer, config]),
|
ModName = case emqx:get_config([retainer, config]) of
|
||||||
ModName = if Backend =:= built_in_database ->
|
#{type := built_in_database} -> mnesia;
|
||||||
mnesia;
|
#{type := Backend} -> Backend
|
||||||
true ->
|
|
||||||
Backend
|
|
||||||
end,
|
end,
|
||||||
erlang:list_to_existing_atom(io_lib:format("~ts_~ts", [?APP, ModName])).
|
erlang:list_to_existing_atom(io_lib:format("~ts_~ts", [?APP, ModName])).
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,11 @@ store_retained(_, Msg =#message{topic = Topic}) ->
|
||||||
end,
|
end,
|
||||||
case mria:transaction(?RETAINER_SHARD, Fun) of
|
case mria:transaction(?RETAINER_SHARD, Fun) of
|
||||||
{atomic, ok} -> ok;
|
{atomic, ok} -> ok;
|
||||||
{aborted, Reason} -> ?SLOG(error, #{msg => "failed_to_retain_message", topic => Topic, reason => Reason})
|
{aborted, Reason} ->
|
||||||
|
?SLOG(error, #{ msg => "failed_to_retain_message"
|
||||||
|
, topic => Topic
|
||||||
|
, reason => Reason
|
||||||
|
})
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -90,7 +90,10 @@ t_store_and_clean(_) ->
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
|
||||||
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(
|
||||||
|
C1, <<"retained">>,
|
||||||
|
<<"this is a retained message">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
||||||
?assertEqual(1, length(receive_messages(1))),
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
@ -118,7 +121,10 @@ t_retain_handling(_) ->
|
||||||
?assertEqual(0, length(receive_messages(1))),
|
?assertEqual(0, length(receive_messages(1))),
|
||||||
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>),
|
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>),
|
||||||
|
|
||||||
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(
|
||||||
|
C1, <<"retained">>,
|
||||||
|
<<"this is a retained message">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
|
||||||
?assertEqual(1, length(receive_messages(1))),
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
@ -148,9 +154,18 @@ t_retain_handling(_) ->
|
||||||
t_wildcard_subscription(_) ->
|
t_wildcard_subscription(_) ->
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(
|
||||||
emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]),
|
C1, <<"retained/0">>,
|
||||||
emqtt:publish(C1, <<"retained/a/b/c">>, <<"this is a retained message 2">>, [{qos, 0}, {retain, true}]),
|
<<"this is a retained message 0">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
|
emqtt:publish(
|
||||||
|
C1, <<"retained/1">>,
|
||||||
|
<<"this is a retained message 1">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
|
emqtt:publish(
|
||||||
|
C1, <<"retained/a/b/c">>,
|
||||||
|
<<"this is a retained message 2">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0),
|
||||||
|
@ -165,11 +180,26 @@ t_message_expiry(_) ->
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
|
|
||||||
emqtt:publish(C1, <<"retained/0">>, #{'Message-Expiry-Interval' => 0}, <<"don't expire">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(
|
||||||
emqtt:publish(C1, <<"retained/1">>, #{'Message-Expiry-Interval' => 2}, <<"expire">>, [{qos, 0}, {retain, true}]),
|
C1, <<"retained/0">>, #{'Message-Expiry-Interval' => 0},
|
||||||
emqtt:publish(C1, <<"retained/2">>, #{'Message-Expiry-Interval' => 5}, <<"don't expire">>, [{qos, 0}, {retain, true}]),
|
<<"don't expire">>,
|
||||||
emqtt:publish(C1, <<"retained/3">>, <<"don't expire">>, [{qos, 0}, {retain, true}]),
|
[{qos, 0}, {retain, true}]),
|
||||||
emqtt:publish(C1, <<"$SYS/retained/4">>, <<"don't expire">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(
|
||||||
|
C1, <<"retained/1">>, #{'Message-Expiry-Interval' => 2},
|
||||||
|
<<"expire">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
|
emqtt:publish(
|
||||||
|
C1, <<"retained/2">>, #{'Message-Expiry-Interval' => 5},
|
||||||
|
<<"don't expire">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
|
emqtt:publish(
|
||||||
|
C1, <<"retained/3">>,
|
||||||
|
<<"don't expire">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
|
emqtt:publish(
|
||||||
|
C1, <<"$SYS/retained/4">>,
|
||||||
|
<<"don't expire">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
|
||||||
|
@ -210,9 +240,18 @@ t_message_expiry_2(_) ->
|
||||||
t_clean(_) ->
|
t_clean(_) ->
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(
|
||||||
emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]),
|
C1, <<"retained/0">>,
|
||||||
emqtt:publish(C1, <<"retained/test/0">>, <<"this is a retained message 2">>, [{qos, 0}, {retain, true}]),
|
<<"this is a retained message 0">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
|
emqtt:publish(
|
||||||
|
C1, <<"retained/1">>,
|
||||||
|
<<"this is a retained message 1">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
|
emqtt:publish(
|
||||||
|
C1, <<"retained/test/0">>,
|
||||||
|
<<"this is a retained message 2">>,
|
||||||
|
[{qos, 0}, {retain, true}]),
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
||||||
?assertEqual(3, length(receive_messages(3))),
|
?assertEqual(3, length(receive_messages(3))),
|
||||||
|
|
||||||
|
@ -227,7 +266,11 @@ t_stop_publish_clear_msg(_) ->
|
||||||
emqx_retainer:update_config(#{<<"stop_publish_clear_msg">> => true}),
|
emqx_retainer:update_config(#{<<"stop_publish_clear_msg">> => true}),
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(
|
||||||
|
C1, <<"retained/0">>,
|
||||||
|
<<"this is a retained message 0">>,
|
||||||
|
[{qos, 0}, {retain, true}]
|
||||||
|
),
|
||||||
|
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
||||||
?assertEqual(1, length(receive_messages(1))),
|
?assertEqual(1, length(receive_messages(1))),
|
||||||
|
@ -239,14 +282,27 @@ t_stop_publish_clear_msg(_) ->
|
||||||
ok = emqtt:disconnect(C1).
|
ok = emqtt:disconnect(C1).
|
||||||
|
|
||||||
t_flow_control(_) ->
|
t_flow_control(_) ->
|
||||||
emqx_retainer:update_config(#{<<"flow_control">> => #{<<"max_read_number">> => 1,
|
emqx_retainer:update_config(#{<<"flow_control">> =>
|
||||||
|
#{<<"max_read_number">> => 1,
|
||||||
<<"msg_deliver_quota">> => 1,
|
<<"msg_deliver_quota">> => 1,
|
||||||
<<"quota_release_interval">> => <<"1s">>}}),
|
<<"quota_release_interval">> => <<"1s">>}}),
|
||||||
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]),
|
emqtt:publish(
|
||||||
emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]),
|
C1, <<"retained/0">>,
|
||||||
emqtt:publish(C1, <<"retained/3">>, <<"this is a retained message 3">>, [{qos, 0}, {retain, true}]),
|
<<"this is a retained message 0">>,
|
||||||
|
[{qos, 0}, {retain, true}]
|
||||||
|
),
|
||||||
|
emqtt:publish(
|
||||||
|
C1, <<"retained/1">>,
|
||||||
|
<<"this is a retained message 1">>,
|
||||||
|
[{qos, 0}, {retain, true}]
|
||||||
|
),
|
||||||
|
emqtt:publish(
|
||||||
|
C1, <<"retained/3">>,
|
||||||
|
<<"this is a retained message 3">>,
|
||||||
|
[{qos, 0}, {retain, true}]
|
||||||
|
),
|
||||||
Begin = erlang:system_time(millisecond),
|
Begin = erlang:system_time(millisecond),
|
||||||
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
|
||||||
?assertEqual(3, length(receive_messages(3))),
|
?assertEqual(3, length(receive_messages(3))),
|
||||||
|
|
|
@ -94,13 +94,23 @@ t_publish_retain_message(_) ->
|
||||||
|
|
||||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, [{qos, 2}, {retain, true}]),
|
{ok, _} = emqtt:publish(
|
||||||
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"new retained message">>, [{qos, 2}, {retain, true}]),
|
Client1, Topic, #{},
|
||||||
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"not retained message">>, [{qos, 2}, {retain, false}]),
|
<<"retained message">>,
|
||||||
|
[{qos, 2}, {retain, true}]),
|
||||||
|
{ok, _} = emqtt:publish(
|
||||||
|
Client1, Topic, #{},
|
||||||
|
<<"new retained message">>,
|
||||||
|
[{qos, 2}, {retain, true}]),
|
||||||
|
{ok, _} = emqtt:publish(
|
||||||
|
Client1, Topic, #{},
|
||||||
|
<<"not retained message">>,
|
||||||
|
[{qos, 2}, {retain, false}]),
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2),
|
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2),
|
||||||
|
|
||||||
[Msg] = receive_messages(1),
|
[Msg] = receive_messages(1),
|
||||||
?assertEqual(<<"new retained message">>, maps:get(payload, Msg)), %% [MQTT-3.3.1-5] [MQTT-3.3.1-8]
|
%% [MQTT-3.3.1-5] [MQTT-3.3.1-8]
|
||||||
|
?assertEqual(<<"new retained message">>, maps:get(payload, Msg)),
|
||||||
|
|
||||||
{ok, _, [0]} = emqtt:unsubscribe(Client1, Topic),
|
{ok, _, [0]} = emqtt:unsubscribe(Client1, Topic),
|
||||||
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"">>, [{qos, 2}, {retain, true}]),
|
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"">>, [{qos, 2}, {retain, true}]),
|
||||||
|
@ -113,16 +123,33 @@ t_publish_retain_message(_) ->
|
||||||
t_publish_message_expiry_interval(_) ->
|
t_publish_message_expiry_interval(_) ->
|
||||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
{ok, _} = emqtt:publish(Client1, <<"topic/A">>, #{'Message-Expiry-Interval' => 1}, <<"retained message">>, [{qos, 1}, {retain, true}]),
|
{ok, _} = emqtt:publish(
|
||||||
{ok, _} = emqtt:publish(Client1, <<"topic/B">>, #{'Message-Expiry-Interval' => 1}, <<"retained message">>, [{qos, 2}, {retain, true}]),
|
Client1, <<"topic/A">>, #{'Message-Expiry-Interval' => 1},
|
||||||
{ok, _} = emqtt:publish(Client1, <<"topic/C">>, #{'Message-Expiry-Interval' => 10}, <<"retained message">>, [{qos, 1}, {retain, true}]),
|
<<"retained message">>,
|
||||||
{ok, _} = emqtt:publish(Client1, <<"topic/D">>, #{'Message-Expiry-Interval' => 10}, <<"retained message">>, [{qos, 2}, {retain, true}]),
|
[{qos, 1}, {retain, true}]),
|
||||||
|
{ok, _} = emqtt:publish(
|
||||||
|
Client1, <<"topic/B">>, #{'Message-Expiry-Interval' => 1},
|
||||||
|
<<"retained message">>,
|
||||||
|
[{qos, 2}, {retain, true}]),
|
||||||
|
{ok, _} = emqtt:publish(
|
||||||
|
Client1, <<"topic/C">>, #{'Message-Expiry-Interval' => 10},
|
||||||
|
<<"retained message">>,
|
||||||
|
[{qos, 1}, {retain, true}]),
|
||||||
|
{ok, _} = emqtt:publish(
|
||||||
|
Client1, <<"topic/D">>, #{'Message-Expiry-Interval' => 10},
|
||||||
|
<<"retained message">>,
|
||||||
|
[{qos, 2}, {retain, true}]),
|
||||||
timer:sleep(1500),
|
timer:sleep(1500),
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client1, <<"topic/+">>, 2),
|
{ok, _, [2]} = emqtt:subscribe(Client1, <<"topic/+">>, 2),
|
||||||
Msgs = receive_messages(4),
|
Msgs = receive_messages(4),
|
||||||
?assertEqual(2, length(Msgs)), %% [MQTT-3.3.2-5]
|
?assertEqual(2, length(Msgs)), %% [MQTT-3.3.2-5]
|
||||||
|
|
||||||
L = lists:map(fun(Msg) -> MessageExpiryInterval = maps:get('Message-Expiry-Interval', maps:get(properties, Msg)), MessageExpiryInterval < 10 end, Msgs),
|
L = lists:map(
|
||||||
|
fun(Msg) ->
|
||||||
|
MessageExpiryInterval = maps:get('Message-Expiry-Interval',
|
||||||
|
maps:get(properties, Msg)),
|
||||||
|
MessageExpiryInterval < 10
|
||||||
|
end, Msgs),
|
||||||
?assertEqual(2, length(L)), %% [MQTT-3.3.2-6]
|
?assertEqual(2, length(L)), %% [MQTT-3.3.2-6]
|
||||||
|
|
||||||
ok = emqtt:disconnect(Client1),
|
ok = emqtt:disconnect(Client1),
|
||||||
|
@ -137,9 +164,21 @@ t_publish_message_expiry_interval(_) ->
|
||||||
t_subscribe_retain_handing(_) ->
|
t_subscribe_retain_handing(_) ->
|
||||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
ok = emqtt:publish(Client1, <<"topic/A">>, #{}, <<"retained message">>, [{qos, 0}, {retain, true}]),
|
ok = emqtt:publish(
|
||||||
{ok, _} = emqtt:publish(Client1, <<"topic/B">>, #{}, <<"retained message">>, [{qos, 1}, {retain, true}]),
|
Client1, <<"topic/A">>, #{},
|
||||||
{ok, _} = emqtt:publish(Client1, <<"topic/C">>, #{}, <<"retained message">>, [{qos, 2}, {retain, true}]),
|
<<"retained message">>,
|
||||||
|
[{qos, 0}, {retain, true}]
|
||||||
|
),
|
||||||
|
{ok, _} = emqtt:publish(
|
||||||
|
Client1, <<"topic/B">>, #{},
|
||||||
|
<<"retained message">>,
|
||||||
|
[{qos, 1}, {retain, true}]
|
||||||
|
),
|
||||||
|
{ok, _} = emqtt:publish(
|
||||||
|
Client1, <<"topic/C">>, #{},
|
||||||
|
<<"retained message">>,
|
||||||
|
[{qos, 2}, {retain, true}]
|
||||||
|
),
|
||||||
|
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]),
|
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]),
|
||||||
?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-10]
|
?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-10]
|
||||||
|
|
|
@ -104,7 +104,8 @@ on_stats_update(#{clientid := ClientId,
|
||||||
case ets:lookup(?TOPK_TAB, LastIndex) of
|
case ets:lookup(?TOPK_TAB, LastIndex) of
|
||||||
[#top_k{index = Index}] ->
|
[#top_k{index = Index}] ->
|
||||||
%% if last value == the new value, update the type and last_update_time
|
%% if last value == the new value, update the type and last_update_time
|
||||||
%% XXX for clients whose latency are stable for a long time, is it possible to reduce updates?
|
%% XXX for clients whose latency are stable for a long time, is it
|
||||||
|
%% possible to reduce updates?
|
||||||
ets:insert(?TOPK_TAB,
|
ets:insert(?TOPK_TAB,
|
||||||
#top_k{index = Index, type = Type, last_update_time = Ts});
|
#top_k{index = Index, type = Type, last_update_time = Ts});
|
||||||
[_] ->
|
[_] ->
|
||||||
|
|
|
@ -72,11 +72,16 @@ schema("/slow_subscriptions/settings") ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
fields(record) ->
|
fields(record) ->
|
||||||
[
|
[ {clientid,
|
||||||
{clientid, mk(string(), #{desc => <<"the clientid">>})},
|
mk(string(), #{desc => <<"the clientid">>})},
|
||||||
{latency, mk(integer(), #{desc => <<"average time for message delivery or time for message expire">>})},
|
{latency,
|
||||||
{type, mk(string(), #{desc => <<"type of the latency, could be average or expire">>})},
|
mk(integer(),
|
||||||
{last_update_time, mk(integer(), #{desc => <<"the timestamp of last update">>})}
|
#{desc => <<"average time for message delivery or time for message expire">>})},
|
||||||
|
{type,
|
||||||
|
mk(string(),
|
||||||
|
#{desc => <<"type of the latency, could be average or expire">>})},
|
||||||
|
{last_update_time,
|
||||||
|
mk(integer(), #{desc => <<"the timestamp of last update">>})}
|
||||||
].
|
].
|
||||||
|
|
||||||
conf_schema() ->
|
conf_schema() ->
|
||||||
|
|
|
@ -23,8 +23,8 @@ fields("slow_subs") ->
|
||||||
, {notice_interval,
|
, {notice_interval,
|
||||||
sc(emqx_schema:duration_ms(),
|
sc(emqx_schema:duration_ms(),
|
||||||
"0s",
|
"0s",
|
||||||
"The interval for pushing statistics table records to the system topic. When set to 0, push is disabled"
|
"The interval for pushing statistics table records to the system topic. "
|
||||||
"publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval"
|
"publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval. "
|
||||||
"publish is disabled if set to 0s."
|
"publish is disabled if set to 0s."
|
||||||
)}
|
)}
|
||||||
, {notice_qos,
|
, {notice_qos,
|
||||||
|
|
Loading…
Reference in New Issue