%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

%% @doc The gateway runtime
-module(emqx_gateway_insta_sup).

-behaviour(gen_server).

-include("include/emqx_gateway.hrl").
-include_lib("emqx/include/logger.hrl").

%% APIs
-export([ start_link/3
        , info/1
        , disable/1
        , enable/1
        , update/2
        ]).

%% gen_server callbacks
-export([ init/1
        , handle_call/3
        , handle_cast/2
        , handle_info/2
        , terminate/2
        , code_change/3
        ]).

%% Server state of a single gateway instance.
-record(state, {
          %% Gateway name and its current configuration
          name       :: gateway_name(),
          config     :: emqx_config:config(),
          %% Context handed to the gateway callback module on load
          ctx        :: emqx_gateway_ctx:context(),
          %% Names of the authentication chains created for this gateway
          authns     :: [emqx_authentication:chain_name()],
          status     :: stopped | running,
          %% Processes whose 'EXIT' signals are tracked by this server
          child_pids :: [pid()],
          %% Opaque state returned by the gateway callback module
          gw_state   :: emqx_gateway_impl:state() | undefined,
          %% Timestamps in milliseconds (erlang:system_time(millisecond))
          created_at :: integer(),
          started_at :: integer() | undefined,
          stopped_at :: integer() | undefined
         }).

%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------

%% @doc Start a linked gateway instance server; the three arguments are
%% forwarded as a list to init/1.
start_link(Gateway, Ctx, GwDscrptr) ->
    InitArgs = [Gateway, Ctx, GwDscrptr],
    gen_server:start_link(?MODULE, InitArgs, []).

%% @doc Fetch the gateway information map from the instance server
-spec info(pid()) -> gateway().
info(Pid) ->
    gen_server:call(Pid, info).

%% @doc Stop gateway
-spec disable(pid()) -> ok | {error, any()}.
disable(Pid) ->
    call(Pid, disable).

%% @doc Start gateway
-spec enable(pid()) -> ok | {error, any()}.
enable(Pid) ->
    call(Pid, enable).

%% @doc Update the gateway configurations
-spec update(pid(), emqx_config:config()) -> ok | {error, any()}.
update(Pid, Config) ->
    call(Pid, {update, Config}).

%% Synchronous request to the instance server with a 5 second timeout.
call(Pid, Req) ->
    gen_server:call(Pid, Req, 5000).

%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------

%% Builds the initial (stopped) state and, unless the configuration has
%% `enable => false', loads the gateway right away. A load failure stops
%% the server with `{load_gateway_failure, Reason}'.
init([Gateway, Ctx, _GwDscrptr]) ->
    %% Trap exits so that child terminations arrive as 'EXIT' messages
    process_flag(trap_exit, true),
    #{name := GwName, config := Config} = Gateway,
    InitState = #state{
                   ctx = Ctx,
                   name = GwName,
                   authns = [],
                   config = Config,
                   child_pids = [],
                   status = stopped,
                   created_at = erlang:system_time(millisecond)
                  },
    case maps:get(enable, Config, true) of
        false ->
            ?SLOG(info, #{ msg => "skip_to_start_gateway_due_to_disabled"
                         , gateway_name => GwName
                         }),
            {ok, InitState};
        true ->
            case cb_gateway_load(InitState) of
                {error, Reason} ->
                    {stop, {load_gateway_failure, Reason}};
                {ok, _LoadedState} = Started ->
                    Started
            end
    end.

handle_call(info, _From, State) ->
    {reply, detailed_gateway_info(State), State};

