From 9a03869bd76652eaecb648941bec0dc60e27d2e2 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 4 Jan 2022 13:20:29 +0800 Subject: [PATCH 1/7] chore(slow-subs): change root name emqx_slow_subs > slow_subs --- apps/emqx_slow_subs/etc/emqx_slow_subs.conf | 2 +- apps/emqx_slow_subs/src/emqx_slow_subs.erl | 20 +++++++++---------- .../emqx_slow_subs/src/emqx_slow_subs_api.erl | 6 +++--- .../src/emqx_slow_subs_schema.erl | 4 ++-- .../test/emqx_slow_subs_SUITE.erl | 2 +- .../test/emqx_slow_subs_api_SUITE.erl | 4 ++-- 6 files changed, 19 insertions(+), 19 deletions(-) diff --git a/apps/emqx_slow_subs/etc/emqx_slow_subs.conf b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf index 9477b2e2c..8378e971c 100644 --- a/apps/emqx_slow_subs/etc/emqx_slow_subs.conf +++ b/apps/emqx_slow_subs/etc/emqx_slow_subs.conf @@ -2,7 +2,7 @@ ## EMQ X Slow Subscribers Statistics ##-------------------------------------------------------------------- -emqx_slow_subs { +slow_subs { enable = false threshold = 500ms diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index 356810d85..370f250fb 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -124,7 +124,7 @@ clear_history() -> gen_server:call(?MODULE, ?FUNCTION_NAME, ?DEF_CALL_TIMEOUT). update_settings(Conf) -> - emqx_conf:update([emqx_slow_subs], Conf, #{override_to => cluster}). + emqx_conf:update([slow_subs], Conf, #{override_to => cluster}). init_topk_tab() -> case ets:whereis(?TOPK_TAB) of @@ -146,7 +146,7 @@ post_config_update(_KeyPath, _UpdateReq, NewConf, _OldConf, _AppEnvs) -> %%-------------------------------------------------------------------- init([]) -> - emqx_conf:add_handler([emqx_slow_subs], ?MODULE), + emqx_conf:add_handler([slow_subs], ?MODULE), InitState = #{enable => false, last_tick_at => 0, @@ -154,11 +154,11 @@ init([]) -> notice_timer => undefined }, - Enable = emqx:get_config([emqx_slow_subs, enable]), + Enable = emqx:get_config([slow_subs, enable]), {ok, check_enable(Enable, InitState)}. handle_call({update_settings, #{enable := Enable} = Conf}, _From, State) -> - emqx_config:put([emqx_slow_subs], Conf), + emqx_config:put([slow_subs], Conf), State2 = check_enable(Enable, State), {reply, ok, State2}; @@ -204,7 +204,7 @@ expire_tick() -> erlang:send_after(?EXPIRE_CHECK_INTERVAL, self(), ?FUNCTION_NAME). notice_tick() -> - case emqx:get_config([emqx_slow_subs, notice_interval]) of + case emqx:get_config([slow_subs, notice_interval]) of 0 -> undefined; Interval -> erlang:send_after(Interval, self(), ?FUNCTION_NAME) @@ -225,7 +225,7 @@ do_publish([], _, _) -> ok; do_publish(Logs, Rank, TickTime) -> - BatchSize = emqx:get_config([emqx_slow_subs, notice_batch_size]), + BatchSize = emqx:get_config([slow_subs, notice_batch_size]), do_publish(Logs, BatchSize, Rank, TickTime, []). do_publish([Log | T], Size, Rank, TickTime, Cache) when Size > 0 -> @@ -254,7 +254,7 @@ publish(TickTime, Notices) -> logs => lists:reverse(Notices)}, Payload = emqx_json:encode(WindowLog), Msg = #message{ id = emqx_guid:gen() - , qos = emqx:get_config([emqx_slow_subs, notice_qos]) + , qos = emqx:get_config([slow_subs, notice_qos]) , from = ?MODULE , topic = emqx_topic:systop(?NOTICE_TOPIC_NAME) , payload = Payload @@ -264,7 +264,7 @@ publish(TickTime, Notices) -> ok. load(State) -> - MaxSizeT = emqx:get_config([emqx_slow_subs, top_k_num]), + MaxSizeT = emqx:get_config([slow_subs, top_k_num]), MaxSize = erlang:min(MaxSizeT, ?MAX_TAB_SIZE), _ = emqx:hook('message.slow_subs_stats', {?MODULE, on_stats_update, [#{max_size => MaxSize}]} @@ -283,7 +283,7 @@ unload(#{notice_timer := NoticeTimer, expire_timer := ExpireTimer} = State) -> do_clear(Logs) -> Now = ?NOW, - Interval = emqx:get_config([emqx_slow_subs, expire_interval]), + Interval = emqx:get_config([slow_subs, expire_interval]), Each = fun(#top_k{index = Index, last_update_time = Ts}) -> case Now - Ts >= Interval of true -> @@ -330,7 +330,7 @@ check_enable(Enable, #{enable := IsEnable} = State) -> end. update_threshold() -> - Threshold = emqx:get_config([emqx_slow_subs, threshold]), + Threshold = emqx:get_config([slow_subs, threshold]), emqx_message_latency_stats:update_threshold(Threshold), ok. diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl index 97abcfe9a..5ad7e207b 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -80,7 +80,7 @@ fields(record) -> ]. conf_schema() -> - Ref = hoconsc:ref(emqx_slow_subs_schema, "emqx_slow_subs"), + Ref = hoconsc:ref(emqx_slow_subs_schema, "slow_subs"), hoconsc:mk(Ref, #{}). slow_subs(delete, _) -> @@ -104,8 +104,8 @@ encode_record(#top_k{index = ?INDEX(Latency, ClientId), last_update_time => Ts}. settings(get, _) -> - {200, emqx:get_raw_config([?APP_NAME], #{})}; + {200, emqx:get_raw_config([slow_subs], #{})}; settings(put, #{body := Body}) -> _ = emqx_slow_subs:update_settings(Body), - {200, emqx:get_raw_config([?APP_NAME], #{})}. + {200, emqx:get_raw_config([slow_subs], #{})}. diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl index 2cef9affc..4a802eb4c 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl @@ -4,9 +4,9 @@ -export([roots/0, fields/1]). -roots() -> ["emqx_slow_subs"]. +roots() -> ["slow_subs"]. -fields("emqx_slow_subs") -> +fields("slow_subs") -> [ {enable, sc(boolean(), false, "switch of this function")} , {threshold, sc(emqx_schema:duration_ms(), diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl index 7ecb34b6c..8111fba31 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_SUITE.erl @@ -27,7 +27,7 @@ -define(NOW, erlang:system_time(millisecond)). -define(BASE_CONF, <<""" -emqx_slow_subs { +slow_subs { enable = true top_k_num = 5, expire_interval = 3000 diff --git a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl index a3fdd7fdb..832735054 100644 --- a/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl +++ b/apps/emqx_slow_subs/test/emqx_slow_subs_api_SUITE.erl @@ -35,7 +35,7 @@ -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CONF_DEFAULT, <<""" -emqx_slow_subs +slow_subs { enable = true top_k_num = 5, @@ -121,7 +121,7 @@ t_clear(_) -> ?assertEqual(0, ets:info(?TOPK_TAB, size)). t_settting(_) -> - Conf = emqx:get_config([emqx_slow_subs]), + Conf = emqx:get_config([slow_subs]), Conf2 = Conf#{threshold => 1000}, {ok, Data} = request_api(put, api_path(["slow_subscriptions", "settings"]), From 6bb919b65c26c00d1ca94f667bd905869b27e172 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 4 Jan 2022 14:11:08 +0800 Subject: [PATCH 2/7] chore(retainer): change root name emqx_retainer > retainer --- apps/emqx_retainer/etc/emqx_retainer.conf | 2 +- apps/emqx_retainer/src/emqx_retainer.erl | 22 +++++++++---------- apps/emqx_retainer/src/emqx_retainer_api.erl | 8 +++---- .../src/emqx_retainer_mnesia.erl | 4 ++-- .../src/emqx_retainer_schema.erl | 4 ++-- .../test/emqx_retainer_SUITE.erl | 2 +- .../test/emqx_retainer_mqtt_v5_SUITE.erl | 2 +- 7 files changed, 22 insertions(+), 22 deletions(-) diff --git a/apps/emqx_retainer/etc/emqx_retainer.conf b/apps/emqx_retainer/etc/emqx_retainer.conf index 5824186d0..f7ba44be3 100644 --- a/apps/emqx_retainer/etc/emqx_retainer.conf +++ b/apps/emqx_retainer/etc/emqx_retainer.conf @@ -5,7 +5,7 @@ ## Where to store the retained messages. ## ## Notice that all nodes in the same cluster have to be configured to -emqx_retainer { +retainer { ## enable/disable emqx_retainer enable = true diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index adb8f7c71..ed81ece82 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -139,7 +139,7 @@ deliver(Result, #{context_id := Id} = Context, Pid, Topic, Cursor) -> false -> ok; _ -> - #{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([?APP, flow_control]), + #{msg_deliver_quota := MaxDeliverNum} = emqx:get_config([retainer, flow_control]), case MaxDeliverNum of 0 -> _ = [Pid ! {deliver, Topic, Msg} || Msg <- Result], @@ -160,18 +160,18 @@ get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' : timestamp = Ts}) -> Ts + Interval * 1000; get_expiry_time(#message{timestamp = Ts}) -> - Interval = emqx_conf:get([?APP, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL), + Interval = emqx_conf:get([retainer, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL), case Interval of 0 -> 0; _ -> Ts + Interval end. get_stop_publish_clear_msg() -> - emqx_conf:get([?APP, stop_publish_clear_msg], false). + emqx_conf:get([retainer, stop_publish_clear_msg], false). -spec update_config(hocon:config()) -> {ok, _} | {error, _}. update_config(Conf) -> - emqx_conf:update([emqx_retainer], Conf, #{override_to => cluster}). + emqx_conf:update([retainer], Conf, #{override_to => cluster}). clean() -> call(?FUNCTION_NAME). @@ -196,10 +196,10 @@ stats_fun() -> %%-------------------------------------------------------------------- init([]) -> - emqx_conf:add_handler([emqx_retainer], ?MODULE), + emqx_conf:add_handler([retainer], ?MODULE), init_shared_context(), State = new_state(), - #{enable := Enable} = Cfg = emqx:get_config([?APP]), + #{enable := Enable} = Cfg = emqx:get_config([retainer]), {ok, case Enable of true -> @@ -245,7 +245,7 @@ handle_cast(Msg, State) -> handle_info(clear_expired, #{context := Context} = State) -> Mod = get_backend_module(), Mod:clear_expired(Context), - Interval = emqx_conf:get([?APP, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), + Interval = emqx_conf:get([retainer, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate}; handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) -> @@ -261,7 +261,7 @@ handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = end, Waits2) end, - Interval = emqx:get_config([?APP, flow_control, quota_release_interval]), + Interval = emqx:get_config([retainer, flow_control, quota_release_interval]), {noreply, State#{release_quota_timer := add_timer(Interval, release_deliver_quota), wait_quotas := []}}; @@ -294,7 +294,7 @@ new_context(Id) -> #{context_id => Id}. is_too_big(Size) -> - Limit = emqx_conf:get([?APP, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE), + Limit = emqx_conf:get([retainer, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE), Limit > 0 andalso (Size > Limit). %% @private @@ -368,7 +368,7 @@ insert_shared_context(Key, Term) -> -spec get_msg_deliver_quota() -> non_neg_integer(). get_msg_deliver_quota() -> - emqx:get_config([?APP, flow_control, msg_deliver_quota]). + emqx:get_config([retainer, flow_control, msg_deliver_quota]). -spec update_config(state(), hocons:config(), hocons:config()) -> state(). update_config(State, Conf, OldConf) -> @@ -461,7 +461,7 @@ check_timer(Timer, _, _) -> -spec get_backend_module() -> backend(). get_backend_module() -> - #{type := Backend} = emqx:get_config([?APP, config]), + #{type := Backend} = emqx:get_config([retainer, config]), ModName = if Backend =:= built_in_database -> mnesia; true -> diff --git a/apps/emqx_retainer/src/emqx_retainer_api.erl b/apps/emqx_retainer/src/emqx_retainer_api.erl index 37aded29c..afbc69f5c 100644 --- a/apps/emqx_retainer/src/emqx_retainer_api.erl +++ b/apps/emqx_retainer/src/emqx_retainer_api.erl @@ -41,7 +41,7 @@ api_spec() -> {[lookup_retained_api(), with_topic_api(), config_api()], []}. conf_schema() -> - gen_schema(emqx:get_raw_config([emqx_retainer])). + gen_schema(emqx:get_raw_config([retainer])). message_props() -> properties([ @@ -126,12 +126,12 @@ with_topic_warp(Type, Params) -> check_backend(Type, Params, fun with_topic/2). config(get, _) -> - {200, emqx:get_raw_config([emqx_retainer])}; + {200, emqx:get_raw_config([retainer])}; config(put, #{body := Body}) -> try {ok, _} = emqx_retainer:update_config(Body), - {200, emqx:get_raw_config([emqx_retainer])} + {200, emqx:get_raw_config([retainer])} catch _:Reason:_ -> {400, #{code => 'UPDATE_FAILED', @@ -188,7 +188,7 @@ to_bin_string(Data) -> list_to_binary(io_lib:format("~p", [Data])). check_backend(Type, Params, Cont) -> - case emqx:get_config([emqx_retainer, config, type]) of + case emqx:get_config([retainer, config, type]) of built_in_database -> Cont(Type, Params); _ -> diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 2209c760f..6bb0ae340 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -140,7 +140,7 @@ page_read(_, Topic, Page, Limit) -> {ok, Rows}. match_messages(_, Topic, Cursor) -> - MaxReadNum = emqx:get_config([?APP, flow_control, max_read_number]), + MaxReadNum = emqx:get_config([retainer, flow_control, max_read_number]), case Cursor of undefined -> case MaxReadNum of @@ -249,7 +249,7 @@ make_cursor(Topic) -> -spec is_table_full() -> boolean(). is_table_full() -> - #{max_retained_messages := Limit} = emqx:get_config([?APP, config]), + #{max_retained_messages := Limit} = emqx:get_config([retainer, config]), Limit > 0 andalso (table_size() >= Limit). -spec table_size() -> non_neg_integer(). diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index d6bc598a7..308fc528f 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -6,9 +6,9 @@ -define(TYPE(Type), hoconsc:mk(Type)). -roots() -> ["emqx_retainer"]. +roots() -> ["retainer"]. -fields("emqx_retainer") -> +fields("retainer") -> [ {enable, sc(boolean(), false)} , {msg_expiry_interval, sc(emqx_schema:duration_ms(), "0s")} , {msg_clear_interval, sc(emqx_schema:duration_ms(), "0s")} diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 54657cf3d..e51417e6e 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -28,7 +28,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). -define(BASE_CONF, <<""" -emqx_retainer { +retainer { enable = true msg_clear_interval = 0s msg_expiry_interval = 0s diff --git a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl index 40cd30e69..9a887d2d3 100644 --- a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl @@ -22,7 +22,7 @@ -include_lib("eunit/include/eunit.hrl"). -define(BASE_CONF, <<""" -emqx_retainer { +retainer { enable = true msg_clear_interval = 0s msg_expiry_interval = 0s From af2868e59869f320444968dc9b737733ff336247 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 4 Jan 2022 14:23:52 +0800 Subject: [PATCH 3/7] chore(dashboard): change root name emqx_dashboard > dashboard --- apps/emqx_authn/test/emqx_authn_api_SUITE.erl | 2 +- apps/emqx_dashboard/etc/emqx_dashboard.conf | 2 +- apps/emqx_dashboard/src/emqx_dashboard.erl | 2 +- apps/emqx_dashboard/src/emqx_dashboard_admin.erl | 2 +- apps/emqx_dashboard/src/emqx_dashboard_collection.erl | 4 ++-- apps/emqx_dashboard/src/emqx_dashboard_middleware.erl | 2 +- apps/emqx_dashboard/src/emqx_dashboard_schema.erl | 4 ++-- apps/emqx_dashboard/src/emqx_dashboard_token.erl | 2 +- apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl | 2 +- 9 files changed, 11 insertions(+), 11 deletions(-) diff --git a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl index 8a61af36a..63c631b4b 100644 --- a/apps/emqx_authn/test/emqx_authn_api_SUITE.erl +++ b/apps/emqx_authn/test/emqx_authn_api_SUITE.erl @@ -79,7 +79,7 @@ set_special_configs(emqx_dashboard) -> port => 18083 }] }, - emqx_config:put([emqx_dashboard], Config), + emqx_config:put([dashboard], Config), ok; set_special_configs(_App) -> ok. diff --git a/apps/emqx_dashboard/etc/emqx_dashboard.conf b/apps/emqx_dashboard/etc/emqx_dashboard.conf index ba2a68eeb..22c090457 100644 --- a/apps/emqx_dashboard/etc/emqx_dashboard.conf +++ b/apps/emqx_dashboard/etc/emqx_dashboard.conf @@ -2,7 +2,7 @@ ## EMQ X Dashboard ##-------------------------------------------------------------------- -emqx_dashboard { +dashboard { default_username = "admin" default_password = "public" ## notice: sample_interval should be divisible by 60. diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl index fbd6d32eb..a8ec70b1b 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard.erl @@ -90,7 +90,7 @@ listeners() -> Name = listener_name(Protocol, Port), RanchOptions = ranch_opts(maps:without([protocol], ListenerOptions)), {Name, Protocol, Port, RanchOptions} - end || ListenerOptions <- emqx_conf:get([emqx_dashboard, listeners], [])]. + end || ListenerOptions <- emqx_conf:get([dashboard, listeners], [])]. ranch_opts(RanchOptions) -> Keys = [ {ack_timeout, handshake_timeout} diff --git a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl index 39e73f659..158c42e6e 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -229,7 +229,7 @@ add_default_user() -> add_default_user(binenv(default_username), binenv(default_password)). binenv(Key) -> - iolist_to_binary(emqx_conf:get([emqx_dashboard, Key], "")). + iolist_to_binary(emqx_conf:get([dashboard, Key], "")). add_default_user(Username, Password) when ?EMPTY_KEY(Username) orelse ?EMPTY_KEY(Password) -> {ok, empty}; diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index b125dfa3e..00f9d6d10 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -55,7 +55,7 @@ get_collect() -> gen_server:call(whereis(?MODULE), get_collect). init([]) -> timer(next_interval(), collect), timer(get_today_remaining_seconds(), clear_expire_data), - ExpireInterval = emqx_conf:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL), + ExpireInterval = emqx_conf:get([dashboard, monitor, interval], ?EXPIRE_INTERVAL), State = #{ count => count(), expire_interval => ExpireInterval, @@ -75,7 +75,7 @@ next_interval() -> (1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1. interval() -> - emqx_conf:get([?APP, sample_interval], ?DEFAULT_INTERVAL). + emqx_conf:get([dashboard, sample_interval], ?DEFAULT_INTERVAL). count() -> 60 div interval(). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_middleware.erl b/apps/emqx_dashboard/src/emqx_dashboard_middleware.erl index 7c22ee3ef..77b6dbdd0 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_middleware.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_middleware.erl @@ -21,7 +21,7 @@ -export([execute/2]). execute(Req, Env) -> - CORS = emqx_conf:get([emqx_dashboard, cors], false), + CORS = emqx_conf:get([dashboard, cors], false), case CORS andalso cowboy_req:header(<<"origin">>, Req, undefined) of false -> {ok, Req, Env}; diff --git a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl index cd1bcf0d8..fddf8ca62 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_schema.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_schema.erl @@ -22,9 +22,9 @@ ,namespace/0]). namespace() -> <<"dashboard">>. -roots() -> ["emqx_dashboard"]. +roots() -> ["dashboard"]. -fields("emqx_dashboard") -> +fields("dashboard") -> [ {listeners, hoconsc:array(hoconsc:union([hoconsc:ref(?MODULE, "http"), hoconsc:ref(?MODULE, "https")]))} , {default_username, fun default_username/1} diff --git a/apps/emqx_dashboard/src/emqx_dashboard_token.erl b/apps/emqx_dashboard/src/emqx_dashboard_token.erl index e8c9f8619..515a59cae 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_token.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_token.erl @@ -163,7 +163,7 @@ jwt_expiration_time() -> erlang:system_time(millisecond) + token_ttl(). token_ttl() -> - emqx_conf:get([emqx_dashboard, token_expired_time], ?EXPTIME). + emqx_conf:get([dashboard, token_expired_time], ?EXPTIME). format(Token, Username, ExpTime) -> #?ADMIN_JWT{ diff --git a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl index 11d8e8052..9fa20a6d8 100644 --- a/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_dashboard_SUITE.erl @@ -94,7 +94,7 @@ set_special_configs(emqx_dashboard) -> default_username => <<"admin">>, default_password => <<"public">> }, - emqx_config:put([emqx_dashboard], Config), + emqx_config:put([dashboard], Config), ok; set_special_configs(_) -> ok. From 10b110447cea66165125969c461c2ceb656e86f5 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 4 Jan 2022 15:30:24 +0800 Subject: [PATCH 4/7] chore(exhook): change root name emqx_exhook > exhook --- apps/emqx_exhook/etc/emqx_exhook.conf | 2 +- apps/emqx_exhook/src/emqx_exhook_api.erl | 8 ++++---- apps/emqx_exhook/src/emqx_exhook_mgr.erl | 8 ++++---- apps/emqx_exhook/src/emqx_exhook_schema.erl | 6 +++--- apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 10 +++++----- apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl | 15 ++++++++------- apps/emqx_exhook/test/props/prop_exhook_hooks.erl | 11 ++++++----- .../test/emqx_mgmt_api_test_util.erl | 2 +- 8 files changed, 32 insertions(+), 30 deletions(-) diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf index 8769e9a2d..fcdb0de96 100644 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -2,7 +2,7 @@ ## EMQ X Hooks ##==================================================================== -emqx_exhook { +exhook { servers = [ ##{ diff --git a/apps/emqx_exhook/src/emqx_exhook_api.erl b/apps/emqx_exhook/src/emqx_exhook_api.erl index 7de57f724..bd3351616 100644 --- a/apps/emqx_exhook/src/emqx_exhook_api.erl +++ b/apps/emqx_exhook/src/emqx_exhook_api.erl @@ -146,7 +146,7 @@ exhooks(get, _) -> {200, ServerL2}; exhooks(post, #{body := Body}) -> - case emqx_exhook_mgr:update_config([emqx_exhook, servers], {add, Body}) of + case emqx_exhook_mgr:update_config([exhook, servers], {add, Body}) of {ok, Result} -> {201, Result}; {error, Error} -> @@ -168,7 +168,7 @@ action_with_name(get, #{bindings := #{name := Name}}) -> end; action_with_name(put, #{bindings := #{name := Name}, body := Body}) -> - case emqx_exhook_mgr:update_config([emqx_exhook, servers], + case emqx_exhook_mgr:update_config([exhook, servers], {update, Name, Body}) of {ok, not_found} -> {400, #{code => <<"BAD_REQUEST">>, @@ -187,7 +187,7 @@ action_with_name(put, #{bindings := #{name := Name}, body := Body}) -> end; action_with_name(delete, #{bindings := #{name := Name}}) -> - case emqx_exhook_mgr:update_config([emqx_exhook, servers], + case emqx_exhook_mgr:update_config([exhook, servers], {delete, Name}) of {ok, _} -> {200}; @@ -200,7 +200,7 @@ action_with_name(delete, #{bindings := #{name := Name}}) -> move(post, #{bindings := #{name := Name}, body := Body}) -> #{<<"position">> := PositionT, <<"related">> := Related} = Body, Position = erlang:binary_to_atom(PositionT), - case emqx_exhook_mgr:update_config([emqx_exhook, servers], + case emqx_exhook_mgr:update_config([exhook, servers], {move, Name, Position, Related}) of {ok, ok} -> {200}; diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index 35bbe00a4..d3d429021 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -108,10 +108,10 @@ lookup(Name) -> call({lookup, Name}). enable(Name) -> - update_config([emqx_exhook, servers], {enable, Name, true}). + update_config([exhook, servers], {enable, Name, true}). disable(Name) -> - update_config([emqx_exhook, servers], {enable, Name, false}). + update_config([exhook, servers], {enable, Name, false}). server_status(Name) -> call({server_status, Name}). @@ -176,8 +176,8 @@ post_config_update(_KeyPath, UpdateReq, NewConf, _OldConf, _AppEnvs) -> init([]) -> process_flag(trap_exit, true), - emqx_conf:add_handler([emqx_exhook, servers], ?MODULE), - ServerL = emqx:get_config([emqx_exhook, servers]), + emqx_conf:add_handler([exhook, servers], ?MODULE), + ServerL = emqx:get_config([exhook, servers]), {Waiting, Running, Stopped} = load_all_servers(ServerL), Orders = reorder(ServerL), {ok, ensure_reload_timer( diff --git a/apps/emqx_exhook/src/emqx_exhook_schema.erl b/apps/emqx_exhook/src/emqx_exhook_schema.erl index 88a61ef1c..a8666798c 100644 --- a/apps/emqx_exhook/src/emqx_exhook_schema.erl +++ b/apps/emqx_exhook/src/emqx_exhook_schema.erl @@ -34,11 +34,11 @@ -export([namespace/0, roots/0, fields/1, server_config/0]). -namespace() -> emqx_exhook. +namespace() -> exhook. -roots() -> [emqx_exhook]. +roots() -> [exhook]. -fields(emqx_exhook) -> +fields(exhook) -> [{servers, sc(hoconsc:array(ref(server)), #{default => []})} diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 490e002e3..002900f1e 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -24,11 +24,11 @@ -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CONF_DEFAULT, <<" -emqx_exhook -{servers = [ - {name = default, - url = \"http://127.0.0.1:9000\" - }] +exhook { + servers = [ + { name = default, + url = \"http://127.0.0.1:9000\" + }] } ">>). diff --git a/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl index f95f2b98c..b82451714 100644 --- a/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl @@ -27,12 +27,13 @@ -define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CONF_DEFAULT, <<" -emqx_exhook {servers = [ - {name = default, - url = \"http://127.0.0.1:9000\" - } - ] - } +exhook { + servers = + [ { name = default, + url = \"http://127.0.0.1:9000\" + } + ] +} ">>). all() -> @@ -49,7 +50,7 @@ init_per_suite(Config) -> _ = emqx_exhook_demo_svr:start(), ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT), emqx_mgmt_api_test_util:init_suite([emqx_exhook]), - [Conf] = emqx:get_config([emqx_exhook, servers]), + [Conf] = emqx:get_config([exhook, servers]), [{template, Conf} | Config]. end_per_suite(Config) -> diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index 284e2b89b..84999e4a8 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -31,11 +31,12 @@ ]). -define(CONF_DEFAULT, <<" -emqx_exhook -{servers = [ - {name = default, - url = \"http://127.0.0.1:9000\" - }] +exhook { + servers = + [ { name = default, + url = \"http://127.0.0.1:9000\" + } + ] } ">>). diff --git a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl index 8dcbfcc5e..42481abcb 100644 --- a/apps/emqx_management/test/emqx_mgmt_api_test_util.erl +++ b/apps/emqx_management/test/emqx_mgmt_api_test_util.erl @@ -45,7 +45,7 @@ set_special_configs(emqx_dashboard) -> port => 18083 }] }, - emqx_config:put([emqx_dashboard], Config), + emqx_config:put([dashboard], Config), ok; set_special_configs(_App) -> ok. From 12cc9065f879d92ce1f395d4e6dea5df6b2241df Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 4 Jan 2022 15:34:43 +0800 Subject: [PATCH 5/7] chore(limiter): change root name emqx_limiter > limiter --- apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf | 3 ++- .../src/emqx_limiter/src/emqx_limiter_schema.erl | 16 ++++++++-------- .../src/emqx_limiter/src/emqx_limiter_server.erl | 4 ++-- .../emqx_limiter/src/emqx_limiter_server_sup.erl | 2 +- apps/emqx/test/emqx_channel_SUITE.erl | 6 +++--- apps/emqx/test/emqx_ratelimiter_SUITE.erl | 14 +++++++------- 6 files changed, 23 insertions(+), 22 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf index 5c199e63f..78cb35207 100644 --- a/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf +++ b/apps/emqx/src/emqx_limiter/etc/emqx_limiter.conf @@ -1,7 +1,8 @@ ##-------------------------------------------------------------------- ## Emq X Rate Limiter ##-------------------------------------------------------------------- -emqx_limiter { + +limiter { bytes_in { global.rate = infinity # token generation rate zone.default.rate = infinity diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 37002edc4..ed87ce6c2 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -56,16 +56,16 @@ -import(emqx_schema, [sc/2, map/2]). -roots() -> [emqx_limiter]. - -fields(emqx_limiter) -> - [ {bytes_in, sc(ref(limiter), #{})} - , {message_in, sc(ref(limiter), #{})} - , {connection, sc(ref(limiter), #{})} - , {message_routing, sc(ref(limiter), #{})} - ]; +roots() -> [limiter]. fields(limiter) -> + [ {bytes_in, sc(ref(limiter_opts), #{})} + , {message_in, sc(ref(limiter_opts), #{})} + , {connection, sc(ref(limiter_opts), #{})} + , {message_routing, sc(ref(limiter_opts), #{})} + ]; + +fields(limiter_opts) -> [ {global, sc(ref(rate_burst), #{})} , {zone, sc(map("zone name", ref(rate_burst)), #{})} , {bucket, sc(map("bucket id", ref(bucket)), diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index f683ef68e..5e189a8b1 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -95,7 +95,7 @@ -spec connect(limiter_type(), bucket_name() | #{limiter_type() => bucket_name()}) -> emqx_htb_limiter:limiter(). connect(Type, BucketName) when is_atom(BucketName) -> - Path = [emqx_limiter, Type, bucket, BucketName], + Path = [limiter, Type, bucket, BucketName], case emqx:get_config(Path, undefined) of undefined -> ?SLOG(error, #{msg => "bucket_config_not_found", path => Path}), @@ -447,7 +447,7 @@ dispatch_burst_to_buckets([], _, Alloced, Nodes) -> init_tree(Type, State) -> #{global := Global, zone := Zone, - bucket := Bucket} = emqx:get_config([emqx_limiter, Type]), + bucket := Bucket} = emqx:get_config([limiter, Type]), {Factor, Root} = make_root(Global, Zone), State2 = State#{root := Root}, {NodeId, State3} = make_zone(maps:to_list(Zone), Factor, 1, State2), diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl index 7698449db..7f8d227ec 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server_sup.erl @@ -89,6 +89,6 @@ make_child(Type) -> modules => [emqx_limiter_server]}. childs() -> - Conf = emqx:get_config([emqx_limiter]), + Conf = emqx:get_config([limiter]), Types = maps:keys(Conf), [make_child(Type) || Type <- Types]. diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index da7e24058..9e64e70c9 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -130,7 +130,7 @@ basic_conf() -> stats => stats_conf(), listeners => listeners_conf(), zones => zone_conf(), - emqx_limiter => emqx:get_config([emqx_limiter]) + limiter => emqx:get_config([limiter]) }. set_test_listener_confs() -> @@ -201,7 +201,7 @@ modify_limiter(TestCase, NewConf) -> %% per_client 5/1s,5 %% aggregated 10/1s,10 -modify_limiter(#{emqx_limiter := Limiter} = NewConf) -> +modify_limiter(#{limiter := Limiter} = NewConf) -> #{message_routing := #{bucket := Bucket} = Routing} = Limiter, #{default := #{per_client := Client} = Default} = Bucket, Client2 = Client#{rate := 5, @@ -216,7 +216,7 @@ modify_limiter(#{emqx_limiter := Limiter} = NewConf) -> Bucket2 = Bucket#{default := Default2}, Routing2 = Routing#{bucket := Bucket2}, - NewConf2 = NewConf#{emqx_limiter := Limiter#{message_routing := Routing2}}, + NewConf2 = NewConf#{limiter := Limiter#{message_routing := Routing2}}, emqx_config:put(NewConf2), emqx_limiter_manager:restart_server(message_routing), ok. diff --git a/apps/emqx/test/emqx_ratelimiter_SUITE.erl b/apps/emqx/test/emqx_ratelimiter_SUITE.erl index 6d21399ce..17a63e79a 100644 --- a/apps/emqx/test/emqx_ratelimiter_SUITE.erl +++ b/apps/emqx/test/emqx_ratelimiter_SUITE.erl @@ -25,7 +25,7 @@ -include_lib("common_test/include/ct.hrl"). -define(BASE_CONF, <<""" -emqx_limiter { +limiter { bytes_in { global.rate = infinity zone.default.rate = infinity @@ -496,7 +496,7 @@ start_client(Name, EndTime, Counter, Number) -> start_client(Name, EndTime, Counter) -> #{per_client := PerClient} = - emqx_config:get([emqx_limiter, message_routing, bucket, Name]), + emqx_config:get([limiter, message_routing, bucket, Name]), #{rate := Rate} = PerClient, Client = #client{start = ?NOW, endtime = EndTime, @@ -578,18 +578,18 @@ to_rate(Str) -> Rate. with_global(Modifier, ZoneName, ZoneModifier, Buckets, Case) -> - Path = [emqx_limiter, message_routing], + Path = [limiter, message_routing], #{global := Global} = Cfg = emqx_config:get(Path), Cfg2 = Cfg#{global := Modifier(Global)}, with_zone(Cfg2, ZoneName, ZoneModifier, Buckets, Case). with_zone(Name, Modifier, Buckets, Case) -> - Path = [emqx_limiter, message_routing], + Path = [limiter, message_routing], Cfg = emqx_config:get(Path), with_zone(Cfg, Name, Modifier, Buckets, Case). with_zone(Cfg, Name, Modifier, Buckets, Case) -> - Path = [emqx_limiter, message_routing], + Path = [limiter, message_routing], #{zone := ZoneCfgs, bucket := BucketCfgs} = Cfg, ZoneCfgs2 = apply_modifier(Name, Modifier, ZoneCfgs), @@ -598,11 +598,11 @@ with_zone(Cfg, Name, Modifier, Buckets, Case) -> with_config(Path, fun(_) -> Cfg2 end, Case). with_bucket(Bucket, Modifier, Case) -> - Path = [emqx_limiter, message_routing, bucket, Bucket], + Path = [limiter, message_routing, bucket, Bucket], with_config(Path, Modifier, Case). with_per_client(Bucket, Modifier, Case) -> - Path = [emqx_limiter, message_routing, bucket, Bucket, per_client], + Path = [limiter, message_routing, bucket, Bucket, per_client], with_config(Path, Modifier, Case). with_config(Path, Modifier, Case) -> From b1959086d952cb34f88d7af55406a39442fefb3c Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 4 Jan 2022 16:57:52 +0800 Subject: [PATCH 6/7] chore: fix elvis warnings --- .../emqx_limiter/src/emqx_limiter_schema.erl | 3 +- .../emqx_limiter/src/emqx_limiter_server.erl | 7 +- apps/emqx/test/emqx_channel_SUITE.erl | 14 ++- apps/emqx_exhook/src/emqx_exhook_api.erl | 6 +- apps/emqx_retainer/src/emqx_retainer.erl | 8 +- .../src/emqx_retainer_mnesia.erl | 6 +- .../test/emqx_retainer_SUITE.erl | 96 +++++++++++++++---- .../test/emqx_retainer_mqtt_v5_SUITE.erl | 63 +++++++++--- apps/emqx_slow_subs/src/emqx_slow_subs.erl | 3 +- .../emqx_slow_subs/src/emqx_slow_subs_api.erl | 15 ++- .../src/emqx_slow_subs_schema.erl | 4 +- 11 files changed, 172 insertions(+), 53 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index ed87ce6c2..91936c8f0 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -94,7 +94,8 @@ fields(client_bucket) -> , {initial, sc(initial(), #{default => "0"})} %% low_water_mark add for emqx_channel and emqx_session %% both modules consume first and then check - %% so we need to use this value to prevent excessive consumption (e.g, consumption from an empty bucket) + %% so we need to use this value to prevent excessive consumption + %% (e.g, consumption from an empty bucket) , {low_water_mark, sc(initial(), #{desc => "if the remaining tokens are lower than this value, the check/consume will succeed, but it will be forced to hang for a short period of time", diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl index 5e189a8b1..1bccbf2a0 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_server.erl @@ -89,6 +89,8 @@ -export_type([index/0]). -import(emqx_limiter_decimal, [add/2, sub/2, mul/2, put_to_counter/3]). +-elvis([{elvis_style, no_if_expression, disable}]). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -337,8 +339,9 @@ longitudinal(#{id := Id, case lists:min([ShouldAlloc, Flow, Capacity]) of Avaiable when Avaiable > 0 -> - %% XXX if capacity is infinity, and flow always > 0, the value in counter - %% will be overflow at some point in the future, do we need to deal with this situation??? + %% XXX if capacity is infinity, and flow always > 0, the value in + %% counter will be overflow at some point in the future, do we need + %% to deal with this situation??? {Inc, Node2} = emqx_limiter_correction:add(Avaiable, Node), counters:add(Counter, Index, Inc), diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 9e64e70c9..19bc2b3c3 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -923,7 +923,12 @@ t_ws_cookie_init(_) -> conn_mod => emqx_ws_connection, ws_cookie => WsCookie }, - Channel = emqx_channel:init(ConnInfo, #{zone => default, limiter => limiter_cfg(), listener => {tcp, default}}), + Channel = emqx_channel:init( + ConnInfo, + #{zone => default, + limiter => limiter_cfg(), + listener => {tcp, default} + }), ?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)). %%-------------------------------------------------------------------- @@ -948,7 +953,12 @@ channel(InitFields) -> maps:fold(fun(Field, Value, Channel) -> emqx_channel:set_field(Field, Value, Channel) end, - emqx_channel:init(ConnInfo, #{zone => default, limiter => limiter_cfg(), listener => {tcp, default}}), + emqx_channel:init( + ConnInfo, + #{zone => default, + limiter => limiter_cfg(), + listener => {tcp, default} + }), maps:merge(#{clientinfo => clientinfo(), session => session(), conn_state => connected diff --git a/apps/emqx_exhook/src/emqx_exhook_api.erl b/apps/emqx_exhook/src/emqx_exhook_api.erl index bd3351616..7581dd17b 100644 --- a/apps/emqx_exhook/src/emqx_exhook_api.erl +++ b/apps/emqx_exhook/src/emqx_exhook_api.erl @@ -77,7 +77,8 @@ schema("/exhooks/:name") -> description => <<"Delete the server">>, parameters => params_server_name_in_path(), responses => #{204 => <<>>, - 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) } + 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) + } } }; @@ -176,7 +177,8 @@ action_with_name(put, #{bindings := #{name := Name}, body := Body}) -> }}; {ok, {error, Reason}} -> {400, #{code => <<"BAD_REQUEST">>, - message => unicode:characters_to_binary(io_lib:format("Error Reason:~p~n", [Reason])) + message => unicode:characters_to_binary( + io_lib:format("Error Reason:~p~n", [Reason])) }}; {ok, _} -> {200}; diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index ed81ece82..3f43c1597 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -461,11 +461,9 @@ check_timer(Timer, _, _) -> -spec get_backend_module() -> backend(). get_backend_module() -> - #{type := Backend} = emqx:get_config([retainer, config]), - ModName = if Backend =:= built_in_database -> - mnesia; - true -> - Backend + ModName = case emqx:get_config([retainer, config]) of + #{type := built_in_database} -> mnesia; + #{type := Backend} -> Backend end, erlang:list_to_existing_atom(io_lib:format("~ts_~ts", [?APP, ModName])). diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 6bb0ae340..28be3aa55 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -95,7 +95,11 @@ store_retained(_, Msg =#message{topic = Topic}) -> end, case mria:transaction(?RETAINER_SHARD, Fun) of {atomic, ok} -> ok; - {aborted, Reason} -> ?SLOG(error, #{msg => "failed_to_retain_message", topic => Topic, reason => Reason}) + {aborted, Reason} -> + ?SLOG(error, #{ msg => "failed_to_retain_message" + , topic => Topic + , reason => Reason + }) end end. diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index e51417e6e..54e19e42c 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -90,7 +90,10 @@ t_store_and_clean(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained">>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}]), timer:sleep(100), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), @@ -118,7 +121,10 @@ t_retain_handling(_) -> ?assertEqual(0, length(receive_messages(1))), {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/#">>), - emqtt:publish(C1, <<"retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained">>, + <<"this is a retained message">>, + [{qos, 0}, {retain, true}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), @@ -148,9 +154,18 @@ t_retain_handling(_) -> t_wildcard_subscription(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/a/b/c">>, <<"this is a retained message 2">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/1">>, + <<"this is a retained message 1">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/a/b/c">>, + <<"this is a retained message 2">>, + [{qos, 0}, {retain, true}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+/b/#">>, 0), @@ -165,11 +180,26 @@ t_message_expiry(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained/0">>, #{'Message-Expiry-Interval' => 0}, <<"don't expire">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/1">>, #{'Message-Expiry-Interval' => 2}, <<"expire">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/2">>, #{'Message-Expiry-Interval' => 5}, <<"don't expire">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/3">>, <<"don't expire">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"$SYS/retained/4">>, <<"don't expire">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/0">>, #{'Message-Expiry-Interval' => 0}, + <<"don't expire">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/1">>, #{'Message-Expiry-Interval' => 2}, + <<"expire">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/2">>, #{'Message-Expiry-Interval' => 5}, + <<"don't expire">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/3">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"$SYS/retained/4">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), @@ -210,9 +240,18 @@ t_message_expiry_2(_) -> t_clean(_) -> {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/test/0">>, <<"this is a retained message 2">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/1">>, + <<"this is a retained message 1">>, + [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/test/0">>, + <<"this is a retained message 2">>, + [{qos, 0}, {retain, true}]), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(3, length(receive_messages(3))), @@ -227,7 +266,11 @@ t_stop_publish_clear_msg(_) -> emqx_retainer:update_config(#{<<"stop_publish_clear_msg">> => true}), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}] + ), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(1, length(receive_messages(1))), @@ -239,14 +282,27 @@ t_stop_publish_clear_msg(_) -> ok = emqtt:disconnect(C1). t_flow_control(_) -> - emqx_retainer:update_config(#{<<"flow_control">> => #{<<"max_read_number">> => 1, - <<"msg_deliver_quota">> => 1, - <<"quota_release_interval">> => <<"1s">>}}), + emqx_retainer:update_config(#{<<"flow_control">> => + #{<<"max_read_number">> => 1, + <<"msg_deliver_quota">> => 1, + <<"quota_release_interval">> => <<"1s">>}}), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, _} = emqtt:connect(C1), - emqtt:publish(C1, <<"retained/0">>, <<"this is a retained message 0">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/1">>, <<"this is a retained message 1">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/3">>, <<"this is a retained message 3">>, [{qos, 0}, {retain, true}]), + emqtt:publish( + C1, <<"retained/0">>, + <<"this is a retained message 0">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, <<"retained/1">>, + <<"this is a retained message 1">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, <<"retained/3">>, + <<"this is a retained message 3">>, + [{qos, 0}, {retain, true}] + ), Begin = erlang:system_time(millisecond), {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/#">>, [{qos, 0}, {rh, 0}]), ?assertEqual(3, length(receive_messages(3))), diff --git a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl index 9a887d2d3..c5c828e55 100644 --- a/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_mqtt_v5_SUITE.erl @@ -94,13 +94,23 @@ t_publish_retain_message(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), - {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, [{qos, 2}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"new retained message">>, [{qos, 2}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"not retained message">>, [{qos, 2}, {retain, false}]), + {ok, _} = emqtt:publish( + Client1, Topic, #{}, + <<"retained message">>, + [{qos, 2}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, Topic, #{}, + <<"new retained message">>, + [{qos, 2}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, Topic, #{}, + <<"not retained message">>, + [{qos, 2}, {retain, false}]), {ok, _, [2]} = emqtt:subscribe(Client1, Topic, 2), [Msg] = receive_messages(1), - ?assertEqual(<<"new retained message">>, maps:get(payload, Msg)), %% [MQTT-3.3.1-5] [MQTT-3.3.1-8] + %% [MQTT-3.3.1-5] [MQTT-3.3.1-8] + ?assertEqual(<<"new retained message">>, maps:get(payload, Msg)), {ok, _, [0]} = emqtt:unsubscribe(Client1, Topic), {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"">>, [{qos, 2}, {retain, true}]), @@ -113,16 +123,33 @@ t_publish_retain_message(_) -> t_publish_message_expiry_interval(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), - {ok, _} = emqtt:publish(Client1, <<"topic/A">>, #{'Message-Expiry-Interval' => 1}, <<"retained message">>, [{qos, 1}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, <<"topic/B">>, #{'Message-Expiry-Interval' => 1}, <<"retained message">>, [{qos, 2}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, <<"topic/C">>, #{'Message-Expiry-Interval' => 10}, <<"retained message">>, [{qos, 1}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, <<"topic/D">>, #{'Message-Expiry-Interval' => 10}, <<"retained message">>, [{qos, 2}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, <<"topic/A">>, #{'Message-Expiry-Interval' => 1}, + <<"retained message">>, + [{qos, 1}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, <<"topic/B">>, #{'Message-Expiry-Interval' => 1}, + <<"retained message">>, + [{qos, 2}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, <<"topic/C">>, #{'Message-Expiry-Interval' => 10}, + <<"retained message">>, + [{qos, 1}, {retain, true}]), + {ok, _} = emqtt:publish( + Client1, <<"topic/D">>, #{'Message-Expiry-Interval' => 10}, + <<"retained message">>, + [{qos, 2}, {retain, true}]), timer:sleep(1500), {ok, _, [2]} = emqtt:subscribe(Client1, <<"topic/+">>, 2), Msgs = receive_messages(4), ?assertEqual(2, length(Msgs)), %% [MQTT-3.3.2-5] - L = lists:map(fun(Msg) -> MessageExpiryInterval = maps:get('Message-Expiry-Interval', maps:get(properties, Msg)), MessageExpiryInterval < 10 end, Msgs), + L = lists:map( + fun(Msg) -> + MessageExpiryInterval = maps:get('Message-Expiry-Interval', + maps:get(properties, Msg)), + MessageExpiryInterval < 10 + end, Msgs), ?assertEqual(2, length(L)), %% [MQTT-3.3.2-6] ok = emqtt:disconnect(Client1), @@ -137,9 +164,21 @@ t_publish_message_expiry_interval(_) -> t_subscribe_retain_handing(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), - ok = emqtt:publish(Client1, <<"topic/A">>, #{}, <<"retained message">>, [{qos, 0}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, <<"topic/B">>, #{}, <<"retained message">>, [{qos, 1}, {retain, true}]), - {ok, _} = emqtt:publish(Client1, <<"topic/C">>, #{}, <<"retained message">>, [{qos, 2}, {retain, true}]), + ok = emqtt:publish( + Client1, <<"topic/A">>, #{}, + <<"retained message">>, + [{qos, 0}, {retain, true}] + ), + {ok, _} = emqtt:publish( + Client1, <<"topic/B">>, #{}, + <<"retained message">>, + [{qos, 1}, {retain, true}] + ), + {ok, _} = emqtt:publish( + Client1, <<"topic/C">>, #{}, + <<"retained message">>, + [{qos, 2}, {retain, true}] + ), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{<<"topic/+">>, [{rh, 1}, {qos, 2}]}]), ?assertEqual(3, length(receive_messages(3))), %% [MQTT-3.3.1-10] diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs.erl b/apps/emqx_slow_subs/src/emqx_slow_subs.erl index 370f250fb..027b304b1 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs.erl @@ -104,7 +104,8 @@ on_stats_update(#{clientid := ClientId, case ets:lookup(?TOPK_TAB, LastIndex) of [#top_k{index = Index}] -> %% if last value == the new value, update the type and last_update_time - %% XXX for clients whose latency are stable for a long time, is it possible to reduce updates? + %% XXX for clients whose latency are stable for a long time, is it + %% possible to reduce updates? ets:insert(?TOPK_TAB, #top_k{index = Index, type = Type, last_update_time = Ts}); [_] -> diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl index 5ad7e207b..ee2016268 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_api.erl @@ -72,11 +72,16 @@ schema("/slow_subscriptions/settings") -> }. fields(record) -> - [ - {clientid, mk(string(), #{desc => <<"the clientid">>})}, - {latency, mk(integer(), #{desc => <<"average time for message delivery or time for message expire">>})}, - {type, mk(string(), #{desc => <<"type of the latency, could be average or expire">>})}, - {last_update_time, mk(integer(), #{desc => <<"the timestamp of last update">>})} + [ {clientid, + mk(string(), #{desc => <<"the clientid">>})}, + {latency, + mk(integer(), + #{desc => <<"average time for message delivery or time for message expire">>})}, + {type, + mk(string(), + #{desc => <<"type of the latency, could be average or expire">>})}, + {last_update_time, + mk(integer(), #{desc => <<"the timestamp of last update">>})} ]. conf_schema() -> diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl index 4a802eb4c..2eca0e730 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl @@ -23,8 +23,8 @@ fields("slow_subs") -> , {notice_interval, sc(emqx_schema:duration_ms(), "0s", - "The interval for pushing statistics table records to the system topic. When set to 0, push is disabled" - "publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval" + "The interval for pushing statistics table records to the system topic. " + "publish topk list to $SYS/brokers/${node}/slow_subs per notice_interval. " "publish is disabled if set to 0s." )} , {notice_qos, From 6e0a2485552738623668f444ccff65d502f9245c Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 12 Jan 2022 19:04:08 +0800 Subject: [PATCH 7/7] chore: add namespace for _schema.erl module --- apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl | 5 ++++- apps/emqx_retainer/src/emqx_retainer_schema.erl | 4 +++- apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl | 4 +++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl index 91936c8f0..5bee4ed93 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_limiter_schema.erl @@ -19,7 +19,8 @@ -include_lib("typerefl/include/types.hrl"). -export([ roots/0, fields/1, to_rate/1, to_capacity/1 - , minimum_period/0, to_burst_rate/1, to_initial/1]). + , minimum_period/0, to_burst_rate/1, to_initial/1 + , namespace/0]). -define(KILOBYTE, 1024). @@ -56,6 +57,8 @@ -import(emqx_schema, [sc/2, map/2]). +namespace() -> limiter. + roots() -> [limiter]. fields(limiter) -> diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 308fc528f..f4c1e54e5 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -2,10 +2,12 @@ -include_lib("typerefl/include/types.hrl"). --export([roots/0, fields/1]). +-export([roots/0, fields/1, namespace/0]). -define(TYPE(Type), hoconsc:mk(Type)). +namespace() -> "retainer". + roots() -> ["retainer"]. fields("retainer") -> diff --git a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl index 2eca0e730..7a1b57d0b 100644 --- a/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl +++ b/apps/emqx_slow_subs/src/emqx_slow_subs_schema.erl @@ -2,7 +2,9 @@ -include_lib("typerefl/include/types.hrl"). --export([roots/0, fields/1]). +-export([roots/0, fields/1, namespace/0]). + +namespace() -> "slow_subs". roots() -> ["slow_subs"].