From 93c210887b94a89cda48bec860966ae9333dd9d9 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 30 Sep 2021 15:21:00 +0800 Subject: [PATCH] chore: don't use emqx_machine.hrl in emqx --- apps/emqx/include/emqx.hrl | 12 ++++++- apps/emqx/src/emqx_app.erl | 12 +++---- .../emqx_machine/include/emqx_cluster_rpc.hrl | 35 ------------------- apps/emqx_machine/include/emqx_machine.hrl | 27 ++++++++------ apps/emqx_machine/src/emqx_cluster_rpc.erl | 15 ++++---- .../src/emqx_cluster_rpc_handler.erl | 3 +- apps/emqx_machine/src/emqx_machine.erl | 7 ++-- .../test/emqx_cluster_rpc_SUITE.erl | 3 +- 8 files changed, 45 insertions(+), 69 deletions(-) delete mode 100644 apps/emqx_machine/include/emqx_cluster_rpc.hrl diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 29350a34a..6afe10c29 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -17,7 +17,17 @@ -ifndef(EMQ_X_HRL). -define(EMQ_X_HRL, true). --include_lib("emqx_machine/include/emqx_machine.hrl"). +%% Shard +%%-------------------------------------------------------------------- +-define(COMMON_SHARD, emqx_common_shard). +-define(SHARED_SUB_SHARD, emqx_shared_sub_shard). +-define(CM_SHARD, emqx_cm_shard). +-define(ROUTE_SHARD, route_shard). + +-define(BOOT_SHARDS, [ ?ROUTE_SHARD + , ?COMMON_SHARD + , ?SHARED_SUB_SHARD + ]). %% Banner %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index d5ea0d24a..cd435b864 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -29,7 +29,6 @@ ]). -include("emqx.hrl"). --include_lib("emqx_machine/include/emqx_machine.hrl"). -include("emqx_release.hrl"). -include("logger.hrl"). @@ -42,7 +41,7 @@ start(_Type, _Args) -> ok = maybe_load_config(), ok = maybe_start_quicer(), - start_ekka(), + ensure_ekka_started(), {ok, Sup} = emqx_sup:start_link(), ok = maybe_start_listeners(), ok = emqx_alarm_handler:load(), @@ -56,6 +55,10 @@ prep_stop(_State) -> stop(_State) -> ok. +ensure_ekka_started() -> + ekka:start(), + ok = ekka_rlog:wait_for_shards(?BOOT_SHARDS, infinity). + %% @doc Call this function to make emqx boot without loading config, %% in case we want to delegate the config load to a higher level app %% which manages emqx app. @@ -81,11 +84,6 @@ maybe_load_config() -> ConfFiles = application:get_env(emqx, config_files, []), emqx_config:init_load(emqx_schema, ConfFiles) end. -%% @doc This API is mostly for testing -%% we already start ekka in emqx_machine -start_ekka() -> - ekka:start(), - ok = ekka_rlog:wait_for_shards(?BOOT_SHARDS, infinity). maybe_start_listeners() -> case emqx_boot:is_enabled(listeners) of diff --git a/apps/emqx_machine/include/emqx_cluster_rpc.hrl b/apps/emqx_machine/include/emqx_cluster_rpc.hrl deleted file mode 100644 index 046331871..000000000 --- a/apps/emqx_machine/include/emqx_cluster_rpc.hrl +++ /dev/null @@ -1,35 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --ifndef(EMQ_X_CLUSTER_RPC_HRL). --define(EMQ_X_CLUSTER_RPC_HRL, true). - --define(CLUSTER_MFA, cluster_rpc_mfa). --define(CLUSTER_COMMIT, cluster_rpc_commit). - --record(cluster_rpc_mfa, { - tnx_id :: pos_integer(), - mfa :: mfa(), - created_at :: calendar:datetime(), - initiator :: node() -}). - --record(cluster_rpc_commit, { - node :: node(), - tnx_id :: pos_integer() | '$1' -}). - --endif. diff --git a/apps/emqx_machine/include/emqx_machine.hrl b/apps/emqx_machine/include/emqx_machine.hrl index 4f270c67a..cea62c5c3 100644 --- a/apps/emqx_machine/include/emqx_machine.hrl +++ b/apps/emqx_machine/include/emqx_machine.hrl @@ -14,17 +14,24 @@ %% limitations under the License. %%-------------------------------------------------------------------- --ifndef(EMQ_MACHINE_X_HRL). --define(EMQ_MACHINE_X_HRL, true). +-ifndef(EMQ_X_CLUSTER_RPC_HRL). +-define(EMQ_X_CLUSTER_RPC_HRL, true). --define(COMMON_SHARD, emqx_common_shard). --define(SHARED_SUB_SHARD, emqx_shared_sub_shard). --define(CM_SHARD, emqx_cm_shard). --define(ROUTE_SHARD, route_shard). +-define(CLUSTER_MFA, cluster_rpc_mfa). +-define(CLUSTER_COMMIT, cluster_rpc_commit). --define(BOOT_SHARDS, [ ?ROUTE_SHARD - , ?COMMON_SHARD - , ?SHARED_SUB_SHARD - ]). +-define(EMQX_MACHINE_SHARD, emqx_machine_shard). + +-record(cluster_rpc_mfa, { + tnx_id :: pos_integer(), + mfa :: mfa(), + created_at :: calendar:datetime(), + initiator :: node() +}). + +-record(cluster_rpc_commit, { + node :: node(), + tnx_id :: pos_integer() | '$1' +}). -endif. diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_machine/src/emqx_cluster_rpc.erl index b32812f5b..66616f3ea 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc.erl @@ -32,12 +32,11 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). --include("emqx_machine.hrl"). -include_lib("emqx/include/logger.hrl"). --include("emqx_cluster_rpc.hrl"). +-include("emqx_machine.hrl"). --rlog_shard({?COMMON_SHARD, ?CLUSTER_MFA}). --rlog_shard({?COMMON_SHARD, ?CLUSTER_COMMIT}). +-rlog_shard({?EMQX_MACHINE_SHARD, ?CLUSTER_MFA}). +-rlog_shard({?EMQX_MACHINE_SHARD, ?CLUSTER_COMMIT}). -define(CATCH_UP, catch_up). -define(TIMEOUT, timer:minutes(1)). @@ -48,13 +47,13 @@ mnesia(boot) -> ok = ekka_mnesia:create_table(?CLUSTER_MFA, [ {type, ordered_set}, - {rlog_shard, ?COMMON_SHARD}, + {rlog_shard, ?EMQX_MACHINE_SHARD}, {disc_copies, [node()]}, {record_name, cluster_rpc_mfa}, {attributes, record_info(fields, cluster_rpc_mfa)}]), ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [ {type, set}, - {rlog_shard, ?COMMON_SHARD}, + {rlog_shard, ?EMQX_MACHINE_SHARD}, {disc_copies, [node()]}, {record_name, cluster_rpc_commit}, {attributes, record_info(fields, cluster_rpc_commit)}]); @@ -95,7 +94,7 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu %% the initiate transaction must happened on core node %% make sure MFA(in the transaction) and the transaction on the same node %% don't need rpc again inside transaction. - case ekka_rlog_status:upstream_node(?COMMON_SHARD) of + case ekka_rlog_status:upstream_node(?EMQX_MACHINE_SHARD) of {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); disconnected -> {error, disconnected} end @@ -281,7 +280,7 @@ do_catch_up_in_one_trans(LatestId, Node) -> end. transaction(Func, Args) -> - ekka_mnesia:transaction(?COMMON_SHARD, Func, Args). + ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, Func, Args). trans_status() -> mnesia:foldl(fun(Rec, Acc) -> diff --git a/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl index e5b692272..6dcbd3d25 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl @@ -18,7 +18,6 @@ -behaviour(gen_server). -include_lib("emqx/include/logger.hrl"). --include("emqx_cluster_rpc.hrl"). -include("emqx_machine.hrl"). -export([start_link/0, start_link/2]). @@ -50,7 +49,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> - case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/1, [MaxHistory]) of + case ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of {atomic, ok} -> ok; Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) end, diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index b7969056f..7b8f26031 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -22,8 +22,7 @@ ]). -include_lib("emqx/include/logger.hrl"). --include_lib("emqx/include/emqx.hrl"). --include("emqx_machine.hrl"). +-include("emqx_cluster_rpc.hrl"). %% @doc EMQ X boot entrypoint. start() -> @@ -35,11 +34,9 @@ start() -> end, ok = set_backtrace_depth(), ok = print_otp_version_warning(), - ok = load_config_files(), - %% Load application first for ekka_mnesia scanner ekka:start(), - ok = ekka_rlog:wait_for_shards(?BOOT_SHARDS, infinity), + ekka_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), ok. graceful_shutdown() -> diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl index b1530afd2..4e3b2d2c2 100644 --- a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl @@ -20,6 +20,7 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). +-include("emqx_machine.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -define(NODE1, emqx_cluster_rpc). @@ -42,7 +43,7 @@ init_per_suite(Config) -> application:load(emqx), application:load(emqx_machine), ok = ekka:start(), - ok = ekka_rlog:wait_for_shards([emqx_common_shard], infinity), + ok = ekka_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), application:set_env(emqx_machine, cluster_call_max_history, 100), application:set_env(emqx_machine, cluster_call_clean_interval, 1000), application:set_env(emqx_machine, cluster_call_retry_interval, 900),