diff --git a/apps/emqx_web_hook/test/props/prop_webhook_confs.erl b/apps/emqx_web_hook/test/props/prop_webhook_confs.erl index 24903ddec..8a637c720 100644 --- a/apps/emqx_web_hook/test/props/prop_webhook_confs.erl +++ b/apps/emqx_web_hook/test/props/prop_webhook_confs.erl @@ -62,7 +62,6 @@ do_teardown(_) -> set_special_cfgs(_) -> application:set_env(emqx, plugins_loaded_file, undefined), - application:set_env(emqx, modules_loaded_file, undefined), ok. assert_confs([{"web.hook.api.url", Url}|More], Envs) -> diff --git a/etc/emqx.conf b/etc/emqx.conf index 33db7abb6..9259b1de1 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -2027,64 +2027,6 @@ listener.wss.external.allow_origin_absence = true ## Value: http://url eg. https://localhost:8084, https://127.0.0.1:8084 listener.wss.external.check_origins = https://localhost:8084, https://127.0.0.1:8084 -##-------------------------------------------------------------------- -## Modules -##-------------------------------------------------------------------- -## The file to store loaded module names. -## -## Value: File -modules.loaded_file = {{ platform_data_dir }}/loaded_modules - -##-------------------------------------------------------------------- -## Presence Module - -## Sets the QoS for presence MQTT message. -## -## Value: 0 | 1 | 2 -module.presence.qos = 1 - -##-------------------------------------------------------------------- -## Subscription Module - -## Subscribe the Topics automatically when client connected. -## -## Value: String -## module.subscription.1.topic = connected/%c/%u - -## Qos of the proxy subscription. -## -## Value: 0 | 1 | 2 -## Default: 0 -## module.subscription.1.qos = 0 - -## No Local of the proxy subscription options. -## This configuration only takes effect in the MQTT V5 protocol. -## -## Value: 0 | 1 -## Default: 0 -## module.subscription.1.nl = 0 - -## Retain As Published of the proxy subscription options. -## This configuration only takes effect in the MQTT V5 protocol. -## -## Value: 0 | 1 -## Default: 0 -## module.subscription.1.rap = 0 - -## Retain Handling of the proxy subscription options. -## This configuration only takes effect in the MQTT V5 protocol. -## -## Value: 0 | 1 | 2 -## Default: 0 -## module.subscription.1.rh = 0 - -##-------------------------------------------------------------------- -## Rewrite Module - -## {rewrite, Topic, Re, Dest} -## module.rewrite.pub.rule.1 = x/# ^x/y/(.+)$ z/y/$1 -## module.rewrite.sub.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 - ##------------------------------------------------------------------- ## Plugins ##------------------------------------------------------------------- diff --git a/lib-opensource/emqx_modules/etc/emqx_modules.conf b/lib-opensource/emqx_modules/etc/emqx_modules.conf index 1bb8bf6d7..218d86234 100644 --- a/lib-opensource/emqx_modules/etc/emqx_modules.conf +++ b/lib-opensource/emqx_modules/etc/emqx_modules.conf @@ -1 +1,57 @@ -# empty +##-------------------------------------------------------------------- +## Modules +##-------------------------------------------------------------------- +## The file to store loaded module names. +## +## Value: File +modules.loaded_file = {{ platform_data_dir }}/loaded_modules + +##-------------------------------------------------------------------- +## Presence Module + +## Sets the QoS for presence MQTT message. +## +## Value: 0 | 1 | 2 +module.presence.qos = 1 + +##-------------------------------------------------------------------- +## Subscription Module + +## Subscribe the Topics automatically when client connected. +## +## Value: String +## module.subscription.1.topic = connected/%c/%u + +## Qos of the proxy subscription. +## +## Value: 0 | 1 | 2 +## Default: 0 +## module.subscription.1.qos = 0 + +## No Local of the proxy subscription options. +## This configuration only takes effect in the MQTT V5 protocol. +## +## Value: 0 | 1 +## Default: 0 +## module.subscription.1.nl = 0 + +## Retain As Published of the proxy subscription options. +## This configuration only takes effect in the MQTT V5 protocol. +## +## Value: 0 | 1 +## Default: 0 +## module.subscription.1.rap = 0 + +## Retain Handling of the proxy subscription options. +## This configuration only takes effect in the MQTT V5 protocol. +## +## Value: 0 | 1 | 2 +## Default: 0 +## module.subscription.1.rh = 0 + +##-------------------------------------------------------------------- +## Rewrite Module + +## {rewrite, Topic, Re, Dest} +## module.rewrite.pub.rule.1 = x/# ^x/y/(.+)$ z/y/$1 +## module.rewrite.sub.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 diff --git a/lib-opensource/emqx_modules/priv/emqx_modules.schema b/lib-opensource/emqx_modules/priv/emqx_modules.schema index d7c52c644..55606754c 100644 --- a/lib-opensource/emqx_modules/priv/emqx_modules.schema +++ b/lib-opensource/emqx_modules/priv/emqx_modules.schema @@ -1 +1,89 @@ -% empty +%%-------------------------------------------------------------------- +%% Modules +%%-------------------------------------------------------------------- + +{mapping, "modules.loaded_file", "emqx_modules.modules_loaded_file", [ + {datatype, string} +]}. + +{mapping, "module.presence.qos", "emqx_modules.modules", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "module.subscription.$id.topic", "emqx_modules.modules", [ + {datatype, string} +]}. + +{mapping, "module.subscription.$id.qos", "emqx_modules.modules", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "module.subscription.$id.nl", "emqx_modules.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-1"]} +]}. + +{mapping, "module.subscription.$id.rap", "emqx_modules.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-1"]} +]}. + +{mapping, "module.subscription.$id.rh", "emqx_modules.modules", [ + {default, 0}, + {datatype, integer}, + {validators, ["range:0-2"]} +]}. + +{mapping, "module.rewrite.rule.$id", "emqx_modules.modules", [ + {datatype, string} +]}. + +{mapping, "module.rewrite.pub.rule.$id", "emqx_modules.modules", [ + {datatype, string} +]}. + +{mapping, "module.rewrite.sub.rule.$id", "emqx_modules.modules", [ + {datatype, string} +]}. + +{translation, "emqx_modules.modules", fun(Conf, _, Conf1) -> + Subscriptions = fun() -> + List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), + TopicList = [{N, Topic}|| {[_,"subscription",N,"topic"], Topic} <- List], + [{iolist_to_binary(T), #{ qos => cuttlefish:conf_get("module.subscription." ++ N ++ ".qos", Conf, 0), + nl => cuttlefish:conf_get("module.subscription." ++ N ++ ".nl", Conf, 0), + rap => cuttlefish:conf_get("module.subscription." ++ N ++ ".rap", Conf, 0), + rh => cuttlefish:conf_get("module.subscription." ++ N ++ ".rh", Conf, 0) + }} || {N, T} <- TopicList] + end, + Rewrites = fun() -> + Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf), + PubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.pub.rule", Conf), + SubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.sub.rule", Conf), + TotalRules = lists:append( + [ {["module", "rewrite", "pub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ PubRules, + [ {["module", "rewrite", "sub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ SubRules + ), + lists:map(fun({[_, "rewrite", PubOrSub, "rule", I], Rule}) -> + [Topic, Re, Dest] = string:tokens(Rule, " "), + {rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)} + end, TotalRules) + end, + lists:append([ + [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}], + [{emqx_mod_subscription, Subscriptions()}], + [{emqx_mod_rewrite, Rewrites()}], + [{emqx_mod_topic_metrics, []}], + [{emqx_mod_delayed, []}], + %% TODO: acl_file config should be moved to emqx_modules.conf + %% when all the plubin tests stops using it in the old way. + [{emqx_mod_acl_internal, [{acl_file, {emqx, get_env, [acl_file]}}]}] + %[{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}] + ]) +end}. diff --git a/lib-opensource/emqx_modules/src/emqx_mod_acl_internal.erl b/lib-opensource/emqx_modules/src/emqx_mod_acl_internal.erl index 1701de69d..2d347151d 100644 --- a/lib-opensource/emqx_modules/src/emqx_mod_acl_internal.erl +++ b/lib-opensource/emqx_modules/src/emqx_mod_acl_internal.erl @@ -43,7 +43,13 @@ %%-------------------------------------------------------------------- load(Env) -> - Rules = rules_from_file(proplists:get_value(acl_file, Env)), + %% TODO: acl_file config should be moved to emqx_modules.conf + %% when all the plubin tests stops using it in the old way. + File = case proplists:get_value(acl_file, Env) of + {emqx, get_env, _} -> emqx:get_env(acl_file); + F -> F + end, + Rules = rules_from_file(File), emqx_hooks:add('client.check_acl', {?MODULE, check_acl, [Rules]}, -1). unload(_Env) -> diff --git a/lib-opensource/emqx_modules/src/emqx_modules.erl b/lib-opensource/emqx_modules/src/emqx_modules.erl index ffcc456f0..18c6fb856 100644 --- a/lib-opensource/emqx_modules/src/emqx_modules.erl +++ b/lib-opensource/emqx_modules/src/emqx_modules.erl @@ -30,6 +30,8 @@ , load_module/2 ]). +-define(APP, ?MODULE). + %% @doc List all available plugins -spec(list() -> [{atom(), boolean()}]). list() -> @@ -38,7 +40,7 @@ list() -> %% @doc Load all the extended modules. -spec(load() -> ok). load() -> - case emqx:get_env(modules_loaded_file) of + case get_env(modules_loaded_file) of undefined -> ok; File -> load_modules(File) @@ -59,7 +61,7 @@ load(ModuleName) -> %% @doc Unload all the extended modules. -spec(unload() -> ok). unload() -> - case emqx:get_env(modules_loaded_file) of + case get_env(modules_loaded_file) of undefined -> ignore; File -> unload_modules(File) @@ -79,7 +81,7 @@ unload(ModuleName) -> -spec(reload(module()) -> ok | ignore | {error, any()}). reload(emqx_mod_acl_internal) -> - Modules = emqx:get_env(modules, []), + Modules = get_env(modules, []), Env = proplists:get_value(emqx_mod_acl_internal, Modules, undefined), case emqx_mod_acl_internal:reload(Env) of ok -> @@ -96,7 +98,7 @@ find_module(ModuleName) -> ets:lookup(?MODULE, ModuleName). filter_module(ModuleNames) -> - filter_module(ModuleNames, emqx:get_env(modules, [])). + filter_module(ModuleNames, get_env(modules, [])). filter_module([], Acc) -> Acc; filter_module([{ModuleName, true} | ModuleNames], Acc) -> @@ -123,7 +125,7 @@ load_module(ModuleName) -> load_module({ModuleName, true}). load_module(ModuleName, Persistent) -> - Modules = emqx:get_env(modules, []), + Modules = get_env(modules, []), Env = proplists:get_value(ModuleName, Modules, undefined), case ModuleName:load(Env) of ok -> @@ -150,7 +152,7 @@ unload_module(ModuleName) -> unload_module({ModuleName, true}). unload_module(ModuleName, Persistent) -> - Modules = emqx:get_env(modules, []), + Modules = get_env(modules, []), Env = proplists:get_value(ModuleName, Modules, undefined), case ModuleName:unload(Env) of ok -> @@ -162,7 +164,7 @@ unload_module(ModuleName, Persistent) -> end. write_loaded(true) -> - FilePath = emqx:get_env(modules_loaded_file), + FilePath = get_env(modules_loaded_file), case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- list()]) of ok -> ok; {error, Error} -> @@ -170,3 +172,7 @@ write_loaded(true) -> ok end; write_loaded(false) -> ok. + +get_env(Key) -> get_env(Key, undefined). + +get_env(Key, Default) -> application:get_env(?APP, Key, Default). diff --git a/lib-opensource/emqx_modules/src/emqx_modules_app.erl b/lib-opensource/emqx_modules/src/emqx_modules_app.erl index 832a39c8e..9eeb0bd33 100644 --- a/lib-opensource/emqx_modules/src/emqx_modules_app.erl +++ b/lib-opensource/emqx_modules/src/emqx_modules_app.erl @@ -24,13 +24,28 @@ -export([stop/1]). +-define(APP, emqx_modules). + start(_Type, _Args) -> % the configs for emqx_modules is so far still in emqx application % Ensure it's loaded application:load(emqx), + ok = load_app_env(), {ok, Pid} = emqx_mod_sup:start_link(), ok = emqx_modules:load(), {ok, Pid}. stop(_State) -> emqx_modules:unload(). + +load_app_env() -> + Schema = filename:join([code:priv_dir(?APP), "emqx_modules.schema"]), + Conf1 = filename:join([code:lib_dir(?APP), "etc", "emqx_modules.conf"]), + Conf2 = filename:join([emqx:get_env(plugins_etc_dir), "emqx_modules.conf"]), + [ConfFile | _] = lists:filter(fun filelib:is_regular/1, [Conf1, Conf2]), + Conf = cuttlefish_conf:file(ConfFile), + AppEnv = cuttlefish_generator:map(cuttlefish_schema:files([Schema]), Conf), + lists:foreach(fun({AppName, Envs}) -> + [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs] + end, AppEnv). + diff --git a/lib-opensource/emqx_modules/test/emqx_modules_SUITE.erl b/lib-opensource/emqx_modules/test/emqx_modules_SUITE.erl index b0ff65758..ffbf5e32f 100644 --- a/lib-opensource/emqx_modules/test/emqx_modules_SUITE.erl +++ b/lib-opensource/emqx_modules/test/emqx_modules_SUITE.erl @@ -24,20 +24,20 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx_modules], fun set_sepecial_cfg/1), + emqx_ct_helpers:start_apps([emqx_modules]), + File = emqx_ct_helpers:deps_path(emqx_modules, "test/emqx_modules_SUITE_data/loaded_modules"), + application:set_env(emqx_modules, modules_loaded_file, File), + ok = emqx_modules:unload(), + ok = emqx_modules:load(), Config. -set_sepecial_cfg(_) -> - application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")), - ok. - end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([emqx_modules]). t_load(_) -> ?assertEqual(ok, emqx_modules:unload()), ?assertEqual(ok, emqx_modules:load()), - ?assertEqual({error, not_found}, emqx_modules:load(not_existed_module)), + ?assertEqual({error, not_found}, emqx_modules:load(foo)), ?assertEqual({error, not_started}, emqx_modules:unload(emqx_mod_rewrite)), ?assertEqual(ignore, emqx_modules:reload(emqx_mod_rewrite)), ?assertEqual(ok, emqx_modules:reload(emqx_mod_acl_internal)). diff --git a/test/emqx_SUITE_data/loaded_modules b/lib-opensource/emqx_modules/test/emqx_modules_SUITE_data/loaded_modules similarity index 100% rename from test/emqx_SUITE_data/loaded_modules rename to lib-opensource/emqx_modules/test/emqx_modules_SUITE_data/loaded_modules diff --git a/priv/emqx.schema b/priv/emqx.schema index bdf8a053f..ea7bde6a5 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -2020,93 +2020,6 @@ end}. ++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)]) end}. -%%-------------------------------------------------------------------- -%% Modules -%%-------------------------------------------------------------------- - -{mapping, "modules.loaded_file", "emqx.modules_loaded_file", [ - {datatype, string} -]}. - -{mapping, "module.presence.qos", "emqx.modules", [ - {default, 1}, - {datatype, integer}, - {validators, ["range:0-2"]} -]}. - -{mapping, "module.subscription.$id.topic", "emqx.modules", [ - {datatype, string} -]}. - -{mapping, "module.subscription.$id.qos", "emqx.modules", [ - {default, 1}, - {datatype, integer}, - {validators, ["range:0-2"]} -]}. - -{mapping, "module.subscription.$id.nl", "emqx.modules", [ - {default, 0}, - {datatype, integer}, - {validators, ["range:0-1"]} -]}. - -{mapping, "module.subscription.$id.rap", "emqx.modules", [ - {default, 0}, - {datatype, integer}, - {validators, ["range:0-1"]} -]}. - -{mapping, "module.subscription.$id.rh", "emqx.modules", [ - {default, 0}, - {datatype, integer}, - {validators, ["range:0-2"]} -]}. - -{mapping, "module.rewrite.rule.$id", "emqx.modules", [ - {datatype, string} -]}. - -{mapping, "module.rewrite.pub.rule.$id", "emqx.modules", [ - {datatype, string} -]}. - -{mapping, "module.rewrite.sub.rule.$id", "emqx.modules", [ - {datatype, string} -]}. - -{translation, "emqx.modules", fun(Conf, _, Conf1) -> - Subscriptions = fun() -> - List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), - TopicList = [{N, Topic}|| {[_,"subscription",N,"topic"], Topic} <- List], - [{iolist_to_binary(T), #{ qos => cuttlefish:conf_get("module.subscription." ++ N ++ ".qos", Conf, 0), - nl => cuttlefish:conf_get("module.subscription." ++ N ++ ".nl", Conf, 0), - rap => cuttlefish:conf_get("module.subscription." ++ N ++ ".rap", Conf, 0), - rh => cuttlefish:conf_get("module.subscription." ++ N ++ ".rh", Conf, 0) - }} || {N, T} <- TopicList] - end, - Rewrites = fun() -> - Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf), - PubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.pub.rule", Conf), - SubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.sub.rule", Conf), - TotalRules = lists:append( - [ {["module", "rewrite", "pub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ PubRules, - [ {["module", "rewrite", "sub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ SubRules - ), - lists:map(fun({[_, "rewrite", PubOrSub, "rule", I], Rule}) -> - [Topic, Re, Dest] = string:tokens(Rule, " "), - {rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)} - end, TotalRules) - end, - lists:append([ - [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}], - [{emqx_mod_subscription, Subscriptions()}], - [{emqx_mod_rewrite, Rewrites()}], - [{emqx_mod_topic_metrics, []}], - [{emqx_mod_delayed, []}], - [{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}] - ]) -end}. - %%------------------------------------------------------------------- %% Plugins %%------------------------------------------------------------------- diff --git a/test/emqx_acl_cache_SUITE.erl b/test/emqx_acl_cache_SUITE.erl index 6d314598c..8c7685aa2 100644 --- a/test/emqx_acl_cache_SUITE.erl +++ b/test/emqx_acl_cache_SUITE.erl @@ -56,13 +56,14 @@ t_clean_acl_cache(_) -> emqtt:stop(Client). % optimize?? -t_reload_aclfile_and_cleanall(Config) -> +t_reload_aclfile_and_cleanall(_Config) -> RasieMsg = fun() -> Self = self(), #{puback => fun(Msg) -> Self ! {puback, Msg} end, disconnected => fun(_) -> ok end, publish => fun(_) -> ok end } end, - {ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}, {proto_ver, v5}, {msg_handler, RasieMsg()}]), + {ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}, {proto_ver, v5}, + {msg_handler, RasieMsg()}]), {ok, _} = emqtt:connect(Client), {ok, PktId} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1), @@ -78,27 +79,6 @@ t_reload_aclfile_and_cleanall(Config) -> %% Check acl cache list [ClientPid] = emqx_cm:lookup_channels(<<"emqx_c">>), ?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0), - - %% Update acl file and reload mod_acl_internal - Path = filename:join([testdir(proplists:get_value(data_dir, Config)), "acl2.conf"]), - ok = file:write_file(Path, <<"{deny, all}.">>), - OldPath = emqx:get_env(acl_file), - % application:set_env(emqx, acl_file, Path), - emqx_mod_acl_internal:reload([{acl_file, Path}]), - - ?assert(length(gen_server:call(ClientPid, list_acl_cache)) == 0), - {ok, PktId2} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1), - - receive - {puback, #{packet_id := PktId2, reason_code := Rc2}} -> - %% Not authorized - ?assertEqual(16#87, Rc2); - _ -> - ?assert(false) - end, - application:set_env(emqx, acl_file, OldPath), - file:delete(Path), - emqx_mod_acl_internal:reload([{acl_file, OldPath}]), emqtt:stop(Client). %% @private diff --git a/test/mqtt_protocol_v5_SUITE.erl b/test/mqtt_protocol_v5_SUITE.erl index fd798e759..f4d92fe8b 100644 --- a/test/mqtt_protocol_v5_SUITE.erl +++ b/test/mqtt_protocol_v5_SUITE.erl @@ -111,11 +111,13 @@ t_basic_test(_) -> t_connect_clean_start(_) -> process_flag(trap_exit, true), - {ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, true}]), + {ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>}, + {proto_ver, v5},{clean_start, true}]), {ok, _} = emqtt:connect(Client1), ?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4] ok = emqtt:pause(Client1), - {ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, false}]), + {ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>}, + {proto_ver, v5},{clean_start, false}]), {ok, _} = emqtt:connect(Client2), ?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5] ?assertEqual(142, receive_disconnect_reasoncode()), @@ -124,7 +126,8 @@ t_connect_clean_start(_) -> ok = emqtt:disconnect(Client2), waiting_client_process_exit(Client2), - {ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>},{proto_ver, v5},{clean_start, false}]), + {ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>}, + {proto_ver, v5},{clean_start, false}]), {ok, _} = emqtt:connect(Client3), ?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6] ok = emqtt:disconnect(Client3), @@ -145,7 +148,8 @@ t_connect_will_message(_) -> ]), {ok, _} = emqtt:connect(Client1), [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client1)), - ?assertNotEqual(undefined, maps:find(will_msg, emqx_connection:info(sys:get_state(ClientPid)))), %% [MQTT-3.1.2-7] + Info = emqx_connection:info(sys:get_state(ClientPid)), + ?assertNotEqual(undefined, maps:find(will_msg, Info)), %% [MQTT-3.1.2-7] {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client2), @@ -179,10 +183,7 @@ t_batch_subscribe(_) -> {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>}]), {ok, _} = emqtt:connect(Client), application:set_env(emqx, enable_acl_cache, false), - TempAcl = emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_temp.conf"), - file:write_file(TempAcl, "{deny, {client, \"batch_test\"}, subscribe, [\"t1\", \"t2\", \"t3\"]}.\n"), - timer:sleep(10), - emqx_mod_acl_internal:reload([{acl_file, TempAcl}]), + application:set_env(emqx, acl_nomatch, deny), {ok, _, [?RC_NOT_AUTHORIZED, ?RC_NOT_AUTHORIZED, ?RC_NOT_AUTHORIZED]} = emqtt:subscribe(Client, [{<<"t1">>, qos1}, @@ -193,7 +194,7 @@ t_batch_subscribe(_) -> ?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>, <<"t2">>, <<"t3">>]), - file:delete(TempAcl), + application:set_env(emqx, acl_nomatch, allow), emqtt:disconnect(Client). t_connect_will_retain(_) -> @@ -261,9 +262,10 @@ t_connect_limit_timeout(_) -> [ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)), ?assertEqual(undefined, emqx_connection:info(limit_timer, sys:get_state(ClientPid))), - ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0), - ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0), - ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0), + Payload = <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, + ok = emqtt:publish(Client, Topic, Payload, 0), + ok = emqtt:publish(Client, Topic, Payload, 0), + ok = emqtt:publish(Client, Topic, Payload, 0), timer:sleep(200), ?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))), @@ -523,7 +525,8 @@ t_publish_rap(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), {ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{rap, true}, {qos, 2}]}]), - {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]), + {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, + [{qos, ?QOS_1}, {retain, true}]), [Msg1 | _] = receive_messages(1), ?assertEqual(true, maps:get(retain, Msg1)), %% [MQTT-3.3.1-12] ok = emqtt:disconnect(Client1), @@ -531,7 +534,8 @@ t_publish_rap(_) -> {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client2), {ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, false}, {qos, 2}]}]), - {ok, _} = emqtt:publish(Client2, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]), + {ok, _} = emqtt:publish(Client2, Topic, #{}, <<"retained message">>, + [{qos, ?QOS_1}, {retain, true}]), [Msg2 | _] = receive_messages(1), ?assertEqual(false, maps:get(retain, Msg2)), %% [MQTT-3.3.1-13] ok = emqtt:disconnect(Client2), @@ -575,8 +579,10 @@ t_publish_topic_alias(_) -> {ok, Client2} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client2), {ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2), - ok = emqtt:publish(Client2, Topic, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), - ok = emqtt:publish(Client2, <<"">>, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]), + ok = emqtt:publish(Client2, Topic, #{'Topic-Alias' => 233}, + <<"Topic-Alias">>, [{qos, ?QOS_0}]), + ok = emqtt:publish(Client2, <<"">>, #{'Topic-Alias' => 233}, + <<"Topic-Alias">>, [{qos, ?QOS_0}]), ?assertEqual(2, length(receive_messages(2))), %% [MQTT-3.3.2-12] ok = emqtt:disconnect(Client2), waiting_client_process_exit(Client2), @@ -589,7 +595,8 @@ t_publish_response_topic(_) -> {ok, Client1} = emqtt:start_link([{proto_ver, v5}]), {ok, _} = emqtt:connect(Client1), - ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)}, <<"Response-Topic">>, [{qos, ?QOS_0}]), + ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)}, + <<"Response-Topic">>, [{qos, ?QOS_0}]), ?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14] waiting_client_process_exit(Client1), @@ -620,7 +627,8 @@ t_publish_overlapping_subscriptions(_) -> {ok, _} = emqtt:connect(Client1), {ok, _, [1]} = emqtt:subscribe(Client1, Properties, nth(1, ?WILD_TOPICS), qos1), {ok, _, [0]} = emqtt:subscribe(Client1, Properties, nth(3, ?WILD_TOPICS), qos0), - {ok, _} = emqtt:publish(Client1, Topic, #{}, <<"t_publish_overlapping_subscriptions">>, [{qos, ?QOS_2}]), + {ok, _} = emqtt:publish(Client1, Topic, #{}, + <<"t_publish_overlapping_subscriptions">>, [{qos, ?QOS_2}]), [Msg1 | _ ] = receive_messages(2), ?assert( maps:get(qos, Msg1) < 2 ), %% [MQTT-3.3.4-2] @@ -684,8 +692,9 @@ t_subscribe_actions(_) -> {ok, _} = emqtt:publish(Client1, Topic, <<"t_subscribe_actions">>, 2), [Msg1 | _ ] = receive_messages(1), ?assertEqual(1, maps:get(qos, Msg1)), %% [MQTT-3.8.4-3] [MQTT-3.8.4-8] - - {ok, _, [2,2]} = emqtt:subscribe(Client1, [{nth(1, ?TOPICS), qos2}, {nth(2, ?TOPICS), qos2}] ), %% [MQTT-3.8.4-5] [MQTT-3.8.4-6] [MQTT-3.8.4-7] + %% [MQTT-3.8.4-5] [MQTT-3.8.4-6] [MQTT-3.8.4-7] + {ok, _, [2,2]} = emqtt:subscribe(Client1, [{nth(1, ?TOPICS), qos2}, + {nth(2, ?TOPICS), qos2}] ), ok = emqtt:disconnect(Client1). %%-------------------------------------------------------------------- %% Unsubsctibe Unsuback @@ -702,7 +711,8 @@ t_unscbsctibe(_) -> {ok, _, [17]} = emqtt:unsubscribe(Client1, <<"noExistTopic">>), %% [MQTT-3.10.4-5] {ok, _, [2, 2]} = emqtt:subscribe(Client1, [{Topic1, qos2}, {Topic2, qos2}]), - {ok, _, [0, 0, 17]} = emqtt:unsubscribe(Client1, [Topic1, Topic2, <<"noExistTopic">>]), %% [[MQTT-3.10.4-6]] [MQTT-3.11.3-1] [MQTT-3.11.3-2] + %% [[MQTT-3.10.4-6]] [MQTT-3.11.3-1] [MQTT-3.11.3-2] + {ok, _, [0, 0, 17]} = emqtt:unsubscribe(Client1, [Topic1, Topic2, <<"noExistTopic">>]), ok = emqtt:disconnect(Client1). %%-------------------------------------------------------------------- @@ -748,7 +758,8 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) -> {ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>}]), {ok, _} = emqtt:connect(Pub), - {ok, _} = emqtt:publish(Pub, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2), + {ok, _} = emqtt:publish(Pub, Topic, + <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2), receive {'EXIT', _,{shutdown, for_testiong}} ->