* feat(emqx_conf): move conf manager for emqx_machine to emqx_conf

* chore(emqx_conf): change emqx:get_config/2 to emqx_conf:get/2

* fix: common test failed

* fix: badmatch by typo wrong key

* fix(emqx_conf): get the wrong core nodes

* fix(emqx_conf): get core node's tnx_id not latest tnx_id

* fix: add ro_transation when copy conf file

* fix: delete debug info

* fix: change ekka_rlog to mria_rlog

* fix: remove cluster_rpc from emqx_machine.

* fix: don't call ekka:start/0 explicitly

* fix: ekka should be start in emqx_machine
This commit is contained in:
zhongwencool 2021-10-21 18:08:51 +08:00 committed by GitHub
parent 9fdd5e6a7e
commit d784e63b9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
68 changed files with 586 additions and 312 deletions

View File

@ -43,7 +43,7 @@ proper: $(REBAR)
@ENABLE_COVER_COMPILE=1 $(REBAR) proper -d test/props -c
.PHONY: ct
ct: $(REBAR)
ct: $(REBAR) conf-segs
@ENABLE_COVER_COMPILE=1 $(REBAR) ct --name 'test@127.0.0.1' -c -v
APPS=$(shell $(CURDIR)/scripts/find-apps.sh)

View File

@ -25,7 +25,8 @@
, get_release/0
, set_init_config_load_done/0
, get_init_config_load_done/0
, set_override_conf_file/1
, set_init_tnx_id/1
, get_init_tnx_id/0
]).
-include("emqx.hrl").
@ -67,21 +68,16 @@ set_init_config_load_done() ->
get_init_config_load_done() ->
application:get_env(emqx, init_config_load_done, false).
%% @doc This API is mostly for testing.
%% The override config file is typically located in the 'data' dir when
%% it is a emqx release, but emqx app should not have to know where the
%% 'data' dir is located.
set_override_conf_file(File) ->
application:set_env(emqx, override_conf_file, File).
set_init_tnx_id(TnxId) ->
application:set_env(emqx, cluster_rpc_init_tnx_id, TnxId).
get_init_tnx_id() ->
application:get_env(emqx, cluster_rpc_init_tnx_id, -1).
maybe_load_config() ->
case get_init_config_load_done() of
true ->
ok;
false ->
%% the app env 'config_files' should be set before emqx get started.
ConfFiles = application:get_env(emqx, config_files, []),
emqx_config:init_load(emqx_schema, ConfFiles)
true -> ok;
false -> emqx_config:init_load(emqx_schema)
end.
maybe_start_listeners() ->

View File

