Mgmt emqx modules (#3376)

This commit is contained in:
turtleDeng 2020-04-10 19:55:21 +08:00 committed by GitHub
parent d4932533ca
commit d49f4118fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 235 additions and 511 deletions

View File

@ -1818,15 +1818,14 @@ listener.wss.external.send_timeout_close = on
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Modules ## Modules
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## The file to store loaded module names.
##
## Value: File
modules.loaded_file = {{ platform_data_dir }}/loaded_modules
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Presence Module ## Presence Module
## Enable Presence Module.
##
## Value: on | off
module.presence = on
## Sets the QoS for presence MQTT message. ## Sets the QoS for presence MQTT message.
## ##
## Value: 0 | 1 | 2 ## Value: 0 | 1 | 2
@ -1835,11 +1834,6 @@ module.presence.qos = 1
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Subscription Module ## Subscription Module
## Enable Subscription Module.
##
## Value: on | off
module.subscription = off
## Subscribe the Topics automatically when client connected. ## Subscribe the Topics automatically when client connected.
## ##
## Value: String ## Value: String
@ -1875,31 +1869,10 @@ module.subscription = off
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Rewrite Module ## Rewrite Module
## Enable Rewrite Module.
##
## Value: on | off
module.rewrite = off
## {rewrite, Topic, Re, Dest} ## {rewrite, Topic, Re, Dest}
## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1 ## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1
## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 ## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
##--------------------------------------------------------------------
## Topic Metrics Module
## Enable Topic Metrics Module.
##
## Value: on | off
module.topic_metrics = off
##--------------------------------------------------------------------
## Delayed Module
## Enable Delayed Module.
##
## Value: on | off
module.delayed = off
##------------------------------------------------------------------- ##-------------------------------------------------------------------
## Plugins ## Plugins
##------------------------------------------------------------------- ##-------------------------------------------------------------------

View File

@ -1766,9 +1766,8 @@ end}.
%% Modules %% Modules
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
{mapping, "module.presence", "emqx.modules", [ {mapping, "modules.loaded_file", "emqx.modules_loaded_file", [
{default, off}, {datatype, string}
{datatype, flag}
]}. ]}.
{mapping, "module.presence.qos", "emqx.modules", [ {mapping, "module.presence.qos", "emqx.modules", [
@ -1777,11 +1776,6 @@ end}.
{validators, ["range:0-2"]} {validators, ["range:0-2"]}
]}. ]}.
{mapping, "module.subscription", "emqx.modules", [
{default, off},
{datatype, flag}
]}.
{mapping, "module.subscription.$id.topic", "emqx.modules", [ {mapping, "module.subscription.$id.topic", "emqx.modules", [
{datatype, string} {datatype, string}
]}. ]}.
@ -1810,25 +1804,10 @@ end}.
{validators, ["range:0-2"]} {validators, ["range:0-2"]}
]}. ]}.
{mapping, "module.rewrite", "emqx.modules", [
{default, off},
{datatype, flag}
]}.
{mapping, "module.rewrite.rule.$id", "emqx.modules", [ {mapping, "module.rewrite.rule.$id", "emqx.modules", [
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "module.topic_metrics", "emqx.modules", [
{default, off},
{datatype, flag}
]}.
{mapping, "module.delayed", "emqx.modules", [
{default, off},
{datatype, flag}
]}.
{translation, "emqx.modules", fun(Conf) -> {translation, "emqx.modules", fun(Conf) ->
Subscriptions = fun() -> Subscriptions = fun() ->
List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf),
@ -1847,26 +1826,12 @@ end}.
end, Rules) end, Rules)
end, end,
lists:append([ lists:append([
case cuttlefish:conf_get("module.presence", Conf) of %% Presence [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}],
true -> [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}]; [{emqx_mod_subscription, Subscriptions()}],
false -> [] [{emqx_mod_rewrite, Rewrites()}],
end, [{emqx_mod_topic_metrics, []}],
case cuttlefish:conf_get("module.subscription", Conf) of %% Subscription [{emqx_mod_delayed, []}],
true -> [{emqx_mod_subscription, Subscriptions()}]; [{emqx_mod_acl_internal, []}]
false -> []
end,
case cuttlefish:conf_get("module.rewrite", Conf) of %% Rewrite
true -> [{emqx_mod_rewrite, Rewrites()}];
false -> []
end,
case cuttlefish:conf_get("module.topic_metrics", Conf) of %% Topic Metrics
true -> [{emqx_mod_topic_metrics, []}];
false -> []
end,
case cuttlefish:conf_get("module.delayed", Conf) of %% Delayed
true -> [{emqx_mod_delayed, []}];
false -> []
end
]) ])
end}. end}.

