chore: don't use emqx_machine.hrl in emqx

This commit is contained in:
zhongwencool 2021-09-30 15:21:00 +08:00 committed by Rory Z
parent 44a6f04a45
commit 93c210887b
8 changed files with 45 additions and 69 deletions

View File

@ -17,7 +17,17 @@
-ifndef(EMQ_X_HRL).
-define(EMQ_X_HRL, true).
-include_lib("emqx_machine/include/emqx_machine.hrl").
%% Shard
%%--------------------------------------------------------------------
-define(COMMON_SHARD, emqx_common_shard).
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
-define(CM_SHARD, emqx_cm_shard).
-define(ROUTE_SHARD, route_shard).
-define(BOOT_SHARDS, [ ?ROUTE_SHARD
, ?COMMON_SHARD
, ?SHARED_SUB_SHARD
]).
%% Banner
%%--------------------------------------------------------------------

View File

@ -29,7 +29,6 @@
]).
-include("emqx.hrl").
-include_lib("emqx_machine/include/emqx_machine.hrl").
-include("emqx_release.hrl").
-include("logger.hrl").
@ -42,7 +41,7 @@
start(_Type, _Args) ->
ok = maybe_load_config(),
ok = maybe_start_quicer(),
start_ekka(),
ensure_ekka_started(),
{ok, Sup} = emqx_sup:start_link(),
ok = maybe_start_listeners(),
ok = emqx_alarm_handler:load(),
@ -56,6 +55,10 @@ prep_stop(_State) ->
stop(_State) -> ok.
ensure_ekka_started() ->
ekka:start(),
ok = ekka_rlog:wait_for_shards(?BOOT_SHARDS, infinity).
%% @doc Call this function to make emqx boot without loading config,
%% in case we want to delegate the config load to a higher level app
%% which manages emqx app.
@ -81,11 +84,6 @@ maybe_load_config() ->
ConfFiles = application:get_env(emqx, config_files, []),
emqx_config:init_load(emqx_schema, ConfFiles)
end.
%% @doc This API is mostly for testing
%% we already start ekka in emqx_machine
start_ekka() ->
ekka:start(),
ok = ekka_rlog:wait_for_shards(?BOOT_SHARDS, infinity).
maybe_start_listeners() ->
case emqx_boot:is_enabled(listeners) of

View File

@ -1,35 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQ_X_CLUSTER_RPC_HRL).
-define(EMQ_X_CLUSTER_RPC_HRL, true).
-define(CLUSTER_MFA, cluster_rpc_mfa).
-define(CLUSTER_COMMIT, cluster_rpc_commit).
-record(cluster_rpc_mfa, {
tnx_id :: pos_integer(),
mfa :: mfa(),
created_at :: calendar:datetime(),
initiator :: node()
}).
-record(cluster_rpc_commit, {
node :: node(),
tnx_id :: pos_integer() | '$1'
}).
-endif.

View File

@ -14,17 +14,24 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQ_MACHINE_X_HRL).
-define(EMQ_MACHINE_X_HRL, true).
-ifndef(EMQ_X_CLUSTER_RPC_HRL).
-define(EMQ_X_CLUSTER_RPC_HRL, true).
-define(COMMON_SHARD, emqx_common_shard).
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
-define(CM_SHARD, emqx_cm_shard).
-define(ROUTE_SHARD, route_shard).
-define(CLUSTER_MFA, cluster_rpc_mfa).
-define(CLUSTER_COMMIT, cluster_rpc_commit).
-define(BOOT_SHARDS, [ ?ROUTE_SHARD
, ?COMMON_SHARD
, ?SHARED_SUB_SHARD
]).
-define(EMQX_MACHINE_SHARD, emqx_machine_shard).
-record(cluster_rpc_mfa, {
tnx_id :: pos_integer(),
mfa :: mfa(),
created_at :: calendar:datetime(),
initiator :: node()
}).
-record(cluster_rpc_commit, {
node :: node(),
tnx_id :: pos_integer() | '$1'
}).
-endif.

