diff --git a/apps/emqx/src/emqx_kernel_sup.erl b/apps/emqx/src/emqx_kernel_sup.erl index 39a8d4fba..defe96182 100644 --- a/apps/emqx/src/emqx_kernel_sup.erl +++ b/apps/emqx/src/emqx_kernel_sup.erl @@ -29,8 +29,6 @@ init([]) -> {ok, {{one_for_one, 10, 100}, %% always start emqx_config_handler first to load the emqx.conf to emqx_config [ child_spec(emqx_config_handler, worker) - , child_spec(emqx_cluster_rpc, worker) - , child_spec(emqx_cluster_rpc_handler, worker) , child_spec(emqx_pool_sup, supervisor) , child_spec(emqx_hooks, worker) , child_spec(emqx_stats, worker) diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 3fb986f1e..3bbeb1d07 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -293,7 +293,6 @@ fields("broker") -> , {"shared_dispatch_ack_enabled", t(boolean(), undefined, false)} , {"route_batch_clean", t(boolean(), undefined, true)} , {"perf", ref("perf")} - , {"hot_config_loader", ref("hot_config_loader")} ]; fields("perf") -> @@ -326,12 +325,6 @@ fields("sysmon_os") -> , {"procmem_high_watermark", t(percent(), undefined, "5%")} ]; -fields("hot_config_loader") -> - [{"retry_interval", t(duration(), undefined, "1s")} - , {"mfa_max_history", t(range(1, 500), undefined, 50)} - , {"mfa_cleanup_interval", t(duration(), undefined, "5m")} - ]; - fields("alarm") -> [ {"actions", t(hoconsc:array(atom()), undefined, [log, publish])} , {"size_limit", t(integer(), undefined, 1000)} diff --git a/apps/emqx_machine/etc/emqx_machine.conf b/apps/emqx_machine/etc/emqx_machine.conf index d21a19bc2..3ec09f2d4 100644 --- a/apps/emqx_machine/etc/emqx_machine.conf +++ b/apps/emqx_machine/etc/emqx_machine.conf @@ -89,6 +89,29 @@ node { ## Default: 23 backtrace_depth = 23 + cluster_call { + ## Time interval to retry after a failed call + ## + ## @doc node.cluster_call.retry_interval + ## ValueType: Duration + ## Default: 1s + retry_interval = 1s + ## Retain the maximum number of completed transactions (for queries) + ## + ## @doc node.cluster_call.max_history + ## ValueType: Integer + ## Range: [1, 500] + ## Default: 100 + max_history = 100 + ## Time interval to clear completed but stale transactions. + ## Ensure that the number of completed transactions is less than the max_history + ## + ## @doc node.cluster_call.cleanup_interval + ## ValueType: Duration + ## Default: 5m + cleanup_interval = 5m + } + } ##================================================================== diff --git a/apps/emqx/include/emqx_cluster_rpc.hrl b/apps/emqx_machine/include/emqx_cluster_rpc.hrl similarity index 100% rename from apps/emqx/include/emqx_cluster_rpc.hrl rename to apps/emqx_machine/include/emqx_cluster_rpc.hrl diff --git a/apps/emqx/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl similarity index 97% rename from apps/emqx/src/emqx_cluster_rpc.erl rename to apps/emqx_machine/src/emqx_cluster_rpc.erl index 5ddfe5742..b988d55fb 100644 --- a/apps/emqx/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -32,8 +32,8 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). --include("emqx.hrl"). --include("logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). -include("emqx_cluster_rpc.hrl"). -rlog_shard({?COMMON_SHARD, ?CLUSTER_MFA}). @@ -64,11 +64,17 @@ mnesia(copy) -> ok = ekka_mnesia:copy_table(cluster_rpc_commit, disc_copies). start_link() -> - RetryMs = emqx:get_config([broker, hot_config_loader, retry_interval]), + RetryMs = application:get_env(emqx_machine, cluster_call_retry_interval, 1000), start_link(node(), ?MODULE, RetryMs). start_link(Node, Name, RetryMs) -> gen_statem:start_link({local, Name}, ?MODULE, [Node, RetryMs], []). +-spec multicall(Module, Function, Args) -> {ok, TnxId} | {error, Reason} when + Module :: module(), + Function :: atom(), + Args :: [term()], + TnxId :: pos_integer(), + Reason :: term(). multicall(M, F, A) -> multicall(M, F, A, timer:minutes(2)). diff --git a/apps/emqx/src/emqx_cluster_rpc_handler.erl b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl similarity index 92% rename from apps/emqx/src/emqx_cluster_rpc_handler.erl rename to apps/emqx_machine/src/emqx_cluster_rpc_handler.erl index 4da219165..803b7f9fc 100644 --- a/apps/emqx/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl @@ -17,19 +17,17 @@ -behaviour(gen_server). --include("emqx.hrl"). --include("logger.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/logger.hrl"). -include("emqx_cluster_rpc.hrl"). -export([start_link/0, start_link/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(MFA_HISTORY_LEN, 100). - start_link() -> - MaxHistory = emqx:get_config([broker, hot_config_loader, mfa_max_history]), - CleanupMs = emqx:get_config([broker, hot_config_loader, mfa_cleanup_interval]), + MaxHistory = application:get_env(emqx_machine, cluster_call_max_history, 100), + CleanupMs = application:get_env(emqx_machine, cluster_call_cleanup_interval, 5*60*1000), start_link(MaxHistory, CleanupMs). start_link(MaxHistory, CleanupMs) -> diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_machine/src/emqx_machine_schema.erl index 7dd193e63..2dde8aae3 100644 --- a/apps/emqx_machine/src/emqx_machine_schema.erl +++ b/apps/emqx_machine/src/emqx_machine_schema.erl @@ -139,6 +139,14 @@ fields("node") -> , {"dist_listen_min", t(range(1024, 65535), "kernel.inet_dist_listen_min", 6369)} , {"dist_listen_max", t(range(1024, 65535), "kernel.inet_dist_listen_max", 6369)} , {"backtrace_depth", t(integer(), "emqx_machine.backtrace_depth", 23)} + , {"cluster_call", ref("cluster_call")} + ]; + + +fields("cluster_call") -> + [ {"retry_interval", t(emqx_schema:duration(), "emqx_machine.retry_interval", "1s")} + , {"max_history", t(range(1, 500), "emqx_machine.max_history", 100)} + , {"cleanup_interval", t(emqx_schema:duration(), "emqx_machine.cleanup_interval", "5m")} ]; fields("rpc") -> diff --git a/apps/emqx_machine/src/emqx_machine_sup.erl b/apps/emqx_machine/src/emqx_machine_sup.erl index 0810eb267..798beee1c 100644 --- a/apps/emqx_machine/src/emqx_machine_sup.erl +++ b/apps/emqx_machine/src/emqx_machine_sup.erl @@ -31,7 +31,9 @@ start_link() -> init([]) -> GlobalGC = child_worker(emqx_global_gc, [], permanent), Terminator = child_worker(emqx_machine_terminator, [], transient), - Children = [GlobalGC, Terminator], + ClusterRpc = child_worker(emqx_cluster_rpc, [], permanent), + ClusterHandler = child_worker(emqx_cluster_rpc_handler, [], permanent), + Children = [GlobalGC, Terminator, ClusterRpc, ClusterHandler], SupFlags = #{strategy => one_for_one, intensity => 100, period => 10 diff --git a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl similarity index 96% rename from apps/emqx/test/emqx_cluster_rpc_SUITE.erl rename to apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index b03137796..92a89790a 100644 --- a/apps/emqx/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -39,13 +39,12 @@ groups() -> []. init_per_suite(Config) -> application:load(emqx), + application:load(emqx_machine), ok = ekka:start(), - emqx_cluster_rpc:mnesia(copy), - emqx_config:put([broker, hot_config_loader], #{ - mfa_max_history => 100, - mfa_cleanup_interval => 1000, - retry_interval => 900 - }), + ok = ekka_rlog:wait_for_shards([emqx_common_shard], infinity), + application:set_env(emqx_machine, cluster_call_max_history, 100), + application:set_env(emqx_machine, cluster_call_clean_interval, 1000), + application:set_env(emqx_machine, cluster_call_retry_interval, 900), %%dbg:tracer(), %%dbg:p(all, c), %%dbg:tpl(emqx_cluster_rpc, cx),