fix(emqx_st_statistics): change emqx_st_statistics implementation fro… (#6115)

* fix(emqx_st_statistics): change emqx_st_statistics implementation from plugin to module
This commit is contained in:
lafirest 2021-11-11 10:16:06 +08:00 committed by GitHub
parent 06a1b37992
commit 1dd18aa07a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 140 additions and 282 deletions

View File

@ -1,38 +0,0 @@
##--------------------------------------------------------------------
## EMQ X Slow Topics Statistics
##--------------------------------------------------------------------
## Threshold time of slow topics statistics
##
## Default: 10 seconds
st_statistics.threshold_time = 10S
## Time window of slow topics statistics
##
## Value: 5 minutes
st_statistics.time_window = 5M
## Maximum of slow topics log, log will clear when enter new time window
##
## Value: 500
st_statistics.max_log_num = 500
## Top-K record for slow topics, update from logs
##
## Value: 500
st_statistics.top_k_num = 500
## Topic of notification
##
## Defaut: $slow_topics
st_statistics.notice_topic = $slow_topics
## QoS of notification message in notice topic
##
## Defaut: 0
st_statistics.notice_qos = 0
## Maximum information number in one notification
##
## Default: 500
st_statistics.notice_batch_size = 500

View File

@ -1,58 +0,0 @@
%%-*- mode: erlang -*-
%% st_statistics config mapping
%% Threshold time of slow topics statistics
%% {$configurable}
{mapping, "st_statistics.threshold_time", "emqx_st_statistics.threshold_time",
[
{default, "10S"},
{datatype, [integer, {duration, ms}]}
]}.
%% Time window of slow topics statistics
%% {$configurable}
{mapping, "st_statistics.time_window", "emqx_st_statistics.time_window",
[
{default, "5M"},
{datatype, [integer, {duration, ms}]}
]}.
%% Maximum of slow topics log
%% {$configurable}
{mapping, "st_statistics.max_log_num", "emqx_st_statistics.max_log_num",
[
{default, 500},
{datatype, integer}
]}.
%% Top-K record for slow topics, update from logs
%% {$configurable}
{mapping, "st_statistics.top_k_num", "emqx_st_statistics.top_k_num",
[
{default, 500},
{datatype, integer}
]}.
%% Topic of notification
%% {$configurable}
{mapping, "st_statistics.notice_topic", "emqx_st_statistics.notice_topic",
[
{default, <<"slow_topics">>},
{datatype, string}
]}.
%% QoS of notification message in notice topic
%% {$configurable}
{mapping, "st_statistics.notice_qos", "emqx_st_statistics.notice_qos",
[
{default, 0},
{datatype, integer}
]}.
%% Maximum entities per notification message
%% {$configurable}
{mapping, "st_statistics.notice_batch_size", "emqx_st_statistics.notice_batch_size",
[
{default, 500},
{datatype, integer}
]}.

View File

@ -1,23 +0,0 @@
{deps, []}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard
]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{profiles,
[{test,
[{deps,
[{emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3"}}}]}
]}
]}.

View File

@ -1,12 +0,0 @@
{application, emqx_st_statistics,
[{description, "EMQ X Slow Topics Statistics"},
{vsn, "1.0.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_st_statistics_sup]},
{applications, [kernel,stdlib]},
{mod, {emqx_st_statistics_app,[]}},
{env, []},
{licenses, ["Apache-2.0"]},
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
{links, []}
]}.

View File

@ -1,37 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics_app).
-behaviour(application).
-emqx_plugin(?MODULE).
-export([ start/2
, stop/1
, start/0
]).
start(_Type, _Args) ->
start().
start() ->
Conf = application:get_all_env(emqx_st_statistics),
{ok, Sup} = emqx_st_statistics_sup:start_link(Conf),
{ok, Sup}.
stop(_State) ->
ok.

View File

@ -1,35 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_st_statistics_sup).
-behaviour(supervisor).
-export([start_link/1]).
-export([init/1]).
start_link(Env) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, [Env]).
init([Env]) ->
{ok, {{one_for_one, 10, 3600},
[#{id => st_statistics,
start => {emqx_st_statistics, start_link, [Env]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_st_statistics]}]}}.

View File