@ -15,17 +15,18 @@
%%--------------------------------------------------------------------
-module(emqx_config).
-compile({no_auto_import, [get/0, get/1, put/2]}).
-compile({no_auto_import, [get/0, get/1, put/2, erase/1]}).
-export([ init_load/2
, read_override_conf/0
-export([ init_load/1
, init_load/2
, read_override_conf/1
, check_config/2
, fill_defaults/1
, fill_defaults/2
, save_configs/4
, save_configs/5
, save_to_app_env/1
, save_to_config_map/2
, save_to_override_conf/1
, save_to_override_conf/2
]).
-export([ get_root/1
@ -41,6 +42,7 @@
, find_raw/1
, put/1
, put/2
, erase/1
]).
-export([ get_raw/1
@ -96,7 +98,8 @@
%% persistent:
%% save the updated config to the emqx_override.conf file
%% defaults to `true`
persistent => boolean()
persistent => boolean(),
override_to => local | cluster
}.
-type update_args() :: {update_cmd(), Opts :: update_opts()}.
-type update_stage() :: pre_config_update | post_config_update.
@ -199,6 +202,10 @@ put(Config) ->
?MODULE:put([RootName], RootValue)
end, ok, Config).
erase(RootName) ->
persistent_term:erase(?PERSIS_KEY(?CONF, bin(RootName))),
persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))).
-spec put(emqx_map_lib:config_key_path(), term()) -> ok.
put(KeyPath, Config) -> do_put(?CONF, KeyPath, Config).
@ -237,13 +244,17 @@ put_raw(KeyPath, Config) -> do_put(?RAW_CONF, KeyPath, Config).
%%============================================================================
%% Load/Update configs From/To files
%%============================================================================
init_load(SchemaMod) ->
ConfFiles = application:get_env(emqx, config_files, []),
init_load(SchemaMod, ConfFiles).
%% @doc Initial load of the given config files.
%% NOTE: The order of the files is significant, configs from files orderd
%% in the rear of the list overrides prior values.
-spec init_load(module(), [string()] | binary() | hocon:config()) -> ok.
init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) ->
ParseOptions = #{format => map},
IncDir = include_dirs(),
ParseOptions = #{format => map, include_dirs => IncDir},
Parser = case is_binary(Conf) of
true -> fun hocon:binary/2;
false -> fun hocon:files/2
@ -253,21 +264,20 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) ->
init_load(SchemaMod, RawRichConf);
{error, Reason} ->
?SLOG(error, #{msg => failed_to_load_hocon_conf,
reason => Reason
reason => Reason,
include_dirs => IncDir
}),
error(failed_to_load_hocon_conf)
end;
init_load(SchemaMod, RawConf0) when is_map(RawConf0) ->
ok = save_schema_mod_and_names(SchemaMod),
%% override part of the input conf using emqx_override.conf
RawConf = merge_with_override_conf(RawConf0),
%% check and save configs
{_AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf),
{_AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf0),
ok = save_to_config_map(maps:with(get_atom_root_names(), CheckedConf),
maps:with(get_root_names(), RawConf)).
maps:with(get_root_names(), RawConf0)).
merge_with_override_conf(RawConf) ->
maps:merge(RawConf, maps:with(maps:keys(RawConf), read_override_conf())).
include_dirs() ->
[filename:join(application:get_env(emqx, data_dir, "data/"), "configs") ++ "/"].
-spec check_config(module(), raw_config()) -> {AppEnvs, CheckedConf}
when AppEnvs :: app_envs(), CheckedConf :: config().
@ -299,9 +309,18 @@ fill_defaults(SchemaMod, RawConf) ->
#{nullable => true, only_fill_defaults => true},
root_names_from_conf(RawConf)).
-spec read_override_conf() -> raw_config().
read_override_conf() ->
load_hocon_file(emqx_override_conf_name(), map).
-spec read_override_conf(map()) -> raw_config().
read_override_conf(#{} = Opts) ->
File = override_conf_file(Opts),
load_hocon_file(File, map).
override_conf_file(Opts) ->
Key =
case maps:get(override_to, Opts, local) of
local -> local_override_conf_file;
cluster -> cluster_override_conf_file
end,
application:get_env(emqx, Key, undefined).
-spec save_schema_mod_and_names(module()) -> ok.
save_schema_mod_and_names(SchemaMod) ->
@ -330,14 +349,13 @@ get_root_names() ->
get_atom_root_names() ->
[atom(N) || N <- get_root_names()].
-spec save_configs(app_envs(), config(), raw_config(), raw_config()) -> ok | {error, term()}.
save_configs(_AppEnvs, Conf, RawConf, OverrideConf) ->
-spec save_configs(app_envs(), config(), raw_config(), raw_config(), update_opts()) -> ok | {error, term()}.
save_configs(_AppEnvs, Conf, RawConf, OverrideConf, Opts) ->
%% We may need also support hot config update for the apps that use application envs.
%% If that is the case uncomment the following line to update the configs to app env
%save_to_app_env(AppEnvs),
save_to_config_map(Conf, RawConf),
%% TODO: merge RawConf to OverrideConf can be done here
save_to_override_conf(OverrideConf).
save_to_override_conf(OverrideConf, Opts).
-spec save_to_app_env([tuple()]) -> ok.
save_to_app_env(AppEnvs) ->
@ -350,11 +368,11 @@ save_to_config_map(Conf, RawConf) ->
?MODULE:put(Conf),
?MODULE:put_raw(RawConf).
-spec save_to_override_conf(raw_config()) -> ok | {error, term()}.
save_to_override_conf(undefined) ->
-spec save_to_override_conf(raw_config(), update_opts()) -> ok | {error, term()}.
save_to_override_conf(undefined, _) ->
ok;
save_to_override_conf(RawConf) ->
case emqx_override_conf_name() of
save_to_override_conf(RawConf, Opts) ->
case override_conf_file(Opts) of
undefined -> ok;
FileName ->
ok = filelib:ensure_dir(FileName),
@ -371,14 +389,12 @@ save_to_override_conf(RawConf) ->
load_hocon_file(FileName, LoadType) ->
case filelib:is_regular(FileName) of
true ->
{ok, Raw0} = hocon:load(FileName, #{format => LoadType}),
Opts = #{include_dirs => include_dirs(), format => LoadType},
{ok, Raw0} = hocon:load(FileName, Opts),
Raw0;
false -> #{}
end.
emqx_override_conf_name() ->
application:get_env(emqx, override_conf_file, undefined).
do_get(Type, KeyPath) ->
Ref = make_ref(),
Res = do_get(Type, KeyPath, Ref),

View File

@ -27,6 +27,7 @@
, add_handler/2
, remove_handler/1
, update_config/3
, get_raw_cluster_override_conf/0
, merge_to_old_config/2
]).
@ -82,6 +83,9 @@ add_handler(ConfKeyPath, HandlerName) ->
remove_handler(ConfKeyPath) ->
gen_server:cast(?MODULE, {remove_handler, ConfKeyPath}).
get_raw_cluster_override_conf() ->
gen_server:call(?MODULE, get_raw_cluster_override_conf).
%%============================================================================
-spec init(term()) -> {ok, state()}.
@ -100,9 +104,9 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
#{handlers := Handlers} = State) ->
Reply = try
case process_update_request(ConfKeyPath, Handlers, UpdateArgs) of
{ok, NewRawConf, OverrideConf} ->
{ok, NewRawConf, OverrideConf, Opts} ->
check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf,
OverrideConf, UpdateArgs);
OverrideConf, UpdateArgs, Opts);
{error, Result} ->
{error, Result}
end
@ -116,7 +120,9 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From,
{error, Reason}
end,
{reply, Reply, State};
handle_call(get_raw_cluster_override_conf, _From, State) ->
Reply = emqx_config:read_override_conf(#{override_to => cluster}),
{reply, Reply, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
@ -163,14 +169,15 @@ process_update_request(ConfKeyPath, _Handlers, {remove, Opts}) ->
OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
BinKeyPath = bin_path(ConfKeyPath),
NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf),
_ = remove_from_local_if_cluster_change(BinKeyPath, Opts),
OverrideConf = remove_from_override_config(BinKeyPath, Opts),
{ok, NewRawConf, OverrideConf};
{ok, NewRawConf, OverrideConf, Opts};
process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) ->
OldRawConf = emqx_config:get_root_raw(ConfKeyPath),
case do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) of
{ok, NewRawConf} ->
OverrideConf = update_override_config(NewRawConf, Opts),
{ok, NewRawConf, OverrideConf};
{ok, NewRawConf, OverrideConf, Opts};
Error -> Error
end.
@ -187,15 +194,16 @@ do_update_config([ConfKey | ConfKeyPath], Handlers, OldRawConf, UpdateReq) ->
end.
check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, OverrideConf,
UpdateArgs) ->
UpdateArgs, Opts) ->
OldConf = emqx_config:get_root(ConfKeyPath),
FullRawConf = with_full_raw_confs(NewRawConf),
{AppEnvs, CheckedConf} = emqx_config:check_config(SchemaModule, FullRawConf),
NewConf = maps:with(maps:keys(OldConf), CheckedConf),
_ = remove_from_local_if_cluster_change(ConfKeyPath, Opts),
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
{ok, Result0} ->
case save_configs(ConfKeyPath, AppEnvs, NewConf, NewRawConf, OverrideConf,
UpdateArgs) of
UpdateArgs, Opts) of
{ok, Result1} ->
{ok, Result1#{post_config_update => Result0}};
Error -> Error
@ -253,8 +261,8 @@ call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result)
false -> {ok, Result}
end.
save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs) ->
case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf) of
save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs, Opts) ->
case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf, Opts) of
ok -> {ok, return_change_result(ConfKeyPath, UpdateArgs)};
{error, Reason} -> {error, {save_configs, Reason}}
end.
@ -269,16 +277,26 @@ merge_to_old_config(UpdateReq, RawConf) when is_map(UpdateReq), is_map(RawConf)
merge_to_old_config(UpdateReq, _RawConf) ->
{ok, UpdateReq}.
%% local-override.conf priority is higher than cluster-override.conf
%% If we want cluster to take effect, we must remove the local.
remove_from_local_if_cluster_change(BinKeyPath, Opts) ->
case maps:get(override, Opts, local) of
local -> ok;
cluster ->
Local = remove_from_override_config(BinKeyPath, Opts#{override_to => local}),
emqx_config:save_to_override_conf(Local, Opts)
end.
remove_from_override_config(_BinKeyPath, #{persistent := false}) ->
undefined;
remove_from_override_config(BinKeyPath, _Opts) ->
OldConf = emqx_config:read_override_conf(),
remove_from_override_config(BinKeyPath, Opts) ->
OldConf = emqx_config:read_override_conf(Opts),
emqx_map_lib:deep_remove(BinKeyPath, OldConf).
update_override_config(_RawConf, #{persistent := false}) ->
undefined;
update_override_config(RawConf, _Opts) ->
OldConf = emqx_config:read_override_conf(),
update_override_config(RawConf, Opts) ->
OldConf = emqx_config:read_override_conf(Opts),
maps:merge(OldConf, RawConf).
up_req({remove, _Opts}) -> '$remove';

View File

@ -31,7 +31,7 @@
]).
-export_type([config_key/0, config_key_path/0]).
-type config_key() :: atom() | binary().
-type config_key() :: atom() | binary() | string().
-type config_key_path() :: [config_key()].
-type convert_fun() :: fun((...) -> {K1::any(), V1::any()} | drop).

View File

@ -135,7 +135,7 @@ roots(low) ->
, {"quota",
sc(ref("quota"),
#{})}
, {"plugins", %% TODO: move to emqx_machine_schema
, {"plugins", %% TODO: move to emqx_conf_schema
sc(ref("plugins"),
#{})}
, {"stats",

View File

@ -22,6 +22,7 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
@ -132,7 +133,9 @@ basic_conf() ->
}.
set_test_listenser_confs() ->
emqx_config:put(basic_conf()).
Conf = emqx_config:get([]),
emqx_config:put(basic_conf()),
Conf.
%%--------------------------------------------------------------------
%% CT Callbacks
@ -174,10 +177,11 @@ end_per_suite(_Config) ->
]).
init_per_testcase(_TestCase, Config) ->
set_test_listenser_confs(),
Config.
NewConf = set_test_listenser_confs(),
[{config, NewConf}|Config].
end_per_testcase(_TestCase, Config) ->
emqx_config:put(?config(config, Config)),
Config.
%%--------------------------------------------------------------------
@ -953,4 +957,3 @@ session(InitFields) when is_map(InitFields) ->
quota() ->
emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}},
{overall_messages_routing, {10, 1}}]).

View File

@ -146,9 +146,13 @@ load(App) ->
start_app(App, Handler) ->
start_app(App,
app_schema(App),
app_path(App, filename:join(["etc", atom_to_list(App) ++ ".conf"])),
app_path(App, filename:join(["etc", app_conf_file(App)])),
Handler).
app_conf_file(emqx_conf) -> "emqx.conf.all";
app_conf_file(App) -> atom_to_list(App) ++ ".conf".
%% TODO: get rid of cuttlefish
app_schema(App) ->
Mod = list_to_atom(atom_to_list(App) ++ "_schema"),
true = is_list(Mod:roots()),
@ -166,6 +170,7 @@ start_app(App, Schema, ConfigFile, SpecAppConfig) ->
RenderedConfigFile = render_config_file(ConfigFile, Vars),
read_schema_configs(Schema, RenderedConfigFile),
force_set_config_file_paths(App, [RenderedConfigFile]),
copy_certs(App, RenderedConfigFile),
SpecAppConfig(App),
case application:ensure_all_started(App) of
{ok, _} -> ok;
@ -288,7 +293,7 @@ change_emqx_opts(SslType, MoreOpts) ->
lists:map(fun(Listener) ->
maybe_inject_listener_ssl_options(SslType, MoreOpts, Listener)
end, Listeners),
application:set_env(emqx, listeners, NewListeners).
emqx_conf:update([listeners], NewListeners, #{}).
maybe_inject_listener_ssl_options(SslType, MoreOpts, {sll, Port, Opts}) ->
%% this clause is kept to be backward compatible
@ -409,8 +414,16 @@ catch_call(F) ->
C : E : S ->
{crashed, {C, E, S}}
end.
force_set_config_file_paths(emqx_conf, Paths) ->
application:set_env(emqx, config_files, Paths);
force_set_config_file_paths(emqx, Paths) ->
application:set_env(emqx, config_files, Paths);
force_set_config_file_paths(_, _) ->
ok.
copy_certs(emqx_conf, Dest0) ->
Dest = filename:dirname(Dest0),
From = string:replace(Dest, "emqx_conf", "emqx"),
os:cmd( ["cp -rf ", From, "/certs ", Dest, "/"]),
ok;
copy_certs(_, _) -> ok.

View File

@ -65,3 +65,5 @@
-define(AUTHZ_METRICS, ?METRICS(authz_metrics)).
-define(AUTHZ_METRICS(K), ?METRICS(authz_metrics, K)).
-define(CONF_KEY_PATH, [authorization, sources]).

View File

@ -38,7 +38,6 @@
-export([post_config_update/4, pre_config_update/2]).
-define(CONF_KEY_PATH, [authorization, sources]).
-spec(register_metrics() -> ok).
register_metrics() ->
@ -46,8 +45,8 @@ register_metrics() ->
init() ->
ok = register_metrics(),
emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
Sources = emqx:get_config(?CONF_KEY_PATH, []),
emqx_conf:add_handler(?CONF_KEY_PATH, ?MODULE),
Sources = emqx_conf:get(?CONF_KEY_PATH, []),
ok = check_dup_types(Sources),
NSources = init_sources(Sources),
ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NSources]}, -1).

