Merge pull request #6819 from HJianBo/merge-main-v4.3-into-v4.4
[Conflicts Resolved] Sync main-v4.3 into main-v4.4
This commit is contained in:
commit
ceeeaf37e1
|
@ -341,7 +341,7 @@ jobs:
|
|||
shell: bash
|
||||
steps:
|
||||
- uses: actions/download-artifact@v2
|
||||
name: Dowload built emqx and test scenario
|
||||
name: Download built emqx and test scenario
|
||||
with:
|
||||
name: emqx_built
|
||||
path: emqx_built
|
||||
|
|
|
@ -7,17 +7,8 @@
|
|||
ignore = 'client.auth.ignore'
|
||||
}).
|
||||
|
||||
-record(acl_metrics, {
|
||||
allow = 'client.acl.allow',
|
||||
deny = 'client.acl.deny',
|
||||
ignore = 'client.acl.ignore'
|
||||
}).
|
||||
|
||||
-define(METRICS(Type), tl(tuple_to_list(#Type{}))).
|
||||
-define(METRICS(Type, K), #Type{}#Type.K).
|
||||
|
||||
-define(AUTH_METRICS, ?METRICS(auth_metrics)).
|
||||
-define(AUTH_METRICS(K), ?METRICS(auth_metrics, K)).
|
||||
|
||||
-define(ACL_METRICS, ?METRICS(acl_metrics)).
|
||||
-define(ACL_METRICS(K), ?METRICS(acl_metrics, K)).
|
||||
|
|
|
@ -29,26 +29,17 @@
|
|||
]).
|
||||
|
||||
%% ACL callbacks
|
||||
-export([ register_metrics/0
|
||||
, check_acl/5
|
||||
-export([ check_acl/5
|
||||
, description/0
|
||||
]).
|
||||
|
||||
-spec(register_metrics() -> ok).
|
||||
register_metrics() ->
|
||||
lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% ACL callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
check_acl(ClientInfo, PubSub, Topic, AclResult, Params) ->
|
||||
return_with(fun inc_metrics/1,
|
||||
do_check_acl(ClientInfo, PubSub, Topic, AclResult, Params)).
|
||||
|
||||
do_check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _AclResult, _Params) ->
|
||||
check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _AclResult, _Params) ->
|
||||
ok;
|
||||
do_check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl := ACLParams = #{path := Path}}) ->
|
||||
check_acl(ClientInfo, PubSub, Topic, _AclResult, #{acl := ACLParams = #{path := Path}}) ->
|
||||
ClientInfo1 = ClientInfo#{access => access(PubSub), topic => Topic},
|
||||
case check_acl_request(ACLParams, ClientInfo1) of
|
||||
{ok, 200, <<"ignore">>} -> ok;
|
||||
|
@ -65,16 +56,6 @@ description() -> "ACL with HTTP API".
|
|||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
inc_metrics(ok) ->
|
||||
emqx_metrics:inc(?ACL_METRICS(ignore));
|
||||
inc_metrics({stop, allow}) ->
|
||||
emqx_metrics:inc(?ACL_METRICS(allow));
|
||||
inc_metrics({stop, deny}) ->
|
||||
emqx_metrics:inc(?ACL_METRICS(deny)).
|
||||
|
||||
return_with(Fun, Result) ->
|
||||
Fun(Result), Result.
|
||||
|
||||
check_acl_request(#{pool_name := PoolName,
|
||||
path := Path,
|
||||
method := Method,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_auth_http,
|
||||
[{description, "EMQ X Authentication/ACL with HTTP API"},
|
||||
{vsn, "4.3.3"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.4"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_auth_http_sup]},
|
||||
{applications, [kernel,stdlib,ehttpc]},
|
||||
|
|
|
@ -1,15 +1,23 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.3.2",
|
||||
[{"4.3.3",
|
||||
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.2",
|
||||
[{apply,{application,stop,[emqx_auth_http]}},
|
||||
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4.3.[0-1]">>,
|
||||
[{restart_application,emqx_auth_http}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.2",
|
||||
[{"4.3.3",
|
||||
[{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.2",
|
||||
[{apply,{application,stop,[emqx_auth_http]}},
|
||||
{load_module,emqx_auth_http_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_http,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_http_cli,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4.3.[0-1]">>,
|
||||
[{restart_application,emqx_auth_http}]},
|
||||
|
|
|
@ -130,7 +130,6 @@ load_hooks() ->
|
|||
case application:get_env(?APP, acl_req) of
|
||||
undefined -> ok;
|
||||
{ok, ACLReq} ->
|
||||
ok = emqx_acl_http:register_metrics(),
|
||||
PoolOpts2 = proplists:get_value(pool_opts, ACLReq),
|
||||
PoolName2 = proplists:get_value(pool_name, ACLReq),
|
||||
{ok, _} = ehttpc_sup:start_pool(PoolName2, PoolOpts2),
|
||||
|
|
|
@ -7,17 +7,8 @@
|
|||
ignore = 'client.auth.ignore'
|
||||
}).
|
||||
|
||||
-record(acl_metrics, {
|
||||
allow = 'client.acl.allow',
|
||||
deny = 'client.acl.deny',
|
||||
ignore = 'client.acl.ignore'
|
||||
}).
|
||||
|
||||
-define(METRICS(Type), tl(tuple_to_list(#Type{}))).
|
||||
-define(METRICS(Type, K), #Type{}#Type.K).
|
||||
|
||||
-define(AUTH_METRICS, ?METRICS(auth_metrics)).
|
||||
-define(AUTH_METRICS(K), ?METRICS(auth_metrics, K)).
|
||||
|
||||
-define(ACL_METRICS, ?METRICS(acl_metrics)).
|
||||
-define(ACL_METRICS(K), ?METRICS(acl_metrics, K)).
|
||||
|
|
|
@ -22,20 +22,15 @@
|
|||
-include_lib("eldap/include/eldap.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-export([ register_metrics/0
|
||||
, check_acl/5
|
||||
-export([ check_acl/5
|
||||
, description/0
|
||||
]).
|
||||
|
||||
-spec(register_metrics() -> ok).
|
||||
register_metrics() ->
|
||||
lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
|
||||
|
||||
check_acl(ClientInfo, PubSub, Topic, NoMatchAction, State) ->
|
||||
case do_check_acl(ClientInfo, PubSub, Topic, NoMatchAction, State) of
|
||||
ok -> emqx_metrics:inc(?ACL_METRICS(ignore)), ok;
|
||||
{stop, allow} -> emqx_metrics:inc(?ACL_METRICS(allow)), {stop, allow};
|
||||
{stop, deny} -> emqx_metrics:inc(?ACL_METRICS(deny)), {stop, deny}
|
||||
ok -> ok;
|
||||
{stop, allow} -> {stop, allow};
|
||||
{stop, deny} -> {stop, deny}
|
||||
end.
|
||||
|
||||
do_check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _NoMatchAction, _State) ->
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_auth_ldap,
|
||||
[{description, "EMQ X Authentication/ACL with LDAP"},
|
||||
{vsn, "4.3.2"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.3"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_auth_ldap_sup]},
|
||||
{applications, [kernel,stdlib,eldap2,ecpool]},
|
||||
|
|
|
@ -3,9 +3,16 @@
|
|||
[ {"4.3.0",
|
||||
[ {load_module, emqx_acl_ldap, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_auth_ldap_cli, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_auth_ldap_app, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.1",
|
||||
[ {load_module, emqx_auth_ldap_cli, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_acl_ldap, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_auth_ldap_app, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.2",
|
||||
[ {load_module, emqx_acl_ldap, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_auth_ldap_app, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
],
|
||||
|
@ -13,9 +20,16 @@
|
|||
{"4.3.0",
|
||||
[ {load_module, emqx_acl_ldap, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_auth_ldap_cli, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_auth_ldap_app, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.1",
|
||||
[ {load_module, emqx_auth_ldap_cli, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_acl_ldap, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_auth_ldap_app, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.2",
|
||||
[ {load_module, emqx_acl_ldap, brutal_purge, soft_purge, []}
|
||||
, {load_module, emqx_auth_ldap_app, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
]
|
||||
|
|
|
@ -54,7 +54,6 @@ load_auth_hook(DeviceDn) ->
|
|||
emqx:hook('client.authenticate', fun emqx_auth_ldap:check/3, [Params#{pool => ?APP}]).
|
||||
|
||||
load_acl_hook(DeviceDn) ->
|
||||
ok = emqx_acl_ldap:register_metrics(),
|
||||
Params = maps:from_list(DeviceDn),
|
||||
emqx:hook('client.check_acl', fun emqx_acl_ldap:check_acl/5 , [Params#{pool => ?APP}]).
|
||||
|
||||
|
|
|
@ -48,17 +48,8 @@
|
|||
ignore = 'client.auth.ignore'
|
||||
}).
|
||||
|
||||
-record(acl_metrics, {
|
||||
allow = 'client.acl.allow',
|
||||
deny = 'client.acl.deny',
|
||||
ignore = 'client.acl.ignore'
|
||||
}).
|
||||
|
||||
-define(METRICS(Type), tl(tuple_to_list(#Type{}))).
|
||||
-define(METRICS(Type, K), #Type{}#Type.K).
|
||||
|
||||
-define(AUTH_METRICS, ?METRICS(auth_metrics)).
|
||||
-define(AUTH_METRICS(K), ?METRICS(auth_metrics, K)).
|
||||
|
||||
-define(ACL_METRICS, ?METRICS(acl_metrics)).
|
||||
-define(ACL_METRICS(K), ?METRICS(acl_metrics, K)).
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
|
||||
%% ACL Callbacks
|
||||
-export([ init/0
|
||||
, register_metrics/0
|
||||
, check_acl/5
|
||||
, description/0
|
||||
]).
|
||||
|
@ -29,10 +28,6 @@ init() ->
|
|||
ok = emqx_acl_mnesia_db:create_table(),
|
||||
ok = emqx_acl_mnesia_db:create_table2().
|
||||
|
||||
-spec(register_metrics() -> ok).
|
||||
register_metrics() ->
|
||||
lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
|
||||
|
||||
check_acl(ClientInfo = #{ clientid := Clientid }, PubSub, Topic, _NoMatchAction, _Params) ->
|
||||
Username = maps:get(username, ClientInfo, undefined),
|
||||
|
||||
|
@ -48,13 +43,10 @@ check_acl(ClientInfo = #{ clientid := Clientid }, PubSub, Topic, _NoMatchAction,
|
|||
|
||||
case match(ClientInfo, PubSub, Topic, Acls) of
|
||||
allow ->
|
||||
emqx_metrics:inc(?ACL_METRICS(allow)),
|
||||
{stop, allow};
|
||||
deny ->
|
||||
emqx_metrics:inc(?ACL_METRICS(deny)),
|
||||
{stop, deny};
|
||||
_ ->
|
||||
emqx_metrics:inc(?ACL_METRICS(ignore)),
|
||||
ok
|
||||
end.
|
||||
|
||||
|
|
|
@ -8,12 +8,15 @@
|
|||
{apply, {emqx_acl_mnesia_migrator, start_supervised, []}},
|
||||
{load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mnesia, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mnesia_app, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mnesia_api, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mnesia_cli, brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<"4.3.4">>, [
|
||||
{load_module,emqx_auth_mnesia, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mnesia_cli, brutal_purge,soft_purge,[]}
|
||||
{load_module,emqx_auth_mnesia_cli, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mnesia, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mnesia_app, brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<".*">>, [
|
||||
]}
|
||||
|
@ -26,12 +29,15 @@
|
|||
{load_module,emqx_acl_mnesia_api, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mnesia_api, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mnesia, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mnesia_app, brutal_purge,soft_purge,[]},
|
||||
{delete_module,emqx_acl_mnesia_migrator},
|
||||
{delete_module,emqx_acl_mnesia_db}
|
||||
]},
|
||||
{<<"4.3.4">>, [
|
||||
{load_module,emqx_auth_mnesia, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mnesia_cli, brutal_purge,soft_purge,[]}
|
||||
{load_module,emqx_auth_mnesia_cli, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mnesia, brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_auth_mnesia_app, brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<".*">>, [
|
||||
]}
|
||||
|
|
|
@ -32,27 +32,32 @@
|
|||
, description/0
|
||||
]).
|
||||
|
||||
-export([match_password/3]).
|
||||
-export([ match_password/3
|
||||
, hash_type/0
|
||||
]).
|
||||
|
||||
init(#{clientid_list := ClientidList, username_list := UsernameList}) ->
|
||||
ok = ekka_mnesia:create_table(?TABLE, [
|
||||
{disc_copies, [node()]},
|
||||
{attributes, record_info(fields, emqx_user)},
|
||||
{storage_properties, [{ets, [{read_concurrency, true}]}]}]),
|
||||
_ = [ add_default_user({{clientid, iolist_to_binary(Clientid)}, iolist_to_binary(Password)})
|
||||
|| {Clientid, Password} <- ClientidList],
|
||||
_ = [ add_default_user({{username, iolist_to_binary(Username)}, iolist_to_binary(Password)})
|
||||
|| {Username, Password} <- UsernameList],
|
||||
ok = ekka_mnesia:copy_table(?TABLE, disc_copies).
|
||||
lists:foreach(fun({Clientid, Password}) ->
|
||||
emqx_auth_mnesia_cli:add_default_user(clientid, iolist_to_binary(Clientid), iolist_to_binary(Password))
|
||||
end, ClientidList),
|
||||
|
||||
%% @private
|
||||
add_default_user({Login, Password}) when is_tuple(Login) ->
|
||||
emqx_auth_mnesia_cli:force_add_user(Login, Password).
|
||||
lists:foreach(fun({Username, Password}) ->
|
||||
emqx_auth_mnesia_cli:add_default_user(username, iolist_to_binary(Username), iolist_to_binary(Password))
|
||||
end, UsernameList),
|
||||
|
||||
ok = ekka_mnesia:copy_table(?TABLE, disc_copies).
|
||||
|
||||
-spec(register_metrics() -> ok).
|
||||
register_metrics() ->
|
||||
lists:foreach(fun emqx_metrics:ensure/1, ?AUTH_METRICS).
|
||||
|
||||
hash_type() ->
|
||||
application:get_env(emqx_auth_mnesia, password_hash, sha256).
|
||||
|
||||
check(ClientInfo = #{ clientid := Clientid
|
||||
, password := NPassword
|
||||
}, AuthResult, #{hash_type := HashType}) ->
|
||||
|
|
|
@ -57,12 +57,9 @@ load_auth_hook() ->
|
|||
UsernameList = application:get_env(?APP, username_list, []),
|
||||
ok = emqx_auth_mnesia:init(#{clientid_list => ClientidList, username_list => UsernameList}),
|
||||
ok = emqx_auth_mnesia:register_metrics(),
|
||||
Params = #{
|
||||
hash_type => application:get_env(emqx_auth_mnesia, password_hash, sha256)
|
||||
},
|
||||
Params = #{hash_type => emqx_auth_mnesia:hash_type()},
|
||||
emqx:hook('client.authenticate', fun emqx_auth_mnesia:check/3, [Params]).
|
||||
|
||||
load_acl_hook() ->
|
||||
ok = emqx_acl_mnesia:init(),
|
||||
ok = emqx_acl_mnesia:register_metrics(),
|
||||
emqx:hook('client.check_acl', fun emqx_acl_mnesia:check_acl/5, [#{}]).
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
%% Auth APIs
|
||||
-export([ add_user/2
|
||||
, force_add_user/2
|
||||
, add_default_user/3
|
||||
, update_user/2
|
||||
, remove_user/1
|
||||
, lookup_user/1
|
||||
|
@ -57,6 +58,39 @@ insert_user(User = #emqx_user{login = Login}) ->
|
|||
[_|_] -> mnesia:abort(existed)
|
||||
end.
|
||||
|
||||
-spec(add_default_user(clientid | username, tuple(), binary()) -> ok | {error, any()}).
|
||||
add_default_user(Type, Key, Password) ->
|
||||
Login = {Type, Key},
|
||||
case add_user(Login, Password) of
|
||||
ok -> ok;
|
||||
{error, existed} ->
|
||||
NewPwd = encrypted_data(Password),
|
||||
[#emqx_user{password = OldPwd}] = emqx_auth_mnesia_cli:lookup_user(Login),
|
||||
HashType = emqx_auth_mnesia:hash_type(),
|
||||
case emqx_auth_mnesia:match_password(NewPwd, HashType, [OldPwd]) of
|
||||
true -> ok;
|
||||
false ->
|
||||
%% We can't force add default,
|
||||
%% otherwise passwords that have been updated via HTTP API will be reset after reboot.
|
||||
TypeCtl =
|
||||
case Type of
|
||||
clientid -> clientid;
|
||||
username -> user
|
||||
end,
|
||||
?LOG(warning,
|
||||
"[Auth Mnesia] auth.client.x.~p=~s's password in the emqx_auth_mnesia.conf\n"
|
||||
"does not match the password in the database(mnesia).\n"
|
||||
"1. If you have already changed the password via the HTTP API, this warning has no effect.\n"
|
||||
"You can remove the warning from emqx_auth_mnesia.conf to resolve the warning.\n"
|
||||
"2. If you just want to update the password by manually changing the configuration file,\n"
|
||||
"you need to delete the old user and password using `emqx_ctl ~p delete ~s` first\n"
|
||||
"the new password in emqx_auth_mnesia.conf can take effect after reboot.",
|
||||
[Type, Key, TypeCtl, Key]),
|
||||
ok
|
||||
end;
|
||||
Error -> Error
|
||||
end.
|
||||
|
||||
force_add_user(Login, Password) ->
|
||||
User = #emqx_user{
|
||||
login = Login,
|
||||
|
@ -74,7 +108,7 @@ insert_or_update_user(NewPwd, User = #emqx_user{login = Login}) ->
|
|||
case mnesia:read(?TABLE, Login) of
|
||||
[] -> mnesia:write(User);
|
||||
[#emqx_user{password = Pwd}] ->
|
||||
case emqx_auth_mnesia:match_password(NewPwd, hash_type(), [Pwd]) of
|
||||
case emqx_auth_mnesia:match_password(NewPwd, emqx_auth_mnesia:hash_type(), [Pwd]) of
|
||||
true -> ok;
|
||||
false ->
|
||||
ok = mnesia:write(User),
|
||||
|
@ -136,7 +170,7 @@ ret({atomic, Res}) -> Res;
|
|||
ret({aborted, Error}) -> {error, Error}.
|
||||
|
||||
encrypted_data(Password) ->
|
||||
HashType = hash_type(),
|
||||
HashType = emqx_auth_mnesia:hash_type(),
|
||||
SaltBin = salt(),
|
||||
<<SaltBin/binary, (hash(Password, SaltBin, HashType))/binary>>.
|
||||
|
||||
|
@ -219,5 +253,3 @@ auth_username_cli(_) ->
|
|||
{"user add <Username> <Password>", "Add username auth rule"},
|
||||
{"user update <Username> <NewPassword>", "Update username auth rule"},
|
||||
{"user delete <Username>", "Delete username auth rule"}]).
|
||||
hash_type() ->
|
||||
application:get_env(emqx_auth_mnesia, password_hash, sha256).
|
||||
|
|
|
@ -103,6 +103,17 @@ t_boot(_Config) ->
|
|||
|
||||
%% change default pwd
|
||||
NewPwd = <<"emqx654321">>,
|
||||
ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia],
|
||||
fun(_) -> set_default(ClientId, UserName, NewPwd, sha256) end),
|
||||
?assertEqual(Failed,
|
||||
emqx_auth_mnesia:check(#{clientid => ClientId, password => NewPwd},
|
||||
#{}, #{hash_type => sha256})),
|
||||
?assertEqual(Failed,
|
||||
emqx_auth_mnesia:check(#{clientid => <<"NotExited">>, username => UserName, password => NewPwd},
|
||||
#{}, #{hash_type => sha256})),
|
||||
clean_all_users(),
|
||||
emqx_ct_helpers:stop_apps([emqx_auth_mnesia]),
|
||||
|
||||
ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia],
|
||||
fun(_) -> set_default(ClientId, UserName, NewPwd, sha256) end),
|
||||
?assertEqual(Ok,
|
||||
|
@ -115,6 +126,17 @@ t_boot(_Config) ->
|
|||
|
||||
%% change hash_type
|
||||
NewPwd2 = <<"emqx6543210">>,
|
||||
ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia],
|
||||
fun(_) -> set_default(ClientId, UserName, NewPwd2, plain) end),
|
||||
?assertEqual(Failed,
|
||||
emqx_auth_mnesia:check(#{clientid => ClientId, password => NewPwd2},
|
||||
#{}, #{hash_type => plain})),
|
||||
?assertEqual(Failed,
|
||||
emqx_auth_mnesia:check(#{clientid => <<"NotExited">>, username => UserName, password => NewPwd2},
|
||||
#{}, #{hash_type => plain})),
|
||||
clean_all_users(),
|
||||
emqx_ct_helpers:stop_apps([emqx_auth_mnesia]),
|
||||
|
||||
ok = emqx_ct_helpers:start_apps([emqx_auth_mnesia],
|
||||
fun(_) -> set_default(ClientId, UserName, NewPwd2, plain) end),
|
||||
?assertEqual(Ok,
|
||||
|
|
|
@ -21,17 +21,8 @@
|
|||
ignore = 'client.auth.ignore'
|
||||
}).
|
||||
|
||||
-record(acl_metrics, {
|
||||
allow = 'client.acl.allow',
|
||||
deny = 'client.acl.deny',
|
||||
ignore = 'client.acl.ignore'
|
||||
}).
|
||||
|
||||
-define(METRICS(Type), tl(tuple_to_list(#Type{}))).
|
||||
-define(METRICS(Type, K), #Type{}#Type.K).
|
||||
|
||||
-define(AUTH_METRICS, ?METRICS(auth_metrics)).
|
||||
-define(AUTH_METRICS(K), ?METRICS(auth_metrics, K)).
|
||||
|
||||
-define(ACL_METRICS, ?METRICS(acl_metrics)).
|
||||
-define(ACL_METRICS(K), ?METRICS(acl_metrics, K)).
|
||||
|
|
|
@ -21,17 +21,12 @@
|
|||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%% ACL callbacks
|
||||
-export([ register_metrics/0
|
||||
, check_acl/5
|
||||
-export([ check_acl/5
|
||||
, description/0
|
||||
]).
|
||||
-spec(register_metrics() -> ok).
|
||||
register_metrics() ->
|
||||
lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
|
||||
|
||||
check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _AclResult, _State) ->
|
||||
ok;
|
||||
|
||||
check_acl(ClientInfo, PubSub, Topic, _AclResult, Env = #{aclquery := AclQuery}) ->
|
||||
#aclquery{collection = Coll, selector = SelectorList} = AclQuery,
|
||||
Pool = maps:get(pool, Env, ?APP),
|
||||
|
@ -43,20 +38,16 @@ check_acl(ClientInfo, PubSub, Topic, _AclResult, Env = #{aclquery := AclQuery})
|
|||
[] -> ok;
|
||||
Rows ->
|
||||
try match(ClientInfo, Topic, topics(PubSub, Rows)) of
|
||||
matched -> emqx_metrics:inc(?ACL_METRICS(allow)),
|
||||
{stop, allow};
|
||||
nomatch -> emqx_metrics:inc(?ACL_METRICS(deny)),
|
||||
{stop, deny}
|
||||
matched -> {stop, allow};
|
||||
nomatch -> {stop, deny}
|
||||
catch
|
||||
_Err:Reason->
|
||||
?LOG(error, "[MongoDB] Check mongo ~p ACL failed, got ACL config: ~p, error: :~p",
|
||||
[PubSub, Rows, Reason]),
|
||||
emqx_metrics:inc(?ACL_METRICS(ignore)),
|
||||
ignore
|
||||
end
|
||||
end.
|
||||
|
||||
|
||||
match(_ClientInfo, _Topic, []) ->
|
||||
nomatch;
|
||||
match(ClientInfo, Topic, [TopicFilter|More]) ->
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_auth_mongo,
|
||||
[{description, "EMQ X Authentication/ACL with MongoDB"},
|
||||
{vsn, "4.4.0"}, % strict semver, bump manually!
|
||||
{vsn, "4.4.1"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_auth_mongo_sup]},
|
||||
{applications, [kernel,stdlib,mongodb,ecpool]},
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.4.0",
|
||||
[{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.4.0",
|
||||
[{load_module,emqx_auth_mongo_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mongo,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}]
|
||||
}.
|
|
@ -55,7 +55,6 @@ reg_authmod(AuthQuery) ->
|
|||
[#{authquery => AuthQuery, superquery => SuperQuery, pool => ?APP}]).
|
||||
|
||||
reg_aclmod(AclQuery) ->
|
||||
emqx_acl_mongo:register_metrics(),
|
||||
ok = emqx:hook('client.check_acl', fun emqx_acl_mongo:check_acl/5, [#{aclquery => AclQuery, pool => ?APP}]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -7,17 +7,8 @@
|
|||
ignore = 'client.auth.ignore'
|
||||
}).
|
||||
|
||||
-record(acl_metrics, {
|
||||
allow = 'client.acl.allow',
|
||||
deny = 'client.acl.deny',
|
||||
ignore = 'client.acl.ignore'
|
||||
}).
|
||||
|
||||
-define(METRICS(Type), tl(tuple_to_list(#Type{}))).
|
||||
-define(METRICS(Type, K), #Type{}#Type.K).
|
||||
|
||||
-define(AUTH_METRICS, ?METRICS(auth_metrics)).
|
||||
-define(AUTH_METRICS(K), ?METRICS(auth_metrics, K)).
|
||||
|
||||
-define(ACL_METRICS, ?METRICS(acl_metrics)).
|
||||
-define(ACL_METRICS(K), ?METRICS(acl_metrics, K)).
|
||||
|
|
|
@ -22,20 +22,15 @@
|
|||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%% ACL Callbacks
|
||||
-export([ register_metrics/0
|
||||
, check_acl/5
|
||||
-export([ check_acl/5
|
||||
, description/0
|
||||
]).
|
||||
|
||||
-spec(register_metrics() -> ok).
|
||||
register_metrics() ->
|
||||
lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
|
||||
|
||||
check_acl(ClientInfo, PubSub, Topic, NoMatchAction, #{pool := Pool} = State) ->
|
||||
case do_check_acl(Pool, ClientInfo, PubSub, Topic, NoMatchAction, State) of
|
||||
ok -> emqx_metrics:inc(?ACL_METRICS(ignore)), ok;
|
||||
{stop, allow} -> emqx_metrics:inc(?ACL_METRICS(allow)), {stop, allow};
|
||||
{stop, deny} -> emqx_metrics:inc(?ACL_METRICS(deny)), {stop, deny}
|
||||
ok -> ok;
|
||||
{stop, allow} -> {stop, allow};
|
||||
{stop, deny} -> {stop, deny}
|
||||
end.
|
||||
|
||||
do_check_acl(_Pool, #{username := <<$$, _/binary>>}, _PubSub, _Topic, _NoMatchAction, _State) ->
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_auth_mysql,
|
||||
[{description, "EMQ X Authentication/ACL with MySQL"},
|
||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_auth_mysql_sup]},
|
||||
{applications, [kernel,stdlib,mysql,ecpool]},
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.3.0",
|
||||
[{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mysql,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.0",
|
||||
[{load_module,emqx_auth_mysql_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_mysql,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}]
|
||||
}.
|
|
@ -60,7 +60,6 @@ load_auth_hook(AuthQuery) ->
|
|||
emqx:hook('client.authenticate', fun emqx_auth_mysql:check/3, [Params]).
|
||||
|
||||
load_acl_hook(AclQuery) ->
|
||||
ok = emqx_acl_mysql:register_metrics(),
|
||||
emqx:hook('client.check_acl', fun emqx_acl_mysql:check_acl/5, [#{acl_query => AclQuery, pool =>?APP}]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -6,18 +6,8 @@
|
|||
ignore = 'client.auth.ignore'
|
||||
}).
|
||||
|
||||
-record(acl_metrics, {
|
||||
allow = 'client.acl.allow',
|
||||
deny = 'client.acl.deny',
|
||||
ignore = 'client.acl.ignore'
|
||||
}).
|
||||
|
||||
-define(METRICS(Type), tl(tuple_to_list(#Type{}))).
|
||||
-define(METRICS(Type, K), #Type{}#Type.K).
|
||||
|
||||
-define(AUTH_METRICS, ?METRICS(auth_metrics)).
|
||||
-define(AUTH_METRICS(K), ?METRICS(auth_metrics, K)).
|
||||
|
||||
-define(ACL_METRICS, ?METRICS(acl_metrics)).
|
||||
-define(ACL_METRICS(K), ?METRICS(acl_metrics, K)).
|
||||
|
||||
|
|
|
@ -21,21 +21,12 @@
|
|||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%% ACL callbacks
|
||||
-export([ register_metrics/0
|
||||
, check_acl/5
|
||||
-export([ check_acl/5
|
||||
, description/0
|
||||
]).
|
||||
|
||||
-spec(register_metrics() -> ok).
|
||||
register_metrics() ->
|
||||
lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
|
||||
|
||||
check_acl(ClientInfo, PubSub, Topic, NoMatchAction, #{pool := Pool} = State) ->
|
||||
case do_check_acl(Pool, ClientInfo, PubSub, Topic, NoMatchAction, State) of
|
||||
ok -> emqx_metrics:inc(?ACL_METRICS(ignore)), ok;
|
||||
{stop, allow} -> emqx_metrics:inc(?ACL_METRICS(allow)), {stop, allow};
|
||||
{stop, deny} -> emqx_metrics:inc(?ACL_METRICS(deny)), {stop, deny}
|
||||
end.
|
||||
do_check_acl(Pool, ClientInfo, PubSub, Topic, NoMatchAction, State).
|
||||
|
||||
do_check_acl(_Pool, #{username := <<$$, _/binary>>}, _PubSub, _Topic, _NoMatchAction, _State) ->
|
||||
ok;
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_auth_pgsql,
|
||||
[{description, "EMQ X Authentication/ACL with PostgreSQL"},
|
||||
{vsn, "4.4.0"}, % strict semver, bump manually!
|
||||
{vsn, "4.4.1"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_auth_pgsql_sup]},
|
||||
{applications, [kernel,stdlib,epgsql,ecpool]},
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.4.0",
|
||||
[{load_module,emqx_auth_pgsql_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.4.0",
|
||||
[{load_module,emqx_auth_pgsql_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_pgsql,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}]
|
||||
}.
|
|
@ -46,7 +46,6 @@ start(_StartType, _StartArgs) ->
|
|||
ok = emqx:hook('client.authenticate', fun emqx_auth_pgsql:check/3, [AuthEnv])
|
||||
end),
|
||||
if_enabled(acl_query, fun(AclQuery) ->
|
||||
ok = emqx_acl_pgsql:register_metrics(),
|
||||
ok = emqx:hook('client.check_acl', fun emqx_acl_pgsql:check_acl/5, [#{acl_query => AclQuery, pool => ?APP}])
|
||||
end),
|
||||
{ok, Sup}.
|
||||
|
|
|
@ -7,17 +7,8 @@
|
|||
ignore = 'client.auth.ignore'
|
||||
}).
|
||||
|
||||
-record(acl_metrics, {
|
||||
allow = 'client.acl.allow',
|
||||
deny = 'client.acl.deny',
|
||||
ignore = 'client.acl.ignore'
|
||||
}).
|
||||
|
||||
-define(METRICS(Type), tl(tuple_to_list(#Type{}))).
|
||||
-define(METRICS(Type, K), #Type{}#Type.K).
|
||||
|
||||
-define(AUTH_METRICS, ?METRICS(auth_metrics)).
|
||||
-define(AUTH_METRICS(K), ?METRICS(auth_metrics, K)).
|
||||
|
||||
-define(ACL_METRICS, ?METRICS(acl_metrics)).
|
||||
-define(ACL_METRICS(K), ?METRICS(acl_metrics, K)).
|
||||
|
|
|
@ -21,26 +21,14 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-export([ register_metrics/0
|
||||
, check_acl/5
|
||||
-export([ check_acl/5
|
||||
, description/0
|
||||
]).
|
||||
|
||||
-spec(register_metrics() -> ok).
|
||||
register_metrics() ->
|
||||
lists:foreach(fun emqx_metrics:ensure/1, ?ACL_METRICS).
|
||||
|
||||
check_acl(ClientInfo, PubSub, Topic, AclResult, Config) ->
|
||||
case do_check_acl(ClientInfo, PubSub, Topic, AclResult, Config) of
|
||||
ok -> emqx_metrics:inc(?ACL_METRICS(ignore)), ok;
|
||||
{stop, allow} -> emqx_metrics:inc(?ACL_METRICS(allow)), {stop, allow};
|
||||
{stop, deny} -> emqx_metrics:inc(?ACL_METRICS(deny)), {stop, deny}
|
||||
end.
|
||||
|
||||
do_check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _AclResult, _Config) ->
|
||||
check_acl(#{username := <<$$, _/binary>>}, _PubSub, _Topic, _AclResult, _Config) ->
|
||||
ok;
|
||||
do_check_acl(ClientInfo, PubSub, Topic, _AclResult,
|
||||
#{acl_cmd := AclCmd, timeout := Timeout, type := Type, pool := Pool}) ->
|
||||
check_acl(ClientInfo, PubSub, Topic, _AclResult,
|
||||
#{acl_cmd := AclCmd, timeout := Timeout, type := Type, pool := Pool}) ->
|
||||
case emqx_auth_redis_cli:q(Pool, Type, AclCmd, ClientInfo, Timeout) of
|
||||
{ok, []} -> ok;
|
||||
{ok, Rules} ->
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_auth_redis,
|
||||
[{description, "EMQ X Authentication/ACL with Redis"},
|
||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_auth_redis_sup]},
|
||||
{applications, [kernel,stdlib,eredis,eredis_cluster,ecpool]},
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.3.0",
|
||||
[{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_redis,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.0",
|
||||
[{load_module,emqx_auth_redis_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_acl_redis,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}]
|
||||
}.
|
|
@ -59,7 +59,6 @@ load_acl_hook(AclCmd) ->
|
|||
timeout => Timeout,
|
||||
type => Type,
|
||||
pool => ?APP},
|
||||
ok = emqx_acl_redis:register_metrics(),
|
||||
emqx:hook('client.check_acl', fun emqx_acl_redis:check_acl/5, [Config]).
|
||||
|
||||
if_cmd_enabled(Par, Fun) ->
|
||||
|
|
|
@ -30,6 +30,10 @@
|
|||
|
||||
-export([do_query/5]).
|
||||
|
||||
-export([ page/1
|
||||
, limit/1
|
||||
]).
|
||||
|
||||
paginate(Tables, Params, RowFun) ->
|
||||
Qh = query_handle(Tables),
|
||||
Count = count(Tables),
|
||||
|
|
|
@ -64,8 +64,10 @@ list(Bindings, Params) when map_size(Bindings) == 0 ->
|
|||
case proplists:get_value(<<"topic">>, Params) of
|
||||
undefined ->
|
||||
minirest:return({ok, emqx_mgmt_api:cluster_query(Params, ?SUBS_QS_SCHEMA, ?query_fun)});
|
||||
Topic ->
|
||||
minirest:return({ok, emqx_mgmt:list_subscriptions_via_topic(emqx_mgmt_util:urldecode(Topic), ?format_fun)})
|
||||
Topic0 ->
|
||||
Topic = emqx_mgmt_util:urldecode(Topic0),
|
||||
Data = emqx_mgmt:list_subscriptions_via_topic(Topic, ?format_fun),
|
||||
minirest:return({ok, add_meta(Params, Data)})
|
||||
end;
|
||||
|
||||
list(#{node := Node} = Bindings, Params) ->
|
||||
|
@ -80,10 +82,27 @@ list(#{node := Node} = Bindings, Params) ->
|
|||
Res -> Res
|
||||
end
|
||||
end;
|
||||
Topic ->
|
||||
minirest:return({ok, emqx_mgmt:list_subscriptions_via_topic(Node, emqx_mgmt_util:urldecode(Topic), ?format_fun)})
|
||||
Topic0 ->
|
||||
Topic = emqx_mgmt_util:urldecode(Topic0),
|
||||
Data = emqx_mgmt:list_subscriptions_via_topic(Node, Topic, ?format_fun),
|
||||
minirest:return({ok, add_meta(Params, Data)})
|
||||
end.
|
||||
|
||||
add_meta(Params, List) ->
|
||||
Page = emqx_mgmt_api:page(Params),
|
||||
Limit = emqx_mgmt_api:limit(Params),
|
||||
Count = erlang:length(List),
|
||||
Start = (Page - 1) * Limit + 1,
|
||||
Data = lists:sublist(List, Start, Limit),
|
||||
#{meta => #{
|
||||
page => Page,
|
||||
limit => Limit,
|
||||
hasnext => Start + Limit - 1 < Count,
|
||||
count => Count},
|
||||
data => Data,
|
||||
code => 0
|
||||
}.
|
||||
|
||||
lookup(#{node := Node, clientid := ClientId}, _Params) ->
|
||||
minirest:return({ok, emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId), ?format_fun)});
|
||||
|
||||
|
|
|
@ -604,6 +604,8 @@ t_routes_and_subscriptions(_) ->
|
|||
[Subscription] = get(<<"data">>, Result3),
|
||||
?assertEqual(Topic, maps:get(<<"topic">>, Subscription)),
|
||||
?assertEqual(ClientId, maps:get(<<"clientid">>, Subscription)),
|
||||
?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 1},
|
||||
get(<<"meta">>, Result3)),
|
||||
|
||||
{ok, Result3} = request_api(get,
|
||||
api_path(["nodes", atom_to_list(node()), "subscriptions"]), auth_header_()),
|
||||
|
@ -617,6 +619,61 @@ t_routes_and_subscriptions(_) ->
|
|||
|
||||
ok = emqtt:disconnect(C1).
|
||||
|
||||
t_subscription_topic(_Config) ->
|
||||
ClientId = <<"myclient">>,
|
||||
Topic = <<"topic">>,
|
||||
Query = "topic=" ++ binary_to_list(Topic),
|
||||
{ok, NonSubscription} = request_api(get, api_path(["subscriptions"]), Query, auth_header_()),
|
||||
?assertEqual([], get(<<"data">>, NonSubscription)),
|
||||
?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 0},
|
||||
get(<<"meta">>, NonSubscription)),
|
||||
{ok, NonSubscription1} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions"]),
|
||||
Query, auth_header_()),
|
||||
?assertEqual([], get(<<"data">>, NonSubscription1)),
|
||||
?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 0},
|
||||
get(<<"meta">>, NonSubscription)),
|
||||
|
||||
Conn =
|
||||
[begin
|
||||
{ok, C1} = emqtt:start_link(#{clean_start => true, proto_ver => ?MQTT_PROTO_V5,
|
||||
clientid => <<ClientId/binary, (integer_to_binary(I))/binary>>}),
|
||||
{ok, _} = emqtt:connect(C1),
|
||||
{ok, _, [2]} = emqtt:subscribe(C1, Topic, qos2),
|
||||
C1
|
||||
end|| I <- lists:seq(1,10)],
|
||||
|
||||
{ok, Result3} = request_api(get, api_path(["subscriptions"]), Query, auth_header_()),
|
||||
[Subscription | Subscriptions] = get(<<"data">>, Result3),
|
||||
?assertEqual(Topic, maps:get(<<"topic">>, Subscription)),
|
||||
?assertEqual(9, erlang:length(Subscriptions)),
|
||||
?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 10},
|
||||
get(<<"meta">>, Result3)),
|
||||
|
||||
{ok, Result3} = request_api(get, api_path(["nodes", atom_to_list(node()), "subscriptions"]), Query, auth_header_()),
|
||||
|
||||
?assertMatch(#{<<"page">> := 1, <<"limit">> := 10000, <<"hasnext">> := false, <<"count">> := 10},
|
||||
get(<<"meta">>, Result3)),
|
||||
|
||||
Query1 = Query ++ "&_page=1&_limit=5",
|
||||
{ok, Result4} = request_api(get, api_path(["subscriptions"]), Query1, auth_header_()),
|
||||
?assertMatch(#{<<"page">> := 1, <<"limit">> := 5, <<"hasnext">> := true, <<"count">> := 10},
|
||||
get(<<"meta">>, Result4)),
|
||||
?assertEqual(5, erlang:length(get(<<"data">>, Result4))),
|
||||
|
||||
Query2 = Query ++ "&_page=2&_limit=5",
|
||||
{ok, Result5} = request_api(get, api_path(["subscriptions"]), Query2, auth_header_()),
|
||||
?assertMatch(#{<<"page">> := 2, <<"limit">> := 5, <<"hasnext">> := false, <<"count">> := 10},
|
||||
get(<<"meta">>, Result5)),
|
||||
?assertEqual(5, erlang:length(get(<<"data">>, Result4))),
|
||||
|
||||
Query3 = Query ++ "&_page=3&_limit=3",
|
||||
{ok, Result6} = request_api(get, api_path(["subscriptions"]), Query3, auth_header_()),
|
||||
?assertMatch(#{<<"page">> := 3, <<"limit">> := 3, <<"hasnext">> := true, <<"count">> := 10},
|
||||
get(<<"meta">>, Result6)),
|
||||
|
||||
[ok = emqtt:disconnect(C1) ||C1 <- Conn],
|
||||
ok.
|
||||
|
||||
t_stats(_) ->
|
||||
{ok, _} = request_api(get, api_path(["stats"]), auth_header_()),
|
||||
{ok, _} = request_api(get, api_path(["nodes", atom_to_list(node()), "stats"]), auth_header_()),
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.4.0",
|
||||
|
@ -12,6 +13,7 @@
|
|||
, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}
|
||||
]},
|
||||
{<<".*">>,[]}]
|
||||
}.
|
||||
|
|
|
@ -211,20 +211,32 @@ test_rule_sql(Params) ->
|
|||
end.
|
||||
|
||||
do_create_rule(Params) ->
|
||||
case emqx_rule_engine:create_rule(parse_rule_params(Params)) of
|
||||
{ok, Rule} -> return({ok, record_to_map(Rule)});
|
||||
{error, {action_not_found, ActionName}} ->
|
||||
return({error, 400, ?ERR_NO_ACTION(ActionName)});
|
||||
case parse_rule_params(Params) of
|
||||
{ok, ParsedParams} ->
|
||||
case emqx_rule_engine:create_rule(ParsedParams) of
|
||||
{ok, Rule} -> return({ok, record_to_map(Rule)});
|
||||
{error, {action_not_found, ActionName}} ->
|
||||
return({error, 400, ?ERR_NO_ACTION(ActionName)});
|
||||
{error, Reason} ->
|
||||
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
|
||||
return({error, 400, ?ERR_BADARGS(Reason)})
|
||||
end;
|
||||
{error, Reason} ->
|
||||
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
|
||||
return({error, 400, ?ERR_BADARGS(Reason)})
|
||||
end.
|
||||
|
||||
update_rule(#{id := Id}, Params) ->
|
||||
case emqx_rule_engine:update_rule(parse_rule_params(Params, #{id => Id})) of
|
||||
{ok, Rule} -> return({ok, record_to_map(Rule)});
|
||||
{error, {not_found, RuleId}} ->
|
||||
return({error, 400, ?ERR_NO_RULE(RuleId)});
|
||||
case parse_rule_params(Params, #{id => Id}) of
|
||||
{ok, ParsedParams} ->
|
||||
case emqx_rule_engine:update_rule(ParsedParams) of
|
||||
{ok, Rule} -> return({ok, record_to_map(Rule)});
|
||||
{error, {not_found, RuleId}} ->
|
||||
return({error, 400, ?ERR_NO_RULE(RuleId)});
|
||||
{error, Reason} ->
|
||||
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
|
||||
return({error, 400, ?ERR_BADARGS(Reason)})
|
||||
end;
|
||||
{error, Reason} ->
|
||||
?LOG(error, "~p failed: ~0p", [?FUNCTION_NAME, Reason]),
|
||||
return({error, 400, ?ERR_BADARGS(Reason)})
|
||||
|
@ -481,7 +493,9 @@ printable_actions(Actions) ->
|
|||
parse_rule_params(Params) ->
|
||||
parse_rule_params(Params, #{description => <<"">>}).
|
||||
parse_rule_params([], Rule) ->
|
||||
Rule;
|
||||
{ok, Rule};
|
||||
parse_rule_params([{<<"id">>, <<>>} | _], _) ->
|
||||
{error, {empty_string_not_allowed, id}};
|
||||
parse_rule_params([{<<"id">>, Id} | Params], Rule) ->
|
||||
parse_rule_params(Params, Rule#{id => Id});
|
||||
parse_rule_params([{<<"rawsql">>, RawSQL} | Params], Rule) ->
|
||||
|
@ -516,6 +530,8 @@ parse_resource_params(Params) ->
|
|||
parse_resource_params(Params, #{config => #{}, description => <<"">>}).
|
||||
parse_resource_params([], Res) ->
|
||||
{ok, Res};
|
||||
parse_resource_params([{<<"id">>, <<>>} | _], _Res) ->
|
||||
{error, {empty_string_not_allowed, id}};
|
||||
parse_resource_params([{<<"id">>, Id} | Params], Res) ->
|
||||
parse_resource_params(Params, Res#{id => Id});
|
||||
parse_resource_params([{<<"type">>, ResourceType} | Params], Res) ->
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_web_hook,
|
||||
[{description, "EMQ X WebHook Plugin"},
|
||||
{vsn, "4.3.8"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.9"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_web_hook_sup]},
|
||||
{applications, [kernel,stdlib,ehttpc]},
|
||||
|
|
|
@ -5,9 +5,10 @@
|
|||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[3-7]">>,
|
||||
{<<"4\\.3\\.[3-8]">>,
|
||||
[{apply,{application,stop,[emqx_web_hook]}},
|
||||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{<<"4\\.3\\.[0-2]">>,
|
||||
|
@ -15,8 +16,9 @@
|
|||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||
{<<"4\\.3\\.[3-7]">>,
|
||||
{<<"4\\.3\\.[3-8]">>,
|
||||
[{apply,{application,stop,[emqx_web_hook]}},
|
||||
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}]}.
|
||||
|
|
|
@ -93,6 +93,7 @@ on_client_connect(ConnInfo = #{clientid := ClientId, username := Username, peern
|
|||
, ipaddress => iolist_to_binary(ntoa(Peerhost))
|
||||
, keepalive => maps:get(keepalive, ConnInfo)
|
||||
, proto_ver => maps:get(proto_ver, ConnInfo)
|
||||
, connected_at => maps:get(connected_at, ConnInfo)
|
||||
},
|
||||
send_http_request(ClientId, Params).
|
||||
|
||||
|
@ -109,6 +110,7 @@ on_client_connack(ConnInfo = #{clientid := ClientId, username := Username, peern
|
|||
, ipaddress => iolist_to_binary(ntoa(Peerhost))
|
||||
, keepalive => maps:get(keepalive, ConnInfo)
|
||||
, proto_ver => maps:get(proto_ver, ConnInfo)
|
||||
, connected_at => maps:get(connected_at, ConnInfo)
|
||||
, conn_ack => Rc
|
||||
},
|
||||
send_http_request(ClientId, Params).
|
||||
|
@ -143,6 +145,7 @@ on_client_disconnected(#{clientid := ClientId, username := Username}, Reason, Co
|
|||
, clientid => ClientId
|
||||
, username => maybe(Username)
|
||||
, reason => stringfy(maybe(Reason))
|
||||
, connected_at => maps:get(connected_at, ConnInfo)
|
||||
, disconnected_at => maps:get(disconnected_at, ConnInfo, erlang:system_time(millisecond))
|
||||
},
|
||||
send_http_request(ClientId, Params).
|
||||
|
|
|
@ -52,7 +52,8 @@ prop_client_connect() ->
|
|||
username => maybe(maps:get(username, ConnInfo)),
|
||||
ipaddress => peer2addr(maps:get(peername, ConnInfo)),
|
||||
keepalive => maps:get(keepalive, ConnInfo),
|
||||
proto_ver => maps:get(proto_ver, ConnInfo)
|
||||
proto_ver => maps:get(proto_ver, ConnInfo),
|
||||
connected_at => maps:get(connected_at, ConnInfo)
|
||||
}),
|
||||
true
|
||||
end).
|
||||
|
@ -71,6 +72,7 @@ prop_client_connack() ->
|
|||
ipaddress => peer2addr(maps:get(peername, ConnInfo)),
|
||||
keepalive => maps:get(keepalive, ConnInfo),
|
||||
proto_ver => maps:get(proto_ver, ConnInfo),
|
||||
connected_at => maps:get(connected_at, ConnInfo),
|
||||
conn_ack => Rc
|
||||
}),
|
||||
true
|
||||
|
@ -106,6 +108,7 @@ prop_client_disconnected() ->
|
|||
node => stringfy(node()),
|
||||
clientid => maps:get(clientid, ClientInfo),
|
||||
username => maybe(maps:get(username, ClientInfo)),
|
||||
connected_at => maps:get(connected_at, ConnInfo),
|
||||
disconnected_at => maps:get(disconnected_at, ConnInfo),
|
||||
reason => stringfy(Reason)
|
||||
}),
|
||||
|
|
|
@ -57,8 +57,7 @@ on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) ->
|
|||
connack => 0, %% XXX: connack will be removed in 5.0
|
||||
keepalive => maps:get(keepalive, ConnInfo, 0),
|
||||
clean_start => maps:get(clean_start, ConnInfo, true),
|
||||
expiry_interval => maps:get(expiry_interval, ConnInfo, 0),
|
||||
connected_at => maps:get(connected_at, ConnInfo)
|
||||
expiry_interval => maps:get(expiry_interval, ConnInfo, 0)
|
||||
},
|
||||
case emqx_json:safe_encode(NPresence) of
|
||||
{ok, Payload} ->
|
||||
|
@ -95,7 +94,8 @@ common_infos(
|
|||
sockport := SockPort
|
||||
},
|
||||
_ConnInfo = #{proto_name := ProtoName,
|
||||
proto_ver := ProtoVer
|
||||
proto_ver := ProtoVer,
|
||||
connected_at := ConnectedAt
|
||||
}) ->
|
||||
#{clientid => ClientId,
|
||||
username => Username,
|
||||
|
@ -103,6 +103,7 @@ common_infos(
|
|||
sockport => SockPort,
|
||||
proto_name => ProtoName,
|
||||
proto_ver => ProtoVer,
|
||||
connected_at => ConnectedAt,
|
||||
ts => erlang:system_time(millisecond)
|
||||
}.
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_modules,
|
||||
[{description, "EMQ X Module Management"},
|
||||
{vsn, "4.4.0"},
|
||||
{vsn, "4.4.1"},
|
||||
{modules, []},
|
||||
{applications, [kernel,stdlib]},
|
||||
{mod, {emqx_modules_app, []}},
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
%% -*-: erlang -*-
|
||||
{VSN,
|
||||
[
|
||||
{<<".*">>, []}
|
||||
[{"4.4.0",
|
||||
[{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}]},
|
||||
{<<".*">>, []}
|
||||
],
|
||||
[
|
||||
{<<".*">>, []}
|
||||
[{"4.4.0",
|
||||
[{load_module, emqx_mod_presence, brutal_purge, soft_purge, []}]},
|
||||
{<<".*">>, []}
|
||||
]
|
||||
}.
|
||||
}.
|
||||
|
|
|
@ -38,10 +38,12 @@ t_load_unload(_) ->
|
|||
?assertEqual({error,already_exists}, emqx_mod_acl_internal:load([])).
|
||||
|
||||
t_check_acl(_) ->
|
||||
emqx_mod_acl_internal:load([]),
|
||||
Rules=#{publish => [{allow,all}], subscribe => [{deny, all}]},
|
||||
?assertEqual({ok, allow}, emqx_mod_acl_internal:check_acl(clientinfo(), publish, <<"t">>, [], Rules)),
|
||||
?assertEqual({ok, deny}, emqx_mod_acl_internal:check_acl(clientinfo(), subscribe, <<"t">>, [], Rules)),
|
||||
?assertEqual(ok, emqx_mod_acl_internal:check_acl(clientinfo(), connect, <<"t">>, [], Rules)).
|
||||
?assertEqual(ok, emqx_mod_acl_internal:check_acl(clientinfo(), connect, <<"t">>, [], Rules)),
|
||||
emqx_mod_acl_internal:unload([]).
|
||||
|
||||
t_reload_acl(_) ->
|
||||
?assertEqual(ok, emqx_mod_acl_internal:reload([])).
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.4.0",
|
||||
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||
[ {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
||||
, {apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}}
|
||||
, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
||||
|
@ -13,7 +16,9 @@
|
|||
]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.4.0",
|
||||
[ {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||
[ {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_access_control,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_channel,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_connection,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_metrics,brutal_purge,soft_purge,[]}
|
||||
, {load_module,emqx_session,brutal_purge,soft_purge,[]}
|
||||
|
|
|
@ -45,10 +45,11 @@ authenticate(ClientInfo = #{zone := Zone}) ->
|
|||
-spec(check_acl(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic())
|
||||
-> allow | deny).
|
||||
check_acl(ClientInfo, PubSub, Topic) ->
|
||||
case emqx_acl_cache:is_enabled() of
|
||||
Result = case emqx_acl_cache:is_enabled() of
|
||||
true -> check_acl_cache(ClientInfo, PubSub, Topic);
|
||||
false -> do_check_acl(ClientInfo, PubSub, Topic)
|
||||
end.
|
||||
end,
|
||||
inc_acl_metrics(Result), Result.
|
||||
|
||||
check_acl_cache(ClientInfo, PubSub, Topic) ->
|
||||
case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
|
||||
|
@ -56,7 +57,9 @@ check_acl_cache(ClientInfo, PubSub, Topic) ->
|
|||
AclResult = do_check_acl(ClientInfo, PubSub, Topic),
|
||||
emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
|
||||
AclResult;
|
||||
AclResult -> AclResult
|
||||
AclResult ->
|
||||
inc_acl_metrics(cache_hit),
|
||||
AclResult
|
||||
end.
|
||||
|
||||
do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
|
||||
|
@ -76,6 +79,14 @@ default_auth_result(Zone) ->
|
|||
run_hooks(Name, Args, Acc) ->
|
||||
ok = emqx_metrics:inc(Name), emqx_hooks:run_fold(Name, Args, Acc).
|
||||
|
||||
-compile({inline, [inc_acl_metrics/1]}).
|
||||
inc_acl_metrics(allow) ->
|
||||
emqx_metrics:inc('client.acl.allow');
|
||||
inc_acl_metrics(deny) ->
|
||||
emqx_metrics:inc('client.acl.deny');
|
||||
inc_acl_metrics(cache_hit) ->
|
||||
emqx_metrics:inc('client.acl.cache_hit').
|
||||
|
||||
-compile({inline, [return_auth_result/1]}).
|
||||
return_auth_result(Result = #{auth_result := success}) ->
|
||||
{ok, Result};
|
||||
|
|
|
@ -65,8 +65,11 @@
|
|||
, code_change/3
|
||||
]).
|
||||
|
||||
%% BACKW: v4.3.0
|
||||
-export([ upgrade_retained_delayed_counter_type/0
|
||||
%% BACKW
|
||||
-export([%% v4.3.0
|
||||
upgrade_retained_delayed_counter_type/0,
|
||||
%% e4.4.0, e4.3.0-e4.3.6, v4.3.0-v4.3.11
|
||||
assign_acl_stats_from_ets_to_counter/0
|
||||
]).
|
||||
|
||||
-export_type([metric_idx/0]).
|
||||
|
@ -186,6 +189,12 @@
|
|||
{counter, 'session.discarded'},
|
||||
{counter, 'session.terminated'}
|
||||
]).
|
||||
%% Statistic metrics for ACL checking
|
||||
-define(STASTS_ACL_METRICS,
|
||||
[ {counter, 'client.acl.allow'},
|
||||
{counter, 'client.acl.deny'},
|
||||
{counter, 'client.acl.cache_hit'}
|
||||
]).
|
||||
|
||||
-record(state, {next_idx = 1}).
|
||||
|
||||
|
@ -204,6 +213,21 @@ upgrade_retained_delayed_counter_type() ->
|
|||
Ks = ['messages.retained', 'messages.delayed'],
|
||||
gen_server:call(?SERVER, {set_type_to_counter, Ks}, infinity).
|
||||
|
||||
%% BACKW: %% e4.4.0, e4.3.0-e4.3.6, v4.3.0-v4.3.11
|
||||
assign_acl_stats_from_ets_to_counter() ->
|
||||
CRef = persistent_term:get(?MODULE),
|
||||
Names = ['client.acl.allow', 'client.acl.deny', 'client.acl.cache_hit'],
|
||||
lists:foreach(fun(Name) ->
|
||||
Val = case emqx_metrics:val(Name) of
|
||||
undefined -> 0;
|
||||
Val0 -> Val0
|
||||
end,
|
||||
Idx = reserved_idx(Name),
|
||||
Metric = #metric{name = Name, type = counter, idx = Idx},
|
||||
ok = gen_server:call(?SERVER, {set, Metric}),
|
||||
ok = counters:put(CRef, Idx, Val)
|
||||
end, Names).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Metrics API
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -433,7 +457,8 @@ init([]) ->
|
|||
?MESSAGE_METRICS,
|
||||
?DELIVERY_METRICS,
|
||||
?CLIENT_METRICS,
|
||||
?SESSION_METRICS
|
||||
?SESSION_METRICS,
|
||||
?STASTS_ACL_METRICS
|
||||
]),
|
||||
% Store reserved indices
|
||||
ok = lists:foreach(fun({Type, Name}) ->
|
||||
|
@ -466,6 +491,10 @@ handle_call({set_type_to_counter, Keys}, _From, State) ->
|
|||
end, Keys),
|
||||
{reply, ok, State};
|
||||
|
||||
handle_call({set, Metric}, _From, State) ->
|
||||
true = ets:insert(?TAB, Metric),
|
||||
{reply, ok, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||
{reply, ignored, State}.
|
||||
|
@ -574,6 +603,10 @@ reserved_idx('session.resumed') -> 221;
|
|||
reserved_idx('session.takeovered') -> 222;
|
||||
reserved_idx('session.discarded') -> 223;
|
||||
reserved_idx('session.terminated') -> 224;
|
||||
%% Stats metrics
|
||||
reserved_idx('client.acl.allow') -> 300;
|
||||
reserved_idx('client.acl.deny') -> 301;
|
||||
reserved_idx('client.acl.cache_hit') -> 302;
|
||||
|
||||
reserved_idx(_) -> undefined.
|
||||
|
||||
|
|
Loading…
Reference in New Issue