feat(modules): Update the configuration file to hocon (#5151)

This commit is contained in:
turtleDeng 2021-07-01 20:08:13 +08:00 committed by GitHub
parent c094e5ddcc
commit c7e540f4f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 217 additions and 412 deletions

View File

@ -1 +1,33 @@
# empty
emqx_modules: {
modules:[
{
type: delayed
enable: false
},
{
type: presence
enable: true
qos: 1
},
{
type: recon
enable: true
},
{
type: rewrite
enable: false
rules:[{
action: publish
source_topic: "x/#"
re: "^x/y/(.+)$"
dest_topic: "z/y/$1"
}]
},
{
type: topic_metrics
enable: false
topics: ["topic/#"]
}
]
}

View File

@ -1 +0,0 @@
% empty

View File

@ -116,11 +116,7 @@ unregister(#{topic := Topic0}, _Params) ->
end).
execute_when_enabled(Fun) ->
Enabled = case emqx_modules:find_module(emqx_mod_topic_metrics) of
[{_, false}] -> false;
[{_, true}] -> true
end,
case Enabled of
case emqx_modules:find_module(topic_metrics) of
true ->
Fun();
false ->

View File

@ -117,7 +117,7 @@ topic(connected, ClientId) ->
topic(disconnected, ClientId) ->
emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/disconnected"])).
qos(Env) -> proplists:get_value(qos, Env, 0).
qos(Env) -> maps:get(qos, Env, 0).
-compile({inline, [reason/1]}).
reason(Reason) when is_atom(Reason) -> Reason;

View File

@ -43,8 +43,8 @@
%% Load/Unload
%%--------------------------------------------------------------------
load(RawRules) ->
{PubRules, SubRules} = compile(RawRules),
load(Env) ->
{PubRules, SubRules} = compile(maps:get(rules, Env, [])),
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}).
@ -70,20 +70,23 @@ description() ->
%%--------------------------------------------------------------------
compile(Rules) ->
PubRules = [ begin
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end || {rewrite, pub, Topic, Re, Dest}<- Rules ],
SubRules = [ begin
{ok, MP} = re:compile(Re),
{rewrite, Topic, MP, Dest}
end || {rewrite, sub, Topic, Re, Dest}<- Rules ],
{PubRules, SubRules}.
lists:foldl(fun(#{source_topic := Topic,
re := Re,
dest_topic := Dest,
action := Action}, {Acc1, Acc2}) ->
{ok, MP} = re:compile(Re),
case Action of
publish ->
{[{Topic, MP, Dest} | Acc1], Acc2};
subscribe ->
{Acc1, [{Topic, MP, Dest} | Acc2]}
end
end, {[], []}, Rules).
match_and_rewrite(Topic, []) ->
Topic;
match_and_rewrite(Topic, [{rewrite, Filter, MP, Dest} | Rules]) ->
match_and_rewrite(Topic, [{Filter, MP, Dest} | Rules]) ->
case emqx_topic:match(Topic, Filter) of
true -> rewrite(Topic, MP, Dest);
false -> match_and_rewrite(Topic, Rules)

View File

@ -1,65 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_subscription).
-behaviour(emqx_gen_mod).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
]).
%% APIs
-export([on_client_connected/3]).
%%--------------------------------------------------------------------
%% Load/Unload Hook
%%--------------------------------------------------------------------
load(Topics) ->
emqx_hooks:add('client.connected', {?MODULE, on_client_connected, [Topics]}).
on_client_connected(#{clientid := ClientId, username := Username}, _ConnInfo = #{proto_ver := ProtoVer}, Topics) ->
Replace = fun(Topic) ->
rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic))
end,
TopicFilters = case ProtoVer of
?MQTT_PROTO_V5 -> [{Replace(Topic), SubOpts} || {Topic, SubOpts} <- Topics];
_ -> [{Replace(Topic), #{qos => Qos}} || {Topic, #{qos := Qos}} <- Topics]
end,
self() ! {subscribe, TopicFilters}.
unload(_) ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}).
description() ->
"EMQ X Subscription Module".
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
rep(<<"%c">>, ClientId, Topic) ->
emqx_topic:feed_var(<<"%c">>, ClientId, Topic);
rep(<<"%u">>, undefined, Topic) ->
Topic;
rep(<<"%u">>, Username, Topic) ->
emqx_topic:feed_var(<<"%u">>, Username, Topic).

