chore(modules): presence/delayed/telemetry/rewrite/topic_metrics independent conf

This commit is contained in:
Turtle 2021-07-29 14:54:35 +08:00 committed by turtleDeng
parent 59f645dc59
commit cdc8000493
26 changed files with 639 additions and 1259 deletions

View File

@ -1,23 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_gen_mod).
-callback(load(Opts :: any()) -> ok | {error, term()}).
-callback(unload(State :: term()) -> term()).
-callback(description() -> any()).

View File

@ -1,37 +1,32 @@
# empty
emqx_modules: {
modules:[
delayed: {
enable: true
max_delayed_messages: 0
}
recon: {
enable: true
}
telemetry: {
enable: true
}
presence: {
enable: true
}
topic_metrics:{
topics: ["topic/#"]
}
rewrite:{
rules: [
{
type: delayed
enable: false
},
{
type: presence
enable: true
qos: 1
},
{
type: recon
enable: true
},
{
type: rewrite
enable: false
rules:[{
action: publish
source_topic: "x/#"
re: "^x/y/(.+)$"
dest_topic: "z/y/$1"
}]
},
{
type: topic_metrics
enable: false
topics: ["topic/#"]
},
{
type: telemetry
enable: false
action: publish
source_topic: "x/#"
re: "^x/y/(.+)$"
dest_topic: "z/y/$1"
}
]
}

View File

@ -14,10 +14,9 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_delayed).
-module(emqx_delayed).
-behaviour(gen_server).
-behaviour(emqx_gen_mod).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
@ -30,12 +29,6 @@
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
]).
-export([ start_link/0
, on_message_publish/1
]).
@ -49,10 +42,13 @@
, code_change/3
]).
-record(delayed_message,
{ key
, msg
}).
%% gen_server callbacks
-export([ get_status/0
, enable/0
, disable/0
]).
-record(delayed_message, {key, msg}).
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
@ -63,7 +59,6 @@
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [
{type, ordered_set},
@ -75,25 +70,8 @@ mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB, disc_copies).
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
-spec(load(list()) -> ok).
load(_Env) ->
emqx_mod_sup:start_child(?MODULE, worker),
emqx:hook('message.publish', {?MODULE, on_message_publish, []}).
-spec(unload(list()) -> ok).
unload(_Env) ->
emqx:unhook('message.publish', {?MODULE, on_message_publish}),
emqx_mod_sup:stop_child(?MODULE).
description() ->
"EMQ X Delayed Publish Module".
%%--------------------------------------------------------------------
%% Hooks
%%--------------------------------------------------------------------
on_message_publish(Msg = #message{
id = Id,
topic = <<"$delayed/", Topic/binary>>,
@ -124,25 +102,48 @@ on_message_publish(Msg) ->
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
Opts = emqx_config:get([delayed], #{}),
gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
-spec(store(#delayed_message{}) -> ok).
store(DelayedMsg) ->
gen_server:call(?SERVER, {store, DelayedMsg}, infinity).
get_status() ->
gen_server:call(?SERVER, get_status).
enable() ->
gen_server:call(?SERVER, enable).
disable() ->
gen_server:call(?SERVER, disable).
%%--------------------------------------------------------------------
%% gen_server callback
%%--------------------------------------------------------------------
init([]) ->
init([_Opts]) ->
{ok, ensure_stats_event(
ensure_publish_timer(#{timer => undefined, publish_at => 0}))}.
ensure_publish_timer(#{timer => undefined,
publish_at => 0,
enabled => false}))}.
handle_call({store, DelayedMsg = #delayed_message{key = Key}}, _From, State) ->
ok = ekka_mnesia:dirty_write(?TAB, DelayedMsg),
emqx_metrics:inc('messages.delayed'),
{reply, ok, ensure_publish_timer(Key, State)};
handle_call(enable, _From, State) ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
{reply, ok, State};
handle_call(disable, _From, State) ->
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
{reply, ok, State};
handle_call(get_status, _From, State = #{enabled := Enabled}) ->
{reply, Enabled, State};
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.

View File

@ -1,207 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_api_topic_metrics).
-rest_api(#{name => list_all_topic_metrics,
method => 'GET',
path => "/topic-metrics",
func => list,
descr => "A list of all topic metrics of all nodes in the cluster"}).
-rest_api(#{name => list_topic_metrics,
method => 'GET',
path => "/topic-metrics/:bin:topic",
func => list,
descr => "A list of specfied topic metrics of all nodes in the cluster"}).
-rest_api(#{name => register_topic_metrics,
method => 'POST',
path => "/topic-metrics",
func => register,
descr => "Register topic metrics"}).
-rest_api(#{name => unregister_all_topic_metrics,
method => 'DELETE',
path => "/topic-metrics",
func => unregister,
descr => "Unregister all topic metrics"}).
-rest_api(#{name => unregister_topic_metrics,
method => 'DELETE',
path => "/topic-metrics/:bin:topic",
func => unregister,
descr => "Unregister topic metrics"}).
-export([ list/2
, register/2
, unregister/2
]).
-export([ get_topic_metrics/2
, register_topic_metrics/2
, unregister_topic_metrics/2
, unregister_all_topic_metrics/1
]).
list(#{topic := Topic0}, _Params) ->
execute_when_enabled(fun() ->
Topic = emqx_mgmt_util:urldecode(Topic0),
case safe_validate(Topic) of
true ->
case get_topic_metrics(Topic) of
{error, Reason} -> return({error, Reason});
Metrics -> return({ok, maps:from_list(Metrics)})
end;
false ->
return({error, invalid_topic_name})
end
end);
list(_Bindings, _Params) ->
execute_when_enabled(fun() ->
case get_all_topic_metrics() of
{error, Reason} -> return({error, Reason});
Metrics -> return({ok, Metrics})
end
end).
register(_Bindings, Params) ->
execute_when_enabled(fun() ->
case proplists:get_value(<<"topic">>, Params) of
undefined ->
return({error, missing_required_params});
Topic ->
case safe_validate(Topic) of
true ->
register_topic_metrics(Topic),
return(ok);
false ->
return({error, invalid_topic_name})
end
end
end).
unregister(Bindings, _Params) when map_size(Bindings) =:= 0 ->
execute_when_enabled(fun() ->
unregister_all_topic_metrics(),
return(ok)
end);
unregister(#{topic := Topic0}, _Params) ->
execute_when_enabled(fun() ->
Topic = emqx_mgmt_util:urldecode(Topic0),
case safe_validate(Topic) of
true ->
unregister_topic_metrics(Topic),
return(ok);
false ->
return({error, invalid_topic_name})
end
end).
execute_when_enabled(Fun) ->
case emqx_modules:find_module(topic_metrics) of
true ->
Fun();
false ->
return({error, module_not_loaded})
end.
safe_validate(Topic) ->
try emqx_topic:validate(name, Topic) of
true -> true
catch
error:_Error ->
false
end.
get_all_topic_metrics() ->
lists:foldl(fun(Topic, Acc) ->
case get_topic_metrics(Topic) of
{error, _Reason} ->
Acc;
Metrics ->
[#{topic => Topic, metrics => Metrics} | Acc]
end
end, [], emqx_mod_topic_metrics:all_registered_topics()).
get_topic_metrics(Topic) ->
lists:foldl(fun(Node, Acc) ->
case get_topic_metrics(Node, Topic) of
{error, _Reason} ->
Acc;
Metrics ->
case Acc of
[] -> Metrics;
_ ->
lists:foldl(fun({K, V}, Acc0) ->
[{K, V + proplists:get_value(K, Metrics, 0)} | Acc0]
end, [], Acc)
end
end
end, [], ekka_mnesia:running_nodes()).
get_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:metrics(Topic);
get_topic_metrics(Node, Topic) ->
rpc_call(Node, get_topic_metrics, [Node, Topic]).
register_topic_metrics(Topic) ->
Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
register_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:register(Topic);
register_topic_metrics(Node, Topic) ->
rpc_call(Node, register_topic_metrics, [Node, Topic]).
unregister_topic_metrics(Topic) ->
Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
unregister_topic_metrics(Node, Topic) when Node =:= node() ->
emqx_mod_topic_metrics:unregister(Topic);
unregister_topic_metrics(Node, Topic) ->
rpc_call(Node, unregister_topic_metrics, [Node, Topic]).
unregister_all_topic_metrics() ->
Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()],
case lists:any(fun(Item) -> Item =:= ok end, Results) of
true -> ok;
false -> lists:last(Results)
end.
unregister_all_topic_metrics(Node) when Node =:= node() ->
emqx_mod_topic_metrics:unregister_all();
unregister_all_topic_metrics(Node) ->
rpc_call(Node, unregister_topic_metrics, [Node]).
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.
return(_) ->
%% TODO: V5 API
ok.

View File

@ -1,79 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_sup).
-behaviour(supervisor).
-include_lib("emqx/include/types.hrl").
-export([ start_link/0
, start_child/1
, start_child/2
, stop_child/1
]).
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Type), #{id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5000,
type => Type,
modules => [Mod]}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
-spec start_child(supervisor:child_spec()) -> ok.
start_child(ChildSpec) when is_map(ChildSpec) ->
assert_started(supervisor:start_child(?MODULE, ChildSpec)).
-spec start_child(atom(), atom()) -> ok.
start_child(Mod, Type) when is_atom(Mod) andalso is_atom(Type) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Type))).
-spec(stop_child(any()) -> ok | {error, term()}).
stop_child(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of
ok -> supervisor:delete_child(?MODULE, ChildId);
Error -> Error
end.
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([]) ->
Env = [],
{ok, {{one_for_one, 10, 3600},
[#{id => telemetry,
start => {emqx_mod_telemetry, start_link, [Env]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_mod_telemetry]}]}}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_tarted, _Pid}}) -> ok;
assert_started({error, Reason}) -> erlang:error(Reason).

