From 8886d607203e57e9cb0a6eb712f5c10ddeb06902 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 26 Aug 2021 14:45:56 +0800 Subject: [PATCH] refactor(gw): change the on_gateway_update/3 callback params --- .../src/bhvrs/emqx_gateway_impl.erl | 4 +- apps/emqx_gateway/src/coap/emqx_coap_impl.erl | 8 +-- apps/emqx_gateway/src/emqx_gateway.erl | 3 +- apps/emqx_gateway/src/emqx_gateway_gw_sup.erl | 9 +-- .../src/emqx_gateway_insta_sup.erl | 66 +++++++------------ apps/emqx_gateway/src/emqx_gateway_schema.erl | 4 -- apps/emqx_gateway/src/emqx_gateway_sup.erl | 8 +-- .../src/exproto/emqx_exproto_impl.erl | 8 +-- apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl | 14 ++-- .../src/stomp/emqx_stomp_impl.erl | 8 +-- 10 files changed, 55 insertions(+), 77 deletions(-) diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl index 6906043d9..ac3289dfa 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_impl.erl @@ -30,8 +30,8 @@ | {ok, [Childspec :: supervisor:child_spec()], GwState :: state()}. %% @doc --callback on_gateway_update(NewGateway :: gateway(), - OldGateway :: gateway(), +-callback on_gateway_update(Config :: emqx_config:config(), + Gateway :: gateway(), GwState :: state()) -> ok | {ok, [ChildPid :: pid()], NGwState :: state()} diff --git a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl index aacb98546..da2f2b8e9 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_impl.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_impl.erl @@ -58,13 +58,13 @@ on_gateway_load(_Gateway = #{name := GwName, {ok, ListenerPids, #{ctx => Ctx}}. -on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> - GwName = maps:get(name, NewGateway), +on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> + GwName = maps:get(name, Gateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? - on_gateway_unload(OldGateway, GwState), - on_gateway_load(NewGateway, Ctx) + on_gateway_unload(Gateway, GwState), + on_gateway_load(Gateway#{config => Config}, Ctx) catch Class : Reason : Stk -> logger:error("Failed to update ~s; " diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index d8b4125ce..aab0dab55 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -63,8 +63,7 @@ lookup(Name) -> -spec update(gateway_name(), emqx_config:config()) -> ok | {error, any()}. update(Name, Config) -> - NewGateway = #{name => Name, config => Config}, - emqx_gateway_sup:update_gateway(NewGateway). + emqx_gateway_sup:update_gateway(Name, Config). -spec start(gateway_name()) -> ok | {error, any()}. start(Name) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl b/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl index bfde2b562..d56c8783b 100644 --- a/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_gw_sup.erl @@ -29,7 +29,7 @@ -export([ create_insta/3 , remove_insta/2 - , update_insta/2 + , update_insta/3 , start_insta/2 , stop_insta/2 , list_insta/1 @@ -72,12 +72,13 @@ remove_insta(Sup, Name) -> ok = supervisor:delete_child(Sup, Name) end. --spec update_insta(pid(), NewGateway :: gateway()) -> ok | {error, any()}. -update_insta(Sup, NewGateway = #{name := Name}) -> +-spec update_insta(pid(), gateway_name(), emqx_config:config()) + -> ok | {error, any()}. +update_insta(Sup, Name, Config) -> case emqx_gateway_utils:find_sup_child(Sup, Name) of false -> {error, not_found}; {ok, GwInstaPid} -> - emqx_gateway_insta_sup:update(GwInstaPid, NewGateway) + emqx_gateway_insta_sup:update(GwInstaPid, Config) end. -spec start_insta(pid(), gateway_name()) -> ok | {error, any()}. diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 1eae5f54c..74419914e 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -69,15 +69,15 @@ info(Pid) -> disable(Pid) -> call(Pid, disable). -%% @doc Start gateway +%% @doc Start gateway -spec enable(pid()) -> ok | {error, any()}. enable(Pid) -> call(Pid, enable). %% @doc Update the gateway configurations --spec update(pid(), gateway()) -> ok | {error, any()}. -update(Pid, NewGateway) -> - call(Pid, {update, NewGateway}). +-spec update(pid(), emqx_config:config()) -> ok | {error, any()}. +update(Pid, Config) -> + call(Pid, {update, Config}). call(Pid, Req) -> gen_server:call(Pid, Req, 5000). @@ -125,12 +125,7 @@ do_deinit_context(Ctx) -> ok. handle_call(info, _From, State = #state{gw = Gateway}) -> - GwInfo = Gateway#{status => State#state.status, - created_at => State#state.created_at, - started_at => State#state.started_at, - stopped_at => State#state.stopped_at - }, - {reply, GwInfo, State}; + {reply, state2info(Gateway), State}; handle_call(disable, _From, State = #state{status = Status}) -> case Status of @@ -158,32 +153,12 @@ handle_call(enable, _From, State = #state{status = Status}) -> {reply, {error, already_started}, State} end; -%% Stopped -> update -handle_call({update, NewGateway}, _From, State = #state{ - gw = Gateway, - status = stopped}) -> - case maps:get(name, NewGateway, undefined) - == maps:get(name, Gateway, undefined) of - true -> - {reply, ok, State#state{gw = NewGateway}}; - false -> - {reply, {error, gateway_name_not_match}, State} - end; - -%% Running -> update -handle_call({update, NewGateway}, _From, State = #state{gw = Gateway, - status = running}) -> - case maps:get(name, NewGateway, undefined) - == maps:get(name, Gateway, undefined) of - true -> - case cb_gateway_update(NewGateway, State) of - {ok, NState} -> - {reply, ok, NState}; - {error, Reason} -> - {reply, {error, Reason}, State} - end; - false -> - {reply, {error, gateway_name_not_match}, State} +handle_call({update, Config}, _From, State) -> + case cb_gateway_update(Config, State) of + {ok, NState} -> + {reply, ok, NState}; + {error, Reason} -> + {reply, {error, Reason}, State} end; handle_call(_Request, _From, State) -> @@ -223,6 +198,14 @@ terminate(_Reason, State = #state{ctx = Ctx, child_pids = Pids}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +state2info(State = #state{gw = Gateway}) -> + Gateway#{ + status => State#state.status, + created_at => State#state.created_at, + started_at => State#state.started_at, + stopped_at => State#state.stopped_at + }. + %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- @@ -299,13 +282,12 @@ cb_gateway_load(State = #state{gw = Gateway = #{name := GwName}, {error, {Class, Reason1, Stk}} end. -cb_gateway_update(NewGateway, - State = #state{gw = Gateway = #{name := GwName}, - ctx = Ctx, +cb_gateway_update(Config, + State = #state{gw = #{name := GwName}, gw_state = GwState}) -> try #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), - case CbMod:on_gateway_update(NewGateway, Gateway, GwState) of + case CbMod:on_gateway_update(Config, state2info(State), GwState) of {error, Reason} -> throw({callback_return_error, Reason}); {ok, ChildPidOrSpecs, NGwState} -> %% XXX: Hot-upgrade ??? @@ -317,9 +299,9 @@ cb_gateway_update(NewGateway, end catch Class : Reason1 : Stk -> - logger:error("Failed to update gateway (~0p, ~0p, ~0p) crashed: " + logger:error("Failed to update ~s gateway to config: ~0p crashed: " "{~p, ~p}, stacktrace: ~0p", - [NewGateway, Gateway, Ctx, + [GwName, Config, Class, Reason1, Stk]), {error, {Class, Reason1, Stk}} end. diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index f313058c1..8db75e504 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -234,10 +234,6 @@ authentication() -> , hoconsc:ref(emqx_enhanced_authn_scram_mnesia, config) ]). -%translations() -> []. -% -%translations(_) -> []. - gateway_common_options() -> [ {enable, t(boolean(), undefined, true)} , {enable_stats, t(boolean(), undefined, true)} diff --git a/apps/emqx_gateway/src/emqx_gateway_sup.erl b/apps/emqx_gateway/src/emqx_gateway_sup.erl index 57ac7e7c7..09b74450a 100644 --- a/apps/emqx_gateway/src/emqx_gateway_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_sup.erl @@ -26,7 +26,7 @@ -export([ load_gateway/1 , unload_gateway/1 , lookup_gateway/1 - , update_gateway/1 + , update_gateway/2 , start_gateway_insta/1 , stop_gateway_insta/1 , list_gateway_insta/0 @@ -74,13 +74,13 @@ lookup_gateway(GwName) -> undefined end. --spec update_gateway(gateway()) +-spec update_gateway(gateway_name(), emqx_config:config()) -> ok | {error, any()}. -update_gateway(NewGateway = #{name := GwName}) -> +update_gateway(GwName, Config) -> case emqx_gateway_utils:find_sup_child(?MODULE, GwName) of {ok, GwSup} -> - emqx_gateway_gw_sup:update_insta(GwSup, NewGateway); + emqx_gateway_gw_sup:update_insta(GwSup, GwName, Config); _ -> {error, not_found} end. diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl index 2821e9d15..8131f2d0c 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_impl.erl @@ -109,13 +109,13 @@ on_gateway_load(_Gateway = #{ name := GwName, end, Listeners), {ok, ListenerPids, _GwState = #{ctx => Ctx}}. -on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> - GwName = maps:get(name, NewGateway), +on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> + GwName = maps:get(name, Gateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? - on_gateway_unload(OldGateway, GwState), - on_gateway_load(NewGateway, Ctx) + on_gateway_unload(Gateway, GwState), + on_gateway_load(Gateway#{config => Config}, Ctx) catch Class : Reason : Stk -> logger:error("Failed to update ~s; " diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl index 196818478..039b23924 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_impl.erl @@ -77,13 +77,13 @@ on_gateway_load(_Gateway = #{ name := GwName, end, Listeners), {ok, ListenerPids, _InstaState = #{ctx => Ctx}}. -on_gateway_update(NewGateway = #{name := GwName}, OldGateway, - GwState = #{ctx := Ctx}) -> +on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> + GwName = maps:get(name, Gateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old instance ??? - on_gateway_unload(OldGateway, GwState), - on_gateway_load(NewGateway, Ctx) + on_gateway_unload(Gateway, GwState), + on_gateway_load(Gateway#{config => Config}, Ctx) catch Class : Reason : Stk -> logger:error("Failed to update ~s; " @@ -92,9 +92,9 @@ on_gateway_update(NewGateway = #{name := GwName}, OldGateway, {error, {Class, Reason}} end. -on_gateway_unload(_Insta = #{ name := GwName, - config := Config - }, _GwState) -> +on_gateway_unload(_Gateway = #{ name := GwName, + config := Config + }, _GwState) -> Listeners = emqx_gateway_utils:normalize_config(Config), lists:foreach(fun(Lis) -> stop_listener(GwName, Lis) diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl index fd7c0427a..593b71289 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_impl.erl @@ -62,13 +62,13 @@ on_gateway_load(_Gateway = #{ name := GwName, %% FIXME: Assign ctx to GwState {ok, ListenerPids, _GwState = #{ctx => Ctx}}. -on_gateway_update(NewGateway, OldGateway, GwState = #{ctx := Ctx}) -> - GwName = maps:get(name, NewGateway), +on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) -> + GwName = maps:get(name, Gateway), try %% XXX: 1. How hot-upgrade the changes ??? %% XXX: 2. Check the New confs first before destroy old state??? - on_gateway_unload(OldGateway, GwState), - on_gateway_load(NewGateway, Ctx) + on_gateway_unload(Gateway, GwState), + on_gateway_load(Gateway#{config => Config}, Ctx) catch Class : Reason : Stk -> logger:error("Failed to update ~s; "