%% XXX: The `disable` operation is not persisted to the config database
handle_call(disable, _From, State = #state{status = running}) ->
    to_reply(cb_gateway_unload(State), State);
handle_call(disable, _From, State) ->
    {reply, {error, already_stopped}, State};

handle_call(enable, _From, State = #state{status = stopped}) ->
    to_reply(cb_gateway_load(State), State);
handle_call(enable, _From, State) ->
    {reply, {error, already_started}, State};

handle_call({update, Config}, _From, State) ->
    %% If something goes wrong, nothing is updated: the old state is kept
    to_reply(do_update_one_by_one(Config, State), State);

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% Turns the `{ok, NewState} | {error, Reason}' result of a state
%% transition into a gen_server reply; on error the previous state is kept.
to_reply({ok, NState}, _OldState) ->
    {reply, ok, NState};
to_reply({error, Reason}, OldState) ->
    {reply, {error, Reason}, OldState}.

handle_cast(_Msg, State) ->
    {noreply, State}.
%% Handles the exit of a tracked child process. When the last child is
%% gone the gateway is considered stopped: its authn chains are released
%% and the timestamps are reset in the same way cb_gateway_unload/1 does,
%% so that the reported info and a later `enable' start from a clean state.
handle_info({'EXIT', Pid, Reason}, State = #state{name = Name,
                                                  child_pids = Pids}) ->
    case lists:member(Pid, Pids) of
        true ->
            ?SLOG(error, #{ msg => "child_process_exited"
                          , child => Pid
                          , reason => Reason
                          }),
            case Pids -- [Pid] of
                [] ->
                    ?SLOG(error, #{ msg => "gateway_all_children_process_existed"
                                  , gateway_name => Name
                                  }),
                    %% Nothing is running anymore; drop the authn chains
                    %% instead of leaving them registered while stopped
                    _ = do_deinit_authn(State#state.authns),
                    {noreply, State#state{
                                status = stopped,
                                child_pids = [],
                                authns = [],
                                gw_state = undefined,
                                started_at = undefined,
                                stopped_at = erlang:system_time(millisecond)
                               }};
                RemainPids ->
                    {noreply, State#state{child_pids = RemainPids}}
            end;
        _ ->
            ?SLOG(error, #{ msg => "gateway_catch_a_unknown_process_exited"
                          , child => Pid
                          , reason => Reason
                          , gateway_name => Name
                          }),
            {noreply, State}
    end;

handle_info(Info, State) ->
    ?SLOG(warning, #{ msg => "unexcepted_info"
                    , info => Info
                    }),
    {noreply, State}.