View File

@ -18,6 +18,7 @@ start(_StartType, _StartArgs) ->
{ok, Sup}.
stop(_State) ->
emqx_conf:remove_handler(?CONF_KEY_PATH),
ok.
%% internal functions

View File

@ -133,7 +133,7 @@ t_update_source(_) ->
, #{type := postgresql, enable := true}
, #{type := redis, enable := true}
, #{type := file, enable := true}
], emqx:get_config([authorization, sources], [])),
], emqx_conf:get([authorization, sources], [])),
{ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE1#{<<"enable">> := false}),
{ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE2#{<<"enable">> := false}),
@ -148,7 +148,7 @@ t_update_source(_) ->
, #{type := postgresql, enable := false}
, #{type := redis, enable := false}
, #{type := file, enable := false}
], emqx:get_config([authorization, sources], [])),
], emqx_conf:get([authorization, sources], [])),
{ok, _} = emqx_authz:update(?CMD_REPLACE, []).

View File

@ -38,7 +38,7 @@ max_limit() ->
?MAX_AUTO_SUBSCRIBE.
list() ->
format(emqx:get_config([auto_subscribe, topics], [])).
format(emqx_conf:get([auto_subscribe, topics], [])).
update(Topics) ->
update_(Topics).

View File

@ -19,7 +19,7 @@
-spec(init() -> {Module :: atom(), Config :: term()}).
init() ->
do_init(emqx:get_config([auto_subscribe], #{})).
do_init(emqx_conf:get([auto_subscribe], #{})).
do_init(Config = #{topics := _Topics}) ->
Options = emqx_auto_subscribe_internal:init(Config),

View File

@ -44,7 +44,6 @@ all() ->
init_per_suite(Config) ->
mria:start(),
application:stop(?APP),
meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_schema, fields, fun("auto_subscribe") ->
meck:passthrough(["auto_subscribe"]) ++
@ -86,8 +85,7 @@ init_per_suite(Config) ->
}
]
}">>),
emqx_common_test_helpers:start_apps([emqx_dashboard], fun set_special_configs/1),
emqx_common_test_helpers:start_apps([?APP]),
emqx_common_test_helpers:start_apps([emqx_dashboard, ?APP], fun set_special_configs/1),
Config.
set_special_configs(emqx_dashboard) ->

View File

@ -51,7 +51,7 @@
reload_hook() ->
unload_hook(),
Bridges = emqx:get_config([bridges], #{}),
Bridges = emqx_conf:get([bridges], #{}),
lists:foreach(fun({_Type, Bridge}) ->
lists:foreach(fun({_Name, BridgeConf}) ->
load_hook(BridgeConf)
@ -124,7 +124,7 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) ->
perform_bridge_changes(Tasks, Result).
load_bridges() ->
Bridges = emqx:get_config([bridges], #{}),
Bridges = emqx_conf:get([bridges], #{}),
emqx_bridge_monitor:ensure_all_started(Bridges).
resource_id(BridgeId) when is_binary(BridgeId) ->
@ -244,7 +244,7 @@ has_subscribe_local_topic(Channels) ->
end, maps:to_list(Channels)).
get_matched_channels(Topic) ->
Bridges = emqx:get_config([bridges], #{}),
Bridges = emqx_conf:get([bridges], #{}),
maps:fold(fun
%% TODO: also trigger 'message.publish' for mqtt bridges.
(mqtt, _Conf, Acc0) -> Acc0;

View File

@ -23,11 +23,11 @@ start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_bridge_sup:start_link(),
ok = emqx_bridge:load_bridges(),
ok = emqx_bridge:reload_hook(),
emqx_config_handler:add_handler(emqx_bridge:config_key_path(), emqx_bridge),
emqx_conf:add_handler(emqx_bridge:config_key_path(), emqx_bridge),
{ok, Sup}.
stop(_State) ->
emqx_config_handler:remove_handler(emqx_bridge:config_key_path()),
emqx_conf:remove_handler(emqx_bridge:config_key_path()),
ok = emqx_bridge:unload_hook(),
ok.

View File

@ -93,28 +93,10 @@ node {
backtrace_depth = 23
cluster_call {
## Time interval to retry after a failed call
##
## @doc node.cluster_call.retry_interval
## ValueType: Duration
## Default: 1s
retry_interval = 1s
## Retain the maximum number of completed transactions (for queries)
##
## @doc node.cluster_call.max_history
## ValueType: Integer
## Range: [1, 500]
## Default: 100
max_history = 100
## Time interval to clear completed but stale transactions.
## Ensure that the number of completed transactions is less than the max_history
##
## @doc node.cluster_call.cleanup_interval
## ValueType: Duration
## Default: 5m
cleanup_interval = 5m
}
}
##==================================================================

View File

@ -0,0 +1,22 @@
-ifndef(EMQ_X_CONF_HRL).
-define(EMQ_X_CONF_HRL, true).
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
-define(CLUSTER_MFA, cluster_rpc_mfa).
-define(CLUSTER_COMMIT, cluster_rpc_commit).
-record(cluster_rpc_mfa, {
tnx_id :: pos_integer(),
mfa :: mfa(),
created_at :: calendar:datetime(),
initiator :: node()
}).
-record(cluster_rpc_commit, {
node :: node(),
tnx_id :: pos_integer() | '$1'
}).
-endif.

View File

@ -0,0 +1,7 @@
{erl_opts, [debug_info]}.
{deps, []}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_conf]}
]}.

View File

@ -19,6 +19,7 @@
%% API
-export([start_link/0, mnesia/1]).
-export([multicall/3, multicall/5, query/1, reset/0, status/0, skip_failed_commit/1]).
-export([get_node_tnx_id/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
handle_continue/2, code_change/3]).
@ -26,13 +27,12 @@
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-export([start_link/3]).
-endif.
-boot_mnesia({mnesia, [boot]}).
-include_lib("emqx/include/logger.hrl").
-include("emqx_machine.hrl").
-include("emqx_conf.hrl").
-define(CATCH_UP, catch_up).
-define(TIMEOUT, timer:minutes(1)).
@ -43,13 +43,13 @@
mnesia(boot) ->
ok = mria:create_table(?CLUSTER_MFA, [
{type, ordered_set},
{rlog_shard, ?EMQX_MACHINE_SHARD},
{rlog_shard, ?CLUSTER_RPC_SHARD},
{storage, disc_copies},
{record_name, cluster_rpc_mfa},
{attributes, record_info(fields, cluster_rpc_mfa)}]),
ok = mria:create_table(?CLUSTER_COMMIT, [
{type, set},
{rlog_shard, ?EMQX_MACHINE_SHARD},
{rlog_shard, ?CLUSTER_RPC_SHARD},
{storage, disc_copies},
{record_name, cluster_rpc_commit},
{attributes, record_info(fields, cluster_rpc_commit)}]).
@ -87,7 +87,7 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu
%% the initiate transaction must happened on core node
%% make sure MFA(in the transaction) and the transaction on the same node
%% don't need rpc again inside transaction.
case mria_status:upstream_node(?EMQX_MACHINE_SHARD) of
case mria_status:upstream_node(?CLUSTER_RPC_SHARD) of
{ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout);
disconnected -> {error, disconnected}
end
@ -122,6 +122,13 @@ reset() -> gen_server:call(?MODULE, reset).
status() ->
transaction(fun trans_status/0, []).
-spec get_node_tnx_id(node()) -> integer().
get_node_tnx_id(Node) ->
case mnesia:wread({?CLUSTER_COMMIT, Node}) of
[] -> -1;
[#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId
end.
%% Regardless of what MFA is returned, consider it a success),
%% then move to the next tnxId.
%% if the next TnxId failed, need call the function again to skip.
@ -135,9 +142,12 @@ skip_failed_commit(Node) ->
%% @private
init([Node, RetryMs]) ->
_ = mria:wait_for_tables([?CLUSTER_MFA]),
_ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
{ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
{ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}.
State = #{node => Node, retry_interval => RetryMs},
TnxId = emqx_app:get_init_tnx_id(),
ok = maybe_init_tnx_id(Node, TnxId),
{ok, State, {continue, ?CATCH_UP}}.
%% @private
handle_continue(?CATCH_UP, State) ->
@ -274,7 +284,7 @@ do_catch_up_in_one_trans(LatestId, Node) ->
end.
transaction(Func, Args) ->
mria:transaction(?EMQX_MACHINE_SHARD, Func, Args).
mria:transaction(?CLUSTER_RPC_SHARD, Func, Args).
trans_status() ->
mnesia:foldl(fun(Rec, Acc) ->
@ -363,4 +373,15 @@ commit_status_trans(Operator, TnxId) ->
mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]).
get_retry_ms() ->
application:get_env(emqx_machine, cluster_call_retry_interval, 1000).
emqx_conf:get(["node", "cluster_call", "retry_interval"], 1000).
maybe_init_tnx_id(_Node, TnxId)when TnxId < 0 -> ok;
maybe_init_tnx_id(Node, TnxId) ->
{atomic, _} = transaction(fun init_node_tnx_id/2, [Node, TnxId]),
ok.
init_node_tnx_id(Node, TnxId) ->
case mnesia:read(?CLUSTER_COMMIT, Node) of
[] -> commit(Node, TnxId);
_ -> ok
end.

View File

@ -18,15 +18,15 @@
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
-include("emqx_machine.hrl").
-include("emqx_conf.hrl").
-export([start_link/0, start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
start_link() ->
MaxHistory = application:get_env(emqx_machine, cluster_call_max_history, 100),
CleanupMs = application:get_env(emqx_machine, cluster_call_cleanup_interval, 5*60*1000),
MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100),
CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5*60*1000),
start_link(MaxHistory, CleanupMs).
start_link(MaxHistory, CleanupMs) ->
@ -49,7 +49,7 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) ->
case mria:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of
case mria:transaction(?CLUSTER_RPC_SHARD, fun del_stale_mfa/1, [MaxHistory]) of
{atomic, ok} -> ok;
Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error])
end,

View File

@ -0,0 +1,10 @@
{application, emqx_conf,
[{description, "EMQX configuration management"},
{vsn, "0.1.0"},
{registered, []},
{mod, {emqx_conf_app, []}},
{included_applications, [hocon]},
{applications, [kernel, stdlib]},
{env, []},
{modules, []}
]}.

View File

@ -0,0 +1,122 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conf).
-compile({no_auto_import, [get/1, get/2]}).
-export([add_handler/2, remove_handler/1]).
-export([get/1, get/2, get_all/1]).
-export([get_by_node/2, get_by_node/3]).
-export([update/3, update/4]).
-export([remove/2, remove/3]).
-export([reset/2, reset/3]).
%% for rpc
-export([get_node_and_config/1]).
%% API
%% @doc Adds a new config handler to emqx_config_handler.
-spec add_handler(emqx_config:config_key_path(), module()) -> ok.
add_handler(ConfKeyPath, HandlerName) ->
emqx_config_handler:add_handler(ConfKeyPath, HandlerName).
%% @doc remove config handler from emqx_config_handler.
-spec remove_handler(emqx_config:config_key_path()) -> ok.
remove_handler(ConfKeyPath) ->
emqx_config_handler:remove_handler(ConfKeyPath).
-spec get(emqx_map_lib:config_key_path()) -> term().
get(KeyPath) ->
emqx:get_config(KeyPath).
-spec get(emqx_map_lib:config_key_path(), term()) -> term().
get(KeyPath, Default) ->
emqx:get_config(KeyPath, Default).
%% @doc Returns all values in the cluster.
-spec get_all(emqx_map_lib:config_key_path()) -> #{node() => term()}.
get_all(KeyPath) ->
{ResL, []} = rpc:multicall(?MODULE, get_node_and_config, [KeyPath], 5000),
maps:from_list(ResL).
%% @doc Returns the specified node's KeyPath, or exception if not found
-spec get_by_node(node(), emqx_map_lib:config_key_path()) -> term().
get_by_node(Node, KeyPath)when Node =:= node() ->
emqx:get_config(KeyPath);
get_by_node(Node, KeyPath) ->
rpc:call(Node, ?MODULE, get_by_node, [Node, KeyPath]).
%% @doc Returns the specified node's KeyPath, or the default value if not found
-spec get_by_node(node(), emqx_map_lib:config_key_path(), term()) -> term().
get_by_node(Node, KeyPath, Default)when Node =:= node() ->
emqx:get_config(KeyPath, Default);
get_by_node(Node, KeyPath, Default) ->
rpc:call(Node, ?MODULE, get_by_node, [Node, KeyPath, Default]).
%% @doc Returns the specified node's KeyPath, or config_not_found if key path not found
-spec get_node_and_config(emqx_map_lib:config_key_path()) -> term().
get_node_and_config(KeyPath) ->
{node(), emqx:get_config(KeyPath, config_not_found)}.
%% @doc Update all value of key path in cluster-override.conf or local-override.conf.
-spec update(emqx_map_lib:config_key_path(), emqx_config:update_args(),
emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(KeyPath, UpdateReq, Opts0) ->
Args = [KeyPath, UpdateReq, Opts0],
{ok, _TnxId, Res} = emqx_cluster_rpc:multicall(emqx, update_config, Args),
Res.
%% @doc Update the specified node's key path in local-override.conf.
-spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_args(),
emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
update(Node, KeyPath, UpdateReq, Opts0)when Node =:= node() ->
emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local});
update(Node, KeyPath, UpdateReq, Opts0) ->
rpc:call(Node, ?MODULE, update, [Node, KeyPath, UpdateReq, Opts0], 5000).
%% @doc remove all value of key path in cluster-override.conf or local-override.conf.
-spec remove(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove(KeyPath, Opts0) ->
Args = [KeyPath, Opts0],
{ok, _TnxId, Res} = emqx_cluster_rpc:multicall(emqx, remove_config, Args),
Res.
%% @doc remove the specified node's key path in local-override.conf.
-spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
remove(Node, KeyPath, Opts) when Node =:= node() ->
emqx:remove_config(KeyPath, Opts#{override_to => local});
remove(Node, KeyPath, Opts) ->
rpc:call(Node, ?MODULE, remove, [KeyPath, Opts]).
%% @doc reset all value of key path in cluster-override.conf or local-override.conf.
-spec reset(emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(KeyPath, Opts0) ->
Args = [KeyPath, Opts0],
{ok, _TnxId, Res} = emqx_cluster_rpc:multicall(emqx, reset_config, Args),
Res.
%% @doc reset the specified node's key path in local-override.conf.
-spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) ->
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
reset(Node, KeyPath, Opts) when Node =:= node() ->
emqx:reset_config(KeyPath, Opts#{override_to => local});
reset(Node, KeyPath, Opts) ->
rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]).

View File

@ -0,0 +1,97 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conf_app).
-behaviour(application).
-export([start/2, stop/1]).
-export([get_override_config_file/0]).
-include_lib("emqx/include/logger.hrl").
-include("emqx_conf.hrl").
start(_StartType, _StartArgs) ->
init_conf(),
emqx_conf_sup:start_link().
stop(_State) ->
ok.
%% internal functions
init_conf() ->
{ok, TnxId} = copy_override_conf_from_core_node(),
emqx_app:set_init_tnx_id(TnxId),
emqx_config:init_load(emqx_conf_schema),
emqx_app:set_init_config_load_done().
copy_override_conf_from_core_node() ->
case nodes() of
[] -> %% The first core nodes is self.
?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}),
{ok, -1};
Nodes ->
{Results, Failed} = rpc:multicall(Nodes, ?MODULE, get_override_config_file, [], 20000),
{Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results),
NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0),
case (Failed =/= [] orelse NotReady =/= []) andalso Ready =/= [] of
true ->
Warning = #{nodes => Nodes, failed => Failed, not_ready => NotReady,
msg => "ignored_bad_nodes_when_copy_init_config"},
?SLOG(warning, Warning);
false -> ok
end,
case Ready of
[] ->
%% Other core nodes running but no one replicated it successfully.
?SLOG(error, #{msg => "copy_overide_conf_from_core_node_failed",
nodes => Nodes, failed => Failed, not_ready => NotReady}),
{error, "core node not ready"};
_ ->
SortFun = fun({ok, #{wall_clock := W1}}, {ok, #{wall_clock := W2}}) -> W1 > W2 end,
[{ok, Info} | _] = lists:sort(SortFun, Ready),
#{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
?SLOG(debug, #{msg => "copy_overide_conf_from_core_node_success", node => Node}),
ok = emqx_config:save_to_override_conf(RawOverrideConf, #{override_to => cluster}),
{ok, TnxId}
end
end.
get_override_config_file() ->
Node = node(),
case emqx_app:get_init_config_load_done() of
false -> {error, #{node => Node, msg => "init_conf_load_not_done"}};
true ->
case mria_rlog:role() of
core ->
case erlang:whereis(emqx_config_handler) of
undefined -> {error, #{node => Node, msg => "emqx_config_handler_not_ready"}};
_ ->
Fun = fun() ->
TnxId = emqx_cluster_rpc:get_node_tnx_id(Node),
WallClock = erlang:statistics(wall_clock),
Conf = emqx_config_handler:get_raw_cluster_override_conf(),
#{wall_clock => WallClock, conf => Conf, tnx_id => TnxId, node => Node}
end,
case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of
{atomic, Res} -> {ok, Res};
{aborted, Reason} -> {error, #{node => Node, msg => Reason}}
end
end;
replicant ->
{ignore, #{node => Node}}
end
end.

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_machine_schema).
-module(emqx_conf_schema).
-dialyzer(no_return).
-dialyzer(no_match).
@ -279,7 +279,8 @@ fields("node") ->
})}
, {"data_dir",
sc(string(),
#{ nullable => false
#{ nullable => false,
mapping => "emqx.data_dir"
})}
, {"config_files",
sc(list(string()),
@ -288,7 +289,8 @@ fields("node") ->
})}
, {"global_gc_interval",
sc(emqx_schema:duration(),
#{ default => "15m"
#{ mapping => "emqx_machine.global_gc_interval"
, default => "15m"
})}
, {"crash_dump_dir",
sc(file(),
@ -314,32 +316,34 @@ fields("node") ->
#{ mapping => "emqx_machine.backtrace_depth"
, default => 23
})}
, {"cluster_call",
sc(ref("cluster_call"),
#{}
)}
, {"etc_dir",
sc(string(),
#{ desc => "`etc` dir for the node"
}
)}
, {"cluster_call",
sc(ref("cluster_call"),
#{
}
)}
];
fields("cluster_call") ->
[ {"retry_interval",
sc(emqx_schema:duration(),
#{ mapping => "emqx_machine.retry_interval"
#{ desc => "Time interval to retry after a failed call."
, default => "1s"
})}
, {"max_history",
sc(range(1, 500),
#{mapping => "emqx_machine.max_history",
default => 100
#{ desc => "Retain the maximum number of completed transactions (for queries)."
, default => 100
})}
, {"cleanup_interval",
sc(emqx_schema:duration(),
#{mapping => "emqx_machine.cleanup_interval",
default => "5m"
#{ desc => "Time interval to clear completed but stale transactions.
Ensure that the number of completed transactions is less than the max_history."
, default => "5m"
})}
];
@ -507,7 +511,8 @@ translation("kernel") ->
, {"logger", fun tr_logger/1}];
translation("emqx") ->
[ {"config_files", fun tr_config_files/1}
, {"override_conf_file", fun tr_override_conf_fie/1}
, {"cluster_override_conf_file", fun tr_cluster_override_conf_file/1}
, {"local_override_conf_file", fun tr_local_override_conf_file/1}
].
tr_config_files(Conf) ->
@ -523,11 +528,17 @@ tr_config_files(Conf) ->
end
end.
tr_override_conf_fie(Conf) ->
tr_cluster_override_conf_file(Conf) ->
tr_override_conf_file(Conf, "cluster-override.conf").
tr_local_override_conf_file(Conf) ->
tr_override_conf_file(Conf, "local-override.conf").
tr_override_conf_file(Conf, Filename) ->
DataDir = conf_get("node.data_dir", Conf),
%% assert, this config is not nullable
[_ | _] = DataDir,
filename:join([DataDir, "emqx_override.conf"]).
filename:join([DataDir, "configs", Filename]).
tr_cluster__discovery(Conf) ->
Strategy = conf_get("cluster.discovery_strategy", Conf),

