From d49f4118fe46d386bd894fd6eba5f37bf69df125 Mon Sep 17 00:00:00 2001 From: turtleDeng Date: Fri, 10 Apr 2020 19:55:21 +0800 Subject: [PATCH] Mgmt emqx modules (#3376) --- etc/emqx.conf | 35 +-- priv/emqx.schema | 51 +--- src/emqx_access_control.erl | 6 - src/emqx_gen_mod.erl | 2 + src/emqx_mod_acl_internal.erl | 27 +- src/emqx_mod_delayed.erl | 4 + src/emqx_mod_presence.erl | 3 + src/emqx_mod_rewrite.erl | 5 +- src/emqx_mod_subscription.erl | 3 + src/emqx_mod_sup.erl | 1 + src/emqx_mod_topic_metrics.erl | 5 + src/emqx_modules.erl | 145 +++++++++- test/emqx_SUITE_data/loaded_modules | 2 + test/emqx_access_SUITE.erl | 378 --------------------------- test/emqx_access_control_SUITE.erl | 3 - test/emqx_acl_test_mod.erl | 4 - test/emqx_mod_acl_internal_SUITE.erl | 13 +- test/emqx_mod_delayed_SUITE.erl | 10 +- test/emqx_modules_SUITE.erl | 49 ++++ 19 files changed, 235 insertions(+), 511 deletions(-) create mode 100644 test/emqx_SUITE_data/loaded_modules delete mode 100644 test/emqx_access_SUITE.erl create mode 100644 test/emqx_modules_SUITE.erl diff --git a/etc/emqx.conf b/etc/emqx.conf index 3c6547a65..e5b52617b 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -1818,15 +1818,14 @@ listener.wss.external.send_timeout_close = on ##-------------------------------------------------------------------- ## Modules ##-------------------------------------------------------------------- +## The file to store loaded module names. +## +## Value: File +modules.loaded_file = {{ platform_data_dir }}/loaded_modules ##-------------------------------------------------------------------- ## Presence Module -## Enable Presence Module. -## -## Value: on | off -module.presence = on - ## Sets the QoS for presence MQTT message. ## ## Value: 0 | 1 | 2 @@ -1835,11 +1834,6 @@ module.presence.qos = 1 ##-------------------------------------------------------------------- ## Subscription Module -## Enable Subscription Module. -## -## Value: on | off -module.subscription = off - ## Subscribe the Topics automatically when client connected. ## ## Value: String @@ -1875,31 +1869,10 @@ module.subscription = off ##-------------------------------------------------------------------- ## Rewrite Module -## Enable Rewrite Module. -## -## Value: on | off -module.rewrite = off - ## {rewrite, Topic, Re, Dest} ## module.rewrite.rule.1 = x/# ^x/y/(.+)$ z/y/$1 ## module.rewrite.rule.2 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 -##-------------------------------------------------------------------- -## Topic Metrics Module - -## Enable Topic Metrics Module. -## -## Value: on | off -module.topic_metrics = off - -##-------------------------------------------------------------------- -## Delayed Module - -## Enable Delayed Module. -## -## Value: on | off -module.delayed = off - ##------------------------------------------------------------------- ## Plugins ##------------------------------------------------------------------- diff --git a/priv/emqx.schema b/priv/emqx.schema index 2a4a4cd81..f3bf3fb3f 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1766,9 +1766,8 @@ end}. %% Modules %%-------------------------------------------------------------------- -{mapping, "module.presence", "emqx.modules", [ - {default, off}, - {datatype, flag} +{mapping, "modules.loaded_file", "emqx.modules_loaded_file", [ + {datatype, string} ]}. {mapping, "module.presence.qos", "emqx.modules", [ @@ -1777,11 +1776,6 @@ end}. {validators, ["range:0-2"]} ]}. -{mapping, "module.subscription", "emqx.modules", [ - {default, off}, - {datatype, flag} -]}. - {mapping, "module.subscription.$id.topic", "emqx.modules", [ {datatype, string} ]}. @@ -1810,25 +1804,10 @@ end}. {validators, ["range:0-2"]} ]}. -{mapping, "module.rewrite", "emqx.modules", [ - {default, off}, - {datatype, flag} -]}. - {mapping, "module.rewrite.rule.$id", "emqx.modules", [ {datatype, string} ]}. -{mapping, "module.topic_metrics", "emqx.modules", [ - {default, off}, - {datatype, flag} -]}. - -{mapping, "module.delayed", "emqx.modules", [ - {default, off}, - {datatype, flag} -]}. - {translation, "emqx.modules", fun(Conf) -> Subscriptions = fun() -> List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), @@ -1847,26 +1826,12 @@ end}. end, Rules) end, lists:append([ - case cuttlefish:conf_get("module.presence", Conf) of %% Presence - true -> [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}]; - false -> [] - end, - case cuttlefish:conf_get("module.subscription", Conf) of %% Subscription - true -> [{emqx_mod_subscription, Subscriptions()}]; - false -> [] - end, - case cuttlefish:conf_get("module.rewrite", Conf) of %% Rewrite - true -> [{emqx_mod_rewrite, Rewrites()}]; - false -> [] - end, - case cuttlefish:conf_get("module.topic_metrics", Conf) of %% Topic Metrics - true -> [{emqx_mod_topic_metrics, []}]; - false -> [] - end, - case cuttlefish:conf_get("module.delayed", Conf) of %% Delayed - true -> [{emqx_mod_delayed, []}]; - false -> [] - end + [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}], + [{emqx_mod_subscription, Subscriptions()}], + [{emqx_mod_rewrite, Rewrites()}], + [{emqx_mod_topic_metrics, []}], + [{emqx_mod_delayed, []}], + [{emqx_mod_acl_internal, []}] ]) end}. diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index e891ca275..32383b3f0 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -21,7 +21,6 @@ -export([authenticate/1]). -export([ check_acl/3 - , reload_acl/0 ]). -type(result() :: #{auth_result := emqx_types:auth_result(), @@ -67,11 +66,6 @@ do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) -> _Other -> deny end. --spec(reload_acl() -> ok | {error, term()}). -reload_acl() -> - emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(), - emqx_mod_acl_internal:reload_acl(). - default_auth_result(Zone) -> case emqx_zone:get_env(Zone, allow_anonymous, false) of true -> #{auth_result => success, anonymous => true}; diff --git a/src/emqx_gen_mod.erl b/src/emqx_gen_mod.erl index 533f5fa64..e320ec877 100644 --- a/src/emqx_gen_mod.erl +++ b/src/emqx_gen_mod.erl @@ -22,6 +22,8 @@ -callback(unload(State :: term()) -> term()). +-callback(description() -> any()). + -else. -export([behaviour_info/1]). diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index e429d6899..7b5b5c2a7 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -24,14 +24,13 @@ -logger_header("[ACL_INTERNAL]"). %% APIs --export([ all_rules/0 - , check_acl/5 - , reload_acl/0 - ]). +-export([check_acl/5]). %% emqx_gen_mod callbacks -export([ load/1 , unload/1 + , reload/1 + , description/0 ]). -define(MFA(M, F, A), {M, F, A}). @@ -44,18 +43,19 @@ %%-------------------------------------------------------------------- load(_Env) -> - Rules = rules_from_file(acl_file()), + Rules = rules_from_file(emqx:get_env(acl_file)), emqx_hooks:add('client.check_acl', ?MFA(?MODULE, check_acl, [Rules]), -1). unload(_Env) -> - Rules = rules_from_file(acl_file()), + Rules = rules_from_file(emqx:get_env(acl_file)), emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])). -%% @doc Read all rules --spec(all_rules() -> list(emqx_access_rule:rule())). -all_rules() -> - rules_from_file(acl_file()). +reload(_Env) -> + emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(), + unload([]), load([]). +description() -> + "EMQ X Internal ACL Module". %%-------------------------------------------------------------------- %% ACL callbacks %%-------------------------------------------------------------------- @@ -71,16 +71,9 @@ check_acl(Client, PubSub, Topic, _AclResult, Rules) -> nomatch -> ok end. --spec(reload_acl() -> ok | {error, term()}). -reload_acl() -> - unload([]), load([]). - %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- - -acl_file() -> emqx:get_env(acl_file). - lookup(PubSub, Rules) -> maps:get(PubSub, Rules, []). diff --git a/src/emqx_mod_delayed.erl b/src/emqx_mod_delayed.erl index c88523f59..f9a1c4756 100644 --- a/src/emqx_mod_delayed.erl +++ b/src/emqx_mod_delayed.erl @@ -17,6 +17,7 @@ -module(emqx_mod_delayed). -behaviour(gen_server). +-behaviour(emqx_gen_mod). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -24,6 +25,7 @@ %% emqx_gen_mod callbacks -export([ load/1 , unload/1 + , description/0 ]). -export([ start_link/0 @@ -62,6 +64,8 @@ unload(_Env) -> emqx:unhook('message.publish', {?MODULE, on_message_publish, []}), emqx_mod_sup:stop_child(?MODULE). +description() -> + "EMQ X Delayed Publish Module". %%-------------------------------------------------------------------- %% Hooks %%-------------------------------------------------------------------- diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 1abfaf73a..f5aa2279e 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -26,6 +26,7 @@ %% emqx_gen_mod callbacks -export([ load/1 , unload/1 + , description/0 ]). -export([ on_client_connected/3 @@ -44,6 +45,8 @@ unload(_Env) -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}), emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}). +description() -> + "EMQ X Presence Module". %%-------------------------------------------------------------------- %% Callbacks %%-------------------------------------------------------------------- diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 8098c9595..9702b7350 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -16,7 +16,7 @@ -module(emqx_mod_rewrite). --behavior(emqx_gen_mod). +-behaviour(emqx_gen_mod). -include_lib("emqx.hrl"). -include_lib("emqx_mqtt.hrl"). @@ -36,6 +36,7 @@ %% emqx_gen_mod callbacks -export([ load/1 , unload/1 + , description/0 ]). %%-------------------------------------------------------------------- @@ -62,6 +63,8 @@ unload(_) -> emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). +description() -> + "EMQ X Topic Rewrite Module". %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/src/emqx_mod_subscription.erl b/src/emqx_mod_subscription.erl index 79bb8dc63..b6d04528b 100644 --- a/src/emqx_mod_subscription.erl +++ b/src/emqx_mod_subscription.erl @@ -24,6 +24,7 @@ %% emqx_gen_mod callbacks -export([ load/1 , unload/1 + , description/0 ]). %% APIs @@ -49,6 +50,8 @@ on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = # unload(_) -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}). +description() -> + "EMQ X Subscription Module". %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/src/emqx_mod_sup.erl b/src/emqx_mod_sup.erl index 74caa419b..b5512013c 100644 --- a/src/emqx_mod_sup.erl +++ b/src/emqx_mod_sup.erl @@ -58,5 +58,6 @@ stop_child(ChildId) -> %%-------------------------------------------------------------------- init([]) -> + ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]), {ok, {{one_for_one, 10, 100}, []}}. diff --git a/src/emqx_mod_topic_metrics.erl b/src/emqx_mod_topic_metrics.erl index 53dde662e..5deb42b92 100644 --- a/src/emqx_mod_topic_metrics.erl +++ b/src/emqx_mod_topic_metrics.erl @@ -17,6 +17,7 @@ -module(emqx_mod_topic_metrics). -behaviour(gen_server). +-behaviour(emqx_gen_mod). -include("emqx.hrl"). -include("logger.hrl"). @@ -26,6 +27,7 @@ -export([ load/1 , unload/1 + , description/0 ]). -export([ on_message_publish/1 @@ -104,6 +106,9 @@ unload(_Env) -> emqx:unhook('message.delivered', fun ?MODULE:on_message_delivered/2), emqx_mod_sup:stop_child(?MODULE). +description() -> + "EMQ X Topic Metrics Module". + on_message_publish(#message{topic = Topic, qos = QoS}) -> case is_registered(Topic) of true -> diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index 0e7e931d0..948359aa7 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -20,28 +20,149 @@ -logger_header("[Modules]"). --export([ load/0 +-export([ list/0 + , load/0 + , load/1 , unload/0 + , unload/1 + , reload/1 + , load_module/2 ]). +%% @doc List all available plugins +-spec(list() -> [{atom(), boolean()}]). +list() -> + ets:tab2list(?MODULE). + %% @doc Load all the extended modules. -spec(load() -> ok). load() -> - ok = emqx_mod_acl_internal:load([]), - lists:foreach(fun load/1, modules()). + case emqx:get_env(modules_loaded_file) of + undefined -> ignore; + File -> + load_modules(File) + end. -load({Mod, Env}) -> - ok = Mod:load(Env), - ?LOG(info, "Load ~s module successfully.", [Mod]). - -modules() -> emqx:get_env(modules, []). +load(ModuleName) -> + case find_module(ModuleName) of + [] -> + ?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]), + {error, not_found}; + [{ModuleName, true}] -> + ?LOG(notice, "Module ~s is already started", [ModuleName]), + {error, already_started}; + [{ModuleName, false}] -> + emqx_modules:load_module(ModuleName, true) + end. %% @doc Unload all the extended modules. -spec(unload() -> ok). unload() -> - ok = emqx_mod_acl_internal:unload([]), - lists:foreach(fun unload/1, modules()). + case emqx:get_env(modules_loaded_file) of + undefined -> ignore; + File -> + unload_modules(File) + end. -unload({Mod, Env}) -> - Mod:unload(Env). +unload(ModuleName) -> + case find_module(ModuleName) of + [] -> + ?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]), + {error, not_found}; + [{ModuleName, false}] -> + ?LOG(error, "Module ~s is not started", [ModuleName]), + {error, not_started}; + [{ModuleName, true}] -> + unload_module(ModuleName, true) + end. +reload(emqx_mod_acl_internal) -> + Modules = emqx:get_env(modules, []), + Env = proplists:get_value(emqx_mod_acl_internal, Modules, undefined), + case emqx_mod_acl_internal:reload(Env) of + ok -> + ?LOG(info, "Reload ~s module successfully.", [emqx_mod_acl_internal]); + {error, Error} -> + ?LOG(error, "Reload module ~s failed, cannot start for ~0p", [emqx_mod_acl_internal, Error]) + end; +reload(_) -> + ignore. + +find_module(ModuleName) -> + ets:lookup(?MODULE, ModuleName). + +filter_module(ModuleNames) -> + filter_module(ModuleNames, emqx:get_env(modules, [])). +filter_module([], Acc) -> + Acc; +filter_module([{ModuleName, true} | ModuleNames], Acc) -> + filter_module(ModuleNames, lists:keydelete(ModuleName, 1, Acc)); +filter_module([{_, false} | ModuleNames], Acc) -> + filter_module(ModuleNames, Acc). + +load_modules(File) -> + case file:consult(File) of + {ok, ModuleNames} -> + lists:foreach(fun({ModuleName, _}) -> + ets:insert(?MODULE, {ModuleName, false}) + end, filter_module(ModuleNames)), + lists:foreach(fun load_module/1, ModuleNames); + {error, Error} -> + ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]) + end. + +load_module({ModuleName, true}) -> + emqx_modules:load_module(ModuleName, false); +load_module({ModuleName, false}) -> + ets:insert(?MODULE, {ModuleName, false}); +load_module(ModuleName) -> + load_module({ModuleName, true}). + +load_module(ModuleName, Persistent) -> + Modules = emqx:get_env(modules, []), + Env = proplists:get_value(ModuleName, Modules, undefined), + case ModuleName:load(Env) of + ok -> + ets:insert(?MODULE, {ModuleName, true}), + write_loaded(Persistent), + ?LOG(info, "Load ~s module successfully.", [ModuleName]); + {error, Error} -> + ?LOG(error, "Load module ~s failed, cannot load for ~0p", [ModuleName, Error]), + {error, Error} + end. + +unload_modules(File) -> + case file:consult(File) of + {ok, ModuleNames} -> + lists:foreach(fun unload_module/1, ModuleNames); + {error, Error} -> + ?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error]) + end. +unload_module({ModuleName, true}) -> + unload_module(ModuleName, false); +unload_module({ModuleName, false}) -> + ets:insert(?MODULE, {ModuleName, false}); +unload_module(ModuleName) -> + unload_module({ModuleName, true}). + +unload_module(ModuleName, Persistent) -> + Modules = emqx:get_env(modules, []), + Env = proplists:get_value(ModuleName, Modules, undefined), + case ModuleName:unload(Env) of + ok -> + ets:insert(?MODULE, {ModuleName, false}), + write_loaded(Persistent), + ?LOG(info, "Unload ~s module successfully.", [ModuleName]); + {error, Error} -> + ?LOG(error, "Unload module ~s failed, cannot unload for ~0p", [ModuleName, Error]) + end. + +write_loaded(true) -> + FilePath = emqx:get_env(modules_loaded_file), + case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- list()]) of + ok -> ok; + {error, Error} -> + ?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]), + {error, Error} + end; +write_loaded(false) -> ok. \ No newline at end of file diff --git a/test/emqx_SUITE_data/loaded_modules b/test/emqx_SUITE_data/loaded_modules new file mode 100644 index 000000000..49effa0c5 --- /dev/null +++ b/test/emqx_SUITE_data/loaded_modules @@ -0,0 +1,2 @@ +{emqx_mod_acl_internal, true}. +{emqx_mod_presence, true}. \ No newline at end of file diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl deleted file mode 100644 index 7cc808b5a..000000000 --- a/test/emqx_access_SUITE.erl +++ /dev/null @@ -1,378 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_access_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include("emqx.hrl"). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). - --define(AC, emqx_access_control). --define(CACHE, emqx_acl_cache). - --import(emqx_access_rule, - [ compile/1 - , match/3 - ]). - -all() -> - [{group, access_control}, - {group, acl_cache}, - {group, access_control_cache_mode}, - {group, access_rule} - ]. - -groups() -> - [{access_control, [sequence], - [t_reload_acl, - t_check_acl_1, - t_check_acl_2]}, - {access_control_cache_mode, [sequence], - [t_acl_cache_basic, - t_acl_cache_expiry, - t_acl_cache_cleanup, - t_acl_cache_full]}, - {acl_cache, [sequence], - [t_put_get_del_cache, - t_cache_update, - t_cache_expiry, - t_cache_replacement, - t_cache_cleanup, - t_cache_auto_emtpy, - t_cache_auto_cleanup]}, - {access_rule, [parallel], - [t_compile_rule, - t_match_rule] - }]. - -init_per_suite(Config) -> - emqx_ct_helpers:boot_modules([router, broker]), - emqx_ct_helpers:start_apps([]), - Config. - -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([]). - -init_per_group(Group, Config) when Group =:= access_control; - Group =:= access_control_cache_mode -> - prepare_config(Group), - application:load(emqx), - Config; -init_per_group(_Group, Config) -> - Config. - -prepare_config(Group = access_control) -> - set_acl_config_file(Group), - application:set_env(emqx, enable_acl_cache, false); -prepare_config(Group = access_control_cache_mode) -> - set_acl_config_file(Group), - application:set_env(emqx, enable_acl_cache, true), - application:set_env(emqx, acl_cache_max_size, 100). - -set_acl_config_file(_Group) -> - Rules = [{allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}, - {allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}, - {allow, {user, "admin"}, pubsub, ["a/b/c", "d/e/f/#"]}, - {allow, {client, "testClient"}, subscribe, ["testTopics/testClient"]}, - {allow, all, subscribe, ["clients/%c"]}, - {allow, all, pubsub, ["users/%u/#"]}, - {deny, all, subscribe, ["$SYS/#", "#"]}, - {deny, all}], - write_config("access_SUITE_acl.conf", Rules), - application:set_env(emqx, acl_file, "access_SUITE_acl.conf"). - -write_config(Filename, Terms) -> - file:write_file(Filename, [io_lib:format("~tp.~n", [Term]) || Term <- Terms]). - -end_per_group(_Group, Config) -> - Config. - -%%-------------------------------------------------------------------- -%% emqx_access_control -%%-------------------------------------------------------------------- - -t_reload_acl(_) -> - ok = ?AC:reload_acl(). - -t_check_acl_1(_) -> - Client = #{zone => external, - clientid => <<"client1">>, - username => <<"testuser">> - }, - allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>), - allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>), - deny = ?AC:check_acl(Client, subscribe, <<"clients/client1/x/y">>), - allow = ?AC:check_acl(Client, publish, <<"users/testuser/1">>), - allow = ?AC:check_acl(Client, subscribe, <<"a/b/c">>). - -t_check_acl_2(_) -> - Client = #{zone => external, - clientid => <<"client2">>, - username => <<"xyz">> - }, - deny = ?AC:check_acl(Client, subscribe, <<"a/b/c">>). - -t_acl_cache_basic(_) -> - Client = #{zone => external, - clientid => <<"client1">>, - username => <<"testuser">> - }, - not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), - not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - - allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>), - allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>), - - allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>). - -t_acl_cache_expiry(_) -> - application:set_env(emqx, acl_cache_ttl, 100), - Client = #{zone => external, - clientid => <<"client1">>, - username => <<"testuser">> - }, - allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>), - allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - ct:sleep(150), - not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>). - -t_acl_cache_full(_) -> - application:set_env(emqx, acl_cache_max_size, 1), - Client = #{zone => external, - clientid => <<"client1">>, - username => <<"testuser">> - }, - allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>), - allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>), - - %% the older ones (the <<"users/testuser/1">>) will be evicted first - not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>). - -t_acl_cache_cleanup(_) -> - %% The acl cache will try to evict memory, if the size is full and the newest - %% cache entry is expired - application:set_env(emqx, acl_cache_ttl, 100), - application:set_env(emqx, acl_cache_max_size, 2), - Client = #{zone => external, - clientid => <<"client1">>, - username => <<"testuser">> - }, - allow = ?AC:check_acl(Client, subscribe, <<"users/testuser/1">>), - allow = ?AC:check_acl(Client, subscribe, <<"clients/client1">>), - - allow = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), - allow = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - - ct:sleep(150), - %% now the cache is full and the newest one - "clients/client1" - %% should be expired, so we'll empty the cache before putting - %% the next cache entry - deny = ?AC:check_acl(Client, subscribe, <<"#">>), - - not_found = ?CACHE:get_acl_cache(subscribe, <<"users/testuser/1">>), - not_found = ?CACHE:get_acl_cache(subscribe, <<"clients/client1">>), - deny = ?CACHE:get_acl_cache(subscribe, <<"#">>). - -t_put_get_del_cache(_) -> - application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_max_size, 30), - - not_found = ?CACHE:get_acl_cache(publish, <<"a">>), - ok = ?CACHE:put_acl_cache(publish, <<"a">>, allow), - allow = ?CACHE:get_acl_cache(publish, <<"a">>), - - not_found = ?CACHE:get_acl_cache(subscribe, <<"b">>), - ok = ?CACHE:put_acl_cache(subscribe, <<"b">>, deny), - deny = ?CACHE:get_acl_cache(subscribe, <<"b">>), - - 2 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(subscribe, <<"b">>), ?CACHE:get_newest_key()). - -t_cache_expiry(_) -> - application:set_env(emqx, acl_cache_ttl, 100), - application:set_env(emqx, acl_cache_max_size, 30), - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), - - ct:sleep(150), - not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), - - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, deny), - deny = ?CACHE:get_acl_cache(subscribe, <<"a">>), - - ct:sleep(150), - not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>). - -t_cache_update(_) -> - application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_max_size, 30), - [] = ?CACHE:dump_acl_cache(), - - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), - 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()), - - %% update the 2nd one - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ct:pal("dump acl cache: ~p~n", [?CACHE:dump_acl_cache()]), - - 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_newest_key()), - ?assertEqual(?CACHE:cache_k(subscribe, <<"a">>), ?CACHE:get_oldest_key()). - -t_cache_replacement(_) -> - application:set_env(emqx, acl_cache_ttl, 300000), - application:set_env(emqx, acl_cache_max_size, 3), - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), - allow = ?CACHE:get_acl_cache(subscribe, <<"a">>), - allow = ?CACHE:get_acl_cache(publish, <<"b">>), - allow = ?CACHE:get_acl_cache(publish, <<"c">>), - 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_newest_key()), - - ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), - 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"d">>), ?CACHE:get_newest_key()), - ?assertEqual(?CACHE:cache_k(publish, <<"b">>), ?CACHE:get_oldest_key()), - - ok = ?CACHE:put_acl_cache(publish, <<"e">>, deny), - 3 = ?CACHE:get_cache_size(), - ?assertEqual(?CACHE:cache_k(publish, <<"e">>), ?CACHE:get_newest_key()), - ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()), - - not_found = ?CACHE:get_acl_cache(subscribe, <<"a">>), - not_found = ?CACHE:get_acl_cache(publish, <<"b">>), - allow = ?CACHE:get_acl_cache(publish, <<"c">>). - -t_cache_cleanup(_) -> - application:set_env(emqx, acl_cache_ttl, 100), - application:set_env(emqx, acl_cache_max_size, 30), - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ct:sleep(150), - ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), - 3 = ?CACHE:get_cache_size(), - - ?CACHE:cleanup_acl_cache(), - ?assertEqual(?CACHE:cache_k(publish, <<"c">>), ?CACHE:get_oldest_key()), - 1 = ?CACHE:get_cache_size(). - -t_cache_auto_emtpy(_) -> - %% verify cache is emptied when cache full and even the newest - %% one is expired. - application:set_env(emqx, acl_cache_ttl, 100), - application:set_env(emqx, acl_cache_max_size, 3), - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), - 3 = ?CACHE:get_cache_size(), - - ct:sleep(150), - ok = ?CACHE:put_acl_cache(subscribe, <<"d">>, deny), - 1 = ?CACHE:get_cache_size(). - -t_cache_auto_cleanup(_) -> - %% verify we'll cleanup expired entries when we got a exipired acl - %% from cache. - application:set_env(emqx, acl_cache_ttl, 100), - application:set_env(emqx, acl_cache_max_size, 30), - ok = ?CACHE:put_acl_cache(subscribe, <<"a">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"b">>, allow), - ct:sleep(150), - ok = ?CACHE:put_acl_cache(publish, <<"c">>, allow), - ok = ?CACHE:put_acl_cache(publish, <<"d">>, deny), - 4 = ?CACHE:get_cache_size(), - - %% "a" and "b" expires, while "c" and "d" not - not_found = ?CACHE:get_acl_cache(publish, <<"b">>), - 2 = ?CACHE:get_cache_size(), - - ct:sleep(150), %% now "c" and "d" expires - not_found = ?CACHE:get_acl_cache(publish, <<"c">>), - 0 = ?CACHE:get_cache_size(). - -%%-------------------------------------------------------------------- -%% emqx_access_rule -%%-------------------------------------------------------------------- - -t_compile_rule(_) -> - {allow, {'and', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, - {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = - compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}), - {allow, {'or', [{ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, - {user, <<"user">>}]}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = - compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"user">>}]}, subscribe, ["$SYS/#", "#"]}), - - {allow, {ipaddr, {{127,0,0,1}, {127,0,0,1}, 32}}, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = - compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]}), - {allow, {user, <<"testuser">>}, subscribe, [ [<<"a">>, <<"b">>, <<"c">>], [<<"d">>, <<"e">>, <<"f">>, '#'] ]} = - compile({allow, {user, "testuser"}, subscribe, ["a/b/c", "d/e/f/#"]}), - {allow, {user, <<"admin">>}, pubsub, [ [<<"d">>, <<"e">>, <<"f">>, '#'] ]} = - compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]}), - {allow, {client, <<"testClient">>}, publish, [ [<<"testTopics">>, <<"testClient">>] ]} = - compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]}), - {allow, all, pubsub, [{pattern, [<<"clients">>, <<"%c">>]}]} = - compile({allow, all, pubsub, ["clients/%c"]}), - {allow, all, subscribe, [{pattern, [<<"users">>, <<"%u">>, '#']}]} = - compile({allow, all, subscribe, ["users/%u/#"]}), - {deny, all, subscribe, [ [<<"$SYS">>, '#'], ['#'] ]} = - compile({deny, all, subscribe, ["$SYS/#", "#"]}), - {allow, all} = compile({allow, all}), - {deny, all} = compile({deny, all}). - -t_match_rule(_) -> - ClientInfo1 = #{zone => external, - clientid => <<"testClient">>, - username => <<"TestUser">>, - peerhost => {127,0,0,1} - }, - ClientInfo2 = #{zone => external, - clientid => <<"testClient">>, - username => <<"TestUser">>, - peerhost => {192,168,0,10} - }, - {matched, allow} = match(ClientInfo1, <<"Test/Topic">>, {allow, all}), - {matched, deny} = match(ClientInfo1, <<"Test/Topic">>, {deny, all}), - {matched, allow} = match(ClientInfo1, <<"Test/Topic">>, - compile({allow, {ipaddr, "127.0.0.1"}, subscribe, ["$SYS/#", "#"]})), - {matched, allow} = match(ClientInfo2, <<"Test/Topic">>, - compile({allow, {ipaddr, "192.168.0.1/24"}, subscribe, ["$SYS/#", "#"]})), - {matched, allow} = match(ClientInfo1, <<"d/e/f/x">>, - compile({allow, {user, "TestUser"}, subscribe, ["a/b/c", "d/e/f/#"]})), - nomatch = match(ClientInfo1, <<"d/e/f/x">>, compile({allow, {user, "admin"}, pubsub, ["d/e/f/#"]})), - {matched, allow} = match(ClientInfo1, <<"testTopics/testClient">>, - compile({allow, {client, "testClient"}, publish, ["testTopics/testClient"]})), - {matched, allow} = match(ClientInfo1, <<"clients/testClient">>, compile({allow, all, pubsub, ["clients/%c"]})), - {matched, allow} = match(#{username => <<"user2">>}, <<"users/user2/abc/def">>, - compile({allow, all, subscribe, ["users/%u/#"]})), - {matched, deny} = match(ClientInfo1, <<"d/e/f">>, compile({deny, all, subscribe, ["$SYS/#", "#"]})), - Rule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, <<"Topic">>}), - nomatch = match(ClientInfo1, <<"Topic">>, Rule), - AndRule = compile({allow, {'and', [{ipaddr, "127.0.0.1"}, {user, <<"TestUser">>}]}, publish, <<"Topic">>}), - {matched, allow} = match(ClientInfo1, <<"Topic">>, AndRule), - OrRule = compile({allow, {'or', [{ipaddr, "127.0.0.1"}, {user, <<"WrongUser">>}]}, publish, ["Topic"]}), - {matched, allow} = match(ClientInfo1, <<"Topic">>, OrRule). - diff --git a/test/emqx_access_control_SUITE.erl b/test/emqx_access_control_SUITE.erl index ca4db1fbb..a327eea6d 100644 --- a/test/emqx_access_control_SUITE.erl +++ b/test/emqx_access_control_SUITE.erl @@ -49,9 +49,6 @@ t_check_acl(_) -> Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>), ?assertEqual(allow, emqx_access_control:check_acl(clientinfo(), Publish, <<"t">>)). -t_reload_acl(_) -> - ?assertEqual(ok, emqx_access_control:reload_acl()). - t_bypass_auth_plugins(_) -> AuthFun = fun(#{zone := bypass_zone}, AuthRes) -> {stop, AuthRes#{auth_result => password_error}}; diff --git a/test/emqx_acl_test_mod.erl b/test/emqx_acl_test_mod.erl index e698efc4d..5d36cce78 100644 --- a/test/emqx_acl_test_mod.erl +++ b/test/emqx_acl_test_mod.erl @@ -19,7 +19,6 @@ %% ACL callbacks -export([ init/1 , check_acl/2 - , reload_acl/1 , description/0 ]). @@ -29,9 +28,6 @@ init(AclOpts) -> check_acl({_User, _PubSub, _Topic}, _State) -> allow. -reload_acl(_State) -> - ok. - description() -> "Test ACL Mod". diff --git a/test/emqx_mod_acl_internal_SUITE.erl b/test/emqx_mod_acl_internal_SUITE.erl index 4849fab7d..bd9a0bf5a 100644 --- a/test/emqx_mod_acl_internal_SUITE.erl +++ b/test/emqx_mod_acl_internal_SUITE.erl @@ -33,16 +33,9 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). t_load_unload(_) -> - ?assertEqual({error,already_exists}, emqx_mod_acl_internal:load([])), ?assertEqual(ok, emqx_mod_acl_internal:unload([])), - ?assertEqual(ok, emqx_mod_acl_internal:load([])). - -t_all_rules(_) -> - application:set_env(emqx, acl_file, ""), - ?assertMatch(#{}, emqx_mod_acl_internal:all_rules()), - - application:set_env(emqx, acl_file, emqx_ct_helpers:deps_path(emqx, "etc/acl.conf")), - ?assertMatch(#{publish := _, subscribe := _}, emqx_mod_acl_internal:all_rules()). + ?assertEqual(ok, emqx_mod_acl_internal:load([])), + ?assertEqual({error,already_exists}, emqx_mod_acl_internal:load([])). t_check_acl(_) -> Rules=#{publish => [{allow,all}], subscribe => [{deny, all}]}, @@ -51,7 +44,7 @@ t_check_acl(_) -> ?assertEqual(ok, emqx_mod_acl_internal:check_acl(clientinfo(), connect, <<"t">>, [], Rules)). t_reload_acl(_) -> - ?assertEqual(ok, emqx_mod_acl_internal:reload_acl()). + ?assertEqual(ok, emqx_mod_acl_internal:reload([])). %%-------------------------------------------------------------------- %% Helper functions diff --git a/test/emqx_mod_delayed_SUITE.erl b/test/emqx_mod_delayed_SUITE.erl index 99883eb16..3759e8c4d 100644 --- a/test/emqx_mod_delayed_SUITE.erl +++ b/test/emqx_mod_delayed_SUITE.erl @@ -44,9 +44,7 @@ end_per_suite(_) -> set_special_configs(emqx) -> application:set_env(emqx, modules, [{emqx_mod_delayed, []}]), application:set_env(emqx, allow_anonymous, false), - application:set_env(emqx, enable_acl_cache, false), - application:set_env(emqx, plugins_loaded_file, - emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); + application:set_env(emqx, enable_acl_cache, false); set_special_configs(_App) -> ok. @@ -55,8 +53,6 @@ set_special_configs(_App) -> %%-------------------------------------------------------------------- t_load_case(_) -> - ok = emqx_mod_delayed:unload([]), - timer:sleep(100), UnHooks = emqx_hooks:lookup('message.publish'), ?assertEqual([], UnHooks), ok = emqx_mod_delayed:load([]), @@ -65,6 +61,7 @@ t_load_case(_) -> ok. t_delayed_message(_) -> + ok = emqx_mod_delayed:load([]), DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>), ?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)), @@ -77,4 +74,5 @@ t_delayed_message(_) -> timer:sleep(5000), EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed), - ?assertEqual([], EmptyKey). + ?assertEqual([], EmptyKey), + ok = emqx_mod_delayed:unload([]). diff --git a/test/emqx_modules_SUITE.erl b/test/emqx_modules_SUITE.erl new file mode 100644 index 000000000..eb161d02b --- /dev/null +++ b/test/emqx_modules_SUITE.erl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_modules_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + + emqx_ct_helpers:boot_modules([]), + emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1), + Config. + +set_sepecial_cfg(_) -> + application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")), + ok. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_load(_) -> + ?assertEqual(ok, emqx_modules:unload()), + ?assertEqual(ok, emqx_modules:load()), + ?assertEqual({error, not_found}, emqx_modules:load(not_existed_module)), + ?assertEqual({error, not_started}, emqx_modules:unload(emqx_mod_rewrite)), + ?assertEqual(ignore, emqx_modules:reload(emqx_mod_rewrite)), + ?assertEqual(ok, emqx_modules:reload(emqx_mod_acl_internal)). + +t_list(_) -> + ?assertMatch([{_, _} | _ ], emqx_modules:list()). +