Merge pull request #4169 from zmstone/refactor-move-modules-config-to-emqx-modules-app
refactor(emqx_modules): Move modules config to app dir
This commit is contained in:
commit
514c2c0af5
|
@ -62,7 +62,6 @@ do_teardown(_) ->
|
||||||
|
|
||||||
set_special_cfgs(_) ->
|
set_special_cfgs(_) ->
|
||||||
application:set_env(emqx, plugins_loaded_file, undefined),
|
application:set_env(emqx, plugins_loaded_file, undefined),
|
||||||
application:set_env(emqx, modules_loaded_file, undefined),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
assert_confs([{"web.hook.api.url", Url}|More], Envs) ->
|
assert_confs([{"web.hook.api.url", Url}|More], Envs) ->
|
||||||
|
|
|
@ -2027,64 +2027,6 @@ listener.wss.external.allow_origin_absence = true
|
||||||
## Value: http://url eg. https://localhost:8084, https://127.0.0.1:8084
|
## Value: http://url eg. https://localhost:8084, https://127.0.0.1:8084
|
||||||
listener.wss.external.check_origins = https://localhost:8084, https://127.0.0.1:8084
|
listener.wss.external.check_origins = https://localhost:8084, https://127.0.0.1:8084
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
|
||||||
## Modules
|
|
||||||
##--------------------------------------------------------------------
|
|
||||||
## The file to store loaded module names.
|
|
||||||
##
|
|
||||||
## Value: File
|
|
||||||
modules.loaded_file = {{ platform_data_dir }}/loaded_modules
|
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
|
||||||
## Presence Module
|
|
||||||
|
|
||||||
## Sets the QoS for presence MQTT message.
|
|
||||||
##
|
|
||||||
## Value: 0 | 1 | 2
|
|
||||||
module.presence.qos = 1
|
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
|
||||||
## Subscription Module
|
|
||||||
|
|
||||||
## Subscribe the Topics automatically when client connected.
|
|
||||||
##
|
|
||||||
## Value: String
|
|
||||||
## module.subscription.1.topic = connected/%c/%u
|
|
||||||
|
|
||||||
## Qos of the proxy subscription.
|
|
||||||
##
|
|
||||||
## Value: 0 | 1 | 2
|
|
||||||
## Default: 0
|
|
||||||
## module.subscription.1.qos = 0
|
|
||||||
|
|
||||||
## No Local of the proxy subscription options.
|
|
||||||
## This configuration only takes effect in the MQTT V5 protocol.
|
|
||||||
##
|
|
||||||
## Value: 0 | 1
|
|
||||||
## Default: 0
|
|
||||||
## module.subscription.1.nl = 0
|
|
||||||
|
|
||||||
## Retain As Published of the proxy subscription options.
|
|
||||||
## This configuration only takes effect in the MQTT V5 protocol.
|
|
||||||
##
|
|
||||||
## Value: 0 | 1
|
|
||||||
## Default: 0
|
|
||||||
## module.subscription.1.rap = 0
|
|
||||||
|
|
||||||
## Retain Handling of the proxy subscription options.
|
|
||||||
## This configuration only takes effect in the MQTT V5 protocol.
|
|
||||||
##
|
|
||||||
## Value: 0 | 1 | 2
|
|
||||||
## Default: 0
|
|
||||||
## module.subscription.1.rh = 0
|
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
|
||||||
## Rewrite Module
|
|
||||||
|
|
||||||
## {rewrite, Topic, Re, Dest}
|
|
||||||
## module.rewrite.pub.rule.1 = x/# ^x/y/(.+)$ z/y/$1
|
|
||||||
## module.rewrite.sub.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
|
|
||||||
|
|
||||||
##-------------------------------------------------------------------
|
##-------------------------------------------------------------------
|
||||||
## Plugins
|
## Plugins
|
||||||
##-------------------------------------------------------------------
|
##-------------------------------------------------------------------
|
||||||
|
|
|
@ -1 +1,57 @@
|
||||||
# empty
|
##--------------------------------------------------------------------
|
||||||
|
## Modules
|
||||||
|
##--------------------------------------------------------------------
|
||||||
|
## The file to store loaded module names.
|
||||||
|
##
|
||||||
|
## Value: File
|
||||||
|
modules.loaded_file = {{ platform_data_dir }}/loaded_modules
|
||||||
|
|
||||||
|
##--------------------------------------------------------------------
|
||||||
|
## Presence Module
|
||||||
|
|
||||||
|
## Sets the QoS for presence MQTT message.
|
||||||
|
##
|
||||||
|
## Value: 0 | 1 | 2
|
||||||
|
module.presence.qos = 1
|
||||||
|
|
||||||
|
##--------------------------------------------------------------------
|
||||||
|
## Subscription Module
|
||||||
|
|
||||||
|
## Subscribe the Topics automatically when client connected.
|
||||||
|
##
|
||||||
|
## Value: String
|
||||||
|
## module.subscription.1.topic = connected/%c/%u
|
||||||
|
|
||||||
|
## Qos of the proxy subscription.
|
||||||
|
##
|
||||||
|
## Value: 0 | 1 | 2
|
||||||
|
## Default: 0
|
||||||
|
## module.subscription.1.qos = 0
|
||||||
|
|
||||||
|
## No Local of the proxy subscription options.
|
||||||
|
## This configuration only takes effect in the MQTT V5 protocol.
|
||||||
|
##
|
||||||
|
## Value: 0 | 1
|
||||||
|
## Default: 0
|
||||||
|
## module.subscription.1.nl = 0
|
||||||
|
|
||||||
|
## Retain As Published of the proxy subscription options.
|
||||||
|
## This configuration only takes effect in the MQTT V5 protocol.
|
||||||
|
##
|
||||||
|
## Value: 0 | 1
|
||||||
|
## Default: 0
|
||||||
|
## module.subscription.1.rap = 0
|
||||||
|
|
||||||
|
## Retain Handling of the proxy subscription options.
|
||||||
|
## This configuration only takes effect in the MQTT V5 protocol.
|
||||||
|
##
|
||||||
|
## Value: 0 | 1 | 2
|
||||||
|
## Default: 0
|
||||||
|
## module.subscription.1.rh = 0
|
||||||
|
|
||||||
|
##--------------------------------------------------------------------
|
||||||
|
## Rewrite Module
|
||||||
|
|
||||||
|
## {rewrite, Topic, Re, Dest}
|
||||||
|
## module.rewrite.pub.rule.1 = x/# ^x/y/(.+)$ z/y/$1
|
||||||
|
## module.rewrite.sub.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
|
||||||
|
|
|
@ -1 +1,89 @@
|
||||||
% empty
|
%%--------------------------------------------------------------------
|
||||||
|
%% Modules
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
{mapping, "modules.loaded_file", "emqx_modules.modules_loaded_file", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "module.presence.qos", "emqx_modules.modules", [
|
||||||
|
{default, 1},
|
||||||
|
{datatype, integer},
|
||||||
|
{validators, ["range:0-2"]}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "module.subscription.$id.topic", "emqx_modules.modules", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "module.subscription.$id.qos", "emqx_modules.modules", [
|
||||||
|
{default, 1},
|
||||||
|
{datatype, integer},
|
||||||
|
{validators, ["range:0-2"]}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "module.subscription.$id.nl", "emqx_modules.modules", [
|
||||||
|
{default, 0},
|
||||||
|
{datatype, integer},
|
||||||
|
{validators, ["range:0-1"]}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "module.subscription.$id.rap", "emqx_modules.modules", [
|
||||||
|
{default, 0},
|
||||||
|
{datatype, integer},
|
||||||
|
{validators, ["range:0-1"]}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "module.subscription.$id.rh", "emqx_modules.modules", [
|
||||||
|
{default, 0},
|
||||||
|
{datatype, integer},
|
||||||
|
{validators, ["range:0-2"]}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "module.rewrite.rule.$id", "emqx_modules.modules", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "module.rewrite.pub.rule.$id", "emqx_modules.modules", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{mapping, "module.rewrite.sub.rule.$id", "emqx_modules.modules", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
{translation, "emqx_modules.modules", fun(Conf, _, Conf1) ->
|
||||||
|
Subscriptions = fun() ->
|
||||||
|
List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf),
|
||||||
|
TopicList = [{N, Topic}|| {[_,"subscription",N,"topic"], Topic} <- List],
|
||||||
|
[{iolist_to_binary(T), #{ qos => cuttlefish:conf_get("module.subscription." ++ N ++ ".qos", Conf, 0),
|
||||||
|
nl => cuttlefish:conf_get("module.subscription." ++ N ++ ".nl", Conf, 0),
|
||||||
|
rap => cuttlefish:conf_get("module.subscription." ++ N ++ ".rap", Conf, 0),
|
||||||
|
rh => cuttlefish:conf_get("module.subscription." ++ N ++ ".rh", Conf, 0)
|
||||||
|
}} || {N, T} <- TopicList]
|
||||||
|
end,
|
||||||
|
Rewrites = fun() ->
|
||||||
|
Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf),
|
||||||
|
PubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.pub.rule", Conf),
|
||||||
|
SubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.sub.rule", Conf),
|
||||||
|
TotalRules = lists:append(
|
||||||
|
[ {["module", "rewrite", "pub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ PubRules,
|
||||||
|
[ {["module", "rewrite", "sub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ SubRules
|
||||||
|
),
|
||||||
|
lists:map(fun({[_, "rewrite", PubOrSub, "rule", I], Rule}) ->
|
||||||
|
[Topic, Re, Dest] = string:tokens(Rule, " "),
|
||||||
|
{rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)}
|
||||||
|
end, TotalRules)
|
||||||
|
end,
|
||||||
|
lists:append([
|
||||||
|
[{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}],
|
||||||
|
[{emqx_mod_subscription, Subscriptions()}],
|
||||||
|
[{emqx_mod_rewrite, Rewrites()}],
|
||||||
|
[{emqx_mod_topic_metrics, []}],
|
||||||
|
[{emqx_mod_delayed, []}],
|
||||||
|
%% TODO: acl_file config should be moved to emqx_modules.conf
|
||||||
|
%% when all the plubin tests stops using it in the old way.
|
||||||
|
[{emqx_mod_acl_internal, [{acl_file, {emqx, get_env, [acl_file]}}]}]
|
||||||
|
%[{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}]
|
||||||
|
])
|
||||||
|
end}.
|
||||||
|
|
|
@ -43,7 +43,13 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
load(Env) ->
|
load(Env) ->
|
||||||
Rules = rules_from_file(proplists:get_value(acl_file, Env)),
|
%% TODO: acl_file config should be moved to emqx_modules.conf
|
||||||
|
%% when all the plubin tests stops using it in the old way.
|
||||||
|
File = case proplists:get_value(acl_file, Env) of
|
||||||
|
{emqx, get_env, _} -> emqx:get_env(acl_file);
|
||||||
|
F -> F
|
||||||
|
end,
|
||||||
|
Rules = rules_from_file(File),
|
||||||
emqx_hooks:add('client.check_acl', {?MODULE, check_acl, [Rules]}, -1).
|
emqx_hooks:add('client.check_acl', {?MODULE, check_acl, [Rules]}, -1).
|
||||||
|
|
||||||
unload(_Env) ->
|
unload(_Env) ->
|
||||||
|
|
|
@ -30,6 +30,8 @@
|
||||||
, load_module/2
|
, load_module/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-define(APP, ?MODULE).
|
||||||
|
|
||||||
%% @doc List all available plugins
|
%% @doc List all available plugins
|
||||||
-spec(list() -> [{atom(), boolean()}]).
|
-spec(list() -> [{atom(), boolean()}]).
|
||||||
list() ->
|
list() ->
|
||||||
|
@ -38,7 +40,7 @@ list() ->
|
||||||
%% @doc Load all the extended modules.
|
%% @doc Load all the extended modules.
|
||||||
-spec(load() -> ok).
|
-spec(load() -> ok).
|
||||||
load() ->
|
load() ->
|
||||||
case emqx:get_env(modules_loaded_file) of
|
case get_env(modules_loaded_file) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
File ->
|
File ->
|
||||||
load_modules(File)
|
load_modules(File)
|
||||||
|
@ -59,7 +61,7 @@ load(ModuleName) ->
|
||||||
%% @doc Unload all the extended modules.
|
%% @doc Unload all the extended modules.
|
||||||
-spec(unload() -> ok).
|
-spec(unload() -> ok).
|
||||||
unload() ->
|
unload() ->
|
||||||
case emqx:get_env(modules_loaded_file) of
|
case get_env(modules_loaded_file) of
|
||||||
undefined -> ignore;
|
undefined -> ignore;
|
||||||
File ->
|
File ->
|
||||||
unload_modules(File)
|
unload_modules(File)
|
||||||
|
@ -79,7 +81,7 @@ unload(ModuleName) ->
|
||||||
|
|
||||||
-spec(reload(module()) -> ok | ignore | {error, any()}).
|
-spec(reload(module()) -> ok | ignore | {error, any()}).
|
||||||
reload(emqx_mod_acl_internal) ->
|
reload(emqx_mod_acl_internal) ->
|
||||||
Modules = emqx:get_env(modules, []),
|
Modules = get_env(modules, []),
|
||||||
Env = proplists:get_value(emqx_mod_acl_internal, Modules, undefined),
|
Env = proplists:get_value(emqx_mod_acl_internal, Modules, undefined),
|
||||||
case emqx_mod_acl_internal:reload(Env) of
|
case emqx_mod_acl_internal:reload(Env) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -96,7 +98,7 @@ find_module(ModuleName) ->
|
||||||
ets:lookup(?MODULE, ModuleName).
|
ets:lookup(?MODULE, ModuleName).
|
||||||
|
|
||||||
filter_module(ModuleNames) ->
|
filter_module(ModuleNames) ->
|
||||||
filter_module(ModuleNames, emqx:get_env(modules, [])).
|
filter_module(ModuleNames, get_env(modules, [])).
|
||||||
filter_module([], Acc) ->
|
filter_module([], Acc) ->
|
||||||
Acc;
|
Acc;
|
||||||
filter_module([{ModuleName, true} | ModuleNames], Acc) ->
|
filter_module([{ModuleName, true} | ModuleNames], Acc) ->
|
||||||
|
@ -123,7 +125,7 @@ load_module(ModuleName) ->
|
||||||
load_module({ModuleName, true}).
|
load_module({ModuleName, true}).
|
||||||
|
|
||||||
load_module(ModuleName, Persistent) ->
|
load_module(ModuleName, Persistent) ->
|
||||||
Modules = emqx:get_env(modules, []),
|
Modules = get_env(modules, []),
|
||||||
Env = proplists:get_value(ModuleName, Modules, undefined),
|
Env = proplists:get_value(ModuleName, Modules, undefined),
|
||||||
case ModuleName:load(Env) of
|
case ModuleName:load(Env) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -150,7 +152,7 @@ unload_module(ModuleName) ->
|
||||||
unload_module({ModuleName, true}).
|
unload_module({ModuleName, true}).
|
||||||
|
|
||||||
unload_module(ModuleName, Persistent) ->
|
unload_module(ModuleName, Persistent) ->
|
||||||
Modules = emqx:get_env(modules, []),
|
Modules = get_env(modules, []),
|
||||||
Env = proplists:get_value(ModuleName, Modules, undefined),
|
Env = proplists:get_value(ModuleName, Modules, undefined),
|
||||||
case ModuleName:unload(Env) of
|
case ModuleName:unload(Env) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -162,7 +164,7 @@ unload_module(ModuleName, Persistent) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
write_loaded(true) ->
|
write_loaded(true) ->
|
||||||
FilePath = emqx:get_env(modules_loaded_file),
|
FilePath = get_env(modules_loaded_file),
|
||||||
case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- list()]) of
|
case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- list()]) of
|
||||||
ok -> ok;
|
ok -> ok;
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
|
@ -170,3 +172,7 @@ write_loaded(true) ->
|
||||||
ok
|
ok
|
||||||
end;
|
end;
|
||||||
write_loaded(false) -> ok.
|
write_loaded(false) -> ok.
|
||||||
|
|
||||||
|
get_env(Key) -> get_env(Key, undefined).
|
||||||
|
|
||||||
|
get_env(Key, Default) -> application:get_env(?APP, Key, Default).
|
||||||
|
|
|
@ -24,13 +24,28 @@
|
||||||
|
|
||||||
-export([stop/1]).
|
-export([stop/1]).
|
||||||
|
|
||||||
|
-define(APP, emqx_modules).
|
||||||
|
|
||||||
start(_Type, _Args) ->
|
start(_Type, _Args) ->
|
||||||
% the configs for emqx_modules is so far still in emqx application
|
% the configs for emqx_modules is so far still in emqx application
|
||||||
% Ensure it's loaded
|
% Ensure it's loaded
|
||||||
application:load(emqx),
|
application:load(emqx),
|
||||||
|
ok = load_app_env(),
|
||||||
{ok, Pid} = emqx_mod_sup:start_link(),
|
{ok, Pid} = emqx_mod_sup:start_link(),
|
||||||
ok = emqx_modules:load(),
|
ok = emqx_modules:load(),
|
||||||
{ok, Pid}.
|
{ok, Pid}.
|
||||||
|
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
emqx_modules:unload().
|
emqx_modules:unload().
|
||||||
|
|
||||||
|
load_app_env() ->
|
||||||
|
Schema = filename:join([code:priv_dir(?APP), "emqx_modules.schema"]),
|
||||||
|
Conf1 = filename:join([code:lib_dir(?APP), "etc", "emqx_modules.conf"]),
|
||||||
|
Conf2 = filename:join([emqx:get_env(plugins_etc_dir), "emqx_modules.conf"]),
|
||||||
|
[ConfFile | _] = lists:filter(fun filelib:is_regular/1, [Conf1, Conf2]),
|
||||||
|
Conf = cuttlefish_conf:file(ConfFile),
|
||||||
|
AppEnv = cuttlefish_generator:map(cuttlefish_schema:files([Schema]), Conf),
|
||||||
|
lists:foreach(fun({AppName, Envs}) ->
|
||||||
|
[application:set_env(AppName, Par, Val) || {Par, Val} <- Envs]
|
||||||
|
end, AppEnv).
|
||||||
|
|
||||||
|
|
|
@ -24,20 +24,20 @@
|
||||||
all() -> emqx_ct:all(?MODULE).
|
all() -> emqx_ct:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
emqx_ct_helpers:start_apps([emqx_modules], fun set_sepecial_cfg/1),
|
emqx_ct_helpers:start_apps([emqx_modules]),
|
||||||
|
File = emqx_ct_helpers:deps_path(emqx_modules, "test/emqx_modules_SUITE_data/loaded_modules"),
|
||||||
|
application:set_env(emqx_modules, modules_loaded_file, File),
|
||||||
|
ok = emqx_modules:unload(),
|
||||||
|
ok = emqx_modules:load(),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
set_sepecial_cfg(_) ->
|
|
||||||
application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_ct_helpers:stop_apps([emqx_modules]).
|
emqx_ct_helpers:stop_apps([emqx_modules]).
|
||||||
|
|
||||||
t_load(_) ->
|
t_load(_) ->
|
||||||
?assertEqual(ok, emqx_modules:unload()),
|
?assertEqual(ok, emqx_modules:unload()),
|
||||||
?assertEqual(ok, emqx_modules:load()),
|
?assertEqual(ok, emqx_modules:load()),
|
||||||
?assertEqual({error, not_found}, emqx_modules:load(not_existed_module)),
|
?assertEqual({error, not_found}, emqx_modules:load(foo)),
|
||||||
?assertEqual({error, not_started}, emqx_modules:unload(emqx_mod_rewrite)),
|
?assertEqual({error, not_started}, emqx_modules:unload(emqx_mod_rewrite)),
|
||||||
?assertEqual(ignore, emqx_modules:reload(emqx_mod_rewrite)),
|
?assertEqual(ignore, emqx_modules:reload(emqx_mod_rewrite)),
|
||||||
?assertEqual(ok, emqx_modules:reload(emqx_mod_acl_internal)).
|
?assertEqual(ok, emqx_modules:reload(emqx_mod_acl_internal)).
|
||||||
|
|
|
@ -2020,93 +2020,6 @@ end}.
|
||||||
++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
|
++ cuttlefish_variable:filter_by_prefix("listener.wss", Conf)])
|
||||||
end}.
|
end}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Modules
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
{mapping, "modules.loaded_file", "emqx.modules_loaded_file", [
|
|
||||||
{datatype, string}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.presence.qos", "emqx.modules", [
|
|
||||||
{default, 1},
|
|
||||||
{datatype, integer},
|
|
||||||
{validators, ["range:0-2"]}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.subscription.$id.topic", "emqx.modules", [
|
|
||||||
{datatype, string}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.subscription.$id.qos", "emqx.modules", [
|
|
||||||
{default, 1},
|
|
||||||
{datatype, integer},
|
|
||||||
{validators, ["range:0-2"]}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.subscription.$id.nl", "emqx.modules", [
|
|
||||||
{default, 0},
|
|
||||||
{datatype, integer},
|
|
||||||
{validators, ["range:0-1"]}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.subscription.$id.rap", "emqx.modules", [
|
|
||||||
{default, 0},
|
|
||||||
{datatype, integer},
|
|
||||||
{validators, ["range:0-1"]}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.subscription.$id.rh", "emqx.modules", [
|
|
||||||
{default, 0},
|
|
||||||
{datatype, integer},
|
|
||||||
{validators, ["range:0-2"]}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.rewrite.rule.$id", "emqx.modules", [
|
|
||||||
{datatype, string}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.rewrite.pub.rule.$id", "emqx.modules", [
|
|
||||||
{datatype, string}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{mapping, "module.rewrite.sub.rule.$id", "emqx.modules", [
|
|
||||||
{datatype, string}
|
|
||||||
]}.
|
|
||||||
|
|
||||||
{translation, "emqx.modules", fun(Conf, _, Conf1) ->
|
|
||||||
Subscriptions = fun() ->
|
|
||||||
List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf),
|
|
||||||
TopicList = [{N, Topic}|| {[_,"subscription",N,"topic"], Topic} <- List],
|
|
||||||
[{iolist_to_binary(T), #{ qos => cuttlefish:conf_get("module.subscription." ++ N ++ ".qos", Conf, 0),
|
|
||||||
nl => cuttlefish:conf_get("module.subscription." ++ N ++ ".nl", Conf, 0),
|
|
||||||
rap => cuttlefish:conf_get("module.subscription." ++ N ++ ".rap", Conf, 0),
|
|
||||||
rh => cuttlefish:conf_get("module.subscription." ++ N ++ ".rh", Conf, 0)
|
|
||||||
}} || {N, T} <- TopicList]
|
|
||||||
end,
|
|
||||||
Rewrites = fun() ->
|
|
||||||
Rules = cuttlefish_variable:filter_by_prefix("module.rewrite.rule", Conf),
|
|
||||||
PubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.pub.rule", Conf),
|
|
||||||
SubRules = cuttlefish_variable:filter_by_prefix("module.rewrite.sub.rule", Conf),
|
|
||||||
TotalRules = lists:append(
|
|
||||||
[ {["module", "rewrite", "pub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ PubRules,
|
|
||||||
[ {["module", "rewrite", "sub", "rule", I], Rule} || {["module", "rewrite", "rule", I], Rule} <- Rules] ++ SubRules
|
|
||||||
),
|
|
||||||
lists:map(fun({[_, "rewrite", PubOrSub, "rule", I], Rule}) ->
|
|
||||||
[Topic, Re, Dest] = string:tokens(Rule, " "),
|
|
||||||
{rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)}
|
|
||||||
end, TotalRules)
|
|
||||||
end,
|
|
||||||
lists:append([
|
|
||||||
[{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}],
|
|
||||||
[{emqx_mod_subscription, Subscriptions()}],
|
|
||||||
[{emqx_mod_rewrite, Rewrites()}],
|
|
||||||
[{emqx_mod_topic_metrics, []}],
|
|
||||||
[{emqx_mod_delayed, []}],
|
|
||||||
[{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}]
|
|
||||||
])
|
|
||||||
end}.
|
|
||||||
|
|
||||||
%%-------------------------------------------------------------------
|
%%-------------------------------------------------------------------
|
||||||
%% Plugins
|
%% Plugins
|
||||||
%%-------------------------------------------------------------------
|
%%-------------------------------------------------------------------
|
||||||
|
|
|
@ -56,13 +56,14 @@ t_clean_acl_cache(_) ->
|
||||||
emqtt:stop(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
% optimize??
|
% optimize??
|
||||||
t_reload_aclfile_and_cleanall(Config) ->
|
t_reload_aclfile_and_cleanall(_Config) ->
|
||||||
|
|
||||||
RasieMsg = fun() -> Self = self(), #{puback => fun(Msg) -> Self ! {puback, Msg} end,
|
RasieMsg = fun() -> Self = self(), #{puback => fun(Msg) -> Self ! {puback, Msg} end,
|
||||||
disconnected => fun(_) -> ok end,
|
disconnected => fun(_) -> ok end,
|
||||||
publish => fun(_) -> ok end } end,
|
publish => fun(_) -> ok end } end,
|
||||||
|
|
||||||
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}, {proto_ver, v5}, {msg_handler, RasieMsg()}]),
|
{ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}, {proto_ver, v5},
|
||||||
|
{msg_handler, RasieMsg()}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
|
||||||
{ok, PktId} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1),
|
{ok, PktId} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1),
|
||||||
|
@ -78,27 +79,6 @@ t_reload_aclfile_and_cleanall(Config) ->
|
||||||
%% Check acl cache list
|
%% Check acl cache list
|
||||||
[ClientPid] = emqx_cm:lookup_channels(<<"emqx_c">>),
|
[ClientPid] = emqx_cm:lookup_channels(<<"emqx_c">>),
|
||||||
?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0),
|
?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0),
|
||||||
|
|
||||||
%% Update acl file and reload mod_acl_internal
|
|
||||||
Path = filename:join([testdir(proplists:get_value(data_dir, Config)), "acl2.conf"]),
|
|
||||||
ok = file:write_file(Path, <<"{deny, all}.">>),
|
|
||||||
OldPath = emqx:get_env(acl_file),
|
|
||||||
% application:set_env(emqx, acl_file, Path),
|
|
||||||
emqx_mod_acl_internal:reload([{acl_file, Path}]),
|
|
||||||
|
|
||||||
?assert(length(gen_server:call(ClientPid, list_acl_cache)) == 0),
|
|
||||||
{ok, PktId2} = emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, qos1),
|
|
||||||
|
|
||||||
receive
|
|
||||||
{puback, #{packet_id := PktId2, reason_code := Rc2}} ->
|
|
||||||
%% Not authorized
|
|
||||||
?assertEqual(16#87, Rc2);
|
|
||||||
_ ->
|
|
||||||
?assert(false)
|
|
||||||
end,
|
|
||||||
application:set_env(emqx, acl_file, OldPath),
|
|
||||||
file:delete(Path),
|
|
||||||
emqx_mod_acl_internal:reload([{acl_file, OldPath}]),
|
|
||||||
emqtt:stop(Client).
|
emqtt:stop(Client).
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
|
|
@ -111,11 +111,13 @@ t_basic_test(_) ->
|
||||||
|
|
||||||
t_connect_clean_start(_) ->
|
t_connect_clean_start(_) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
{ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, true}]),
|
{ok, Client1} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},
|
||||||
|
{proto_ver, v5},{clean_start, true}]),
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4]
|
?assertEqual(0, client_info(session_present, Client1)), %% [MQTT-3.1.2-4]
|
||||||
ok = emqtt:pause(Client1),
|
ok = emqtt:pause(Client1),
|
||||||
{ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},{proto_ver, v5},{clean_start, false}]),
|
{ok, Client2} = emqtt:start_link([{clientid, <<"t_connect_clean_start">>},
|
||||||
|
{proto_ver, v5},{clean_start, false}]),
|
||||||
{ok, _} = emqtt:connect(Client2),
|
{ok, _} = emqtt:connect(Client2),
|
||||||
?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5]
|
?assertEqual(1, client_info(session_present, Client2)), %% [MQTT-3.1.2-5]
|
||||||
?assertEqual(142, receive_disconnect_reasoncode()),
|
?assertEqual(142, receive_disconnect_reasoncode()),
|
||||||
|
@ -124,7 +126,8 @@ t_connect_clean_start(_) ->
|
||||||
ok = emqtt:disconnect(Client2),
|
ok = emqtt:disconnect(Client2),
|
||||||
waiting_client_process_exit(Client2),
|
waiting_client_process_exit(Client2),
|
||||||
|
|
||||||
{ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>},{proto_ver, v5},{clean_start, false}]),
|
{ok, Client3} = emqtt:start_link([{clientid, <<"new_client">>},
|
||||||
|
{proto_ver, v5},{clean_start, false}]),
|
||||||
{ok, _} = emqtt:connect(Client3),
|
{ok, _} = emqtt:connect(Client3),
|
||||||
?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6]
|
?assertEqual(0, client_info(session_present, Client3)), %% [MQTT-3.1.2-6]
|
||||||
ok = emqtt:disconnect(Client3),
|
ok = emqtt:disconnect(Client3),
|
||||||
|
@ -145,7 +148,8 @@ t_connect_will_message(_) ->
|
||||||
]),
|
]),
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client1)),
|
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client1)),
|
||||||
?assertNotEqual(undefined, maps:find(will_msg, emqx_connection:info(sys:get_state(ClientPid)))), %% [MQTT-3.1.2-7]
|
Info = emqx_connection:info(sys:get_state(ClientPid)),
|
||||||
|
?assertNotEqual(undefined, maps:find(will_msg, Info)), %% [MQTT-3.1.2-7]
|
||||||
|
|
||||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(Client2),
|
{ok, _} = emqtt:connect(Client2),
|
||||||
|
@ -179,10 +183,7 @@ t_batch_subscribe(_) ->
|
||||||
{ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>}]),
|
{ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"batch_test">>}]),
|
||||||
{ok, _} = emqtt:connect(Client),
|
{ok, _} = emqtt:connect(Client),
|
||||||
application:set_env(emqx, enable_acl_cache, false),
|
application:set_env(emqx, enable_acl_cache, false),
|
||||||
TempAcl = emqx_ct_helpers:deps_path(emqx, "test/emqx_access_SUITE_data/acl_temp.conf"),
|
application:set_env(emqx, acl_nomatch, deny),
|
||||||
file:write_file(TempAcl, "{deny, {client, \"batch_test\"}, subscribe, [\"t1\", \"t2\", \"t3\"]}.\n"),
|
|
||||||
timer:sleep(10),
|
|
||||||
emqx_mod_acl_internal:reload([{acl_file, TempAcl}]),
|
|
||||||
{ok, _, [?RC_NOT_AUTHORIZED,
|
{ok, _, [?RC_NOT_AUTHORIZED,
|
||||||
?RC_NOT_AUTHORIZED,
|
?RC_NOT_AUTHORIZED,
|
||||||
?RC_NOT_AUTHORIZED]} = emqtt:subscribe(Client, [{<<"t1">>, qos1},
|
?RC_NOT_AUTHORIZED]} = emqtt:subscribe(Client, [{<<"t1">>, qos1},
|
||||||
|
@ -193,7 +194,7 @@ t_batch_subscribe(_) ->
|
||||||
?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>,
|
?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(Client, [<<"t1">>,
|
||||||
<<"t2">>,
|
<<"t2">>,
|
||||||
<<"t3">>]),
|
<<"t3">>]),
|
||||||
file:delete(TempAcl),
|
application:set_env(emqx, acl_nomatch, allow),
|
||||||
emqtt:disconnect(Client).
|
emqtt:disconnect(Client).
|
||||||
|
|
||||||
t_connect_will_retain(_) ->
|
t_connect_will_retain(_) ->
|
||||||
|
@ -261,9 +262,10 @@ t_connect_limit_timeout(_) ->
|
||||||
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
|
[ClientPid] = emqx_cm:lookup_channels(client_info(clientid, Client)),
|
||||||
|
|
||||||
?assertEqual(undefined, emqx_connection:info(limit_timer, sys:get_state(ClientPid))),
|
?assertEqual(undefined, emqx_connection:info(limit_timer, sys:get_state(ClientPid))),
|
||||||
ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0),
|
Payload = <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>,
|
||||||
ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0),
|
ok = emqtt:publish(Client, Topic, Payload, 0),
|
||||||
ok = emqtt:publish(Client, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 0),
|
ok = emqtt:publish(Client, Topic, Payload, 0),
|
||||||
|
ok = emqtt:publish(Client, Topic, Payload, 0),
|
||||||
timer:sleep(200),
|
timer:sleep(200),
|
||||||
?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))),
|
?assert(is_reference(emqx_connection:info(limit_timer, sys:get_state(ClientPid)))),
|
||||||
|
|
||||||
|
@ -523,7 +525,8 @@ t_publish_rap(_) ->
|
||||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{rap, true}, {qos, 2}]}]),
|
{ok, _, [2]} = emqtt:subscribe(Client1, #{}, [{Topic, [{rap, true}, {qos, 2}]}]),
|
||||||
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]),
|
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"retained message">>,
|
||||||
|
[{qos, ?QOS_1}, {retain, true}]),
|
||||||
[Msg1 | _] = receive_messages(1),
|
[Msg1 | _] = receive_messages(1),
|
||||||
?assertEqual(true, maps:get(retain, Msg1)), %% [MQTT-3.3.1-12]
|
?assertEqual(true, maps:get(retain, Msg1)), %% [MQTT-3.3.1-12]
|
||||||
ok = emqtt:disconnect(Client1),
|
ok = emqtt:disconnect(Client1),
|
||||||
|
@ -531,7 +534,8 @@ t_publish_rap(_) ->
|
||||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(Client2),
|
{ok, _} = emqtt:connect(Client2),
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, false}, {qos, 2}]}]),
|
{ok, _, [2]} = emqtt:subscribe(Client2, #{}, [{Topic, [{rap, false}, {qos, 2}]}]),
|
||||||
{ok, _} = emqtt:publish(Client2, Topic, #{}, <<"retained message">>, [{qos, ?QOS_1}, {retain, true}]),
|
{ok, _} = emqtt:publish(Client2, Topic, #{}, <<"retained message">>,
|
||||||
|
[{qos, ?QOS_1}, {retain, true}]),
|
||||||
[Msg2 | _] = receive_messages(1),
|
[Msg2 | _] = receive_messages(1),
|
||||||
?assertEqual(false, maps:get(retain, Msg2)), %% [MQTT-3.3.1-13]
|
?assertEqual(false, maps:get(retain, Msg2)), %% [MQTT-3.3.1-13]
|
||||||
ok = emqtt:disconnect(Client2),
|
ok = emqtt:disconnect(Client2),
|
||||||
|
@ -575,8 +579,10 @@ t_publish_topic_alias(_) ->
|
||||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(Client2),
|
{ok, _} = emqtt:connect(Client2),
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2),
|
{ok, _, [2]} = emqtt:subscribe(Client2, Topic, qos2),
|
||||||
ok = emqtt:publish(Client2, Topic, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
|
ok = emqtt:publish(Client2, Topic, #{'Topic-Alias' => 233},
|
||||||
ok = emqtt:publish(Client2, <<"">>, #{'Topic-Alias' => 233}, <<"Topic-Alias">>, [{qos, ?QOS_0}]),
|
<<"Topic-Alias">>, [{qos, ?QOS_0}]),
|
||||||
|
ok = emqtt:publish(Client2, <<"">>, #{'Topic-Alias' => 233},
|
||||||
|
<<"Topic-Alias">>, [{qos, ?QOS_0}]),
|
||||||
?assertEqual(2, length(receive_messages(2))), %% [MQTT-3.3.2-12]
|
?assertEqual(2, length(receive_messages(2))), %% [MQTT-3.3.2-12]
|
||||||
ok = emqtt:disconnect(Client2),
|
ok = emqtt:disconnect(Client2),
|
||||||
waiting_client_process_exit(Client2),
|
waiting_client_process_exit(Client2),
|
||||||
|
@ -589,7 +595,8 @@ t_publish_response_topic(_) ->
|
||||||
|
|
||||||
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
{ok, Client1} = emqtt:start_link([{proto_ver, v5}]),
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)}, <<"Response-Topic">>, [{qos, ?QOS_0}]),
|
ok = emqtt:publish(Client1, Topic, #{'Response-Topic' => nth(1, ?WILD_TOPICS)},
|
||||||
|
<<"Response-Topic">>, [{qos, ?QOS_0}]),
|
||||||
?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14]
|
?assertEqual(130, receive_disconnect_reasoncode()), %% [MQTT-3.3.2-14]
|
||||||
waiting_client_process_exit(Client1),
|
waiting_client_process_exit(Client1),
|
||||||
|
|
||||||
|
@ -620,7 +627,8 @@ t_publish_overlapping_subscriptions(_) ->
|
||||||
{ok, _} = emqtt:connect(Client1),
|
{ok, _} = emqtt:connect(Client1),
|
||||||
{ok, _, [1]} = emqtt:subscribe(Client1, Properties, nth(1, ?WILD_TOPICS), qos1),
|
{ok, _, [1]} = emqtt:subscribe(Client1, Properties, nth(1, ?WILD_TOPICS), qos1),
|
||||||
{ok, _, [0]} = emqtt:subscribe(Client1, Properties, nth(3, ?WILD_TOPICS), qos0),
|
{ok, _, [0]} = emqtt:subscribe(Client1, Properties, nth(3, ?WILD_TOPICS), qos0),
|
||||||
{ok, _} = emqtt:publish(Client1, Topic, #{}, <<"t_publish_overlapping_subscriptions">>, [{qos, ?QOS_2}]),
|
{ok, _} = emqtt:publish(Client1, Topic, #{},
|
||||||
|
<<"t_publish_overlapping_subscriptions">>, [{qos, ?QOS_2}]),
|
||||||
|
|
||||||
[Msg1 | _ ] = receive_messages(2),
|
[Msg1 | _ ] = receive_messages(2),
|
||||||
?assert( maps:get(qos, Msg1) < 2 ), %% [MQTT-3.3.4-2]
|
?assert( maps:get(qos, Msg1) < 2 ), %% [MQTT-3.3.4-2]
|
||||||
|
@ -684,8 +692,9 @@ t_subscribe_actions(_) ->
|
||||||
{ok, _} = emqtt:publish(Client1, Topic, <<"t_subscribe_actions">>, 2),
|
{ok, _} = emqtt:publish(Client1, Topic, <<"t_subscribe_actions">>, 2),
|
||||||
[Msg1 | _ ] = receive_messages(1),
|
[Msg1 | _ ] = receive_messages(1),
|
||||||
?assertEqual(1, maps:get(qos, Msg1)), %% [MQTT-3.8.4-3] [MQTT-3.8.4-8]
|
?assertEqual(1, maps:get(qos, Msg1)), %% [MQTT-3.8.4-3] [MQTT-3.8.4-8]
|
||||||
|
%% [MQTT-3.8.4-5] [MQTT-3.8.4-6] [MQTT-3.8.4-7]
|
||||||
{ok, _, [2,2]} = emqtt:subscribe(Client1, [{nth(1, ?TOPICS), qos2}, {nth(2, ?TOPICS), qos2}] ), %% [MQTT-3.8.4-5] [MQTT-3.8.4-6] [MQTT-3.8.4-7]
|
{ok, _, [2,2]} = emqtt:subscribe(Client1, [{nth(1, ?TOPICS), qos2},
|
||||||
|
{nth(2, ?TOPICS), qos2}] ),
|
||||||
ok = emqtt:disconnect(Client1).
|
ok = emqtt:disconnect(Client1).
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Unsubsctibe Unsuback
|
%% Unsubsctibe Unsuback
|
||||||
|
@ -702,7 +711,8 @@ t_unscbsctibe(_) ->
|
||||||
{ok, _, [17]} = emqtt:unsubscribe(Client1, <<"noExistTopic">>), %% [MQTT-3.10.4-5]
|
{ok, _, [17]} = emqtt:unsubscribe(Client1, <<"noExistTopic">>), %% [MQTT-3.10.4-5]
|
||||||
|
|
||||||
{ok, _, [2, 2]} = emqtt:subscribe(Client1, [{Topic1, qos2}, {Topic2, qos2}]),
|
{ok, _, [2, 2]} = emqtt:subscribe(Client1, [{Topic1, qos2}, {Topic2, qos2}]),
|
||||||
{ok, _, [0, 0, 17]} = emqtt:unsubscribe(Client1, [Topic1, Topic2, <<"noExistTopic">>]), %% [[MQTT-3.10.4-6]] [MQTT-3.11.3-1] [MQTT-3.11.3-2]
|
%% [[MQTT-3.10.4-6]] [MQTT-3.11.3-1] [MQTT-3.11.3-2]
|
||||||
|
{ok, _, [0, 0, 17]} = emqtt:unsubscribe(Client1, [Topic1, Topic2, <<"noExistTopic">>]),
|
||||||
ok = emqtt:disconnect(Client1).
|
ok = emqtt:disconnect(Client1).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -748,7 +758,8 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(_) ->
|
||||||
|
|
||||||
{ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>}]),
|
{ok, Pub} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"pub_client">>}]),
|
||||||
{ok, _} = emqtt:connect(Pub),
|
{ok, _} = emqtt:connect(Pub),
|
||||||
{ok, _} = emqtt:publish(Pub, Topic, <<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2),
|
{ok, _} = emqtt:publish(Pub, Topic,
|
||||||
|
<<"t_shared_subscriptions_client_terminates_when_qos_eq_2">>, 2),
|
||||||
|
|
||||||
receive
|
receive
|
||||||
{'EXIT', _,{shutdown, for_testiong}} ->
|
{'EXIT', _,{shutdown, for_testiong}} ->
|
||||||
|
|
Loading…
Reference in New Issue