View File

@ -0,0 +1,48 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conf_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
SupFlags = #{strategy => one_for_all,
intensity => 10,
period => 100},
ChildSpecs =
[ child_spec(emqx_cluster_rpc, [])
, child_spec(emqx_cluster_rpc_handler, [])
],
{ok, {SupFlags, ChildSpecs}}.
child_spec(Mod, Args) ->
#{
id => Mod,
start => {Mod, start_link, Args},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [Mod]
}.

View File

@ -19,8 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include("emqx_machine.hrl").
-include("emqx_conf.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(NODE1, emqx_cluster_rpc).
@ -40,13 +39,9 @@ suite() -> [{timetrap, {minutes, 3}}].
groups() -> [].
init_per_suite(Config) ->
application:load(emqx),
application:load(emqx_machine),
application:load(emqx_conf),
ok = ekka:start(),
ok = mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity),
application:set_env(emqx_machine, cluster_call_max_history, 100),
application:set_env(emqx_machine, cluster_call_clean_interval, 1000),
application:set_env(emqx_machine, cluster_call_retry_interval, 900),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
meck:expect(emqx_alarm, activate, 2, ok),
meck:expect(emqx_alarm, deactivate, 2, ok),
@ -68,6 +63,7 @@ end_per_testcase(_Config) ->
ok.
t_base_test(_Config) ->
emqx_cluster_rpc:reset(),
?assertEqual(emqx_cluster_rpc:status(), {atomic, []}),
Pid = self(),
MFA = {M, F, A} = {?MODULE, echo, [Pid, test]},
@ -181,6 +177,7 @@ t_skip_failed_commit(_Config) ->
emqx_cluster_rpc:reset(),
{atomic, []} = emqx_cluster_rpc:status(),
{ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000),
sleep(180),
{atomic, List1} = emqx_cluster_rpc:status(),
Node = node(),
?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}],
@ -254,7 +251,7 @@ failed_on_other_recover_after_5_second(Pid, CreatedAt) ->
end
end.
sleep(Second) ->
sleep(Ms) ->
receive _ -> ok
after Second -> timeout
after Ms -> timeout
end.