View File

@ -1,9 +1,9 @@
{application, emqx_modules,
[{description, "EMQ X Module Management"},
[{description, "EMQ X Modules"},
{vsn, "5.0.0"},
{modules, []},
{applications, [kernel,stdlib]},
{mod, {emqx_modules_app, []}},
{registered, [emqx_mod_sup]},
{registered, [emqx_modules_sup]},
{env, []}
]}.

View File

@ -1,154 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules).
-include_lib("emqx/include/logger.hrl").
-logger_header("[Modules]").
-export([ list/0
, load/0
, load/2
, unload/0
, unload/1
, reload/1
, find_module/1
]).
-export([cli/1]).
%% @doc List all available plugins
-spec(list() -> [{atom(), boolean()}]).
list() ->
persistent_term:get(?MODULE, []).
%% @doc Load all the extended modules.
-spec(load() -> ok).
load() ->
Modules = emqx_config:get([emqx_modules, modules], []),
lists:foreach(fun(#{type := Module, enable := Enable} = Config) ->
case Enable of
true ->
load(name(Module), maps:without([type, enable], Config));
false ->
ok
end
end, Modules).
load(Module, Env) ->
ModuleName = name(Module),
case find_module(ModuleName) of
false ->
load_mod(ModuleName, Env);
true ->
?LOG(notice, "Module ~s is already started", [ModuleName]),
{error, already_started}
end.
%% @doc Unload all the extended modules.
-spec(unload() -> ok).
unload() ->
Modules = emqx_config:get([emqx_modules, modules], []),
lists:foreach(fun(#{type := Module, enable := Enable}) ->
case Enable of
true ->
unload_mod(name(Module));
false ->
ok
end
end, Modules).
unload(ModuleName) ->
case find_module(ModuleName) of
false ->
?LOG(alert, "Module ~s not found, cannot load it", [ModuleName]),
{error, not_started};
true ->
unload_mod(ModuleName)
end.
-spec(reload(module()) -> ok | ignore | {error, any()}).
reload(_) ->
ignore.
find_module(ModuleName) ->
lists:member(ModuleName, persistent_term:get(?MODULE, [])).
load_mod(ModuleName, Env) ->
case ModuleName:load(Env) of
ok ->
Modules = persistent_term:get(?MODULE, []),
persistent_term:put(?MODULE, [ModuleName| Modules]),
?LOG(info, "Load ~s module successfully.", [ModuleName]);
{error, Error} ->
?LOG(error, "Load module ~s failed, cannot load for ~0p", [ModuleName, Error]),
{error, Error}
end.
unload_mod(ModuleName) ->
case ModuleName:unload(#{}) of
ok ->
Modules = persistent_term:get(?MODULE, []),
persistent_term:put(?MODULE, Modules -- [ModuleName]),
?LOG(info, "Unload ~s module successfully.", [ModuleName]);
{error, Error} ->
?LOG(error, "Unload module ~s failed, cannot unload for ~0p", [ModuleName, Error])
end.
%%--------------------------------------------------------------------
%% @doc Modules Command
cli(["list"]) ->
lists:foreach(fun(Name) ->
emqx_ctl:print("Module(~s, description=~s)~n",
[Name, Name:description()])
end, emqx_modules:list());
cli(["load", Name]) ->
case emqx_modules:load(list_to_atom(Name), #{}) of
ok ->
emqx_ctl:print("Module ~s loaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Load module ~s error: ~p.~n", [Name, Reason])
end;
cli(["unload", Name]) ->
case emqx_modules:unload(list_to_atom(Name)) of
ok ->
emqx_ctl:print("Module ~s unloaded successfully.~n", [Name]);
{error, Reason} ->
emqx_ctl:print("Unload module ~s error: ~p.~n", [Name, Reason])
end;
cli(["reload", Name]) ->
emqx_ctl:print("Module: ~p does not need to be reloaded.~n", [Name]);
cli(_) ->
emqx_ctl:usage([{"modules list", "Show loaded modules"},
{"modules load <Module>", "Load module"},
{"modules unload <Module>", "Unload module"},
{"modules reload <Module>", "Reload module"}
]).
name(Name) when is_binary(Name) ->
name(binary_to_atom(Name, utf8));
name(delayed) -> emqx_mod_delayed;
name(presence) -> emqx_mod_presence;
name(recon) -> emqx_mod_recon;
name(rewrite) -> emqx_mod_rewrite;
name(topic_metrics) -> emqx_mod_topic_metrics;
name(telemetry) -> emqx_mod_telemetry;
name(Name) -> Name.

View File

@ -1,171 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules_api).
-rest_api(#{name => list_all_modules,
method => 'GET',
path => "/modules/",
func => list,
descr => "List all modules in the cluster"}).
-rest_api(#{name => list_node_modules,
method => 'GET',
path => "/nodes/:atom:node/modules/",
func => list,
descr => "List all modules on a node"}).
-rest_api(#{name => load_node_module,
method => 'PUT',
path => "/nodes/:atom:node/modules/:atom:module/load",
func => load,
descr => "Load a module"}).
-rest_api(#{name => unload_node_module,
method => 'PUT',
path => "/nodes/:atom:node/modules/:atom:module/unload",
func => unload,
descr => "Unload a module"}).
-rest_api(#{name => reload_node_module,
method => 'PUT',
path => "/nodes/:atom:node/modules/:atom:module/reload",
func => reload,
descr => "Reload a module"}).
-rest_api(#{name => load_module,
method => 'PUT',
path => "/modules/:atom:module/load",
func => load,
descr => "load a module in the cluster"}).
-rest_api(#{name => unload_module,
method => 'PUT',
path => "/modules/:atom:module/unload",
func => unload,
descr => "Unload a module in the cluster"}).
-rest_api(#{name => reload_module,
method => 'PUT',
path => "/modules/:atom:module/reload",
func => reload,
descr => "Reload a module in the cluster"}).
-export([ list/2
, list_modules/1
, load/2
, unload/2
, reload/2
]).
-export([ do_load_module/3
, do_unload_module/2
]).
list(#{node := Node}, _Params) ->
return({ok, [format(Module) || Module <- list_modules(Node)]});
list(_Bindings, _Params) ->
return({ok, [format(Node, Modules) || {Node, Modules} <- list_modules()]}).
load(#{node := Node, module := Module}, Params) ->
return(do_load_module(Node, Module, Params));
load(#{module := Module}, Params) ->
Results = [do_load_module(Node, Module, Params) || Node <- ekka_mnesia:running_nodes()],
case lists:filter(fun(Item) -> Item =/= ok end, Results) of
[] ->
return(ok);
Errors ->
return(lists:last(Errors))
end.
unload(#{node := Node, module := Module}, _Params) ->
return(do_unload_module(Node, Module));
unload(#{module := Module}, _Params) ->
Results = [do_unload_module(Node, Module) || Node <- ekka_mnesia:running_nodes()],
case lists:filter(fun(Item) -> Item =/= ok end, Results) of
[] ->
return(ok);
Errors ->
return(lists:last(Errors))
end.
reload(#{node := Node, module := Module}, _Params) ->
case reload_module(Node, Module) of
ignore -> return(ok);
Result -> return(Result)
end;
reload(#{module := Module}, _Params) ->
Results = [reload_module(Node, Module) || Node <- ekka_mnesia:running_nodes()],
case lists:filter(fun(Item) -> Item =/= ok end, Results) of
[] ->
return(ok);
Errors ->
return(lists:last(Errors))
end.
%%------------------------------------------------------------------------------
%% Internal Functions
%%------------------------------------------------------------------------------
format(Node, Modules) ->
#{node => Node, modules => [format(Module) || Module <- Modules]}.
format(Name) ->
#{name => name(Name),
description => iolist_to_binary(Name:description())}.
list_modules() ->
[{Node, list_modules(Node)} || Node <- ekka_mnesia:running_nodes()].
list_modules(Node) when Node =:= node() ->
emqx_modules:list();
list_modules(Node) ->
rpc_call(Node, list_modules, [Node]).
do_load_module(Node, Module, Env) when Node =:= node() ->
emqx_modules:load(Module, Env);
do_load_module(Node, Module, Env) ->
rpc_call(Node, do_load_module, [Node, Module, Env]).
do_unload_module(Node, Module) when Node =:= node() ->
emqx_modules:unload(Module);
do_unload_module(Node, Module) ->
rpc_call(Node, do_unload_module, [Node, Module]).
reload_module(Node, Module) when Node =:= node() ->
emqx_modules:reload(Module);
reload_module(Node, Module) ->
rpc_call(Node, reload_module, [Node, Module]).
rpc_call(Node, Fun, Args) ->
case rpc:call(Node, ?MODULE, Fun, Args) of
{badrpc, Reason} -> {error, Reason};
Res -> Res
end.
name(emqx_mod_delayed) -> delayed;
name(emqx_mod_presence) -> presence;
name(emqx_mod_recon) -> recon;
name(emqx_mod_rewrite) -> rewrite;
name(emqx_mod_topic_metrics) -> topic_metrics.
return(_) ->
%% TODO: V5 API
ok.

View File

@ -23,11 +23,26 @@
-export([stop/1]).
start(_Type, _Args) ->
{ok, Pid} = emqx_mod_sup:start_link(),
ok = emqx_modules:load(),
emqx_ctl:register_command(modules, {emqx_modules, cli}, []),
{ok, Pid}.
{ok, Sup} = emqx_modules_sup:start_link(),
maybe_enable_modules(),
{ok, Sup}.
stop(_State) ->
emqx_ctl:unregister_command(modules),
emqx_modules:unload().
maybe_disable_modules(),
ok.
maybe_enable_modules() ->
emqx_config:get([delayed, enable], true) andalso emqx_delayed:enable(),
emqx_config:get([presence, enable], true) andalso emqx_presence:enable(),
emqx_config:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
emqx_config:get([recon, enable], true) andalso emqx_recon:enable(),
emqx_rewrite:enable(),
emqx_topic_metrics:enable().
maybe_disable_modules() ->
emqx_config:get([delayed, enable], true) andalso emqx_delayed:disable(),
emqx_config:get([presence, enable], true) andalso emqx_presence:disable(),
emqx_config:get([telemetry, enable], true) andalso emqx_telemetry:disable(),
emqx_config:get([recon, enable], true) andalso emqx_recon:disable(),
emqx_rewrite:disable(),
emqx_topic_metrics:disable().

View File

@ -23,40 +23,31 @@
-export([ structs/0
, fields/1]).
structs() -> ["emqx_modules"].
structs() ->
["delayed",
"recon",
"telemetry",
"presence",
"rewrite",
"topic_metrics"].
fields("emqx_modules") ->
[{modules, hoconsc:array(hoconsc:union([ hoconsc:ref(?MODULE, "common")
, hoconsc:ref(?MODULE, "presence")
, hoconsc:ref(?MODULE, "rewrite")
, hoconsc:ref(?MODULE, "topic_metrics")
, hoconsc:ref(?MODULE, "telemetry")
]))}];
fields("common") ->
[ {type, hoconsc:enum([delayed, recon])}
, {enable, emqx_schema:t(boolean(), undefined, false)}
fields(Name) when Name =:= "recon";
Name =:= "telemetry";
Name =:= "presence" ->
[ {enable, emqx_schema:t(boolean(), undefined, false)}
];
fields("presence") ->
[ {type, hoconsc:enum([presence])}
, {enable, emqx_schema:t(boolean(), undefined, false)}
, {qos, emqx_schema:t(integer(), undefined, 1)}
fields("delayed") ->
[ {enable, emqx_schema:t(boolean(), undefined, false)}
, {max_delayed_messages, emqx_schema:t(integer())}
];
fields("rewrite") ->
[ {type, hoconsc:enum([rewrite])}
, {enable, emqx_schema:t(boolean(), undefined, false)}
, {rules, hoconsc:array(hoconsc:ref(?MODULE, "rules"))}
[ {rules, hoconsc:array(hoconsc:ref(?MODULE, "rules"))}
];
fields("topic_metrics") ->
[ {type, hoconsc:enum([topic_metrics])}
, {enable, emqx_schema:t(boolean(), undefined, false)}
, {topics, hoconsc:array(binary())}
];
fields("telemetry") ->
[ {type, hoconsc:enum([telemetry])}
, {enable, emqx_schema:t(boolean(), undefined, false)}
[ {topics, hoconsc:array(binary())}
];
fields("rules") ->

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(Mod), #{id => Mod,
start => {Mod, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [Mod]}).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%%--------------------------------------------------------------------
%% Supervisor callbacks
%%--------------------------------------------------------------------
init([]) ->
{ok, {{one_for_one, 10, 3600},
[?CHILD(emqx_telemetry),
?CHILD(emqx_topic_metrics),
?CHILD(emqx_delayed)]}}.

View File

@ -14,55 +14,49 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_presence).
-behaviour(emqx_gen_mod).
-module(emqx_presence).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[Presence]").
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
-export([ enable/0
, disable/0
]).
-export([ on_client_connected/3
, on_client_disconnected/4
-export([ on_client_connected/2
, on_client_disconnected/3
]).
-ifdef(TEST).
-export([reason/1]).
-endif.
load(Env) ->
emqx_hooks:put('client.connected', {?MODULE, on_client_connected, [Env]}),
emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, [Env]}).
enable() ->
emqx_hooks:put('client.connected', {?MODULE, on_client_connected, []}),
emqx_hooks:put('client.disconnected', {?MODULE, on_client_disconnected, []}).
unload(_Env) ->
disable() ->
emqx_hooks:del('client.connected', {?MODULE, on_client_connected}),
emqx_hooks:del('client.disconnected', {?MODULE, on_client_disconnected}).
description() ->
"EMQ X Presence Module".
%%--------------------------------------------------------------------
%% Callbacks
%%--------------------------------------------------------------------
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo, Env) ->
on_client_connected(ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Presence = connected_presence(ClientInfo, ConnInfo),
case emqx_json:safe_encode(Presence) of
{ok, Payload} ->
emqx_broker:safe_publish(
make_msg(qos(Env), topic(connected, ClientId), Payload));
make_msg(topic(connected, ClientId), Payload));
{error, _Reason} ->
?LOG(error, "Failed to encode 'connected' presence: ~p", [Presence])
end.
on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Username},
Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}, Env) ->
Reason, _ConnInfo = #{disconnected_at := DisconnectedAt}) ->
Presence = #{clientid => ClientId,
username => Username,
reason => reason(Reason),
@ -72,7 +66,7 @@ on_client_disconnected(_ClientInfo = #{clientid := ClientId, username := Usernam
case emqx_json:safe_encode(Presence) of
{ok, Payload} ->
emqx_broker:safe_publish(
make_msg(qos(Env), topic(disconnected, ClientId), Payload));
make_msg(topic(disconnected, ClientId), Payload));
{error, _Reason} ->
?LOG(error, "Failed to encode 'disconnected' presence: ~p", [Presence])
end.
@ -107,18 +101,16 @@ connected_presence(#{peerhost := PeerHost,
ts => erlang:system_time(millisecond)
}.
make_msg(QoS, Topic, Payload) ->
make_msg(Topic, Payload) ->
emqx_message:set_flag(
sys, emqx_message:make(
?MODULE, QoS, Topic, iolist_to_binary(Payload))).
?MODULE, 0, Topic, iolist_to_binary(Payload))).
topic(connected, ClientId) ->
emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/connected"]));
topic(disconnected, ClientId) ->
emqx_topic:systop(iolist_to_binary(["clients/", ClientId, "/disconnected"])).
qos(Env) -> maps:get(qos, Env, 0).
-compile({inline, [reason/1]}).
reason(Reason) when is_atom(Reason) -> Reason;
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;

View File

@ -14,37 +14,24 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_recon).
-module(emqx_recon).
-behaviour(emqx_gen_mod).
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
-export([ enable/0
, disable/0
]).
-export([cmd/1]).
%%--------------------------------------------------------------------
%% Load/Unload
%% enable/disable
%%--------------------------------------------------------------------
-spec(load(list()) -> ok).
load(_Env) ->
load().
-spec(unload(list()) -> ok).
unload(_Env) ->
unload().
description() ->
"EMQ X Recon Module".
load() ->
enable() ->
emqx_ctl:register_command(recon, {?MODULE, cmd}, []).
disable() ->
emqx_ctl:unregister_command(recon).
cmd(["memory"]) ->
Print = fun(Key, Keyword) ->
emqx_ctl:print("~-20s: ~w~n", [concat(Key, Keyword), recon_alloc:memory(Key, Keyword)])
@ -76,9 +63,6 @@ cmd(_) ->
{"recon remote_load Mod", "recon:remote_load(Mod)"},
{"recon proc_count Attr N","recon:proc_count(Attr, N)"}]).
unload() ->
emqx_ctl:unregister_command(recon).
concat(Key, Keyword) ->
lists:concat([atom_to_list(Key), "/", atom_to_list(Keyword)]).

