From dacc53facf8921c13f9ee191eccdc28ec30335b1 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 27 Aug 2021 09:43:49 +0800 Subject: [PATCH] refactor(gw): suppport the hot upgrade gateway instance --- apps/emqx_gateway/src/emqx_gateway.erl | 31 +++ apps/emqx_gateway/src/emqx_gateway_api.erl | 6 +- apps/emqx_gateway/src/emqx_gateway_app.erl | 29 +-- .../src/emqx_gateway_insta_sup.erl | 200 +++++++++++------- 4 files changed, 162 insertions(+), 104 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index aab0dab55..81d9b593c 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -16,8 +16,15 @@ -module(emqx_gateway). +-behaviour(emqx_config_handler). + -include("include/emqx_gateway.hrl"). +%% callbacks for emqx_config_handler +-export([ pre_config_update/2 + , post_config_update/3 + ]). + %% APIs -export([ registered_gateway/0 , load/2 @@ -31,6 +38,10 @@ -export([update_rawconf/2]). +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + -spec registered_gateway() -> [{gateway_name(), emqx_gateway_registry:descriptor()}]. registered_gateway() -> @@ -79,6 +90,26 @@ stop(Name) -> update_rawconf(RawName, RawConfDiff) -> emqx:update_config([gateway], {RawName, RawConfDiff}). +%%-------------------------------------------------------------------- +%% Config Handler + +-spec pre_config_update(emqx_config:update_request(), emqx_config:raw_config()) -> + {ok, emqx_config:update_request()} | {error, term()}. +pre_config_update({RawName, RawConfDiff}, RawConf) -> + {ok, emqx_map_lib:deep_merge(RawConf, #{RawName => RawConfDiff})}. + +-spec post_config_update(emqx_config:update_request(), emqx_config:config(), + emqx_config:config()) -> ok | {ok, Result::any()} | {error, Reason::term()}. +post_config_update({RawName, _}, NewConfig, OldConfig) -> + GwName = binary_to_existing_atom(RawName), + SubConf = maps:get(GwName, NewConfig), + case maps:get(GwName, OldConfig, undefined) of + undefined -> + emqx_gateway:load(GwName, SubConf); + _ -> + emqx_gateway:update(GwName, SubConf) + end. + %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 89b17a6b9..270a8b332 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -349,9 +349,7 @@ gateway_insta(delete, Request) -> ok -> {200, ok}; {error, not_found} -> - {404, <<"Not Found">>}; - {error, Reason} -> - {500, Reason} + {404, <<"Not Found">>} end; gateway_insta(get, Request) -> Name = binary_to_existing_atom(cowboy_req:binding(name, Request)), @@ -363,7 +361,7 @@ gateway_insta(get, Request) -> {404, <<"Not Found">>} end; gateway_insta(post, Request) -> - Name = binary_to_existing_atom(cowboy_req:binding(name, Request)), + Name = cowboy_req:binding(name, Request), {ok, RawConf, _NRequest} = cowboy_req:read_body(Request), %% XXX: Consistence ?? case emqx_gateway:update_rawconf(Name, RawConf) of diff --git a/apps/emqx_gateway/src/emqx_gateway_app.erl b/apps/emqx_gateway/src/emqx_gateway_app.erl index 8af5a1026..1ecd9cf26 100644 --- a/apps/emqx_gateway/src/emqx_gateway_app.erl +++ b/apps/emqx_gateway/src/emqx_gateway_app.erl @@ -17,22 +17,19 @@ -module(emqx_gateway_app). -behaviour(application). --behaviour(emqx_config_handler). -include_lib("emqx/include/logger.hrl"). -export([start/2, stop/1]). --export([ pre_config_update/2 - , post_config_update/3 - ]). +-define(CONF_CALLBACK_MODULE, emqx_gateway). start(_StartType, _StartArgs) -> {ok, Sup} = emqx_gateway_sup:start_link(), emqx_gateway_cli:load(), load_default_gateway_applications(), load_gateway_by_default(), - emqx_config_handler:add_handler([gateway], ?MODULE), + emqx_config_handler:add_handler([gateway], ?CONF_CALLBACK_MODULE), {ok, Sup}. stop(_State) -> @@ -41,28 +38,6 @@ stop(_State) -> %emqx_config_handler:remove_handler([gateway], ?MODULE), ok. -%%-------------------------------------------------------------------- -%% Config Handler - -%% All of update_request is created by emqx_gateway_xx_api.erl module - --spec pre_config_update(emqx_config:update_request(), emqx_config:raw_config()) -> - {ok, emqx_config:update_request()} | {error, term()}. -pre_config_update({RawName, RawConfDiff}, RawConf) -> - {ok, emqx_map_lib:deep_merge(RawConf, #{RawName => RawConfDiff})}. - --spec post_config_update(emqx_config:update_request(), emqx_config:config(), - emqx_config:config()) -> ok | {ok, Result::any()} | {error, Reason::term()}. -post_config_update({RawName, _}, NewConfig, OldConfig) -> - GwName = binary_to_existing_atom(RawName), - SubConf = maps:get(GwName, NewConfig), - case maps:get(GwName, OldConfig, undefined) of - undefined -> - emqx_gateway:load(GwName, SubConf); - _ -> - emqx_gateway:update(GwName, SubConf) - end. - %%-------------------------------------------------------------------- %% Internal funcs diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index 74419914e..d61d2c479 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -20,6 +20,7 @@ -behaviour(gen_server). -include("include/emqx_gateway.hrl"). +-include_lib("emqx/include/logger.hrl"). %% APIs -export([ start_link/3 @@ -39,7 +40,8 @@ ]). -record(state, { - gw :: gateway(), + name :: gateway_name(), + config :: emqx_config:config(), ctx :: emqx_gateway_ctx:context(), status :: stopped | running, child_pids :: [pid()], @@ -86,48 +88,29 @@ call(Pid, Req) -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([Gateway, Ctx0, _GwDscrptr]) -> +init([Gateway, Ctx, _GwDscrptr]) -> process_flag(trap_exit, true), #{name := GwName, config := Config } = Gateway, - Ctx = do_init_context(GwName, Config, Ctx0), State = #state{ - gw = Gateway, - ctx = Ctx, + ctx = Ctx, + name = GwName, + config = Config, child_pids = [], status = stopped, created_at = erlang:system_time(millisecond) }, case cb_gateway_load(State) of {error, Reason} -> - do_deinit_context(Ctx), {stop, {load_gateway_failure, Reason}}; {ok, NState} -> {ok, NState} end. -do_init_context(GwName, Config, Ctx) -> - Auth = case maps:get(authentication, Config, #{enable => false}) of - #{enable := false} -> undefined; - AuthCfg when is_map(AuthCfg) -> - case maps:get(enable, AuthCfg, true) of - false -> - undefined; - _ -> - create_authentication_for_gateway_insta(GwName, AuthCfg) - end; - _ -> - undefined - end, - Ctx#{auth => Auth}. - -do_deinit_context(Ctx) -> - cleanup_authentication_for_gateway_insta(maps:get(auth, Ctx)), - ok. - -handle_call(info, _From, State = #state{gw = Gateway}) -> - {reply, state2info(Gateway), State}; +handle_call(info, _From, State) -> + {reply, detailed_gateway_info(State), State}; handle_call(disable, _From, State = #state{status = Status}) -> + %% XXX: The `disable` opertaion is not persist to config database case Status of running -> case cb_gateway_unload(State) of @@ -154,10 +137,11 @@ handle_call(enable, _From, State = #state{status = Status}) -> end; handle_call({update, Config}, _From, State) -> - case cb_gateway_update(Config, State) of + case do_update_one_by_one(Config, State) of {ok, NState} -> {reply, ok, NState}; {error, Reason} -> + %% If something wrong, nothing to update {reply, {error, Reason}, State} end; @@ -171,10 +155,10 @@ handle_cast(_Msg, State) -> handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) -> case lists:member(Pid, Pids) of true -> - logger:error("Child process ~p exited: ~0p.", [Pid, Reason]), + ?LOG(error, "Child process ~p exited: ~0p.", [Pid, Reason]), case Pids -- [Pid]of [] -> - logger:error("All child process exited!"), + ?LOG(error, "All child process exited!"), {noreply, State#state{status = stopped, child_pids = [], gw_state = undefined}}; @@ -182,24 +166,25 @@ handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) -> {noreply, State#state{child_pids = RemainPids}} end; _ -> - logger:error("Unknown process exited ~p:~0p", [Pid, Reason]), + ?LOG(error, "Unknown process exited ~p:~0p", [Pid, Reason]), {noreply, State} end; handle_info(Info, State) -> - logger:warning("Unexcepted info: ~p", [Info]), + ?LOG(warning, "Unexcepted info: ~p", [Info]), {noreply, State}. terminate(_Reason, State = #state{ctx = Ctx, child_pids = Pids}) -> Pids /= [] andalso (_ = cb_gateway_unload(State)), - _ = do_deinit_context(Ctx), + _ = do_deinit_authn(maps:get(auth, Ctx, undefined)), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. -state2info(State = #state{gw = Gateway}) -> - Gateway#{ +detailed_gateway_info(State) -> + #{name => State#state.name, + config => State#state.config, status => State#state.status, created_at => State#state.created_at, started_at => State#state.started_at, @@ -210,36 +195,94 @@ state2info(State = #state{gw = Gateway}) -> %% Internal funcs %%-------------------------------------------------------------------- -create_authentication_for_gateway_insta(GwName, AuthCfg) -> - ChainId = atom_to_binary(GwName, utf8), - case emqx_authn:create_chain(#{id => ChainId}) of - {ok, _ChainInfo} -> - case emqx_authn:create_authenticator(ChainId, AuthCfg) of - {ok, _} -> ChainId; - {error, Reason} -> - logger:error("Failed to create authentication ~p", [Reason]), - throw({bad_authentication, Reason}) +do_init_authn(GwName, Config) -> + case maps:get(authentication, Config, #{enable => false}) of + #{enable := false} -> undefined; + AuthCfg when is_map(AuthCfg) -> + case maps:get(enable, AuthCfg, true) of + false -> + undefined; + _ -> + %% TODO: Implement Authentication + GwName + %case emqx_authn:create_chain(#{id => ChainId}) of + % {ok, _ChainInfo} -> + % case emqx_authn:create_authenticator(ChainId, AuthCfg) of + % {ok, _} -> ChainId; + % {error, Reason} -> + % ?LOG(error, "Failed to create authentication ~p", [Reason]), + % throw({bad_authentication, Reason}) + % end; + % {error, Reason} -> + % ?LOG(error, "Failed to create authentication chain: ~p", [Reason]), + % throw({bad_chain, {ChainId, Reason}}) + %end. end; - {error, Reason} -> - logger:error("Failed to create authentication chain: ~p", [Reason]), - throw({bad_chain, {ChainId, Reason}}) + _ -> + undefined end. -cleanup_authentication_for_gateway_insta(undefined) -> +do_deinit_authn(undefined) -> ok; -cleanup_authentication_for_gateway_insta(ChainId) -> - case emqx_authn:delete_chain(ChainId) of - ok -> ok; - {error, {not_found, _}} -> - logger:warning("Failed to clean authentication chain: ~s, " - "reason: not_found", [ChainId]); - {error, Reason} -> - logger:error("Failed to clean authentication chain: ~s, " - "reason: ~p", [ChainId, Reason]) +do_deinit_authn(AuthnRef) -> + %% TODO: + ?LOG(error, "Failed to clean authn ~p, not suppported now", [AuthnRef]). + %case emqx_authn:delete_chain(AuthnRef) of + % ok -> ok; + % {error, {not_found, _}} -> + % ?LOG(warning, "Failed to clean authentication chain: ~s, " + % "reason: not_found", [AuthnRef]); + % {error, Reason} -> + % ?LOG(error, "Failed to clean authentication chain: ~s, " + % "reason: ~p", [AuthnRef, Reason]) + %end. + +do_update_one_by_one(NCfg0, State = #state{ + ctx = Ctx, + config = OCfg, + status = Status}) -> + + NCfg = emqx_map_lib:deep_merge(OCfg, NCfg0), + + OEnable = maps:get(enable, OCfg, true), + NEnable = maps:get(enable, NCfg0, OEnable), + + OAuth = maps:get(authentication, OCfg, undefined), + NAuth = maps:get(authentication, NCfg0, OAuth), + + if + Status == stopped, NEnable == true -> + NState = State#state{config = NCfg}, + cb_gateway_load(NState); + Status == stopped, NEnable == false -> + {ok, State#state{config = NCfg}}; + Status == running, NEnable == true -> + NState = case NAuth == OAuth of + true -> State; + false -> + %% Reset Authentication first + _ = do_deinit_authn(maps:get(auth, Ctx, undefined)), + NCtx = Ctx#{ + auth => do_init_authn( + State#state.name, + NCfg + ) + }, + State#state{ctx = NCtx} + end, + cb_gateway_update(NCfg, NState); + Status == running, NEnable == false -> + case cb_gateway_unload(State) of + {ok, NState} -> {ok, NState#state{config = NCfg}}; + {error, Reason} -> {error, Reason} + end; + true -> + throw(nomatch) end. -cb_gateway_unload(State = #state{gw = Gateway = #{name := GwName}, +cb_gateway_unload(State = #state{name = GwName, gw_state = GwState}) -> + Gateway = detailed_gateway_info(State), try #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), CbMod:on_gateway_unload(Gateway, GwState), @@ -250,22 +293,33 @@ cb_gateway_unload(State = #state{gw = Gateway = #{name := GwName}, stopped_at = erlang:system_time(millisecond)}} catch Class : Reason : Stk -> - logger:error("Failed to unload gateway (~0p, ~0p) crashed: " - "{~p, ~p}, stacktrace: ~0p", - [Gateway, GwState, + ?LOG(error, "Failed to unload gateway (~0p, ~0p) crashed: " + "{~p, ~p}, stacktrace: ~0p", + [GwName, GwState, Class, Reason, Stk]), {error, {Class, Reason, Stk}} end. -cb_gateway_load(State = #state{gw = Gateway = #{name := GwName}, +%% @doc 1. Create Authentcation Context +%% 2. Callback to Mod:on_gateway_load/2 +%% +%% Notes: If failed, rollback +cb_gateway_load(State = #state{name = GwName, + config = Config, ctx = Ctx}) -> + Gateway = detailed_gateway_info(State), try + AuthnRef = do_init_authn(GwName, Config), + NCtx = Ctx#{auth => AuthnRef}, #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), - case CbMod:on_gateway_load(Gateway, Ctx) of - {error, Reason} -> throw({callback_return_error, Reason}); + case CbMod:on_gateway_load(Gateway, NCtx) of + {error, Reason} -> + do_deinit_authn(AuthnRef), + throw({callback_return_error, Reason}); {ok, ChildPidOrSpecs, GwState} -> ChildPids = start_child_process(ChildPidOrSpecs), {ok, State#state{ + ctx = NCtx, status = running, child_pids = ChildPids, gw_state = GwState, @@ -275,34 +329,34 @@ cb_gateway_load(State = #state{gw = Gateway = #{name := GwName}, end catch Class : Reason1 : Stk -> - logger:error("Failed to load ~s gateway (~0p, ~0p) crashed: " - "{~p, ~p}, stacktrace: ~0p", + ?LOG(error, "Failed to load ~s gateway (~0p, ~0p) crashed: " + "{~p, ~p}, stacktrace: ~0p", [GwName, Gateway, Ctx, Class, Reason1, Stk]), {error, {Class, Reason1, Stk}} end. cb_gateway_update(Config, - State = #state{gw = #{name := GwName}, - gw_state = GwState}) -> + State = #state{name = GwName, + gw_state = GwState}) -> try #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), - case CbMod:on_gateway_update(Config, state2info(State), GwState) of + case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of {error, Reason} -> throw({callback_return_error, Reason}); {ok, ChildPidOrSpecs, NGwState} -> %% XXX: Hot-upgrade ??? ChildPids = start_child_process(ChildPidOrSpecs), {ok, State#state{ + config = Config, child_pids = ChildPids, gw_state = NGwState }} end catch Class : Reason1 : Stk -> - logger:error("Failed to update ~s gateway to config: ~0p crashed: " - "{~p, ~p}, stacktrace: ~0p", - [GwName, Config, - Class, Reason1, Stk]), + ?LOG(error, "Failed to update ~s gateway to config: ~0p crashed: " + "{~p, ~p}, stacktrace: ~0p", + [GwName, Config, Class, Reason1, Stk]), {error, {Class, Reason1, Stk}} end.