diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 90d56eb0e..386b71af7 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -25,27 +25,58 @@ -logger_header("[Zone]"). +-compile({inline, + [ idle_timeout/1 + , publish_limit/1 + , mqtt_frame_options/1 + , mqtt_strict_mode/1 + , max_packet_size/1 + , mountpoint/1 + , use_username_as_clientid/1 + , stats_timer/1 + , enable_stats/1 + , enable_acl/1 + , enable_ban/1 + , enable_flapping_detect/1 + , ignore_loop_deliver/1 + , server_keepalive/1 + , keepalive_backoff/1 + , max_inflight/1 + , session_expiry_interval/1 + , force_gc_policy/1 + , force_shutdown_policy/1 + ]}). + %% APIs -export([start_link/0, stop/0]). --export([ frame_options/1 +%% Zone Option API +-export([ idle_timeout/1 + , publish_limit/1 + , mqtt_frame_options/1 , mqtt_strict_mode/1 , max_packet_size/1 + , mountpoint/1 , use_username_as_clientid/1 + , stats_timer/1 , enable_stats/1 , enable_acl/1 , enable_ban/1 , enable_flapping_detect/1 , ignore_loop_deliver/1 , server_keepalive/1 + , keepalive_backoff/1 , max_inflight/1 , session_expiry_interval/1 , force_gc_policy/1 , force_shutdown_policy/1 ]). --export([check_oom/2]). +-export([ init_gc_state/1 + , oom_policy/1 + ]). +%% Zone API -export([ get_env/2 , get_env/3 , set_env/3 @@ -63,27 +94,46 @@ , code_change/3 ]). --export_type([zone/0]). +-import(emqx_misc, [maybe_apply/2]). -%% dummy state --record(state, {}). +-export_type([zone/0]). -type(zone() :: atom()). -define(TAB, ?MODULE). -define(SERVER, ?MODULE). +-define(DEFAULT_IDLE_TIMEOUT, 30000). -define(KEY(Zone, Key), {?MODULE, Zone, Key}). -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- - -spec(start_link() -> startlink_ret()). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). --spec(frame_options(zone()) -> emqx_frame:options()). -frame_options(Zone) -> +-spec(stop() -> ok). +stop() -> + gen_server:stop(?SERVER). + +-spec(init_gc_state(zone()) -> emqx_gc:gc_state()). +init_gc_state(Zone) -> + maybe_apply(fun emqx_gc:init/1, force_gc_policy(Zone)). + +-spec(oom_policy(zone()) -> emqx_types:oom_policy()). +oom_policy(Zone) -> force_shutdown_policy(Zone). + +%%-------------------------------------------------------------------- +%% Zone Options API +%%-------------------------------------------------------------------- + +-spec(idle_timeout(zone()) -> pos_integer()). +idle_timeout(Zone) -> + get_env(Zone, idle_timeout, ?DEFAULT_IDLE_TIMEOUT). + +-spec(publish_limit(zone()) -> maybe(esockd_rate_limit:config())). +publish_limit(Zone) -> + get_env(Zone, publish_limit). + +-spec(mqtt_frame_options(zone()) -> emqx_frame:options()). +mqtt_frame_options(Zone) -> #{strict_mode => mqtt_strict_mode(Zone), max_size => max_packet_size(Zone) }. @@ -96,10 +146,17 @@ mqtt_strict_mode(Zone) -> max_packet_size(Zone) -> get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE). +-spec(mountpoint(zone()) -> maybe(emqx_mountpoint:mountpoint())). +mountpoint(Zone) -> get_env(Zone, mountpoint). + -spec(use_username_as_clientid(zone()) -> boolean()). use_username_as_clientid(Zone) -> get_env(Zone, use_username_as_clientid, false). +-spec(stats_timer(zone()) -> undefined | disabled). +stats_timer(Zone) -> + case enable_stats(Zone) of true -> undefined; false -> disabled end. + -spec(enable_stats(zone()) -> boolean()). enable_stats(Zone) -> get_env(Zone, enable_stats, true). @@ -124,6 +181,10 @@ ignore_loop_deliver(Zone) -> server_keepalive(Zone) -> get_env(Zone, server_keepalive). +-spec(keepalive_backoff(zone()) -> float()). +keepalive_backoff(Zone) -> + get_env(Zone, keepalive_backoff, 0.75). + -spec(max_inflight(zone()) -> 0..65535). max_inflight(Zone) -> get_env(Zone, max_inflight, 65535). @@ -140,18 +201,9 @@ force_gc_policy(Zone) -> force_shutdown_policy(Zone) -> get_env(Zone, force_shutdown_policy). --spec(check_oom(zone(), fun()) -> ok | term()). -check_oom(Zone, Action) -> - case emqx_zone:force_shutdown_policy(Zone) of - undefined -> ok; - Policy -> do_check_oom(emqx_oom:init(Policy), Action) - end. - -do_check_oom(OomPolicy, Action) -> - case emqx_oom:check(OomPolicy) of - ok -> ok; - Shutdown -> Action(Shutdown) - end. +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- -spec(get_env(maybe(zone()), atom()) -> maybe(term())). get_env(undefined, Key) -> emqx:get_env(Key); @@ -179,17 +231,13 @@ unset_env(Zone, Key) -> force_reload() -> gen_server:call(?SERVER, force_reload). --spec(stop() -> ok). -stop() -> - gen_server:stop(?SERVER, normal, infinity). - %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> _ = do_reload(), - {ok, #state{}}. + {ok, #{}}. handle_call(force_reload, _From, State) -> _ = do_reload(),