Merge pull request #6775 from zhongwencool/sync-from-4.3-to-4.4

Sync from 4.3 to 4.4
This commit is contained in:
zhongwencool 2022-01-19 12:48:32 +08:00 committed by GitHub
commit 3549d145a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 235 additions and 94 deletions

View File

@ -10,6 +10,13 @@ File format:
- One list item per change topic
Change log ends with a list of github PRs
## v4.3.12
### Important changes
### Minor changes
* Fix updating `emqx_auth_mnesia.conf` password and restarting the new password does not take effect [#6717]
* Fix import data crash when emqx_auth_mnesia's record is not empty [#6717]
## v4.3.11
Important notes:

View File

@ -1,6 +1,6 @@
{application, emqx_auth_mnesia,
[{description, "EMQ X Authentication with Mnesia"},
{vsn, "4.3.4"}, % strict semver, bump manually
{vsn, "4.3.5"}, % strict semver, bump manually
{modules, []},
{registered, []},
{applications, [kernel,stdlib,mnesia]},

View File

@ -11,6 +11,10 @@
{load_module,emqx_acl_mnesia_api, brutal_purge,soft_purge,[]},
{load_module,emqx_acl_mnesia_cli, brutal_purge,soft_purge,[]}
]},
{<<"4.3.4">>, [
{load_module,emqx_auth_mnesia, brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mnesia_cli, brutal_purge,soft_purge,[]}
]},
{<<".*">>, [
]}
],
@ -25,6 +29,10 @@
{delete_module,emqx_acl_mnesia_migrator},
{delete_module,emqx_acl_mnesia_db}
]},
{<<"4.3.4">>, [
{load_module,emqx_auth_mnesia, brutal_purge,soft_purge,[]},
{load_module,emqx_auth_mnesia_cli, brutal_purge,soft_purge,[]}
]},
{<<".*">>, [
]}
]

View File

