Merge pull request #6624 from HJianBo/fix-conf-name-prefix

Remove `emqx_` prefix for some config scopes
This commit is contained in:
Zaiming (Stone) Shi 2022-01-12 20:36:52 +01:00 committed by GitHub
commit cc2ae5ed71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 287 additions and 158 deletions

View File

@ -1,7 +1,8 @@
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Emq X Rate Limiter ## Emq X Rate Limiter
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
emqx_limiter {
limiter {
bytes_in { bytes_in {
global.rate = infinity # token generation rate global.rate = infinity # token generation rate
zone.default.rate = infinity zone.default.rate = infinity

View File

@ -19,7 +19,8 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-export([ roots/0, fields/1, to_rate/1, to_capacity/1 -export([ roots/0, fields/1, to_rate/1, to_capacity/1
, minimum_period/0, to_burst_rate/1, to_initial/1]). , minimum_period/0, to_burst_rate/1, to_initial/1
, namespace/0]).
-define(KILOBYTE, 1024). -define(KILOBYTE, 1024).
@ -56,16 +57,18 @@
-import(emqx_schema, [sc/2, map/2]). -import(emqx_schema, [sc/2, map/2]).
roots() -> [emqx_limiter]. namespace() -> limiter.
fields(emqx_limiter) -> roots() -> [limiter].
[ {bytes_in, sc(ref(limiter), #{})}
, {message_in, sc(ref(limiter), #{})}
, {connection, sc(ref(limiter), #{})}
, {message_routing, sc(ref(limiter), #{})}
];
fields(limiter) -> fields(limiter) ->
[ {bytes_in, sc(ref(limiter_opts), #{})}
, {message_in, sc(ref(limiter_opts), #{})}
, {connection, sc(ref(limiter_opts), #{})}
, {message_routing, sc(ref(limiter_opts), #{})}
];
fields(limiter_opts) ->
[ {global, sc(ref(rate_burst), #{})} [ {global, sc(ref(rate_burst), #{})}
, {zone, sc(map("zone name", ref(rate_burst)), #{})} , {zone, sc(map("zone name", ref(rate_burst)), #{})}
, {bucket, sc(map("bucket id", ref(bucket)), , {bucket, sc(map("bucket id", ref(bucket)),
@ -94,7 +97,8 @@ fields(client_bucket) ->
, {initial, sc(initial(), #{default => "0"})} , {initial, sc(initial(), #{default => "0"})}
%% low_water_mark add for emqx_channel and emqx_session %% low_water_mark add for emqx_channel and emqx_session
%% both modules consume first and then check %% both modules consume first and then check
%% so we need to use this value to prevent excessive consumption (e.g, consumption from an empty bucket) %% so we need to use this value to prevent excessive consumption
%% (e.g, consumption from an empty bucket)
, {low_water_mark, sc(initial(), , {low_water_mark, sc(initial(),
#{desc => "if the remaining tokens are lower than this value, #{desc => "if the remaining tokens are lower than this value,
the check/consume will succeed, but it will be forced to hang for a short period of time", the check/consume will succeed, but it will be forced to hang for a short period of time",

View File

@ -89,13 +89,15 @@
-export_type([index/0]). -export_type([index/0]).
-import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]).
-elvis([{elvis_style, no_if_expression, disable}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec connect(limiter_type(), -spec connect(limiter_type(),
bucket_name() | #{limiter_type() => bucket_name()}) -> emqx_htb_limiter:limiter(). bucket_name() | #{limiter_type() => bucket_name()}) -> emqx_htb_limiter:limiter().
connect(Type, BucketName) when is_atom(BucketName) -> connect(Type, BucketName) when is_atom(BucketName) ->
Path = [emqx_limiter, Type, bucket, BucketName], Path = [limiter, Type, bucket, BucketName],
case emqx:get_config(Path, undefined) of case emqx:get_config(Path, undefined) of
undefined -> undefined ->
?SLOG(error, #{msg => "bucket_config_not_found", path => Path}), ?SLOG(error, #{msg => "bucket_config_not_found", path => Path}),
@ -337,8 +339,9 @@ longitudinal(#{id := Id,
case lists:min([ShouldAlloc, Flow, Capacity]) of case lists:min([ShouldAlloc, Flow, Capacity]) of
Avaiable when Avaiable > 0 -> Avaiable when Avaiable > 0 ->
%% XXX if capacity is infinity, and flow always > 0, the value in counter %% XXX if capacity is infinity, and flow always > 0, the value in
%% will be overflow at some point in the future, do we need to deal with this situation??? %% counter will be overflow at some point in the future, do we need
%% to deal with this situation???
{Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node), {Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node),
counters:add(Counter, Index, Inc), counters:add(Counter, Index, Inc),
@ -447,7 +450,7 @@ dispatch_burst_to_buckets([], _, Alloced, Nodes) ->
init_tree(Type, State) -> init_tree(Type, State) ->
#{global := Global, #{global := Global,
zone := Zone, zone := Zone,
bucket := Bucket} = emqx:get_config([emqx_limiter, Type]), bucket := Bucket} = emqx:get_config([limiter, Type]),
{Factor, Root} = make_root(Global, Zone), {Factor, Root} = make_root(Global, Zone),
State2 = State#{root := Root}, State2 = State#{root := Root},
{NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2), {NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2),

View File

@ -89,6 +89,6 @@ make_child(Type) ->
modules => [emqx_limiter_server]}. modules => [emqx_limiter_server]}.
childs() -> childs() ->
Conf = emqx:get_config([emqx_limiter]), Conf = emqx:get_config([limiter]),
Types = maps:keys(Conf), Types = maps:keys(Conf),
[make_child(Type) || Type <- Types]. [make_child(Type) || Type <- Types].

View File

@ -130,7 +130,7 @@ basic_conf() ->
stats => stats_conf(), stats => stats_conf(),
listeners => listeners_conf(), listeners => listeners_conf(),
zones => zone_conf(), zones => zone_conf(),
emqx_limiter => emqx:get_config([emqx_limiter]) limiter => emqx:get_config([limiter])
}. }.
set_test_listener_confs() -> set_test_listener_confs() ->
@ -201,7 +201,7 @@ modify_limiter(TestCase, NewConf) ->
%% per_client 5/1s,5 %% per_client 5/1s,5
%% aggregated 10/1s,10 %% aggregated 10/1s,10
modify_limiter(#{emqx_limiter := Limiter} = NewConf) -> modify_limiter(#{limiter := Limiter} = NewConf) ->
#{message_routing := #{bucket := Bucket} = Routing} = Limiter, #{message_routing := #{bucket := Bucket} = Routing} = Limiter,
#{default := #{per_client := Client} = Default} = Bucket, #{default := #{per_client := Client} = Default} = Bucket,
Client2 = Client#{rate := 5, Client2 = Client#{rate := 5,
@ -216,7 +216,7 @@ modify_limiter(#{emqx_limiter := Limiter} = NewConf) ->
Bucket2 = Bucket#{default := Default2}, Bucket2 = Bucket#{default := Default2},
Routing2 = Routing#{bucket := Bucket2}, Routing2 = Routing#{bucket := Bucket2},
NewConf2 = NewConf#{emqx_limiter := Limiter#{message_routing := Routing2}}, NewConf2 = NewConf#{limiter := Limiter#{message_routing := Routing2}},
emqx_config:put(NewConf2), emqx_config:put(NewConf2),
emqx_limiter_manager:restart_server(message_routing), emqx_limiter_manager:restart_server(message_routing),
ok. ok.
@ -923,7 +923,12 @@ t_ws_cookie_init(_) ->
conn_mod => emqx_ws_connection, conn_mod => emqx_ws_connection,
ws_cookie => WsCookie ws_cookie => WsCookie
}, },
Channel = emqx_channel:init(ConnInfo, #{zone => default, limiter => limiter_cfg(), listener => {tcp, default}}), Channel = emqx_channel:init(
ConnInfo,
#{zone => default,
limiter => limiter_cfg(),
listener => {tcp, default}
}),
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -948,7 +953,12 @@ channel(InitFields) ->
maps:fold(fun(Field, Value, Channel) -> maps:fold(fun(Field, Value, Channel) ->
emqx_channel:set_field(Field, Value, Channel) emqx_channel:set_field(Field, Value, Channel)
end, end,
emqx_channel:init(ConnInfo, #{zone => default, limiter => limiter_cfg(), listener => {tcp, default}}), emqx_channel:init(
ConnInfo,
#{zone => default,
limiter => limiter_cfg(),
listener => {tcp, default}
}),
maps:merge(#{clientinfo => clientinfo(), maps:merge(#{clientinfo => clientinfo(),
session => session(), session => session(),
conn_state => connected conn_state => connected

View File

@ -25,7 +25,7 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(BASE_CONF, <<""" -define(BASE_CONF, <<"""
emqx_limiter { limiter {
bytes_in { bytes_in {
global.rate = infinity global.rate = infinity
zone.default.rate = infinity zone.default.rate = infinity
@ -496,7 +496,7 @@ start_client(Name, EndTime, Counter, Number) ->
start_client(Name, EndTime, Counter) -> start_client(Name, EndTime, Counter) ->
#{per_client := PerClient} = #{per_client := PerClient} =
emqx_config:get([emqx_limiter, message_routing, bucket, Name]), emqx_config:get([limiter, message_routing, bucket, Name]),
#{rate := Rate} = PerClient, #{rate := Rate} = PerClient,
Client = #client{start = ?NOW, Client = #client{start = ?NOW,
endtime = EndTime, endtime = EndTime,
@ -578,18 +578,18 @@ to_rate(Str) ->
Rate. Rate.
with_global(Modifier, ZoneName, ZoneModifier, Buckets, Case) -> with_global(Modifier, ZoneName, ZoneModifier, Buckets, Case) ->
Path = [emqx_limiter, message_routing], Path = [limiter, message_routing],
#{global := Global} = Cfg = emqx_config:get(Path), #{global := Global} = Cfg = emqx_config:get(Path),
Cfg2 = Cfg#{global := Modifier(Global)}, Cfg2 = Cfg#{global := Modifier(Global)},
with_zone(Cfg2, ZoneName, ZoneModifier, Buckets, Case). with_zone(Cfg2, ZoneName, ZoneModifier, Buckets, Case).
with_zone(Name, Modifier, Buckets, Case) -> with_zone(Name, Modifier, Buckets, Case) ->
Path = [emqx_limiter, message_routing], Path = [limiter, message_routing],
Cfg = emqx_config:get(Path), Cfg = emqx_config:get(Path),
with_zone(Cfg, Name, Modifier, Buckets, Case). with_zone(Cfg, Name, Modifier, Buckets, Case).
with_zone(Cfg, Name, Modifier, Buckets, Case) -> with_zone(Cfg, Name, Modifier, Buckets, Case) ->
Path = [emqx_limiter, message_routing], Path = [limiter, message_routing],
#{zone := ZoneCfgs, #{zone := ZoneCfgs,
bucket := BucketCfgs} = Cfg, bucket := BucketCfgs} = Cfg,
ZoneCfgs2 = apply_modifier(Name, Modifier, ZoneCfgs), ZoneCfgs2 = apply_modifier(Name, Modifier, ZoneCfgs),
@ -598,11 +598,11 @@ with_zone(Cfg, Name, Modifier, Buckets, Case) ->
with_config(Path, fun(_) -> Cfg2 end, Case). with_config(Path, fun(_) -> Cfg2 end, Case).
with_bucket(Bucket, Modifier, Case) -> with_bucket(Bucket, Modifier, Case) ->
Path = [emqx_limiter, message_routing, bucket, Bucket], Path = [limiter, message_routing, bucket, Bucket],
with_config(Path, Modifier, Case). with_config(Path, Modifier, Case).
with_per_client(Bucket, Modifier, Case) -> with_per_client(Bucket, Modifier, Case) ->
Path = [emqx_limiter, message_routing, bucket, Bucket, per_client], Path = [limiter, message_routing, bucket, Bucket, per_client],
with_config(Path, Modifier, Case). with_config(Path, Modifier, Case).
with_config(Path, Modifier, Case) -> with_config(Path, Modifier, Case) ->

View File

@ -79,7 +79,7 @@ set_special_configs(emqx_dashboard) ->
port => 18083 port => 18083
}] }]
}, },
emqx_config:put([emqx_dashboard], Config), emqx_config:put([dashboard], Config),
ok; ok;
set_special_configs(_App) -> set_special_configs(_App) ->
ok. ok.

View File

@ -2,7 +2,7 @@
## EMQ X Dashboard ## EMQ X Dashboard
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
emqx_dashboard { dashboard {
default_username = "admin" default_username = "admin"
default_password = "public" default_password = "public"
## notice: sample_interval should be divisible by 60. ## notice: sample_interval should be divisible by 60.

View File

@ -90,7 +90,7 @@ listeners() ->
Name = listener_name(Protocol, Port), Name = listener_name(Protocol, Port),
RanchOptions = ranch_opts(maps:without([protocol], ListenerOptions)), RanchOptions = ranch_opts(maps:without([protocol], ListenerOptions)),
{Name, Protocol, Port, RanchOptions} {Name, Protocol, Port, RanchOptions}
end || ListenerOptions <- emqx_conf:get([emqx_dashboard, listeners], [])]. end || ListenerOptions <- emqx_conf:get([dashboard, listeners], [])].
ranch_opts(RanchOptions) -> ranch_opts(RanchOptions) ->
Keys = [ {ack_timeout, handshake_timeout} Keys = [ {ack_timeout, handshake_timeout}

View File

@ -229,7 +229,7 @@ add_default_user() ->
add_default_user(binenv(default_username), binenv(default_password)). add_default_user(binenv(default_username), binenv(default_password)).
binenv(Key) -> binenv(Key) ->
iolist_to_binary(emqx_conf:get([emqx_dashboard, Key], "")). iolist_to_binary(emqx_conf:get([dashboard, Key], "")).
add_default_user(Username, Password) when ?EMPTY_KEY(Username) orelse ?EMPTY_KEY(Password) -> add_default_user(Username, Password) when ?EMPTY_KEY(Username) orelse ?EMPTY_KEY(Password) ->
{ok, empty}; {ok, empty};

View File

@ -55,7 +55,7 @@ get_collect() -> gen_server:call(whereis(?MODULE), get_collect).
init([]) -> init([]) ->
timer(next_interval(), collect), timer(next_interval(), collect),
timer(get_today_remaining_seconds(), clear_expire_data), timer(get_today_remaining_seconds(), clear_expire_data),
ExpireInterval = emqx_conf:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL), ExpireInterval = emqx_conf:get([dashboard, monitor, interval], ?EXPIRE_INTERVAL),
State = #{ State = #{
count => count(), count => count(),
expire_interval => ExpireInterval, expire_interval => ExpireInterval,
@ -75,7 +75,7 @@ next_interval() ->
(1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1. (1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1.
interval() -> interval() ->
emqx_conf:get([?APP, sample_interval], ?DEFAULT_INTERVAL). emqx_conf:get([dashboard, sample_interval], ?DEFAULT_INTERVAL).
count() -> count() ->
60 div interval(). 60 div interval().

View File

@ -21,7 +21,7 @@
-export([execute/2]). -export([execute/2]).
execute(Req, Env) -> execute(Req, Env) ->
CORS = emqx_conf:get([emqx_dashboard, cors], false), CORS = emqx_conf:get([dashboard, cors], false),
case CORS andalso cowboy_req:header(<<"origin">>, Req, undefined) of case CORS andalso cowboy_req:header(<<"origin">>, Req, undefined) of
false -> false ->
{ok, Req, Env}; {ok, Req, Env};

View File

@ -22,9 +22,9 @@
,namespace/0]). ,namespace/0]).
namespace() -> <<"dashboard">>. namespace() -> <<"dashboard">>.
roots() -> ["emqx_dashboard"]. roots() -> ["dashboard"].
fields("emqx_dashboard") -> fields("dashboard") ->
[ {listeners, hoconsc:array(hoconsc:union([hoconsc:ref(?MODULE, "http"), [ {listeners, hoconsc:array(hoconsc:union([hoconsc:ref(?MODULE, "http"),
hoconsc:ref(?MODULE, "https")]))} hoconsc:ref(?MODULE, "https")]))}
, {default_username, fun default_username/1} , {default_username, fun default_username/1}

View File

@ -163,7 +163,7 @@ jwt_expiration_time() ->
erlang:system_time(millisecond) + token_ttl(). erlang:system_time(millisecond) + token_ttl().
token_ttl() -> token_ttl() ->
emqx_conf:get([emqx_dashboard, token_expired_time], ?EXPTIME). emqx_conf:get([dashboard, token_expired_time], ?EXPTIME).
format(Token, Username, ExpTime) -> format(Token, Username, ExpTime) ->
#?ADMIN_JWT{ #?ADMIN_JWT{

View File

@ -94,7 +94,7 @@ set_special_configs(emqx_dashboard) ->
default_username => <<"admin">>, default_username => <<"admin">>,
default_password => <<"public">> default_password => <<"public">>
}, },
emqx_config:put([emqx_dashboard], Config), emqx_config:put([dashboard], Config),
ok; ok;
set_special_configs(_) -> set_special_configs(_) ->
ok. ok.

View File

@ -2,7 +2,7 @@
## EMQ X Hooks ## EMQ X Hooks
##==================================================================== ##====================================================================
emqx_exhook { exhook {
servers = [ servers = [
##{ ##{

View File

@ -77,7 +77,8 @@ schema("/exhooks/:name") ->
description => <<"Delete the server">>, description => <<"Delete the server">>,
parameters => params_server_name_in_path(), parameters => params_server_name_in_path(),
responses => #{204 => <<>>, responses => #{204 => <<>>,
500 => error_codes([?BAD_RPC], <<"Bad RPC">>) } 500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
}
} }
}; };
@ -146,7 +147,7 @@ exhooks(get, _) ->
{200, ServerL2}; {200, ServerL2};
exhooks(post, #{body := Body}) -> exhooks(post, #{body := Body}) ->
case emqx_exhook_mgr:update_config([emqx_exhook, servers], {add, Body}) of case emqx_exhook_mgr:update_config([exhook, servers], {add, Body}) of
{ok, Result} -> {ok, Result} ->
{201, Result}; {201, Result};
{error, Error} -> {error, Error} ->
@ -168,7 +169,7 @@ action_with_name(get, #{bindings := #{name := Name}}) ->
end; end;
action_with_name(put, #{bindings := #{name := Name}, body := Body}) -> action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
case emqx_exhook_mgr:update_config([emqx_exhook, servers], case emqx_exhook_mgr:update_config([exhook, servers],
{update, Name, Body}) of {update, Name, Body}) of
{ok, not_found} -> {ok, not_found} ->
{400, #{code => <<"BAD_REQUEST">>, {400, #{code => <<"BAD_REQUEST">>,
@ -176,7 +177,8 @@ action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
}}; }};
{ok, {error, Reason}} -> {ok, {error, Reason}} ->
{400, #{code => <<"BAD_REQUEST">>, {400, #{code => <<"BAD_REQUEST">>,
message => unicode:characters_to_binary(io_lib:format("Error Reason:~p~n", [Reason])) message => unicode:characters_to_binary(
io_lib:format("Error Reason:~p~n", [Reason]))
}}; }};
{ok, _} -> {ok, _} ->
{200}; {200};
@ -187,7 +189,7 @@ action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
end; end;
action_with_name(delete, #{bindings := #{name := Name}}) -> action_with_name(delete, #{bindings := #{name := Name}}) ->
case emqx_exhook_mgr:update_config([emqx_exhook, servers], case emqx_exhook_mgr:update_config([exhook, servers],
{delete, Name}) of {delete, Name}) of
{ok, _} -> {ok, _} ->
{200}; {200};
@ -200,7 +202,7 @@ action_with_name(delete, #{bindings := #{name := Name}}) ->
move(post, #{bindings := #{name := Name}, body := Body}) -> move(post, #{bindings := #{name := Name}, body := Body}) ->
#{<<"position">> := PositionT, <<"related">> := Related} = Body, #{<<"position">> := PositionT, <<"related">> := Related} = Body,
Position = erlang:binary_to_atom(PositionT), Position = erlang:binary_to_atom(PositionT),
case emqx_exhook_mgr:update_config([emqx_exhook, servers], case emqx_exhook_mgr:update_config([exhook, servers],
{move, Name, Position, Related}) of {move, Name, Position, Related}) of
{ok, ok} -> {ok, ok} ->
{200}; {200};

View File

@ -108,10 +108,10 @@ lookup(Name) ->
call({lookup, Name}). call({lookup, Name}).
enable(Name) -> enable(Name) ->
update_config([emqx_exhook, servers], {enable, Name, true}). update_config([exhook, servers], {enable, Name, true}).
disable(Name) -> disable(Name) ->
update_config([emqx_exhook, servers], {enable, Name, false}). update_config([exhook, servers], {enable, Name, false}).
server_status(Name) -> server_status(Name) ->
call({server_status, Name}). call({server_status, Name}).
@ -176,8 +176,8 @@ post_config_update(_KeyPath, UpdateReq, NewConf, _OldConf, _AppEnvs) ->
init([]) -> init([]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
emqx_conf:add_handler([emqx_exhook, servers], ?MODULE), emqx_conf:add_handler([exhook, servers], ?MODULE),
ServerL = emqx:get_config([emqx_exhook, servers]), ServerL = emqx:get_config([exhook, servers]),
{Waiting, Running, Stopped} = load_all_servers(ServerL), {Waiting, Running, Stopped} = load_all_servers(ServerL),
Orders = reorder(ServerL), Orders = reorder(ServerL),
{ok, ensure_reload_timer( {ok, ensure_reload_timer(

View File

@ -34,11 +34,11 @@
-export([namespace/0, roots/0, fields/1, server_config/0]). -export([namespace/0, roots/0, fields/1, server_config/0]).
namespace() -> emqx_exhook. namespace() -> exhook.
roots() -> [emqx_exhook]. roots() -> [exhook].
fields(emqx_exhook) -> fields(exhook) ->
[{servers, [{servers,
sc(hoconsc:array(ref(server)), sc(hoconsc:array(ref(server)),
#{default => []})} #{default => []})}

View File

@ -24,9 +24,9 @@
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(CONF_DEFAULT, <<" -define(CONF_DEFAULT, <<"
emqx_exhook exhook {
{servers = [ servers = [
{name = default, { name = default,
url = \"http://127.0.0.1:9000\" url = \"http://127.0.0.1:9000\"
}] }]
} }

View File

@ -27,12 +27,13 @@
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(CONF_DEFAULT, <<" -define(CONF_DEFAULT, <<"
emqx_exhook {servers = [ exhook {
{name = default, servers =
[ { name = default,
url = \"http://127.0.0.1:9000\" url = \"http://127.0.0.1:9000\"
} }
] ]
} }
">>). ">>).
all() -> all() ->
@ -49,7 +50,7 @@ init_per_suite(Config) ->
_ = emqx_exhook_demo_svr:start(), _ = emqx_exhook_demo_svr:start(),
ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT), ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_exhook]), emqx_mgmt_api_test_util:init_suite([emqx_exhook]),
[Conf] = emqx:get_config([emqx_exhook, servers]), [Conf] = emqx:get_config([exhook, servers]),
[{template, Conf} | Config]. [{template, Conf} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->

View File

@ -31,11 +31,12 @@
]). ]).
-define(CONF_DEFAULT, <<" -define(CONF_DEFAULT, <<"
emqx_exhook exhook {
{servers = [ servers =
{name = default, [ { name = default,
url = \"http://127.0.0.1:9000\" url = \"http://127.0.0.1:9000\"
}] }
]
} }
">>). ">>).

View File

@ -45,7 +45,7 @@ set_special_configs(emqx_dashboard) ->
port => 18083 port => 18083
}] }]
}, },
emqx_config:put([emqx_dashboard], Config), emqx_config:put([dashboard], Config),
ok; ok;
set_special_configs(_App) -> set_special_configs(_App) ->
ok. ok.

View File

@ -5,7 +5,7 @@
## Where to store the retained messages. ## Where to store the retained messages.
## ##
## Notice that all nodes in the same cluster have to be configured to ## Notice that all nodes in the same cluster have to be configured to
emqx_retainer { retainer {
## enable/disable emqx_retainer ## enable/disable emqx_retainer
enable = true enable = true

View File

@ -139,7 +139,7 @@ deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) ->
false -> false ->
ok; ok;
_ -> _ ->
#{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([?APP, flow_control]), #{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([retainer, flow_control]),
case MaxDeliverNum of case MaxDeliverNum of
0 -> 0 ->
_ = [Pid ! {deliver, Topic, Msg} || Msg <- Result], _ = [Pid ! {deliver, Topic, Msg} || Msg <- Result],
@ -160,18 +160,18 @@ get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' :
timestamp = Ts}) -> timestamp = Ts}) ->
Ts + Interval * 1000; Ts + Interval * 1000;
get_expiry_time(#message{timestamp = Ts}) -> get_expiry_time(#message{timestamp = Ts}) ->
Interval = emqx_conf:get([?APP, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL), Interval = emqx_conf:get([retainer, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL),
case Interval of case Interval of
0 -> 0; 0 -> 0;
_ -> Ts + Interval _ -> Ts + Interval
end. end.
get_stop_publish_clear_msg() -> get_stop_publish_clear_msg() ->
emqx_conf:get([?APP, stop_publish_clear_msg], false). emqx_conf:get([retainer, stop_publish_clear_msg], false).
-spec update_config(hocon:config()) -> {ok, _} | {error, _}. -spec update_config(hocon:config()) -> {ok, _} | {error, _}.
update_config(Conf) -> update_config(Conf) ->
emqx_conf:update([emqx_retainer], Conf, #{override_to => cluster}). emqx_conf:update([retainer], Conf, #{override_to => cluster}).
clean() -> clean() ->
call(?FUNCTION_NAME). call(?FUNCTION_NAME).
@ -196,10 +196,10 @@ stats_fun() ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
emqx_conf:add_handler([emqx_retainer], ?MODULE), emqx_conf:add_handler([retainer], ?MODULE),
init_shared_context(), init_shared_context(),
State = new_state(), State = new_state(),
#{enable := Enable} = Cfg = emqx:get_config([?APP]), #{enable := Enable} = Cfg = emqx:get_config([retainer]),
{ok, {ok,
case Enable of case Enable of
true -> true ->
@ -245,7 +245,7 @@ handle_cast(Msg, State) ->
handle_info(clear_expired, #{context := Context} = State) -> handle_info(clear_expired, #{context := Context} = State) ->
Mod = get_backend_module(), Mod = get_backend_module(),
Mod:clear_expired(Context), Mod:clear_expired(Context),
Interval = emqx_conf:get([?APP, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
{noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate}; {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) -> handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) ->
@ -261,7 +261,7 @@ handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} =
end, end,
Waits2) Waits2)
end, end,
Interval = emqx:get_config([?APP, flow_control, quota_release_interval]), Interval = emqx:get_config([retainer, flow_control, quota_release_interval]),
{noreply, State#{release_quota_timer := add_timer(Interval, release_deliver_quota), {noreply, State#{release_quota_timer := add_timer(Interval, release_deliver_quota),
wait_quotas := []}}; wait_quotas := []}};
@ -294,7 +294,7 @@ new_context(Id) ->
#{context_id => Id}. #{context_id => Id}.
is_too_big(Size) -> is_too_big(Size) ->
Limit = emqx_conf:get([?APP, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE), Limit = emqx_conf:get([retainer, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE),
Limit > 0 andalso (Size > Limit). Limit > 0 andalso (Size > Limit).
%% @private %% @private
@ -368,7 +368,7 @@ insert_shared_context(Key, Term) ->
-spec get_msg_deliver_quota() -> non_neg_integer(). -spec get_msg_deliver_quota() -> non_neg_integer().
get_msg_deliver_quota() -> get_msg_deliver_quota() ->
emqx:get_config([?APP, flow_control, msg_deliver_quota]). emqx:get_config([retainer, flow_control, msg_deliver_quota]).
-spec update_config(state(), hocons:config(), hocons:config()) -> state(). -spec update_config(state(), hocons:config(), hocons:config()) -> state().
update_config(State, Conf, OldConf) -> update_config(State, Conf, OldConf) ->
@ -461,11 +461,9 @@ check_timer(Timer, _, _) ->
-spec get_backend_module() -> backend(). -spec get_backend_module() -> backend().
get_backend_module() -> get_backend_module() ->
#{type := Backend} = emqx:get_config([?APP, config]), ModName = case emqx:get_config([retainer, config]) of
ModName = if Backend =:= built_in_database -> #{type := built_in_database} -> mnesia;
mnesia; #{type := Backend} -> Backend
true ->
Backend
end, end,
erlang:list_to_existing_atom(io_lib:format("~ts_~ts", [?APP, ModName])). erlang:list_to_existing_atom(io_lib:format("~ts_~ts", [?APP, ModName])).

View File

@ -41,7 +41,7 @@ api_spec() ->
{[lookup_retained_api(), with_topic_api(), config_api()], []}. {[lookup_retained_api(), with_topic_api(), config_api()], []}.
conf_schema() -> conf_schema() ->
gen_schema(emqx:get_raw_config([emqx_retainer])). gen_schema(emqx:get_raw_config([retainer])).
message_props() -> message_props() ->
properties([ properties([
@ -126,12 +126,12 @@ with_topic_warp(Type, Params) ->
check_backend(Type, Params, fun with_topic/2). check_backend(Type, Params, fun with_topic/2).
config(get, _) -> config(get, _) ->
{200, emqx:get_raw_config([emqx_retainer])}; {200, emqx:get_raw_config([retainer])};
config(put, #{body := Body}) -> config(put, #{body := Body}) ->
try try
{ok, _} = emqx_retainer:update_config(Body), {ok, _} = emqx_retainer:update_config(Body),
{200, emqx:get_raw_config([emqx_retainer])} {200, emqx:get_raw_config([retainer])}
catch _:Reason:_ -> catch _:Reason:_ ->
{400, {400,
#{code => 'UPDATE_FAILED', #{code => 'UPDATE_FAILED',
@ -188,7 +188,7 @@ to_bin_string(Data) ->
list_to_binary(io_lib:format("~p", [Data])). list_to_binary(io_lib:format("~p", [Data])).
check_backend(Type, Params, Cont) -> check_backend(Type, Params, Cont) ->
case emqx:get_config([emqx_retainer, config, type]) of case emqx:get_config([retainer, config, type]) of
built_in_database -> built_in_database ->
Cont(Type, Params); Cont(Type, Params);
_ -> _ ->

View File

@ -95,7 +95,11 @@ store_retained(_, Msg =#message{topic = Topic}) ->
end, end,
case mria:transaction(?RETAINER_SHARD, Fun) of case mria:transaction(?RETAINER_SHARD, Fun) of
{atomic, ok} -> ok; {atomic, ok} -> ok;
{aborted, Reason} -> ?SLOG(error, #{msg => "failed_to_retain_message", topic => Topic, reason => Reason}) {aborted, Reason} ->
?SLOG(error, #{ msg => "failed_to_retain_message"
, topic => Topic
, reason => Reason
})
end end
end. end.
@ -140,7 +144,7 @@ page_read(_, Topic, Page, Limit) ->
{ok, Rows}. {ok, Rows}.
match_messages(_, Topic, Cursor) -> match_messages(_, Topic, Cursor) ->
MaxReadNum = emqx:get_config([?APP, flow_control, max_read_number]), MaxReadNum = emqx:get_config([retainer, flow_control, max_read_number]),
case Cursor of case Cursor of
undefined -> undefined ->
case MaxReadNum of case MaxReadNum of
@ -249,7 +253,7 @@ make_cursor(Topic) ->
-spec is_table_full() -> boolean(). -spec is_table_full() -> boolean().
is_table_full() -> is_table_full() ->
#{max_retained_messages := Limit} = emqx:get_config([?APP, config]), #{max_retained_messages := Limit} = emqx:get_config([retainer, config]),
Limit > 0 andalso (table_size() >= Limit). Limit > 0 andalso (table_size() >= Limit).
-spec table_size() -> non_neg_integer(). -spec table_size() -> non_neg_integer().

View File

@ -2,13 +2,15 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-export([roots/0, fields/1]). -export([roots/0, fields/1, namespace/0]).
-define(TYPE(Type), hoconsc:mk(Type)). -define(TYPE(Type), hoconsc:mk(Type)).
roots() -> ["emqx_retainer"]. namespace() -> "retainer".
fields("emqx_retainer") -> roots() -> ["retainer"].
fields("retainer") ->
[ {enable, sc(boolean(), false)} [ {enable, sc(boolean(), false)}
, {msg_expiry_interval, sc(emqx_schema:duration_ms(), "0s")} , {msg_expiry_interval, sc(emqx_schema:duration_ms(), "0s")}
, {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")} , {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")}

View File

@ -28,7 +28,7 @@
all() -> emqx_common_test_helpers:all(?MODULE). all() -> emqx_common_test_helpers:all(?MODULE).
-define(BASE_CONF, <<""" -define(BASE_CONF, <<"""
emqx_retainer { retainer {
enable = true enable = true
msg_clear_interval = 0s msg_clear_interval = 0s
msg_expiry_interval = 0s msg_expiry_interval = 0s
@ -90,7 +90,10 @@ t_store_and_clean(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]), emqtt:publish(
C1, <<"retained">>,
<<"this is a retained message">>,
[{qos, 0}, {retain, true}]),
timer:sleep(100), timer:sleep(100),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(1, length(receive_messages(1))), ?assertEqual(1, length(receive_messages(1))),
@ -118,7 +121,10 @@ t_retain_handling(_) ->
?assertEqual(0, length(receive_messages(1))), ?assertEqual(0, length(receive_messages(1))),
{ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>), {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>),
emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]), emqtt:publish(
C1, <<"retained">>,
<<"this is a retained message">>,
[{qos, 0}, {retain, true}]),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]),
?assertEqual(1, length(receive_messages(1))), ?assertEqual(1, length(receive_messages(1))),
@ -148,9 +154,18 @@ t_retain_handling(_) ->
t_wildcard_subscription(_) -> t_wildcard_subscription(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), emqtt:publish(
emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]), C1, <<"retained/0">>,
emqtt:publish(C1, <<"retained/a/b/c">>, <<"this is a retained message 2">>, [{qos, 0}, {retain, true}]), <<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]),
emqtt:publish(
C1, <<"retained/1">>,
<<"this is a retained message 1">>,
[{qos, 0}, {retain, true}]),
emqtt:publish(
C1, <<"retained/a/b/c">>,
<<"this is a retained message 2">>,
[{qos, 0}, {retain, true}]),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0),
@ -165,11 +180,26 @@ t_message_expiry(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish(C1, <<"retained/0">>, #{'Message-Expiry-Interval' => 0}, <<"don't expire">>, [{qos, 0}, {retain, true}]), emqtt:publish(
emqtt:publish(C1, <<"retained/1">>, #{'Message-Expiry-Interval' => 2}, <<"expire">>, [{qos, 0}, {retain, true}]), C1, <<"retained/0">>, #{'Message-Expiry-Interval' => 0},
emqtt:publish(C1, <<"retained/2">>, #{'Message-Expiry-Interval' => 5}, <<"don't expire">>, [{qos, 0}, {retain, true}]), <<"don't expire">>,
emqtt:publish(C1, <<"retained/3">>, <<"don't expire">>, [{qos, 0}, {retain, true}]), [{qos, 0}, {retain, true}]),
emqtt:publish(C1, <<"$SYS/retained/4">>, <<"don't expire">>, [{qos, 0}, {retain, true}]), emqtt:publish(
C1, <<"retained/1">>, #{'Message-Expiry-Interval' => 2},
<<"expire">>,
[{qos, 0}, {retain, true}]),
emqtt:publish(
C1, <<"retained/2">>, #{'Message-Expiry-Interval' => 5},
<<"don't expire">>,
[{qos, 0}, {retain, true}]),
emqtt:publish(
C1, <<"retained/3">>,
<<"don't expire">>,
[{qos, 0}, {retain, true}]),
emqtt:publish(
C1, <<"$SYS/retained/4">>,
<<"don't expire">>,
[{qos, 0}, {retain, true}]),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0),
@ -210,9 +240,18 @@ t_message_expiry_2(_) ->
t_clean(_) -> t_clean(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), emqtt:publish(
emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]), C1, <<"retained/0">>,
emqtt:publish(C1, <<"retained/test/0">>, <<"this is a retained message 2">>, [{qos, 0}, {retain, true}]), <<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]),
emqtt:publish(
C1, <<"retained/1">>,
<<"this is a retained message 1">>,
[{qos, 0}, {retain, true}]),
emqtt:publish(
C1, <<"retained/test/0">>,
<<"this is a retained message 2">>,
[{qos, 0}, {retain, true}]),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
?assertEqual(3, length(receive_messages(3))), ?assertEqual(3, length(receive_messages(3))),
@ -227,7 +266,11 @@ t_stop_publish_clear_msg(_) ->
emqx_retainer:update_config(#{<<"stop_publish_clear_msg">> => true}), emqx_retainer:update_config(#{<<"stop_publish_clear_msg">> => true}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), emqtt:publish(
C1, <<"retained/0">>,
<<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]
),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
?assertEqual(1, length(receive_messages(1))), ?assertEqual(1, length(receive_messages(1))),
@ -239,14 +282,27 @@ t_stop_publish_clear_msg(_) ->
ok = emqtt:disconnect(C1). ok = emqtt:disconnect(C1).
t_flow_control(_) -> t_flow_control(_) ->
emqx_retainer:update_config(#{<<"flow_control">> => #{<<"max_read_number">> => 1, emqx_retainer:update_config(#{<<"flow_control">> =>
#{<<"max_read_number">> => 1,
<<"msg_deliver_quota">> => 1, <<"msg_deliver_quota">> => 1,
<<"quota_release_interval">> => <<"1s">>}}), <<"quota_release_interval">> => <<"1s">>}}),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), emqtt:publish(
emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]), C1, <<"retained/0">>,
emqtt:publish(C1, <<"retained/3">>, <<"this is a retained message 3">>, [{qos, 0}, {retain, true}]), <<"this is a retained message 0">>,
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"retained/1">>,
<<"this is a retained message 1">>,
[{qos, 0}, {retain, true}]
),
emqtt:publish(
C1, <<"retained/3">>,
<<"this is a retained message 3">>,
[{qos, 0}, {retain, true}]
),
Begin = erlang:system_time(millisecond), Begin = erlang:system_time(millisecond),
{ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]),
?assertEqual(3, length(receive_messages(3))), ?assertEqual(3, length(receive_messages(3))),

View File

@ -22,7 +22,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(BASE_CONF, <<""" -define(BASE_CONF, <<"""
emqx_retainer { retainer {
enable = true enable = true
msg_clear_interval = 0s msg_clear_interval = 0s
msg_expiry_interval = 0s msg_expiry_interval = 0s
@ -94,13 +94,23 @@ t_publish_retain_message(_) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:connect(Client1),
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, [{qos, 2}, {retain, true}]), {ok, _} = emqtt:publish(
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"new retained message">>, [{qos, 2}, {retain, true}]), Client1, Topic, #{},
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"not retained message">>, [{qos, 2}, {retain, false}]), <<"retained message">>,
[{qos, 2}, {retain, true}]),
{ok, _} = emqtt:publish(
Client1, Topic, #{},
<<"new retained message">>,
[{qos, 2}, {retain, true}]),
{ok, _} = emqtt:publish(
Client1, Topic, #{},
<<"not retained message">>,
[{qos, 2}, {retain, false}]),
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2),
[Msg] = receive_messages(1), [Msg] = receive_messages(1),
?assertEqual(<<"new retained message">>, maps:get(payload, Msg)), %% [MQTT-3.3.1-5] [MQTT-3.3.1-8] %% [MQTT-3.3.1-5] [MQTT-3.3.1-8]
?assertEqual(<<"new retained message">>, maps:get(payload, Msg)),
{ok, _, [0]} = emqtt:unsubscribe(Client1, Topic), {ok, _, [0]} = emqtt:unsubscribe(Client1, Topic),
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"">>, [{qos, 2}, {retain, true}]), {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"">>, [{qos, 2}, {retain, true}]),
@ -113,16 +123,33 @@ t_publish_retain_message(_) ->
t_publish_message_expiry_interval(_) -> t_publish_message_expiry_interval(_) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:connect(Client1),
{ok, _} = emqtt:publish(Client1, <<"topic/A">>, #{'Message-Expiry-Interval' => 1}, <<"retained message">>, [{qos, 1}, {retain, true}]), {ok, _} = emqtt:publish(
{ok, _} = emqtt:publish(Client1, <<"topic/B">>, #{'Message-Expiry-Interval' => 1}, <<"retained message">>, [{qos, 2}, {retain, true}]), Client1, <<"topic/A">>, #{'Message-Expiry-Interval' => 1},
{ok, _} = emqtt:publish(Client1, <<"topic/C">>, #{'Message-Expiry-Interval' => 10}, <<"retained message">>, [{qos, 1}, {retain, true}]), <<"retained message">>,
{ok, _} = emqtt:publish(Client1, <<"topic/D">>, #{'Message-Expiry-Interval' => 10}, <<"retained message">>, [{qos, 2}, {retain, true}]), [{qos, 1}, {retain, true}]),
{ok, _} = emqtt:publish(
Client1, <<"topic/B">>, #{'Message-Expiry-Interval' => 1},
<<"retained message">>,
[{qos, 2}, {retain, true}]),
{ok, _} = emqtt:publish(
Client1, <<"topic/C">>, #{'Message-Expiry-Interval' => 10},
<<"retained message">>,
[{qos, 1}, {retain, true}]),
{ok, _} = emqtt:publish(
Client1, <<"topic/D">>, #{'Message-Expiry-Interval' => 10},
<<"retained message">>,
[{qos, 2}, {retain, true}]),
timer:sleep(1500), timer:sleep(1500),
{ok, _, [2]} = emqtt:subscribe(Client1, <<"topic/+">>, 2), {ok, _, [2]} = emqtt:subscribe(Client1, <<"topic/+">>, 2),
Msgs = receive_messages(4), Msgs = receive_messages(4),
?assertEqual(2, length(Msgs)), %% [MQTT-3.3.2-5] ?assertEqual(2, length(Msgs)), %% [MQTT-3.3.2-5]
L = lists:map(fun(Msg) -> MessageExpiryInterval = maps:get('Message-Expiry-Interval', maps:get(properties, Msg)), MessageExpiryInterval < 10 end, Msgs), L = lists:map(
fun(Msg) ->
MessageExpiryInterval = maps:get('Message-Expiry-Interval',
maps:get(properties, Msg)),
MessageExpiryInterval < 10
end, Msgs),
?assertEqual(2, length(L)), %% [MQTT-3.3.2-6] ?assertEqual(2, length(L)), %% [MQTT-3.3.2-6]
ok = emqtt:disconnect(Client1), ok = emqtt:disconnect(Client1),
@ -137,9 +164,21 @@ t_publish_message_expiry_interval(_) ->
t_subscribe_retain_handing(_) -> t_subscribe_retain_handing(_) ->
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(Client1), {ok, _} = emqtt:connect(Client1),
ok = emqtt:publish(Client1, <<"topic/A">>, #{}, <<"retained message">>, [{qos, 0}, {retain, true}]), ok = emqtt:publish(
{ok, _} = emqtt:publish(Client1, <<"topic/B">>, #{}, <<"retained message">>, [{qos, 1}, {retain, true}]), Client1, <<"topic/A">>, #{},
{ok, _} = emqtt:publish(Client1, <<"topic/C">>, #{}, <<"retained message">>, [{qos, 2}, {retain, true}]), <<"retained message">>,
[{qos, 0}, {retain, true}]
),
{ok, _} = emqtt:publish(
Client1, <<"topic/B">>, #{},
<<"retained message">>,
[{qos, 1}, {retain, true}]
),
{ok, _} = emqtt:publish(
Client1, <<"topic/C">>, #{},
<<"retained message">>,
[{qos, 2}, {retain, true}]
),
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]),
?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-10] ?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-10]

View File

@ -2,7 +2,7 @@
## EMQ X Slow Subscribers Statistics ## EMQ X Slow Subscribers Statistics
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
emqx_slow_subs { slow_subs {
enable = false enable = false
threshold = 500ms threshold = 500ms

View File

@ -104,7 +104,8 @@ on_stats_update(#{clientid := ClientId,
case ets:lookup(?TOPK_TAB, LastIndex) of case ets:lookup(?TOPK_TAB, LastIndex) of
[#top_k{index = Index}] -> [#top_k{index = Index}] ->
%% if last value == the new value, update the type and last_update_time %% if last value == the new value, update the type and last_update_time
%% XXX for clients whose latency are stable for a long time, is it possible to reduce updates? %% XXX for clients whose latency are stable for a long time, is it
%% possible to reduce updates?
ets:insert(?TOPK_TAB, ets:insert(?TOPK_TAB,
#top_k{index = Index, type = Type, last_update_time = Ts}); #top_k{index = Index, type = Type, last_update_time = Ts});
[_] -> [_] ->
@ -124,7 +125,7 @@ clear_history() ->
gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT). gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT).
update_settings(Conf) -> update_settings(Conf) ->
emqx_conf:update([emqx_slow_subs], Conf, #{override_to => cluster}). emqx_conf:update([slow_subs], Conf, #{override_to => cluster}).
init_topk_tab() -> init_topk_tab() ->
case ets:whereis(?TOPK_TAB) of case ets:whereis(?TOPK_TAB) of
@ -146,7 +147,7 @@ post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
emqx_conf:add_handler([emqx_slow_subs], ?MODULE), emqx_conf:add_handler([slow_subs], ?MODULE),
InitState = #{enable => false, InitState = #{enable => false,
last_tick_at => 0, last_tick_at => 0,
@ -154,11 +155,11 @@ init([]) ->
notice_timer => undefined notice_timer => undefined
}, },
Enable = emqx:get_config([emqx_slow_subs, enable]), Enable = emqx:get_config([slow_subs, enable]),
{ok, check_enable(Enable, InitState)}. {ok, check_enable(Enable, InitState)}.
handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) -> handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) ->
emqx_config:put([emqx_slow_subs], Conf), emqx_config:put([slow_subs], Conf),
State2 = check_enable(Enable, State), State2 = check_enable(Enable, State),
{reply, ok, State2}; {reply, ok, State2};
@ -204,7 +205,7 @@ expire_tick() ->
erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME). erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME).
notice_tick() -> notice_tick() ->
case emqx:get_config([emqx_slow_subs, notice_interval]) of case emqx:get_config([slow_subs, notice_interval]) of
0 -> undefined; 0 -> undefined;
Interval -> Interval ->
erlang:send_after(Interval, self(), ?FUNCTION_NAME) erlang:send_after(Interval, self(), ?FUNCTION_NAME)
@ -225,7 +226,7 @@ do_publish([], _, _) ->
ok; ok;
do_publish(Logs, Rank, TickTime) -> do_publish(Logs, Rank, TickTime) ->
BatchSize = emqx:get_config([emqx_slow_subs, notice_batch_size]), BatchSize = emqx:get_config([slow_subs, notice_batch_size]),
do_publish(Logs, BatchSize, Rank, TickTime, []). do_publish(Logs, BatchSize, Rank, TickTime, []).
do_publish([Log | T], Size, Rank, TickTime, Cache) when Size > 0 -> do_publish([Log | T], Size, Rank, TickTime, Cache) when Size > 0 ->
@ -254,7 +255,7 @@ publish(TickTime, Notices) ->
logs => lists:reverse(Notices)}, logs => lists:reverse(Notices)},
Payload = emqx_json:encode(WindowLog), Payload = emqx_json:encode(WindowLog),
Msg = #message{ id = emqx_guid:gen() Msg = #message{ id = emqx_guid:gen()
, qos = emqx:get_config([emqx_slow_subs, notice_qos]) , qos = emqx:get_config([slow_subs, notice_qos])
, from = ?MODULE , from = ?MODULE
, topic = emqx_topic:systop(?NOTICE_TOPIC_NAME) , topic = emqx_topic:systop(?NOTICE_TOPIC_NAME)
, payload = Payload , payload = Payload
@ -264,7 +265,7 @@ publish(TickTime, Notices) ->
ok. ok.
load(State) -> load(State) ->
MaxSizeT = emqx:get_config([emqx_slow_subs, top_k_num]), MaxSizeT = emqx:get_config([slow_subs, top_k_num]),
MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE), MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE),
_ = emqx:hook('message.slow_subs_stats', _ = emqx:hook('message.slow_subs_stats',
{?MODULE, on_stats_update, [#{max_size => MaxSize}]} {?MODULE, on_stats_update, [#{max_size => MaxSize}]}
@ -283,7 +284,7 @@ unload(#{notice_timer := NoticeTimer, expire_timer := ExpireTimer} = State) ->
do_clear(Logs) -> do_clear(Logs) ->
Now = ?NOW, Now = ?NOW,
Interval = emqx:get_config([emqx_slow_subs, expire_interval]), Interval = emqx:get_config([slow_subs, expire_interval]),
Each = fun(#top_k{index = Index, last_update_time = Ts}) -> Each = fun(#top_k{index = Index, last_update_time = Ts}) ->
case Now - Ts >= Interval of case Now - Ts >= Interval of
true -> true ->
@ -330,7 +331,7 @@ check_enable(Enable, #{enable := IsEnable} = State) ->
end. end.
update_threshold() -> update_threshold() ->
Threshold = emqx:get_config([emqx_slow_subs, threshold]), Threshold = emqx:get_config([slow_subs, threshold]),
emqx_message_latency_stats:update_threshold(Threshold), emqx_message_latency_stats:update_threshold(Threshold),
ok. ok.

View File

@ -72,15 +72,20 @@ schema("/slow_subscriptions/settings") ->
}. }.
fields(record) -> fields(record) ->
[ [ {clientid,
{clientid, mk(string(), #{desc => <<"the clientid">>})}, mk(string(), #{desc => <<"the clientid">>})},
{latency, mk(integer(), #{desc => <<"average time for message delivery or time for message expire">>})}, {latency,
{type, mk(string(), #{desc => <<"type of the latency, could be average or expire">>})}, mk(integer(),
{last_update_time, mk(integer(), #{desc => <<"the timestamp of last update">>})} #{desc => <<"average time for message delivery or time for message expire">>})},
{type,
mk(string(),
#{desc => <<"type of the latency, could be average or expire">>})},
{last_update_time,
mk(integer(), #{desc => <<"the timestamp of last update">>})}
]. ].
conf_schema() -> conf_schema() ->
Ref = hoconsc:ref(emqx_slow_subs_schema, "emqx_slow_subs"), Ref = hoconsc:ref(emqx_slow_subs_schema, "slow_subs"),
hoconsc:mk(Ref, #{}). hoconsc:mk(Ref, #{}).
slow_subs(delete, _) -> slow_subs(delete, _) ->
@ -104,8 +109,8 @@ encode_record(#top_k{index = ?INDEX(Latency, ClientId),
last_update_time => Ts}. last_update_time => Ts}.
settings(get, _) -> settings(get, _) ->
{200, emqx:get_raw_config([?APP_NAME], #{})}; {200, emqx:get_raw_config([slow_subs], #{})};
settings(put, #{body := Body}) -> settings(put, #{body := Body}) ->
_ = emqx_slow_subs:update_settings(Body), _ = emqx_slow_subs:update_settings(Body),
{200, emqx:get_raw_config([?APP_NAME], #{})}. {200, emqx:get_raw_config([slow_subs], #{})}.

View File

@ -2,11 +2,13 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-export([roots/0, fields/1]). -export([roots/0, fields/1, namespace/0]).
roots() -> ["emqx_slow_subs"]. namespace() -> "slow_subs".
fields("emqx_slow_subs") -> roots() -> ["slow_subs"].
fields("slow_subs") ->
[ {enable, sc(boolean(), false, "switch of this function")} [ {enable, sc(boolean(), false, "switch of this function")}
, {threshold, , {threshold,
sc(emqx_schema:duration_ms(), sc(emqx_schema:duration_ms(),
@ -23,8 +25,8 @@ fields("emqx_slow_subs") ->
, {notice_interval, , {notice_interval,
sc(emqx_schema:duration_ms(), sc(emqx_schema:duration_ms(),
"0s", "0s",
"The interval for pushing statistics table records to the system topic. When set to 0, push is disabled" "The interval for pushing statistics table records to the system topic. "
"publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval" "publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval. "
"publish is disabled if set to 0s." "publish is disabled if set to 0s."
)} )}
, {notice_qos, , {notice_qos,

View File

@ -27,7 +27,7 @@
-define(NOW, erlang:system_time(millisecond)). -define(NOW, erlang:system_time(millisecond)).
-define(BASE_CONF, <<""" -define(BASE_CONF, <<"""
emqx_slow_subs { slow_subs {
enable = true enable = true
top_k_num = 5, top_k_num = 5,
expire_interval = 3000 expire_interval = 3000

View File

@ -35,7 +35,7 @@
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(CONF_DEFAULT, <<""" -define(CONF_DEFAULT, <<"""
emqx_slow_subs slow_subs
{ {
enable = true enable = true
top_k_num = 5, top_k_num = 5,
@ -121,7 +121,7 @@ t_clear(_) ->
?assertEqual(0, ets:info(?TOPK_TAB, size)). ?assertEqual(0, ets:info(?TOPK_TAB, size)).
t_settting(_) -> t_settting(_) ->
Conf = emqx:get_config([emqx_slow_subs]), Conf = emqx:get_config([slow_subs]),
Conf2 = Conf#{threshold => 1000}, Conf2 = Conf#{threshold => 1000},
{ok, Data} = request_api(put, {ok, Data} = request_api(put,
api_path(["slow_subscriptions", "settings"]), api_path(["slow_subscriptions", "settings"]),