View File

@ -88,7 +88,7 @@ listeners() ->
Name = listener_name(Protocol, Port),
RanchOptions = ranch_opts(maps:without([protocol], ListenerOptions)),
{Name, Protocol, Port, RanchOptions}
end || ListenerOptions <- emqx_config:get([emqx_dashboard, listeners], [])].
end || ListenerOptions <- emqx_conf:get([emqx_dashboard, listeners], [])].
ranch_opts(RanchOptions) ->
Keys = [ {ack_timeout, handshake_timeout}

View File

@ -205,7 +205,7 @@ add_default_user() ->
add_default_user(binenv(default_username), binenv(default_password)).
binenv(Key) ->
iolist_to_binary(emqx:get_config([emqx_dashboard, Key], "")).
iolist_to_binary(emqx_conf:get([emqx_dashboard, Key], "")).
add_default_user(Username, Password) when ?EMPTY_KEY(Username) orelse ?EMPTY_KEY(Password) ->
igonre;

View File

@ -55,7 +55,7 @@ get_collect() -> gen_server:call(whereis(?MODULE), get_collect).
init([]) ->
timer(next_interval(), collect),
timer(get_today_remaining_seconds(), clear_expire_data),
ExpireInterval = emqx:get_config([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL),
ExpireInterval = emqx_conf:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL),
State = #{
count => count(),
expire_interval => ExpireInterval,
@ -75,7 +75,7 @@ next_interval() ->
(1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1.
interval() ->
emqx:get_config([?APP, sample_interval], ?DEFAULT_INTERVAL).
emqx_conf:get([?APP, sample_interval], ?DEFAULT_INTERVAL).
count() ->
60 div interval().

View File

@ -152,7 +152,7 @@ jwk(Username, Password, Salt) ->
}.
jwt_expiration_time() ->
ExpTime = emqx:get_config([emqx_dashboard, token_expired_time], ?EXPTIME),
ExpTime = emqx_conf:get([emqx_dashboard, token_expired_time], ?EXPTIME),
erlang:system_time(millisecond) + ExpTime.
salt() ->

View File

@ -339,7 +339,7 @@ schema("/ref/complicated_type") ->
{maps, hoconsc:mk(map(), #{})},
{comma_separated_list, hoconsc:mk(emqx_schema:comma_separated_list(), #{})},
{comma_separated_atoms, hoconsc:mk(emqx_schema:comma_separated_atoms(), #{})},
{log_level, hoconsc:mk(emqx_machine_schema:log_level(), #{})},
{log_level, hoconsc:mk(emqx_conf_schema:log_level(), #{})},
{fix_integer, hoconsc:mk(typerefl:integer(100), #{})}
]
}}

View File

@ -58,7 +58,7 @@ request_options() ->
}.
env(Key, Def) ->
emqx:get_config([exhook, Key], Def).
emqx_conf:get([exhook, Key], Def).
%%--------------------------------------------------------------------
%% APIs

View File

@ -279,7 +279,7 @@ try_takeover(idle, DesireId, Msg, Channel) ->
%% udp connection baseon the clientid
call_session(handle_request, Msg, Channel);
_ ->
case emqx:get_config([gateway, coap, authentication], undefined) of
case emqx_conf:get([gateway, coap, authentication], undefined) of
undefined ->
call_session(handle_request, Msg, Channel);
_ ->

View File

@ -216,7 +216,7 @@ mqtt_to_coap(MQTT, Token, SeqId) ->
options = #{observe => SeqId}}.
get_notify_type(#message{qos = Qos}) ->
case emqx:get_config([gateway, coap, notify_qos], non) of
case emqx_conf:get([gateway, coap, notify_qos], non) of
qos ->
case Qos of
?QOS_0 ->

View File

@ -86,7 +86,7 @@ get_sub_opts(#coap_message{options = Opts} = Msg) ->
#{qos := _} ->
maps:merge(SubOpts, ?SUBOPTS);
_ ->
CfgType = emqx:get_config([gateway, coap, subscribe_qos], ?QOS_0),
CfgType = emqx_conf:get([gateway, coap, subscribe_qos], ?QOS_0),
maps:merge(SubOpts, ?SUBOPTS#{qos => type_to_qos(CfgType, Msg)})
end.
@ -115,7 +115,7 @@ get_publish_qos(Msg) ->
#{<<"qos">> := QOS} ->
erlang:binary_to_integer(QOS);
_ ->
CfgType = emqx:get_config([gateway, coap, publish_qos], ?QOS_0),
CfgType = emqx_conf:get([gateway, coap, publish_qos], ?QOS_0),
type_to_qos(CfgType, Msg)
end.

View File

@ -88,4 +88,4 @@ load_gateway_by_default([{Type, Confs}|More]) ->
load_gateway_by_default(More).
confs() ->
maps:to_list(emqx:get_config([gateway], #{})).
maps:to_list(emqx_conf:get([gateway], #{})).

View File

@ -65,11 +65,11 @@
-spec load() -> ok.
load() ->
emqx_config_handler:add_handler([gateway], ?MODULE).
emqx_conf:add_handler([gateway], ?MODULE).
-spec unload() -> ok.
unload() ->
emqx_config_handler:remove_handler([gateway]).
emqx_conf:remove_handler([gateway]).
%%--------------------------------------------------------------------
%% APIs

View File

@ -1,37 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-ifndef(EMQ_X_CLUSTER_RPC_HRL).
-define(EMQ_X_CLUSTER_RPC_HRL, true).
-define(CLUSTER_MFA, cluster_rpc_mfa).
-define(CLUSTER_COMMIT, cluster_rpc_commit).
-define(EMQX_MACHINE_SHARD, emqx_machine_shard).
-record(cluster_rpc_mfa, {
tnx_id :: pos_integer(),
mfa :: mfa(),
created_at :: calendar:datetime(),
initiator :: node()
}).
-record(cluster_rpc_commit, {
node :: node(),
tnx_id :: pos_integer() | '$1'
}).
-endif.

View File

@ -18,8 +18,6 @@
-behaviour(gen_server).
-include("types.hrl").
-export([start_link/0, stop/0]).
-export([run/0]).
@ -40,7 +38,7 @@
%% APIs
%%--------------------------------------------------------------------
-spec(start_link() -> startlink_ret()).
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
@ -85,9 +83,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
ensure_timer(State) ->
case emqx:get_config([node, global_gc_interval]) of
case application:get_env(emqx_machine, global_gc_interval) of
undefined -> State;
Interval -> TRef = emqx_misc:start_timer(Interval, run),
{ok, Interval} ->
TRef = emqx_misc:start_timer(Interval, run),
State#{timer := TRef}
end.
@ -99,4 +98,3 @@ do_gc(Pid) ->
-compile({inline, [is_waiting/1]}).
is_waiting(Pid) ->
{status, waiting} == process_info(Pid, status).

View File

@ -22,7 +22,6 @@
]).
-include_lib("emqx/include/logger.hrl").
-include("emqx_machine.hrl").
%% @doc EMQ X boot entrypoint.
start() ->
@ -33,11 +32,8 @@ start() ->
os:set_signal(sigterm, handle) %% default is handle
end,
ok = set_backtrace_depth(),
ok = print_otp_version_warning(),
ok = load_config_files(),
ekka:start(),
ok = mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity),
ok.
ok = print_otp_version_warning().
graceful_shutdown() ->
emqx_machine_terminator:graceful_wait().
@ -58,12 +54,3 @@ print_otp_version_warning() ->
?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n",
[?OTP_RELEASE]).
-endif. % OTP_RELEASE > 22
load_config_files() ->
%% the app env 'config_files' for 'emqx` app should be set
%% in app.time.config by boot script before starting Erlang VM
ConfFiles = application:get_env(emqx, config_files, []),
%% emqx_machine_schema is a superset of emqx_schema
ok = emqx_config:init_load(emqx_machine_schema, ConfFiles),
%% to avoid config being loaded again when emqx app starts.
ok = emqx_app:set_init_config_load_done().

View File

@ -50,8 +50,7 @@ start_autocluster() ->
stop_apps() ->
?SLOG(notice, #{msg => "stopping_emqx_apps"}),
_ = emqx_alarm_handler:unload(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())),
emqx_machine_sup:stop_cluster_rpc().
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
stop_one_app(App) ->
?SLOG(debug, #{msg => "stopping_app", app => App}),
@ -67,9 +66,6 @@ stop_one_app(App) ->
ensure_apps_started() ->
?SLOG(notice, #{msg => "(re)starting_emqx_apps"}),
%% FIXME: Hack spawning the cluster RPC asynchronously to avoid a
%% deadlock somewhere in EMQ X startup
spawn_link(fun() -> emqx_machine_sup:start_cluster_rpc() end),
lists:foreach(fun start_one_app/1, sorted_reboot_apps()).
start_one_app(App) ->
@ -90,6 +86,7 @@ reboot_apps() ->
, esockd
, ranch
, cowboy
, emqx_conf
, emqx
, emqx_prometheus
, emqx_modules

View File

@ -21,8 +21,6 @@
-behaviour(supervisor).
-export([ start_link/0
, stop_cluster_rpc/0
, start_cluster_rpc/0
]).
-export([init/1]).
@ -30,33 +28,11 @@
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
stop_cluster_rpc() ->
case whereis(?MODULE) of
undefined ->
ok;
_ ->
_ = supervisor:terminate_child(?MODULE, emqx_cluster_rpc_handler),
_ = supervisor:terminate_child(?MODULE, emqx_cluster_rpc),
ok
end.
start_cluster_rpc() ->
case whereis(?MODULE) of
undefined ->
ok;
_ ->
ensure_running(emqx_cluster_rpc),
ensure_running(emqx_cluster_rpc_handler),
ok
end.
init([]) ->
GlobalGC = child_worker(emqx_global_gc, [], permanent),
Terminator = child_worker(emqx_machine_terminator, [], transient),
ClusterRpc = child_worker(emqx_cluster_rpc, [], permanent),
ClusterHandler = child_worker(emqx_cluster_rpc_handler, [], permanent),
BootApps = child_worker(emqx_machine_boot, post_boot, [], temporary),
Children = [GlobalGC, Terminator, ClusterRpc, ClusterHandler, BootApps],
GlobalGC = child_worker(emqx_global_gc, [], permanent),
Children = [Terminator, BootApps, GlobalGC],
SupFlags = #{strategy => one_for_one,
intensity => 100,
period => 10
@ -74,13 +50,3 @@ child_worker(M, Func, Args, Restart) ->
type => worker,
modules => [M]
}.
ensure_running(Id) ->
%% Assuming Id == locally registered name
case whereis(Id) of
undefined ->
_ = supervisor:restart_child(?MODULE, Id),
ok;
_ ->
ok
end.

View File

@ -109,10 +109,10 @@ find_schema(Path) ->
{Root, element(2, lists:keyfind(RootAtom, 1, Configs))}
end.
%% we load all configs from emqx_machine_schema, some of them are defined as local ref
%% we need redirect to emqx_machine_schema.
%% such as hoconsc:ref("node") to hoconsc:ref(emqx_machine_schema, "node")
fields(Field) -> emqx_machine_schema:fields(Field).
%% we load all configs from emqx_conf_schema, some of them are defined as local ref
%% we need redirect to emqx_conf_schema.
%% such as hoconsc:ref("node") to hoconsc:ref(emqx_conf_schema, "node")
fields(Field) -> emqx_conf_schema:fields(Field).
%%%==============================================================================================
%% HTTP API Callbacks
@ -165,7 +165,7 @@ conf_path_from_querystr(Req) ->
end.
config_list(Exclude) ->
Roots = emqx_machine_schema:roots(),
Roots = emqx_conf_schema:roots(),
lists:foldl(fun(Key, Acc) -> lists:delete(Key, Acc) end, Roots, Exclude).
to_list(L) when is_list(L) -> L;

View File

@ -238,7 +238,7 @@ crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Conf}) ->
case lists:filter(fun filter_errors/1, Results) of
[{error, {invalid_listener_id, Id}} | _] ->
{400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}};
[{error, {emqx_machine_schema, _}} | _] ->
[{error, {emqx_conf_schema, _}} | _] ->
{400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}};
[{error, {eaddrinuse, _}} | _] ->
{400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}};
@ -280,7 +280,7 @@ crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body :
{404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_NOT_FOUND_OR_DOWN}};
{error, {invalid_listener_id, _}} ->
{400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}};
{error, {emqx_machine_schema, _}} ->
{error, {emqx_conf_schema, _}} ->
{400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}};
{error, {eaddrinuse, _}} ->
{400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}};

View File

@ -107,7 +107,7 @@ on_message_publish(Msg) ->
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
Opts = emqx:get_config([delayed], #{}),
Opts = emqx_conf:get([delayed], #{}),
gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []).
-spec(store(#delayed_message{}) -> ok | {error, atom()}).

View File

@ -177,7 +177,7 @@ delayed_message(delete, #{bindings := #{msgid := Id}}) ->
%% internal function
%%--------------------------------------------------------------------
get_status() ->
emqx:get_config([delayed], #{}).
emqx_conf:get([delayed], #{}).
update_config(Config) ->
case generate_config(Config) of

View File

@ -40,7 +40,7 @@
-endif.
list() ->
emqx:get_config([event_message], #{}).
emqx_conf:get([event_message], #{}).
update(Params) ->
disable(),

View File

@ -32,17 +32,17 @@ stop(_State) ->
ok.
maybe_enable_modules() ->
emqx:get_config([delayed, enable], true) andalso emqx_delayed:enable(),
emqx:get_config([telemetry, enable], true) andalso emqx_telemetry:enable(),
emqx:get_config([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(),
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(),
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(),
emqx_event_message:enable(),
emqx_rewrite:enable(),
emqx_topic_metrics:enable().
maybe_disable_modules() ->
emqx:get_config([delayed, enable], true) andalso emqx_delayed:disable(),
emqx:get_config([telemetry, enable], true) andalso emqx_telemetry:disable(),
emqx:get_config([observer_cli, enable], true) andalso emqx_observer_cli:disable(),
emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(),
emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:disable(),
emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(),
emqx_event_message:disable(),
emqx_rewrite:disable(),
emqx_topic_metrics:disable().

View File

@ -43,7 +43,7 @@
%%--------------------------------------------------------------------
enable() ->
Rules = emqx:get_config([rewrite], []),
Rules = emqx_conf:get([rewrite], []),
register_hook(Rules).
disable() ->

View File

@ -116,7 +116,7 @@ disable() ->
gen_server:call(?MODULE, disable).
get_status() ->
emqx:get_config([telemetry, enable], true).
emqx_conf:get([telemetry, enable], true).
get_uuid() ->
gen_server:call(?MODULE, get_uuid).

View File

@ -146,7 +146,7 @@ on_message_dropped(#message{topic = Topic}, _, _) ->
end.
start_link() ->
Opts = emqx:get_config([topic_metrics], []),
Opts = emqx_conf:get([topic_metrics], []),
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
stop() ->

View File

@ -34,9 +34,9 @@ stop(_State) ->
ok.
maybe_enable_prometheus() ->
case emqx:get_config([prometheus, enable], false) of
case emqx_conf:get([prometheus, enable], false) of
true ->
emqx_prometheus_sup:start_child(?APP, emqx:get_config([prometheus], #{}));
emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus], #{}));
false ->
ok
end.

View File

@ -142,13 +142,13 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------
get_config(enable) ->
emqx_config:get([psk, enable]);
emqx_conf:get([psk, enable]);
get_config(init_file) ->
emqx_config:get([psk, init_file], undefined);
emqx_conf:get([psk, init_file], undefined);
get_config(separator) ->
emqx_config:get([psk, separator], ?DEFAULT_DELIMITER);
emqx_conf:get([psk, separator], ?DEFAULT_DELIMITER);
get_config(chunk_size) ->
emqx_config:get([psk, chunk_size]).
emqx_conf:get([psk, chunk_size]).
import_psks(SrcFile) ->
case file:open(SrcFile, [read, raw, binary, read_ahead]) of

View File

@ -151,7 +151,7 @@ get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' :
timestamp = Ts}) ->
Ts + Interval * 1000;
get_expiry_time(#message{timestamp = Ts}) ->
Interval = emqx:get_config([?APP, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL),
Interval = emqx_conf:get([?APP, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL),
case Interval of
0 -> 0;
_ -> Ts + Interval
@ -219,7 +219,7 @@ handle_cast(Msg, State) ->
handle_info(clear_expired, #{context := Context} = State) ->
Mod = get_backend_module(),
Mod:clear_expired(Context),
Interval = emqx:get_config([?APP, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
Interval = emqx_conf:get([?APP, msg_clear_interval], ?DEF_EXPIRY_INTERVAL),
{noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate};
handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) ->
@ -268,7 +268,7 @@ new_context(Id) ->
#{context_id => Id}.
is_too_big(Size) ->
Limit = emqx:get_config([?APP, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE),
Limit = emqx_conf:get([?APP, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE),
Limit > 0 andalso (Size > Limit).
%% @private

View File

@ -103,7 +103,7 @@ post_config_update(_Req, NewRules, OldRules, _AppEnvs) ->
load_rules() ->
maps_foreach(fun({Id, Rule}) ->
{ok, _} = create_rule(Rule#{id => bin(Id)})
end, emqx:get_config([rule_engine, rules], #{})).
end, emqx_conf:get([rule_engine, rules], #{})).
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
create_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->

View File

@ -29,9 +29,9 @@ start(_Type, _Args) ->
ok = emqx_rule_events:reload(),
SupRet = emqx_rule_engine_sup:start_link(),
ok = emqx_rule_engine:load_rules(),
emqx_config_handler:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine),
emqx_conf:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine),
SupRet.
stop(_State) ->
emqx_config_handler:remove_handler(emqx_rule_engine:config_key_path()),
emqx_conf:remove_handler(emqx_rule_engine:config_key_path()),
ok = emqx_rule_events:unload().

View File

@ -103,12 +103,12 @@ groups() ->
%%------------------------------------------------------------------------------
init_per_suite(Config) ->
application:load(emqx_machine),
ok = emqx_common_test_helpers:start_apps([emqx_rule_engine]),
application:load(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_rule_engine]),
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]),
ok.
on_resource_create(_id, _) -> #{}.
@ -136,7 +136,6 @@ end_per_group(_Groupname, _Config) ->
init_per_testcase(t_events, Config) ->
init_events_counters(),
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
SQL = "SELECT * FROM \"$events/client_connected\", "
"\"$events/client_disconnected\", "
"\"$events/session_subscribed\", "
@ -157,7 +156,6 @@ init_per_testcase(t_events, Config) ->
?assertMatch(#{id := <<"rule:t_events">>}, Rule),
[{hook_points_rules, Rule} | Config];
init_per_testcase(_TestCase, Config) ->
emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(t_events, Config) ->

View File

@ -12,17 +12,16 @@ all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
application:load(emqx_machine),
application:load(emqx_conf),
ok = emqx_config:init_load(emqx_rule_engine_schema, ?CONF_DEFAULT),
ok = emqx_common_test_helpers:start_apps([emqx_rule_engine]),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_rule_engine]),
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]),
ok.
init_per_testcase(_, Config) ->
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
Config.
end_per_testcase(_, _Config) ->

View File

@ -40,13 +40,13 @@ groups() ->
].
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([emqx]),
emqx_common_test_helpers:start_apps([emqx_conf]),
{ok, _} = emqx_rule_metrics:start_link(),
Config.
end_per_suite(_Config) ->
catch emqx_rule_metrics:stop(),
emqx_common_test_helpers:stop_apps([emqx]),
emqx_common_test_helpers:stop_apps([emqx_conf]),
ok.
init_per_testcase(_, Config) ->

View File

@ -32,9 +32,9 @@ stop(_) ->
ok.
maybe_enable_statsd() ->
case emqx:get_config([statsd, enable], false) of
case emqx_conf:get([statsd, enable], false) of
true ->
emqx_statsd_sup:start_child(?APP, emqx:get_config([statsd], #{}));
emqx_statsd_sup:start_child(?APP, emqx_conf:get([statsd], #{}));
false ->
ok
end.

View File

@ -22,7 +22,7 @@ export REL_VSN
RUNNER_SCRIPT="$RUNNER_BIN_DIR/$REL_NAME"
CODE_LOADING_MODE="${CODE_LOADING_MODE:-embedded}"
REL_DIR="$RUNNER_ROOT_DIR/releases/$REL_VSN"
SCHEMA_MOD=emqx_machine_schema
SCHEMA_MOD=emqx_conf_schema
WHOAMI=$(whoami)
@ -244,7 +244,7 @@ generate_config() {
## ths command populates two files: app.<time>.config and vm.<time>.args
## disable SC2086 to allow EMQX_LICENSE_CONF_OPTION to split
# shellcheck disable=SC2086
call_hocon -v -t "$NOW_TIME" -s $SCHEMA_MOD -c "$RUNNER_ETC_DIR"/emqx.conf $EMQX_LICENSE_CONF_OPTION -d "$RUNNER_DATA_DIR"/configs generate
call_hocon -v -t "$NOW_TIME" -I "$CONFIGS_DIR/" -s $SCHEMA_MOD -c "$RUNNER_ETC_DIR"/emqx.conf $EMQX_LICENSE_CONF_OPTION -d "$RUNNER_DATA_DIR"/configs generate
## filenames are per-hocon convention
local CONF_FILE="$CONFIGS_DIR/app.$NOW_TIME.config"
@ -363,7 +363,7 @@ NAME="${EMQX_NODE_NAME:-}"
if [ -z "$NAME" ]; then
if [ "$IS_BOOT_COMMAND" = 'yes' ]; then
# for boot commands, inspect emqx.conf for node name
NAME="$(call_hocon -s $SCHEMA_MOD -c "$RUNNER_ETC_DIR"/emqx.conf get node.name | tr -d \")"
NAME="$(call_hocon -s $SCHEMA_MOD -I "$CONFIGS_DIR/" -c "$RUNNER_ETC_DIR"/emqx.conf get node.name | tr -d \")"
else
# for non-boot commands, inspect vm.<time>.args for node name
# shellcheck disable=SC2012,SC2086
@ -390,7 +390,7 @@ PIPE_DIR="${PIPE_DIR:-/$RUNNER_DATA_DIR/${WHOAMI}_erl_pipes/$NAME/}"
COOKIE="${EMQX_NODE_COOKIE:-}"
if [ -z "$COOKIE" ]; then
if [ "$IS_BOOT_COMMAND" = 'yes' ]; then
COOKIE="$(call_hocon -s $SCHEMA_MOD -c "$RUNNER_ETC_DIR"/emqx.conf get node.cookie | tr -d \")"
COOKIE="$(call_hocon -s $SCHEMA_MOD -I "$CONFIGS_DIR/" -c "$RUNNER_ETC_DIR"/emqx.conf get node.cookie | tr -d \")"
else
# shellcheck disable=SC2012,SC2086
LATEST_VM_ARGS="$(ls -t $CONFIGS_DIR/vm.*.args | head -1)"
@ -403,7 +403,7 @@ if [ -z "$COOKIE" ]; then
fi
# Support for IPv6 Dist. See: https://github.com/emqtt/emqttd/issues/1460
PROTO_DIST="$(call_hocon -s $SCHEMA_MOD -c "$RUNNER_ETC_DIR"/emqx.conf get cluster.proto_dist | tr -d \")"
PROTO_DIST="$(call_hocon -s $SCHEMA_MOD -I "$CONFIGS_DIR/" -c "$RUNNER_ETC_DIR"/emqx.conf get cluster.proto_dist | tr -d \")"
if [ -z "$PROTO_DIST" ]; then
PROTO_DIST_ARG=""
else

View File

@ -57,7 +57,7 @@
@set nodetool="%rel_root_dir%\bin\nodetool"
@set cuttlefish="%rel_root_dir%\bin\cuttlefish"
@set node_type="-name"
@set schema_mod="emqx_machine_schema"
@set schema_mod="emqx_conf_schema"
:: Extract node name from emqx.conf
@for /f "usebackq delims=" %%I in (`"%escript% %nodetool% hocon -s %schema_mod% -c %etc_dir%\emqx.conf get node.name"`) do @(

4
build
View File

@ -71,12 +71,12 @@ docgen() {
libs_dir1="$(find "_build/default/lib/" -maxdepth 2 -name ebin -type d)"
libs_dir2="$(find "_build/$PROFILE/lib/" -maxdepth 2 -name ebin -type d)"
# shellcheck disable=SC2086
erl -noshell -pa $libs_dir1 $libs_dir2 -eval "file:write_file('$conf_doc_html', hocon_schema_html:gen(emqx_machine_schema, \"EMQ X ${PKG_VSN}\")), halt(0)."
erl -noshell -pa $libs_dir1 $libs_dir2 -eval "file:write_file('$conf_doc_html', hocon_schema_html:gen(emqx_conf_schema, \"EMQ X ${PKG_VSN}\")), halt(0)."
local conf_doc_markdown
conf_doc_markdown="$(pwd)/_build/${PROFILE}/rel/emqx/etc/emqx-config-doc.md"
echo "===< Generating config document $conf_doc_markdown"
# shellcheck disable=SC2086
erl -noshell -pa $libs_dir1 $libs_dir2 -eval "file:write_file('$conf_doc_markdown', hocon_schema_doc:gen(emqx_machine_schema)), halt(0)."
erl -noshell -pa $libs_dir1 $libs_dir2 -eval "file:write_file('$conf_doc_markdown', hocon_schema_doc:gen(emqx_conf_schema)), halt(0)."
}
make_rel() {

View File

@ -256,6 +256,7 @@ relx_apps(ReleaseType) ->
, compiler
, runtime_tools
, {emqx, load} % started by emqx_machine
, {emqx_conf, load}
, emqx_machine
, {mnesia, load}
, {ekka, load}
@ -377,7 +378,7 @@ emqx_etc_overlay(edge) ->
].
emqx_etc_overlay_common() ->
[ {"{{base_dir}}/lib/emqx_machine/etc/emqx.conf.all", "etc/emqx.conf"}
[ {"{{base_dir}}/lib/emqx_conf/etc/emqx.conf.all", "etc/emqx.conf"}
, {"{{base_dir}}/lib/emqx/etc/ssl_dist.conf", "etc/ssl_dist.conf"}
].

View File

@ -11,8 +11,8 @@
-mode(compile).
main(_) ->
{ok, BaseConf} = file:read_file("apps/emqx_machine/etc/emqx_machine.conf"),
Apps = filelib:wildcard("*", "apps/") -- ["emqx_machine"],
{ok, BaseConf} = file:read_file("apps/emqx_conf/etc/emqx_conf.conf"),
Apps = filelib:wildcard("*", "apps/") -- ["emqx_machine", "emqx_conf"],
Conf = lists:foldl(fun(App, Acc) ->
Filename = filename:join([apps, App, "etc", App]) ++ ".conf",
case filelib:is_regular(Filename) of
@ -22,4 +22,6 @@ main(_) ->
false -> Acc
end
end, BaseConf, Apps),
ok = file:write_file("apps/emqx_machine/etc/emqx.conf.all", Conf).
ClusterInc = "include \"cluster-override.conf\"\n",
LocalInc = "include \"local-override.conf\"\n",
ok = file:write_file("apps/emqx_conf/etc/emqx.conf.all", [Conf, ClusterInc, LocalInc]).