From 60f0e8e5a5bb305588482888c529f76f243c066e Mon Sep 17 00:00:00 2001 From: zhouzb Date: Tue, 10 Aug 2021 17:04:20 +0800 Subject: [PATCH] refactor(authn): replace mnesia with ets table --- apps/emqx_authn/src/emqx_authn.erl | 465 ++++++++++++++----------- apps/emqx_authn/src/emqx_authn_sup.erl | 9 +- 2 files changed, 273 insertions(+), 201 deletions(-) diff --git a/apps/emqx_authn/src/emqx_authn.erl b/apps/emqx_authn/src/emqx_authn.erl index 7ead53638..703b0efcf 100644 --- a/apps/emqx_authn/src/emqx_authn.erl +++ b/apps/emqx_authn/src/emqx_authn.erl @@ -16,11 +16,12 @@ -module(emqx_authn). +-behaviour(gen_server). + -behaviour(emqx_config_handler). -include("emqx_authn.hrl"). - --export([mnesia/1]). +-include_lib("emqx/include/logger.hrl"). -export([ pre_config_update/2 , post_config_update/3 @@ -34,6 +35,10 @@ -export([authenticate/2]). +-export([ start_link/0 + , stop/0 + ]). + -export([ create_chain/1 , delete_chain/1 , lookup_chain/1 @@ -55,33 +60,17 @@ , list_users/2 ]). --boot_mnesia({mnesia, [boot]}). --copy_mnesia({mnesia, [copy]}). +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). -define(CHAIN_TAB, emqx_authn_chain). --rlog_shard({?AUTH_SHARD, ?CHAIN_TAB}). - -%%------------------------------------------------------------------------------ -%% Mnesia bootstrap -%%------------------------------------------------------------------------------ - -%% @doc Create or replicate tables. --spec(mnesia(boot) -> ok). -mnesia(boot) -> - %% Optimize storage - StoreProps = [{ets, [{read_concurrency, true}]}], - %% Chain table - ok = ekka_mnesia:create_table(?CHAIN_TAB, [ - {ram_copies, [node()]}, - {record_name, chain}, - {local_content, true}, - {attributes, record_info(fields, chain)}, - {storage_properties, StoreProps}]); - -mnesia(copy) -> - ok = ekka_mnesia:copy_table(?CHAIN_TAB, ram_copies). - %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -192,7 +181,7 @@ is_enabled() -> end, Callbacks). authenticate(Credential, _AuthResult) -> - case mnesia:dirty_read(?CHAIN_TAB, ?CHAIN) of + case ets:lookup(?CHAIN_TAB, ?CHAIN) of [#chain{authenticators = Authenticators}] -> do_authenticate(Authenticators, Credential); [] -> @@ -214,154 +203,39 @@ do_authenticate([{_, _, #authenticator{provider = Provider, state = State}} | Mo {stop, Result} end. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +stop() -> + gen_server:stop(?MODULE). + create_chain(#{id := ID}) -> - trans( - fun() -> - case mnesia:read(?CHAIN_TAB, ID, write) of - [] -> - Chain = #chain{id = ID, - authenticators = [], - created_at = erlang:system_time(millisecond)}, - mnesia:write(?CHAIN_TAB, Chain, write), - {ok, serialize_chain(Chain)}; - [_ | _] -> - {error, {already_exists, {chain, ID}}} - end - end). + gen_server:call(?MODULE, {create_chain, ID}). delete_chain(ID) -> - trans( - fun() -> - case mnesia:read(?CHAIN_TAB, ID, write) of - [] -> - {error, {not_found, {chain, ID}}}; - [#chain{authenticators = Authenticators}] -> - _ = [do_delete_authenticator(Authenticator) || {_, _, Authenticator} <- Authenticators], - mnesia:delete(?CHAIN_TAB, ID, write) - end - end). + gen_server:call(?MODULE, {delete_chain, ID}). lookup_chain(ID) -> - case mnesia:dirty_read(?CHAIN_TAB, ID) of - [] -> - {error, {not_found, {chain, ID}}}; - [Chain] -> - {ok, serialize_chain(Chain)} - end. + gen_server:call(?MODULE, {lookup_chain, ID}). list_chains() -> Chains = ets:tab2list(?CHAIN_TAB), {ok, [serialize_chain(Chain) || Chain <- Chains]}. -create_authenticator(ChainID, #{name := Name} = Config) -> - UpdateFun = - fun(Chain = #chain{authenticators = Authenticators}) -> - case lists:keymember(Name, 2, Authenticators) of - true -> - {error, name_has_be_used}; - false -> - AlreadyExist = fun(ID) -> - lists:keymember(ID, 1, Authenticators) - end, - AuthenticatorID = gen_id(AlreadyExist), - case do_create_authenticator(ChainID, AuthenticatorID, Config) of - {ok, Authenticator} -> - NAuthenticators = Authenticators ++ [{AuthenticatorID, Name, Authenticator}], - ok = mnesia:write(?CHAIN_TAB, Chain#chain{authenticators = NAuthenticators}, write), - {ok, serialize_authenticator(Authenticator)}; - {error, Reason} -> - {error, Reason} - end - end - end, - update_chain(ChainID, UpdateFun). +create_authenticator(ChainID, Config) -> + gen_server:call(?MODULE, {create_authenticator, ChainID, Config}). delete_authenticator(ChainID, AuthenticatorID) -> - UpdateFun = fun(Chain = #chain{authenticators = Authenticators}) -> - case lists:keytake(AuthenticatorID, 1, Authenticators) of - false -> - {error, {not_found, {authenticator, AuthenticatorID}}}; - {value, {_, _, Authenticator}, NAuthenticators} -> - _ = do_delete_authenticator(Authenticator), - NChain = Chain#chain{authenticators = NAuthenticators}, - mnesia:write(?CHAIN_TAB, NChain, write) - end - end, - update_chain(ChainID, UpdateFun). + gen_server:call(?MODULE, {delete_authenticator, ChainID, AuthenticatorID}). update_authenticator(ChainID, AuthenticatorID, Config) -> - do_update_authenticator(ChainID, AuthenticatorID, Config, false). + gen_server:call(?MODULE, {update_authenticator, ChainID, AuthenticatorID, Config}). update_or_create_authenticator(ChainID, AuthenticatorID, Config) -> - do_update_authenticator(ChainID, AuthenticatorID, Config, true). - -do_update_authenticator(ChainID, AuthenticatorID, #{name := NewName} = Config, CreateWhenNotFound) -> - UpdateFun = fun(Chain = #chain{authenticators = Authenticators}) -> - case lists:keytake(AuthenticatorID, 1, Authenticators) of - false -> - case CreateWhenNotFound of - true -> - case lists:keymember(NewName, 2, Authenticators) of - true -> - {error, name_has_be_used}; - false -> - case do_create_authenticator(ChainID, AuthenticatorID, Config) of - {ok, Authenticator} -> - NAuthenticators = Authenticators ++ [{AuthenticatorID, NewName, Authenticator}], - ok = mnesia:write(?CHAIN_TAB, Chain#chain{authenticators = NAuthenticators}, write), - {ok, serialize_authenticator(Authenticator)}; - {error, Reason} -> - {error, Reason} - end - end; - false -> - {error, {not_found, {authenticator, AuthenticatorID}}} - end; - {value, - {_, _, #authenticator{provider = Provider, - state = #{version := Version} = State} = Authenticator}, - Others} -> - case lists:keymember(NewName, 2, Others) of - true -> - {error, name_has_be_used}; - false -> - case (NewProvider = authenticator_provider(Config)) =:= Provider of - true -> - Unique = <>, - case Provider:update(Config#{'_unique' => Unique}, State) of - {ok, NewState} -> - NewAuthenticator = Authenticator#authenticator{name = NewName, - config = Config, - state = switch_version(NewState)}, - NewAuthenticators = replace_authenticator(AuthenticatorID, NewAuthenticator, Authenticators), - ok = mnesia:write(?CHAIN_TAB, Chain#chain{authenticators = NewAuthenticators}, write), - {ok, serialize_authenticator(NewAuthenticator)}; - {error, Reason} -> - {error, Reason} - end; - false -> - Unique = <>, - case NewProvider:create(Config#{'_unique' => Unique}) of - {ok, NewState} -> - NewAuthenticator = Authenticator#authenticator{name = NewName, - provider = NewProvider, - config = Config, - state = switch_version(NewState)}, - NewAuthenticators = replace_authenticator(AuthenticatorID, NewAuthenticator, Authenticators), - ok = mnesia:write(?CHAIN_TAB, Chain#chain{authenticators = NewAuthenticators}, write), - _ = Provider:destroy(State), - {ok, serialize_authenticator(NewAuthenticator)}; - {error, Reason} -> - {error, Reason} - end - end - end - end - end, - update_chain(ChainID, UpdateFun). + gen_server:call(?MODULE, {update_or_create_authenticator, ChainID, AuthenticatorID, Config}). lookup_authenticator(ChainID, AuthenticatorID) -> - case mnesia:dirty_read(?CHAIN_TAB, ChainID) of + case ets:lookup(?CHAIN_TAB, ChainID) of [] -> {error, {not_found, {chain, ChainID}}}; [#chain{authenticators = Authenticators}] -> @@ -374,7 +248,7 @@ lookup_authenticator(ChainID, AuthenticatorID) -> end. list_authenticators(ChainID) -> - case mnesia:dirty_read(?CHAIN_TAB, ChainID) of + case ets:lookup(?CHAIN_TAB, ChainID) of [] -> {error, {not_found, {chain, ChainID}}}; [#chain{authenticators = Authenticators}] -> @@ -382,34 +256,172 @@ list_authenticators(ChainID) -> end. move_authenticator_to_the_nth(ChainID, AuthenticatorID, N) -> - UpdateFun = fun(Chain = #chain{authenticators = Authenticators}) -> - case move_authenticator_to_the_nth_(AuthenticatorID, Authenticators, N) of - {ok, NAuthenticators} -> - NChain = Chain#chain{authenticators = NAuthenticators}, - mnesia:write(?CHAIN_TAB, NChain, write); + gen_server:call(?MODULE, {move_authenticator, ChainID, AuthenticatorID, N}). + +import_users(ChainID, AuthenticatorID, Filename) -> + gen_server:call(?MODULE, {import_users, ChainID, AuthenticatorID, Filename}). + +add_user(ChainID, AuthenticatorID, UserInfo) -> + gen_server:call(?MODULE, {add_user, ChainID, AuthenticatorID, UserInfo}). + +delete_user(ChainID, AuthenticatorID, UserID) -> + gen_server:call(?MODULE, {delete_user, ChainID, AuthenticatorID, UserID}). + +update_user(ChainID, AuthenticatorID, UserID, NewUserInfo) -> + gen_server:call(?MODULE, {update_user, ChainID, AuthenticatorID, UserID, NewUserInfo}). + +lookup_user(ChainID, AuthenticatorID, UserID) -> + gen_server:call(?MODULE, {lookup_user, ChainID, AuthenticatorID, UserID}). + +%% TODO: Support pagination +list_users(ChainID, AuthenticatorID) -> + gen_server:call(?MODULE, {list_users, ChainID, AuthenticatorID}). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init(_Opts) -> + ets:new(?CHAIN_TAB, [ named_table, set, public + , {keypos, #chain.id} + , {read_concurrency, true}]), + {ok, #{}}. + +handle_call({create_chain, ID}, _From, State) -> + case ets:member(?CHAIN_TAB, ID) of + true -> + reply({error, {already_exists, {chain, ID}}}, State); + false -> + Chain = #chain{id = ID, + authenticators = [], + created_at = erlang:system_time(millisecond)}, + true = ets:insert(?CHAIN_TAB, Chain), + reply({ok, serialize_chain(Chain)}, State) + end; + +handle_call({delete_chain, ID}, _From, State) -> + case ets:lookup(?CHAIN_TAB, ID) of + [] -> + reply({error, {not_found, {chain, ID}}}, State); + [#chain{authenticators = Authenticators}] -> + _ = [do_delete_authenticator(Authenticator) || {_, _, Authenticator} <- Authenticators], + true = ets:delete(?CHAIN_TAB, ID), + reply(ok, State) + end; + +handle_call({lookup_chain, ID}, _From, State) -> + case ets:lookup(?CHAIN_TAB, ID) of + [] -> + reply({error, {not_found, {chain, ID}}}, State); + [Chain] -> + reply({ok, serialize_chain(Chain)}, State) + end; + +handle_call({create_authenticator, ChainID, #{name := Name} = Config}, _From, State) -> + UpdateFun = + fun(#chain{authenticators = Authenticators} = Chain) -> + case lists:keymember(Name, 2, Authenticators) of + true -> + {error, name_has_be_used}; + false -> + AlreadyExist = fun(ID) -> + lists:keymember(ID, 1, Authenticators) + end, + AuthenticatorID = gen_id(AlreadyExist), + case do_create_authenticator(ChainID, AuthenticatorID, Config) of + {ok, Authenticator} -> + NAuthenticators = Authenticators ++ [{AuthenticatorID, Name, Authenticator}], + true = ets:insert(?CHAIN_TAB, Chain#chain{authenticators = NAuthenticators}), + {ok, serialize_authenticator(Authenticator)}; {error, Reason} -> {error, Reason} end - end, - update_chain(ChainID, UpdateFun). + end + end, + Reply = update_chain(ChainID, UpdateFun), + reply(Reply, State); -import_users(ChainID, AuthenticatorID, Filename) -> - call_authenticator(ChainID, AuthenticatorID, import_users, [Filename]). +handle_call({delete_authenticator, ChainID, AuthenticatorID}, _From, State) -> + UpdateFun = + fun(#chain{authenticators = Authenticators} = Chain) -> + case lists:keytake(AuthenticatorID, 1, Authenticators) of + false -> + {error, {not_found, {authenticator, AuthenticatorID}}}; + {value, {_, _, Authenticator}, NAuthenticators} -> + _ = do_delete_authenticator(Authenticator), + true = ets:insert(?CHAIN_TAB, Chain#chain{authenticators = NAuthenticators}), + ok + end + end, + Reply = update_chain(ChainID, UpdateFun), + reply(Reply, State); -add_user(ChainID, AuthenticatorID, UserInfo) -> - call_authenticator(ChainID, AuthenticatorID, add_user, [UserInfo]). +handle_call({update_authenticator, ChainID, AuthenticatorID, Config}, _From, State) -> + Reply = update_or_create_authenticator(ChainID, AuthenticatorID, Config, false), + reply(Reply, State); -delete_user(ChainID, AuthenticatorID, UserID) -> - call_authenticator(ChainID, AuthenticatorID, delete_user, [UserID]). +handle_call({update_or_create_authenticator, ChainID, AuthenticatorID, Config}, _From, State) -> + Reply = update_or_create_authenticator(ChainID, AuthenticatorID, Config, true), + reply(Reply, State); -update_user(ChainID, AuthenticatorID, UserID, NewUserInfo) -> - call_authenticator(ChainID, AuthenticatorID, update_user, [UserID, NewUserInfo]). +handle_call({move_authenticator, ChainID, AuthenticatorID, N}, _From, State) -> + UpdateFun = + fun(#chain{authenticators = Authenticators} = Chain) -> + case move_authenticator_to_the_nth_(AuthenticatorID, Authenticators, N) of + {ok, NAuthenticators} -> + true = ets:insert(?CHAIN_TAB, Chain#chain{authenticators = NAuthenticators}), + ok; + {error, Reason} -> + {error, Reason} + end + end, + Reply = update_chain(ChainID, UpdateFun), + reply(Reply, State); -lookup_user(ChainID, AuthenticatorID, UserID) -> - call_authenticator(ChainID, AuthenticatorID, lookup_user, [UserID]). +handle_call({import_users, ChainID, AuthenticatorID, Filename}, _From, State) -> + Reply = call_authenticator(ChainID, AuthenticatorID, import_users, [Filename]), + reply(Reply, State); -list_users(ChainID, AuthenticatorID) -> - call_authenticator(ChainID, AuthenticatorID, list_users, []). +handle_call({add_user, ChainID, AuthenticatorID, UserInfo}, _From, State) -> + Reply = call_authenticator(ChainID, AuthenticatorID, add_user, [UserInfo]), + reply(Reply, State); + +handle_call({delete_user, ChainID, AuthenticatorID, UserID}, _From, State) -> + Reply = call_authenticator(ChainID, AuthenticatorID, delete_user, [UserID]), + reply(Reply, State); + +handle_call({update_user, ChainID, AuthenticatorID, UserID, NewUserInfo}, _From, State) -> + Reply = call_authenticator(ChainID, AuthenticatorID, update_user, [UserID, NewUserInfo]), + reply(Reply, State); + +handle_call({lookup_user, ChainID, AuthenticatorID, UserID}, _From, State) -> + Reply = call_authenticator(ChainID, AuthenticatorID, lookup_user, [UserID]), + reply(Reply, State); + +handle_call({list_users, ChainID, AuthenticatorID}, _From, State) -> + Reply = call_authenticator(ChainID, AuthenticatorID, list_users, []), + reply(Reply, State); + +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast(Req, State) -> + ?LOG(error, "Unexpected case: ~p", [Req]), + {noreply, State}. + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +reply(Reply, State) -> + {reply, Reply, State}. %%------------------------------------------------------------------------------ %% Internal functions @@ -464,6 +476,72 @@ do_create_authenticator(ChainID, AuthenticatorID, #{name := Name} = Config) -> do_delete_authenticator(#authenticator{provider = Provider, state = State}) -> _ = Provider:destroy(State), ok. + +update_or_create_authenticator(ChainID, AuthenticatorID, #{name := NewName} = Config, CreateWhenNotFound) -> + UpdateFun = + fun(#chain{authenticators = Authenticators} = Chain) -> + case lists:keytake(AuthenticatorID, 1, Authenticators) of + false -> + case CreateWhenNotFound of + true -> + case lists:keymember(NewName, 2, Authenticators) of + true -> + {error, name_has_be_used}; + false -> + case do_create_authenticator(ChainID, AuthenticatorID, Config) of + {ok, Authenticator} -> + NAuthenticators = Authenticators ++ [{AuthenticatorID, NewName, Authenticator}], + true = ets:insert(?CHAIN_TAB, Chain#chain{authenticators = NAuthenticators}), + {ok, serialize_authenticator(Authenticator)}; + {error, Reason} -> + {error, Reason} + end + end; + false -> + {error, {not_found, {authenticator, AuthenticatorID}}} + end; + {value, + {_, _, #authenticator{provider = Provider, + state = #{version := Version} = State} = Authenticator}, + Others} -> + case lists:keymember(NewName, 2, Others) of + true -> + {error, name_has_be_used}; + false -> + case (NewProvider = authenticator_provider(Config)) =:= Provider of + true -> + Unique = <>, + case Provider:update(Config#{'_unique' => Unique}, State) of + {ok, NewState} -> + NewAuthenticator = Authenticator#authenticator{name = NewName, + config = Config, + state = switch_version(NewState)}, + NewAuthenticators = replace_authenticator(AuthenticatorID, NewAuthenticator, Authenticators), + true = ets:insert(?CHAIN_TAB, Chain#chain{authenticators = NewAuthenticators}), + {ok, serialize_authenticator(NewAuthenticator)}; + {error, Reason} -> + {error, Reason} + end; + false -> + Unique = <>, + case NewProvider:create(Config#{'_unique' => Unique}) of + {ok, NewState} -> + NewAuthenticator = Authenticator#authenticator{name = NewName, + provider = NewProvider, + config = Config, + state = switch_version(NewState)}, + NewAuthenticators = replace_authenticator(AuthenticatorID, NewAuthenticator, Authenticators), + true = ets:insert(?CHAIN_TAB, Chain#chain{authenticators = NewAuthenticators}), + _ = Provider:destroy(State), + {ok, serialize_authenticator(NewAuthenticator)}; + {error, Reason} -> + {error, Reason} + end + end + end + end + end, + update_chain(ChainID, UpdateFun). replace_authenticator(ID, #authenticator{name = Name} = Authenticator, Authenticators) -> lists:keyreplace(ID, 1, Authenticators, {ID, Name, Authenticator}). @@ -487,21 +565,16 @@ move_authenticator_to_the_nth_(AuthenticatorID, [Authenticator | More], N, Passe move_authenticator_to_the_nth_(AuthenticatorID, More, N, [Authenticator | Passed]). update_chain(ChainID, UpdateFun) -> - trans( - fun() -> - case mnesia:read(?CHAIN_TAB, ChainID, write) of - [] -> - {error, {not_found, {chain, ChainID}}}; - [Chain] -> - UpdateFun(Chain) - end - end). - -call_authenticator(ChainID, AuthenticatorID, Func, Args) -> - case mnesia:dirty_read(?CHAIN_TAB, ChainID) of + case ets:lookup(?CHAIN_TAB, ChainID) of [] -> {error, {not_found, {chain, ChainID}}}; - [#chain{authenticators = Authenticators}] -> + [Chain] -> + UpdateFun(Chain) + end. + +call_authenticator(ChainID, AuthenticatorID, Func, Args) -> + UpdateFun = + fun(#chain{authenticators = Authenticators}) -> case lists:keyfind(AuthenticatorID, 1, Authenticators) of false -> {error, {not_found, {authenticator, AuthenticatorID}}}; @@ -513,7 +586,8 @@ call_authenticator(ChainID, AuthenticatorID, Func, Args) -> {error, unsupported_feature} end end - end. + end, + update_chain(ChainID, UpdateFun). serialize_chain(#chain{id = ID, authenticators = Authenticators, @@ -528,12 +602,3 @@ serialize_authenticators(Authenticators) -> serialize_authenticator(#authenticator{id = ID, config = Config}) -> Config#{id => ID}. - -trans(Fun) -> - trans(Fun, []). - -trans(Fun, Args) -> - case ekka_mnesia:transaction(?AUTH_SHARD, Fun, Args) of - {atomic, Res} -> Res; - {aborted, Reason} -> {error, Reason} - end. diff --git a/apps/emqx_authn/src/emqx_authn_sup.erl b/apps/emqx_authn/src/emqx_authn_sup.erl index bb26af0ad..56fcf299a 100644 --- a/apps/emqx_authn/src/emqx_authn_sup.erl +++ b/apps/emqx_authn/src/emqx_authn_sup.erl @@ -26,4 +26,11 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, {{one_for_one, 10, 10}, []}}. + ChildSpecs = [ + #{id => emqx_authn, + start => {emqx_authn, start_link, []}, + restart => permanent, + type => worker, + modules => [emqx_authn]} + ], + {ok, {{one_for_one, 10, 10}, ChildSpecs}}.