@ -2211,6 +2211,44 @@ module.presence.qos = 1
## module.rewrite.pub.rule.1 = x/# ^x/y/(.+)$ z/y/$1 ## module.rewrite.pub.rule.1 = x/# ^x/y/(.+)$ z/y/$1
## module.rewrite.sub.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2 ## module.rewrite.sub.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
##--------------------------------------------------------------------
## Slow Topic Statistics Module
## Threshold time of slow topics statistics
##
## Default: 10 seconds
#module.st_statistics.threshold_time = 10S
## Time window of slow topics statistics
##
## Value: 5 minutes
#module.st_statistics.time_window = 5M
## Maximum of slow topics log, log will clear when enter new time window
##
## Value: 500
#module.st_statistics.max_log_num = 500
## Top-K record for slow topics, update from logs
##
## Value: 500
#module.st_statistics.top_k_num = 500
## Topic of notification
##
## Defaut: $slow_topics
#module.st_statistics.notice_topic = $slow_topics
## QoS of notification message in notice topic
##
## Defaut: 0
#module.st_statistics.notice_qos = 0
## Maximum information number in one notification
##
## Default: 500
#module.st_statistics.notice_batch_size = 500
## CONFIG_SECTION_END=modules ================================================== ## CONFIG_SECTION_END=modules ==================================================
##------------------------------------------------------------------- ##-------------------------------------------------------------------

View File

@ -1,47 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_st_statistics).
-behaviour(emqx_gen_mod).
-include_lib("emqx/include/logger.hrl").
-logger_header("[st_statistics]").
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
]).
%%--------------------------------------------------------------------
%% Load/Unload
%%--------------------------------------------------------------------
-spec(load(list()) -> ok).
load(_Env) ->
_ = emqx_st_statistics_app:start(),
ok.
-spec(unload(list()) -> ok).
unload(_Env) ->
emqx_st_statistics_app:stop(undefined).
description() ->
"EMQ X Slow Topic Statistics Module".
%%--------------------------------------------------------------------

View File