View File

@ -21,7 +21,6 @@
-export([authenticate/1]). -export([authenticate/1]).
-export([ check_acl/3 -export([ check_acl/3
, reload_acl/0
]). ]).
-type(result() :: #{auth_result := emqx_types:auth_result(), -type(result() :: #{auth_result := emqx_types:auth_result(),
@ -67,11 +66,6 @@ do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
_Other -> deny _Other -> deny
end. end.
-spec(reload_acl() -> ok | {error, term()}).
reload_acl() ->
emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(),
emqx_mod_acl_internal:reload_acl().
default_auth_result(Zone) -> default_auth_result(Zone) ->
case emqx_zone:get_env(Zone, allow_anonymous, false) of case emqx_zone:get_env(Zone, allow_anonymous, false) of
true -> #{auth_result => success, anonymous => true}; true -> #{auth_result => success, anonymous => true};

View File

@ -22,6 +22,8 @@
-callback(unload(State :: term()) -> term()). -callback(unload(State :: term()) -> term()).
-callback(description() -> any()).
-else. -else.
-export([behaviour_info/1]). -export([behaviour_info/1]).

View File

@ -24,14 +24,13 @@
-logger_header("[ACL_INTERNAL]"). -logger_header("[ACL_INTERNAL]").
%% APIs %% APIs
-export([ all_rules/0 -export([check_acl/5]).
, check_acl/5
, reload_acl/0
]).
%% emqx_gen_mod callbacks %% emqx_gen_mod callbacks
-export([ load/1 -export([ load/1
, unload/1 , unload/1
, reload/1
, description/0
]). ]).
-define(MFA(M, F, A), {M, F, A}). -define(MFA(M, F, A), {M, F, A}).
@ -44,18 +43,19 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
load(_Env) -> load(_Env) ->
Rules = rules_from_file(acl_file()), Rules = rules_from_file(emqx:get_env(acl_file)),
emqx_hooks:add('client.check_acl', ?MFA(?MODULE, check_acl, [Rules]), -1). emqx_hooks:add('client.check_acl', ?MFA(?MODULE, check_acl, [Rules]), -1).
unload(_Env) -> unload(_Env) ->
Rules = rules_from_file(acl_file()), Rules = rules_from_file(emqx:get_env(acl_file)),
emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])). emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])).
%% @doc Read all rules reload(_Env) ->
-spec(all_rules() -> list(emqx_access_rule:rule())). emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(),
all_rules() -> unload([]), load([]).
rules_from_file(acl_file()).
description() ->
"EMQ X Internal ACL Module".
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% ACL callbacks %% ACL callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -71,16 +71,9 @@ check_acl(Client, PubSub, Topic, _AclResult, Rules) ->
nomatch -> ok nomatch -> ok
end. end.
-spec(reload_acl() -> ok | {error, term()}).
reload_acl() ->
unload([]), load([]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal Functions %% Internal Functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
acl_file() -> emqx:get_env(acl_file).
lookup(PubSub, Rules) -> lookup(PubSub, Rules) ->
maps:get(PubSub, Rules, []). maps:get(PubSub, Rules, []).

View File

@ -17,6 +17,7 @@
-module(emqx_mod_delayed). -module(emqx_mod_delayed).
-behaviour(gen_server). -behaviour(gen_server).
-behaviour(emqx_gen_mod).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
@ -24,6 +25,7 @@
%% emqx_gen_mod callbacks %% emqx_gen_mod callbacks
-export([ load/1 -export([ load/1
, unload/1 , unload/1
, description/0
]). ]).
-export([ start_link/0 -export([ start_link/0
@ -62,6 +64,8 @@ unload(_Env) ->
emqx:unhook('message.publish', {?MODULE, on_message_publish, []}), emqx:unhook('message.publish', {?MODULE, on_message_publish, []}),
emqx_mod_sup:stop_child(?MODULE). emqx_mod_sup:stop_child(?MODULE).
description() ->
"EMQ X Delayed Publish Module".
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Hooks %% Hooks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -26,6 +26,7 @@
%% emqx_gen_mod callbacks %% emqx_gen_mod callbacks
-export([ load/1 -export([ load/1
, unload/1 , unload/1
, description/0
]). ]).
-export([ on_client_connected/3 -export([ on_client_connected/3
@ -44,6 +45,8 @@ unload(_Env) ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}), emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),
emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}). emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
description() ->
"EMQ X Presence Module".
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Callbacks %% Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -16,7 +16,7 @@
-module(emqx_mod_rewrite). -module(emqx_mod_rewrite).
-behavior(emqx_gen_mod). -behaviour(emqx_gen_mod).
-include_lib("emqx.hrl"). -include_lib("emqx.hrl").
-include_lib("emqx_mqtt.hrl"). -include_lib("emqx_mqtt.hrl").
@ -36,6 +36,7 @@
%% emqx_gen_mod callbacks %% emqx_gen_mod callbacks
-export([ load/1 -export([ load/1
, unload/1 , unload/1
, description/0
]). ]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -62,6 +63,8 @@ unload(_) ->
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
description() ->
"EMQ X Topic Rewrite Module".
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -24,6 +24,7 @@
%% emqx_gen_mod callbacks %% emqx_gen_mod callbacks
-export([ load/1 -export([ load/1
, unload/1 , unload/1
, description/0
]). ]).
%% APIs %% APIs
@ -49,6 +50,8 @@ on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #
unload(_) -> unload(_) ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}). emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
description() ->
"EMQ X Subscription Module".
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -58,5 +58,6 @@ stop_child(ChildId) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]),
{ok, {{one_for_one, 10, 100}, []}}. {ok, {{one_for_one, 10, 100}, []}}.

View File

@ -17,6 +17,7 @@
-module(emqx_mod_topic_metrics). -module(emqx_mod_topic_metrics).
-behaviour(gen_server). -behaviour(gen_server).
-behaviour(emqx_gen_mod).
-include("emqx.hrl"). -include("emqx.hrl").
-include("logger.hrl"). -include("logger.hrl").
@ -26,6 +27,7 @@
-export([ load/1 -export([ load/1
, unload/1 , unload/1
, description/0
]). ]).
-export([ on_message_publish/1 -export([ on_message_publish/1
@ -104,6 +106,9 @@ unload(_Env) ->
emqx:unhook('message.delivered', fun ?MODULE:on_message_delivered/2), emqx:unhook('message.delivered', fun ?MODULE:on_message_delivered/2),
emqx_mod_sup:stop_child(?MODULE). emqx_mod_sup:stop_child(?MODULE).
description() ->
"EMQ X Topic Metrics Module".
on_message_publish(#message{topic = Topic, qos = QoS}) -> on_message_publish(#message{topic = Topic, qos = QoS}) ->
case is_registered(Topic) of case is_registered(Topic) of
true -> true ->

View File

@ -20,28 +20,149 @@
-logger_header("[Modules]"). -logger_header("[Modules]").
-export([ load/0 -export([ list/0
, load/0
, load/1
, unload/0 , unload/0
, unload/1
, reload/1
, load_module/2
]). ]).
%% @doc List all available plugins
-spec(list() -> [{atom(), boolean()}]).
list() ->
ets:tab2list(?MODULE).
%% @doc Load all the extended modules. %% @doc Load all the extended modules.
-spec(load() -> ok). -spec(load() -> ok).
load() -> load() ->
ok = emqx_mod_acl_internal:load([]), case emqx:get_env(modules_loaded_file) of
lists:foreach(fun load/1, modules()). undefined -> ignore;
File ->
load_modules(File)
end.
load({Mod, Env}) -> load(ModuleName) ->
ok = Mod:load(Env), case find_module(ModuleName) of
?LOG(info, "Load ~s module successfully.", [Mod]). [] ->
?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]),
modules() -> emqx:get_env(modules, []). {error, not_found};
[{ModuleName, true}] ->
?LOG(notice, "Module ~s is already started", [ModuleName]),
{error, already_started};
[{ModuleName, false}] ->
emqx_modules:load_module(ModuleName, true)
end.
%% @doc Unload all the extended modules. %% @doc Unload all the extended modules.
-spec(unload() -> ok). -spec(unload() -> ok).
unload() -> unload() ->
ok = emqx_mod_acl_internal:unload([]), case emqx:get_env(modules_loaded_file) of
lists:foreach(fun unload/1, modules()). undefined -> ignore;
File ->
unload_modules(File)
end.
unload({Mod, Env}) -> unload(ModuleName) ->
Mod:unload(Env). case find_module(ModuleName) of
[] ->
?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]),
{error, not_found};
[{ModuleName, false}] ->
?LOG(error, "Module ~s is not started", [ModuleName]),
{error, not_started};
[{ModuleName, true}] ->
unload_module(ModuleName, true)
end.
reload(emqx_mod_acl_internal) ->
Modules = emqx:get_env(modules, []),
Env = proplists:get_value(emqx_mod_acl_internal, Modules, undefined),
case emqx_mod_acl_internal:reload(Env) of
ok ->
?LOG(info, "Reload ~s module successfully.", [emqx_mod_acl_internal]);
{error, Error} ->
?LOG(error, "Reload module ~s failed, cannot start for ~0p", [emqx_mod_acl_internal, Error])
end;
reload(_) ->
ignore.
find_module(ModuleName) ->
ets:lookup(?MODULE, ModuleName).
filter_module(ModuleNames) ->
filter_module(ModuleNames, emqx:get_env(modules, [])).
filter_module([], Acc) ->
Acc;
filter_module([{ModuleName, true} | ModuleNames], Acc) ->
filter_module(ModuleNames, lists:keydelete(ModuleName, 1, Acc));
filter_module([{_, false} | ModuleNames], Acc) ->
filter_module(ModuleNames, Acc).
load_modules(File) ->
case file:consult(File) of
{ok, ModuleNames} ->
lists:foreach(fun({ModuleName, _}) ->
ets:insert(?MODULE, {ModuleName, false})
end, filter_module(ModuleNames)),
lists:foreach(fun load_module/1, ModuleNames);
{error, Error} ->
?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error])
end.
load_module({ModuleName, true}) ->
emqx_modules:load_module(ModuleName, false);
load_module({ModuleName, false}) ->
ets:insert(?MODULE, {ModuleName, false});
load_module(ModuleName) ->
load_module({ModuleName, true}).
load_module(ModuleName, Persistent) ->
Modules = emqx:get_env(modules, []),
Env = proplists:get_value(ModuleName, Modules, undefined),
case ModuleName:load(Env) of
ok ->
ets:insert(?MODULE, {ModuleName, true}),
write_loaded(Persistent),
?LOG(info, "Load ~s module successfully.", [ModuleName]);
{error, Error} ->
?LOG(error, "Load module ~s failed, cannot load for ~0p", [ModuleName, Error]),
{error, Error}
end.
unload_modules(File) ->
case file:consult(File) of
{ok, ModuleNames} ->
lists:foreach(fun unload_module/1, ModuleNames);
{error, Error} ->
?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error])
end.
unload_module({ModuleName, true}) ->
unload_module(ModuleName, false);
unload_module({ModuleName, false}) ->
ets:insert(?MODULE, {ModuleName, false});
unload_module(ModuleName) ->
unload_module({ModuleName, true}).
unload_module(ModuleName, Persistent) ->
Modules = emqx:get_env(modules, []),
Env = proplists:get_value(ModuleName, Modules, undefined),
case ModuleName:unload(Env) of
ok ->
ets:insert(?MODULE, {ModuleName, false}),
write_loaded(Persistent),
?LOG(info, "Unload ~s module successfully.", [ModuleName]);
{error, Error} ->
?LOG(error, "Unload module ~s failed, cannot unload for ~0p", [ModuleName, Error])
end.
write_loaded(true) ->
FilePath = emqx:get_env(modules_loaded_file),
case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- list()]) of
ok -> ok;
{error, Error} ->
?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]),
{error, Error}
end;
write_loaded(false) -> ok.

