Merge pull request #8207 from HJianBo/gw-fixes

Gateway restart no longer causes built-in database data to be lost
This commit is contained in:
JianBo He 2022-06-15 17:41:13 +08:00 committed by GitHub
commit 57e57205a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 136 additions and 70 deletions

View File

@ -88,7 +88,10 @@ fields(sentinel) ->
required => true, required => true,
desc => ?DESC("sentinel") desc => ?DESC("sentinel")
}}, }},
{sentinel, #{type => string(), desc => ?DESC("sentinel_desc")}} {sentinel, #{
type => string(),
desc => ?DESC("sentinel_desc")
}}
] ++ ] ++
redis_fields() ++ redis_fields() ++
emqx_connector_schema_lib:ssl_fields(). emqx_connector_schema_lib:ssl_fields().

View File

@ -292,12 +292,20 @@ do_listeners_cluster_status(Listeners) ->
fun({Id, ListenOn}, Acc) -> fun({Id, ListenOn}, Acc) ->
BinId = erlang:atom_to_binary(Id), BinId = erlang:atom_to_binary(Id),
{ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId), {ok, #{<<"max_connections">> := Max}} = emqx_gateway_conf:listener(BinId),
Curr = esockd:get_current_connections({Id, ListenOn}), Curr =
try esockd:get_current_connections({Id, ListenOn}) of
Int -> Int
catch
%% not started
error:not_found ->
0
end,
Acc#{ Acc#{
Id => #{ Id => #{
node => Node, node => Node,
current_connections => Curr, current_connections => Curr,
max_connections => Max %% XXX: Since it is taken from raw-conf, it is possible a string
max_connections => int(Max)
} }
} }
end, end,
@ -305,6 +313,11 @@ do_listeners_cluster_status(Listeners) ->
Listeners Listeners
). ).
int(B) when is_binary(B) ->
binary_to_integer(B);
int(I) when is_integer(I) ->
I.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Swagger defines %% Swagger defines
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -45,7 +45,7 @@
name :: gateway_name(), name :: gateway_name(),
config :: emqx_config:config(), config :: emqx_config:config(),
ctx :: emqx_gateway_ctx:context(), ctx :: emqx_gateway_ctx:context(),
authns :: [emqx_authentication:chain_name()], authns :: [{emqx_authentication:chain_name(), map()}],
status :: stopped | running, status :: stopped | running,
child_pids :: [pid()], child_pids :: [pid()],
gw_state :: emqx_gateway_impl:state() | undefined, gw_state :: emqx_gateway_impl:state() | undefined,
@ -115,11 +115,11 @@ init([Gateway, Ctx, _GwDscrptr]) ->
}), }),
{ok, State}; {ok, State};
true -> true ->
case cb_gateway_load(State) of case cb_gateway_load(ensure_authn_created(State)) of
{error, Reason} -> {error, Reason} ->
{stop, Reason}; {stop, Reason};
{ok, NState} -> {ok, NState1} ->
{ok, NState} {ok, NState1}
end end
end. end.
@ -130,7 +130,7 @@ handle_call(disable, _From, State = #state{status = Status}) ->
running -> running ->
case cb_gateway_unload(State) of case cb_gateway_unload(State) of
{ok, NState} -> {ok, NState} ->
{reply, ok, NState}; {reply, ok, disable_authns(NState)};
{error, Reason} -> {error, Reason} ->
{reply, {error, Reason}, State} {reply, {error, Reason}, State}
end; end;
@ -140,11 +140,11 @@ handle_call(disable, _From, State = #state{status = Status}) ->
handle_call(enable, _From, State = #state{status = Status}) -> handle_call(enable, _From, State = #state{status = Status}) ->
case Status of case Status of
stopped -> stopped ->
case cb_gateway_load(State) of case cb_gateway_load(ensure_authn_running(State)) of
{error, Reason} -> {error, Reason} ->
{reply, {error, Reason}, State}; {reply, {error, Reason}, State};
{ok, NState} -> {ok, NState1} ->
{reply, ok, NState} {reply, ok, NState1}
end; end;
_ -> _ ->
{reply, {error, already_started}, State} {reply, {error, already_started}, State}
@ -173,14 +173,14 @@ handle_info(
) -> ) ->
case lists:member(Pid, Pids) of case lists:member(Pid, Pids) of
true -> true ->
?SLOG(error, #{ ?SLOG(info, #{
msg => "child_process_exited", msg => "child_process_exited",
child => Pid, child => Pid,
reason => Reason reason => Reason
}), }),
case Pids -- [Pid] of case Pids -- [Pid] of
[] -> [] ->
?SLOG(error, #{ ?SLOG(info, #{
msg => "gateway_all_children_process_existed", msg => "gateway_all_children_process_existed",
gateway_name => Name gateway_name => Name
}), }),
@ -193,7 +193,7 @@ handle_info(
{noreply, State#state{child_pids = RemainPids}} {noreply, State#state{child_pids = RemainPids}}
end; end;
_ -> _ ->
?SLOG(error, #{ ?SLOG(info, #{
msg => "gateway_catch_a_unknown_process_exited", msg => "gateway_catch_a_unknown_process_exited",
child => Pid, child => Pid,
reason => Reason, reason => Reason,
@ -233,48 +233,91 @@ detailed_gateway_info(State) ->
%% Internal funcs %% Internal funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% Authn resources managing funcs
%% ensure authentication chain, authenticator created and keep its status
%% as expected
ensure_authn_created(State = #state{ctx = Ctx, name = GwName, config = Config}) ->
Authns = init_authn(GwName, Config),
AuthnNames = lists:map(fun({ChainName, _}) -> ChainName end, Authns),
State#state{authns = Authns, ctx = maps:put(auth, AuthnNames, Ctx)}.
%% temporarily disable authenticators after gateway disabled
disable_authns(State = #state{ctx = Ctx, authns = Authns}) ->
lists:foreach(
fun({ChainName, AuthConf}) ->
TempConf = maps:put(enable, false, AuthConf),
do_update_authenticator(ChainName, TempConf)
end,
Authns
),
State#state{ctx = maps:remove(auth, Ctx)}.
%% keep authenticators running as expected
ensure_authn_running(State = #state{ctx = Ctx, authns = Authns}) ->
AuthnNames = lists:map(
fun({ChainName, AuthConf}) ->
ok = do_update_authenticator(ChainName, AuthConf),
ChainName
end,
Authns
),
State#state{ctx = maps:put(auth, AuthnNames, Ctx)}.
do_update_authenticator({ChainName, Confs}) ->
do_update_authenticator(ChainName, Confs).
do_update_authenticator(ChainName, Confs) ->
{ok, [#{id := AuthenticatorId}]} = emqx_authentication:list_authenticators(ChainName),
{ok, _} = emqx_authentication:update_authenticator(ChainName, AuthenticatorId, Confs),
ok.
%% There are two layer authentication configs %% There are two layer authentication configs
%% stomp.authn %% stomp.authn
%% / \ %% / \
%% listeners.tcp.default.authn *.ssl.default.authn %% listeners.tcp.default.authn *.ssl.default.authn
%% %%
init_authn(GwName, Config) -> init_authn(GwName, Config) ->
Authns = authns(GwName, Config), Authns = authns(GwName, Config),
try try
do_init_authn(Authns, []) ok = do_init_authn(Authns),
Authns
catch catch
throw:Reason = {badauth, _} -> throw:Reason = {badauth, _} ->
do_deinit_authn(proplists:get_keys(Authns)), do_deinit_authn(Authns),
throw(Reason) throw(Reason)
end. end.
do_init_authn([], Names) -> do_init_authn([]) ->
Names; ok;
do_init_authn([{_ChainName, _AuthConf = #{enable := false}} | More], Names) -> do_init_authn([{ChainName, AuthConf} | More]) when is_map(AuthConf) ->
do_init_authn(More, Names); ok = do_create_authn_chain(ChainName, AuthConf),
do_init_authn([{ChainName, AuthConf} | More], Names) when is_map(AuthConf) -> do_init_authn(More).
_ = application:ensure_all_started(emqx_authn),
do_create_authn_chain(ChainName, AuthConf),
do_init_authn(More, [ChainName | Names]);
do_init_authn([_BadConf | More], Names) ->
do_init_authn(More, Names).
authns(GwName, Config) -> authns(GwName, Config) ->
Listeners = maps:to_list(maps:get(listeners, Config, #{})), Listeners = maps:to_list(maps:get(listeners, Config, #{})),
lists:append( Authns0 =
[ lists:append(
[ [
{emqx_gateway_utils:listener_chain(GwName, LisType, LisName), authn_conf(Opts)} [
|| {LisName, Opts} <- maps:to_list(LisNames) {emqx_gateway_utils:listener_chain(GwName, LisType, LisName), authn_conf(Opts)}
|| {LisName, Opts} <- maps:to_list(LisNames)
]
|| {LisType, LisNames} <- Listeners
] ]
|| {LisType, LisNames} <- Listeners ) ++
] [{emqx_gateway_utils:global_chain(GwName), authn_conf(Config)}],
) ++ lists:filter(
[{emqx_gateway_utils:global_chain(GwName), authn_conf(Config)}]. fun
({_, undefined}) -> false;
(_) -> true
end,
Authns0
).
authn_conf(Conf) -> authn_conf(Conf) ->
maps:get(authentication, Conf, #{enable => false}). maps:get(authentication, Conf, undefined).
do_create_authn_chain(ChainName, AuthConf) -> do_create_authn_chain(ChainName, AuthConf) ->
case emqx_authentication:create_authenticator(ChainName, AuthConf) of case emqx_authentication:create_authenticator(ChainName, AuthConf) of
@ -290,9 +333,9 @@ do_create_authn_chain(ChainName, AuthConf) ->
throw({badauth, Reason}) throw({badauth, Reason})
end. end.
do_deinit_authn(Names) -> do_deinit_authn(Authns) ->
lists:foreach( lists:foreach(
fun(ChainName) -> fun({ChainName, _}) ->
case emqx_authentication:delete_chain(ChainName) of case emqx_authentication:delete_chain(ChainName) of
ok -> ok ->
ok; ok;
@ -306,7 +349,7 @@ do_deinit_authn(Names) ->
}) })
end end
end, end,
Names Authns
). ).
do_update_one_by_one( do_update_one_by_one(
@ -319,37 +362,57 @@ do_update_one_by_one(
) -> ) ->
NEnable = maps:get(enable, NCfg, true), NEnable = maps:get(enable, NCfg, true),
OAuths = authns(GwName, OCfg), OAuthns = authns(GwName, OCfg),
NAuths = authns(GwName, NCfg), NAuthns = authns(GwName, NCfg),
case {Status, NEnable} of case {Status, NEnable} of
{stopped, true} -> {stopped, true} ->
NState = State#state{config = NCfg}, NState = State#state{config = NCfg},
cb_gateway_load(NState); cb_gateway_load(ensure_authn_running(NState));
{stopped, false} -> {stopped, false} ->
{ok, State#state{config = NCfg}}; {ok, State#state{config = NCfg}};
{running, true} -> {running, true} ->
NState = {Added, Updated, Deleted} = diff_auths(NAuthns, OAuthns),
case NAuths == OAuths of _ = do_deinit_authn(Deleted),
true -> _ = do_init_authn(Added),
State; _ = lists:foreach(fun do_update_authenticator/1, Updated),
false -> NState = State#state{authns = NAuthns},
%% Reset Authentication first
_ = do_deinit_authn(State#state.authns),
AuthnNames = init_authn(State#state.name, NCfg),
State#state{authns = AuthnNames}
end,
%% TODO: minimum impact update ??? %% TODO: minimum impact update ???
cb_gateway_update(NCfg, NState); cb_gateway_update(NCfg, NState);
{running, false} -> {running, false} ->
case cb_gateway_unload(State) of case cb_gateway_unload(State) of
{ok, NState} -> {ok, NState#state{config = NCfg}}; {ok, NState} -> {ok, disable_authns(NState#state{config = NCfg})};
{error, Reason} -> {error, Reason} {error, Reason} -> {error, Reason}
end; end;
_ -> _ ->
throw(nomatch) throw(nomatch)
end. end.
diff_auths(NAuthns, OAuthns) ->
NNames = proplists:get_keys(NAuthns),
ONames = proplists:get_keys(OAuthns),
AddedNames = NNames -- ONames,
DeletedNames = ONames -- NNames,
BothNames = NNames -- AddedNames,
UpdatedNames = lists:foldl(
fun(Name, Acc) ->
case
proplists:get_value(Name, NAuthns) ==
proplists:get_value(Name, OAuthns)
of
true -> Acc;
false -> [Name | Acc]
end
end,
[],
BothNames
),
{
lists:filter(fun({Name, _}) -> lists:member(Name, AddedNames) end, NAuthns),
lists:filter(fun({Name, _}) -> lists:member(Name, UpdatedNames) end, NAuthns),
lists:filter(fun({Name, _}) -> lists:member(Name, DeletedNames) end, OAuthns)
}.
cb_gateway_unload( cb_gateway_unload(
State = #state{ State = #state{
name = GwName, name = GwName,
@ -362,7 +425,6 @@ cb_gateway_unload(
CbMod:on_gateway_unload(Gateway, GwState), CbMod:on_gateway_unload(Gateway, GwState),
{ok, State#state{ {ok, State#state{
child_pids = [], child_pids = [],
authns = [],
status = stopped, status = stopped,
gw_state = undefined, gw_state = undefined,
started_at = undefined, started_at = undefined,
@ -378,8 +440,6 @@ cb_gateway_unload(
stacktrace => Stk stacktrace => Stk
}), }),
{error, Reason} {error, Reason}
after
_ = do_deinit_authn(State#state.authns)
end. end.
%% @doc 1. Create Authentication Context %% @doc 1. Create Authentication Context
@ -389,24 +449,19 @@ cb_gateway_unload(
cb_gateway_load( cb_gateway_load(
State = #state{ State = #state{
name = GwName, name = GwName,
config = Config,
ctx = Ctx ctx = Ctx
} }
) -> ) ->
Gateway = detailed_gateway_info(State), Gateway = detailed_gateway_info(State),
try try
AuthnNames = init_authn(GwName, Config),
NCtx = Ctx#{auth => AuthnNames},
#{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName), #{cbkmod := CbMod} = emqx_gateway_registry:lookup(GwName),
case CbMod:on_gateway_load(Gateway, NCtx) of case CbMod:on_gateway_load(Gateway, Ctx) of
{error, Reason} -> {error, Reason} ->
do_deinit_authn(AuthnNames),
{error, Reason}; {error, Reason};
{ok, ChildPidOrSpecs, GwState} -> {ok, ChildPidOrSpecs, GwState} ->
ChildPids = start_child_process(ChildPidOrSpecs), ChildPids = start_child_process(ChildPidOrSpecs),
{ok, State#state{ {ok, State#state{
ctx = NCtx, ctx = Ctx,
authns = AuthnNames,
status = running, status = running,
child_pids = ChildPids, child_pids = ChildPids,
gw_state = GwState, gw_state = GwState,

View File

@ -72,7 +72,6 @@ init_per_group(GrpName, Cfg) ->
put(grpname, GrpName), put(grpname, GrpName),
Svrs = emqx_exproto_echo_svr:start(), Svrs = emqx_exproto_echo_svr:start(),
emqx_common_test_helpers:start_apps([emqx_gateway], fun set_special_cfg/1), emqx_common_test_helpers:start_apps([emqx_gateway], fun set_special_cfg/1),
emqx_logger:set_log_level(debug),
[{servers, Svrs}, {listener_type, GrpName} | Cfg]. [{servers, Svrs}, {listener_type, GrpName} | Cfg].
end_per_group(_, Cfg) -> end_per_group(_, Cfg) ->

View File

@ -34,7 +34,7 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Conf) -> init_per_suite(Conf) ->
emqx_config:erase(gateway), emqx_config:erase(gateway),
emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT), emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_common_test_helpers:start_apps([emqx_gateway]), emqx_common_test_helpers:start_apps([emqx_authn, emqx_gateway]),
Conf. Conf.
end_per_suite(_Conf) -> end_per_suite(_Conf) ->
@ -44,7 +44,7 @@ end_per_suite(_Conf) ->
init_per_testcase(t_get_basic_usage_info_2, Config) -> init_per_testcase(t_get_basic_usage_info_2, Config) ->
DataDir = ?config(data_dir, Config), DataDir = ?config(data_dir, Config),
emqx_common_test_helpers:stop_apps([emqx_gateway]), application:stop(emqx_gateway),
ok = setup_fake_usage_data(DataDir), ok = setup_fake_usage_data(DataDir),
Config; Config;
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->

View File

@ -1914,8 +1914,6 @@ t_register_subs_resume_on(_) ->
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)), _ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
emqx_logger:set_log_level(debug),
{ok, NSocket} = gen_udp:open(0, [binary]), {ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0), send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch( ?assertMatch(
@ -2088,8 +2086,6 @@ t_register_skip_failure_topic_name_and_reach_max_retry_times(_) ->
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket), gen_udp:close(Socket),
emqx_logger:set_log_level(debug),
{ok, NSocket} = gen_udp:open(0, [binary]), {ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0), send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch( ?assertMatch(