View File

@ -14,9 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_rewrite).
-behaviour(emqx_gen_mod).
-module(emqx_rewrite).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
@ -33,21 +31,29 @@
, rewrite_publish/2
]).
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
-export([ enable/0
, disable/0
]).
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
load(Env) ->
{PubRules, SubRules} = compile(maps:get(rules, Env, [])),
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]}).
enable() ->
Rules = emqx_config:get([rewrite, rules], []),
case Rules =:= [] of
true -> ok;
false ->
{PubRules, SubRules} = compile(Rules),
emqx_hooks:put('client.subscribe', {?MODULE, rewrite_subscribe, [SubRules]}),
emqx_hooks:put('client.unsubscribe', {?MODULE, rewrite_unsubscribe, [SubRules]}),
emqx_hooks:put('message.publish', {?MODULE, rewrite_publish, [PubRules]})
end.
disable() ->
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
rewrite_subscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
{ok, [{match_and_rewrite(Topic, Rules), Opts} || {Topic, Opts} <- TopicFilters]}.
@ -58,13 +64,6 @@ rewrite_unsubscribe(_ClientInfo, _Properties, TopicFilters, Rules) ->
rewrite_publish(Message = #message{topic = Topic}, Rules) ->
{ok, Message#message{topic = match_and_rewrite(Topic, Rules)}}.
unload(_) ->
emqx_hooks:del('client.subscribe', {?MODULE, rewrite_subscribe}),
emqx_hooks:del('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
emqx_hooks:del('message.publish', {?MODULE, rewrite_publish}).
description() ->
"EMQ X Topic Rewrite Module".
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -14,12 +14,10 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_telemetry).
-module(emqx_telemetry).
-behaviour(gen_server).
-behaviour(emqx_gen_mod).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
@ -34,7 +32,7 @@
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-export([ start_link/1
-export([ start_link/0
, stop/0
]).
@ -48,10 +46,8 @@
, code_change/3
]).
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
-export([ enable/0
, disable/0
]).
-export([ get_status/0
@ -111,16 +107,17 @@ mnesia(copy) ->
%% API
%%--------------------------------------------------------------------
start_link(Opts) ->
start_link() ->
Opts = emqx_config:get([telemetry], #{}),
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
stop() ->
gen_server:stop(?MODULE).
load(_Env) ->
enable() ->
gen_server:call(?MODULE, enable).
unload(_Env) ->
disable() ->
gen_server:call(?MODULE, disable).
get_status() ->
@ -132,9 +129,6 @@ get_uuid() ->
get_telemetry() ->
gen_server:call(?MODULE, get_telemetry).
description() ->
"".
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
@ -300,7 +294,8 @@ active_plugins() ->
end, [], emqx_plugins:list()).
active_modules() ->
emqx_modules:list().
[].
% emqx_modules:list().
num_clients() ->
emqx_stats:getstat('connections.max').

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_telemetry_api).
-module(emqx_telemetry_api).
-behavior(minirest_api).
@ -149,7 +149,7 @@ status(put, Request) ->
{ok, Body, _} = cowboy_req:read_body(Request),
Params = emqx_json:decode(Body, [return_maps]),
Enable = maps:get(<<"enable">>, Params),
case Enable =:= emqx_mod_telemetry:get_status() of
case Enable =:= emqx_telemetry:get_status() of
true ->
Reason = case Enable of
true -> <<"Telemetry status is already enabled">>;
@ -168,7 +168,7 @@ data(get, _Request) ->
%%--------------------------------------------------------------------
% cli(["enable", Enable0]) ->
% Enable = list_to_atom(Enable0),
% case Enable =:= emqx_mod_telemetry:is_enabled() of
% case Enable =:= emqx_telemetry:is_enabled() of
% true ->
% case Enable of
% true -> emqx_ctl:print("Telemetry status is already enabled~n");
@ -215,18 +215,18 @@ enable_telemetry(Enable) ->
enable_telemetry(Node, Enable) when Node =:= node() ->
case Enable of
true ->
emqx_mod_telemetry:load(#{});
emqx_telemetry:enable();
false ->
emqx_mod_telemetry:unload(#{})
emqx_telemetry:disable()
end;
enable_telemetry(Node, Enable) ->
rpc_call(Node, ?MODULE, enable_telemetry, [Node, Enable]).
get_telemetry_status() ->
#{enabled => emqx_mod_telemetry:get_status()}.
#{enabled => emqx_telemetry:get_status()}.
get_telemetry_data() ->
{ok, TelemetryData} = emqx_mod_telemetry:get_telemetry(),
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
TelemetryData.
rpc_call(Node, Module, Fun, Args) ->

View File

@ -14,10 +14,9 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_topic_metrics).
-module(emqx_topic_metrics).
-behaviour(gen_server).
-behaviour(emqx_gen_mod).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
@ -25,11 +24,6 @@
-logger_header("[TOPIC_METRICS]").
-export([ load/1
, unload/1
, description/0
]).
-export([ on_message_publish/1
, on_message_delivered/2
, on_message_dropped/3
@ -40,11 +34,11 @@
, stop/0
]).
-export([ inc/2
, inc/3
, val/2
, rate/2
, metrics/1
-export([ enable/0
, disable/0
]).
-export([ metrics/1
, register/1
, unregister/1
, unregister_all/0
@ -52,9 +46,6 @@
, all_registered_topics/0
]).
%% stats.
-export([ rates/2 ]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
@ -63,7 +54,10 @@
, terminate/2
]).
-define(CRefID(Topic), {?MODULE, Topic}).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-define(MAX_TOPICS, 512).
-define(TAB, ?MODULE).
@ -99,21 +93,15 @@
%%------------------------------------------------------------------------------
%% APIs
%%------------------------------------------------------------------------------
load(_Env) ->
emqx_mod_sup:start_child(?MODULE, worker),
enable() ->
emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}),
emqx_hooks:put('message.dropped', {?MODULE, on_message_dropped, []}),
emqx_hooks:put('message.delivered', {?MODULE, on_message_delivered, []}).
unload(_Env) ->
disable() ->
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
emqx_hooks:del('message.dropped', {?MODULE, on_message_dropped}),
emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}),
emqx_mod_sup:stop_child(?MODULE).
description() ->
"EMQ X Topic Metrics Module".
emqx_hooks:del('message.delivered', {?MODULE, on_message_delivered}).
on_message_publish(#message{topic = Topic, qos = QoS}) ->
case is_registered(Topic) of
@ -150,55 +138,12 @@ on_message_dropped(#message{topic = Topic}, _, _) ->
end.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
Opts = emqx_config:get([topic_metrics], #{}),
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
stop() ->
gen_server:stop(?MODULE).
try_inc(Topic, Metric) ->
_ = inc(Topic, Metric),
ok.
inc(Topic, Metric) ->
inc(Topic, Metric, 1).
inc(Topic, Metric, Val) ->
case get_counters(Topic) of
{error, topic_not_found} ->
{error, topic_not_found};
CRef ->
case metric_idx(Metric) of
{error, invalid_metric} ->
{error, invalid_metric};
Idx ->
counters:add(CRef, Idx, Val)
end
end.
val(Topic, Metric) ->
case ets:lookup(?TAB, Topic) of
[] ->
{error, topic_not_found};
[{Topic, CRef}] ->
case metric_idx(Metric) of
{error, invalid_metric} ->
{error, invalid_metric};
Idx ->
counters:get(CRef, Idx)
end
end.
rate(Topic, Metric) ->
case rates(Topic, Metric) of
#{short := Last} ->
Last;
{error, Reason} ->
{error, Reason}
end.
rates(Topic, Metric) ->
gen_server:call(?MODULE, {get_rates, Topic, Metric}).
metrics(Topic) ->
case ets:lookup(?TAB, Topic) of
[] ->
@ -229,7 +174,7 @@ all_registered_topics() ->
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
init([_Opts]) ->
erlang:process_flag(trap_exit, true),
ok = emqx_tables:new(?TAB, [{read_concurrency, true}]),
erlang:send_after(timer:seconds(?TICKING_INTERVAL), self(), ticking),
@ -279,7 +224,7 @@ handle_call({get_rates, Topic, Metric}, _From, State = #state{speeds = Speeds})
undefined ->
{reply, {error, invalid_metric}, State};
#speed{last = Short, last_medium = Medium, last_long = Long} ->
{reply, #{ short => Short, medium => Medium, long => Long }, State}
{reply, #{short => Short, medium => Medium, long => Long }, State}
end
end.
@ -309,6 +254,47 @@ terminate(_Reason, _State) ->
%% Internal Functions
%%------------------------------------------------------------------------------
try_inc(Topic, Metric) ->
_ = inc(Topic, Metric),
ok.
inc(Topic, Metric) ->
inc(Topic, Metric, 1).
inc(Topic, Metric, Val) ->
case get_counters(Topic) of
{error, topic_not_found} ->
{error, topic_not_found};
CRef ->
case metric_idx(Metric) of
{error, invalid_metric} ->
{error, invalid_metric};
Idx ->
counters:add(CRef, Idx, Val)
end
end.
val(Topic, Metric) ->
case ets:lookup(?TAB, Topic) of
[] ->
{error, topic_not_found};
[{Topic, CRef}] ->
case metric_idx(Metric) of
{error, invalid_metric} ->
{error, invalid_metric};
Idx ->
counters:get(CRef, Idx)
end
end.
rate(Topic, Metric) ->
case gen_server:call(?MODULE, {get_rates, Topic, Metric}) of
#{short := Last} ->
Last;
{error, Reason} ->
{error, Reason}
end.
metric_idx('messages.in') -> 01;
metric_idx('messages.out') -> 02;
metric_idx('messages.qos0.in') -> 03;

View File

@ -0,0 +1,207 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_topic_metrics_api).
% -rest_api(#{name => list_all_topic_metrics,
% method => 'GET',
% path => "/topic-metrics",
% func => list,
% descr => "A list of all topic metrics of all nodes in the cluster"}).
% -rest_api(#{name => list_topic_metrics,
% method => 'GET',
% path => "/topic-metrics/:bin:topic",
% func => list,
% descr => "A list of specfied topic metrics of all nodes in the cluster"}).
% -rest_api(#{name => register_topic_metrics,
% method => 'POST',
% path => "/topic-metrics",
% func => register,
% descr => "Register topic metrics"}).
% -rest_api(#{name => unregister_all_topic_metrics,
% method => 'DELETE',
% path => "/topic-metrics",
% func => unregister,
% descr => "Unregister all topic metrics"}).
% -rest_api(#{name => unregister_topic_metrics,
% method => 'DELETE',
% path => "/topic-metrics/:bin:topic",
% func => unregister,
% descr => "Unregister topic metrics"}).
% -export([ list/2
% , register/2
% , unregister/2
% ]).
% -export([ get_topic_metrics/2
% , register_topic_metrics/2
% , unregister_topic_metrics/2
% , unregister_all_topic_metrics/1
% ]).
% list(#{topic := Topic0}, _Params) ->
% execute_when_enabled(fun() ->
% Topic = emqx_mgmt_util:urldecode(Topic0),
% case safe_validate(Topic) of
% true ->
% case get_topic_metrics(Topic) of
% {error, Reason} -> return({error, Reason});
% Metrics -> return({ok, maps:from_list(Metrics)})
% end;
% false ->
% return({error, invalid_topic_name})
% end
% end);
% list(_Bindings, _Params) ->
% execute_when_enabled(fun() ->
% case get_all_topic_metrics() of
% {error, Reason} -> return({error, Reason});
% Metrics -> return({ok, Metrics})
% end
% end).
% register(_Bindings, Params) ->
% execute_when_enabled(fun() ->
% case proplists:get_value(<<"topic">>, Params) of
% undefined ->
% return({error, missing_required_params});
% Topic ->
% case safe_validate(Topic) of
% true ->
% register_topic_metrics(Topic),
% return(ok);
% false ->
% return({error, invalid_topic_name})
% end
% end
% end).
% unregister(Bindings, _Params) when map_size(Bindings) =:= 0 ->
% execute_when_enabled(fun() ->
% unregister_all_topic_metrics(),
% return(ok)
% end);
% unregister(#{topic := Topic0}, _Params) ->
% execute_when_enabled(fun() ->
% Topic = emqx_mgmt_util:urldecode(Topic0),
% case safe_validate(Topic) of
% true ->
% unregister_topic_metrics(Topic),
% return(ok);
% false ->
% return({error, invalid_topic_name})
% end
% end).
% execute_when_enabled(Fun) ->
% case emqx_modules:find_module(topic_metrics) of
% true ->
% Fun();
% false ->
% return({error, module_not_loaded})
% end.
% safe_validate(Topic) ->
% try emqx_topic:validate(name, Topic) of
% true -> true
% catch
% error:_Error ->
% false
% end.
% get_all_topic_metrics() ->
% lists:foldl(fun(Topic, Acc) ->
% case get_topic_metrics(Topic) of
% {error, _Reason} ->
% Acc;
% Metrics ->
% [#{topic => Topic, metrics => Metrics} | Acc]
% end
% end, [], emqx_mod_topic_metrics:all_registered_topics()).
% get_topic_metrics(Topic) ->
% lists:foldl(fun(Node, Acc) ->
% case get_topic_metrics(Node, Topic) of
% {error, _Reason} ->
% Acc;
% Metrics ->
% case Acc of
% [] -> Metrics;
% _ ->
% lists:foldl(fun({K, V}, Acc0) ->
% [{K, V + proplists:get_value(K, Metrics, 0)} | Acc0]
% end, [], Acc)
% end
% end
% end, [], ekka_mnesia:running_nodes()).
% get_topic_metrics(Node, Topic) when Node =:= node() ->
% emqx_mod_topic_metrics:metrics(Topic);
% get_topic_metrics(Node, Topic) ->
% rpc_call(Node, get_topic_metrics, [Node, Topic]).
% register_topic_metrics(Topic) ->
% Results = [register_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
% case lists:any(fun(Item) -> Item =:= ok end, Results) of
% true -> ok;
% false -> lists:last(Results)
% end.
% register_topic_metrics(Node, Topic) when Node =:= node() ->
% emqx_mod_topic_metrics:register(Topic);
% register_topic_metrics(Node, Topic) ->
% rpc_call(Node, register_topic_metrics, [Node, Topic]).
% unregister_topic_metrics(Topic) ->
% Results = [unregister_topic_metrics(Node, Topic) || Node <- ekka_mnesia:running_nodes()],
% case lists:any(fun(Item) -> Item =:= ok end, Results) of
% true -> ok;
% false -> lists:last(Results)
% end.
% unregister_topic_metrics(Node, Topic) when Node =:= node() ->
% emqx_mod_topic_metrics:unregister(Topic);
% unregister_topic_metrics(Node, Topic) ->
% rpc_call(Node, unregister_topic_metrics, [Node, Topic]).
% unregister_all_topic_metrics() ->
% Results = [unregister_all_topic_metrics(Node) || Node <- ekka_mnesia:running_nodes()],
% case lists:any(fun(Item) -> Item =:= ok end, Results) of
% true -> ok;
% false -> lists:last(Results)
% end.
% unregister_all_topic_metrics(Node) when Node =:= node() ->
% emqx_mod_topic_metrics:unregister_all();
% unregister_all_topic_metrics(Node) ->
% rpc_call(Node, unregister_topic_metrics, [Node]).
% rpc_call(Node, Fun, Args) ->
% case rpc:call(Node, ?MODULE, Fun, Args) of
% {badrpc, Reason} -> {error, Reason};
% Res -> Res
% end.
% return(_) ->
% %% TODO: V5 API
% ok.

View File

@ -14,9 +14,9 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_delayed_SUITE).
-module(emqx_delayed_SUITE).
-import(emqx_mod_delayed, [on_message_publish/1]).
-import(emqx_delayed, [on_message_publish/1]).
-compile(export_all).
-compile(nowarn_export_all).
@ -35,6 +35,8 @@ all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ekka_mnesia:start(),
ok = emqx_delayed:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_modules]),
Config.
@ -46,26 +48,28 @@ end_per_suite(_) ->
%%--------------------------------------------------------------------
t_load_case(_) ->
UnHooks = emqx_hooks:lookup('message.publish'),
?assertEqual([], UnHooks),
ok = emqx_mod_delayed:load([]),
Hooks = emqx_hooks:lookup('message.publish'),
?assertEqual(1, length(Hooks)),
MFA = {emqx_delayed,on_message_publish,[]},
?assertEqual(false, lists:keyfind(MFA, 2, Hooks)),
ok = emqx_delayed:enable(),
Hooks1 = emqx_hooks:lookup('message.publish'),
ct:pal("----~p~n", [Hooks1]),
?assertNotEqual(false, lists:keyfind(MFA, 2, Hooks1)),
ok.
t_delayed_message(_) ->
ok = emqx_mod_delayed:load([]),
ok = emqx_delayed:enable(),
DelayedMsg = emqx_message:make(?MODULE, 1, <<"$delayed/1/publish">>, <<"delayed_m">>),
?assertEqual({stop, DelayedMsg#message{topic = <<"publish">>, headers = #{allow_publish => false}}}, on_message_publish(DelayedMsg)),
Msg = emqx_message:make(?MODULE, 1, <<"no_delayed_msg">>, <<"no_delayed">>),
?assertEqual({ok, Msg}, on_message_publish(Msg)),
[Key] = mnesia:dirty_all_keys(emqx_mod_delayed),
[#delayed_message{msg = #message{payload = Payload}}] = mnesia:dirty_read({emqx_mod_delayed, Key}),
[Key] = mnesia:dirty_all_keys(emqx_delayed),
[#delayed_message{msg = #message{payload = Payload}}] = mnesia:dirty_read({emqx_delayed, Key}),
?assertEqual(<<"delayed_m">>, Payload),
timer:sleep(5000),
EmptyKey = mnesia:dirty_all_keys(emqx_mod_delayed),
EmptyKey = mnesia:dirty_all_keys(emqx_delayed),
?assertEqual([], EmptyKey),
ok = emqx_mod_delayed:unload([]).
ok = emqx_delayed:disable().

View File

@ -1,95 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_topic_metrics_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([emqx_modules]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_modules]).
t_nonexistent_topic_metrics(_) ->
emqx_mod_topic_metrics:load([]),
?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, topic_not_found}, emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'messages.in')),
emqx_mod_topic_metrics:register(<<"a/b/c">>),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')),
emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
emqx_mod_topic_metrics:unload([]).
t_topic_metrics(_) ->
emqx_mod_topic_metrics:load([]),
?assertEqual(false, emqx_mod_topic_metrics:is_registered(<<"a/b/c">>)),
?assertEqual([], emqx_mod_topic_metrics:all_registered_topics()),
emqx_mod_topic_metrics:register(<<"a/b/c">>),
?assertEqual(true, emqx_mod_topic_metrics:is_registered(<<"a/b/c">>)),
?assertEqual([<<"a/b/c">>], emqx_mod_topic_metrics:all_registered_topics()),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(ok, emqx_mod_topic_metrics:inc(<<"a/b/c">>, 'messages.in')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assert(emqx_mod_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0),
?assert(emqx_mod_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= #{long => 0,medium => 0,short => 0}),
emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
emqx_mod_topic_metrics:unload([]).
t_hook(_) ->
emqx_mod_topic_metrics:load([]),
emqx_mod_topic_metrics:register(<<"a/b/c">>),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.out')),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')),
?assertEqual(0, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
{ok, C} = emqtt:start_link([{host, "localhost"},
{clientid, "myclient"},
{username, "myuser"}]),
{ok, _} = emqtt:connect(C),
emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0),
ct:sleep(100),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
emqtt:subscribe(C, <<"a/b/c">>),
emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0),
ct:sleep(100),
?assertEqual(2, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(2, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.out')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')),
?assertEqual(1, emqx_mod_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
emqx_mod_topic_metrics:unregister(<<"a/b/c">>),
emqx_mod_topic_metrics:unload([]).

View File

@ -1,203 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(CONTENT_TYPE, "application/x-www-form-urlencoded").
-define(HOST, "http://127.0.0.1:8081/").
-define(API_VERSION, "v4").
-define(BASE_PATH, "api").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_management, emqx_modules], fun set_special_configs/1),
emqx_ct_http:create_default_app(),
Config.
set_special_configs(emqx_management) ->
emqx_config:put([emqx_management], #{listeners => [#{protocol => http, port => 8081}],
applications =>[#{id => "admin", secret => "public"}]}),
ok;
set_special_configs(_) ->
ok.
end_per_suite(_Config) ->
emqx_ct_http:delete_default_app(),
emqx_ct_helpers:stop_apps([emqx_modules, emqx_management]).
t_load(_) ->
?assertEqual(ok, emqx_modules:unload()),
?assertEqual(ok, emqx_modules:load()),
?assertEqual({error, not_started}, emqx_modules:unload(rewrite)),
?assertEqual(ignore, emqx_modules:reload(rewrite)).
t_list(_) ->
emqx_modules:load(presence, #{qos => 1}),
?assertMatch([_ | _ ], emqx_modules:list()),
emqx_modules:unload(presence).
%% TODO: V5 API
%%t_modules_api(_) ->
%% emqx_modules:load(presence, #{qos => 1}),
%% timer:sleep(50),
%% {ok, Modules1} = request_api(get, api_path(["modules"]), auth_header_()),
%% [Modules11] = filter(get(<<"data">>, Modules1), <<"node">>, atom_to_binary(node(), utf8)),
%% [Module1] = filter(maps:get(<<"modules">>, Modules11), <<"name">>, <<"presence">>),
%% ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module1)),
%% {ok, _} = request_api(put,
%% api_path(["modules",
%% atom_to_list(presence),
%% "unload"]),
%% auth_header_()),
%% {ok, Error1} = request_api(put,
%% api_path(["modules",
%% atom_to_list(presence),
%% "unload"]),
%% auth_header_()),
%% ?assertEqual(<<"not_started">>, get(<<"message">>, Error1)),
%% {ok, Modules2} = request_api(get,
%% api_path(["nodes", atom_to_list(node()), "modules"]),
%% auth_header_()),
%% [Module2] = filter(get(<<"data">>, Modules2), <<"name">>, <<"presence">>),
%% ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module2)),
%%
%% {ok, _} = request_api(put,
%% api_path(["nodes",
%% atom_to_list(node()),
%% "modules",
%% atom_to_list(presence),
%% "load"]),
%% auth_header_()),
%% {ok, Modules3} = request_api(get,
%% api_path(["nodes", atom_to_list(node()), "modules"]),
%% auth_header_()),
%% [Module3] = filter(get(<<"data">>, Modules3), <<"name">>, <<"presence">>),
%% ?assertEqual(<<"presence">>, maps:get(<<"name">>, Module3)),
%%
%% {ok, _} = request_api(put,
%% api_path(["nodes",
%% atom_to_list(node()),
%% "modules",
%% atom_to_list(presence),
%% "unload"]),
%% auth_header_()),
%% {ok, Error2} = request_api(put,
%% api_path(["nodes",
%% atom_to_list(node()),
%% "modules",
%% atom_to_list(presence),
%% "unload"]),
%% auth_header_()),
%% ?assertEqual(<<"not_started">>, get(<<"message">>, Error2)),
%% emqx_modules:unload(presence).
t_modules_cmd(_) ->
mock_print(),
meck:new(emqx_modules, [non_strict, passthrough]),
meck:expect(emqx_modules, load, fun(_) -> ok end),
meck:expect(emqx_modules, unload, fun(_) -> ok end),
meck:expect(emqx_modules, reload, fun(_) -> ok end),
?assertEqual(emqx_modules:cli(["list"]), ok),
?assertEqual(emqx_modules:cli(["load", "delayed"]),
"Module delayed loaded successfully.\n"),
?assertEqual(emqx_modules:cli(["unload", "delayed"]),
"Module delayed unloaded successfully.\n"),
unmock_print().
%% For: https://github.com/emqx/emqx/issues/4511
t_join_cluster(_) ->
%% Started by emqx application
{error, {already_started, emqx_modules}} = application:start(emqx_modules),
%% After clustered
emqx:shutdown(),
emqx:reboot(),
{error,{already_started,emqx_modules}} = application:start(emqx_modules),
%% After emqx reboot, we should not interfere with other tests
_ = end_per_suite([]),
_ = init_per_suite([]),
ok.
mock_print() ->
catch meck:unload(emqx_ctl),
meck:new(emqx_ctl, [non_strict, passthrough]),
meck:expect(emqx_ctl, print, fun(Arg) -> emqx_ctl:format(Arg) end),
meck:expect(emqx_ctl, print, fun(Msg, Arg) -> emqx_ctl:format(Msg, Arg) end),
meck:expect(emqx_ctl, usage, fun(Usages) -> emqx_ctl:format_usage(Usages) end),
meck:expect(emqx_ctl, usage, fun(Cmd, Descr) -> emqx_ctl:format_usage(Cmd, Descr) end).
unmock_print() ->
meck:unload(emqx_ctl).
get(Key, ResponseBody) ->
maps:get(Key, jiffy:decode(list_to_binary(ResponseBody), [return_maps])).
request_api(Method, Url, Auth) ->
request_api(Method, Url, [], Auth, []).
request_api(Method, Url, QueryParams, Auth) ->
request_api(Method, Url, QueryParams, Auth, []).
request_api(Method, Url, QueryParams, Auth, []) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth]});
request_api(Method, Url, QueryParams, Auth, Body) ->
NewUrl = case QueryParams of
"" -> Url;
_ -> Url ++ "?" ++ QueryParams
end,
do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}).
do_request_api(Method, Request)->
ct:pal("Method: ~p, Request: ~p", [Method, Request]),
case httpc:request(Method, Request, [], []) of
{error, socket_closed_remotely} ->
{error, socket_closed_remotely};
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
when Code =:= 200 orelse Code =:= 201 ->
{ok, Return};
{ok, {Reason, _, _}} ->
{error, Reason}
end.
auth_header_() ->
AppId = <<"admin">>,
AppSecret = <<"public">>,
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
auth_header_(User, Pass) ->
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
{"Authorization","Basic " ++ Encoded}.
api_path(Parts)->
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).
filter(List, Key, Value) ->
lists:filter(fun(Item) ->
maps:get(Key, Item) == Value
end, List).

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_presence_SUITE).
-module(emqx_presence_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -27,16 +27,13 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([emqx_modules]),
%% Ensure all the modules unloaded.
ok = emqx_modules:unload(),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_modules]).
%% Test case for emqx_mod_presence
t_mod_presence(_) ->
ok = emqx_mod_presence:load(#{qos => ?QOS_1}),
ok = emqx_presence:enable(),
{ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
{ok, _} = emqtt:connect(C1),
{ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1),
@ -49,16 +46,16 @@ t_mod_presence(_) ->
ok = emqtt:disconnect(C2),
ok = recv_and_check_presence(<<"clientid">>, <<"disconnected">>),
ok = emqtt:disconnect(C1),
ok = emqx_mod_presence:unload([]).
ok = emqx_presence:disable().
t_mod_presence_reason(_) ->
?assertEqual(normal, emqx_mod_presence:reason(normal)),
?assertEqual(discarded, emqx_mod_presence:reason({shutdown, discarded})),
?assertEqual(tcp_error, emqx_mod_presence:reason({tcp_error, einval})),
?assertEqual(internal_error, emqx_mod_presence:reason(<<"unknown error">>)).
?assertEqual(normal, emqx_presence:reason(normal)),
?assertEqual(discarded, emqx_presence:reason({shutdown, discarded})),
?assertEqual(tcp_error, emqx_presence:reason({tcp_error, einval})),
?assertEqual(internal_error, emqx_presence:reason(<<"unknown error">>)).
recv_and_check_presence(ClientId, Presence) ->
{ok, #{qos := ?QOS_1, topic := Topic, payload := Payload}} = receive_publish(100),
{ok, #{qos := ?QOS_0, topic := Topic, payload := Payload}} = receive_publish(100),
?assertMatch([<<"$SYS">>, <<"brokers">>, _Node, <<"clients">>, ClientId, Presence],
binary:split(Topic, <<"/">>, [global])),
case Presence of

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_rewrite_SUITE).
-module(emqx_rewrite_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -38,8 +38,6 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([emqx_modules]),
%% Ensure all the modules unloaded.
ok = emqx_modules:unload(),
Config.
end_per_suite(_Config) ->
@ -47,7 +45,12 @@ end_per_suite(_Config) ->
%% Test case for emqx_mod_write
t_mod_rewrite(_Config) ->
ok = emqx_mod_rewrite:load(#{rules => ?RULES}),
%% important! let emqx_schema include the current app!
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, includes, fun() -> ["rewrite"] end ),
meck:expect(emqx_schema, extra_schema_fields, fun(FieldName) -> emqx_modules_schema:fields(FieldName) end),
ok = emqx_config:update([rewrite, rules], ?RULES),
ok = emqx_rewrite:enable(),
{ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]),
{ok, _} = emqtt:connect(C),
PubOrigTopics = [<<"x/y/2">>, <<"x/1/2">>],
@ -79,14 +82,15 @@ t_mod_rewrite(_Config) ->
{ok, _, _} = emqtt:unsubscribe(C, PubDestTopics),
ok = emqtt:disconnect(C),
ok = emqx_mod_rewrite:unload(?RULES).
ok = emqx_rewrite:disable(),
meck:unload(emqx_schema).
t_rewrite_rule(_Config) ->
{PubRules, SubRules} = emqx_mod_rewrite:compile(?RULES),
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)),
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)),
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)),
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, SubRules)).
{PubRules, SubRules} = emqx_rewrite:compile(?RULES),
?assertEqual(<<"z/y/2">>, emqx_rewrite:match_and_rewrite(<<"x/y/2">>, PubRules)),
?assertEqual(<<"x/1/2">>, emqx_rewrite:match_and_rewrite(<<"x/1/2">>, PubRules)),
?assertEqual(<<"y/z/b">>, emqx_rewrite:match_and_rewrite(<<"y/a/z/b">>, SubRules)),
?assertEqual(<<"y/def">>, emqx_rewrite:match_and_rewrite(<<"y/def">>, SubRules)).
%%--------------------------------------------------------------------
%% Internal functions

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_telemetry_SUITE).
-module(emqx_telemetry_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
@ -29,8 +29,7 @@ all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
ok = ekka_mnesia:start(),
ok = emqx_mod_telemetry:mnesia(boot),
emqx_ct_helpers:boot_modules(all),
ok = emqx_telemetry:mnesia(boot),
emqx_ct_helpers:start_apps([emqx_modules]),
Config.
@ -38,57 +37,57 @@ end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_modules]).
t_uuid(_) ->
UUID = emqx_mod_telemetry:generate_uuid(),
UUID = emqx_telemetry:generate_uuid(),
Parts = binary:split(UUID, <<"-">>, [global, trim]),
?assertEqual(5, length(Parts)),
{ok, UUID2} = emqx_mod_telemetry:get_uuid(),
emqx_mod_telemetry:unload(#{}),
emqx_mod_telemetry:load(#{}),
{ok, UUID3} = emqx_mod_telemetry:get_uuid(),
{ok, UUID2} = emqx_telemetry:get_uuid(),
emqx_telemetry:disable(),
emqx_telemetry:enable(),
{ok, UUID3} = emqx_telemetry:get_uuid(),
?assertEqual(UUID2, UUID3).
t_official_version(_) ->
true = emqx_mod_telemetry:official_version("0.0.0"),
true = emqx_mod_telemetry:official_version("1.1.1"),
true = emqx_mod_telemetry:official_version("10.10.10"),
false = emqx_mod_telemetry:official_version("0.0.0.0"),
false = emqx_mod_telemetry:official_version("1.1.a"),
true = emqx_mod_telemetry:official_version("0.0-alpha.1"),
true = emqx_mod_telemetry:official_version("1.1-alpha.1"),
true = emqx_mod_telemetry:official_version("10.10-alpha.10"),
false = emqx_mod_telemetry:official_version("1.1-alpha.0"),
true = emqx_mod_telemetry:official_version("1.1-beta.1"),
true = emqx_mod_telemetry:official_version("1.1-rc.1"),
false = emqx_mod_telemetry:official_version("1.1-alpha.a").
true = emqx_telemetry:official_version("0.0.0"),
true = emqx_telemetry:official_version("1.1.1"),
true = emqx_telemetry:official_version("10.10.10"),
false = emqx_telemetry:official_version("0.0.0.0"),
false = emqx_telemetry:official_version("1.1.a"),
true = emqx_telemetry:official_version("0.0-alpha.1"),
true = emqx_telemetry:official_version("1.1-alpha.1"),
true = emqx_telemetry:official_version("10.10-alpha.10"),
false = emqx_telemetry:official_version("1.1-alpha.0"),
true = emqx_telemetry:official_version("1.1-beta.1"),
true = emqx_telemetry:official_version("1.1-rc.1"),
false = emqx_telemetry:official_version("1.1-alpha.a").
t_get_telemetry(_) ->
{ok, TelemetryData} = emqx_mod_telemetry:get_telemetry(),
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
OTPVersion = bin(erlang:system_info(otp_release)),
?assertEqual(OTPVersion, get_value(otp_version, TelemetryData)),
{ok, UUID} = emqx_mod_telemetry:get_uuid(),
{ok, UUID} = emqx_telemetry:get_uuid(),
?assertEqual(UUID, get_value(uuid, TelemetryData)),
?assertEqual(0, get_value(num_clients, TelemetryData)).
t_enable(_) ->
ok = meck:new(emqx_mod_telemetry, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(emqx_mod_telemetry, official_version, fun(_) -> true end),
ok = emqx_mod_telemetry:load(#{}),
?assertEqual(true, emqx_mod_telemetry:get_status()),
ok = emqx_mod_telemetry:unload(#{}),
?assertEqual(false, emqx_mod_telemetry:get_status()),
meck:unload([emqx_mod_telemetry]).
ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end),
ok = emqx_telemetry:enable(),
?assertEqual(true, emqx_telemetry:get_status()),
ok = emqx_telemetry:disable(),
?assertEqual(false, emqx_telemetry:get_status()),
meck:unload([emqx_telemetry]).
t_send_after_enable(_) ->
ok = meck:new(emqx_mod_telemetry, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(emqx_mod_telemetry, official_version, fun(_) -> true end),
ok = emqx_mod_telemetry:unload(#{}),
ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end),
ok = emqx_telemetry:disable(),
ok = snabbkaffe:start_trace(),
try
ok = emqx_mod_telemetry:load(#{}),
ok = emqx_telemetry:enable(),
?assertMatch({ok, _}, ?block_until(#{?snk_kind := telemetry_data_reported}, 2000, 100))
after
ok = snabbkaffe:stop(),
meck:unload([emqx_mod_telemetry])
meck:unload([emqx_telemetry])
end.
bin(L) when is_list(L) ->

View File

@ -0,0 +1,95 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_topic_metrics_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([emqx_modules]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_modules]).
t_nonexistent_topic_metrics(_) ->
emqx_topic_metrics:enable(),
?assertEqual({error, topic_not_found}, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, topic_not_found}, emqx_topic_metrics:inc(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, topic_not_found}, emqx_topic_metrics:rate(<<"a/b/c">>, 'messages.in')),
% ?assertEqual({error, topic_not_found}, emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in')),
emqx_topic_metrics:register(<<"a/b/c">>),
?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual({error, invalid_metric}, emqx_topic_metrics:val(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_topic_metrics:inc(<<"a/b/c">>, 'invalid.metrics')),
?assertEqual({error, invalid_metric}, emqx_topic_metrics:rate(<<"a/b/c">>, 'invalid.metrics')),
% ?assertEqual({error, invalid_metric}, emqx_topic_metrics:rates(<<"a/b/c">>, 'invalid.metrics')),
emqx_topic_metrics:unregister(<<"a/b/c">>),
emqx_topic_metrics:disable().
t_topic_metrics(_) ->
emqx_topic_metrics:enable(),
?assertEqual(false, emqx_topic_metrics:is_registered(<<"a/b/c">>)),
?assertEqual([], emqx_topic_metrics:all_registered_topics()),
emqx_topic_metrics:register(<<"a/b/c">>),
?assertEqual(true, emqx_topic_metrics:is_registered(<<"a/b/c">>)),
?assertEqual([<<"a/b/c">>], emqx_topic_metrics:all_registered_topics()),
?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(ok, emqx_topic_metrics:inc(<<"a/b/c">>, 'messages.in')),
?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assert(emqx_topic_metrics:rate(<<"a/b/c">>, 'messages.in') =:= 0),
% ?assert(emqx_topic_metrics:rates(<<"a/b/c">>, 'messages.in') =:= #{long => 0,medium => 0,short => 0}),
emqx_topic_metrics:unregister(<<"a/b/c">>),
emqx_topic_metrics:disable().
t_hook(_) ->
emqx_topic_metrics:enable(),
emqx_topic_metrics:register(<<"a/b/c">>),
?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')),
?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.out')),
?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')),
?assertEqual(0, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
{ok, C} = emqtt:start_link([{host, "localhost"},
{clientid, "myclient"},
{username, "myuser"}]),
{ok, _} = emqtt:connect(C),
emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0),
ct:sleep(100),
?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')),
?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
emqtt:subscribe(C, <<"a/b/c">>),
emqtt:publish(C, <<"a/b/c">>, <<"Hello world">>, 0),
ct:sleep(100),
?assertEqual(2, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.in')),
?assertEqual(2, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.in')),
?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.out')),
?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.qos0.out')),
?assertEqual(1, emqx_topic_metrics:val(<<"a/b/c">>, 'messages.dropped')),
emqx_topic_metrics:unregister(<<"a/b/c">>),
emqx_topic_metrics:disable().

View File

@ -8,10 +8,15 @@
, {"emqx_authn", emqx_authn_schema}
, {"authorization", emqx_authz_schema}
, {"emqx_bridge_mqtt", emqx_bridge_mqtt_schema}
, {"emqx_modules", emqx_modules_schema}
, {"emqx_management", emqx_management_schema}
, {"emqx_dashboard", emqx_dashboard_schema}
, {"emqx_gateway", emqx_gateway_schema}
, {"prometheus", emqx_prometheus_schema}
, {"statsd", emqx_statsd_schema}
, {"delayed", emqx_modules_schema}
, {"recon", emqx_modules_schema}
, {"telemetry", emqx_modules_schema}
, {"presence", emqx_modules_schema}
, {"rewrite", emqx_modules_schema}
, {"topic_metrics", emqx_modules_schema}
].