@ -23,18 +23,22 @@
-export([ start_link/0 -export([ start_link/0
, start_child/1 , start_child/1
, start_child/2 , start_child/2
, start_child/3
, stop_child/1 , stop_child/1
]). ]).
-export([init/1]). -export([init/1]).
%% Helper macro for declaring children of supervisor %% Helper macro for declaring children of supervisor
-define(CHILD(Mod, Type), #{id => Mod, -define(CHILD(Mod, Type, Args),
start => {Mod, start_link, []}, #{id => Mod,
restart => permanent, start => {Mod, start_link, Args},
shutdown => 5000, restart => permanent,
type => Type, shutdown => 5000,
modules => [Mod]}). type => Type,
modules => [Mod]}).
-define(CHILD(MOD, Type), ?CHILD(MOD, Type, [])).
-spec(start_link() -> startlink_ret()). -spec(start_link() -> startlink_ret()).
start_link() -> start_link() ->
@ -48,6 +52,10 @@ start_child(ChildSpec) when is_map(ChildSpec) ->
start_child(Mod, Type) when is_atom(Mod) andalso is_atom(Type) -> start_child(Mod, Type) when is_atom(Mod) andalso is_atom(Type) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Type))). assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Type))).
-spec start_child(atom(), atom(), list(any())) -> ok.
start_child(Mod, Type, Args) when is_atom(Mod) andalso is_atom(Type) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, Type, Args))).
-spec(stop_child(any()) -> ok | {error, term()}). -spec(stop_child(any()) -> ok | {error, term()}).
stop_child(ChildId) -> stop_child(ChildId) ->
case supervisor:terminate_child(?MODULE, ChildId) of case supervisor:terminate_child(?MODULE, ChildId) of
@ -71,4 +79,3 @@ assert_started({ok, _Pid}) -> ok;
assert_started({ok, _Pid, _Info}) -> ok; assert_started({ok, _Pid, _Info}) -> ok;
assert_started({error, {already_tarted, _Pid}}) -> ok; assert_started({error, {already_tarted, _Pid}}) -> ok;
assert_started({error, Reason}) -> erlang:error(Reason). assert_started({error, Reason}) -> erlang:error(Reason).

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -14,13 +14,14 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_st_statistics). -module(emqx_mod_st_statistics).
-behaviour(emqx_gen_mod).
-behaviour(gen_server). -behaviour(gen_server).
-include_lib("include/emqx.hrl"). -include_lib("include/emqx.hrl").
-include_lib("include/logger.hrl"). -include_lib("include/logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl"). -include("include/emqx_st_statistics.hrl").
-logger_header("[SLOW TOPICS]"). -logger_header("[SLOW TOPICS]").
@ -28,6 +29,12 @@
, disable/0, clear_history/0 , disable/0, clear_history/0
]). ]).
%% emqx_gen_mod callbacks
-export([ load/1
, unload/1
, description/0
]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([ init/1
, handle_call/3 , handle_call/3
@ -60,22 +67,14 @@
, elapsed :: pos_integer() , elapsed :: pos_integer()
}). }).
-record(top_k, { rank :: pos_integer()
, topic :: emqx_types:topic()
, average_count :: number()
, average_elapsed :: number()}).
-type message() :: #message{}. -type message() :: #message{}.
-import(proplists, [get_value/2]). -import(proplists, [get_value/2]).
-define(LOG_TAB, emqx_st_statistics_log).
-define(TOPK_TAB, emqx_st_statistics_topk).
-define(NOW, erlang:system_time(millisecond)). -define(NOW, erlang:system_time(millisecond)).
-define(QUOTA_IDX, 1). -define(QUOTA_IDX, 1).
-type slow_log() :: #slow_log{}. -type slow_log() :: #slow_log{}.
-type top_k() :: #top_k{}.
-type top_k_map() :: #{emqx_types:topic() => top_k()}. -type top_k_map() :: #{emqx_types:topic() => top_k()}.
-ifdef(TEST). -ifdef(TEST).
@ -90,9 +89,26 @@
%% ets ordered_set is ascending by term order %% ets ordered_set is ascending by term order
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% APIs %% Load/Unload
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(load(list()) -> ok).
load(Env) ->
emqx_mod_sup:start_child(?MODULE, worker, [Env]),
ok.
-spec(unload(list()) -> ok).
unload(_Env) ->
_ = emqx_mod_sup:stop_child(?MODULE),
ok.
description() ->
"EMQ X Slow Topic Statistics Module".
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%% @doc Start the st_statistics %% @doc Start the st_statistics
-spec(start_link(Env :: list()) -> emqx_types:startlink_ret()). -spec(start_link(Env :: list()) -> emqx_types:startlink_ret()).
start_link(Env) -> start_link(Env) ->

View File

@ -32,7 +32,7 @@
, get_history/2 , get_history/2
]). ]).
-include_lib("emqx_st_statistics/include/emqx_st_statistics.hrl"). -include("include/emqx_st_statistics.hrl").
-import(minirest, [return/1]). -import(minirest, [return/1]).
@ -41,7 +41,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
clear_history(_Bindings, _Params) -> clear_history(_Bindings, _Params) ->
ok = emqx_st_statistics:clear_history(), ok = emqx_mod_st_statistics:clear_history(),
return(ok). return(ok).
get_history(_Bindings, Params) -> get_history(_Bindings, Params) ->

View File

