diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index b8350dfd1..0de7fc312 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -88,7 +88,10 @@ fields(sentinel) -> required => true, desc => ?DESC("sentinel") }}, - {sentinel, #{type => string(), desc => ?DESC("sentinel_desc")}} + {sentinel, #{ + type => string(), + desc => ?DESC("sentinel_desc") + }} ] ++ redis_fields() ++ emqx_connector_schema_lib:ssl_fields(). diff --git a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl index 1e851d1c4..0f0ec8606 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_listeners.erl @@ -292,12 +292,20 @@ do_listeners_cluster_status(Listeners) -> fun({Id, ListenOn}, Acc) -> BinId = erlang:atom_to_binary(Id), {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId), - Curr = esockd:get_current_connections({Id, ListenOn}), + Curr = + try esockd:get_current_connections({Id, ListenOn}) of + Int -> Int + catch + %% not started + error:not_found -> + 0 + end, Acc#{ Id => #{ node => Node, current_connections => Curr, - max_connections => Max + %% XXX: Since it is taken from raw-conf, it is possible a string + max_connections => int(Max) } } end, @@ -305,6 +313,11 @@ do_listeners_cluster_status(Listeners) -> Listeners ). +int(B) when is_binary(B) -> + binary_to_integer(B); +int(I) when is_integer(I) -> + I. + %%-------------------------------------------------------------------- %% Swagger defines %%-------------------------------------------------------------------- diff --git a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl index c47b8a050..a8e2d2f40 100644 --- a/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl +++ b/apps/emqx_gateway/src/emqx_gateway_insta_sup.erl @@ -45,7 +45,7 @@ name :: gateway_name(), config :: emqx_config:config(), ctx :: emqx_gateway_ctx:context(), - authns :: [emqx_authentication:chain_name()], + authns :: [{emqx_authentication:chain_name(), map()}], status :: stopped | running, child_pids :: [pid()], gw_state :: emqx_gateway_impl:state() | undefined, @@ -115,11 +115,11 @@ init([Gateway, Ctx, _GwDscrptr]) -> }), {ok, State}; true -> - case cb_gateway_load(State) of + case cb_gateway_load(ensure_authn_created(State)) of {error, Reason} -> {stop, Reason}; - {ok, NState} -> - {ok, NState} + {ok, NState1} -> + {ok, NState1} end end. @@ -130,7 +130,7 @@ handle_call(disable, _From, State = #state{status = Status}) -> running -> case cb_gateway_unload(State) of {ok, NState} -> - {reply, ok, NState}; + {reply, ok, disable_authns(NState)}; {error, Reason} -> {reply, {error, Reason}, State} end; @@ -140,11 +140,11 @@ handle_call(disable, _From, State = #state{status = Status}) -> handle_call(enable, _From, State = #state{status = Status}) -> case Status of stopped -> - case cb_gateway_load(State) of + case cb_gateway_load(ensure_authn_running(State)) of {error, Reason} -> {reply, {error, Reason}, State}; - {ok, NState} -> - {reply, ok, NState} + {ok, NState1} -> + {reply, ok, NState1} end; _ -> {reply, {error, already_started}, State} @@ -173,14 +173,14 @@ handle_info( ) -> case lists:member(Pid, Pids) of true -> - ?SLOG(error, #{ + ?SLOG(info, #{ msg => "child_process_exited", child => Pid, reason => Reason }), case Pids -- [Pid] of [] -> - ?SLOG(error, #{ + ?SLOG(info, #{ msg => "gateway_all_children_process_existed", gateway_name => Name }), @@ -193,7 +193,7 @@ handle_info( {noreply, State#state{child_pids = RemainPids}} end; _ -> - ?SLOG(error, #{ + ?SLOG(info, #{ msg => "gateway_catch_a_unknown_process_exited", child => Pid, reason => Reason, @@ -233,48 +233,91 @@ detailed_gateway_info(State) -> %% Internal funcs %%-------------------------------------------------------------------- +%%-------------------------------------------------------------------- +%% Authn resources managing funcs + +%% ensure authentication chain, authenticator created and keep its status +%% as expected +ensure_authn_created(State = #state{ctx = Ctx, name = GwName, config = Config}) -> + Authns = init_authn(GwName, Config), + AuthnNames = lists:map(fun({ChainName, _}) -> ChainName end, Authns), + State#state{authns = Authns, ctx = maps:put(auth, AuthnNames, Ctx)}. + +%% temporarily disable authenticators after gateway disabled +disable_authns(State = #state{ctx = Ctx, authns = Authns}) -> + lists:foreach( + fun({ChainName, AuthConf}) -> + TempConf = maps:put(enable, false, AuthConf), + do_update_authenticator(ChainName, TempConf) + end, + Authns + ), + State#state{ctx = maps:remove(auth, Ctx)}. + +%% keep authenticators running as expected +ensure_authn_running(State = #state{ctx = Ctx, authns = Authns}) -> + AuthnNames = lists:map( + fun({ChainName, AuthConf}) -> + ok = do_update_authenticator(ChainName, AuthConf), + ChainName + end, + Authns + ), + State#state{ctx = maps:put(auth, AuthnNames, Ctx)}. + +do_update_authenticator({ChainName, Confs}) -> + do_update_authenticator(ChainName, Confs). + +do_update_authenticator(ChainName, Confs) -> + {ok, [#{id := AuthenticatorId}]} = emqx_authentication:list_authenticators(ChainName), + {ok, _} = emqx_authentication:update_authenticator(ChainName, AuthenticatorId, Confs), + ok. + %% There are two layer authentication configs %% stomp.authn %% / \ %% listeners.tcp.default.authn *.ssl.default.authn %% - init_authn(GwName, Config) -> Authns = authns(GwName, Config), try - do_init_authn(Authns, []) + ok = do_init_authn(Authns), + Authns catch throw:Reason = {badauth, _} -> - do_deinit_authn(proplists:get_keys(Authns)), + do_deinit_authn(Authns), throw(Reason) end. -do_init_authn([], Names) -> - Names; -do_init_authn([{_ChainName, _AuthConf = #{enable := false}} | More], Names) -> - do_init_authn(More, Names); -do_init_authn([{ChainName, AuthConf} | More], Names) when is_map(AuthConf) -> - _ = application:ensure_all_started(emqx_authn), - do_create_authn_chain(ChainName, AuthConf), - do_init_authn(More, [ChainName | Names]); -do_init_authn([_BadConf | More], Names) -> - do_init_authn(More, Names). +do_init_authn([]) -> + ok; +do_init_authn([{ChainName, AuthConf} | More]) when is_map(AuthConf) -> + ok = do_create_authn_chain(ChainName, AuthConf), + do_init_authn(More). authns(GwName, Config) -> Listeners = maps:to_list(maps:get(listeners, Config, #{})), - lists:append( - [ + Authns0 = + lists:append( [ - {emqx_gateway_utils:listener_chain(GwName, LisType, LisName), authn_conf(Opts)} - || {LisName, Opts} <- maps:to_list(LisNames) + [ + {emqx_gateway_utils:listener_chain(GwName, LisType, LisName), authn_conf(Opts)} + || {LisName, Opts} <- maps:to_list(LisNames) + ] + || {LisType, LisNames} <- Listeners ] - || {LisType, LisNames} <- Listeners - ] - ) ++ - [{emqx_gateway_utils:global_chain(GwName), authn_conf(Config)}]. + ) ++ + [{emqx_gateway_utils:global_chain(GwName), authn_conf(Config)}], + lists:filter( + fun + ({_, undefined}) -> false; + (_) -> true + end, + Authns0 + ). authn_conf(Conf) -> - maps:get(authentication, Conf, #{enable => false}). + maps:get(authentication, Conf, undefined). do_create_authn_chain(ChainName, AuthConf) -> case emqx_authentication:create_authenticator(ChainName, AuthConf) of @@ -290,9 +333,9 @@ do_create_authn_chain(ChainName, AuthConf) -> throw({badauth, Reason}) end. -do_deinit_authn(Names) -> +do_deinit_authn(Authns) -> lists:foreach( - fun(ChainName) -> + fun({ChainName, _}) -> case emqx_authentication:delete_chain(ChainName) of ok -> ok; @@ -306,7 +349,7 @@ do_deinit_authn(Names) -> }) end end, - Names + Authns ). do_update_one_by_one( @@ -319,37 +362,57 @@ do_update_one_by_one( ) -> NEnable = maps:get(enable, NCfg, true), - OAuths = authns(GwName, OCfg), - NAuths = authns(GwName, NCfg), + OAuthns = authns(GwName, OCfg), + NAuthns = authns(GwName, NCfg), case {Status, NEnable} of {stopped, true} -> NState = State#state{config = NCfg}, - cb_gateway_load(NState); + cb_gateway_load(ensure_authn_running(NState)); {stopped, false} -> {ok, State#state{config = NCfg}}; {running, true} -> - NState = - case NAuths == OAuths of - true -> - State; - false -> - %% Reset Authentication first - _ = do_deinit_authn(State#state.authns), - AuthnNames = init_authn(State#state.name, NCfg), - State#state{authns = AuthnNames} - end, + {Added, Updated, Deleted} = diff_auths(NAuthns, OAuthns), + _ = do_deinit_authn(Deleted), + _ = do_init_authn(Added), + _ = lists:foreach(fun do_update_authenticator/1, Updated), + NState = State#state{authns = NAuthns}, %% TODO: minimum impact update ??? cb_gateway_update(NCfg, NState); {running, false} -> case cb_gateway_unload(State) of - {ok, NState} -> {ok, NState#state{config = NCfg}}; + {ok, NState} -> {ok, disable_authns(NState#state{config = NCfg})}; {error, Reason} -> {error, Reason} end; _ -> throw(nomatch) end. +diff_auths(NAuthns, OAuthns) -> + NNames = proplists:get_keys(NAuthns), + ONames = proplists:get_keys(OAuthns), + AddedNames = NNames -- ONames, + DeletedNames = ONames -- NNames, + BothNames = NNames -- AddedNames, + UpdatedNames = lists:foldl( + fun(Name, Acc) -> + case + proplists:get_value(Name, NAuthns) == + proplists:get_value(Name, OAuthns) + of + true -> Acc; + false -> [Name | Acc] + end + end, + [], + BothNames + ), + { + lists:filter(fun({Name, _}) -> lists:member(Name, AddedNames) end, NAuthns), + lists:filter(fun({Name, _}) -> lists:member(Name, UpdatedNames) end, NAuthns), + lists:filter(fun({Name, _}) -> lists:member(Name, DeletedNames) end, OAuthns) + }. + cb_gateway_unload( State = #state{ name = GwName, @@ -362,7 +425,6 @@ cb_gateway_unload( CbMod:on_gateway_unload(Gateway, GwState), {ok, State#state{ child_pids = [], - authns = [], status = stopped, gw_state = undefined, started_at = undefined, @@ -378,8 +440,6 @@ cb_gateway_unload( stacktrace => Stk }), {error, Reason} - after - _ = do_deinit_authn(State#state.authns) end. %% @doc 1. Create Authentcation Context @@ -389,24 +449,19 @@ cb_gateway_unload( cb_gateway_load( State = #state{ name = GwName, - config = Config, ctx = Ctx } ) -> Gateway = detailed_gateway_info(State), try - AuthnNames = init_authn(GwName, Config), - NCtx = Ctx#{auth => AuthnNames}, #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), - case CbMod:on_gateway_load(Gateway, NCtx) of + case CbMod:on_gateway_load(Gateway, Ctx) of {error, Reason} -> - do_deinit_authn(AuthnNames), {error, Reason}; {ok, ChildPidOrSpecs, GwState} -> ChildPids = start_child_process(ChildPidOrSpecs), {ok, State#state{ - ctx = NCtx, - authns = AuthnNames, + ctx = Ctx, status = running, child_pids = ChildPids, gw_state = GwState, diff --git a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl index 9e1c3e6e9..244fadd2b 100644 --- a/apps/emqx_gateway/test/emqx_exproto_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_exproto_SUITE.erl @@ -72,7 +72,6 @@ init_per_group(GrpName, Cfg) -> put(grpname, GrpName), Svrs = emqx_exproto_echo_svr:start(), emqx_common_test_helpers:start_apps([emqx_gateway], fun set_special_cfg/1), - emqx_logger:set_log_level(debug), [{servers, Svrs}, {listener_type, GrpName} | Cfg]. end_per_group(_, Cfg) -> diff --git a/apps/emqx_gateway/test/emqx_gateway_SUITE.erl b/apps/emqx_gateway/test/emqx_gateway_SUITE.erl index 99bb05983..0bed52a76 100644 --- a/apps/emqx_gateway/test/emqx_gateway_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_gateway_SUITE.erl @@ -34,7 +34,7 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Conf) -> emqx_config:erase(gateway), emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT), - emqx_common_test_helpers:start_apps([emqx_gateway]), + emqx_common_test_helpers:start_apps([emqx_authn, emqx_gateway]), Conf. end_per_suite(_Conf) -> @@ -44,7 +44,7 @@ end_per_suite(_Conf) -> init_per_testcase(t_get_basic_usage_info_2, Config) -> DataDir = ?config(data_dir, Config), - emqx_common_test_helpers:stop_apps([emqx_gateway]), + application:stop(emqx_gateway), ok = setup_fake_usage_data(DataDir), Config; init_per_testcase(_TestCase, Config) -> diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index 41e007770..22b87f772 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -1914,8 +1914,6 @@ t_register_subs_resume_on(_) -> _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)), _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), - emqx_logger:set_log_level(debug), - {ok, NSocket} = gen_udp:open(0, [binary]), send_connect_msg(NSocket, <<"test">>, 0), ?assertMatch( @@ -2088,8 +2086,6 @@ t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket), - emqx_logger:set_log_level(debug), - {ok, NSocket} = gen_udp:open(0, [binary]), send_connect_msg(NSocket, <<"test">>, 0), ?assertMatch(