emqx/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl

581 lines
18 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_authn_mnesia).
-include("emqx_auth_mnesia.hrl").
-include_lib("emqx_auth/include/emqx_authn.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-behaviour(emqx_authn_provider).
-behaviour(emqx_db_backup).
-export([
create/2,
update/2,
authenticate/2,
destroy/1
]).
-export([
import_users/2,
add_user/2,
delete_user/2,
update_user/3,
lookup_user/2,
list_users/2
]).
-export([
qs2ms/2,
run_fuzzy_filter/2,
format_user_info/1,
group_match_spec/1
]).
%% Internal exports (RPC)
-export([
do_destroy/1,
do_add_user/1,
do_delete_user/2,
do_update_user/3
]).
-export([init_tables/0]).
-export([backup_tables/0]).
-type user_group() :: binary().
-type user_id() :: binary().
-record(user_info, {
user_id :: {user_group(), user_id()},
password_hash :: binary(),
salt :: binary(),
is_superuser :: boolean()
}).
-define(TAB, ?MODULE).
-define(AUTHN_QSCHEMA, [
{<<"like_user_id">>, binary},
{<<"user_group">>, binary},
{<<"is_superuser">>, atom}
]).
%%------------------------------------------------------------------------------
%% Mnesia bootstrap
%%------------------------------------------------------------------------------
%% @doc Create or replicate tables.
-spec create_tables() -> [mria:table()].
create_tables() ->
ok = mria:create_table(?TAB, [
{rlog_shard, ?AUTHN_SHARD},
{type, ordered_set},
{storage, disc_copies},
{record_name, user_info},
{attributes, record_info(fields, user_info)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}
]),
[?TAB].
%% Init
-spec init_tables() -> ok.
init_tables() ->
ok = mria:wait_for_tables(create_tables()).
%%------------------------------------------------------------------------------
%% Data backup
%%------------------------------------------------------------------------------
backup_tables() -> [?TAB].
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
create(_AuthenticatorID, Config) ->
create(Config).
create(
#{
user_id_type := Type,
password_hash_algorithm := Algorithm,
user_group := UserGroup
} = Config
) ->
ok = emqx_authn_password_hashing:init(Algorithm),
State = #{
user_group => UserGroup,
user_id_type => Type,
password_hash_algorithm => Algorithm
},
ok = boostrap_user_from_file(Config, State),
{ok, State}.
update(Config, _State) ->
create(Config).
authenticate(#{auth_method := _}, _) ->
ignore;
authenticate(#{password := undefined}, _) ->
{error, bad_username_or_password};
authenticate(
#{password := Password} = Credential,
#{
user_group := UserGroup,
user_id_type := Type,
password_hash_algorithm := Algorithm
}
) ->
UserID = get_user_identity(Credential, Type),
case mnesia:dirty_read(?TAB, {UserGroup, UserID}) of
[] ->
?TRACE_AUTHN_PROVIDER("user_not_found"),
ignore;
[#user_info{password_hash = PasswordHash, salt = Salt, is_superuser = IsSuperuser}] ->
case
emqx_authn_password_hashing:check_password(
Algorithm, Salt, PasswordHash, Password
)
of
true ->
{ok, #{is_superuser => IsSuperuser}};
false ->
{error, bad_username_or_password}
end
end.
destroy(#{user_group := UserGroup}) ->
trans(fun ?MODULE:do_destroy/1, [UserGroup]).
do_destroy(UserGroup) ->
ok = lists:foreach(
fun(User) ->
mnesia:delete_object(?TAB, User, write)
end,
mnesia:select(?TAB, group_match_spec(UserGroup), write)
).
import_users(ImportSource, State) ->
import_users(ImportSource, State, #{override => true}).
import_users({PasswordType, Filename, FileData}, State, Opts) ->
Convertor = convertor(PasswordType, State),
try parse_import_users(Filename, FileData, Convertor) of
{_NewUsersCnt, Users} ->
case do_import_users(Users, Opts#{filename => Filename}) of
ok ->
ok;
%% Do not log empty user entries.
%% The default etc/auth-built-in-db.csv file contains an empty user entry.
{error, empty_users} ->
{error, empty_users};
{error, Reason} ->
?SLOG(
warning,
#{
msg => "import_authn_users_failed",
reason => Reason,
type => PasswordType,
filename => Filename
}
),
{error, Reason}
end
catch
error:Reason:Stk ->
?SLOG(
warning,
#{
msg => "parse_authn_users_failed",
reason => Reason,
type => PasswordType,
filename => Filename,
stacktrace => Stk
}
),
{error, Reason}
end.
do_import_users([], _Opts) ->
{error, empty_users};
do_import_users(Users, Opts) ->
trans(
fun() ->
lists:foreach(
fun(User) ->
insert_user(User, Opts)
end,
Users
)
end
).
add_user(
UserInfo,
State
) ->
UserInfoRecord = user_info_record(UserInfo, State),
trans(fun ?MODULE:do_add_user/1, [UserInfoRecord]).
do_add_user(
#user_info{
user_id = {_UserGroup, UserID} = DBUserID,
is_superuser = IsSuperuser
} = UserInfoRecord
) ->
case mnesia:read(?TAB, DBUserID, write) of
[] ->
insert_user(UserInfoRecord),
{ok, #{user_id => UserID, is_superuser => IsSuperuser}};
[_] ->
{error, already_exist}
end.
delete_user(UserID, State) ->
trans(fun ?MODULE:do_delete_user/2, [UserID, State]).
do_delete_user(UserID, #{user_group := UserGroup}) ->
case mnesia:read(?TAB, {UserGroup, UserID}, write) of
[] ->
{error, not_found};
[_] ->
mnesia:delete(?TAB, {UserGroup, UserID}, write)
end.
update_user(UserID, UserInfo, State) ->
FieldsToUpdate = fields_to_update(
UserInfo,
[
hash_and_salt,
is_superuser
],
State
),
trans(fun ?MODULE:do_update_user/3, [UserID, FieldsToUpdate, State]).
do_update_user(
UserID,
FieldsToUpdate,
#{
user_group := UserGroup
}
) ->
case mnesia:read(?TAB, {UserGroup, UserID}, write) of
[] ->
{error, not_found};
[#user_info{} = UserInfoRecord] ->
NUserInfoRecord = update_user_record(UserInfoRecord, FieldsToUpdate),
insert_user(NUserInfoRecord),
{ok, #{user_id => UserID, is_superuser => NUserInfoRecord#user_info.is_superuser}}
end.
lookup_user(UserID, #{user_group := UserGroup}) ->
case mnesia:dirty_read(?TAB, {UserGroup, UserID}) of
[UserInfo] ->
{ok, format_user_info(UserInfo)};
[] ->
{error, not_found}
end.
list_users(QueryString, #{user_group := UserGroup}) ->
NQueryString = QueryString#{<<"user_group">> => UserGroup},
emqx_mgmt_api:node_query(
node(),
?TAB,
NQueryString,
?AUTHN_QSCHEMA,
fun ?MODULE:qs2ms/2,
fun ?MODULE:format_user_info/1
).
%%--------------------------------------------------------------------
%% QueryString to MatchSpec
-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
qs2ms(_Tab, {QString, FuzzyQString}) ->
#{
match_spec => ms_from_qstring(QString),
fuzzy_fun => fuzzy_filter_fun(FuzzyQString)
}.
%% Fuzzy username funcs
fuzzy_filter_fun([]) ->
undefined;
fuzzy_filter_fun(Fuzzy) ->
{fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.
run_fuzzy_filter(_, []) ->
true;
run_fuzzy_filter(
E = #user_info{user_id = {_, UserID}},
[{user_id, like, UsernameSubStr} | Fuzzy]
) ->
binary:match(UserID, UsernameSubStr) /= nomatch andalso run_fuzzy_filter(E, Fuzzy).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
insert_user(User, Opts) ->
#{
<<"user_group">> := UserGroup,
<<"user_id">> := UserID,
<<"password_hash">> := PasswordHash,
<<"salt">> := Salt,
<<"is_superuser">> := IsSuperuser
} = User,
UserInfoRecord =
#user_info{user_id = DBUserID} =
user_info_record(UserGroup, UserID, PasswordHash, Salt, IsSuperuser),
case mnesia:read(?TAB, DBUserID, write) of
[] ->
insert_user(UserInfoRecord);
[UserInfoRecord] ->
ok;
[_] ->
Msg =
case maps:get(override, Opts, false) of
true ->
insert_user(UserInfoRecord),
"override_an_exists_userid_into_authentication_database_ok";
false ->
"import_an_exists_userid_into_authentication_database_failed"
end,
?SLOG(warning, #{
msg => Msg,
user_id => UserID,
group_id => UserGroup,
bootstrap_file => maps:get(filename, Opts),
suggestion =>
"If you've altered it differently, delete the user_id from the bootstrap file."
})
end.
insert_user(#user_info{} = UserInfoRecord) ->
mnesia:write(?TAB, UserInfoRecord, write).
user_info_record(UserGroup, UserID, PasswordHash, Salt, IsSuperuser) ->
#user_info{
user_id = {UserGroup, UserID},
password_hash = PasswordHash,
salt = Salt,
is_superuser = IsSuperuser
}.
user_info_record(
#{
user_id := UserID,
password := Password
} = UserInfo,
#{
password_hash_algorithm := Algorithm,
user_group := UserGroup
} = _State
) ->
IsSuperuser = maps:get(is_superuser, UserInfo, false),
{PasswordHash, Salt} = emqx_authn_password_hashing:hash(Algorithm, Password),
user_info_record(UserGroup, UserID, PasswordHash, Salt, IsSuperuser).
fields_to_update(
#{password := Password} = UserInfo,
[hash_and_salt | Rest],
#{password_hash_algorithm := Algorithm} = State
) ->
[
{hash_and_salt,
emqx_authn_password_hashing:hash(
Algorithm, Password
)}
| fields_to_update(UserInfo, Rest, State)
];
fields_to_update(#{is_superuser := IsSuperuser} = UserInfo, [is_superuser | Rest], State) ->
[{is_superuser, IsSuperuser} | fields_to_update(UserInfo, Rest, State)];
fields_to_update(UserInfo, [_ | Rest], State) ->
fields_to_update(UserInfo, Rest, State);
fields_to_update(_UserInfo, [], _State) ->
[].
update_user_record(UserInfoRecord, []) ->
UserInfoRecord;
update_user_record(UserInfoRecord, [{hash_and_salt, {PasswordHash, Salt}} | Rest]) ->
update_user_record(UserInfoRecord#user_info{password_hash = PasswordHash, salt = Salt}, Rest);
update_user_record(UserInfoRecord, [{is_superuser, IsSuperuser} | Rest]) ->
update_user_record(UserInfoRecord#user_info{is_superuser = IsSuperuser}, Rest).
%% TODO: Support other type
get_user_identity(#{username := Username}, username) ->
Username;
get_user_identity(#{clientid := ClientID}, clientid) ->
ClientID;
get_user_identity(_, Type) ->
{error, {bad_user_identity_type, Type}}.
trans(Fun, Args) ->
case mria:transaction(?AUTHN_SHARD, Fun, Args) of
{atomic, Res} -> Res;
{aborted, Reason} -> {error, Reason}
end.
trans(Fun) ->
case mria:transaction(?AUTHN_SHARD, Fun) of
{atomic, Res} -> Res;
{aborted, Reason} -> {error, Reason}
end.
to_binary(B) when is_binary(B) ->
B;
to_binary(L) when is_list(L) ->
iolist_to_binary(L).
format_user_info(#user_info{user_id = {_, UserID}, is_superuser = IsSuperuser}) ->
#{user_id => UserID, is_superuser => IsSuperuser}.
ms_from_qstring(QString) ->
case lists:keytake(user_group, 1, QString) of
{value, {user_group, '=:=', UserGroup}, QString2} ->
group_match_spec(UserGroup, QString2);
_ ->
[]
end.
group_match_spec(UserGroup) ->
group_match_spec(UserGroup, []).
group_match_spec(UserGroup, QString) ->
case lists:keyfind(is_superuser, 1, QString) of
false ->
ets:fun2ms(fun(#user_info{user_id = {Group, _}} = User) when Group =:= UserGroup ->
User
end);
{is_superuser, '=:=', Value} ->
ets:fun2ms(fun(#user_info{user_id = {Group, _}, is_superuser = IsSuper} = User) when
Group =:= UserGroup, IsSuper =:= Value
->
User
end)
end.
%%--------------------------------------------------------------------
%% parse import file/data
parse_import_users(Filename, FileData, Convertor) ->
UserStream = reader_fn(Filename, FileData),
Users = emqx_utils_stream:consume(emqx_utils_stream:map(Convertor, UserStream)),
NewUsersCount =
lists:foldl(
fun(
#{
<<"user_group">> := UserGroup,
<<"user_id">> := UserID
},
Acc
) ->
case ets:member(?TAB, {UserGroup, UserID}) of
true ->
Acc;
false ->
Acc + 1
end
end,
0,
Users
),
{NewUsersCount, Users}.
reader_fn(prepared_user_list, List) when is_list(List) ->
%% Example: [#{<<"user_id">> => <<>>, ...}]
emqx_utils_stream:list(List);
reader_fn(Filename0, Data) ->
case filename:extension(to_binary(Filename0)) of
<<".json">> ->
%% Example: data/user-credentials.json
case emqx_utils_json:safe_decode(Data, [return_maps]) of
{ok, List} when is_list(List) ->
emqx_utils_stream:list(List);
{ok, _} ->
error(unknown_file_format);
{error, Reason} ->
error(Reason)
end;
<<".csv">> ->
%% Example: etc/auth-built-in-db-bootstrap.csv
emqx_utils_stream:csv(Data);
<<>> ->
error(unknown_file_format);
Extension ->
error({unsupported_file_format, Extension})
end.
convertor(PasswordType, State) ->
fun(User) ->
convert_user(User, PasswordType, State)
end.
convert_user(
User = #{<<"user_id">> := UserId},
PasswordType,
#{user_group := UserGroup, password_hash_algorithm := Algorithm}
) ->
{PasswordHash, Salt} = find_password_hash(PasswordType, User, Algorithm),
#{
<<"user_id">> => UserId,
<<"password_hash">> => PasswordHash,
<<"salt">> => Salt,
<<"is_superuser">> => is_superuser(User),
<<"user_group">> => UserGroup
};
convert_user(_, _, _) ->
error(bad_format).
find_password_hash(hash, User = #{<<"password_hash">> := PasswordHash}, _) ->
{PasswordHash, maps:get(<<"salt">>, User, <<>>)};
find_password_hash(plain, #{<<"password">> := Password}, Algorithm) ->
emqx_authn_password_hashing:hash(Algorithm, Password);
find_password_hash(hash, _User, _) ->
error("hash_import_requires_password_hash_field");
find_password_hash(plain, _User, _Algorithm) ->
error("plain_import_requires_password_field");
find_password_hash(_, _, _) ->
error(bad_format).
is_superuser(#{<<"is_superuser">> := <<"true">>}) -> true;
is_superuser(#{<<"is_superuser">> := true}) -> true;
is_superuser(_) -> false.
boostrap_user_from_file(Config, State) ->
case maps:get(bootstrap_file, Config, <<>>) of
<<>> ->
ok;
FileName0 ->
#{bootstrap_type := Type} = Config,
FileName = emqx_schema:naive_env_interpolation(FileName0),
case file:read_file(FileName) of
{ok, FileData} ->
_ = import_users({Type, FileName, FileData}, State, #{override => false}),
ok;
{error, Reason} ->
?SLOG(warning, #{
msg => "boostrap_authn_built_in_database_failed",
boostrap_file => FileName,
boostrap_type => Type,
reason => emqx_utils:explain_posix(Reason)
})
end
end.