View File

@ -32,12 +32,11 @@
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-include("emqx_machine.hrl").
-include_lib("emqx/include/logger.hrl").
-include("emqx_cluster_rpc.hrl").
-include("emqx_machine.hrl").
-rlog_shard({?COMMON_SHARD, ?CLUSTER_MFA}).
-rlog_shard({?COMMON_SHARD, ?CLUSTER_COMMIT}).
-rlog_shard({?EMQX_MACHINE_SHARD, ?CLUSTER_MFA}).
-rlog_shard({?EMQX_MACHINE_SHARD, ?CLUSTER_COMMIT}).
-define(CATCH_UP, catch_up).
-define(TIMEOUT, timer:minutes(1)).
@ -48,13 +47,13 @@
mnesia(boot) ->
ok = ekka_mnesia:create_table(?CLUSTER_MFA, [
{type, ordered_set},
{rlog_shard, ?COMMON_SHARD},
{rlog_shard, ?EMQX_MACHINE_SHARD},
{disc_copies, [node()]},
{record_name, cluster_rpc_mfa},
{attributes, record_info(fields, cluster_rpc_mfa)}]),
ok = ekka_mnesia:create_table(?CLUSTER_COMMIT, [
{type, set},
{rlog_shard, ?COMMON_SHARD},
{rlog_shard, ?EMQX_MACHINE_SHARD},
{disc_copies, [node()]},
{record_name, cluster_rpc_commit},
{attributes, record_info(fields, cluster_rpc_commit)}]);
@ -95,7 +94,7 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu
%% the initiate transaction must happened on core node
%% make sure MFA(in the transaction) and the transaction on the same node
%% don't need rpc again inside transaction.
case ekka_rlog_status:upstream_node(?COMMON_SHARD) of
case ekka_rlog_status:upstream_node(?EMQX_MACHINE_SHARD) of
{ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout);
disconnected -> {error, disconnected}
end
@ -281,7 +280,7 @@ do_catch_up_in_one_trans(LatestId, Node) ->
end.
transaction(Func, Args) ->
ekka_mnesia:transaction(?COMMON_SHARD, Func, Args).
ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, Func, Args).
trans_status() ->
mnesia:foldl(fun(Rec, Acc) ->

View File

@ -18,7 +18,6 @@
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
-include("emqx_cluster_rpc.hrl").
-include("emqx_machine.hrl").
-export([start_link/0, start_link/2]).
@ -50,7 +49,7 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) ->
case ekka_mnesia:transaction(?COMMON_SHARD, fun del_stale_mfa/1, [MaxHistory]) of
case ekka_mnesia:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of
{atomic, ok} -> ok;
Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error])
end,

View File

@ -22,8 +22,7 @@
]).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx.hrl").
-include("emqx_machine.hrl").
-include("emqx_cluster_rpc.hrl").
%% @doc EMQ X boot entrypoint.
start() ->
@ -35,11 +34,9 @@ start() ->
end,
ok = set_backtrace_depth(),
ok = print_otp_version_warning(),
ok = load_config_files(),
%% Load application first for ekka_mnesia scanner
ekka:start(),
ok = ekka_rlog:wait_for_shards(?BOOT_SHARDS, infinity),
ekka_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity),
ok.
graceful_shutdown() ->

View File

@ -20,6 +20,7 @@
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include("emqx_machine.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(NODE1, emqx_cluster_rpc).
@ -42,7 +43,7 @@ init_per_suite(Config) ->
application:load(emqx),
application:load(emqx_machine),
ok = ekka:start(),
ok = ekka_rlog:wait_for_shards([emqx_common_shard], infinity),
ok = ekka_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity),
application:set_env(emqx_machine, cluster_call_max_history, 100),
application:set_env(emqx_machine, cluster_call_clean_interval, 1000),
application:set_env(emqx_machine, cluster_call_retry_interval, 900),