View File

@ -0,0 +1,2 @@
{emqx_mod_acl_internal, true}.
{emqx_mod_presence, true}.

View File

@ -1,378 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_access_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(AC, emqx_access_control).
-define(CACHE, emqx_acl_cache).
-import(emqx_access_rule,
[ compile/1
, match/3
]).
all() ->
[{group, access_control},
{group, acl_cache},
{group, access_control_cache_mode},
{group, access_rule}
].
groups() ->
[{access_control, [sequence],
[t_reload_acl,
t_check_acl_1,
t_check_acl_2]},
{access_control_cache_mode, [sequence],
[t_acl_cache_basic,
t_acl_cache_expiry,
t_acl_cache_cleanup,
t_acl_cache_full]},
{acl_cache, [sequence],
[t_put_get_del_cache,
t_cache_update,
t_cache_expiry,
t_cache_replacement,
t_cache_cleanup,
t_cache_auto_emtpy,
t_cache_auto_cleanup]},
{access_rule, [parallel],
[t_compile_rule,
t_match_rule]
}].
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules([router, broker]),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
init_per_group(Group, Config) when Group =:= access_control;
Group =:= access_control_cache_mode ->
prepare_config(Group),
application:load(emqx),
Config;
init_per_group(_Group, Config) ->
Config.
prepare_config(Group = access_control) ->
set_acl_config_file(Group),
application:set_env(emqx, enable_acl_cache, false);
prepare_config(Group = access_control_cache_mode) ->
set_acl_config_file(Group),
application:set_env(emqx, enable_acl_cache, true),
application:set_env(emqx, acl_cache_max_size, 100).
set_acl_config_file(_Group) ->
Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]},
{allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]},
{allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]},
{allow, {client, "testClient"}, subscribe, ["testTopics/testClient"]},
{allow, all, subscribe, ["clients/%c"]},
{allow, all, pubsub, ["users/%u/#"]},
{deny, all, subscribe, ["$SYS/#", "#"]},
{deny, all}],
write_config("access_SUITE_acl.conf", Rules),
application:set_env(emqx, acl_file, "access_SUITE_acl.conf").
write_config(Filename, Terms) ->
file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]).
end_per_group(_Group, Config) ->
Config.
%%--------------------------------------------------------------------
%% emqx_access_control
%%--------------------------------------------------------------------
t_reload_acl(_) ->
ok = ?AC:reload_acl().
t_check_acl_1(_) ->
Client = #{zone => external,
clientid => <<"client1">>,
username => <<"testuser">>
},
allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>),
deny = ?AC:check_acl(Client, subscribe, <<"clients/client1/x/y">>),
allow = ?AC:check_acl(Client, publish, <<"users/testuser/1">>),
allow = ?AC:check_acl(Client, subscribe, <<"a/b/c">>).
t_check_acl_2(_) ->
Client = #{zone => external,
clientid => <<"client2">>,
username => <<"xyz">>
},
deny = ?AC:check_acl(Client, subscribe, <<"a/b/c">>).
t_acl_cache_basic(_) ->
Client = #{zone => external,
clientid => <<"client1">>,
username => <<"testuser">>
},
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>).
t_acl_cache_expiry(_) ->
application:set_env(emqx, acl_cache_ttl, 100),
Client = #{zone => external,
clientid => <<"client1">>,
username => <<"testuser">>
},
allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ct:sleep(150),
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>).
t_acl_cache_full(_) ->
application:set_env(emqx, acl_cache_max_size, 1),
Client = #{zone => external,
clientid => <<"client1">>,
username => <<"testuser">>
},
allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>),
%% the older ones (the <<"users/testuser/1">>) will be evicted first
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>).
t_acl_cache_cleanup(_) ->
%% The acl cache will try to evict memory, if the size is full and the newest
%% cache entry is expired
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 2),
Client = #{zone => external,
clientid => <<"client1">>,
username => <<"testuser">>
},
allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>),
allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
ct:sleep(150),
%% now the cache is full and the newest one - "clients/client1"
%% should be expired, so we'll empty the cache before putting
%% the next cache entry
deny = ?AC:check_acl(Client, subscribe, <<"#">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>),
deny = ?CACHE:get_acl_cache(subscribe, <<"#">>).
t_put_get_del_cache(_) ->
application:set_env(emqx, acl_cache_ttl, 300000),
application:set_env(emqx, acl_cache_max_size, 30),
not_found = ?CACHE:get_acl_cache(publish, <<"a">>),
ok = ?CACHE:put_acl_cache(publish, <<"a">>, allow),
allow = ?CACHE:get_acl_cache(publish, <<"a">>),
not_found = ?CACHE:get_acl_cache(subscribe, <<"b">>),
ok = ?CACHE:put_acl_cache(subscribe, <<"b">>, deny),
deny = ?CACHE:get_acl_cache(subscribe, <<"b">>),
2 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()).
t_cache_expiry(_) ->
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 30),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
allow = ?CACHE:get_acl_cache(subscribe, <<"a">>),
ct:sleep(150),
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny),
deny = ?CACHE:get_acl_cache(subscribe, <<"a">>),
ct:sleep(150),
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>).
t_cache_update(_) ->
application:set_env(emqx, acl_cache_ttl, 300000),
application:set_env(emqx, acl_cache_max_size, 30),
[] = ?CACHE:dump_acl_cache(),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()),
%% update the 2nd one
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]),
3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()),
?assertEqual(?CACHE:cache_k(subscribe, <<"a">>), ?CACHE:get_oldest_key()).
t_cache_replacement(_) ->
application:set_env(emqx, acl_cache_ttl, 300000),
application:set_env(emqx, acl_cache_max_size, 3),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
allow = ?CACHE:get_acl_cache(subscribe, <<"a">>),
allow = ?CACHE:get_acl_cache(publish, <<"b">>),
allow = ?CACHE:get_acl_cache(publish, <<"c">>),
3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()),
ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny),
3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()),
?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_oldest_key()),
ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny),
3 = ?CACHE:get_cache_size(),
?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()),
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()),
not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>),
not_found = ?CACHE:get_acl_cache(publish, <<"b">>),
allow = ?CACHE:get_acl_cache(publish, <<"c">>).
t_cache_cleanup(_) ->
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 30),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ct:sleep(150),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
3 = ?CACHE:get_cache_size(),
?CACHE:cleanup_acl_cache(),
?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()),
1 = ?CACHE:get_cache_size().
t_cache_auto_emtpy(_) ->
%% verify cache is emptied when cache full and even the newest
%% one is expired.
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 3),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
3 = ?CACHE:get_cache_size(),
ct:sleep(150),
ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny),
1 = ?CACHE:get_cache_size().
t_cache_auto_cleanup(_) ->
%% verify we'll cleanup expired entries when we got a exipired acl
%% from cache.
application:set_env(emqx, acl_cache_ttl, 100),
application:set_env(emqx, acl_cache_max_size, 30),
ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow),
ct:sleep(150),
ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow),
ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny),
4 = ?CACHE:get_cache_size(),
%% "a" and "b" expires, while "c" and "d" not
not_found = ?CACHE:get_acl_cache(publish, <<"b">>),
2 = ?CACHE:get_cache_size(),
ct:sleep(150), %% now "c" and "d" expires
not_found = ?CACHE:get_acl_cache(publish, <<"c">>),
0 = ?CACHE:get_cache_size().
%%--------------------------------------------------------------------
%% emqx_access_rule
%%--------------------------------------------------------------------
t_compile_rule(_) ->
{allow, {'and', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}},
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}),
{allow, {'or', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}},
{user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}),
{allow, {ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}),
{allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]} =
compile({allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}),
{allow, {user, <<"admin">>}, pubsub, [ [<<"d">>, <<"e">>, <<"f">>, '#'] ]} =
compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]}),
{allow, {client, <<"testClient">>}, publish, [ [<<"testTopics">>, <<"testClient">>] ]} =
compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}),
{allow, all, pubsub, [{pattern, [<<"clients">>, <<"%c">>]}]} =
compile({allow, all, pubsub, ["clients/%c"]}),
{allow, all, subscribe, [{pattern, [<<"users">>, <<"%u">>, '#']}]} =
compile({allow, all, subscribe, ["users/%u/#"]}),
{deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} =
compile({deny, all, subscribe, ["$SYS/#", "#"]}),
{allow, all} = compile({allow, all}),
{deny, all} = compile({deny, all}).
t_match_rule(_) ->
ClientInfo1 = #{zone => external,
clientid => <<"testClient">>,
username => <<"TestUser">>,
peerhost => {127,0,0,1}
},
ClientInfo2 = #{zone => external,
clientid => <<"testClient">>,
username => <<"TestUser">>,
peerhost => {192,168,0,10}
},
{matched, allow} = match(ClientInfo1, <<"Test/Topic">>, {allow, all}),
{matched, deny} = match(ClientInfo1, <<"Test/Topic">>, {deny, all}),
{matched, allow} = match(ClientInfo1, <<"Test/Topic">>,
compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})),
{matched, allow} = match(ClientInfo2, <<"Test/Topic">>,
compile({allow, {ipaddr, "192.168.0.1/24"}, subscribe, ["$SYS/#", "#"]})),
{matched, allow} = match(ClientInfo1, <<"d/e/f/x">>,
compile({allow, {user, "TestUser"}, subscribe, ["a/b/c", "d/e/f/#"]})),
nomatch = match(ClientInfo1, <<"d/e/f/x">>, compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]})),
{matched, allow} = match(ClientInfo1, <<"testTopics/testClient">>,
compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]})),
{matched, allow} = match(ClientInfo1, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/%c"]})),
{matched, allow} = match(#{username => <<"user2">>}, <<"users/user2/abc/def">>,
compile({allow, all, subscribe, ["users/%u/#"]})),
{matched, deny} = match(ClientInfo1, <<"d/e/f">>, compile({deny, all, subscribe, ["$SYS/#", "#"]})),
Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}),
nomatch = match(ClientInfo1, <<"Topic">>, Rule),
AndRule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"TestUser">>}]}, publish, <<"Topic">>}),
{matched, allow} = match(ClientInfo1, <<"Topic">>, AndRule),
OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}),
{matched, allow} = match(ClientInfo1, <<"Topic">>, OrRule).

