From 077ee3853081cd1339bd7e4813977142ee147f3f Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 5 Jul 2024 20:13:38 +0300 Subject: [PATCH] feat(queue): add config --- apps/emqx/test/emqx_cth_suite.erl | 2 + .../src/emqx_ds_shared_sub_app.erl | 2 + .../src/emqx_ds_shared_sub_config.erl | 84 +++++++++++++++++++ .../src/emqx_ds_shared_sub_config.hrl | 5 ++ .../src/emqx_ds_shared_sub_group_sm.erl | 45 +++++----- .../src/emqx_ds_shared_sub_leader.erl | 37 ++++---- .../src/emqx_ds_shared_sub_schema.erl | 57 +++++++++++++ .../test/emqx_ds_shared_sub_config_SUITE.erl | 62 ++++++++++++++ .../src/emqx_enterprise_schema.erl | 3 +- 9 files changed, 256 insertions(+), 41 deletions(-) create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_config.erl create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_config.hrl create mode 100644 apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_schema.erl create mode 100644 apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_config_SUITE.erl diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 8e7c84580..c0e3430db 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -391,6 +391,8 @@ default_appspec(emqx_schema_validation, _SuiteOpts) -> #{schema_mod => emqx_schema_validation_schema, config => #{}}; default_appspec(emqx_message_transformation, _SuiteOpts) -> #{schema_mod => emqx_message_transformation_schema, config => #{}}; +default_appspec(emqx_ds_shared_sub, _SuiteOpts) -> + #{schema_mod => emqx_ds_shared_sub_schema, config => #{}}; default_appspec(_, _) -> #{}. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_app.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_app.erl index 5c2d8d964..80e728a80 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_app.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_app.erl @@ -15,9 +15,11 @@ -spec start(application:start_type(), term()) -> {ok, pid()}. start(_Type, _Args) -> + ok = emqx_ds_shared_sub_config:load(), {ok, Sup} = emqx_ds_shared_sub_sup:start_link(), {ok, Sup}. -spec stop(term()) -> ok. stop(_State) -> + ok = emqx_ds_shared_sub_config:unload(), ok. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_config.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_config.erl new file mode 100644 index 000000000..454e2b6e8 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_config.erl @@ -0,0 +1,84 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_config). + +-behaviour(emqx_config_handler). +-behaviour(emqx_config_backup). + +-type update_request() :: emqx_config:config(). + +%% callbacks for emqx_config_handler +-export([ + pre_config_update/3, + post_config_update/5 +]). + +%% callbacks for emqx_config_backup +-export([ + import_config/1 +]). + +%% API +-export([ + load/0, + unload/0, + get/1 +]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec load() -> ok. +load() -> + emqx_conf:add_handler([durable_queues], ?MODULE). + +-spec unload() -> ok. +unload() -> + ok = emqx_conf:remove_handler([durable_queues]). + +-spec get(atom() | [atom()]) -> term(). +get(Name) when is_atom(Name) -> + emqx_config:get([durable_queues, Name]); +get(Name) when is_list(Name) -> + emqx_config:get([durable_queues | Name]). + +%%-------------------------------------------------------------------- +%% emqx_config_handler callbacks +%%-------------------------------------------------------------------- + +-spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config()) -> + {ok, emqx_config:update_request()}. +pre_config_update([durable_queues | _], NewConfig, _OldConfig) -> + {ok, NewConfig}. + +-spec post_config_update( + list(atom()), + update_request(), + emqx_config:config(), + emqx_config:config(), + emqx_config:app_envs() +) -> + ok. +post_config_update([durable_queues | _], _Req, _NewConfig, _OldConfig, _AppEnvs) -> + ok. + +%%---------------------------------------------------------------------------------------- +%% Data backup +%%---------------------------------------------------------------------------------------- + +import_config(#{<<"durable_queues">> := DQConf}) -> + OldDQConf = emqx:get_raw_config([durable_queues], #{}), + NewDQConf = maps:merge(OldDQConf, DQConf), + case emqx_conf:update([durable_queues], NewDQConf, #{override_to => cluster}) of + {ok, #{raw_config := NewRawConf}} -> + Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawConf, DQConf)), + ChangedPaths = [[durable_queues, K] || K <- maps:keys(Changed)], + {ok, #{root_key => durable_queues, changed => ChangedPaths}}; + Error -> + {error, #{root_key => durable_queues, reason => Error}} + end; +import_config(_) -> + {ok, #{root_key => durable_queues, changed => []}}. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_config.hrl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_config.hrl new file mode 100644 index 000000000..592a60643 --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_config.hrl @@ -0,0 +1,5 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-define(dq_config(Path), emqx_ds_shared_sub_config:get(Path)). diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl index f9a81bbb8..81bca367a 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_group_sm.erl @@ -10,6 +10,7 @@ -module(emqx_ds_shared_sub_group_sm). -include_lib("emqx/include/logger.hrl"). +-include("emqx_ds_shared_sub_config.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -export([ @@ -118,16 +119,6 @@ state_timers => #{timer_name() => timer()} }. -%%----------------------------------------------------------------------- -%% Constants -%%----------------------------------------------------------------------- - -%% TODO https://emqx.atlassian.net/browse/EMQX-12574 -%% Move to settings --define(FIND_LEADER_TIMEOUT, 1000). --define(RENEW_LEASE_TIMEOUT, 5000). --define(MIN_UPDATE_STREAM_STATE_INTERVAL, 500). - %%----------------------------------------------------------------------- %% API %%----------------------------------------------------------------------- @@ -200,7 +191,7 @@ handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) -> topic_filter => ShareTopicFilter }), ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM), ShareTopicFilter), - ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT). + ensure_state_timeout(GSM, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms)). handle_leader_lease_streams( #{state := ?connecting, topic_filter := TopicFilter} = GSM0, Leader, StreamProgresses, Version @@ -228,16 +219,20 @@ handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0 topic_filter => TopicFilter }), ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), TopicFilter), - GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT), + GSM1 = ensure_state_timeout( + GSM0, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms) + ), GSM1. %%----------------------------------------------------------------------- %% Replaying state handle_replaying(GSM0) -> - GSM1 = ensure_state_timeout(GSM0, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT), + GSM1 = ensure_state_timeout( + GSM0, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms) + ), GSM2 = ensure_state_timeout( - GSM1, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL + GSM1, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms) ), GSM2. @@ -249,9 +244,11 @@ handle_renew_lease_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM) %% Updating state handle_updating(GSM0) -> - GSM1 = ensure_state_timeout(GSM0, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT), + GSM1 = ensure_state_timeout( + GSM0, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms) + ), GSM2 = ensure_state_timeout( - GSM1, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL + GSM1, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms) ), GSM2. @@ -332,7 +329,7 @@ handle_leader_update_streams( VersionNew, _StreamProgresses ) -> - ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); + ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms)); handle_leader_update_streams( #{state := ?disconnected} = GSM, _VersionOld, _VersionNew, _StreamProgresses ) -> @@ -349,7 +346,7 @@ handle_leader_update_streams(GSM, VersionOld, VersionNew, _StreamProgresses) -> handle_leader_renew_stream_lease( #{state := ?replaying, state_data := #{version := Version}} = GSM, Version ) -> - ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); + ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms)); handle_leader_renew_stream_lease( #{state := ?updating, state_data := #{version := Version} = StateData} = GSM, Version ) -> @@ -364,13 +361,13 @@ handle_leader_renew_stream_lease(GSM, _Version) -> handle_leader_renew_stream_lease( #{state := ?replaying, state_data := #{version := Version}} = GSM, VersionOld, VersionNew ) when VersionOld =:= Version orelse VersionNew =:= Version -> - ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); + ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms)); handle_leader_renew_stream_lease( #{state := ?updating, state_data := #{version := VersionNew, prev_version := VersionOld}} = GSM, VersionOld, VersionNew ) -> - ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT); + ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms)); handle_leader_renew_stream_lease( #{state := ?disconnected} = GSM, _VersionOld, _VersionNew ) -> @@ -402,7 +399,9 @@ handle_stream_progress( ok = emqx_ds_shared_sub_proto:agent_update_stream_states( Leader, Agent, StreamProgresses, Version ), - ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL); + ensure_state_timeout( + GSM, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms) + ); handle_stream_progress( #{ state := ?updating, @@ -418,7 +417,9 @@ handle_stream_progress( ok = emqx_ds_shared_sub_proto:agent_update_stream_states( Leader, Agent, StreamProgresses, PrevVersion, Version ), - ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL); + ensure_state_timeout( + GSM, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms) + ); handle_stream_progress(#{state := ?disconnected} = GSM, _StreamProgresses) -> GSM. diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl index de277ece8..143eed1fe 100644 --- a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_leader.erl @@ -7,6 +7,7 @@ -behaviour(gen_statem). -include("emqx_ds_shared_sub_proto.hrl"). +-include("emqx_ds_shared_sub_config.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -102,15 +103,6 @@ %% Constants -%% TODO https://emqx.atlassian.net/browse/EMQX-12574 -%% Move to settings --define(RENEW_LEASE_INTERVAL, 1000). --define(RENEW_STREAMS_INTERVAL, 1000). --define(DROP_TIMEOUT_INTERVAL, 1000). - --define(AGENT_TIMEOUT, 5000). --define(MAX_NOT_REPLAYING, 5000). - -define(START_TIME_THRESHOLD, 5000). %%-------------------------------------------------------------------- @@ -176,8 +168,8 @@ handle_event(enter, _OldState, ?leader_active, #{topic := Topic, router_id := Ro ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId), {keep_state_and_data, [ {{timeout, #renew_streams{}}, 0, #renew_streams{}}, - {{timeout, #renew_leases{}}, ?RENEW_LEASE_INTERVAL, #renew_leases{}}, - {{timeout, #drop_timeout{}}, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}} + {{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}, + {{timeout, #drop_timeout{}}, ?dq_config(leader_drop_timeout_interval_ms), #drop_timeout{}} ]}; %%-------------------------------------------------------------------- %% timers @@ -185,17 +177,24 @@ handle_event(enter, _OldState, ?leader_active, #{topic := Topic, router_id := Ro handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data0) -> % ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_streams}), Data1 = renew_streams(Data0), - {keep_state, Data1, {{timeout, #renew_streams{}}, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}}; + {keep_state, Data1, + { + {timeout, #renew_streams{}}, + ?dq_config(leader_renew_streams_interval_ms), + #renew_streams{} + }}; %% renew_leases timer handle_event({timeout, #renew_leases{}}, #renew_leases{}, ?leader_active, Data0) -> % ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_leases}), Data1 = renew_leases(Data0), - {keep_state, Data1, {{timeout, #renew_leases{}}, ?RENEW_LEASE_INTERVAL, #renew_leases{}}}; + {keep_state, Data1, + {{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}}; %% drop_timeout timer handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0) -> % ?tp(warning, shared_sub_leader_timeout, #{timeout => drop_timeout}), Data1 = drop_timeout_agents(Data0), - {keep_state, Data1, {{timeout, #drop_timeout{}}, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}}; + {keep_state, Data1, + {{timeout, #drop_timeout{}}, ?dq_config(leader_drop_timeout_interval_ms), #drop_timeout{}}}; %%-------------------------------------------------------------------- %% agent events handle_event( @@ -860,7 +859,7 @@ agent_transition_to_initial_waiting_replaying( prev_version => undefined, streams => InitialStreams, revoked_streams => [], - update_deadline => now_ms_monotonic() + ?AGENT_TIMEOUT + update_deadline => now_ms_monotonic() + ?dq_config(leader_session_update_timeout_ms) }, renew_no_replaying_deadline(AgentState). @@ -900,13 +899,15 @@ now_ms_monotonic() -> renew_no_replaying_deadline(#{not_replaying_deadline := undefined} = AgentState) -> AgentState#{ - not_replaying_deadline => now_ms_monotonic() + ?MAX_NOT_REPLAYING + not_replaying_deadline => now_ms_monotonic() + + ?dq_config(leader_session_not_replaying_timeout_ms) }; renew_no_replaying_deadline(#{not_replaying_deadline := _Deadline} = AgentState) -> AgentState; renew_no_replaying_deadline(#{} = AgentState) -> AgentState#{ - not_replaying_deadline => now_ms_monotonic() + ?MAX_NOT_REPLAYING + not_replaying_deadline => now_ms_monotonic() + + ?dq_config(leader_session_not_replaying_timeout_ms) }. unassigned_streams(#{stream_states := StreamStates, stream_owners := StreamOwners}) -> @@ -991,7 +992,7 @@ set_agent_state(#{agents := Agents} = Data, Agent, AgentState) -> update_agent_timeout(AgentState) -> AgentState#{ - update_deadline => now_ms_monotonic() + ?AGENT_TIMEOUT + update_deadline => now_ms_monotonic() + ?dq_config(leader_session_update_timeout_ms) }. get_agent_state(#{agents := Agents} = _Data, Agent) -> diff --git a/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_schema.erl b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_schema.erl new file mode 100644 index 000000000..198554d8a --- /dev/null +++ b/apps/emqx_ds_shared_sub/src/emqx_ds_shared_sub_schema.erl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_schema). + +-include_lib("hocon/include/hoconsc.hrl"). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +namespace() -> emqx_shared_subs. + +roots() -> + [ + durable_queues + ]. + +fields(durable_queues) -> + [ + {enable, + ?HOCON( + boolean(), + #{ + required => false, + default => true, + desc => ?DESC(durable_queues) + } + )}, + duration(session_find_leader_timeout_ms, 1000), + duration(session_renew_lease_timeout_ms, 5000), + duration(session_min_update_stream_state_interval_ms, 500), + + duration(leader_renew_lease_interval_ms, 1000), + duration(leader_renew_streams_interval_ms, 1000), + duration(leader_drop_timeout_interval_ms, 1000), + duration(leader_session_update_timeout_ms, 5000), + duration(leader_session_not_replaying_timeout_ms, 5000) + ]. + +duration(MsFieldName, Default) -> + {MsFieldName, + ?HOCON( + emqx_schema:timeout_duration_ms(), + #{ + required => false, + default => Default, + desc => ?DESC(MsFieldName), + importance => ?IMPORTANCE_HIDDEN + } + )}. + +desc(durable_queues) -> "Settings for durable queues". diff --git a/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_config_SUITE.erl b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_config_SUITE.erl new file mode 100644 index 000000000..a3d58ebf9 --- /dev/null +++ b/apps/emqx_ds_shared_sub/test/emqx_ds_shared_sub_config_SUITE.erl @@ -0,0 +1,62 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_ds_shared_sub_config_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-include_lib("emqx/include/asserts.hrl"). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + emqx_conf, + {emqx, #{ + config => #{ + <<"durable_sessions">> => #{ + <<"enable">> => true, + <<"renew_streams_interval">> => "100ms" + }, + <<"durable_storage">> => #{ + <<"messages">> => #{ + <<"backend">> => <<"builtin_raft">> + } + } + } + }}, + {emqx_ds_shared_sub, #{ + config => #{ + <<"durable_queues">> => #{ + <<"enable">> => true, + <<"session_find_leader_timeout_ms">> => "1200ms" + } + } + }} + ], + #{work_dir => ?config(priv_dir, Config)} + ), + [{apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)), + ok. + +t_update_config(_Config) -> + ?assertEqual( + 1200, + emqx_ds_shared_sub_config:get(session_find_leader_timeout_ms) + ), + + {ok, _} = emqx_conf:update([durable_queues], #{session_find_leader_timeout_ms => 2000}, #{}), + ?assertEqual( + 2000, + emqx_ds_shared_sub_config:get(session_find_leader_timeout_ms) + ). diff --git a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl index eeafe340a..eb43f67af 100644 --- a/apps/emqx_enterprise/src/emqx_enterprise_schema.erl +++ b/apps/emqx_enterprise/src/emqx_enterprise_schema.erl @@ -18,7 +18,8 @@ emqx_schema_registry_schema, emqx_schema_validation_schema, emqx_message_transformation_schema, - emqx_ft_schema + emqx_ft_schema, + emqx_ds_shared_sub_schema ]). %% Callback to upgrade config after loaded from config file but before validation.