@ -32,6 +32,8 @@
, description/0
]).
-export([match_password/3]).
init(#{clientid_list := ClientidList, username_list := UsernameList}) ->
ok = ekka_mnesia:create_table(?TABLE, [
{disc_copies, [node()]},
@ -45,7 +47,7 @@ init(#{clientid_list := ClientidList, username_list := UsernameList}) ->
%% @private
add_default_user({Login, Password}) when is_tuple(Login) ->
emqx_auth_mnesia_cli:add_user(Login, Password).
emqx_auth_mnesia_cli:force_add_user(Login, Password).
-spec(register_metrics() -> ok).
register_metrics() ->

View File

@ -22,6 +22,7 @@
-define(TABLE, emqx_user).
%% Auth APIs
-export([ add_user/2
, force_add_user/2
, update_user/2
, remove_user/1
, lookup_user/1
@ -56,6 +57,32 @@ insert_user(User = #emqx_user{login = Login}) ->
[_|_] -> mnesia:abort(existed)
end.
force_add_user(Login, Password) ->
User = #emqx_user{
login = Login,
password = encrypted_data(Password),
created_at = erlang:system_time(millisecond)
},
case ret(mnesia:transaction(fun insert_or_update_user/2, [Password, User])) of
{ok, override} ->
?LOG(warning, "[Mnesia] (~p)'s password has be updated.", [Login]),
ok;
Other -> Other
end.
insert_or_update_user(NewPwd, User = #emqx_user{login = Login}) ->
case mnesia:read(?TABLE, Login) of
[] -> mnesia:write(User);
[#emqx_user{password = Pwd}] ->
case emqx_auth_mnesia:match_password(NewPwd, hash_type(), [Pwd]) of
true -> ok;
false ->
ok = mnesia:write(User),
{ok, override}
end
end.
%% @doc Update User
-spec(update_user(tuple(), binary()) -> ok | {error, any()}).
update_user(Login, NewPassword) ->
@ -105,11 +132,11 @@ comparing({?TABLE, _, _, CreatedAt1},
{?TABLE, _, _, CreatedAt2}) ->
CreatedAt1 >= CreatedAt2.
ret({atomic, ok}) -> ok;
ret({atomic, Res}) -> Res;
ret({aborted, Error}) -> {error, Error}.
encrypted_data(Password) ->
HashType = application:get_env(emqx_auth_mnesia, password_hash, sha256),
HashType = hash_type(),
SaltBin = salt(),
<<SaltBin/binary, (hash(Password, SaltBin, HashType))/binary>>.
@ -192,3 +219,5 @@ auth_username_cli(_) ->
{"user add <Username> <Password>", "Add username auth rule"},
{"user update <Username> <NewPassword>", "Update username auth rule"},
{"user delete <Username>", "Delete username auth rule"}]).
hash_type() ->
application:get_env(emqx_auth_mnesia, password_hash, sha256).

View File

@ -46,11 +46,15 @@ all() ->
groups() ->
[].
init_per_suite(t_boot) ->
ok;
init_per_suite(Config) ->
ok = emqx_ct_helpers:start_apps([emqx_management, emqx_auth_mnesia], fun set_special_configs/1),
create_default_app(),
Config.
end_per_suite(t_boot) ->
ok;
end_per_suite(_Config) ->
delete_default_app(),
emqx_ct_helpers:stop_apps([emqx_management, emqx_auth_mnesia]).
@ -65,10 +69,63 @@ set_special_configs(emqx) ->
set_special_configs(_App) ->
ok.
set_default(ClientId, UserName, Pwd, HashType) ->
application:set_env(emqx_auth_mnesia, clientid_list, [{ClientId, Pwd}]),
application:set_env(emqx_auth_mnesia, username_list, [{UserName, Pwd}]),
application:set_env(emqx_auth_mnesia, password_hash, HashType),
ok.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_boot(_Config) ->
clean_all_users(),
emqx_ct_helpers:stop_apps([emqx_auth_mnesia]),
ClientId = <<"clientid-test">>,
UserName = <<"username-test">>,
Pwd = <<"emqx123456">>,
ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia],
fun(_) -> set_default(ClientId, UserName, Pwd, sha256) end),
Ok = {stop, #{anonymous => false, auth_result => success}},
Failed = {stop, #{anonymous => false, auth_result => password_error}},
?assertEqual(Ok,
emqx_auth_mnesia:check(#{clientid => ClientId, password => Pwd}, #{}, #{hash_type => sha256})),
?assertEqual(Ok,
emqx_auth_mnesia:check(#{clientid => <<"NotExited">>, username => UserName, password => Pwd},
#{}, #{hash_type => sha256})),
?assertEqual(Failed,
emqx_auth_mnesia:check(#{clientid => ClientId, password => <<Pwd/binary, "bad">>},
#{}, #{hash_type => sha256})),
?assertEqual(Failed,
emqx_auth_mnesia:check(#{clientid => ClientId, username => UserName, password => <<Pwd/binary, "bad">>},
#{}, #{hash_type => sha256})),
emqx_ct_helpers:stop_apps([emqx_auth_mnesia]),
%% change default pwd
NewPwd = <<"emqx654321">>,
ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia],
fun(_) -> set_default(ClientId, UserName, NewPwd, sha256) end),
?assertEqual(Ok,
emqx_auth_mnesia:check(#{clientid => ClientId, password => NewPwd},
#{}, #{hash_type => sha256})),
?assertEqual(Ok,
emqx_auth_mnesia:check(#{clientid => <<"NotExited">>, username => UserName, password => NewPwd},
#{}, #{hash_type => sha256})),
emqx_ct_helpers:stop_apps([emqx_auth_mnesia]),
%% change hash_type
NewPwd2 = <<"emqx6543210">>,
ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia],
fun(_) -> set_default(ClientId, UserName, NewPwd2, plain) end),
?assertEqual(Ok,
emqx_auth_mnesia:check(#{clientid => ClientId, password => NewPwd2},
#{}, #{hash_type => plain})),
?assertEqual(Ok,
emqx_auth_mnesia:check(#{clientid => <<"NotExited">>, username => UserName, password => NewPwd2},
#{}, #{hash_type => plain})),
clean_all_users(),
ok.
t_management(_Config) ->
clean_all_users(),

View File

@ -46,8 +46,8 @@
, import_users/1
, import_auth_clientid/1 %% BACKW: 4.1.x
, import_auth_username/1 %% BACKW: 4.1.x
, import_auth_mnesia/2
, import_acl_mnesia/2
, import_auth_mnesia/1
, import_acl_mnesia/1
, to_version/1
]).
@ -435,87 +435,103 @@ import_auth_username(Lists) ->
end, Lists)
end.
-ifdef(EMQX_ENTERPRISE).
import_auth_mnesia(Auths, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" ->
do_import_auth_mnesia_by_old_data(Auths);
import_auth_mnesia(Auths, _) ->
do_import_auth_mnesia(Auths).
import_auth_mnesia(Auths) ->
case validate_auth(Auths) of
ignore -> ok;
old -> do_import_auth_mnesia_by_old_data(Auths);
new -> do_import_auth_mnesia(Auths)
end.
import_acl_mnesia(Acls, FromVersion) when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" ->
do_import_acl_mnesia_by_old_data(Acls);
validate_auth(Auths) ->
case ets:info(emqx_user) of
undefined -> ignore;
_ ->
case lists:all(fun is_new_auth_data/1, Auths) of
true -> new;
false ->
case lists:all(fun is_old_auth_data/1, Auths) of
true ->
_ = get_old_type(),
old;
false -> error({auth_mnesia_data_error, Auths})
end
end
end.
import_acl_mnesia(Acls, _) ->
do_import_acl_mnesia(Acls).
-else.
import_auth_mnesia(Auths, FromVersion) when FromVersion =:= "4.3" ->
do_import_auth_mnesia(Auths);
import_auth_mnesia(Auths, _FromVersion) ->
do_import_auth_mnesia_by_old_data(Auths).
is_new_auth_data(#{<<"type">> := _, <<"login">> := _, <<"password">> := _}) -> true;
is_new_auth_data(_) -> false.
import_acl_mnesia(Acls, FromVersion) when FromVersion =:= "4.3" ->
do_import_acl_mnesia(Acls);
import_acl_mnesia(Acls, _FromVersion) ->
do_import_acl_mnesia_by_old_data(Acls).
-endif.
is_old_auth_data(#{<<"login">> := _, <<"password">> := _} = Auth) ->
not maps:is_key(<<"type">>, Auth);
is_old_auth_data(_) -> false.
do_import_auth_mnesia_by_old_data(Auths) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
CreatedAt = erlang:system_time(millisecond),
lists:foreach(fun(#{<<"login">> := Login,
<<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {get_old_type(), Login}, base64:decode(Password), CreatedAt})
end, Auths)
end.
CreatedAt = erlang:system_time(millisecond),
Type = get_old_type(),
lists:foreach(fun(#{<<"login">> := Login, <<"password">> := Password}) ->
mnesia:dirty_write({emqx_user, {Type, Login}, base64:decode(Password), CreatedAt})
end, Auths).
do_import_auth_mnesia(Auths) ->
case ets:info(emqx_user) of
undefined -> ok;
_ ->
lists:foreach(fun(#{<<"login">> := Login,
<<"type">> := Type,
<<"password">> := Password } = Map) ->
CreatedAt = maps:get(<<"created_at">>, Map, erlang:system_time(millisecond)),
mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt})
end, Auths)
CreatedAt0 = erlang:system_time(millisecond),
lists:foreach(fun(#{<<"login">> := Login,
<<"type">> := Type, <<"password">> := Password } = Map) ->
CreatedAt = maps:get(<<"created_at">>, Map, CreatedAt0),
mnesia:dirty_write({emqx_user, {any_to_atom(Type), Login}, base64:decode(Password), CreatedAt})
end, Auths).
import_acl_mnesia(Acls) ->
case validate_acl(Acls) of
ignore -> ok;
old -> do_import_acl_mnesia_by_old_data(Acls);
new -> do_import_acl_mnesia(Acls)
end.
validate_acl(Acls) ->
case ets:info(emqx_acl2) of
undefined -> ignore;
_ ->
case lists:all(fun is_new_acl_data/1, Acls) of
true -> new;
false ->
case lists:all(fun is_old_acl_data/1, Acls) of
true ->
_ = get_old_type(),
old;
false -> error({acl_mnesia_data_error, Acls})
end
end
end.
is_new_acl_data(#{<<"action">> := _, <<"access">> := _,
<<"topic">> := _, <<"type">> := _}) -> true;
is_new_acl_data(_) -> false.
is_old_acl_data(#{<<"login">> := _, <<"topic">> := _,
<<"allow">> := Allow, <<"action">> := _}) -> is_boolean(any_to_atom(Allow));
is_old_acl_data(_) -> false.
do_import_acl_mnesia_by_old_data(Acls) ->
case ets:info(emqx_acl2) of
undefined -> ok;
_ ->
lists:foreach(fun(#{<<"login">> := Login,
<<"topic">> := Topic,
<<"allow">> := Allow,
<<"action">> := Action}) ->
Allow1 = case any_to_atom(Allow) of
true -> allow;
false -> deny
end,
emqx_acl_mnesia_db:add_acl({get_old_type(), Login}, Topic, any_to_atom(Action), Allow1)
end, Acls)
end.
lists:foreach(fun(#{<<"login">> := Login,
<<"topic">> := Topic,
<<"allow">> := Allow,
<<"action">> := Action}) ->
Allow1 = case any_to_atom(Allow) of
true -> allow;
false -> deny
end,
emqx_acl_mnesia_db:add_acl({get_old_type(), Login}, Topic, any_to_atom(Action), Allow1)
end, Acls).
do_import_acl_mnesia(Acls) ->
case ets:info(emqx_acl2) of
undefined -> ok;
_ ->
lists:foreach(fun(Map = #{<<"action">> := Action,
<<"access">> := Access}) ->
Topic = maps:get(<<"topic">>, Map),
Login = case maps:get(<<"type_value">>, Map, undefined) of
undefined ->
all;
Value ->
{any_to_atom(maps:get(<<"type">>, Map)), Value}
end,
emqx_acl_mnesia_db:add_acl(Login, Topic, any_to_atom(Action), any_to_atom(Access))
end, Acls)
end.
lists:foreach(fun(Map = #{<<"action">> := Action,
<<"access">> := Access, <<"topic">> := Topic}) ->
Login = case maps:get(<<"type_value">>, Map, undefined) of
undefined -> all;
Value -> {any_to_atom(maps:get(<<"type">>, Map)), Value}
end,
emqx_acl_mnesia_db:add_acl(Login, Topic, any_to_atom(Action), any_to_atom(Access))
end, Acls).
-ifdef(EMQX_ENTERPRISE).
-dialyzer({nowarn_function, [import_modules/1]}).
@ -638,7 +654,7 @@ import(Filename, OverridesJson) ->
Overrides = emqx_json:decode(OverridesJson, [return_maps]),
Data = maps:merge(Imported, Overrides),
Version = to_version(maps:get(<<"version">>, Data)),
read_global_auth_type(Data),
read_global_auth_type(Data, Version),
try
do_import_data(Data, Version),
logger:debug("The emqx data has been imported successfully"),
@ -657,7 +673,7 @@ import(Filename, OverridesJson) ->
Overrides = emqx_json:decode(OverridesJson, [return_maps]),
Data = maps:merge(Imported, Overrides),
Version = to_version(maps:get(<<"version">>, Data)),
read_global_auth_type(Data),
read_global_auth_type(Data, Version),
case is_version_supported(Data, Version) of
true ->
try
@ -684,8 +700,8 @@ do_import_data(Data, Version) ->
import_users(maps:get(<<"users">>, Data, [])),
import_auth_clientid(maps:get(<<"auth_clientid">>, Data, [])),
import_auth_username(maps:get(<<"auth_username">>, Data, [])),
import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, []), Version),
import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, []), Version).
import_auth_mnesia(maps:get(<<"auth_mnesia">>, Data, [])),
import_acl_mnesia(maps:get(<<"acl_mnesia">>, Data, [])).
-ifdef(EMQX_ENTERPRISE).
do_import_extra_data(Data, _Version) ->
@ -717,6 +733,8 @@ is_version_supported2("4.1") ->
true;
is_version_supported2("4.3") ->
true;
is_version_supported2("4.4") ->
true;
is_version_supported2(Version) ->
case re:run(Version, "^4.[02].\\d+$", [{capture, none}]) of
match ->
@ -732,34 +750,36 @@ is_version_supported2(Version) ->
end.
-endif.
read_global_auth_type(Data) ->
read_global_auth_type(Data, Version) ->
case {maps:get(<<"auth_mnesia">>, Data, []), maps:get(<<"acl_mnesia">>, Data, [])} of
{[], []} ->
%% Auth mnesia plugin is not used:
ok;
_ ->
do_read_global_auth_type(Data)
do_read_global_auth_type(Data, Version)
end.
-ifdef(EMQX_ENTERPRISE).
do_read_global_auth_type(Data) ->
do_read_global_auth_type(Data, _Version) ->
case Data of
#{<<"auth.mnesia.as">> := <<"username">>} ->
application:set_env(emqx_auth_mnesia, as, username);
set_old_type(username);
#{<<"auth.mnesia.as">> := <<"clientid">>} ->
application:set_env(emqx_auth_mnesia, as, clientid);
set_old_type(clientid);
_ ->
ok
end.
-else.
do_read_global_auth_type(Data) ->
do_read_global_auth_type(Data, FromVersion) ->
case Data of
#{<<"auth.mnesia.as">> := <<"username">>} ->
application:set_env(emqx_auth_mnesia, as, username);
set_old_type(username);
#{<<"auth.mnesia.as">> := <<"clientid">>} ->
application:set_env(emqx_auth_mnesia, as, clientid);
_ ->
set_old_type(clientid);
_ when FromVersion =:= "4.0" orelse
FromVersion =:= "4.1" orelse
FromVersion =:= "4.2"->
logger:error("While importing data from EMQX versions prior to 4.3 "
"it is necessary to specify the value of \"auth.mnesia.as\" parameter "
"as it was configured in etc/plugins/emqx_auth_mnesia.conf.\n"
@ -768,10 +788,15 @@ do_read_global_auth_type(Data) ->
"or\n"
" $ emqx_ctl data import <filename> --env '{\"auth.mnesia.as\":\"clientid\"}'",
[]),
error(import_failed)
error({import_failed, FromVersion});
_ ->
ok
end.
-endif.
get_old_type() ->
{ok, Type} = application:get_env(emqx_auth_mnesia, as),
Type.
set_old_type(Type) ->
application:set_env(emqx_auth_mnesia, as, Type).

View File

@ -39,13 +39,13 @@ cases() ->
[t_import].
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_management, emqx_dashboard, emqx_auth_mnesia]),
emqx_ct_helpers:start_apps([emqx_management, emqx_auth_mnesia]),
ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_modules, emqx_management, emqx_dashboard, emqx_auth_mnesia]),
emqx_ct_helpers:stop_apps([emqx_modules, emqx_management, emqx_auth_mnesia]),
ekka_mnesia:ensure_stopped().
init_per_testcase(_, Config) ->
@ -167,7 +167,7 @@ t_export_import(_Config) ->
?assertEqual([], emqx_acl_mnesia_db:all_acls()),
emqx_mgmt_data_backup:import_acl_mnesia(emqx_json:decode(AclData, [return_maps]), "4.3"),
emqx_mgmt_data_backup:import_acl_mnesia(emqx_json:decode(AclData, [return_maps])),
timer:sleep(100),
?assertMatch([

View File

@ -645,7 +645,7 @@ t_data(_) ->
request_api(post, api_path(["data","import"]), [], auth_header_(),
#{<<"filename">> => Filename})),
application:stop(emqx_rule_engine),
application:stop(emqx_dahboard),
application:stop(emqx_dashboard),
ok.
t_data_import_content(_) ->

View File

@ -23,6 +23,7 @@
, init/4 %% XXX: Compatible with before 4.2 version
, info/1
, check/2
, update_overall_limiter/4
]).
-record(limiter, {
@ -152,3 +153,15 @@ is_message_limiter(conn_messages_in) -> true;
is_message_limiter(conn_messages_routing) -> true;
is_message_limiter(overall_messages_routing) -> true;
is_message_limiter(_) -> false.
update_overall_limiter(Zone, Name, Capacity, Interval) ->
case is_overall_limiter(Name) of
false -> false;
_ ->
try
esockd_limiter:update({Zone, Name}, Capacity, Interval),
true
catch _:_:_ ->
false
end
end.