refactor(gw): suppport the hot upgrade gateway instance

This commit is contained in:
JianBo He 2021-08-27 09:43:49 +08:00 committed by turtleDeng
parent 8886d60720
commit dacc53facf
4 changed files with 162 additions and 104 deletions

View File

@ -16,8 +16,15 @@
-module(emqx_gateway).
-behaviour(emqx_config_handler).
-include("include/emqx_gateway.hrl").
%% callbacks for emqx_config_handler
-export([ pre_config_update/2
, post_config_update/3
]).
%% APIs
-export([ registered_gateway/0
, load/2
@ -31,6 +38,10 @@
-export([update_rawconf/2]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec registered_gateway() ->
[{gateway_name(), emqx_gateway_registry:descriptor()}].
registered_gateway() ->
@ -79,6 +90,26 @@ stop(Name) ->
update_rawconf(RawName, RawConfDiff) ->
emqx:update_config([gateway], {RawName, RawConfDiff}).
%%--------------------------------------------------------------------
%% Config Handler
-spec pre_config_update(emqx_config:update_request(), emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
pre_config_update({RawName, RawConfDiff}, RawConf) ->
{ok, emqx_map_lib:deep_merge(RawConf, #{RawName => RawConfDiff})}.
-spec post_config_update(emqx_config:update_request(), emqx_config:config(),
emqx_config:config()) -> ok | {ok, Result::any()} | {error, Reason::term()}.
post_config_update({RawName, _}, NewConfig, OldConfig) ->
GwName = binary_to_existing_atom(RawName),
SubConf = maps:get(GwName, NewConfig),
case maps:get(GwName, OldConfig, undefined) of
undefined ->
emqx_gateway:load(GwName, SubConf);
_ ->
emqx_gateway:update(GwName, SubConf)
end.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------

View File

@ -349,9 +349,7 @@ gateway_insta(delete, Request) ->
ok ->
{200, ok};
{error, not_found} ->
{404, <<"Not Found">>};
{error, Reason} ->
{500, Reason}
{404, <<"Not Found">>}
end;
gateway_insta(get, Request) ->
Name = binary_to_existing_atom(cowboy_req:binding(name, Request)),
@ -363,7 +361,7 @@ gateway_insta(get, Request) ->
{404, <<"Not Found">>}
end;
gateway_insta(post, Request) ->
Name = binary_to_existing_atom(cowboy_req:binding(name, Request)),
Name = cowboy_req:binding(name, Request),
{ok, RawConf, _NRequest} = cowboy_req:read_body(Request),
%% XXX: Consistence ??
case emqx_gateway:update_rawconf(Name, RawConf) of

View File

@ -17,22 +17,19 @@
-module(emqx_gateway_app).
-behaviour(application).
-behaviour(emqx_config_handler).
-include_lib("emqx/include/logger.hrl").
-export([start/2, stop/1]).
-export([ pre_config_update/2
, post_config_update/3
]).
-define(CONF_CALLBACK_MODULE, emqx_gateway).
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_gateway_sup:start_link(),
emqx_gateway_cli:load(),
load_default_gateway_applications(),
load_gateway_by_default(),
emqx_config_handler:add_handler([gateway], ?MODULE),
emqx_config_handler:add_handler([gateway], ?CONF_CALLBACK_MODULE),
{ok, Sup}.
stop(_State) ->
@ -41,28 +38,6 @@ stop(_State) ->
%emqx_config_handler:remove_handler([gateway], ?MODULE),
ok.
%%--------------------------------------------------------------------
%% Config Handler
%% All of update_request is created by emqx_gateway_xx_api.erl module
-spec pre_config_update(emqx_config:update_request(), emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
pre_config_update({RawName, RawConfDiff}, RawConf) ->
{ok, emqx_map_lib:deep_merge(RawConf, #{RawName => RawConfDiff})}.
-spec post_config_update(emqx_config:update_request(), emqx_config:config(),
emqx_config:config()) -> ok | {ok, Result::any()} | {error, Reason::term()}.
post_config_update({RawName, _}, NewConfig, OldConfig) ->
GwName = binary_to_existing_atom(RawName),
SubConf = maps:get(GwName, NewConfig),
case maps:get(GwName, OldConfig, undefined) of
undefined ->
emqx_gateway:load(GwName, SubConf);
_ ->
emqx_gateway:update(GwName, SubConf)
end.
%%--------------------------------------------------------------------
%% Internal funcs

View File

@ -20,6 +20,7 @@
-behaviour(gen_server).
-include("include/emqx_gateway.hrl").
-include_lib("emqx/include/logger.hrl").
%% APIs
-export([ start_link/3
@ -39,7 +40,8 @@
]).
-record(state, {
gw :: gateway(),
name :: gateway_name(),
config :: emqx_config:config(),
ctx :: emqx_gateway_ctx:context(),
status :: stopped | running,
child_pids :: [pid()],
@ -86,48 +88,29 @@ call(Pid, Req) ->
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Gateway, Ctx0, _GwDscrptr]) ->
init([Gateway, Ctx, _GwDscrptr]) ->
process_flag(trap_exit, true),
#{name := GwName, config := Config } = Gateway,
Ctx = do_init_context(GwName, Config, Ctx0),
State = #state{
gw = Gateway,
ctx = Ctx,
name = GwName,
config = Config,
child_pids = [],
status = stopped,
created_at = erlang:system_time(millisecond)
},
case cb_gateway_load(State) of
{error, Reason} ->
do_deinit_context(Ctx),
{stop, {load_gateway_failure, Reason}};
{ok, NState} ->
{ok, NState}
end.
do_init_context(GwName, Config, Ctx) ->
Auth = case maps:get(authentication, Config, #{enable => false}) of
#{enable := false} -> undefined;
AuthCfg when is_map(AuthCfg) ->
case maps:get(enable, AuthCfg, true) of
false ->
undefined;
_ ->
create_authentication_for_gateway_insta(GwName, AuthCfg)
end;
_ ->
undefined
end,
Ctx#{auth => Auth}.
do_deinit_context(Ctx) ->
cleanup_authentication_for_gateway_insta(maps:get(auth, Ctx)),
ok.
handle_call(info, _From, State = #state{gw = Gateway}) ->
{reply, state2info(Gateway), State};
handle_call(info, _From, State) ->
{reply, detailed_gateway_info(State), State};
handle_call(disable, _From, State = #state{status = Status}) ->
%% XXX: The `disable` opertaion is not persist to config database
case Status of
running ->
case cb_gateway_unload(State) of
@ -154,10 +137,11 @@ handle_call(enable, _From, State = #state{status = Status}) ->
end;
handle_call({update, Config}, _From, State) ->
case cb_gateway_update(Config, State) of
case do_update_one_by_one(Config, State) of
{ok, NState} ->
{reply, ok, NState};
{error, Reason} ->
%% If something wrong, nothing to update
{reply, {error, Reason}, State}
end;
@ -171,10 +155,10 @@ handle_cast(_Msg, State) ->
handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) ->
case lists:member(Pid, Pids) of
true ->
logger:error("Child process ~p exited: ~0p.", [Pid, Reason]),
?LOG(error, "Child process ~p exited: ~0p.", [Pid, Reason]),
case Pids -- [Pid]of
[] ->
logger:error("All child process exited!"),
?LOG(error, "All child process exited!"),
{noreply, State#state{status = stopped,
child_pids = [],
gw_state = undefined}};
@ -182,24 +166,25 @@ handle_info({'EXIT', Pid, Reason}, State = #state{child_pids = Pids}) ->
{noreply, State#state{child_pids = RemainPids}}
end;
_ ->
logger:error("Unknown process exited ~p:~0p", [Pid, Reason]),
?LOG(error, "Unknown process exited ~p:~0p", [Pid, Reason]),
{noreply, State}
end;
handle_info(Info, State) ->
logger:warning("Unexcepted info: ~p", [Info]),
?LOG(warning, "Unexcepted info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, State = #state{ctx = Ctx, child_pids = Pids}) ->
Pids /= [] andalso (_ = cb_gateway_unload(State)),
_ = do_deinit_context(Ctx),
_ = do_deinit_authn(maps:get(auth, Ctx, undefined)),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
state2info(State = #state{gw = Gateway}) ->
Gateway#{
detailed_gateway_info(State) ->
#{name => State#state.name,
config => State#state.config,
status => State#state.status,
created_at => State#state.created_at,
started_at => State#state.started_at,
@ -210,36 +195,94 @@ state2info(State = #state{gw = Gateway}) ->
%% Internal funcs
%%--------------------------------------------------------------------
create_authentication_for_gateway_insta(GwName, AuthCfg) ->
ChainId = atom_to_binary(GwName, utf8),
case emqx_authn:create_chain(#{id => ChainId}) of
{ok, _ChainInfo} ->
case emqx_authn:create_authenticator(ChainId, AuthCfg) of
{ok, _} -> ChainId;
{error, Reason} ->
logger:error("Failed to create authentication ~p", [Reason]),
throw({bad_authentication, Reason})
do_init_authn(GwName, Config) ->
case maps:get(authentication, Config, #{enable => false}) of
#{enable := false} -> undefined;
AuthCfg when is_map(AuthCfg) ->
case maps:get(enable, AuthCfg, true) of
false ->
undefined;
_ ->
%% TODO: Implement Authentication
GwName
%case emqx_authn:create_chain(#{id => ChainId}) of
% {ok, _ChainInfo} ->
% case emqx_authn:create_authenticator(ChainId, AuthCfg) of
% {ok, _} -> ChainId;
% {error, Reason} ->
% ?LOG(error, "Failed to create authentication ~p", [Reason]),
% throw({bad_authentication, Reason})
% end;
% {error, Reason} ->
% ?LOG(error, "Failed to create authentication chain: ~p", [Reason]),
% throw({bad_chain, {ChainId, Reason}})
%end.
end;
{error, Reason} ->
logger:error("Failed to create authentication chain: ~p", [Reason]),
throw({bad_chain, {ChainId, Reason}})
_ ->
undefined
end.
cleanup_authentication_for_gateway_insta(undefined) ->
do_deinit_authn(undefined) ->
ok;
cleanup_authentication_for_gateway_insta(ChainId) ->
case emqx_authn:delete_chain(ChainId) of
ok -> ok;
{error, {not_found, _}} ->
logger:warning("Failed to clean authentication chain: ~s, "
"reason: not_found", [ChainId]);
{error, Reason} ->
logger:error("Failed to clean authentication chain: ~s, "
"reason: ~p", [ChainId, Reason])
do_deinit_authn(AuthnRef) ->
%% TODO:
?LOG(error, "Failed to clean authn ~p, not suppported now", [AuthnRef]).
%case emqx_authn:delete_chain(AuthnRef) of
% ok -> ok;
% {error, {not_found, _}} ->
% ?LOG(warning, "Failed to clean authentication chain: ~s, "
% "reason: not_found", [AuthnRef]);
% {error, Reason} ->
% ?LOG(error, "Failed to clean authentication chain: ~s, "
% "reason: ~p", [AuthnRef, Reason])
%end.
do_update_one_by_one(NCfg0, State = #state{
ctx = Ctx,
config = OCfg,
status = Status}) ->
NCfg = emqx_map_lib:deep_merge(OCfg, NCfg0),
OEnable = maps:get(enable, OCfg, true),
NEnable = maps:get(enable, NCfg0, OEnable),
OAuth = maps:get(authentication, OCfg, undefined),
NAuth = maps:get(authentication, NCfg0, OAuth),
if
Status == stopped, NEnable == true ->
NState = State#state{config = NCfg},
cb_gateway_load(NState);
Status == stopped, NEnable == false ->
{ok, State#state{config = NCfg}};
Status == running, NEnable == true ->
NState = case NAuth == OAuth of
true -> State;
false ->
%% Reset Authentication first
_ = do_deinit_authn(maps:get(auth, Ctx, undefined)),
NCtx = Ctx#{
auth => do_init_authn(
State#state.name,
NCfg
)
},
State#state{ctx = NCtx}
end,
cb_gateway_update(NCfg, NState);
Status == running, NEnable == false ->
case cb_gateway_unload(State) of
{ok, NState} -> {ok, NState#state{config = NCfg}};
{error, Reason} -> {error, Reason}
end;
true ->
throw(nomatch)
end.
cb_gateway_unload(State = #state{gw = Gateway = #{name := GwName},
cb_gateway_unload(State = #state{name = GwName,
gw_state = GwState}) ->
Gateway = detailed_gateway_info(State),
try
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
CbMod:on_gateway_unload(Gateway, GwState),
@ -250,22 +293,33 @@ cb_gateway_unload(State = #state{gw = Gateway = #{name := GwName},
stopped_at = erlang:system_time(millisecond)}}
catch
Class : Reason : Stk ->
logger:error("Failed to unload gateway (~0p, ~0p) crashed: "
?LOG(error, "Failed to unload gateway (~0p, ~0p) crashed: "
"{~p, ~p}, stacktrace: ~0p",
[Gateway, GwState,
[GwName, GwState,
Class, Reason, Stk]),
{error, {Class, Reason, Stk}}
end.
cb_gateway_load(State = #state{gw = Gateway = #{name := GwName},
%% @doc 1. Create Authentcation Context
%% 2. Callback to Mod:on_gateway_load/2
%%
%% Notes: If failed, rollback
cb_gateway_load(State = #state{name = GwName,
config = Config,
ctx = Ctx}) ->
Gateway = detailed_gateway_info(State),
try
AuthnRef = do_init_authn(GwName, Config),
NCtx = Ctx#{auth => AuthnRef},
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
case CbMod:on_gateway_load(Gateway, Ctx) of
{error, Reason} -> throw({callback_return_error, Reason});
case CbMod:on_gateway_load(Gateway, NCtx) of
{error, Reason} ->
do_deinit_authn(AuthnRef),
throw({callback_return_error, Reason});
{ok, ChildPidOrSpecs, GwState} ->
ChildPids = start_child_process(ChildPidOrSpecs),
{ok, State#state{
ctx = NCtx,
status = running,
child_pids = ChildPids,
gw_state = GwState,
@ -275,7 +329,7 @@ cb_gateway_load(State = #state{gw = Gateway = #{name := GwName},
end
catch
Class : Reason1 : Stk ->
logger:error("Failed to load ~s gateway (~0p, ~0p) crashed: "
?LOG(error, "Failed to load ~s gateway (~0p, ~0p) crashed: "
"{~p, ~p}, stacktrace: ~0p",
[GwName, Gateway, Ctx,
Class, Reason1, Stk]),
@ -283,26 +337,26 @@ cb_gateway_load(State = #state{gw = Gateway = #{name := GwName},
end.
cb_gateway_update(Config,
State = #state{gw = #{name := GwName},
State = #state{name = GwName,
gw_state = GwState}) ->
try
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
case CbMod:on_gateway_update(Config, state2info(State), GwState) of
case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of
{error, Reason} -> throw({callback_return_error, Reason});
{ok, ChildPidOrSpecs, NGwState} ->
%% XXX: Hot-upgrade ???
ChildPids = start_child_process(ChildPidOrSpecs),
{ok, State#state{
config = Config,
child_pids = ChildPids,
gw_state = NGwState
}}
end
catch
Class : Reason1 : Stk ->
logger:error("Failed to update ~s gateway to config: ~0p crashed: "
?LOG(error, "Failed to update ~s gateway to config: ~0p crashed: "
"{~p, ~p}, stacktrace: ~0p",
[GwName, Config,
Class, Reason1, Stk]),
[GwName, Config, Class, Reason1, Stk]),
{error, {Class, Reason1, Stk}}
end.