diff --git a/src/emqx_pool.erl b/src/emqx_pool.erl index 8d927ddd9..276352797 100644 --- a/src/emqx_pool.erl +++ b/src/emqx_pool.erl @@ -23,16 +23,14 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {pool, id}). - -define(POOL, ?MODULE). %% @doc Start pooler supervisor. start_link() -> emqx_pool_sup:start_link(?POOL, random, {?MODULE, start_link, []}). -%% @doc Start pool --spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). +%% @doc Start pool. +-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). start_link(Pool, Id) -> gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], []). @@ -49,13 +47,13 @@ async_submit(Fun) -> worker() -> gproc_pool:pick_worker(pool). -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #state{pool = Pool, id = Id}}. + {ok, #{pool => Pool, id => Id}}. handle_call({submit, Fun}, _From, State) -> {reply, catch run(Fun), State}; @@ -79,15 +77,15 @@ handle_info(Info, State) -> emqx_logger:error("[Pool] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{pool = Pool, id = Id}) -> +terminate(_Reason, #{pool := Pool, id := Id}) -> true = gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Internal functions -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ run({M, F, A}) -> erlang:apply(M, F, A); diff --git a/src/emqx_pool_sup.erl b/src/emqx_pool_sup.erl index b71c15f1e..b371549c0 100644 --- a/src/emqx_pool_sup.erl +++ b/src/emqx_pool_sup.erl @@ -26,8 +26,12 @@ spec(Args) -> -spec(spec(any(), list()) -> supervisor:child_spec()). spec(ChildId, Args) -> - {ChildId, {?MODULE, start_link, Args}, - transient, infinity, supervisor, [?MODULE]}. + #{id => ChildId, + start => {?MODULE, start_link, Args}, + restart => transient, + shutdown => infinity, + type => supervisor, + modules => [?MODULE]}. -spec(start_link(atom() | tuple(), atom(), mfa()) -> {ok, pid()} | {error, term()}). start_link(Pool, Type, MFA) -> diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 209f0323c..dd183dbdf 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -21,17 +21,20 @@ -export([start_link/0]). -export([get_env/2, get_env/3]). -export([set_env/3]). +-export([force_reload/0]). +%% for test +-export([stop/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {timer}). - -define(TAB, ?MODULE). +-define(SERVER, ?MODULE). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -spec(get_env(emqx_types:zone() | undefined, atom()) -> undefined | term()). get_env(undefined, Key) -> @@ -50,7 +53,15 @@ get_env(Zone, Key, Def) -> -spec(set_env(emqx_types:zone(), atom(), term()) -> ok). set_env(Zone, Key, Val) -> - gen_server:cast(?MODULE, {set_env, Zone, Key, Val}). + gen_server:cast(?SERVER, {set_env, Zone, Key, Val}). + +-spec(force_reload() -> ok). +force_reload() -> + gen_server:call(?SERVER, force_reload). + +-spec(stop() -> ok). +stop() -> + gen_server:stop(?SERVER, normal, infinity). %%------------------------------------------------------------------------------ %% gen_server callbacks @@ -58,7 +69,11 @@ set_env(Zone, Key, Val) -> init([]) -> _ = emqx_tables:new(?TAB, [set, {read_concurrency, true}]), - {ok, element(2, handle_info(reload, #state{}))}. + {ok, element(2, handle_info(reload, #{timer => undefined}))}. + +handle_call(force_reload, _From, State) -> + _ = do_reload(), + {reply, ok, State}; handle_call(Req, _From, State) -> emqx_logger:error("[Zone] unexpected call: ~p", [Req]), @@ -73,11 +88,8 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info(reload, State) -> - lists:foreach( - fun({Zone, Opts}) -> - [ets:insert(?TAB, {{Zone, Key}, Val}) || {Key, Val} <- Opts] - end, emqx_config:get_env(zones, [])), - {noreply, ensure_reload_timer(State), hibernate}; + _ = do_reload(), + {noreply, ensure_reload_timer(State#{timer := undefined}), hibernate}; handle_info(Info, State) -> emqx_logger:error("[Zone] unexpected info: ~p", [Info]), @@ -93,6 +105,12 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -ensure_reload_timer(State) -> - State#state{timer = erlang:send_after(10000, self(), reload)}. +do_reload() -> + [ets:insert(?TAB, [{{Zone, Key}, Val} || {Key, Val} <- Opts]) + || {Zone, Opts} <- emqx_config:get_env(zones, [])]. + +ensure_reload_timer(State = #{timer := undefined}) -> + State#{timer := erlang:send_after(timer:minutes(5), self(), reload)}; +ensure_reload_timer(State) -> + State. diff --git a/test/emqx_zone_SUITE.erl b/test/emqx_zone_SUITE.erl index 282acc3e5..83c2ceaab 100644 --- a/test/emqx_zone_SUITE.erl +++ b/test/emqx_zone_SUITE.erl @@ -18,15 +18,21 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). +-include_lib("eunit/include/eunit.hrl"). all() -> [t_set_get_env]. t_set_get_env(_) -> - emqx_zone:start_link(), - ok = emqx_zone:set_env(china, language, chinese), - timer:sleep(100), % make sure set_env/3 is okay + application:set_env(emqx, zones, [{china, [{language, chinese}]}]), + {ok, _} = emqx_zone:start_link(), + ct:print("~p~n", [ets:tab2list(emqx_zone)]), chinese = emqx_zone:get_env(china, language), cn470 = emqx_zone:get_env(china, ism_band, cn470), undefined = emqx_zone:get_env(undefined, delay), - 500 = emqx_zone:get_env(undefined, delay, 500). + 500 = emqx_zone:get_env(undefined, delay, 500), + application:set_env(emqx, zones, [{zone1, [{key, val}]}]), + ?assertEqual(undefined, emqx_zone:get_env(zone1, key)), + emqx_zone:force_reload(), + ?assertEqual(val, emqx_zone:get_env(zone1, key)), + emqx_zone:stop().