View File

@ -60,7 +60,6 @@ stop_child(ChildId) ->
%%--------------------------------------------------------------------
init([]) ->
ok = emqx_tables:new(emqx_modules, [set, public, {write_concurrency, true}]),
{ok, {{one_for_one, 10, 100}, []}}.
%%--------------------------------------------------------------------

View File

@ -1,6 +1,6 @@
{application, emqx_modules,
[{description, "EMQ X Module Management"},
{vsn, "4.3.2"},
{vsn, "5.0.0"},
{modules, []},
{applications, [kernel,stdlib]},
{mod, {emqx_modules_app, []}},

View File

@ -1,23 +0,0 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.1", [
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
]},
{"4.3.0", [
{update, emqx_mod_delayed, {advanced, []}},
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
[
{"4.3.1", [
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
]},
{"4.3.0", [
{update, emqx_mod_delayed, {advanced, []}},
{load_module, emqx_mod_api_topic_metrics, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
]
}.

View File

@ -22,12 +22,11 @@
-export([ list/0
, load/0
, load/1
, load/2
, unload/0
, unload/1
, reload/1
, find_module/1
, load_module/2
]).
-export([cli/1]).
@ -35,48 +34,51 @@
%% @doc List all available plugins
-spec(list() -> [{atom(), boolean()}]).
list() ->
ets:tab2list(?MODULE).
persistent_term:get(?MODULE, []).
%% @doc Load all the extended modules.
-spec(load() -> ok).
load() ->
case emqx:get_env(modules_loaded_file) of
undefined -> ok;
File ->
load_modules(File)
end.
Modules = emqx_config:get([emqx_modules, modules], []),
lists:foreach(fun(#{type := Module, enable := Enable} = Config) ->
case Enable of
true ->
load(name(Module), maps:without([type, enable], Config));
false ->
ok
end
end, Modules).
load(ModuleName) ->
load(Module, Env) ->
ModuleName = name(Module),
case find_module(ModuleName) of
[] ->
?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]),
{error, not_found};
[{ModuleName, true}] ->
false ->
load_mod(ModuleName, Env);
true ->
?LOG(notice, "Module ~s is already started", [ModuleName]),
{error, already_started};
[{ModuleName, false}] ->
emqx_modules:load_module(ModuleName, true)
{error, already_started}
end.
%% @doc Unload all the extended modules.
-spec(unload() -> ok).
unload() ->
case emqx:get_env(modules_loaded_file) of
undefined -> ignore;
File ->
unload_modules(File)
end.
Modules = emqx_config:get([emqx_modules, modules], []),
lists:foreach(fun(#{type := Module, enable := Enable}) ->
case Enable of
true ->
unload_mod(name(Module));
false ->
ok
end
end, Modules).
unload(ModuleName) ->
case find_module(ModuleName) of
[] ->
false ->
?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]),
{error, not_found};
[{ModuleName, false}] ->
?LOG(error, "Module ~s is not started", [ModuleName]),
{error, not_started};
[{ModuleName, true}] ->
unload_module(ModuleName, true)
true ->
unload_mod(ModuleName)
end.
-spec(reload(module()) -> ok | ignore | {error, any()}).
@ -84,94 +86,39 @@ reload(_) ->
ignore.
find_module(ModuleName) ->
ets:lookup(?MODULE, ModuleName).
lists:member(ModuleName, persistent_term:get(?MODULE, [])).
filter_module(ModuleNames) ->
filter_module(ModuleNames, emqx:get_env(modules, [])).
filter_module([], Acc) ->
Acc;
filter_module([{ModuleName, true} | ModuleNames], Acc) ->
filter_module(ModuleNames, lists:keydelete(ModuleName, 1, Acc));
filter_module([{_, false} | ModuleNames], Acc) ->
filter_module(ModuleNames, Acc).
load_modules(File) ->
case file:consult(File) of
{ok, ModuleNames} ->
lists:foreach(fun({ModuleName, _}) ->
ets:insert(?MODULE, {ModuleName, false})
end, filter_module(ModuleNames)),
lists:foreach(fun load_module/1, ModuleNames);
{error, Error} ->
?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error])
end.
load_module({ModuleName, true}) ->
emqx_modules:load_module(ModuleName, false);
load_module({ModuleName, false}) ->
ets:insert(?MODULE, {ModuleName, false});
load_module(ModuleName) ->
load_module({ModuleName, true}).
load_module(ModuleName, Persistent) ->
Modules = emqx:get_env(modules, []),
Env = proplists:get_value(ModuleName, Modules, undefined),
load_mod(ModuleName, Env) ->
case ModuleName:load(Env) of
ok ->
ets:insert(?MODULE, {ModuleName, true}),
ok = write_loaded(Persistent),
Modules = persistent_term:get(?MODULE, []),
persistent_term:put(?MODULE, [ModuleName| Modules]),
?LOG(info, "Load ~s module successfully.", [ModuleName]);
{error, Error} ->
?LOG(error, "Load module ~s failed, cannot load for ~0p", [ModuleName, Error]),
{error, Error}
end.
unload_modules(File) ->
case file:consult(File) of
{ok, ModuleNames} ->
lists:foreach(fun unload_module/1, ModuleNames);
{error, Error} ->
?LOG(alert, "Failed to read: ~p, error: ~p", [File, Error])
end.
unload_module({ModuleName, true}) ->
unload_module(ModuleName, false);
unload_module({ModuleName, false}) ->
ets:insert(?MODULE, {ModuleName, false});
unload_module(ModuleName) ->
unload_module({ModuleName, true}).
unload_module(ModuleName, Persistent) ->
Modules = emqx:get_env(modules, []),
Env = proplists:get_value(ModuleName, Modules, undefined),
case ModuleName:unload(Env) of
unload_mod(ModuleName) ->
case ModuleName:unload(#{}) of
ok ->
ets:insert(?MODULE, {ModuleName, false}),
ok = write_loaded(Persistent),
Modules = persistent_term:get(?MODULE, []),
persistent_term:put(?MODULE, Modules -- [ModuleName]),
?LOG(info, "Unload ~s module successfully.", [ModuleName]);
{error, Error} ->
?LOG(error, "Unload module ~s failed, cannot unload for ~0p", [ModuleName, Error])
end.
write_loaded(true) ->
FilePath = emqx:get_env(modules_loaded_file),
case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- list()]) of
ok -> ok;
{error, Error} ->
?LOG(error, "Write File ~p Error: ~p", [FilePath, Error]),
ok
end;
write_loaded(false) -> ok.
%%--------------------------------------------------------------------
%% @doc Modules Command
cli(["list"]) ->
lists:foreach(fun({Name, Active}) ->
emqx_ctl:print("Module(~s, description=~s, active=~s)~n",
[Name, Name:description(), Active])
lists:foreach(fun(Name) ->
emqx_ctl:print("Module(~s, description=~s)~n",
[Name, Name:description()])
end, emqx_modules:list());
cli(["load", Name]) ->
case emqx_modules:load(list_to_atom(Name)) of
case emqx_modules:load(list_to_atom(Name), #{}) of
ok ->
emqx_ctl:print("Module ~s loaded successfully.~n", [Name]);
{error, Reason} ->
@ -195,3 +142,10 @@ cli(_) ->
{"modules unload <Module>", "Unload module"},
{"modules reload <Module>", "Reload module"}
]).
name(delayed) -> emqx_mod_delayed;
name(presence) -> emqx_mod_presence;
name(recon) -> emqx_mod_recon;
name(rewrite) -> emqx_mod_rewrite;
name(topic_metrics) -> emqx_mod_topic_metrics;
name(Name) -> Name.