View File

@ -49,9 +49,6 @@ t_check_acl(_) ->
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>), Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
?assertEqual(allow, emqx_access_control:check_acl(clientinfo(), Publish, <<"t">>)). ?assertEqual(allow, emqx_access_control:check_acl(clientinfo(), Publish, <<"t">>)).
t_reload_acl(_) ->
?assertEqual(ok, emqx_access_control:reload_acl()).
t_bypass_auth_plugins(_) -> t_bypass_auth_plugins(_) ->
AuthFun = fun(#{zone := bypass_zone}, AuthRes) -> AuthFun = fun(#{zone := bypass_zone}, AuthRes) ->
{stop, AuthRes#{auth_result => password_error}}; {stop, AuthRes#{auth_result => password_error}};

View File

@ -19,7 +19,6 @@
%% ACL callbacks %% ACL callbacks
-export([ init/1 -export([ init/1
, check_acl/2 , check_acl/2
, reload_acl/1
, description/0 , description/0
]). ]).
@ -29,9 +28,6 @@ init(AclOpts) ->
check_acl({_User, _PubSub, _Topic}, _State) -> check_acl({_User, _PubSub, _Topic}, _State) ->
allow. allow.
reload_acl(_State) ->
ok.
description() -> description() ->
"Test ACL Mod". "Test ACL Mod".

View File

@ -33,16 +33,9 @@ end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]). emqx_ct_helpers:stop_apps([]).
t_load_unload(_) -> t_load_unload(_) ->
?assertEqual({error,already_exists}, emqx_mod_acl_internal:load([])),
?assertEqual(ok, emqx_mod_acl_internal:unload([])), ?assertEqual(ok, emqx_mod_acl_internal:unload([])),
?assertEqual(ok, emqx_mod_acl_internal:load([])). ?assertEqual(ok, emqx_mod_acl_internal:load([])),
?assertEqual({error,already_exists}, emqx_mod_acl_internal:load([])).
t_all_rules(_) ->
application:set_env(emqx, acl_file, ""),
?assertMatch(#{}, emqx_mod_acl_internal:all_rules()),
application:set_env(emqx, acl_file, emqx_ct_helpers:deps_path(emqx, "etc/acl.conf")),
?assertMatch(#{publish := _, subscribe := _}, emqx_mod_acl_internal:all_rules()).
t_check_acl(_) -> t_check_acl(_) ->
Rules=#{publish => [{allow,all}], subscribe => [{deny, all}]}, Rules=#{publish => [{allow,all}], subscribe => [{deny, all}]},
@ -51,7 +44,7 @@ t_check_acl(_) ->
?assertEqual(ok, emqx_mod_acl_internal:check_acl(clientinfo(), connect, <<"t">>, [], Rules)). ?assertEqual(ok, emqx_mod_acl_internal:check_acl(clientinfo(), connect, <<"t">>, [], Rules)).
t_reload_acl(_) -> t_reload_acl(_) ->
?assertEqual(ok, emqx_mod_acl_internal:reload_acl()). ?assertEqual(ok, emqx_mod_acl_internal:reload([])).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions

View File

@ -44,9 +44,7 @@ end_per_suite(_) ->
set_special_configs(emqx) -> set_special_configs(emqx) ->
application:set_env(emqx, modules, [{emqx_mod_delayed, []}]), application:set_env(emqx, modules, [{emqx_mod_delayed, []}]),
application:set_env(emqx, allow_anonymous, false), application:set_env(emqx, allow_anonymous, false),
application:set_env(emqx, enable_acl_cache, false), application:set_env(emqx, enable_acl_cache, false);
application:set_env(emqx, plugins_loaded_file,
emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
set_special_configs(_App) -> set_special_configs(_App) ->
ok. ok.
@ -55,8 +53,6 @@ set_special_configs(_App) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_load_case(_) -> t_load_case(_) ->
ok = emqx_mod_delayed:unload([]),
timer:sleep(100),
UnHooks = emqx_hooks:lookup('message.publish'), UnHooks = emqx_hooks:lookup('message.publish'),
?assertEqual([], UnHooks), ?assertEqual([], UnHooks),
ok = emqx_mod_delayed:load([]), ok = emqx_mod_delayed:load([]),
@ -65,6 +61,7 @@ t_load_case(_) ->
ok. ok.
t_delayed_message(_) -> t_delayed_message(_) ->
ok = emqx_mod_delayed:load([]),
DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>), DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>),
?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)), ?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)),
@ -77,4 +74,5 @@ t_delayed_message(_) ->
timer:sleep(5000), timer:sleep(5000),
EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed), EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed),
?assertEqual([], EmptyKey). ?assertEqual([], EmptyKey),
ok = emqx_mod_delayed:unload([]).

View File

@ -0,0 +1,49 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules([]),
emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1),
Config.
set_sepecial_cfg(_) ->
application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")),
ok.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_load(_) ->
?assertEqual(ok, emqx_modules:unload()),
?assertEqual(ok, emqx_modules:load()),
?assertEqual({error, not_found}, emqx_modules:load(not_existed_module)),
?assertEqual({error, not_started}, emqx_modules:unload(emqx_mod_rewrite)),
?assertEqual(ignore, emqx_modules:reload(emqx_mod_rewrite)),
?assertEqual(ok, emqx_modules:reload(emqx_mod_acl_internal)).
t_list(_) ->
?assertMatch([{_, _} | _ ], emqx_modules:list()).