%% Unloads the gateway if it still has children and always releases the
%% authn chains recorded in the state.
terminate(_Reason, State = #state{child_pids = Pids}) ->
    Pids /= [] andalso (_ = cb_gateway_unload(State)),
    _ = do_deinit_authn(State#state.authns),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Builds the externally visible gateway info map from the server state.
%% Entries whose value is `undefined' (e.g. an unset timestamp) are omitted.
detailed_gateway_info(State) ->
    maps:filter(
      fun(_, V) -> V =/= undefined end,
      #{name => State#state.name,
        config => State#state.config,
        status => State#state.status,
        created_at => State#state.created_at,
        started_at => State#state.started_at,
        stopped_at => State#state.stopped_at
       }).

%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------

%% same with emqx_authentication:global_chain/1
global_chain(mqtt) ->
    'mqtt:global';
global_chain('mqtt-sn') ->
    'mqtt-sn:global';
global_chain(coap) ->
    'coap:global';
global_chain(lwm2m) ->
    'lwm2m:global';
global_chain(stomp) ->
    'stomp:global';
global_chain(_) ->
    'unknown:global'.

%% Chain name of a single listener; delegates to
%% emqx_gateway_utils:listener_id/3.
listener_chain(GwName, Type, LisName) ->
    emqx_gateway_utils:listener_id(GwName, Type, LisName).
%% There are two layers of authentication configs:
%%
%%                     stomp.authn
%%                    /           \
%%   listeners.tcp.default.authn   *.ssl.default.authn
%%

%% Creates every enabled authn chain described by Config and returns the
%% names of the chains that were created. If any chain fails with a
%% `{badauth, _}' throw, all chains named by the config are removed and
%% the throw is propagated.
init_authn(GwName, Config) ->
    ChainConfs = authns(GwName, Config),
    try
        do_init_authn(ChainConfs, [])
    catch
        throw : {badauth, _} = BadAuth ->
            do_deinit_authn(proplists:get_keys(ChainConfs)),
            throw(BadAuth)
    end.

%% Walks the `{ChainName, AuthConf}' list left to right, prepending the
%% name of every chain that gets created to the accumulator.
do_init_authn(ChainConfs, Names) ->
    lists:foldl(fun init_one_authn/2, Names, ChainConfs).

%% Disabled and malformed entries are skipped; enabled ones are created.
init_one_authn({_ChainName, #{enable := false}}, Names) ->
    Names;
init_one_authn({ChainName, AuthConf}, Names) when is_map(AuthConf) ->
    _ = application:ensure_all_started(emqx_authn),
    do_create_authn_chain(ChainName, AuthConf),
    [ChainName | Names];
init_one_authn(_BadConf, Names) ->
    Names.

%% Returns the `{ChainName, AuthConf}' pairs of a gateway config: one per
%% listener (in listener order) followed by the gateway-global chain.
authns(GwName, Config) ->
    ListenerChains =
        [{listener_chain(GwName, LisType, LisName), authn_conf(Opts)}
         || {LisType, LisNames} <- maps:to_list(maps:get(listeners, Config, #{})),
            {LisName, Opts} <- maps:to_list(LisNames)],
    ListenerChains ++ [{global_chain(GwName), authn_conf(Config)}].

%% The `authentication' entry of a config map; a missing entry is treated
%% as a disabled authenticator.
authn_conf(Conf) ->
    maps:get(authentication, Conf, #{enable => false}).

%% Makes sure the chain exists and adds the authenticator to it.
%% Throws `{badauth, Reason}' if either step fails.
do_create_authn_chain(ChainName, AuthConf) ->
    case ensure_chain(ChainName) of
        ok ->
            do_create_authenticator(ChainName, AuthConf);
        {error, Reason} ->
            ?SLOG(error, #{ msg => "failed_to_create_authn_chanin"
                          , chain_name => ChainName
                          , reason => Reason
                          }),
            throw({badauth, Reason})
    end.

do_create_authenticator(ChainName, AuthConf) ->
    case emqx_authentication:create_authenticator(ChainName, AuthConf) of
        {ok, _} ->
            ok;
        {error, Reason} ->
            ?SLOG(error, #{ msg => "failed_to_create_authenticator"
                          , chain_name => ChainName
                          , reason => Reason
                          , config => AuthConf
                          }),
            throw({badauth, Reason})
    end.

%% Creates the chain; an already existing chain counts as success.
ensure_chain(ChainName) ->
    case emqx_authentication:create_chain(ChainName) of
        {ok, _ChainInfo} ->
            ok;
        {error, {already_exists, _}} ->
            ok;
        {error, _Reason} = Error ->
            Error
    end.
%% Deletes the given authn chains. A chain that no longer exists is not an
%% error; any other failure is logged and the remaining chains are still
%% processed.
do_deinit_authn(Names) ->
    lists:foreach(fun(ChainName) ->
        case emqx_authentication:delete_chain(ChainName) of
            ok -> ok;
            {error, {not_found, _}} -> ok;
            {error, Reason} ->
                ?SLOG(error, #{ msg => "failed_to_clean_authn_chain"
                              , chain_name => ChainName
                              , reason => Reason
                              })
        end
    end, Names).

%% Applies a new configuration according to the current status and the
%% requested `enable' flag (which defaults to the previous value):
%%
%%   stopped + enabled  -> load the gateway with the new config
%%   stopped + disabled -> only store the new config
%%   running + enabled  -> rebuild the authn chains if their config changed,
%%                         then let the callback module apply the update
%%   running + disabled -> unload the gateway and store the new config
%%
%% Returns `{ok, NewState} | {error, Reason}'.
do_update_one_by_one(NCfg, State = #state{
                                      name = GwName,
                                      config = OCfg,
                                      status = Status}) ->
    OEnable = maps:get(enable, OCfg, true),
    NEnable = maps:get(enable, NCfg, OEnable),

    OAuths = authns(GwName, OCfg),
    NAuths = authns(GwName, NCfg),

    case {Status, NEnable} of
        {stopped, true} ->
            cb_gateway_load(State#state{config = NCfg});
        {stopped, false} ->
            {ok, State#state{config = NCfg}};
        {running, true} ->
            AuthnResult = case NAuths == OAuths of
                              true  -> {ok, State};
                              %% Reset Authentication first
                              false -> reset_authn(NCfg, State)
                          end,
            case AuthnResult of
                {ok, AuthnState} ->
                    %% XXX: minimum impact update ???
                    cb_gateway_update(NCfg, AuthnState);
                {error, _} = AuthnError ->
                    AuthnError
            end;
        {running, false} ->
            case cb_gateway_unload(State) of
                {ok, NState} -> {ok, NState#state{config = NCfg}};
                {error, Reason} -> {error, Reason}
            end;
        _ ->
            throw(nomatch)
    end.

%% Drops the current authn chains and creates the ones described by NCfg.
%%
%% init_authn/2 reports a bad authenticator by throwing `{badauth, _}'.
%% This function runs inside handle_call/3, where an uncaught throw would
%% be taken as the callback's return value and crash the server, so the
%% throw is converted into `{error, {badauth, _}}' here.
%% Note that on failure the previous chains have already been removed.
reset_authn(NCfg, State = #state{name = GwName, authns = OldNames}) ->
    _ = do_deinit_authn(OldNames),
    try init_authn(GwName, NCfg) of
        AuthnNames ->
            {ok, State#state{authns = AuthnNames}}
    catch
        throw : {badauth, _} = BadAuth ->
            {error, BadAuth}
    end.

%% Calls Mod:on_gateway_unload/2 of the registered callback module and
%% returns the state reset to `stopped'. If the lookup or the callback
%% raises, the exception is logged and `{error, {Class, Reason, Stacktrace}}'
%% is returned. The authn chains are released in both cases.
cb_gateway_unload(State = #state{name = GwName,
                                 gw_state = GwState}) ->
    Gateway = detailed_gateway_info(State),
    try
        #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
        CbMod:on_gateway_unload(Gateway, GwState),
        {ok, State#state{child_pids = [],
                         authns = [],
                         status = stopped,
                         gw_state = undefined,
                         started_at = undefined,
                         stopped_at = erlang:system_time(millisecond)}}
    catch
        Class : Reason : Stk ->
            ?SLOG(error, #{ msg => "unload_gateway_crashed"
                          , gateway_name => GwName
                          , inner_state => GwState
                          , reason => {Class, Reason}
                          , stacktrace => Stk
                          }),
            {error, {Class, Reason, Stk}}
    after
        _ = do_deinit_authn(State#state.authns)
    end.

%% @doc 1. Create Authentication Context
%%      2.
%%      Callback to Mod:on_gateway_load/2
%%
%% Notes: If failed, rollback
%%
%% Returns `{ok, RunningState}' or `{error, {Class, Reason, Stacktrace}}'.
%% Once the authn chains have been created, any failure (callback module
%% lookup, an `{error, _}' return or a crash of the callback, child
%% start-up) removes them again before the error is reported.
cb_gateway_load(State = #state{name = GwName,
                               config = Config,
                               ctx = Ctx}) ->
    Gateway = detailed_gateway_info(State),
    try
        AuthnNames = init_authn(GwName, Config),
        NCtx = Ctx#{auth => AuthnNames},
        try
            #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
            case CbMod:on_gateway_load(Gateway, NCtx) of
                {error, Reason} ->
                    throw({callback_return_error, Reason});
                {ok, ChildPidOrSpecs, GwState} ->
                    ChildPids = start_child_process(ChildPidOrSpecs),
                    {ok, State#state{
                           ctx = NCtx,
                           authns = AuthnNames,
                           status = running,
                           child_pids = ChildPids,
                           gw_state = GwState,
                           stopped_at = undefined,
                           started_at = erlang:system_time(millisecond)
                          }}
            end
        catch
            Class0 : Reason0 : Stk0 ->
                %% Rollback: release the chains created above, then let
                %% the outer handler log and report the original exception
                _ = do_deinit_authn(AuthnNames),
                erlang:raise(Class0, Reason0, Stk0)
        end
    catch
        Class : Reason1 : Stk ->
            ?SLOG(error, #{ msg => "load_gateway_crashed"
                          , gateway_name => GwName
                          , gateway => Gateway
                          , ctx => Ctx
                          , reason => {Class, Reason1}
                          , stacktrace => Stk
                          }),
            {error, {Class, Reason1, Stk}}
    end.

%% Calls Mod:on_gateway_update/3 of the registered callback module with
%% the new config. On success the config, child processes and callback
%% state are replaced; any failure is logged and returned as
%% `{error, {Class, Reason, Stacktrace}}'.
cb_gateway_update(Config,
                  State = #state{name = GwName,
                                 gw_state = GwState}) ->
    try
        #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
        case CbMod:on_gateway_update(Config, detailed_gateway_info(State), GwState) of
            {error, Reason} ->
                throw({callback_return_error, Reason});
            {ok, ChildPidOrSpecs, NGwState} ->
                %% XXX: Hot-upgrade ???
                ChildPids = start_child_process(ChildPidOrSpecs),
                {ok, State#state{
                       config = Config,
                       child_pids = ChildPids,
                       gw_state = NGwState
                      }}
        end
    catch
        Class : Reason1 : Stk ->
            ?SLOG(error, #{ msg => "update_gateway_crashed"
                          , gateway_name => GwName
                          , new_config => Config
                          , reason => {Class, Reason1}
                          , stacktrace => Stk
                          }),
            {error, {Class, Reason1, Stk}}
    end.

%% Accepts either a list of already started pids or a list of child specs.
%% The first element decides which one it is: pids are returned as they
%% are, specs are started via do_start_child_process/1.
start_child_process([]) ->
    [];
start_child_process([Indictor|_] = ChildPidOrSpecs) ->
    case erlang:is_pid(Indictor) of
        true ->
            ChildPidOrSpecs;
        _ ->
            do_start_child_process(ChildPidOrSpecs)
    end.
%% Starts the children described by a list of child specs (or by a single
%% spec map) by applying the spec's `start' MFA, and returns the resulting
%% pid(s). Throws `{start_child_process, Reason}' when a start function
%% returns `{error, Reason}'.
do_start_child_process(ChildSpecs) when is_list(ChildSpecs) ->
    [do_start_child_process(Spec) || Spec <- ChildSpecs];
do_start_child_process(#{start := {Mod, Fun, Args}}) ->
    case apply(Mod, Fun, Args) of
        {ok, Pid} ->
            Pid;
        {error, Reason} ->
            throw({start_child_process, Reason})
    end.