@ -31,17 +31,21 @@
all() -> emqx_ct:all(?MODULE). all() -> emqx_ct:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_st_statistics], fun set_special_cfg/1), emqx_ct_helpers:start_apps([emqx]),
Config. Config.
set_special_cfg(_) ->
application:set_env([{emqx_st_statistics, base_conf()}]),
ok.
end_per_suite(Config) -> end_per_suite(Config) ->
emqx_ct_helpers:stop_apps([emqx_st_statistics]), emqx_ct_helpers:stop_apps([emqx]),
Config. Config.
init_per_testcase(_, Config) ->
emqx_mod_st_statistics:load(base_conf()),
Config.
end_per_testcase(_, _) ->
emqx_mod_st_statistics:unload(undefined),
ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test Cases %% Test Cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -24,7 +24,7 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx_management/include/emqx_mgmt.hrl"). -include_lib("emqx_management/include/emqx_mgmt.hrl").
-include_lib("emqx_st_statistics/include/emqx_st_statistics.hrl"). -include_lib("emqx_modules/include/emqx_st_statistics.hrl").
-define(CONTENT_TYPE, "application/x-www-form-urlencoded"). -define(CONTENT_TYPE, "application/x-www-form-urlencoded").
@ -39,17 +39,19 @@ all() ->
init_per_suite(Config) -> init_per_suite(Config) ->
application:load(emqx_modules), application:load(emqx_modules),
emqx_ct_helpers:start_apps([emqx_st_statistics, emqx_management]), emqx_ct_helpers:start_apps([emqx_management]),
Config. Config.
end_per_suite(Config) -> end_per_suite(Config) ->
emqx_ct_helpers:stop_apps([emqx_st_statistics, emqx_management]), emqx_ct_helpers:stop_apps([emqx_management]),
Config. Config.
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
emqx_mod_st_statistics:load(emqx_st_statistics_SUITE:base_conf()),
Config. Config.
end_per_testcase(_, Config) -> end_per_testcase(_, Config) ->
emqx_mod_st_statistics:unload(undefined),
Config. Config.
get(Key, ResponseBody) -> get(Key, ResponseBody) ->

View File

@ -2188,6 +2188,42 @@ end}.
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "module.st_statistics.threshold_time", "emqx.modules", [
{default, "10s"},
{datatype, {duration, ms}}
]}.
{mapping, "module.st_statistics.time_window", "emqx.modules", [
{default, "5M"},
{datatype, {duration, ms}}
]}.
{mapping, "module.st_statistics.max_log_num", "emqx.modules", [
{default, 500},
{datatype, integer}
]}.
{mapping, "module.st_statistics.top_k_num", "emqx.modules", [
{default, 500},
{datatype, integer}
]}.
{mapping, "module.st_statistics.notice_topic", "emqx.modules", [
{default, "$slow_topics"},
{datatype, string}
]}.
{mapping, "module.st_statistics.notice_qos", "emqx.modules", [
{default, 0},
{datatype, integer},
{validators, ["range:0-1"]}
]}.
{mapping, "module.st_statistics.notice_batch_size", "emqx.modules", [
{default, 500},
{datatype, integer}
]}.
{translation, "emqx.modules", fun(Conf, _, Conf1) -> {translation, "emqx.modules", fun(Conf, _, Conf1) ->
Subscriptions = fun() -> Subscriptions = fun() ->
List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf), List = cuttlefish_variable:filter_by_prefix("module.subscription", Conf),
@ -2211,13 +2247,19 @@ end}.
{rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)} {rewrite, list_to_atom(PubOrSub), list_to_binary(Topic), list_to_binary(Re), list_to_binary(Dest)}
end, TotalRules) end, TotalRules)
end, end,
SlowTopic = fun() ->
List = cuttlefish_variable:filter_by_prefix("module.st_statistics", Conf),
[{erlang:list_to_atom(Key), Value} || {[_, _, Key], Value} <- List]
end,
lists:append([ lists:append([
[{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}], [{emqx_mod_presence, [{qos, cuttlefish:conf_get("module.presence.qos", Conf, 1)}]}],
[{emqx_mod_subscription, Subscriptions()}], [{emqx_mod_subscription, Subscriptions()}],
[{emqx_mod_rewrite, Rewrites()}], [{emqx_mod_rewrite, Rewrites()}],
[{emqx_mod_topic_metrics, []}], [{emqx_mod_topic_metrics, []}],
[{emqx_mod_delayed, []}], [{emqx_mod_delayed, []}],
[{emqx_mod_st_statistics, []}], [{emqx_mod_st_statistics, SlowTopic()}],
[{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}] [{emqx_mod_acl_internal, [{acl_file, cuttlefish:conf_get("acl_file", Conf1)}]}]
]) ])
end}. end}.

View File

@ -303,7 +303,6 @@ relx_plugin_apps(ReleaseType) ->
, emqx_recon , emqx_recon
, emqx_rule_engine , emqx_rule_engine
, emqx_sasl , emqx_sasl
, emqx_st_statistics
] ]
++ [emqx_telemetry || not is_enterprise()] ++ [emqx_telemetry || not is_enterprise()]
++ relx_plugin_apps_per_rel(ReleaseType) ++ relx_plugin_apps_per_rel(ReleaseType)