581 lines
18 KiB
Erlang
581 lines
18 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2021-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_authn_mnesia).
|
|
|
|
-include("emqx_auth_mnesia.hrl").
|
|
-include_lib("emqx_auth/include/emqx_authn.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("stdlib/include/ms_transform.hrl").
|
|
|
|
-behaviour(emqx_authn_provider).
|
|
-behaviour(emqx_db_backup).
|
|
|
|
-export([
|
|
create/2,
|
|
update/2,
|
|
authenticate/2,
|
|
destroy/1
|
|
]).
|
|
|
|
-export([
|
|
import_users/2,
|
|
add_user/2,
|
|
delete_user/2,
|
|
update_user/3,
|
|
lookup_user/2,
|
|
list_users/2
|
|
]).
|
|
|
|
-export([
|
|
qs2ms/2,
|
|
run_fuzzy_filter/2,
|
|
format_user_info/1,
|
|
group_match_spec/1
|
|
]).
|
|
|
|
%% Internal exports (RPC)
|
|
-export([
|
|
do_destroy/1,
|
|
do_add_user/1,
|
|
do_delete_user/2,
|
|
do_update_user/3
|
|
]).
|
|
|
|
-export([init_tables/0]).
|
|
|
|
-export([backup_tables/0]).
|
|
|
|
-type user_group() :: binary().
|
|
-type user_id() :: binary().
|
|
|
|
-record(user_info, {
|
|
user_id :: {user_group(), user_id()},
|
|
password_hash :: binary(),
|
|
salt :: binary(),
|
|
is_superuser :: boolean()
|
|
}).
|
|
|
|
-define(TAB, ?MODULE).
|
|
-define(AUTHN_QSCHEMA, [
|
|
{<<"like_user_id">>, binary},
|
|
{<<"user_group">>, binary},
|
|
{<<"is_superuser">>, atom}
|
|
]).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Mnesia bootstrap
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @doc Create or replicate tables.
|
|
-spec create_tables() -> [mria:table()].
|
|
create_tables() ->
|
|
ok = mria:create_table(?TAB, [
|
|
{rlog_shard, ?AUTHN_SHARD},
|
|
{type, ordered_set},
|
|
{storage, disc_copies},
|
|
{record_name, user_info},
|
|
{attributes, record_info(fields, user_info)},
|
|
{storage_properties, [{ets, [{read_concurrency, true}]}]}
|
|
]),
|
|
[?TAB].
|
|
|
|
%% Init
|
|
-spec init_tables() -> ok.
|
|
init_tables() ->
|
|
ok = mria:wait_for_tables(create_tables()).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Data backup
|
|
%%------------------------------------------------------------------------------
|
|
|
|
backup_tables() -> [?TAB].
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% APIs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
create(_AuthenticatorID, Config) ->
|
|
create(Config).
|
|
|
|
create(
|
|
#{
|
|
user_id_type := Type,
|
|
password_hash_algorithm := Algorithm,
|
|
user_group := UserGroup
|
|
} = Config
|
|
) ->
|
|
ok = emqx_authn_password_hashing:init(Algorithm),
|
|
State = #{
|
|
user_group => UserGroup,
|
|
user_id_type => Type,
|
|
password_hash_algorithm => Algorithm
|
|
},
|
|
ok = boostrap_user_from_file(Config, State),
|
|
{ok, State}.
|
|
|
|
update(Config, _State) ->
|
|
create(Config).
|
|
|
|
authenticate(#{auth_method := _}, _) ->
|
|
ignore;
|
|
authenticate(#{password := undefined}, _) ->
|
|
{error, bad_username_or_password};
|
|
authenticate(
|
|
#{password := Password} = Credential,
|
|
#{
|
|
user_group := UserGroup,
|
|
user_id_type := Type,
|
|
password_hash_algorithm := Algorithm
|
|
}
|
|
) ->
|
|
UserID = get_user_identity(Credential, Type),
|
|
case mnesia:dirty_read(?TAB, {UserGroup, UserID}) of
|
|
[] ->
|
|
?TRACE_AUTHN_PROVIDER("user_not_found"),
|
|
ignore;
|
|
[#user_info{password_hash = PasswordHash, salt = Salt, is_superuser = IsSuperuser}] ->
|
|
case
|
|
emqx_authn_password_hashing:check_password(
|
|
Algorithm, Salt, PasswordHash, Password
|
|
)
|
|
of
|
|
true ->
|
|
{ok, #{is_superuser => IsSuperuser}};
|
|
false ->
|
|
{error, bad_username_or_password}
|
|
end
|
|
end.
|
|
|
|
destroy(#{user_group := UserGroup}) ->
|
|
trans(fun ?MODULE:do_destroy/1, [UserGroup]).
|
|
|
|
do_destroy(UserGroup) ->
|
|
ok = lists:foreach(
|
|
fun(User) ->
|
|
mnesia:delete_object(?TAB, User, write)
|
|
end,
|
|
mnesia:select(?TAB, group_match_spec(UserGroup), write)
|
|
).
|
|
|
|
import_users(ImportSource, State) ->
|
|
import_users(ImportSource, State, #{override => true}).
|
|
|
|
import_users({PasswordType, Filename, FileData}, State, Opts) ->
|
|
Convertor = convertor(PasswordType, State),
|
|
try parse_import_users(Filename, FileData, Convertor) of
|
|
{_NewUsersCnt, Users} ->
|
|
case do_import_users(Users, Opts#{filename => Filename}) of
|
|
ok ->
|
|
ok;
|
|
%% Do not log empty user entries.
|
|
%% The default etc/auth-built-in-db.csv file contains an empty user entry.
|
|
{error, empty_users} ->
|
|
{error, empty_users};
|
|
{error, Reason} ->
|
|
?SLOG(
|
|
warning,
|
|
#{
|
|
msg => "import_authn_users_failed",
|
|
reason => Reason,
|
|
type => PasswordType,
|
|
filename => Filename
|
|
}
|
|
),
|
|
{error, Reason}
|
|
end
|
|
catch
|
|
error:Reason:Stk ->
|
|
?SLOG(
|
|
warning,
|
|
#{
|
|
msg => "parse_authn_users_failed",
|
|
reason => Reason,
|
|
type => PasswordType,
|
|
filename => Filename,
|
|
stacktrace => Stk
|
|
}
|
|
),
|
|
{error, Reason}
|
|
end.
|
|
|
|
do_import_users([], _Opts) ->
|
|
{error, empty_users};
|
|
do_import_users(Users, Opts) ->
|
|
trans(
|
|
fun() ->
|
|
lists:foreach(
|
|
fun(User) ->
|
|
insert_user(User, Opts)
|
|
end,
|
|
Users
|
|
)
|
|
end
|
|
).
|
|
|
|
add_user(
|
|
UserInfo,
|
|
State
|
|
) ->
|
|
UserInfoRecord = user_info_record(UserInfo, State),
|
|
trans(fun ?MODULE:do_add_user/1, [UserInfoRecord]).
|
|
|
|
do_add_user(
|
|
#user_info{
|
|
user_id = {_UserGroup, UserID} = DBUserID,
|
|
is_superuser = IsSuperuser
|
|
} = UserInfoRecord
|
|
) ->
|
|
case mnesia:read(?TAB, DBUserID, write) of
|
|
[] ->
|
|
insert_user(UserInfoRecord),
|
|
{ok, #{user_id => UserID, is_superuser => IsSuperuser}};
|
|
[_] ->
|
|
{error, already_exist}
|
|
end.
|
|
|
|
delete_user(UserID, State) ->
|
|
trans(fun ?MODULE:do_delete_user/2, [UserID, State]).
|
|
|
|
do_delete_user(UserID, #{user_group := UserGroup}) ->
|
|
case mnesia:read(?TAB, {UserGroup, UserID}, write) of
|
|
[] ->
|
|
{error, not_found};
|
|
[_] ->
|
|
mnesia:delete(?TAB, {UserGroup, UserID}, write)
|
|
end.
|
|
|
|
update_user(UserID, UserInfo, State) ->
|
|
FieldsToUpdate = fields_to_update(
|
|
UserInfo,
|
|
[
|
|
hash_and_salt,
|
|
is_superuser
|
|
],
|
|
State
|
|
),
|
|
trans(fun ?MODULE:do_update_user/3, [UserID, FieldsToUpdate, State]).
|
|
|
|
do_update_user(
|
|
UserID,
|
|
FieldsToUpdate,
|
|
#{
|
|
user_group := UserGroup
|
|
}
|
|
) ->
|
|
case mnesia:read(?TAB, {UserGroup, UserID}, write) of
|
|
[] ->
|
|
{error, not_found};
|
|
[#user_info{} = UserInfoRecord] ->
|
|
NUserInfoRecord = update_user_record(UserInfoRecord, FieldsToUpdate),
|
|
insert_user(NUserInfoRecord),
|
|
{ok, #{user_id => UserID, is_superuser => NUserInfoRecord#user_info.is_superuser}}
|
|
end.
|
|
|
|
lookup_user(UserID, #{user_group := UserGroup}) ->
|
|
case mnesia:dirty_read(?TAB, {UserGroup, UserID}) of
|
|
[UserInfo] ->
|
|
{ok, format_user_info(UserInfo)};
|
|
[] ->
|
|
{error, not_found}
|
|
end.
|
|
|
|
list_users(QueryString, #{user_group := UserGroup}) ->
|
|
NQueryString = QueryString#{<<"user_group">> => UserGroup},
|
|
emqx_mgmt_api:node_query(
|
|
node(),
|
|
?TAB,
|
|
NQueryString,
|
|
?AUTHN_QSCHEMA,
|
|
fun ?MODULE:qs2ms/2,
|
|
fun ?MODULE:format_user_info/1
|
|
).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% QueryString to MatchSpec
|
|
|
|
-spec qs2ms(atom(), {list(), list()}) -> emqx_mgmt_api:match_spec_and_filter().
|
|
qs2ms(_Tab, {QString, FuzzyQString}) ->
|
|
#{
|
|
match_spec => ms_from_qstring(QString),
|
|
fuzzy_fun => fuzzy_filter_fun(FuzzyQString)
|
|
}.
|
|
|
|
%% Fuzzy username funcs
|
|
fuzzy_filter_fun([]) ->
|
|
undefined;
|
|
fuzzy_filter_fun(Fuzzy) ->
|
|
{fun ?MODULE:run_fuzzy_filter/2, [Fuzzy]}.
|
|
|
|
run_fuzzy_filter(_, []) ->
|
|
true;
|
|
run_fuzzy_filter(
|
|
E = #user_info{user_id = {_, UserID}},
|
|
[{user_id, like, UsernameSubStr} | Fuzzy]
|
|
) ->
|
|
binary:match(UserID, UsernameSubStr) /= nomatch andalso run_fuzzy_filter(E, Fuzzy).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%------------------------------------------------------------------------------
|
|
|
|
insert_user(User, Opts) ->
|
|
#{
|
|
<<"user_group">> := UserGroup,
|
|
<<"user_id">> := UserID,
|
|
<<"password_hash">> := PasswordHash,
|
|
<<"salt">> := Salt,
|
|
<<"is_superuser">> := IsSuperuser
|
|
} = User,
|
|
UserInfoRecord =
|
|
#user_info{user_id = DBUserID} =
|
|
user_info_record(UserGroup, UserID, PasswordHash, Salt, IsSuperuser),
|
|
case mnesia:read(?TAB, DBUserID, write) of
|
|
[] ->
|
|
insert_user(UserInfoRecord);
|
|
[UserInfoRecord] ->
|
|
ok;
|
|
[_] ->
|
|
Msg =
|
|
case maps:get(override, Opts, false) of
|
|
true ->
|
|
insert_user(UserInfoRecord),
|
|
"override_an_exists_userid_into_authentication_database_ok";
|
|
false ->
|
|
"import_an_exists_userid_into_authentication_database_failed"
|
|
end,
|
|
?SLOG(warning, #{
|
|
msg => Msg,
|
|
user_id => UserID,
|
|
group_id => UserGroup,
|
|
bootstrap_file => maps:get(filename, Opts),
|
|
suggestion =>
|
|
"If you've altered it differently, delete the user_id from the bootstrap file."
|
|
})
|
|
end.
|
|
|
|
insert_user(#user_info{} = UserInfoRecord) ->
|
|
mnesia:write(?TAB, UserInfoRecord, write).
|
|
|
|
user_info_record(UserGroup, UserID, PasswordHash, Salt, IsSuperuser) ->
|
|
#user_info{
|
|
user_id = {UserGroup, UserID},
|
|
password_hash = PasswordHash,
|
|
salt = Salt,
|
|
is_superuser = IsSuperuser
|
|
}.
|
|
|
|
user_info_record(
|
|
#{
|
|
user_id := UserID,
|
|
password := Password
|
|
} = UserInfo,
|
|
#{
|
|
password_hash_algorithm := Algorithm,
|
|
user_group := UserGroup
|
|
} = _State
|
|
) ->
|
|
IsSuperuser = maps:get(is_superuser, UserInfo, false),
|
|
{PasswordHash, Salt} = emqx_authn_password_hashing:hash(Algorithm, Password),
|
|
user_info_record(UserGroup, UserID, PasswordHash, Salt, IsSuperuser).
|
|
|
|
fields_to_update(
|
|
#{password := Password} = UserInfo,
|
|
[hash_and_salt | Rest],
|
|
#{password_hash_algorithm := Algorithm} = State
|
|
) ->
|
|
[
|
|
{hash_and_salt,
|
|
emqx_authn_password_hashing:hash(
|
|
Algorithm, Password
|
|
)}
|
|
| fields_to_update(UserInfo, Rest, State)
|
|
];
|
|
fields_to_update(#{is_superuser := IsSuperuser} = UserInfo, [is_superuser | Rest], State) ->
|
|
[{is_superuser, IsSuperuser} | fields_to_update(UserInfo, Rest, State)];
|
|
fields_to_update(UserInfo, [_ | Rest], State) ->
|
|
fields_to_update(UserInfo, Rest, State);
|
|
fields_to_update(_UserInfo, [], _State) ->
|
|
[].
|
|
|
|
update_user_record(UserInfoRecord, []) ->
|
|
UserInfoRecord;
|
|
update_user_record(UserInfoRecord, [{hash_and_salt, {PasswordHash, Salt}} | Rest]) ->
|
|
update_user_record(UserInfoRecord#user_info{password_hash = PasswordHash, salt = Salt}, Rest);
|
|
update_user_record(UserInfoRecord, [{is_superuser, IsSuperuser} | Rest]) ->
|
|
update_user_record(UserInfoRecord#user_info{is_superuser = IsSuperuser}, Rest).
|
|
|
|
%% TODO: Support other type
|
|
get_user_identity(#{username := Username}, username) ->
|
|
Username;
|
|
get_user_identity(#{clientid := ClientID}, clientid) ->
|
|
ClientID;
|
|
get_user_identity(_, Type) ->
|
|
{error, {bad_user_identity_type, Type}}.
|
|
|
|
trans(Fun, Args) ->
|
|
case mria:transaction(?AUTHN_SHARD, Fun, Args) of
|
|
{atomic, Res} -> Res;
|
|
{aborted, Reason} -> {error, Reason}
|
|
end.
|
|
|
|
trans(Fun) ->
|
|
case mria:transaction(?AUTHN_SHARD, Fun) of
|
|
{atomic, Res} -> Res;
|
|
{aborted, Reason} -> {error, Reason}
|
|
end.
|
|
|
|
to_binary(B) when is_binary(B) ->
|
|
B;
|
|
to_binary(L) when is_list(L) ->
|
|
iolist_to_binary(L).
|
|
|
|
format_user_info(#user_info{user_id = {_, UserID}, is_superuser = IsSuperuser}) ->
|
|
#{user_id => UserID, is_superuser => IsSuperuser}.
|
|
|
|
ms_from_qstring(QString) ->
|
|
case lists:keytake(user_group, 1, QString) of
|
|
{value, {user_group, '=:=', UserGroup}, QString2} ->
|
|
group_match_spec(UserGroup, QString2);
|
|
_ ->
|
|
[]
|
|
end.
|
|
|
|
group_match_spec(UserGroup) ->
|
|
group_match_spec(UserGroup, []).
|
|
|
|
group_match_spec(UserGroup, QString) ->
|
|
case lists:keyfind(is_superuser, 1, QString) of
|
|
false ->
|
|
ets:fun2ms(fun(#user_info{user_id = {Group, _}} = User) when Group =:= UserGroup ->
|
|
User
|
|
end);
|
|
{is_superuser, '=:=', Value} ->
|
|
ets:fun2ms(fun(#user_info{user_id = {Group, _}, is_superuser = IsSuper} = User) when
|
|
Group =:= UserGroup, IsSuper =:= Value
|
|
->
|
|
User
|
|
end)
|
|
end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% parse import file/data
|
|
|
|
parse_import_users(Filename, FileData, Convertor) ->
|
|
UserStream = reader_fn(Filename, FileData),
|
|
Users = emqx_utils_stream:consume(emqx_utils_stream:map(Convertor, UserStream)),
|
|
NewUsersCount =
|
|
lists:foldl(
|
|
fun(
|
|
#{
|
|
<<"user_group">> := UserGroup,
|
|
<<"user_id">> := UserID
|
|
},
|
|
Acc
|
|
) ->
|
|
case ets:member(?TAB, {UserGroup, UserID}) of
|
|
true ->
|
|
Acc;
|
|
false ->
|
|
Acc + 1
|
|
end
|
|
end,
|
|
0,
|
|
Users
|
|
),
|
|
{NewUsersCount, Users}.
|
|
|
|
reader_fn(prepared_user_list, List) when is_list(List) ->
|
|
%% Example: [#{<<"user_id">> => <<>>, ...}]
|
|
emqx_utils_stream:list(List);
|
|
reader_fn(Filename0, Data) ->
|
|
case filename:extension(to_binary(Filename0)) of
|
|
<<".json">> ->
|
|
%% Example: data/user-credentials.json
|
|
case emqx_utils_json:safe_decode(Data, [return_maps]) of
|
|
{ok, List} when is_list(List) ->
|
|
emqx_utils_stream:list(List);
|
|
{ok, _} ->
|
|
error(unknown_file_format);
|
|
{error, Reason} ->
|
|
error(Reason)
|
|
end;
|
|
<<".csv">> ->
|
|
%% Example: etc/auth-built-in-db-bootstrap.csv
|
|
emqx_utils_stream:csv(Data);
|
|
<<>> ->
|
|
error(unknown_file_format);
|
|
Extension ->
|
|
error({unsupported_file_format, Extension})
|
|
end.
|
|
|
|
convertor(PasswordType, State) ->
|
|
fun(User) ->
|
|
convert_user(User, PasswordType, State)
|
|
end.
|
|
|
|
convert_user(
|
|
User = #{<<"user_id">> := UserId},
|
|
PasswordType,
|
|
#{user_group := UserGroup, password_hash_algorithm := Algorithm}
|
|
) ->
|
|
{PasswordHash, Salt} = find_password_hash(PasswordType, User, Algorithm),
|
|
#{
|
|
<<"user_id">> => UserId,
|
|
<<"password_hash">> => PasswordHash,
|
|
<<"salt">> => Salt,
|
|
<<"is_superuser">> => is_superuser(User),
|
|
<<"user_group">> => UserGroup
|
|
};
|
|
convert_user(_, _, _) ->
|
|
error(bad_format).
|
|
|
|
find_password_hash(hash, User = #{<<"password_hash">> := PasswordHash}, _) ->
|
|
{PasswordHash, maps:get(<<"salt">>, User, <<>>)};
|
|
find_password_hash(plain, #{<<"password">> := Password}, Algorithm) ->
|
|
emqx_authn_password_hashing:hash(Algorithm, Password);
|
|
find_password_hash(hash, _User, _) ->
|
|
error("hash_import_requires_password_hash_field");
|
|
find_password_hash(plain, _User, _Algorithm) ->
|
|
error("plain_import_requires_password_field");
|
|
find_password_hash(_, _, _) ->
|
|
error(bad_format).
|
|
|
|
is_superuser(#{<<"is_superuser">> := <<"true">>}) -> true;
|
|
is_superuser(#{<<"is_superuser">> := true}) -> true;
|
|
is_superuser(_) -> false.
|
|
|
|
boostrap_user_from_file(Config, State) ->
|
|
case maps:get(bootstrap_file, Config, <<>>) of
|
|
<<>> ->
|
|
ok;
|
|
FileName0 ->
|
|
#{bootstrap_type := Type} = Config,
|
|
FileName = emqx_schema:naive_env_interpolation(FileName0),
|
|
case file:read_file(FileName) of
|
|
{ok, FileData} ->
|
|
_ = import_users({Type, FileName, FileData}, State, #{override => false}),
|
|
ok;
|
|
{error, Reason} ->
|
|
?SLOG(warning, #{
|
|
msg => "boostrap_authn_built_in_database_failed",
|
|
boostrap_file => FileName,
|
|
boostrap_type => Type,
|
|
reason => emqx_utils:explain_posix(Reason)
|
|
})
|
|
end
|
|
end.
|