feat(queue): add config

This commit is contained in:
Ilya Averyanov 2024-07-05 20:13:38 +03:00
parent b74189570d
commit 077ee38530
9 changed files with 256 additions and 41 deletions

View File

@ -391,6 +391,8 @@ default_appspec(emqx_schema_validation, _SuiteOpts) ->
#{schema_mod => emqx_schema_validation_schema, config => #{}};
default_appspec(emqx_message_transformation, _SuiteOpts) ->
#{schema_mod => emqx_message_transformation_schema, config => #{}};
default_appspec(emqx_ds_shared_sub, _SuiteOpts) ->
#{schema_mod => emqx_ds_shared_sub_schema, config => #{}};
default_appspec(_, _) ->
#{}.

View File

@ -15,9 +15,11 @@
-spec start(application:start_type(), term()) -> {ok, pid()}.
start(_Type, _Args) ->
ok = emqx_ds_shared_sub_config:load(),
{ok, Sup} = emqx_ds_shared_sub_sup:start_link(),
{ok, Sup}.
-spec stop(term()) -> ok.
stop(_State) ->
ok = emqx_ds_shared_sub_config:unload(),
ok.

View File

@ -0,0 +1,84 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_config).
-behaviour(emqx_config_handler).
-behaviour(emqx_config_backup).
-type update_request() :: emqx_config:config().
%% callbacks for emqx_config_handler
-export([
pre_config_update/3,
post_config_update/5
]).
%% callbacks for emqx_config_backup
-export([
import_config/1
]).
%% API
-export([
load/0,
unload/0,
get/1
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec load() -> ok.
load() ->
emqx_conf:add_handler([durable_queues], ?MODULE).
-spec unload() -> ok.
unload() ->
ok = emqx_conf:remove_handler([durable_queues]).
-spec get(atom() | [atom()]) -> term().
get(Name) when is_atom(Name) ->
emqx_config:get([durable_queues, Name]);
get(Name) when is_list(Name) ->
emqx_config:get([durable_queues | Name]).
%%--------------------------------------------------------------------
%% emqx_config_handler callbacks
%%--------------------------------------------------------------------
-spec pre_config_update(list(atom()), update_request(), emqx_config:raw_config()) ->
{ok, emqx_config:update_request()}.
pre_config_update([durable_queues | _], NewConfig, _OldConfig) ->
{ok, NewConfig}.
-spec post_config_update(
list(atom()),
update_request(),
emqx_config:config(),
emqx_config:config(),
emqx_config:app_envs()
) ->
ok.
post_config_update([durable_queues | _], _Req, _NewConfig, _OldConfig, _AppEnvs) ->
ok.
%%----------------------------------------------------------------------------------------
%% Data backup
%%----------------------------------------------------------------------------------------
import_config(#{<<"durable_queues">> := DQConf}) ->
OldDQConf = emqx:get_raw_config([durable_queues], #{}),
NewDQConf = maps:merge(OldDQConf, DQConf),
case emqx_conf:update([durable_queues], NewDQConf, #{override_to => cluster}) of
{ok, #{raw_config := NewRawConf}} ->
Changed = maps:get(changed, emqx_utils_maps:diff_maps(NewRawConf, DQConf)),
ChangedPaths = [[durable_queues, K] || K <- maps:keys(Changed)],
{ok, #{root_key => durable_queues, changed => ChangedPaths}};
Error ->
{error, #{root_key => durable_queues, reason => Error}}
end;
import_config(_) ->
{ok, #{root_key => durable_queues, changed => []}}.

View File

@ -0,0 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-define(dq_config(Path), emqx_ds_shared_sub_config:get(Path)).

View File

@ -10,6 +10,7 @@
-module(emqx_ds_shared_sub_group_sm).
-include_lib("emqx/include/logger.hrl").
-include("emqx_ds_shared_sub_config.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([
@ -118,16 +119,6 @@
state_timers => #{timer_name() => timer()}
}.
%%-----------------------------------------------------------------------
%% Constants
%%-----------------------------------------------------------------------
%% TODO https://emqx.atlassian.net/browse/EMQX-12574
%% Move to settings
-define(FIND_LEADER_TIMEOUT, 1000).
-define(RENEW_LEASE_TIMEOUT, 5000).
-define(MIN_UPDATE_STREAM_STATE_INTERVAL, 500).
%%-----------------------------------------------------------------------
%% API
%%-----------------------------------------------------------------------
@ -200,7 +191,7 @@ handle_connecting(#{agent := Agent, topic_filter := ShareTopicFilter} = GSM) ->
topic_filter => ShareTopicFilter
}),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM), ShareTopicFilter),
ensure_state_timeout(GSM, find_leader_timeout, ?FIND_LEADER_TIMEOUT).
ensure_state_timeout(GSM, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms)).
handle_leader_lease_streams(
#{state := ?connecting, topic_filter := TopicFilter} = GSM0, Leader, StreamProgresses, Version
@ -228,16 +219,20 @@ handle_find_leader_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM0
topic_filter => TopicFilter
}),
ok = emqx_ds_shared_sub_registry:lookup_leader(Agent, agent_metadata(GSM0), TopicFilter),
GSM1 = ensure_state_timeout(GSM0, find_leader_timeout, ?FIND_LEADER_TIMEOUT),
GSM1 = ensure_state_timeout(
GSM0, find_leader_timeout, ?dq_config(session_find_leader_timeout_ms)
),
GSM1.
%%-----------------------------------------------------------------------
%% Replaying state
handle_replaying(GSM0) ->
GSM1 = ensure_state_timeout(GSM0, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT),
GSM1 = ensure_state_timeout(
GSM0, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms)
),
GSM2 = ensure_state_timeout(
GSM1, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL
GSM1, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms)
),
GSM2.
@ -249,9 +244,11 @@ handle_renew_lease_timeout(#{agent := Agent, topic_filter := TopicFilter} = GSM)
%% Updating state
handle_updating(GSM0) ->
GSM1 = ensure_state_timeout(GSM0, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT),
GSM1 = ensure_state_timeout(
GSM0, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms)
),
GSM2 = ensure_state_timeout(
GSM1, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL
GSM1, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms)
),
GSM2.
@ -332,7 +329,7 @@ handle_leader_update_streams(
VersionNew,
_StreamProgresses
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms));
handle_leader_update_streams(
#{state := ?disconnected} = GSM, _VersionOld, _VersionNew, _StreamProgresses
) ->
@ -349,7 +346,7 @@ handle_leader_update_streams(GSM, VersionOld, VersionNew, _StreamProgresses) ->
handle_leader_renew_stream_lease(
#{state := ?replaying, state_data := #{version := Version}} = GSM, Version
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms));
handle_leader_renew_stream_lease(
#{state := ?updating, state_data := #{version := Version} = StateData} = GSM, Version
) ->
@ -364,13 +361,13 @@ handle_leader_renew_stream_lease(GSM, _Version) ->
handle_leader_renew_stream_lease(
#{state := ?replaying, state_data := #{version := Version}} = GSM, VersionOld, VersionNew
) when VersionOld =:= Version orelse VersionNew =:= Version ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms));
handle_leader_renew_stream_lease(
#{state := ?updating, state_data := #{version := VersionNew, prev_version := VersionOld}} = GSM,
VersionOld,
VersionNew
) ->
ensure_state_timeout(GSM, renew_lease_timeout, ?RENEW_LEASE_TIMEOUT);
ensure_state_timeout(GSM, renew_lease_timeout, ?dq_config(session_renew_lease_timeout_ms));
handle_leader_renew_stream_lease(
#{state := ?disconnected} = GSM, _VersionOld, _VersionNew
) ->
@ -402,7 +399,9 @@ handle_stream_progress(
ok = emqx_ds_shared_sub_proto:agent_update_stream_states(
Leader, Agent, StreamProgresses, Version
),
ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL);
ensure_state_timeout(
GSM, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms)
);
handle_stream_progress(
#{
state := ?updating,
@ -418,7 +417,9 @@ handle_stream_progress(
ok = emqx_ds_shared_sub_proto:agent_update_stream_states(
Leader, Agent, StreamProgresses, PrevVersion, Version
),
ensure_state_timeout(GSM, update_stream_state_timeout, ?MIN_UPDATE_STREAM_STATE_INTERVAL);
ensure_state_timeout(
GSM, update_stream_state_timeout, ?dq_config(session_min_update_stream_state_interval_ms)
);
handle_stream_progress(#{state := ?disconnected} = GSM, _StreamProgresses) ->
GSM.

View File

@ -7,6 +7,7 @@
-behaviour(gen_statem).
-include("emqx_ds_shared_sub_proto.hrl").
-include("emqx_ds_shared_sub_config.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/logger.hrl").
@ -102,15 +103,6 @@
%% Constants
%% TODO https://emqx.atlassian.net/browse/EMQX-12574
%% Move to settings
-define(RENEW_LEASE_INTERVAL, 1000).
-define(RENEW_STREAMS_INTERVAL, 1000).
-define(DROP_TIMEOUT_INTERVAL, 1000).
-define(AGENT_TIMEOUT, 5000).
-define(MAX_NOT_REPLAYING, 5000).
-define(START_TIME_THRESHOLD, 5000).
%%--------------------------------------------------------------------
@ -176,8 +168,8 @@ handle_event(enter, _OldState, ?leader_active, #{topic := Topic, router_id := Ro
ok = emqx_persistent_session_ds_router:do_add_route(Topic, RouterId),
{keep_state_and_data, [
{{timeout, #renew_streams{}}, 0, #renew_streams{}},
{{timeout, #renew_leases{}}, ?RENEW_LEASE_INTERVAL, #renew_leases{}},
{{timeout, #drop_timeout{}}, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}
{{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}},
{{timeout, #drop_timeout{}}, ?dq_config(leader_drop_timeout_interval_ms), #drop_timeout{}}
]};
%%--------------------------------------------------------------------
%% timers
@ -185,17 +177,24 @@ handle_event(enter, _OldState, ?leader_active, #{topic := Topic, router_id := Ro
handle_event({timeout, #renew_streams{}}, #renew_streams{}, ?leader_active, Data0) ->
% ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_streams}),
Data1 = renew_streams(Data0),
{keep_state, Data1, {{timeout, #renew_streams{}}, ?RENEW_STREAMS_INTERVAL, #renew_streams{}}};
{keep_state, Data1,
{
{timeout, #renew_streams{}},
?dq_config(leader_renew_streams_interval_ms),
#renew_streams{}
}};
%% renew_leases timer
handle_event({timeout, #renew_leases{}}, #renew_leases{}, ?leader_active, Data0) ->
% ?tp(warning, shared_sub_leader_timeout, #{timeout => renew_leases}),
Data1 = renew_leases(Data0),
{keep_state, Data1, {{timeout, #renew_leases{}}, ?RENEW_LEASE_INTERVAL, #renew_leases{}}};
{keep_state, Data1,
{{timeout, #renew_leases{}}, ?dq_config(leader_renew_lease_interval_ms), #renew_leases{}}};
%% drop_timeout timer
handle_event({timeout, #drop_timeout{}}, #drop_timeout{}, ?leader_active, Data0) ->
% ?tp(warning, shared_sub_leader_timeout, #{timeout => drop_timeout}),
Data1 = drop_timeout_agents(Data0),
{keep_state, Data1, {{timeout, #drop_timeout{}}, ?DROP_TIMEOUT_INTERVAL, #drop_timeout{}}};
{keep_state, Data1,
{{timeout, #drop_timeout{}}, ?dq_config(leader_drop_timeout_interval_ms), #drop_timeout{}}};
%%--------------------------------------------------------------------
%% agent events
handle_event(
@ -860,7 +859,7 @@ agent_transition_to_initial_waiting_replaying(
prev_version => undefined,
streams => InitialStreams,
revoked_streams => [],
update_deadline => now_ms_monotonic() + ?AGENT_TIMEOUT
update_deadline => now_ms_monotonic() + ?dq_config(leader_session_update_timeout_ms)
},
renew_no_replaying_deadline(AgentState).
@ -900,13 +899,15 @@ now_ms_monotonic() ->
renew_no_replaying_deadline(#{not_replaying_deadline := undefined} = AgentState) ->
AgentState#{
not_replaying_deadline => now_ms_monotonic() + ?MAX_NOT_REPLAYING
not_replaying_deadline => now_ms_monotonic() +
?dq_config(leader_session_not_replaying_timeout_ms)
};
renew_no_replaying_deadline(#{not_replaying_deadline := _Deadline} = AgentState) ->
AgentState;
renew_no_replaying_deadline(#{} = AgentState) ->
AgentState#{
not_replaying_deadline => now_ms_monotonic() + ?MAX_NOT_REPLAYING
not_replaying_deadline => now_ms_monotonic() +
?dq_config(leader_session_not_replaying_timeout_ms)
}.
unassigned_streams(#{stream_states := StreamStates, stream_owners := StreamOwners}) ->
@ -991,7 +992,7 @@ set_agent_state(#{agents := Agents} = Data, Agent, AgentState) ->
update_agent_timeout(AgentState) ->
AgentState#{
update_deadline => now_ms_monotonic() + ?AGENT_TIMEOUT
update_deadline => now_ms_monotonic() + ?dq_config(leader_session_update_timeout_ms)
}.
get_agent_state(#{agents := Agents} = _Data, Agent) ->

View File

@ -0,0 +1,57 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_schema).
-include_lib("hocon/include/hoconsc.hrl").
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
namespace() -> emqx_shared_subs.
roots() ->
[
durable_queues
].
fields(durable_queues) ->
[
{enable,
?HOCON(
boolean(),
#{
required => false,
default => true,
desc => ?DESC(durable_queues)
}
)},
duration(session_find_leader_timeout_ms, 1000),
duration(session_renew_lease_timeout_ms, 5000),
duration(session_min_update_stream_state_interval_ms, 500),
duration(leader_renew_lease_interval_ms, 1000),
duration(leader_renew_streams_interval_ms, 1000),
duration(leader_drop_timeout_interval_ms, 1000),
duration(leader_session_update_timeout_ms, 5000),
duration(leader_session_not_replaying_timeout_ms, 5000)
].
duration(MsFieldName, Default) ->
{MsFieldName,
?HOCON(
emqx_schema:timeout_duration_ms(),
#{
required => false,
default => Default,
desc => ?DESC(MsFieldName),
importance => ?IMPORTANCE_HIDDEN
}
)}.
desc(durable_queues) -> "Settings for durable queues".

View File

@ -0,0 +1,62 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ds_shared_sub_config_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx/include/asserts.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Apps = emqx_cth_suite:start(
[
emqx_conf,
{emqx, #{
config => #{
<<"durable_sessions">> => #{
<<"enable">> => true,
<<"renew_streams_interval">> => "100ms"
},
<<"durable_storage">> => #{
<<"messages">> => #{
<<"backend">> => <<"builtin_raft">>
}
}
}
}},
{emqx_ds_shared_sub, #{
config => #{
<<"durable_queues">> => #{
<<"enable">> => true,
<<"session_find_leader_timeout_ms">> => "1200ms"
}
}
}}
],
#{work_dir => ?config(priv_dir, Config)}
),
[{apps, Apps} | Config].
end_per_suite(Config) ->
ok = emqx_cth_suite:stop(?config(apps, Config)),
ok.
t_update_config(_Config) ->
?assertEqual(
1200,
emqx_ds_shared_sub_config:get(session_find_leader_timeout_ms)
),
{ok, _} = emqx_conf:update([durable_queues], #{session_find_leader_timeout_ms => 2000}, #{}),
?assertEqual(
2000,
emqx_ds_shared_sub_config:get(session_find_leader_timeout_ms)
).

View File

@ -18,7 +18,8 @@
emqx_schema_registry_schema,
emqx_schema_validation_schema,
emqx_message_transformation_schema,
emqx_ft_schema
emqx_ft_schema,
emqx_ds_shared_sub_schema
]).
%% Callback to upgrade config after loaded from config file but before validation.