fix(sso): refactor backen update logic

1. valid config always can update successfully
2. the `running` endpoint only return successfully created backend
3. enhancement of the `/sso` endpoint, and will check is the resource online
This commit is contained in:
firest 2023-09-27 20:53:10 +08:00
parent b2699c687b
commit 08ad09a68f
5 changed files with 212 additions and 172 deletions

View File

@ -26,7 +26,9 @@
backend => atom(), backend => atom(),
atom() => term() atom() => term()
}. }.
-type state() :: #{atom() => term()}.
%% Note: if a backend has a resource, it must be stored in the state and named resource_id
-type state() :: #{resource_id => binary(), atom() => term()}.
-type raw_config() :: #{binary() => term()}. -type raw_config() :: #{binary() => term()}.
-type config() :: parsed_config() | raw_config(). -type config() :: parsed_config() | raw_config().
-type hocon_ref() :: ?R_REF(Module :: atom(), Name :: atom() | binary()). -type hocon_ref() :: ?R_REF(Module :: atom(), Name :: atom() | binary()).

View File

@ -133,24 +133,28 @@ schema("/sso/:backend") ->
}. }.
fields(backend_status) -> fields(backend_status) ->
emqx_dashboard_sso_schema:common_backend_schema(emqx_dashboard_sso:types()). emqx_dashboard_sso_schema:common_backend_schema(emqx_dashboard_sso:types()) ++
[
{running,
mk(
boolean(), #{
desc => ?DESC(running)
}
)},
{last_error,
mk(
binary(), #{
desc => ?DESC(last_error)
}
)}
].
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
running(get, _Request) -> running(get, _Request) ->
SSO = emqx:get_config(?MOD_KEY_PATH, #{}), {200, emqx_dashboard_sso_manager:running()}.
{200,
lists:filtermap(
fun
(#{backend := Backend, enable := true}) ->
{true, Backend};
(_) ->
false
end,
maps:values(SSO)
)}.
login(post, #{bindings := #{backend := Backend}, body := Body} = Request) -> login(post, #{bindings := #{backend := Backend}, body := Body} = Request) ->
case emqx_dashboard_sso_manager:lookup_state(Backend) of case emqx_dashboard_sso_manager:lookup_state(Backend) of
@ -185,8 +189,12 @@ sso(get, _Request) ->
SSO = emqx:get_config(?MOD_KEY_PATH, #{}), SSO = emqx:get_config(?MOD_KEY_PATH, #{}),
{200, {200,
lists:map( lists:map(
fun(Backend) -> fun(#{backend := Backend, enable := Enable}) ->
maps:with([backend, enable], Backend) Status = emqx_dashboard_sso_manager:get_backend_status(Backend, Enable),
Status#{
backend => Backend,
enable => Enable
}
end, end,
maps:values(SSO) maps:values(SSO)
)}. )}.

View File

@ -24,11 +24,11 @@
-export([ -export([
running/0, running/0,
get_backend_status/2,
lookup_state/1, lookup_state/1,
make_resource_id/1, make_resource_id/1,
create_resource/3, create_resource/3,
update_resource/3, update_resource/3
call/1
]). ]).
-export([ -export([
@ -39,20 +39,21 @@
propagated_post_config_update/5 propagated_post_config_update/5
]). ]).
-import(emqx_dashboard_sso, [provider/1]). -import(emqx_dashboard_sso, [provider/1, format/1]).
-define(MOD_TAB, emqx_dashboard_sso). -define(MOD_TAB, emqx_dashboard_sso).
-define(MOD_KEY_PATH, [dashboard, sso]). -define(MOD_KEY_PATH, [dashboard, sso]).
-define(CALL_TIMEOUT, timer:seconds(15)).
-define(MOD_KEY_PATH(Sub), [dashboard, sso, Sub]). -define(MOD_KEY_PATH(Sub), [dashboard, sso, Sub]).
-define(RESOURCE_GROUP, <<"emqx_dashboard_sso">>). -define(RESOURCE_GROUP, <<"emqx_dashboard_sso">>).
-define(START_ERROR_KEY, start_error).
-define(DEFAULT_RESOURCE_OPTS, #{ -define(DEFAULT_RESOURCE_OPTS, #{
start_after_created => false start_after_created => false
}). }).
-record(?MOD_TAB, { -record(?MOD_TAB, {
backend :: atom(), backend :: atom(),
state :: map() state :: map(),
last_error = <<>> :: term()
}). }).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -62,17 +63,36 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
running() -> running() ->
maps:fold( lists:filtermap(
fun fun
(Type, #{enable := true}, Acc) -> (#?MOD_TAB{backend = Backend, last_error = <<>>}) ->
[Type | Acc]; {true, Backend};
(_Type, _Cfg, Acc) -> (_) ->
Acc false
end, end,
[], ets:tab2list(?MOD_TAB)
emqx:get_config(?MOD_KEY_PATH)
). ).
get_backend_status(Backend, false) ->
#{
backend => Backend,
enable => false,
running => false,
last_error => <<>>
};
get_backend_status(Backend, _) ->
case lookup(Backend) of
undefined ->
#{
backend => Backend,
enable => true,
running => false,
last_error => <<"resource not found">>
};
Data ->
maps:merge(#{backend => Backend, enable => true}, do_get_backend_status(Data))
end.
update(Backend, Config) -> update(Backend, Config) ->
update_config(Backend, {?FUNCTION_NAME, Backend, Config}). update_config(Backend, {?FUNCTION_NAME, Backend, Config}).
delete(Backend) -> delete(Backend) ->
@ -106,14 +126,6 @@ update_resource(ResourceId, Module, Config) ->
), ),
start_resource_if_enabled(ResourceId, Result, Config). start_resource_if_enabled(ResourceId, Result, Config).
call(Req) ->
try
gen_server:call(?MODULE, Req, ?CALL_TIMEOUT)
catch
exit:{timeout, _} ->
{error, <<"Update backend failed: timeout">>}
end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -133,9 +145,6 @@ init([]) ->
start_backend_services(), start_backend_services(),
{ok, #{}}. {ok, #{}}.
handle_call({update_config, Req, NewConf, OldConf}, _From, State) ->
Result = on_config_update(Req, NewConf, OldConf),
{reply, Result, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
Reply = ok, Reply = ok,
{reply, Reply, State}. {reply, Reply, State}.
@ -177,35 +186,47 @@ start_backend_services() ->
backend => Backend, backend => Backend,
reason => emqx_utils:redact(Reason) reason => emqx_utils:redact(Reason)
}) })
end end,
record_start_error(Backend, false)
end, end,
maps:to_list(Backends) maps:to_list(Backends)
). ).
update_config(Backend, UpdateReq) -> update_config(Backend, UpdateReq) ->
OkFun = fun(Result) ->
?SLOG(info, #{
msg => "update_sso_successfully",
backend => Backend,
result => emqx_utils:redact(Result)
}),
Result
end,
ErrFun = fun({error, Reason} = Error) ->
SafeReason = emqx_utils:redact(Reason),
?SLOG(error, #{
msg => "update_sso_failed",
backend => Backend,
reason => SafeReason
}),
Error
end,
%% we always make sure the valid configuration will update successfully,
%% ignore the runtime error during its update
case emqx_conf:update(?MOD_KEY_PATH(Backend), UpdateReq, #{override_to => cluster}) of case emqx_conf:update(?MOD_KEY_PATH(Backend), UpdateReq, #{override_to => cluster}) of
{ok, UpdateResult} -> {ok, UpdateResult} ->
#{post_config_update := #{?MODULE := Result}} = UpdateResult, #{post_config_update := #{?MODULE := Result}} = UpdateResult,
?SLOG(info, #{ case Result of
msg => "update_sso_successfully", ok ->
backend => Backend, OkFun(Result);
result => emqx_utils:redact(Result) {ok, _} ->
}), OkFun(Result);
Result; {error, _} = Error ->
{error, Reason} -> ErrFun(Error)
SafeReason = emqx_utils:redact(Reason), end;
?SLOG(error, #{ {error, _} = Error ->
msg => "update_sso_failed", ErrFun(Error)
backend => Backend,
reason => SafeReason
}),
{error,
case SafeReason of
{_Stage, _Mod, Reason2} ->
Reason2;
_ ->
Reason
end}
end. end.
pre_config_update(_, {update, _Backend, Config}, _OldConf) -> pre_config_update(_, {update, _Backend, Config}, _OldConf) ->
@ -215,8 +236,8 @@ pre_config_update(_, {delete, _Backend}, undefined) ->
pre_config_update(_, {delete, _Backend}, _OldConf) -> pre_config_update(_, {delete, _Backend}, _OldConf) ->
{ok, null}. {ok, null}.
post_config_update(_, UpdateReq, NewConf, OldConf, _AppEnvs) -> post_config_update(_, UpdateReq, NewConf, _OldConf, _AppEnvs) ->
call({update_config, UpdateReq, NewConf, OldConf}). {ok, on_config_update(UpdateReq, NewConf)}.
propagated_post_config_update( propagated_post_config_update(
?MOD_KEY_PATH(BackendBin) = Path, _UpdateReq, undefined, OldConf, AppEnvs ?MOD_KEY_PATH(BackendBin) = Path, _UpdateReq, undefined, OldConf, AppEnvs
@ -237,44 +258,37 @@ propagated_post_config_update(
Error Error
end. end.
on_config_update({update, Backend, _RawConfig}, Config, OldConfig) -> on_config_update({update, Backend, _RawConfig}, Config) ->
Provider = provider(Backend), Provider = provider(Backend),
case lookup(Backend) of case lookup(Backend) of
undefined -> undefined ->
on_backend_updated( on_backend_updated(
Backend,
emqx_dashboard_sso:create(Provider, Config), emqx_dashboard_sso:create(Provider, Config),
fun(State) -> fun(State, LastError) ->
ets:insert(?MOD_TAB, #?MOD_TAB{backend = Backend, state = State}) ets:insert(
?MOD_TAB,
#?MOD_TAB{backend = Backend, state = State, last_error = LastError}
)
end end
); );
Data -> Data ->
%% the steps for updating/recreating a resource are:
%% 1. destroy the old resource
%% 2. create a new resource
%% to keep consistency we need to follow those steps too,
%% however a failed update will not change the config, but will lose the resource
%% hence for consistency and atomicity, we should rollback when the update fails
ets:delete(?MOD_TAB, Backend),
UpdateState = fun(State) ->
ets:insert(?MOD_TAB, Data#?MOD_TAB{state = State})
end,
on_backend_updated( on_backend_updated(
Backend,
emqx_dashboard_sso:update(Provider, Config, Data#?MOD_TAB.state), emqx_dashboard_sso:update(Provider, Config, Data#?MOD_TAB.state),
UpdateState, fun(State, LastError) ->
rollback( ets:insert(?MOD_TAB, Data#?MOD_TAB{state = State, last_error = LastError})
Backend, end
OldConfig,
UpdateState
)
) )
end; end;
on_config_update({delete, Backend}, _NewConf, _OldConf) -> on_config_update({delete, Backend}, _NewConf) ->
case lookup(Backend) of case lookup(Backend) of
undefined -> undefined ->
{error, not_exists}; {error, not_exists};
Data -> Data ->
Provider = provider(Backend), Provider = provider(Backend),
on_backend_updated( on_backend_updated(
Backend,
emqx_dashboard_sso:destroy(Provider, Data#?MOD_TAB.state), emqx_dashboard_sso:destroy(Provider, Data#?MOD_TAB.state),
fun() -> fun() ->
ets:delete(?MOD_TAB, Backend) ets:delete(?MOD_TAB, Backend)
@ -290,45 +304,39 @@ lookup(Backend) ->
undefined undefined
end. end.
%% to avoid resource leakage the resource start will never affect the update result,
%% so the resource_id will always be recorded
start_resource_if_enabled(ResourceId, {ok, _} = Result, #{enable := true}) -> start_resource_if_enabled(ResourceId, {ok, _} = Result, #{enable := true}) ->
clear_start_error(),
case emqx_resource:start(ResourceId) of case emqx_resource:start(ResourceId) of
ok -> ok ->
Result; ok;
{error, Reason} -> {error, Reason} ->
SafeReason = emqx_utils:redact(Reason), SafeReason = emqx_utils:redact(Reason),
mark_start_error(SafeReason),
?SLOG(error, #{ ?SLOG(error, #{
msg => "start_backend_failed", msg => "start_backend_failed",
resource_id => ResourceId, resource_id => ResourceId,
reason => SafeReason reason => SafeReason
}), }),
clean_when_start_failed(ResourceId), ok
{error, emqx_dashboard_sso:format(["Start backend failed, Reason: ", SafeReason])} end,
end; Result;
start_resource_if_enabled(_ResourceId, Result, _Config) -> start_resource_if_enabled(_ResourceId, Result, _Config) ->
Result. Result.
%% ensure the backend creation is atomic, clean the corresponding resource when necessary, on_backend_updated(Backend, {ok, State} = Ok, Fun) ->
%% when creating a backend fails, nothing will be inserted into the SSO table, Fun(State, <<>>),
%% thus the resources created by backend will leakage. record_start_error(Backend, true),
%% Although we can treat start failure as successful, Ok;
%% and insert the resource data into the SSO table, on_backend_updated(_Backend, {ok, State, LastError} = Ok, Fun) ->
%% it may be strange for users: it succeeds, but can't be used. Fun(State, LastError),
clean_when_start_failed(ResourceId) -> Ok;
_ = emqx_resource:remove_local(ResourceId), on_backend_updated(_Backend, ok, Fun) ->
ok.
on_backend_updated(Result, OkFun) ->
on_backend_updated(Result, OkFun, undefined).
%% this first level `ok` is for emqx_config_handler, and the second level is for the caller
on_backend_updated({ok, State} = Ok, Fun, _ErrFun) ->
Fun(State),
{ok, Ok};
on_backend_updated(ok, Fun, _ErrFun) ->
Fun(), Fun(),
{ok, ok}; ok;
on_backend_updated(Error, _, ErrFun) -> on_backend_updated(Backend, {error, Reason} = Error, _) ->
erlang:is_function(ErrFun) andalso ErrFun(Error), update_last_error(Backend, Reason),
Error. Error.
bin(A) when is_atom(A) -> atom_to_binary(A, utf8); bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
@ -365,31 +373,44 @@ new_ssl_source(Source, undefined) ->
new_ssl_source(Source, SSL) -> new_ssl_source(Source, SSL) ->
Source#{<<"ssl">> => SSL}. Source#{<<"ssl">> => SSL}.
rollback(Backend, OldConf, OnSucc) -> clear_start_error() ->
fun(_) -> mark_start_error(<<>>).
try_recreate(Backend, OldConf, OnSucc)
mark_start_error(Reason) ->
erlang:put(?START_ERROR_KEY, Reason).
record_start_error(Backend, Force) ->
case erlang:get(?START_ERROR_KEY) of
<<>> when Force ->
update_last_error(Backend, <<>>);
<<>> ->
ok;
Reason ->
update_last_error(Backend, Reason)
end. end.
try_recreate(_Backend, undefined, _OnSucc) -> update_last_error(Backend, Reason) ->
ok; ets:update_element(?MOD_TAB, Backend, {#?MOD_TAB.last_error, Reason}).
try_recreate(_Backend, #{enable := false}, _OnSucc) ->
ok; do_get_backend_status(#?MOD_TAB{state = #{resource_id := ResourceId}}) ->
try_recreate(Backend, Config, OnSucc) -> case emqx_resource_manager:lookup(ResourceId) of
Provider = provider(Backend), {ok, _Group, #{status := connected}} ->
?SLOG(info, #{ #{running => true, last_error => <<>>};
msg => "backend_rollback", {ok, _Group, #{status := Status}} ->
backend => Backend, #{
reason => "update_sso_failed", running => false,
config => emqx_utils:redact(Config) last_error => format([<<"Resource not valid, status: ">>, Status])
}), };
on_backend_updated( {error, not_found} ->
emqx_dashboard_sso:create(Provider, Config), #{
OnSucc, running => false,
fun(Error) -> last_error => <<"Resource not found">>
?SLOG(error, #{ }
msg => "backend_rollback_failed", end;
backend => Backend, do_get_backend_status(#?MOD_TAB{last_error = <<>>}) ->
reason => emqx_utils:redact(Error) #{running => true, last_error => <<>>};
}) do_get_backend_status(#?MOD_TAB{last_error = LastError}) ->
end #{
). running => false,
last_error => format([LastError])
}.

View File

@ -24,14 +24,14 @@
all() -> all() ->
[ [
t_create_atomicly, t_bad_create,
t_create, t_create,
t_update, t_update,
t_update_atomicly,
t_get, t_get,
t_login_with_bad, t_login_with_bad,
t_first_login, t_first_login,
t_next_login, t_next_login,
t_bad_update,
t_delete t_delete
]. ].
@ -61,10 +61,10 @@ end_per_testcase(Case, _) ->
end, end,
ok. ok.
t_create_atomicly(_) -> t_bad_create(_) ->
Path = uri(["sso", "ldap"]), Path = uri(["sso", "ldap"]),
?assertMatch( ?assertMatch(
{ok, 400, _}, {ok, 200, _},
request( request(
put, put,
Path, Path,
@ -75,12 +75,18 @@ t_create_atomicly(_) ->
}) })
) )
), ),
?assertEqual(undefined, emqx:get_config(?MOD_KEY_PATH, undefined)), ?assertMatch(#{backend := ldap}, emqx:get_config(?MOD_KEY_PATH, undefined)),
?assertEqual([], ets:tab2list(?MOD_TAB)), check_running([]),
?assertMatch(
[#{backend := <<"ldap">>, enable := true, running := false, last_error := _}], get_sso()
),
emqx_dashboard_sso_manager:delete(ldap),
?retry( ?retry(
_Interval = 1000, _Interval = 500,
_NAttempts = 5, _NAttempts = 10,
?assertEqual([], emqx_resource_manager:list_group(?RESOURCE_GROUP)) ?assertMatch([], emqx_resource_manager:list_group(?RESOURCE_GROUP))
), ),
ok. ok.
@ -112,37 +118,6 @@ t_update(_) ->
?assertNotEqual(undefined, emqx_dashboard_sso_manager:lookup_state(ldap)), ?assertNotEqual(undefined, emqx_dashboard_sso_manager:lookup_state(ldap)),
ok. ok.
%% update fails can rollback able
t_update_atomicly(_) ->
CurrRes = emqx_resource_manager:list_group(?RESOURCE_GROUP),
Path = uri(["sso", "ldap"]),
?assertMatch(
{ok, 400, _},
request(
put,
Path,
ldap_config(#{
<<"username">> => <<"invalid">>,
<<"enable">> => true,
<<"request_timeout">> => <<"1s">>
})
)
),
?assertMatch(#{backend := ldap}, emqx:get_config(?MOD_KEY_PATH, undefined)),
?assertMatch([_], ets:tab2list(?MOD_TAB)),
?retry(
_Interval = 5,
_NAttempts = 1000,
begin
Res = emqx_resource_manager:list_group(?RESOURCE_GROUP),
?assertMatch([_], Res),
?assertNotMatch(CurrRes, Res)
end
),
ok.
t_get(_) -> t_get(_) ->
Path = uri(["sso", "ldap"]), Path = uri(["sso", "ldap"]),
{ok, 200, Result} = request(get, Path), {ok, 200, Result} = request(get, Path),
@ -190,6 +165,28 @@ t_next_login(_) ->
?assertMatch(#{license := _, token := _}, decode_json(Result)), ?assertMatch(#{license := _, token := _}, decode_json(Result)),
ok. ok.
t_bad_update(_) ->
Path = uri(["sso", "ldap"]),
?assertMatch(
{ok, 200, _},
request(
put,
Path,
ldap_config(#{
<<"username">> => <<"invalid">>,
<<"enable">> => true,
<<"request_timeout">> => <<"1s">>
})
)
),
?assertMatch(#{backend := ldap}, emqx:get_config(?MOD_KEY_PATH, undefined)),
check_running([]),
?assertMatch(
[#{backend := <<"ldap">>, enable := true, running := false, last_error := _}], get_sso()
),
ok.
t_delete(_) -> t_delete(_) ->
Path = uri(["sso", "ldap"]), Path = uri(["sso", "ldap"]),
?assertMatch({ok, 204, _}, request(delete, Path)), ?assertMatch({ok, 204, _}, request(delete, Path)),

View File

@ -51,4 +51,16 @@ backend_name.desc:
backend_name.label: backend_name.label:
"""Backend Name""" """Backend Name"""
running.desc:
"""Is the backend running"""
running.label:
"""Running"""
last_error.desc:
"""Last error of this backend"""
last_error.label:
"""Last Error"""
} }