feat: move cluster_call to emqx_machine

This commit is contained in:
zhongwencool 2021-08-24 10:56:11 +08:00
parent 2c1b1fbfa8
commit 60c1c4edba
9 changed files with 52 additions and 25 deletions

View File

@ -29,8 +29,6 @@ init([]) ->
{ok, {{one_for_one, 10, 100}, {ok, {{one_for_one, 10, 100},
%% always start emqx_config_handler first to load the emqx.conf to emqx_config %% always start emqx_config_handler first to load the emqx.conf to emqx_config
[ child_spec(emqx_config_handler, worker) [ child_spec(emqx_config_handler, worker)
, child_spec(emqx_cluster_rpc, worker)
, child_spec(emqx_cluster_rpc_handler, worker)
, child_spec(emqx_pool_sup, supervisor) , child_spec(emqx_pool_sup, supervisor)
, child_spec(emqx_hooks, worker) , child_spec(emqx_hooks, worker)
, child_spec(emqx_stats, worker) , child_spec(emqx_stats, worker)

View File

@ -293,7 +293,6 @@ fields("broker") ->
, {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)} , {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)}
, {"route_batch_clean", t(boolean(), undefined, true)} , {"route_batch_clean", t(boolean(), undefined, true)}
, {"perf", ref("perf")} , {"perf", ref("perf")}
, {"hot_config_loader", ref("hot_config_loader")}
]; ];
fields("perf") -> fields("perf") ->
@ -326,12 +325,6 @@ fields("sysmon_os") ->
, {"procmem_high_watermark", t(percent(), undefined, "5%")} , {"procmem_high_watermark", t(percent(), undefined, "5%")}
]; ];
fields("hot_config_loader") ->
[{"retry_interval", t(duration(), undefined, "1s")}
, {"mfa_max_history", t(range(1, 500), undefined, 50)}
, {"mfa_cleanup_interval", t(duration(), undefined, "5m")}
];
fields("alarm") -> fields("alarm") ->
[ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])} [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])}
, {"size_limit", t(integer(), undefined, 1000)} , {"size_limit", t(integer(), undefined, 1000)}

View File

@ -89,6 +89,29 @@ node {
## Default: 23 ## Default: 23
backtrace_depth = 23 backtrace_depth = 23
cluster_call {
## Time interval to retry after a failed call
##
## @doc node.cluster_call.retry_interval
## ValueType: Duration
## Default: 1s
retry_interval = 1s
## Retain the maximum number of completed transactions (for queries)
##
## @doc node.cluster_call.max_history
## ValueType: Integer
## Range: [1, 500]
## Default: 100
max_history = 100
## Time interval to clear completed but stale transactions.
## Ensure that the number of completed transactions is less than the max_history
##
## @doc node.cluster_call.cleanup_interval
## ValueType: Duration
## Default: 5m
cleanup_interval = 5m
}
} }
##================================================================== ##==================================================================

View File

@ -32,8 +32,8 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
-include("emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include("logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include("emqx_cluster_rpc.hrl"). -include("emqx_cluster_rpc.hrl").
-rlog_shard({?COMMON_SHARD, ?CLUSTER_MFA}). -rlog_shard({?COMMON_SHARD, ?CLUSTER_MFA}).
@ -64,11 +64,17 @@ mnesia(copy) ->
ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies).
start_link() -> start_link() ->
RetryMs = emqx:get_config([broker, hot_config_loader, retry_interval]), RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000),
start_link(node(), ?MODULE, RetryMs). start_link(node(), ?MODULE, RetryMs).
start_link(Node, Name, RetryMs) -> start_link(Node, Name, RetryMs) ->
gen_statem:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). gen_statem:start_link({local, Name}, ?MODULE, [Node, RetryMs], []).
-spec multicall(Module, Function, Args) -> {ok, TnxId} | {error, Reason} when
Module :: module(),
Function :: atom(),
Args :: [term()],
TnxId :: pos_integer(),
Reason :: term().
multicall(M, F, A) -> multicall(M, F, A) ->
multicall(M, F, A, timer:minutes(2)). multicall(M, F, A, timer:minutes(2)).

View File

@ -17,19 +17,17 @@
-behaviour(gen_server). -behaviour(gen_server).
-include("emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include("logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include("emqx_cluster_rpc.hrl"). -include("emqx_cluster_rpc.hrl").
-export([start_link/0, start_link/2]). -export([start_link/0, start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]). code_change/3]).
-define(MFA_HISTORY_LEN, 100).
start_link() -> start_link() ->
MaxHistory = emqx:get_config([broker, hot_config_loader, mfa_max_history]), MaxHistory = application:get_env(emqx_machine, cluster_call_max_history, 100),
CleanupMs = emqx:get_config([broker, hot_config_loader, mfa_cleanup_interval]), CleanupMs = application:get_env(emqx_machine, cluster_call_cleanup_interval, 5*60*1000),
start_link(MaxHistory, CleanupMs). start_link(MaxHistory, CleanupMs).
start_link(MaxHistory, CleanupMs) -> start_link(MaxHistory, CleanupMs) ->

View File

@ -139,6 +139,14 @@ fields("node") ->
, {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)} , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)}
, {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)} , {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)}
, {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)} , {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)}
, {"cluster_call", ref("cluster_call")}
];
fields("cluster_call") ->
[ {"retry_interval", t(emqx_schema:duration(), "emqx_machine.retry_interval", "1s")}
, {"max_history", t(range(1, 500), "emqx_machine.max_history", 100)}
, {"cleanup_interval", t(emqx_schema:duration(), "emqx_machine.cleanup_interval", "5m")}
]; ];
fields("rpc") -> fields("rpc") ->

View File

@ -31,7 +31,9 @@ start_link() ->
init([]) -> init([]) ->
GlobalGC = child_worker(emqx_global_gc, [], permanent), GlobalGC = child_worker(emqx_global_gc, [], permanent),
Terminator = child_worker(emqx_machine_terminator, [], transient), Terminator = child_worker(emqx_machine_terminator, [], transient),
Children = [GlobalGC, Terminator], ClusterRpc = child_worker(emqx_cluster_rpc, [], permanent),
ClusterHandler = child_worker(emqx_cluster_rpc_handler, [], permanent),
Children = [GlobalGC, Terminator, ClusterRpc, ClusterHandler],
SupFlags = #{strategy => one_for_one, SupFlags = #{strategy => one_for_one,
intensity => 100, intensity => 100,
period => 10 period => 10

View File

@ -39,13 +39,12 @@ groups() -> [].
init_per_suite(Config) -> init_per_suite(Config) ->
application:load(emqx), application:load(emqx),
application:load(emqx_machine),
ok = ekka:start(), ok = ekka:start(),
emqx_cluster_rpc:mnesia(copy), ok = ekka_rlog:wait_for_shards([emqx_common_shard], infinity),
emqx_config:put([broker, hot_config_loader], #{ application:set_env(emqx_machine, cluster_call_max_history, 100),
mfa_max_history => 100, application:set_env(emqx_machine, cluster_call_clean_interval, 1000),
mfa_cleanup_interval => 1000, application:set_env(emqx_machine, cluster_call_retry_interval, 900),
retry_interval => 900
}),
%%dbg:tracer(), %%dbg:tracer(),
%%dbg:p(all, c), %%dbg:p(all, c),
%%dbg:tpl(emqx_cluster_rpc, cx), %%dbg:tpl(emqx_cluster_rpc, cx),