diff --git a/apps/emqx/src/emqx_gen_mod.erl b/apps/emqx/src/emqx_gen_mod.erl deleted file mode 100644 index 0ebf6b59a..000000000 --- a/apps/emqx/src/emqx_gen_mod.erl +++ /dev/null @@ -1,23 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_gen_mod). - --callback(load(Opts :: any()) -> ok | {error, term()}). - --callback(unload(State :: term()) -> term()). - --callback(description() -> any()). diff --git a/apps/emqx_modules/etc/emqx_modules.conf b/apps/emqx_modules/etc/emqx_modules.conf index c90515b74..9049ca6c7 100644 --- a/apps/emqx_modules/etc/emqx_modules.conf +++ b/apps/emqx_modules/etc/emqx_modules.conf @@ -1,37 +1,32 @@ -# empty -emqx_modules: { - modules:[ +delayed: { + enable: true + max_delayed_messages: 0 +} + +recon: { + enable: true +} + +telemetry: { + enable: true +} + +presence: { + enable: true +} + +topic_metrics:{ + topics: ["topic/#"] +} + +rewrite:{ + rules: [ { - type: delayed - enable: false - }, - { - type: presence - enable: true - qos: 1 - }, - { - type: recon - enable: true - }, - { - type: rewrite - enable: false - rules:[{ - action: publish - source_topic: "x/#" - re: "^x/y/(.+)$" - dest_topic: "z/y/$1" - }] - }, - { - type: topic_metrics - enable: false - topics: ["topic/#"] - }, - { - type: telemetry - enable: false + action: publish + source_topic: "x/#" + re: "^x/y/(.+)$" + dest_topic: "z/y/$1" } ] } + diff --git a/apps/emqx_modules/src/emqx_mod_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl similarity index 87% rename from apps/emqx_modules/src/emqx_mod_delayed.erl rename to apps/emqx_modules/src/emqx_delayed.erl index 925952078..e63453b54 100644 --- a/apps/emqx_modules/src/emqx_mod_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -14,10 +14,9 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_delayed). +-module(emqx_delayed). -behaviour(gen_server). --behaviour(emqx_gen_mod). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -30,12 +29,6 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). -%% emqx_gen_mod callbacks --export([ load/1 - , unload/1 - , description/0 - ]). - -export([ start_link/0 , on_message_publish/1 ]). @@ -49,10 +42,13 @@ , code_change/3 ]). --record(delayed_message, - { key - , msg - }). +%% gen_server callbacks +-export([ get_status/0 + , enable/0 + , disable/0 + ]). + +-record(delayed_message, {key, msg}). -define(TAB, ?MODULE). -define(SERVER, ?MODULE). @@ -63,7 +59,6 @@ %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- - mnesia(boot) -> ok = ekka_mnesia:create_table(?TAB, [ {type, ordered_set}, @@ -75,25 +70,8 @@ mnesia(copy) -> ok = ekka_mnesia:copy_table(?TAB, disc_copies). %%-------------------------------------------------------------------- -%% Load/Unload -%%-------------------------------------------------------------------- - --spec(load(list()) -> ok). -load(_Env) -> - emqx_mod_sup:start_child(?MODULE, worker), - emqx:hook('message.publish', {?MODULE, on_message_publish, []}). - --spec(unload(list()) -> ok). -unload(_Env) -> - emqx:unhook('message.publish', {?MODULE, on_message_publish}), - emqx_mod_sup:stop_child(?MODULE). - -description() -> - "EMQ X Delayed Publish Module". -%%-------------------------------------------------------------------- %% Hooks %%-------------------------------------------------------------------- - on_message_publish(Msg = #message{ id = Id, topic = <<"$delayed/", Topic/binary>>, @@ -124,25 +102,48 @@ on_message_publish(Msg) -> -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + Opts = emqx_config:get([delayed], #{}), + gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). -spec(store(#delayed_message{}) -> ok). store(DelayedMsg) -> gen_server:call(?SERVER, {store, DelayedMsg}, infinity). +get_status() -> + gen_server:call(?SERVER, get_status). + +enable() -> + gen_server:call(?SERVER, enable). + +disable() -> + gen_server:call(?SERVER, disable). + %%-------------------------------------------------------------------- %% gen_server callback %%-------------------------------------------------------------------- -init([]) -> +init([_Opts]) -> {ok, ensure_stats_event( - ensure_publish_timer(#{timer => undefined, publish_at => 0}))}. + ensure_publish_timer(#{timer => undefined, + publish_at => 0, + enabled => false}))}. handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) -> ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg), emqx_metrics:inc('messages.delayed'), {reply, ok, ensure_publish_timer(Key, State)}; +handle_call(enable, _From, State) -> + emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}), + {reply, ok, State}; + +handle_call(disable, _From, State) -> + emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), + {reply, ok, State}; + +handle_call(get_status, _From, State = #{enabled := Enabled}) -> + {reply, Enabled, State}; + handle_call(Req, _From, State) -> ?LOG(error, "Unexpected call: ~p", [Req]), {reply, ignored, State}. diff --git a/apps/emqx_modules/src/emqx_mod_api_topic_metrics.erl b/apps/emqx_modules/src/emqx_mod_api_topic_metrics.erl deleted file mode 100644 index 2f8fbd017..000000000 --- a/apps/emqx_modules/src/emqx_mod_api_topic_metrics.erl +++ /dev/null @@ -1,207 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_mod_api_topic_metrics). - --rest_api(#{name => list_all_topic_metrics, - method => 'GET', - path => "/topic-metrics", - func => list, - descr => "A list of all topic metrics of all nodes in the cluster"}). - --rest_api(#{name => list_topic_metrics, - method => 'GET', - path => "/topic-metrics/:bin:topic", - func => list, - descr => "A list of specfied topic metrics of all nodes in the cluster"}). - --rest_api(#{name => register_topic_metrics, - method => 'POST', - path => "/topic-metrics", - func => register, - descr => "Register topic metrics"}). - --rest_api(#{name => unregister_all_topic_metrics, - method => 'DELETE', - path => "/topic-metrics", - func => unregister, - descr => "Unregister all topic metrics"}). - --rest_api(#{name => unregister_topic_metrics, - method => 'DELETE', - path => "/topic-metrics/:bin:topic", - func => unregister, - descr => "Unregister topic metrics"}). - --export([ list/2 - , register/2 - , unregister/2 - ]). - --export([ get_topic_metrics/2 - , register_topic_metrics/2 - , unregister_topic_metrics/2 - , unregister_all_topic_metrics/1 - ]). - -list(#{topic := Topic0}, _Params) -> - execute_when_enabled(fun() -> - Topic = emqx_mgmt_util:urldecode(Topic0), - case safe_validate(Topic) of - true -> - case get_topic_metrics(Topic) of - {error, Reason} -> return({error, Reason}); - Metrics -> return({ok, maps:from_list(Metrics)}) - end; - false -> - return({error, invalid_topic_name}) - end - end); - -list(_Bindings, _Params) -> - execute_when_enabled(fun() -> - case get_all_topic_metrics() of - {error, Reason} -> return({error, Reason}); - Metrics -> return({ok, Metrics}) - end - end). - -register(_Bindings, Params) -> - execute_when_enabled(fun() -> - case proplists:get_value(<<"topic">>, Params) of - undefined -> - return({error, missing_required_params}); - Topic -> - case safe_validate(Topic) of - true -> - register_topic_metrics(Topic), - return(ok); - false -> - return({error, invalid_topic_name}) - end - end - end). - -unregister(Bindings, _Params) when map_size(Bindings) =:= 0 -> - execute_when_enabled(fun() -> - unregister_all_topic_metrics(), - return(ok) - end); - -unregister(#{topic := Topic0}, _Params) -> - execute_when_enabled(fun() -> - Topic = emqx_mgmt_util:urldecode(Topic0), - case safe_validate(Topic) of - true -> - unregister_topic_metrics(Topic), - return(ok); - false -> - return({error, invalid_topic_name}) - end - end). - -execute_when_enabled(Fun) -> - case emqx_modules:find_module(topic_metrics) of - true -> - Fun(); - false -> - return({error, module_not_loaded}) - end. - -safe_validate(Topic) -> - try emqx_topic:validate(name, Topic) of - true -> true - catch - error:_Error -> - false - end. - -get_all_topic_metrics() -> - lists:foldl(fun(Topic, Acc) -> - case get_topic_metrics(Topic) of - {error, _Reason} -> - Acc; - Metrics -> - [#{topic => Topic, metrics => Metrics} | Acc] - end - end, [], emqx_mod_topic_metrics:all_registered_topics()). - -get_topic_metrics(Topic) -> - lists:foldl(fun(Node, Acc) -> - case get_topic_metrics(Node, Topic) of - {error, _Reason} -> - Acc; - Metrics -> - case Acc of - [] -> Metrics; - _ -> - lists:foldl(fun({K, V}, Acc0) -> - [{K, V + proplists:get_value(K, Metrics, 0)} | Acc0] - end, [], Acc) - end - end - end, [], ekka_mnesia:running_nodes()). - -get_topic_metrics(Node, Topic) when Node =:= node() -> - emqx_mod_topic_metrics:metrics(Topic); -get_topic_metrics(Node, Topic) -> - rpc_call(Node, get_topic_metrics, [Node, Topic]). - -register_topic_metrics(Topic) -> - Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()], - case lists:any(fun(Item) -> Item =:= ok end, Results) of - true -> ok; - false -> lists:last(Results) - end. - -register_topic_metrics(Node, Topic) when Node =:= node() -> - emqx_mod_topic_metrics:register(Topic); -register_topic_metrics(Node, Topic) -> - rpc_call(Node, register_topic_metrics, [Node, Topic]). - -unregister_topic_metrics(Topic) -> - Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()], - case lists:any(fun(Item) -> Item =:= ok end, Results) of - true -> ok; - false -> lists:last(Results) - end. - -unregister_topic_metrics(Node, Topic) when Node =:= node() -> - emqx_mod_topic_metrics:unregister(Topic); -unregister_topic_metrics(Node, Topic) -> - rpc_call(Node, unregister_topic_metrics, [Node, Topic]). - -unregister_all_topic_metrics() -> - Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()], - case lists:any(fun(Item) -> Item =:= ok end, Results) of - true -> ok; - false -> lists:last(Results) - end. - -unregister_all_topic_metrics(Node) when Node =:= node() -> - emqx_mod_topic_metrics:unregister_all(); -unregister_all_topic_metrics(Node) -> - rpc_call(Node, unregister_topic_metrics, [Node]). - -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Res -> Res - end. - -return(_) -> -%% TODO: V5 API - ok. diff --git a/apps/emqx_modules/src/emqx_mod_sup.erl b/apps/emqx_modules/src/emqx_mod_sup.erl deleted file mode 100644 index 6cb2cf722..000000000 --- a/apps/emqx_modules/src/emqx_mod_sup.erl +++ /dev/null @@ -1,79 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_mod_sup). - --behaviour(supervisor). - --include_lib("emqx/include/types.hrl"). - --export([ start_link/0 - , start_child/1 - , start_child/2 - , stop_child/1 - ]). - --export([init/1]). - -%% Helper macro for declaring children of supervisor --define(CHILD(Mod, Type), #{id => Mod, - start => {Mod, start_link, []}, - restart => permanent, - shutdown => 5000, - type => Type, - modules => [Mod]}). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - --spec start_child(supervisor:child_spec()) -> ok. -start_child(ChildSpec) when is_map(ChildSpec) -> - assert_started(supervisor:start_child(?MODULE, ChildSpec)). - --spec start_child(atom(), atom()) -> ok. -start_child(Mod, Type) when is_atom(Mod) andalso is_atom(Type) -> - assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Type))). - --spec(stop_child(any()) -> ok | {error, term()}). -stop_child(ChildId) -> - case supervisor:terminate_child(?MODULE, ChildId) of - ok -> supervisor:delete_child(?MODULE, ChildId); - Error -> Error - end. - -%%-------------------------------------------------------------------- -%% Supervisor callbacks -%%-------------------------------------------------------------------- - -init([]) -> - Env = [], - {ok, {{one_for_one, 10, 3600}, - [#{id => telemetry, - start => {emqx_mod_telemetry, start_link, [Env]}, - restart => permanent, - shutdown => 5000, - type => worker, - modules => [emqx_mod_telemetry]}]}}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -assert_started({ok, _Pid}) -> ok; -assert_started({ok, _Pid, _Info}) -> ok; -assert_started({error, {already_tarted, _Pid}}) -> ok; -assert_started({error, Reason}) -> erlang:error(Reason). - diff --git a/apps/emqx_modules/src/emqx_modules.app.src b/apps/emqx_modules/src/emqx_modules.app.src index 5d251abc9..155f581c0 100644 --- a/apps/emqx_modules/src/emqx_modules.app.src +++ b/apps/emqx_modules/src/emqx_modules.app.src @@ -1,9 +1,9 @@ {application, emqx_modules, - [{description, "EMQ X Module Management"}, + [{description, "EMQ X Modules"}, {vsn, "5.0.0"}, {modules, []}, {applications, [kernel,stdlib]}, {mod, {emqx_modules_app, []}}, - {registered, [emqx_mod_sup]}, + {registered, [emqx_modules_sup]}, {env, []} ]}. diff --git a/apps/emqx_modules/src/emqx_modules.erl b/apps/emqx_modules/src/emqx_modules.erl deleted file mode 100644 index e27f3afc2..000000000 --- a/apps/emqx_modules/src/emqx_modules.erl +++ /dev/null @@ -1,154 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_modules). - --include_lib("emqx/include/logger.hrl"). - --logger_header("[Modules]"). - --export([ list/0 - , load/0 - , load/2 - , unload/0 - , unload/1 - , reload/1 - , find_module/1 - ]). - --export([cli/1]). - -%% @doc List all available plugins --spec(list() -> [{atom(), boolean()}]). -list() -> - persistent_term:get(?MODULE, []). - -%% @doc Load all the extended modules. --spec(load() -> ok). -load() -> - Modules = emqx_config:get([emqx_modules, modules], []), - lists:foreach(fun(#{type := Module, enable := Enable} = Config) -> - case Enable of - true -> - load(name(Module), maps:without([type, enable], Config)); - false -> - ok - end - end, Modules). - -load(Module, Env) -> - ModuleName = name(Module), - case find_module(ModuleName) of - false -> - load_mod(ModuleName, Env); - true -> - ?LOG(notice, "Module ~s is already started", [ModuleName]), - {error, already_started} - end. - -%% @doc Unload all the extended modules. --spec(unload() -> ok). -unload() -> - Modules = emqx_config:get([emqx_modules, modules], []), - lists:foreach(fun(#{type := Module, enable := Enable}) -> - case Enable of - true -> - unload_mod(name(Module)); - false -> - ok - end - end, Modules). - -unload(ModuleName) -> - case find_module(ModuleName) of - false -> - ?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]), - {error, not_started}; - true -> - unload_mod(ModuleName) - end. - --spec(reload(module()) -> ok | ignore | {error, any()}). -reload(_) -> - ignore. - -find_module(ModuleName) -> - lists:member(ModuleName, persistent_term:get(?MODULE, [])). - -load_mod(ModuleName, Env) -> - case ModuleName:load(Env) of - ok -> - Modules = persistent_term:get(?MODULE, []), - persistent_term:put(?MODULE, [ModuleName| Modules]), - ?LOG(info, "Load ~s module successfully.", [ModuleName]); - {error, Error} -> - ?LOG(error, "Load module ~s failed, cannot load for ~0p", [ModuleName, Error]), - {error, Error} - end. - -unload_mod(ModuleName) -> - case ModuleName:unload(#{}) of - ok -> - Modules = persistent_term:get(?MODULE, []), - persistent_term:put(?MODULE, Modules -- [ModuleName]), - ?LOG(info, "Unload ~s module successfully.", [ModuleName]); - {error, Error} -> - ?LOG(error, "Unload module ~s failed, cannot unload for ~0p", [ModuleName, Error]) - end. - -%%-------------------------------------------------------------------- -%% @doc Modules Command -cli(["list"]) -> - lists:foreach(fun(Name) -> - emqx_ctl:print("Module(~s, description=~s)~n", - [Name, Name:description()]) - end, emqx_modules:list()); - -cli(["load", Name]) -> - case emqx_modules:load(list_to_atom(Name), #{}) of - ok -> - emqx_ctl:print("Module ~s loaded successfully.~n", [Name]); - {error, Reason} -> - emqx_ctl:print("Load module ~s error: ~p.~n", [Name, Reason]) - end; - -cli(["unload", Name]) -> - case emqx_modules:unload(list_to_atom(Name)) of - ok -> - emqx_ctl:print("Module ~s unloaded successfully.~n", [Name]); - {error, Reason} -> - emqx_ctl:print("Unload module ~s error: ~p.~n", [Name, Reason]) - end; - -cli(["reload", Name]) -> - emqx_ctl:print("Module: ~p does not need to be reloaded.~n", [Name]); - -cli(_) -> - emqx_ctl:usage([{"modules list", "Show loaded modules"}, - {"modules load ", "Load module"}, - {"modules unload ", "Unload module"}, - {"modules reload ", "Reload module"} - ]). - -name(Name) when is_binary(Name) -> - name(binary_to_atom(Name, utf8)); -name(delayed) -> emqx_mod_delayed; -name(presence) -> emqx_mod_presence; -name(recon) -> emqx_mod_recon; -name(rewrite) -> emqx_mod_rewrite; -name(topic_metrics) -> emqx_mod_topic_metrics; -name(telemetry) -> emqx_mod_telemetry; -name(Name) -> Name. diff --git a/apps/emqx_modules/src/emqx_modules_api.erl b/apps/emqx_modules/src/emqx_modules_api.erl deleted file mode 100644 index 99a3b89f9..000000000 --- a/apps/emqx_modules/src/emqx_modules_api.erl +++ /dev/null @@ -1,171 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_modules_api). - --rest_api(#{name => list_all_modules, - method => 'GET', - path => "/modules/", - func => list, - descr => "List all modules in the cluster"}). - --rest_api(#{name => list_node_modules, - method => 'GET', - path => "/nodes/:atom:node/modules/", - func => list, - descr => "List all modules on a node"}). - --rest_api(#{name => load_node_module, - method => 'PUT', - path => "/nodes/:atom:node/modules/:atom:module/load", - func => load, - descr => "Load a module"}). - --rest_api(#{name => unload_node_module, - method => 'PUT', - path => "/nodes/:atom:node/modules/:atom:module/unload", - func => unload, - descr => "Unload a module"}). - --rest_api(#{name => reload_node_module, - method => 'PUT', - path => "/nodes/:atom:node/modules/:atom:module/reload", - func => reload, - descr => "Reload a module"}). - --rest_api(#{name => load_module, - method => 'PUT', - path => "/modules/:atom:module/load", - func => load, - descr => "load a module in the cluster"}). - --rest_api(#{name => unload_module, - method => 'PUT', - path => "/modules/:atom:module/unload", - func => unload, - descr => "Unload a module in the cluster"}). - --rest_api(#{name => reload_module, - method => 'PUT', - path => "/modules/:atom:module/reload", - func => reload, - descr => "Reload a module in the cluster"}). - --export([ list/2 - , list_modules/1 - , load/2 - , unload/2 - , reload/2 - ]). - --export([ do_load_module/3 - , do_unload_module/2 - ]). - -list(#{node := Node}, _Params) -> - return({ok, [format(Module) || Module <- list_modules(Node)]}); - -list(_Bindings, _Params) -> - return({ok, [format(Node, Modules) || {Node, Modules} <- list_modules()]}). - -load(#{node := Node, module := Module}, Params) -> - return(do_load_module(Node, Module, Params)); - -load(#{module := Module}, Params) -> - Results = [do_load_module(Node, Module, Params) || Node <- ekka_mnesia:running_nodes()], - case lists:filter(fun(Item) -> Item =/= ok end, Results) of - [] -> - return(ok); - Errors -> - return(lists:last(Errors)) - end. - -unload(#{node := Node, module := Module}, _Params) -> - return(do_unload_module(Node, Module)); - -unload(#{module := Module}, _Params) -> - Results = [do_unload_module(Node, Module) || Node <- ekka_mnesia:running_nodes()], - case lists:filter(fun(Item) -> Item =/= ok end, Results) of - [] -> - return(ok); - Errors -> - return(lists:last(Errors)) - end. - -reload(#{node := Node, module := Module}, _Params) -> - case reload_module(Node, Module) of - ignore -> return(ok); - Result -> return(Result) - end; - -reload(#{module := Module}, _Params) -> - Results = [reload_module(Node, Module) || Node <- ekka_mnesia:running_nodes()], - case lists:filter(fun(Item) -> Item =/= ok end, Results) of - [] -> - return(ok); - Errors -> - return(lists:last(Errors)) - end. - -%%------------------------------------------------------------------------------ -%% Internal Functions -%%------------------------------------------------------------------------------ - -format(Node, Modules) -> - #{node => Node, modules => [format(Module) || Module <- Modules]}. - -format(Name) -> - #{name => name(Name), - description => iolist_to_binary(Name:description())}. - -list_modules() -> - [{Node, list_modules(Node)} || Node <- ekka_mnesia:running_nodes()]. - -list_modules(Node) when Node =:= node() -> - emqx_modules:list(); -list_modules(Node) -> - rpc_call(Node, list_modules, [Node]). - -do_load_module(Node, Module, Env) when Node =:= node() -> - emqx_modules:load(Module, Env); -do_load_module(Node, Module, Env) -> - rpc_call(Node, do_load_module, [Node, Module, Env]). - -do_unload_module(Node, Module) when Node =:= node() -> - emqx_modules:unload(Module); -do_unload_module(Node, Module) -> - rpc_call(Node, do_unload_module, [Node, Module]). - -reload_module(Node, Module) when Node =:= node() -> - emqx_modules:reload(Module); -reload_module(Node, Module) -> - rpc_call(Node, reload_module, [Node, Module]). - -rpc_call(Node, Fun, Args) -> - case rpc:call(Node, ?MODULE, Fun, Args) of - {badrpc, Reason} -> {error, Reason}; - Res -> Res - end. - -name(emqx_mod_delayed) -> delayed; -name(emqx_mod_presence) -> presence; -name(emqx_mod_recon) -> recon; -name(emqx_mod_rewrite) -> rewrite; -name(emqx_mod_topic_metrics) -> topic_metrics. - -return(_) -> -%% TODO: V5 API - ok. diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index 33f18459e..2b0ee482d 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -23,11 +23,26 @@ -export([stop/1]). start(_Type, _Args) -> - {ok, Pid} = emqx_mod_sup:start_link(), - ok = emqx_modules:load(), - emqx_ctl:register_command(modules, {emqx_modules, cli}, []), - {ok, Pid}. + {ok, Sup} = emqx_modules_sup:start_link(), + maybe_enable_modules(), + {ok, Sup}. stop(_State) -> - emqx_ctl:unregister_command(modules), - emqx_modules:unload(). + maybe_disable_modules(), + ok. + +maybe_enable_modules() -> + emqx_config:get([delayed, enable], true) andalso emqx_delayed:enable(), + emqx_config:get([presence, enable], true) andalso emqx_presence:enable(), + emqx_config:get([telemetry, enable], true) andalso emqx_telemetry:enable(), + emqx_config:get([recon, enable], true) andalso emqx_recon:enable(), + emqx_rewrite:enable(), + emqx_topic_metrics:enable(). + +maybe_disable_modules() -> + emqx_config:get([delayed, enable], true) andalso emqx_delayed:disable(), + emqx_config:get([presence, enable], true) andalso emqx_presence:disable(), + emqx_config:get([telemetry, enable], true) andalso emqx_telemetry:disable(), + emqx_config:get([recon, enable], true) andalso emqx_recon:disable(), + emqx_rewrite:disable(), + emqx_topic_metrics:disable(). diff --git a/apps/emqx_modules/src/emqx_modules_schema.erl b/apps/emqx_modules/src/emqx_modules_schema.erl index 55050987e..e7b8742e4 100644 --- a/apps/emqx_modules/src/emqx_modules_schema.erl +++ b/apps/emqx_modules/src/emqx_modules_schema.erl @@ -23,40 +23,31 @@ -export([ structs/0 , fields/1]). -structs() -> ["emqx_modules"]. +structs() -> + ["delayed", + "recon", + "telemetry", + "presence", + "rewrite", + "topic_metrics"]. -fields("emqx_modules") -> - [{modules, hoconsc:array(hoconsc:union([ hoconsc:ref(?MODULE, "common") - , hoconsc:ref(?MODULE, "presence") - , hoconsc:ref(?MODULE, "rewrite") - , hoconsc:ref(?MODULE, "topic_metrics") - , hoconsc:ref(?MODULE, "telemetry") - ]))}]; -fields("common") -> - [ {type, hoconsc:enum([delayed, recon])} - , {enable, emqx_schema:t(boolean(), undefined, false)} +fields(Name) when Name =:= "recon"; + Name =:= "telemetry"; + Name =:= "presence" -> + [ {enable, emqx_schema:t(boolean(), undefined, false)} ]; -fields("presence") -> - [ {type, hoconsc:enum([presence])} - , {enable, emqx_schema:t(boolean(), undefined, false)} - , {qos, emqx_schema:t(integer(), undefined, 1)} +fields("delayed") -> + [ {enable, emqx_schema:t(boolean(), undefined, false)} + , {max_delayed_messages, emqx_schema:t(integer())} ]; + fields("rewrite") -> - [ {type, hoconsc:enum([rewrite])} - , {enable, emqx_schema:t(boolean(), undefined, false)} - , {rules, hoconsc:array(hoconsc:ref(?MODULE, "rules"))} + [ {rules, hoconsc:array(hoconsc:ref(?MODULE, "rules"))} ]; fields("topic_metrics") -> - [ {type, hoconsc:enum([topic_metrics])} - , {enable, emqx_schema:t(boolean(), undefined, false)} - , {topics, hoconsc:array(binary())} - ]; - -fields("telemetry") -> - [ {type, hoconsc:enum([telemetry])} - , {enable, emqx_schema:t(boolean(), undefined, false)} + [ {topics, hoconsc:array(binary())} ]; fields("rules") -> diff --git a/apps/emqx_modules/src/emqx_modules_sup.erl b/apps/emqx_modules/src/emqx_modules_sup.erl new file mode 100644 index 000000000..570082896 --- /dev/null +++ b/apps/emqx_modules/src/emqx_modules_sup.erl @@ -0,0 +1,43 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_modules_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(Mod), #{id => Mod, + start => {Mod, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [Mod]}). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%%-------------------------------------------------------------------- +%% Supervisor callbacks +%%-------------------------------------------------------------------- +init([]) -> + {ok, {{one_for_one, 10, 3600}, + [?CHILD(emqx_telemetry), + ?CHILD(emqx_topic_metrics), + ?CHILD(emqx_delayed)]}}. diff --git a/apps/emqx_modules/src/emqx_mod_presence.erl b/apps/emqx_modules/src/emqx_presence.erl similarity index 86% rename from apps/emqx_modules/src/emqx_mod_presence.erl rename to apps/emqx_modules/src/emqx_presence.erl index 729316663..c85d951e1 100644 --- a/apps/emqx_modules/src/emqx_mod_presence.erl +++ b/apps/emqx_modules/src/emqx_presence.erl @@ -14,55 +14,49 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_presence). - --behaviour(emqx_gen_mod). +-module(emqx_presence). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). -logger_header("[Presence]"). -%% emqx_gen_mod callbacks --export([ load/1 - , unload/1 - , description/0 +-export([ enable/0 + , disable/0 ]). --export([ on_client_connected/3 - , on_client_disconnected/4 +-export([ on_client_connected/2 + , on_client_disconnected/3 ]). -ifdef(TEST). -export([reason/1]). -endif. -load(Env) -> - emqx_hooks:put('client.connected', {?MODULE, on_client_connected, [Env]}), - emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, [Env]}). +enable() -> + emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []}), + emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []}). -unload(_Env) -> +disable() -> emqx_hooks:del('client.connected', {?MODULE, on_client_connected}), emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}). -description() -> - "EMQ X Presence Module". %%-------------------------------------------------------------------- %% Callbacks %%-------------------------------------------------------------------- -on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) -> +on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo) -> Presence = connected_presence(ClientInfo, ConnInfo), case emqx_json:safe_encode(Presence) of {ok, Payload} -> emqx_broker:safe_publish( - make_msg(qos(Env), topic(connected, ClientId), Payload)); + make_msg(topic(connected, ClientId), Payload)); {error, _Reason} -> ?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence]) end. on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username}, - Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}, Env) -> + Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}) -> Presence = #{clientid => ClientId, username => Username, reason => reason(Reason), @@ -72,7 +66,7 @@ on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Usernam case emqx_json:safe_encode(Presence) of {ok, Payload} -> emqx_broker:safe_publish( - make_msg(qos(Env), topic(disconnected, ClientId), Payload)); + make_msg(topic(disconnected, ClientId), Payload)); {error, _Reason} -> ?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence]) end. @@ -107,18 +101,16 @@ connected_presence(#{peerhost := PeerHost, ts => erlang:system_time(millisecond) }. -make_msg(QoS, Topic, Payload) -> +make_msg(Topic, Payload) -> emqx_message:set_flag( sys, emqx_message:make( - ?MODULE, QoS, Topic, iolist_to_binary(Payload))). + ?MODULE, 0, Topic, iolist_to_binary(Payload))). topic(connected, ClientId) -> emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/connected"])); topic(disconnected, ClientId) -> emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/disconnected"])). -qos(Env) -> maps:get(qos, Env, 0). - -compile({inline, [reason/1]}). reason(Reason) when is_atom(Reason) -> Reason; reason({shutdown, Reason}) when is_atom(Reason) -> Reason; diff --git a/apps/emqx_modules/src/emqx_mod_recon.erl b/apps/emqx_modules/src/emqx_recon.erl similarity index 89% rename from apps/emqx_modules/src/emqx_mod_recon.erl rename to apps/emqx_modules/src/emqx_recon.erl index 38d046b10..b16d1b051 100644 --- a/apps/emqx_modules/src/emqx_mod_recon.erl +++ b/apps/emqx_modules/src/emqx_recon.erl @@ -14,37 +14,24 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_recon). +-module(emqx_recon). --behaviour(emqx_gen_mod). - -%% emqx_gen_mod callbacks --export([ load/1 - , unload/1 - , description/0 +-export([ enable/0 + , disable/0 ]). -export([cmd/1]). %%-------------------------------------------------------------------- -%% Load/Unload +%% enable/disable %%-------------------------------------------------------------------- - --spec(load(list()) -> ok). -load(_Env) -> - load(). - --spec(unload(list()) -> ok). -unload(_Env) -> - unload(). - -description() -> - "EMQ X Recon Module". - -load() -> +enable() -> emqx_ctl:register_command(recon, {?MODULE, cmd}, []). +disable() -> + emqx_ctl:unregister_command(recon). + cmd(["memory"]) -> Print = fun(Key, Keyword) -> emqx_ctl:print("~-20s: ~w~n", [concat(Key, Keyword), recon_alloc:memory(Key, Keyword)]) @@ -76,9 +63,6 @@ cmd(_) -> {"recon remote_load Mod", "recon:remote_load(Mod)"}, {"recon proc_count Attr N","recon:proc_count(Attr, N)"}]). -unload() -> - emqx_ctl:unregister_command(recon). - concat(Key, Keyword) -> lists:concat([atom_to_list(Key), "/", atom_to_list(Keyword)]). diff --git a/apps/emqx_modules/src/emqx_mod_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl similarity index 85% rename from apps/emqx_modules/src/emqx_mod_rewrite.erl rename to apps/emqx_modules/src/emqx_rewrite.erl index e1ee6e1b6..20e51ae16 100644 --- a/apps/emqx_modules/src/emqx_mod_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -14,9 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_rewrite). - --behaviour(emqx_gen_mod). +-module(emqx_rewrite). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). @@ -33,21 +31,29 @@ , rewrite_publish/2 ]). -%% emqx_gen_mod callbacks --export([ load/1 - , unload/1 - , description/0 +-export([ enable/0 + , disable/0 ]). %%-------------------------------------------------------------------- %% Load/Unload %%-------------------------------------------------------------------- -load(Env) -> - {PubRules, SubRules} = compile(maps:get(rules, Env, [])), - emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}), - emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), - emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}). +enable() -> + Rules = emqx_config:get([rewrite, rules], []), + case Rules =:= [] of + true -> ok; + false -> + {PubRules, SubRules} = compile(Rules), + emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}), + emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}), + emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}) + end. + +disable() -> + emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}), + emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), + emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> {ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}. @@ -58,13 +64,6 @@ rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) -> rewrite_publish(Message = #message{topic = Topic}, Rules) -> {ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}. -unload(_) -> - emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}), - emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}), - emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}). - -description() -> - "EMQ X Topic Rewrite Module". %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- diff --git a/apps/emqx_modules/src/emqx_mod_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl similarity index 97% rename from apps/emqx_modules/src/emqx_mod_telemetry.erl rename to apps/emqx_modules/src/emqx_telemetry.erl index 21082285d..50fc6a243 100644 --- a/apps/emqx_modules/src/emqx_mod_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -14,12 +14,10 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_telemetry). +-module(emqx_telemetry). -behaviour(gen_server). --behaviour(emqx_gen_mod). - -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -34,7 +32,7 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). --export([ start_link/1 +-export([ start_link/0 , stop/0 ]). @@ -48,10 +46,8 @@ , code_change/3 ]). -%% emqx_gen_mod callbacks --export([ load/1 - , unload/1 - , description/0 +-export([ enable/0 + , disable/0 ]). -export([ get_status/0 @@ -111,16 +107,17 @@ mnesia(copy) -> %% API %%-------------------------------------------------------------------- -start_link(Opts) -> +start_link() -> + Opts = emqx_config:get([telemetry], #{}), gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). stop() -> gen_server:stop(?MODULE). -load(_Env) -> +enable() -> gen_server:call(?MODULE, enable). -unload(_Env) -> +disable() -> gen_server:call(?MODULE, disable). get_status() -> @@ -132,9 +129,6 @@ get_uuid() -> get_telemetry() -> gen_server:call(?MODULE, get_telemetry). -description() -> - "". - %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- @@ -300,7 +294,8 @@ active_plugins() -> end, [], emqx_plugins:list()). active_modules() -> - emqx_modules:list(). + []. + % emqx_modules:list(). num_clients() -> emqx_stats:getstat('connections.max'). diff --git a/apps/emqx_modules/src/emqx_mod_telemetry_api.erl b/apps/emqx_modules/src/emqx_telemetry_api.erl similarity index 95% rename from apps/emqx_modules/src/emqx_mod_telemetry_api.erl rename to apps/emqx_modules/src/emqx_telemetry_api.erl index ab0d040f7..af5f40b02 100644 --- a/apps/emqx_modules/src/emqx_mod_telemetry_api.erl +++ b/apps/emqx_modules/src/emqx_telemetry_api.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_telemetry_api). +-module(emqx_telemetry_api). -behavior(minirest_api). @@ -149,7 +149,7 @@ status(put, Request) -> {ok, Body, _} = cowboy_req:read_body(Request), Params = emqx_json:decode(Body, [return_maps]), Enable = maps:get(<<"enable">>, Params), - case Enable =:= emqx_mod_telemetry:get_status() of + case Enable =:= emqx_telemetry:get_status() of true -> Reason = case Enable of true -> <<"Telemetry status is already enabled">>; @@ -168,7 +168,7 @@ data(get, _Request) -> %%-------------------------------------------------------------------- % cli(["enable", Enable0]) -> % Enable = list_to_atom(Enable0), -% case Enable =:= emqx_mod_telemetry:is_enabled() of +% case Enable =:= emqx_telemetry:is_enabled() of % true -> % case Enable of % true -> emqx_ctl:print("Telemetry status is already enabled~n"); @@ -215,18 +215,18 @@ enable_telemetry(Enable) -> enable_telemetry(Node, Enable) when Node =:= node() -> case Enable of true -> - emqx_mod_telemetry:load(#{}); + emqx_telemetry:enable(); false -> - emqx_mod_telemetry:unload(#{}) + emqx_telemetry:disable() end; enable_telemetry(Node, Enable) -> rpc_call(Node, ?MODULE, enable_telemetry, [Node, Enable]). get_telemetry_status() -> - #{enabled => emqx_mod_telemetry:get_status()}. + #{enabled => emqx_telemetry:get_status()}. get_telemetry_data() -> - {ok, TelemetryData} = emqx_mod_telemetry:get_telemetry(), + {ok, TelemetryData} = emqx_telemetry:get_telemetry(), TelemetryData. rpc_call(Node, Module, Fun, Args) -> diff --git a/apps/emqx_modules/src/emqx_mod_topic_metrics.erl b/apps/emqx_modules/src/emqx_topic_metrics.erl similarity index 94% rename from apps/emqx_modules/src/emqx_mod_topic_metrics.erl rename to apps/emqx_modules/src/emqx_topic_metrics.erl index 0196b9b2a..7297878fe 100644 --- a/apps/emqx_modules/src/emqx_mod_topic_metrics.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics.erl @@ -14,10 +14,9 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_topic_metrics). +-module(emqx_topic_metrics). -behaviour(gen_server). --behaviour(emqx_gen_mod). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -25,11 +24,6 @@ -logger_header("[TOPIC_METRICS]"). --export([ load/1 - , unload/1 - , description/0 - ]). - -export([ on_message_publish/1 , on_message_delivered/2 , on_message_dropped/3 @@ -40,11 +34,11 @@ , stop/0 ]). --export([ inc/2 - , inc/3 - , val/2 - , rate/2 - , metrics/1 +-export([ enable/0 + , disable/0 + ]). + +-export([ metrics/1 , register/1 , unregister/1 , unregister_all/0 @@ -52,9 +46,6 @@ , all_registered_topics/0 ]). -%% stats. --export([ rates/2 ]). - %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -63,7 +54,10 @@ , terminate/2 ]). --define(CRefID(Topic), {?MODULE, Topic}). +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. -define(MAX_TOPICS, 512). -define(TAB, ?MODULE). @@ -99,21 +93,15 @@ %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ - -load(_Env) -> - emqx_mod_sup:start_child(?MODULE, worker), +enable() -> emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}), emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}), emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}). -unload(_Env) -> +disable() -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped}), - emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}), - emqx_mod_sup:stop_child(?MODULE). - -description() -> - "EMQ X Topic Metrics Module". + emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}). on_message_publish(#message{topic = Topic, qos = QoS}) -> case is_registered(Topic) of @@ -150,55 +138,12 @@ on_message_dropped(#message{topic = Topic}, _, _) -> end. start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + Opts = emqx_config:get([topic_metrics], #{}), + gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). stop() -> gen_server:stop(?MODULE). -try_inc(Topic, Metric) -> - _ = inc(Topic, Metric), - ok. - -inc(Topic, Metric) -> - inc(Topic, Metric, 1). - -inc(Topic, Metric, Val) -> - case get_counters(Topic) of - {error, topic_not_found} -> - {error, topic_not_found}; - CRef -> - case metric_idx(Metric) of - {error, invalid_metric} -> - {error, invalid_metric}; - Idx -> - counters:add(CRef, Idx, Val) - end - end. - -val(Topic, Metric) -> - case ets:lookup(?TAB, Topic) of - [] -> - {error, topic_not_found}; - [{Topic, CRef}] -> - case metric_idx(Metric) of - {error, invalid_metric} -> - {error, invalid_metric}; - Idx -> - counters:get(CRef, Idx) - end - end. - -rate(Topic, Metric) -> - case rates(Topic, Metric) of - #{short := Last} -> - Last; - {error, Reason} -> - {error, Reason} - end. - -rates(Topic, Metric) -> - gen_server:call(?MODULE, {get_rates, Topic, Metric}). - metrics(Topic) -> case ets:lookup(?TAB, Topic) of [] -> @@ -229,7 +174,7 @@ all_registered_topics() -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([]) -> +init([_Opts]) -> erlang:process_flag(trap_exit, true), ok = emqx_tables:new(?TAB, [{read_concurrency, true}]), erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking), @@ -279,7 +224,7 @@ handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds}) undefined -> {reply, {error, invalid_metric}, State}; #speed{last = Short, last_medium = Medium, last_long = Long} -> - {reply, #{ short => Short, medium => Medium, long => Long }, State} + {reply, #{short => Short, medium => Medium, long => Long }, State} end end. @@ -309,6 +254,47 @@ terminate(_Reason, _State) -> %% Internal Functions %%------------------------------------------------------------------------------ +try_inc(Topic, Metric) -> + _ = inc(Topic, Metric), + ok. + +inc(Topic, Metric) -> + inc(Topic, Metric, 1). + +inc(Topic, Metric, Val) -> + case get_counters(Topic) of + {error, topic_not_found} -> + {error, topic_not_found}; + CRef -> + case metric_idx(Metric) of + {error, invalid_metric} -> + {error, invalid_metric}; + Idx -> + counters:add(CRef, Idx, Val) + end + end. + +val(Topic, Metric) -> + case ets:lookup(?TAB, Topic) of + [] -> + {error, topic_not_found}; + [{Topic, CRef}] -> + case metric_idx(Metric) of + {error, invalid_metric} -> + {error, invalid_metric}; + Idx -> + counters:get(CRef, Idx) + end + end. + +rate(Topic, Metric) -> + case gen_server:call(?MODULE, {get_rates, Topic, Metric}) of + #{short := Last} -> + Last; + {error, Reason} -> + {error, Reason} + end. + metric_idx('messages.in') -> 01; metric_idx('messages.out') -> 02; metric_idx('messages.qos0.in') -> 03; diff --git a/apps/emqx_modules/src/emqx_topic_metrics_api.erl b/apps/emqx_modules/src/emqx_topic_metrics_api.erl new file mode 100644 index 000000000..1a2365703 --- /dev/null +++ b/apps/emqx_modules/src/emqx_topic_metrics_api.erl @@ -0,0 +1,207 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_topic_metrics_api). + +% -rest_api(#{name => list_all_topic_metrics, +% method => 'GET', +% path => "/topic-metrics", +% func => list, +% descr => "A list of all topic metrics of all nodes in the cluster"}). + +% -rest_api(#{name => list_topic_metrics, +% method => 'GET', +% path => "/topic-metrics/:bin:topic", +% func => list, +% descr => "A list of specfied topic metrics of all nodes in the cluster"}). + +% -rest_api(#{name => register_topic_metrics, +% method => 'POST', +% path => "/topic-metrics", +% func => register, +% descr => "Register topic metrics"}). + +% -rest_api(#{name => unregister_all_topic_metrics, +% method => 'DELETE', +% path => "/topic-metrics", +% func => unregister, +% descr => "Unregister all topic metrics"}). + +% -rest_api(#{name => unregister_topic_metrics, +% method => 'DELETE', +% path => "/topic-metrics/:bin:topic", +% func => unregister, +% descr => "Unregister topic metrics"}). + +% -export([ list/2 +% , register/2 +% , unregister/2 +% ]). + +% -export([ get_topic_metrics/2 +% , register_topic_metrics/2 +% , unregister_topic_metrics/2 +% , unregister_all_topic_metrics/1 +% ]). + +% list(#{topic := Topic0}, _Params) -> +% execute_when_enabled(fun() -> +% Topic = emqx_mgmt_util:urldecode(Topic0), +% case safe_validate(Topic) of +% true -> +% case get_topic_metrics(Topic) of +% {error, Reason} -> return({error, Reason}); +% Metrics -> return({ok, maps:from_list(Metrics)}) +% end; +% false -> +% return({error, invalid_topic_name}) +% end +% end); + +% list(_Bindings, _Params) -> +% execute_when_enabled(fun() -> +% case get_all_topic_metrics() of +% {error, Reason} -> return({error, Reason}); +% Metrics -> return({ok, Metrics}) +% end +% end). + +% register(_Bindings, Params) -> +% execute_when_enabled(fun() -> +% case proplists:get_value(<<"topic">>, Params) of +% undefined -> +% return({error, missing_required_params}); +% Topic -> +% case safe_validate(Topic) of +% true -> +% register_topic_metrics(Topic), +% return(ok); +% false -> +% return({error, invalid_topic_name}) +% end +% end +% end). + +% unregister(Bindings, _Params) when map_size(Bindings) =:= 0 -> +% execute_when_enabled(fun() -> +% unregister_all_topic_metrics(), +% return(ok) +% end); + +% unregister(#{topic := Topic0}, _Params) -> +% execute_when_enabled(fun() -> +% Topic = emqx_mgmt_util:urldecode(Topic0), +% case safe_validate(Topic) of +% true -> +% unregister_topic_metrics(Topic), +% return(ok); +% false -> +% return({error, invalid_topic_name}) +% end +% end). + +% execute_when_enabled(Fun) -> +% case emqx_modules:find_module(topic_metrics) of +% true -> +% Fun(); +% false -> +% return({error, module_not_loaded}) +% end. + +% safe_validate(Topic) -> +% try emqx_topic:validate(name, Topic) of +% true -> true +% catch +% error:_Error -> +% false +% end. + +% get_all_topic_metrics() -> +% lists:foldl(fun(Topic, Acc) -> +% case get_topic_metrics(Topic) of +% {error, _Reason} -> +% Acc; +% Metrics -> +% [#{topic => Topic, metrics => Metrics} | Acc] +% end +% end, [], emqx_mod_topic_metrics:all_registered_topics()). + +% get_topic_metrics(Topic) -> +% lists:foldl(fun(Node, Acc) -> +% case get_topic_metrics(Node, Topic) of +% {error, _Reason} -> +% Acc; +% Metrics -> +% case Acc of +% [] -> Metrics; +% _ -> +% lists:foldl(fun({K, V}, Acc0) -> +% [{K, V + proplists:get_value(K, Metrics, 0)} | Acc0] +% end, [], Acc) +% end +% end +% end, [], ekka_mnesia:running_nodes()). + +% get_topic_metrics(Node, Topic) when Node =:= node() -> +% emqx_mod_topic_metrics:metrics(Topic); +% get_topic_metrics(Node, Topic) -> +% rpc_call(Node, get_topic_metrics, [Node, Topic]). + +% register_topic_metrics(Topic) -> +% Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()], +% case lists:any(fun(Item) -> Item =:= ok end, Results) of +% true -> ok; +% false -> lists:last(Results) +% end. + +% register_topic_metrics(Node, Topic) when Node =:= node() -> +% emqx_mod_topic_metrics:register(Topic); +% register_topic_metrics(Node, Topic) -> +% rpc_call(Node, register_topic_metrics, [Node, Topic]). + +% unregister_topic_metrics(Topic) -> +% Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()], +% case lists:any(fun(Item) -> Item =:= ok end, Results) of +% true -> ok; +% false -> lists:last(Results) +% end. + +% unregister_topic_metrics(Node, Topic) when Node =:= node() -> +% emqx_mod_topic_metrics:unregister(Topic); +% unregister_topic_metrics(Node, Topic) -> +% rpc_call(Node, unregister_topic_metrics, [Node, Topic]). + +% unregister_all_topic_metrics() -> +% Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()], +% case lists:any(fun(Item) -> Item =:= ok end, Results) of +% true -> ok; +% false -> lists:last(Results) +% end. + +% unregister_all_topic_metrics(Node) when Node =:= node() -> +% emqx_mod_topic_metrics:unregister_all(); +% unregister_all_topic_metrics(Node) -> +% rpc_call(Node, unregister_topic_metrics, [Node]). + +% rpc_call(Node, Fun, Args) -> +% case rpc:call(Node, ?MODULE, Fun, Args) of +% {badrpc, Reason} -> {error, Reason}; +% Res -> Res +% end. + +% return(_) -> +% %% TODO: V5 API +% ok. diff --git a/apps/emqx_modules/test/emqx_mod_delayed_SUITE.erl b/apps/emqx_modules/test/emqx_delayed_SUITE.erl similarity index 77% rename from apps/emqx_modules/test/emqx_mod_delayed_SUITE.erl rename to apps/emqx_modules/test/emqx_delayed_SUITE.erl index 2cd1107f6..c79903f42 100644 --- a/apps/emqx_modules/test/emqx_mod_delayed_SUITE.erl +++ b/apps/emqx_modules/test/emqx_delayed_SUITE.erl @@ -14,9 +14,9 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_delayed_SUITE). +-module(emqx_delayed_SUITE). --import(emqx_mod_delayed, [on_message_publish/1]). +-import(emqx_delayed, [on_message_publish/1]). -compile(export_all). -compile(nowarn_export_all). @@ -35,6 +35,8 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + ekka_mnesia:start(), + ok = emqx_delayed:mnesia(boot), emqx_ct_helpers:start_apps([emqx_modules]), Config. @@ -46,26 +48,28 @@ end_per_suite(_) -> %%-------------------------------------------------------------------- t_load_case(_) -> - UnHooks = emqx_hooks:lookup('message.publish'), - ?assertEqual([], UnHooks), - ok = emqx_mod_delayed:load([]), Hooks = emqx_hooks:lookup('message.publish'), - ?assertEqual(1, length(Hooks)), + MFA = {emqx_delayed,on_message_publish,[]}, + ?assertEqual(false, lists:keyfind(MFA, 2, Hooks)), + ok = emqx_delayed:enable(), + Hooks1 = emqx_hooks:lookup('message.publish'), + ct:pal("----~p~n", [Hooks1]), + ?assertNotEqual(false, lists:keyfind(MFA, 2, Hooks1)), ok. t_delayed_message(_) -> - ok = emqx_mod_delayed:load([]), + ok = emqx_delayed:enable(), DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>), ?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)), Msg = emqx_message:make(?MODULE, 1, <<"no_delayed_msg">>, <<"no_delayed">>), ?assertEqual({ok, Msg}, on_message_publish(Msg)), - [Key] = mnesia:dirty_all_keys(emqx_mod_delayed), - [#delayed_message{msg = #message{payload = Payload}}] = mnesia:dirty_read({emqx_mod_delayed, Key}), + [Key] = mnesia:dirty_all_keys(emqx_delayed), + [#delayed_message{msg = #message{payload = Payload}}] = mnesia:dirty_read({emqx_delayed, Key}), ?assertEqual(<<"delayed_m">>, Payload), timer:sleep(5000), - EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed), + EmptyKey = mnesia:dirty_all_keys(emqx_delayed), ?assertEqual([], EmptyKey), - ok = emqx_mod_delayed:unload([]). + ok = emqx_delayed:disable(). diff --git a/apps/emqx_modules/test/emqx_mod_topic_metrics_SUITE.erl b/apps/emqx_modules/test/emqx_mod_topic_metrics_SUITE.erl deleted file mode 100644 index d41150e4a..000000000 --- a/apps/emqx_modules/test/emqx_mod_topic_metrics_SUITE.erl +++ /dev/null @@ -1,95 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_mod_topic_metrics_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - -all() -> emqx_ct:all(?MODULE). - -init_per_suite(Config) -> - emqx_ct_helpers:boot_modules(all), - emqx_ct_helpers:start_apps([emqx_modules]), - Config. - -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([emqx_modules]). - -t_nonexistent_topic_metrics(_) -> - emqx_mod_topic_metrics:load([]), - ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), - ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), - ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in')), - ?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'messages.in')), - emqx_mod_topic_metrics:register(<<"a/b/c">>), - ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), - ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')), - ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')), - ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')), - ?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')), - emqx_mod_topic_metrics:unregister(<<"a/b/c">>), - emqx_mod_topic_metrics:unload([]). - -t_topic_metrics(_) -> - emqx_mod_topic_metrics:load([]), - - ?assertEqual(false, emqx_mod_topic_metrics:is_registered(<<"a/b/c">>)), - ?assertEqual([], emqx_mod_topic_metrics:all_registered_topics()), - emqx_mod_topic_metrics:register(<<"a/b/c">>), - ?assertEqual(true, emqx_mod_topic_metrics:is_registered(<<"a/b/c">>)), - ?assertEqual([<<"a/b/c">>], emqx_mod_topic_metrics:all_registered_topics()), - - ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), - ?assertEqual(ok, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), - ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), - ?assert(emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0), - ?assert(emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= #{long => 0,medium => 0,short => 0}), - emqx_mod_topic_metrics:unregister(<<"a/b/c">>), - emqx_mod_topic_metrics:unload([]). - -t_hook(_) -> - emqx_mod_topic_metrics:load([]), - emqx_mod_topic_metrics:register(<<"a/b/c">>), - - ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), - ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')), - ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.out')), - ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), - ?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), - - {ok, C} = emqtt:start_link([{host, "localhost"}, - {clientid, "myclient"}, - {username, "myuser"}]), - {ok, _} = emqtt:connect(C), - emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0), - ct:sleep(100), - ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), - ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')), - ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), - - emqtt:subscribe(C, <<"a/b/c">>), - emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0), - ct:sleep(100), - ?assertEqual(2, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')), - ?assertEqual(2, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')), - ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.out')), - ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), - ?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), - emqx_mod_topic_metrics:unregister(<<"a/b/c">>), - emqx_mod_topic_metrics:unload([]). diff --git a/apps/emqx_modules/test/emqx_modules_SUITE.erl b/apps/emqx_modules/test/emqx_modules_SUITE.erl deleted file mode 100644 index 0ee097258..000000000 --- a/apps/emqx_modules/test/emqx_modules_SUITE.erl +++ /dev/null @@ -1,203 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_modules_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - --define(CONTENT_TYPE, "application/x-www-form-urlencoded"). - --define(HOST, "http://127.0.0.1:8081/"). - --define(API_VERSION, "v4"). - --define(BASE_PATH, "api"). - -all() -> emqx_ct:all(?MODULE). - -init_per_suite(Config) -> - emqx_ct_helpers:start_apps([emqx_management, emqx_modules], fun set_special_configs/1), - emqx_ct_http:create_default_app(), - Config. - -set_special_configs(emqx_management) -> - emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}], - applications =>[#{id => "admin", secret => "public"}]}), - ok; -set_special_configs(_) -> - ok. - -end_per_suite(_Config) -> - emqx_ct_http:delete_default_app(), - emqx_ct_helpers:stop_apps([emqx_modules, emqx_management]). - -t_load(_) -> - ?assertEqual(ok, emqx_modules:unload()), - ?assertEqual(ok, emqx_modules:load()), - ?assertEqual({error, not_started}, emqx_modules:unload(rewrite)), - ?assertEqual(ignore, emqx_modules:reload(rewrite)). - -t_list(_) -> - emqx_modules:load(presence, #{qos => 1}), - ?assertMatch([_ | _ ], emqx_modules:list()), - emqx_modules:unload(presence). - -%% TODO: V5 API -%%t_modules_api(_) -> -%% emqx_modules:load(presence, #{qos => 1}), -%% timer:sleep(50), -%% {ok, Modules1} = request_api(get, api_path(["modules"]), auth_header_()), -%% [Modules11] = filter(get(<<"data">>, Modules1), <<"node">>, atom_to_binary(node(), utf8)), -%% [Module1] = filter(maps:get(<<"modules">>, Modules11), <<"name">>, <<"presence">>), -%% ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module1)), -%% {ok, _} = request_api(put, -%% api_path(["modules", -%% atom_to_list(presence), -%% "unload"]), -%% auth_header_()), -%% {ok, Error1} = request_api(put, -%% api_path(["modules", -%% atom_to_list(presence), -%% "unload"]), -%% auth_header_()), -%% ?assertEqual(<<"not_started">>, get(<<"message">>, Error1)), -%% {ok, Modules2} = request_api(get, -%% api_path(["nodes", atom_to_list(node()), "modules"]), -%% auth_header_()), -%% [Module2] = filter(get(<<"data">>, Modules2), <<"name">>, <<"presence">>), -%% ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module2)), -%% -%% {ok, _} = request_api(put, -%% api_path(["nodes", -%% atom_to_list(node()), -%% "modules", -%% atom_to_list(presence), -%% "load"]), -%% auth_header_()), -%% {ok, Modules3} = request_api(get, -%% api_path(["nodes", atom_to_list(node()), "modules"]), -%% auth_header_()), -%% [Module3] = filter(get(<<"data">>, Modules3), <<"name">>, <<"presence">>), -%% ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module3)), -%% -%% {ok, _} = request_api(put, -%% api_path(["nodes", -%% atom_to_list(node()), -%% "modules", -%% atom_to_list(presence), -%% "unload"]), -%% auth_header_()), -%% {ok, Error2} = request_api(put, -%% api_path(["nodes", -%% atom_to_list(node()), -%% "modules", -%% atom_to_list(presence), -%% "unload"]), -%% auth_header_()), -%% ?assertEqual(<<"not_started">>, get(<<"message">>, Error2)), -%% emqx_modules:unload(presence). - - -t_modules_cmd(_) -> - mock_print(), - meck:new(emqx_modules, [non_strict, passthrough]), - meck:expect(emqx_modules, load, fun(_) -> ok end), - meck:expect(emqx_modules, unload, fun(_) -> ok end), - meck:expect(emqx_modules, reload, fun(_) -> ok end), - ?assertEqual(emqx_modules:cli(["list"]), ok), - ?assertEqual(emqx_modules:cli(["load", "delayed"]), - "Module delayed loaded successfully.\n"), - ?assertEqual(emqx_modules:cli(["unload", "delayed"]), - "Module delayed unloaded successfully.\n"), - unmock_print(). - -%% For: https://github.com/emqx/emqx/issues/4511 -t_join_cluster(_) -> - %% Started by emqx application - {error, {already_started, emqx_modules}} = application:start(emqx_modules), - %% After clustered - emqx:shutdown(), - emqx:reboot(), - {error,{already_started,emqx_modules}} = application:start(emqx_modules), - %% After emqx reboot, we should not interfere with other tests - _ = end_per_suite([]), - _ = init_per_suite([]), - ok. - -mock_print() -> - catch meck:unload(emqx_ctl), - meck:new(emqx_ctl, [non_strict, passthrough]), - meck:expect(emqx_ctl, print, fun(Arg) -> emqx_ctl:format(Arg) end), - meck:expect(emqx_ctl, print, fun(Msg, Arg) -> emqx_ctl:format(Msg, Arg) end), - meck:expect(emqx_ctl, usage, fun(Usages) -> emqx_ctl:format_usage(Usages) end), - meck:expect(emqx_ctl, usage, fun(Cmd, Descr) -> emqx_ctl:format_usage(Cmd, Descr) end). - -unmock_print() -> - meck:unload(emqx_ctl). - -get(Key, ResponseBody) -> - maps:get(Key, jiffy:decode(list_to_binary(ResponseBody), [return_maps])). - -request_api(Method, Url, Auth) -> - request_api(Method, Url, [], Auth, []). - -request_api(Method, Url, QueryParams, Auth) -> - request_api(Method, Url, QueryParams, Auth, []). - -request_api(Method, Url, QueryParams, Auth, []) -> - NewUrl = case QueryParams of - "" -> Url; - _ -> Url ++ "?" ++ QueryParams - end, - do_request_api(Method, {NewUrl, [Auth]}); -request_api(Method, Url, QueryParams, Auth, Body) -> - NewUrl = case QueryParams of - "" -> Url; - _ -> Url ++ "?" ++ QueryParams - end, - do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}). - -do_request_api(Method, Request)-> - ct:pal("Method: ~p, Request: ~p", [Method, Request]), - case httpc:request(Method, Request, [], []) of - {error, socket_closed_remotely} -> - {error, socket_closed_remotely}; - {ok, {{"HTTP/1.1", Code, _}, _, Return} } - when Code =:= 200 orelse Code =:= 201 -> - {ok, Return}; - {ok, {Reason, _, _}} -> - {error, Reason} - end. - -auth_header_() -> - AppId = <<"admin">>, - AppSecret = <<"public">>, - auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)). - -auth_header_(User, Pass) -> - Encoded = base64:encode_to_string(lists:append([User,":",Pass])), - {"Authorization","Basic " ++ Encoded}. - -api_path(Parts)-> - ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts). - -filter(List, Key, Value) -> - lists:filter(fun(Item) -> - maps:get(Key, Item) == Value - end, List). diff --git a/apps/emqx_modules/test/emqx_mod_presence_SUITE.erl b/apps/emqx_modules/test/emqx_presence_SUITE.erl similarity index 83% rename from apps/emqx_modules/test/emqx_mod_presence_SUITE.erl rename to apps/emqx_modules/test/emqx_presence_SUITE.erl index e02fa6fdd..de6e7bccc 100644 --- a/apps/emqx_modules/test/emqx_mod_presence_SUITE.erl +++ b/apps/emqx_modules/test/emqx_presence_SUITE.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_presence_SUITE). +-module(emqx_presence_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -27,16 +27,13 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([emqx_modules]), - %% Ensure all the modules unloaded. - ok = emqx_modules:unload(), Config. end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([emqx_modules]). -%% Test case for emqx_mod_presence t_mod_presence(_) -> - ok = emqx_mod_presence:load(#{qos => ?QOS_1}), + ok = emqx_presence:enable(), {ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]), {ok, _} = emqtt:connect(C1), {ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1), @@ -49,16 +46,16 @@ t_mod_presence(_) -> ok = emqtt:disconnect(C2), ok = recv_and_check_presence(<<"clientid">>, <<"disconnected">>), ok = emqtt:disconnect(C1), - ok = emqx_mod_presence:unload([]). + ok = emqx_presence:disable(). t_mod_presence_reason(_) -> - ?assertEqual(normal, emqx_mod_presence:reason(normal)), - ?assertEqual(discarded, emqx_mod_presence:reason({shutdown, discarded})), - ?assertEqual(tcp_error, emqx_mod_presence:reason({tcp_error, einval})), - ?assertEqual(internal_error, emqx_mod_presence:reason(<<"unknown error">>)). + ?assertEqual(normal, emqx_presence:reason(normal)), + ?assertEqual(discarded, emqx_presence:reason({shutdown, discarded})), + ?assertEqual(tcp_error, emqx_presence:reason({tcp_error, einval})), + ?assertEqual(internal_error, emqx_presence:reason(<<"unknown error">>)). recv_and_check_presence(ClientId, Presence) -> - {ok, #{qos := ?QOS_1, topic := Topic, payload := Payload}} = receive_publish(100), + {ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100), ?assertMatch([<<"$SYS">>, <<"brokers">>, _Node, <<"clients">>, ClientId, Presence], binary:split(Topic, <<"/">>, [global])), case Presence of diff --git a/apps/emqx_modules/test/emqx_mod_rewrite_SUITE.erl b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl similarity index 79% rename from apps/emqx_modules/test/emqx_mod_rewrite_SUITE.erl rename to apps/emqx_modules/test/emqx_rewrite_SUITE.erl index 9b94cba8c..f7a681ffb 100644 --- a/apps/emqx_modules/test/emqx_mod_rewrite_SUITE.erl +++ b/apps/emqx_modules/test/emqx_rewrite_SUITE.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_rewrite_SUITE). +-module(emqx_rewrite_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -38,8 +38,6 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([emqx_modules]), - %% Ensure all the modules unloaded. - ok = emqx_modules:unload(), Config. end_per_suite(_Config) -> @@ -47,7 +45,12 @@ end_per_suite(_Config) -> %% Test case for emqx_mod_write t_mod_rewrite(_Config) -> - ok = emqx_mod_rewrite:load(#{rules => ?RULES}), + %% important! let emqx_schema include the current app! + meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_schema, includes, fun() -> ["rewrite"] end ), + meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_modules_schema:fields(FieldName) end), + ok = emqx_config:update([rewrite, rules], ?RULES), + ok = emqx_rewrite:enable(), {ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]), {ok, _} = emqtt:connect(C), PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>], @@ -79,14 +82,15 @@ t_mod_rewrite(_Config) -> {ok, _, _} = emqtt:unsubscribe(C, PubDestTopics), ok = emqtt:disconnect(C), - ok = emqx_mod_rewrite:unload(?RULES). + ok = emqx_rewrite:disable(), + meck:unload(emqx_schema). t_rewrite_rule(_Config) -> - {PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES), - ?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)), - ?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)), - ?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)), - ?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules)). + {PubRules, SubRules} = emqx_rewrite:compile(?RULES), + ?assertEqual(<<"z/y/2">>, emqx_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)), + ?assertEqual(<<"x/1/2">>, emqx_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)), + ?assertEqual(<<"y/z/b">>, emqx_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)), + ?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules)). %%-------------------------------------------------------------------- %% Internal functions diff --git a/apps/emqx_modules/test/emqx_mod_telemetry_SUITE.erl b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl similarity index 51% rename from apps/emqx_modules/test/emqx_mod_telemetry_SUITE.erl rename to apps/emqx_modules/test/emqx_telemetry_SUITE.erl index 1aaa2f3c7..7352c4de5 100644 --- a/apps/emqx_modules/test/emqx_mod_telemetry_SUITE.erl +++ b/apps/emqx_modules/test/emqx_telemetry_SUITE.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_mod_telemetry_SUITE). +-module(emqx_telemetry_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -29,8 +29,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> ok = ekka_mnesia:start(), - ok = emqx_mod_telemetry:mnesia(boot), - emqx_ct_helpers:boot_modules(all), + ok = emqx_telemetry:mnesia(boot), emqx_ct_helpers:start_apps([emqx_modules]), Config. @@ -38,57 +37,57 @@ end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([emqx_modules]). t_uuid(_) -> - UUID = emqx_mod_telemetry:generate_uuid(), + UUID = emqx_telemetry:generate_uuid(), Parts = binary:split(UUID, <<"-">>, [global, trim]), ?assertEqual(5, length(Parts)), - {ok, UUID2} = emqx_mod_telemetry:get_uuid(), - emqx_mod_telemetry:unload(#{}), - emqx_mod_telemetry:load(#{}), - {ok, UUID3} = emqx_mod_telemetry:get_uuid(), + {ok, UUID2} = emqx_telemetry:get_uuid(), + emqx_telemetry:disable(), + emqx_telemetry:enable(), + {ok, UUID3} = emqx_telemetry:get_uuid(), ?assertEqual(UUID2, UUID3). t_official_version(_) -> - true = emqx_mod_telemetry:official_version("0.0.0"), - true = emqx_mod_telemetry:official_version("1.1.1"), - true = emqx_mod_telemetry:official_version("10.10.10"), - false = emqx_mod_telemetry:official_version("0.0.0.0"), - false = emqx_mod_telemetry:official_version("1.1.a"), - true = emqx_mod_telemetry:official_version("0.0-alpha.1"), - true = emqx_mod_telemetry:official_version("1.1-alpha.1"), - true = emqx_mod_telemetry:official_version("10.10-alpha.10"), - false = emqx_mod_telemetry:official_version("1.1-alpha.0"), - true = emqx_mod_telemetry:official_version("1.1-beta.1"), - true = emqx_mod_telemetry:official_version("1.1-rc.1"), - false = emqx_mod_telemetry:official_version("1.1-alpha.a"). + true = emqx_telemetry:official_version("0.0.0"), + true = emqx_telemetry:official_version("1.1.1"), + true = emqx_telemetry:official_version("10.10.10"), + false = emqx_telemetry:official_version("0.0.0.0"), + false = emqx_telemetry:official_version("1.1.a"), + true = emqx_telemetry:official_version("0.0-alpha.1"), + true = emqx_telemetry:official_version("1.1-alpha.1"), + true = emqx_telemetry:official_version("10.10-alpha.10"), + false = emqx_telemetry:official_version("1.1-alpha.0"), + true = emqx_telemetry:official_version("1.1-beta.1"), + true = emqx_telemetry:official_version("1.1-rc.1"), + false = emqx_telemetry:official_version("1.1-alpha.a"). t_get_telemetry(_) -> - {ok, TelemetryData} = emqx_mod_telemetry:get_telemetry(), + {ok, TelemetryData} = emqx_telemetry:get_telemetry(), OTPVersion = bin(erlang:system_info(otp_release)), ?assertEqual(OTPVersion, get_value(otp_version, TelemetryData)), - {ok, UUID} = emqx_mod_telemetry:get_uuid(), + {ok, UUID} = emqx_telemetry:get_uuid(), ?assertEqual(UUID, get_value(uuid, TelemetryData)), ?assertEqual(0, get_value(num_clients, TelemetryData)). t_enable(_) -> - ok = meck:new(emqx_mod_telemetry, [non_strict, passthrough, no_history, no_link]), - ok = meck:expect(emqx_mod_telemetry, official_version, fun(_) -> true end), - ok = emqx_mod_telemetry:load(#{}), - ?assertEqual(true, emqx_mod_telemetry:get_status()), - ok = emqx_mod_telemetry:unload(#{}), - ?assertEqual(false, emqx_mod_telemetry:get_status()), - meck:unload([emqx_mod_telemetry]). + ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]), + ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end), + ok = emqx_telemetry:enable(), + ?assertEqual(true, emqx_telemetry:get_status()), + ok = emqx_telemetry:disable(), + ?assertEqual(false, emqx_telemetry:get_status()), + meck:unload([emqx_telemetry]). t_send_after_enable(_) -> - ok = meck:new(emqx_mod_telemetry, [non_strict, passthrough, no_history, no_link]), - ok = meck:expect(emqx_mod_telemetry, official_version, fun(_) -> true end), - ok = emqx_mod_telemetry:unload(#{}), + ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]), + ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end), + ok = emqx_telemetry:disable(), ok = snabbkaffe:start_trace(), try - ok = emqx_mod_telemetry:load(#{}), + ok = emqx_telemetry:enable(), ?assertMatch({ok, _}, ?block_until(#{?snk_kind := telemetry_data_reported}, 2000, 100)) after ok = snabbkaffe:stop(), - meck:unload([emqx_mod_telemetry]) + meck:unload([emqx_telemetry]) end. bin(L) when is_list(L) -> diff --git a/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl b/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl new file mode 100644 index 000000000..5d1c5f84a --- /dev/null +++ b/apps/emqx_modules/test/emqx_topic_metrics_SUITE.erl @@ -0,0 +1,95 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_topic_metrics_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([emqx_modules]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([emqx_modules]). + +t_nonexistent_topic_metrics(_) -> + emqx_topic_metrics:enable(), + ?assertEqual({error, topic_not_found}, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual({error, topic_not_found}, emqx_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), + ?assertEqual({error, topic_not_found}, emqx_topic_metrics:rate(<<"a/b/c">>, 'messages.in')), + % ?assertEqual({error, topic_not_found}, emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in')), + emqx_topic_metrics:register(<<"a/b/c">>), + ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual({error, invalid_metric}, emqx_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')), + ?assertEqual({error, invalid_metric}, emqx_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')), + ?assertEqual({error, invalid_metric}, emqx_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')), + % ?assertEqual({error, invalid_metric}, emqx_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')), + emqx_topic_metrics:unregister(<<"a/b/c">>), + emqx_topic_metrics:disable(). + +t_topic_metrics(_) -> + emqx_topic_metrics:enable(), + + ?assertEqual(false, emqx_topic_metrics:is_registered(<<"a/b/c">>)), + ?assertEqual([], emqx_topic_metrics:all_registered_topics()), + emqx_topic_metrics:register(<<"a/b/c">>), + ?assertEqual(true, emqx_topic_metrics:is_registered(<<"a/b/c">>)), + ?assertEqual([<<"a/b/c">>], emqx_topic_metrics:all_registered_topics()), + + ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual(ok, emqx_topic_metrics:inc(<<"a/b/c">>, 'messages.in')), + ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assert(emqx_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0), + % ?assert(emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= #{long => 0,medium => 0,short => 0}), + emqx_topic_metrics:unregister(<<"a/b/c">>), + emqx_topic_metrics:disable(). + +t_hook(_) -> + emqx_topic_metrics:enable(), + emqx_topic_metrics:register(<<"a/b/c">>), + + ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')), + ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.out')), + ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), + ?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), + + {ok, C} = emqtt:start_link([{host, "localhost"}, + {clientid, "myclient"}, + {username, "myuser"}]), + {ok, _} = emqtt:connect(C), + emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0), + ct:sleep(100), + ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')), + ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), + + emqtt:subscribe(C, <<"a/b/c">>), + emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0), + ct:sleep(100), + ?assertEqual(2, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')), + ?assertEqual(2, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')), + ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.out')), + ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')), + ?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')), + emqx_topic_metrics:unregister(<<"a/b/c">>), + emqx_topic_metrics:disable(). diff --git a/extension_schemas.config b/extension_schemas.config index 2eb4f3d08..4880dfa86 100644 --- a/extension_schemas.config +++ b/extension_schemas.config @@ -8,10 +8,15 @@ , {"emqx_authn", emqx_authn_schema} , {"authorization", emqx_authz_schema} , {"emqx_bridge_mqtt", emqx_bridge_mqtt_schema} -, {"emqx_modules", emqx_modules_schema} , {"emqx_management", emqx_management_schema} , {"emqx_dashboard", emqx_dashboard_schema} , {"emqx_gateway", emqx_gateway_schema} , {"prometheus", emqx_prometheus_schema} , {"statsd", emqx_statsd_schema} +, {"delayed", emqx_modules_schema} +, {"recon", emqx_modules_schema} +, {"telemetry", emqx_modules_schema} +, {"presence", emqx_modules_schema} +, {"rewrite", emqx_modules_schema} +, {"topic_metrics", emqx_modules_schema} ].