View File

@ -73,7 +73,7 @@
, reload/2
]).
-export([ do_load_module/2
-export([ do_load_module/3
, do_unload_module/2
]).
@ -83,11 +83,11 @@ list(#{node := Node}, _Params) ->
list(_Bindings, _Params) ->
return({ok, [format(Node, Modules) || {Node, Modules} <- list_modules()]}).
load(#{node := Node, module := Module}, _Params) ->
return(do_load_module(Node, Module));
load(#{node := Node, module := Module}, Params) ->
return(do_load_module(Node, Module, Params));
load(#{module := Module}, _Params) ->
Results = [do_load_module(Node, Module) || Node <- ekka_mnesia:running_nodes()],
load(#{module := Module}, Params) ->
Results = [do_load_module(Node, Module, Params) || Node <- ekka_mnesia:running_nodes()],
case lists:filter(fun(Item) -> Item =/= ok end, Results) of
[] ->
return(ok);
@ -129,10 +129,9 @@ reload(#{module := Module}, _Params) ->
format(Node, Modules) ->
#{node => Node, modules => [format(Module) || Module <- Modules]}.
format({Name, Active}) ->
#{name => Name,
description => iolist_to_binary(Name:description()),
active => Active}.
format(Name) ->
#{name => name(Name),
description => iolist_to_binary(Name:description())}.
list_modules() ->
[{Node, list_modules(Node)} || Node <- ekka_mnesia:running_nodes()].
@ -142,10 +141,10 @@ list_modules(Node) when Node =:= node() ->
list_modules(Node) ->
rpc_call(Node, list_modules, [Node]).
do_load_module(Node, Module) when Node =:= node() ->
emqx_modules:load(Module);
do_load_module(Node, Module) ->
rpc_call(Node, do_load_module, [Node, Module]).
do_load_module(Node, Module, Env) when Node =:= node() ->
emqx_modules:load(Module, Env);
do_load_module(Node, Module, Env) ->
rpc_call(Node, do_load_module, [Node, Module, Env]).
do_unload_module(Node, Module) when Node =:= node() ->
emqx_modules:unload(Module);
@ -162,3 +161,9 @@ rpc_call(Node, Fun, Args) ->
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.
name(emqx_mod_delayed) -> delayed;
name(emqx_mod_presence) -> presence;
name(emqx_mod_recon) -> recon;
name(emqx_mod_rewrite) -> rewrite;
name(emqx_mod_topic_metrics) -> topic_metrics.

View File

@ -23,9 +23,6 @@
-export([stop/1]).
start(_Type, _Args) ->
% the configs for emqx_modules is so far still in emqx application
% Ensure it's loaded
_ = application:load(emqx),
{ok, Pid} = emqx_mod_sup:start_link(),
ok = emqx_modules:load(),
emqx_ctl:register_command(modules, {emqx_modules, cli}, []),

View File

@ -0,0 +1,61 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules_schema).
-include_lib("typerefl/include/types.hrl").
-behaviour(hocon_schema).
-export([ structs/0
, fields/1]).
structs() -> ["emqx_modules"].
fields("emqx_modules") ->
[{modules, hoconsc:array(hoconsc:union([ hoconsc:ref(?MODULE, "common")
, hoconsc:ref(?MODULE, "presence")
, hoconsc:ref(?MODULE, "rewrite")
, hoconsc:ref(?MODULE, "topic_metrics")
]))}];
fields("common") ->
[ {type, hoconsc:enum([delayed, recon])}
, {enable, emqx_schema:t(boolean(), undefined, false)}
];
fields("presence") ->
[ {type, hoconsc:enum([presence])}
, {enable, emqx_schema:t(boolean(), undefined, false)}
, {qos, emqx_schema:t(integer(), undefined, 1)}
];
fields("rewrite") ->
[ {type, hoconsc:enum([rewrite])}
, {enable, emqx_schema:t(boolean(), undefined, false)}
, {rules, hoconsc:array(hoconsc:ref(?MODULE, "rules"))}
];
fields("topic_metrics") ->
[ {type, hoconsc:enum([topic_metrics])}
, {enable, emqx_schema:t(boolean(), undefined, false)}
, {topics, hoconsc:array(binary())}
];
fields("rules") ->
[ {action, hoconsc:enum([publish, subscribe])}
, {source_topic, emqx_schema:t(binary())}
, {re, emqx_schema:t(binary())}
, {dest_topic, emqx_schema:t(binary())}
].

View File

@ -35,19 +35,12 @@ all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_modules], fun set_special_configs/1),
emqx_ct_helpers:start_apps([emqx_modules]),
Config.
end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_modules]).
set_special_configs(emqx) ->
application:set_env(emqx, modules, [{emqx_mod_delayed, []}]),
application:set_env(emqx, allow_anonymous, false),
application:set_env(emqx, enable_acl_cache, false);
set_special_configs(_App) ->
ok.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------

View File

@ -36,7 +36,7 @@ end_per_suite(_Config) ->
%% Test case for emqx_mod_presence
t_mod_presence(_) ->
ok = emqx_mod_presence:load([{qos, ?QOS_1}]),
ok = emqx_mod_presence:load(#{qos => ?QOS_1}),
{ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
{ok, _} = emqtt:connect(C1),
{ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1),
@ -49,7 +49,7 @@ t_mod_presence(_) ->
ok = emqtt:disconnect(C2),
ok = recv_and_check_presence(<<"clientid">>, <<"disconnected">>),
ok = emqtt:disconnect(C1),
ok = emqx_mod_presence:unload([{qos, ?QOS_1}]).
ok = emqx_mod_presence:unload([]).
t_mod_presence_reason(_) ->
?assertEqual(normal, emqx_mod_presence:reason(normal)),

View File

@ -22,8 +22,15 @@
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(RULES, [{rewrite, pub, <<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>},
{rewrite, sub, <<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}
-define(RULES, [#{action => publish,
source_topic => <<"x/#">>,
re => <<"^x/y/(.+)$">>,
dest_topic => <<"z/y/$1">>
},
#{action => subscribe,
source_topic => <<"y/+/z/#">>,
re => <<"^y/(.+)/z/(.+)$">>,
dest_topic => <<"y/z/$2">>}
]).
all() -> emqx_ct:all(?MODULE).
@ -40,7 +47,7 @@ end_per_suite(_Config) ->
%% Test case for emqx_mod_write
t_mod_rewrite(_Config) ->
ok = emqx_mod_rewrite:load(?RULES),
ok = emqx_mod_rewrite:load(#{rules => ?RULES}),
{ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]),
{ok, _} = emqtt:connect(C),
PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>],

View File

@ -1,92 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_subscription_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([]).
t_on_client_connected(_) ->
?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, #{qos => ?QOS_0}}])),
{ok, C} = emqtt:start_link([{host, "localhost"},
{clientid, "myclient"},
{username, "admin"}]),
{ok, _} = emqtt:connect(C),
emqtt:publish(C, <<"connected/myclient/admin">>, <<"Hello world">>, ?QOS_0),
{ok, #{topic := Topic, payload := Payload}} = receive_publish(100),
?assertEqual(<<"connected/myclient/admin">>, Topic),
?assertEqual(<<"Hello world">>, Payload),
ok = emqtt:disconnect(C),
?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/%c/%u">>, #{qos => ?QOS_0}}])).
t_on_undefined_client_connected(_) ->
?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/undefined">>, #{qos => ?QOS_1}}])),
{ok, C} = emqtt:start_link([{host, "localhost"}]),
{ok, _} = emqtt:connect(C),
emqtt:publish(C, <<"connected/undefined">>, <<"Hello world">>, ?QOS_1),
{ok, #{topic := Topic, payload := Payload}} = receive_publish(100),
?assertEqual(<<"connected/undefined">>, Topic),
?assertEqual(<<"Hello world">>, Payload),
ok = emqtt:disconnect(C),
?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, #{qos => ?QOS_1}}])).
t_suboption(_) ->
Client_info = fun(Key, Client) -> maps:get(Key, maps:from_list(emqtt:info(Client)), undefined) end,
Suboption = #{qos => ?QOS_2, nl => 1, rap => 1, rh => 2},
?assertEqual(ok, emqx_mod_subscription:load([{<<"connected/%c/%u">>, Suboption}])),
{ok, C1} = emqtt:start_link([{proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
timer:sleep(200),
[CPid1] = emqx_cm:lookup_channels(Client_info(clientid, C1)),
[ Sub1 | _ ] = ets:lookup(emqx_subscription,CPid1),
[ Suboption1 | _ ] = ets:lookup(emqx_suboption,Sub1),
?assertMatch({Sub1, #{qos := 2, nl := 1, rap := 1, rh := 2, subid := _}}, Suboption1),
ok = emqtt:disconnect(C1),
%% The subscription option is not valid for MQTT V3.1.1
{ok, C2} = emqtt:start_link([{proto_ver, v4}]),
{ok, _} = emqtt:connect(C2),
timer:sleep(200),
[CPid2] = emqx_cm:lookup_channels(Client_info(clientid, C2)),
[ Sub2 | _ ] = ets:lookup(emqx_subscription,CPid2),
[ Suboption2 | _ ] = ets:lookup(emqx_suboption,Sub2),
ok = emqtt:disconnect(C2),
?assertMatch({Sub2, #{qos := 2, nl := 0, rap := 0, rh := 0, subid := _}}, Suboption2),
?assertEqual(ok, emqx_mod_subscription:unload([{<<"connected/undefined">>, Suboption}])).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
receive_publish(Timeout) ->
receive
{publish, Publish} -> {ok, Publish}
after
Timeout -> {error, timeout}
end.

View File

@ -1,49 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_sup_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_start(_) ->
?assertEqual([], supervisor:which_children(emqx_mod_sup)).
t_start_child(_) ->
%% Set the emqx_mod_sup child with emqx_hooks for test
Mod = emqx_hooks,
Spec = #{id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [Mod]},
ok = emqx_mod_sup:start_child(Mod, worker),
?assertError({already_started, _}, emqx_mod_sup:start_child(Spec)),
ok = emqx_mod_sup:stop_child(Mod),
{error, not_found} = emqx_mod_sup:stop_child(Mod),
ok.

View File

@ -37,7 +37,6 @@ init_per_suite(Config) ->
Config.
set_special_cfg(_) ->
application:set_env(emqx, modules_loaded_file, emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_modules")),
ok.
end_per_suite(_Config) ->
@ -47,70 +46,67 @@ end_per_suite(_Config) ->
t_load(_) ->
?assertEqual(ok, emqx_modules:unload()),
?assertEqual(ok, emqx_modules:load()),
?assertEqual({error, not_found}, emqx_modules:load(not_existed_module)),
?assertEqual({error, not_started}, emqx_modules:unload(emqx_mod_rewrite)),
?assertEqual(ignore, emqx_modules:reload(emqx_mod_rewrite)).
?assertEqual({error, not_started}, emqx_modules:unload(rewrite)),
?assertEqual(ignore, emqx_modules:reload(rewrite)).
t_list(_) ->
?assertMatch([{_, _} | _ ], emqx_modules:list()).
emqx_modules:load(presence, #{qos => 1}),
?assertMatch([_ | _ ], emqx_modules:list()),
emqx_modules:unload(presence).
t_modules_api(_) ->
emqx_modules:load_module(emqx_mod_presence, false),
emqx_modules:load(presence, #{qos => 1}),
timer:sleep(50),
{ok, Modules1} = request_api(get, api_path(["modules"]), auth_header_()),
[Modules11] = filter(get(<<"data">>, Modules1), <<"node">>, atom_to_binary(node(), utf8)),
[Module1] = filter(maps:get(<<"modules">>, Modules11), <<"name">>, <<"emqx_mod_presence">>),
?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module1)),
?assertEqual(true, maps:get(<<"active">>, Module1)),
[Module1] = filter(maps:get(<<"modules">>, Modules11), <<"name">>, <<"presence">>),
?assertEqual(<<"presence">>, maps:get(<<"name">>, Module1)),
{ok, _} = request_api(put,
api_path(["modules",
atom_to_list(emqx_mod_presence),
atom_to_list(presence),
"unload"]),
auth_header_()),
{ok, Error1} = request_api(put,
api_path(["modules",
atom_to_list(emqx_mod_presence),
atom_to_list(presence),
"unload"]),
auth_header_()),
?assertEqual(<<"not_started">>, get(<<"message">>, Error1)),
{ok, Modules2} = request_api(get,
api_path(["nodes", atom_to_list(node()), "modules"]),
auth_header_()),
[Module2] = filter(get(<<"data">>, Modules2), <<"name">>, <<"emqx_mod_presence">>),
?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module2)),
?assertEqual(false, maps:get(<<"active">>, Module2)),
[Module2] = filter(get(<<"data">>, Modules2), <<"name">>, <<"presence">>),
?assertEqual(<<"presence">>, maps:get(<<"name">>, Module2)),
{ok, _} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(emqx_mod_presence),
atom_to_list(presence),
"load"]),
auth_header_()),
{ok, Modules3} = request_api(get,
api_path(["nodes", atom_to_list(node()), "modules"]),
auth_header_()),
[Module3] = filter(get(<<"data">>, Modules3), <<"name">>, <<"emqx_mod_presence">>),
?assertEqual(<<"emqx_mod_presence">>, maps:get(<<"name">>, Module3)),
?assertEqual(true, maps:get(<<"active">>, Module3)),
[Module3] = filter(get(<<"data">>, Modules3), <<"name">>, <<"presence">>),
?assertEqual(<<"presence">>, maps:get(<<"name">>, Module3)),
{ok, _} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(emqx_mod_presence),
atom_to_list(presence),
"unload"]),
auth_header_()),
{ok, Error2} = request_api(put,
api_path(["nodes",
atom_to_list(node()),
"modules",
atom_to_list(emqx_mod_presence),
atom_to_list(presence),
"unload"]),
auth_header_()),
?assertEqual(<<"not_started">>, get(<<"message">>, Error2)),
emqx_modules:unload(emqx_mod_presence).
emqx_modules:unload(presence).
t_modules_cmd(_) ->
@ -120,10 +116,10 @@ t_modules_cmd(_) ->
meck:expect(emqx_modules, unload, fun(_) -> ok end),
meck:expect(emqx_modules, reload, fun(_) -> ok end),
?assertEqual(emqx_modules:cli(["list"]), ok),
?assertEqual(emqx_modules:cli(["load", "emqx_mod_presence"]),
"Module emqx_mod_presence loaded successfully.\n"),
?assertEqual(emqx_modules:cli(["unload", "emqx_mod_presence"]),
"Module emqx_mod_presence unloaded successfully.\n"),
?assertEqual(emqx_modules:cli(["load", "delayed"]),
"Module delayed loaded successfully.\n"),
?assertEqual(emqx_modules:cli(["unload", "delayed"]),
"Module delayed unloaded successfully.\n"),
unmock_print().
%% For: https://github.com/emqx/emqx/issues/4511

View File

@ -307,12 +307,7 @@ active_plugins() ->
end, [], emqx_plugins:list()).
active_modules() ->
lists:foldl(fun({Name, Persistent}, Acc) ->
case Persistent of
true -> [Name | Acc];
false -> Acc
end
end, [], emqx_modules:list()).
emqx_modules:list().
num_clients() ->
emqx_stats:getstat('connections.max').

View File

@ -1,2 +0,0 @@
{emqx_mod_presence, true}.
{emqx_mod_recon, true}.

View File

@ -1,4 +1,3 @@
{emqx_management, true}.
{emqx_dashboard, true}.
{emqx_modules, {{enable_plugin_emqx_modules}}}.
{emqx_retainer, {{enable_plugin_emqx_retainer}}}.

View File

@ -192,8 +192,8 @@ overlay_vars_rel(RelType) ->
cloud -> "vm.args";
edge -> "vm.args.edge"
end,
[ {enable_plugin_emqx_modules, false} %% modules is not a plugin in ce
, {enable_plugin_emqx_retainer, true}
[
{enable_plugin_emqx_retainer, true}
, {vm_args_file, VmArgs}
].
@ -256,9 +256,9 @@ relx_apps(ReleaseType) ->
, emqx_data_bridge
, emqx_rule_engine
, emqx_bridge_mqtt
, emqx_modules
]
++ [emqx_telemetry || not is_enterprise()]
++ [emqx_modules || not is_enterprise()]
++ [emqx_license || is_enterprise()]
++ [bcrypt || provide_bcrypt_release(ReleaseType)]
++ relx_apps_per_rel(ReleaseType)
@ -318,7 +318,6 @@ relx_overlay(ReleaseType) ->
, {mkdir, "data/patches"}
, {mkdir, "data/scripts"}
, {template, "data/loaded_plugins.tmpl", "data/loaded_plugins"}
, {template, "data/loaded_modules.tmpl", "data/loaded_modules"}
, {template, "data/emqx_vars", "releases/emqx_vars"}
, {template, "data/BUILT_ON", "releases/{{release_version}}/BUILT_ON"}
, {copy, "bin/emqx", "bin/emqx"}
@ -376,6 +375,7 @@ emqx_etc_overlay_common() ->
{"{{base_dir}}/lib/emqx_authz/etc/emqx_authz.conf", "etc/plugins/authz.conf"},
{"{{base_dir}}/lib/emqx_rule_engine/etc/emqx_rule_engine.conf", "etc/plugins/emqx_rule_engine.conf"},
{"{{base_dir}}/lib/emqx_bridge_mqtt/etc/emqx_bridge_mqtt.conf", "etc/plugins/emqx_bridge_mqtt.conf"},
{"{{base_dir}}/lib/emqx_modules/etc/emqx_modules.conf", "etc/plugins/emqx_modules.conf"},
%% TODO: check why it has to end with .paho
%% and why it is put to etc/plugins dir
{"{{base_dir}}/lib/emqx/etc/acl.conf.paho", "etc/plugins/acl.conf.paho"}].