diff --git a/Makefile b/Makefile index 83fa2f15b..50cdcb99d 100644 --- a/Makefile +++ b/Makefile @@ -43,7 +43,7 @@ proper: $(REBAR) @ENABLE_COVER_COMPILE=1 $(REBAR) proper -d test/props -c .PHONY: ct -ct: $(REBAR) +ct: $(REBAR) conf-segs @ENABLE_COVER_COMPILE=1 $(REBAR) ct --name 'test@127.0.0.1' -c -v APPS=$(shell $(CURDIR)/scripts/find-apps.sh) diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 504b245f3..9091304fc 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -25,7 +25,8 @@ , get_release/0 , set_init_config_load_done/0 , get_init_config_load_done/0 - , set_override_conf_file/1 + , set_init_tnx_id/1 + , get_init_tnx_id/0 ]). -include("emqx.hrl"). @@ -67,21 +68,16 @@ set_init_config_load_done() -> get_init_config_load_done() -> application:get_env(emqx, init_config_load_done, false). -%% @doc This API is mostly for testing. -%% The override config file is typically located in the 'data' dir when -%% it is a emqx release, but emqx app should not have to know where the -%% 'data' dir is located. -set_override_conf_file(File) -> - application:set_env(emqx, override_conf_file, File). +set_init_tnx_id(TnxId) -> + application:set_env(emqx, cluster_rpc_init_tnx_id, TnxId). + +get_init_tnx_id() -> + application:get_env(emqx, cluster_rpc_init_tnx_id, -1). maybe_load_config() -> case get_init_config_load_done() of - true -> - ok; - false -> - %% the app env 'config_files' should be set before emqx get started. - ConfFiles = application:get_env(emqx, config_files, []), - emqx_config:init_load(emqx_schema, ConfFiles) + true -> ok; + false -> emqx_config:init_load(emqx_schema) end. maybe_start_listeners() -> diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index 05dd3d122..8202d5a6a 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -15,17 +15,18 @@ %%-------------------------------------------------------------------- -module(emqx_config). --compile({no_auto_import, [get/0, get/1, put/2]}). +-compile({no_auto_import, [get/0, get/1, put/2, erase/1]}). --export([ init_load/2 - , read_override_conf/0 +-export([ init_load/1 + , init_load/2 + , read_override_conf/1 , check_config/2 , fill_defaults/1 , fill_defaults/2 - , save_configs/4 + , save_configs/5 , save_to_app_env/1 , save_to_config_map/2 - , save_to_override_conf/1 + , save_to_override_conf/2 ]). -export([ get_root/1 @@ -41,6 +42,7 @@ , find_raw/1 , put/1 , put/2 + , erase/1 ]). -export([ get_raw/1 @@ -96,7 +98,8 @@ %% persistent: %% save the updated config to the emqx_override.conf file %% defaults to `true` - persistent => boolean() + persistent => boolean(), + override_to => local | cluster }. -type update_args() :: {update_cmd(), Opts :: update_opts()}. -type update_stage() :: pre_config_update | post_config_update. @@ -199,6 +202,10 @@ put(Config) -> ?MODULE:put([RootName], RootValue) end, ok, Config). +erase(RootName) -> + persistent_term:erase(?PERSIS_KEY(?CONF, bin(RootName))), + persistent_term:erase(?PERSIS_KEY(?RAW_CONF, bin(RootName))). + -spec put(emqx_map_lib:config_key_path(), term()) -> ok. put(KeyPath, Config) -> do_put(?CONF, KeyPath, Config). @@ -237,13 +244,17 @@ put_raw(KeyPath, Config) -> do_put(?RAW_CONF, KeyPath, Config). %%============================================================================ %% Load/Update configs From/To files %%============================================================================ +init_load(SchemaMod) -> + ConfFiles = application:get_env(emqx, config_files, []), + init_load(SchemaMod, ConfFiles). %% @doc Initial load of the given config files. %% NOTE: The order of the files is significant, configs from files orderd %% in the rear of the list overrides prior values. -spec init_load(module(), [string()] | binary() | hocon:config()) -> ok. init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) -> - ParseOptions = #{format => map}, + IncDir = include_dirs(), + ParseOptions = #{format => map, include_dirs => IncDir}, Parser = case is_binary(Conf) of true -> fun hocon:binary/2; false -> fun hocon:files/2 @@ -253,21 +264,20 @@ init_load(SchemaMod, Conf) when is_list(Conf) orelse is_binary(Conf) -> init_load(SchemaMod, RawRichConf); {error, Reason} -> ?SLOG(error, #{msg => failed_to_load_hocon_conf, - reason => Reason + reason => Reason, + include_dirs => IncDir }), error(failed_to_load_hocon_conf) end; init_load(SchemaMod, RawConf0) when is_map(RawConf0) -> ok = save_schema_mod_and_names(SchemaMod), - %% override part of the input conf using emqx_override.conf - RawConf = merge_with_override_conf(RawConf0), %% check and save configs - {_AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf), + {_AppEnvs, CheckedConf} = check_config(SchemaMod, RawConf0), ok = save_to_config_map(maps:with(get_atom_root_names(), CheckedConf), - maps:with(get_root_names(), RawConf)). + maps:with(get_root_names(), RawConf0)). -merge_with_override_conf(RawConf) -> - maps:merge(RawConf, maps:with(maps:keys(RawConf), read_override_conf())). +include_dirs() -> + [filename:join(application:get_env(emqx, data_dir, "data/"), "configs") ++ "/"]. -spec check_config(module(), raw_config()) -> {AppEnvs, CheckedConf} when AppEnvs :: app_envs(), CheckedConf :: config(). @@ -299,9 +309,18 @@ fill_defaults(SchemaMod, RawConf) -> #{nullable => true, only_fill_defaults => true}, root_names_from_conf(RawConf)). --spec read_override_conf() -> raw_config(). -read_override_conf() -> - load_hocon_file(emqx_override_conf_name(), map). +-spec read_override_conf(map()) -> raw_config(). +read_override_conf(#{} = Opts) -> + File = override_conf_file(Opts), + load_hocon_file(File, map). + +override_conf_file(Opts) -> + Key = + case maps:get(override_to, Opts, local) of + local -> local_override_conf_file; + cluster -> cluster_override_conf_file + end, + application:get_env(emqx, Key, undefined). -spec save_schema_mod_and_names(module()) -> ok. save_schema_mod_and_names(SchemaMod) -> @@ -330,14 +349,13 @@ get_root_names() -> get_atom_root_names() -> [atom(N) || N <- get_root_names()]. --spec save_configs(app_envs(), config(), raw_config(), raw_config()) -> ok | {error, term()}. -save_configs(_AppEnvs, Conf, RawConf, OverrideConf) -> +-spec save_configs(app_envs(), config(), raw_config(), raw_config(), update_opts()) -> ok | {error, term()}. +save_configs(_AppEnvs, Conf, RawConf, OverrideConf, Opts) -> %% We may need also support hot config update for the apps that use application envs. %% If that is the case uncomment the following line to update the configs to app env %save_to_app_env(AppEnvs), save_to_config_map(Conf, RawConf), - %% TODO: merge RawConf to OverrideConf can be done here - save_to_override_conf(OverrideConf). + save_to_override_conf(OverrideConf, Opts). -spec save_to_app_env([tuple()]) -> ok. save_to_app_env(AppEnvs) -> @@ -350,11 +368,11 @@ save_to_config_map(Conf, RawConf) -> ?MODULE:put(Conf), ?MODULE:put_raw(RawConf). --spec save_to_override_conf(raw_config()) -> ok | {error, term()}. -save_to_override_conf(undefined) -> +-spec save_to_override_conf(raw_config(), update_opts()) -> ok | {error, term()}. +save_to_override_conf(undefined, _) -> ok; -save_to_override_conf(RawConf) -> - case emqx_override_conf_name() of +save_to_override_conf(RawConf, Opts) -> + case override_conf_file(Opts) of undefined -> ok; FileName -> ok = filelib:ensure_dir(FileName), @@ -371,14 +389,12 @@ save_to_override_conf(RawConf) -> load_hocon_file(FileName, LoadType) -> case filelib:is_regular(FileName) of true -> - {ok, Raw0} = hocon:load(FileName, #{format => LoadType}), + Opts = #{include_dirs => include_dirs(), format => LoadType}, + {ok, Raw0} = hocon:load(FileName, Opts), Raw0; false -> #{} end. -emqx_override_conf_name() -> - application:get_env(emqx, override_conf_file, undefined). - do_get(Type, KeyPath) -> Ref = make_ref(), Res = do_get(Type, KeyPath, Ref), diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index c44a0cb96..c75f0ee4d 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -27,6 +27,7 @@ , add_handler/2 , remove_handler/1 , update_config/3 + , get_raw_cluster_override_conf/0 , merge_to_old_config/2 ]). @@ -82,6 +83,9 @@ add_handler(ConfKeyPath, HandlerName) -> remove_handler(ConfKeyPath) -> gen_server:cast(?MODULE, {remove_handler, ConfKeyPath}). +get_raw_cluster_override_conf() -> + gen_server:call(?MODULE, get_raw_cluster_override_conf). + %%============================================================================ -spec init(term()) -> {ok, state()}. @@ -100,9 +104,9 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From, #{handlers := Handlers} = State) -> Reply = try case process_update_request(ConfKeyPath, Handlers, UpdateArgs) of - {ok, NewRawConf, OverrideConf} -> + {ok, NewRawConf, OverrideConf, Opts} -> check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, - OverrideConf, UpdateArgs); + OverrideConf, UpdateArgs, Opts); {error, Result} -> {error, Result} end @@ -116,7 +120,9 @@ handle_call({change_config, SchemaModule, ConfKeyPath, UpdateArgs}, _From, {error, Reason} end, {reply, Reply, State}; - +handle_call(get_raw_cluster_override_conf, _From, State) -> + Reply = emqx_config:read_override_conf(#{override_to => cluster}), + {reply, Reply, State}; handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. @@ -163,14 +169,15 @@ process_update_request(ConfKeyPath, _Handlers, {remove, Opts}) -> OldRawConf = emqx_config:get_root_raw(ConfKeyPath), BinKeyPath = bin_path(ConfKeyPath), NewRawConf = emqx_map_lib:deep_remove(BinKeyPath, OldRawConf), + _ = remove_from_local_if_cluster_change(BinKeyPath, Opts), OverrideConf = remove_from_override_config(BinKeyPath, Opts), - {ok, NewRawConf, OverrideConf}; + {ok, NewRawConf, OverrideConf, Opts}; process_update_request(ConfKeyPath, Handlers, {{update, UpdateReq}, Opts}) -> OldRawConf = emqx_config:get_root_raw(ConfKeyPath), case do_update_config(ConfKeyPath, Handlers, OldRawConf, UpdateReq) of {ok, NewRawConf} -> OverrideConf = update_override_config(NewRawConf, Opts), - {ok, NewRawConf, OverrideConf}; + {ok, NewRawConf, OverrideConf, Opts}; Error -> Error end. @@ -187,15 +194,16 @@ do_update_config([ConfKey | ConfKeyPath], Handlers, OldRawConf, UpdateReq) -> end. check_and_save_configs(SchemaModule, ConfKeyPath, Handlers, NewRawConf, OverrideConf, - UpdateArgs) -> + UpdateArgs, Opts) -> OldConf = emqx_config:get_root(ConfKeyPath), FullRawConf = with_full_raw_confs(NewRawConf), {AppEnvs, CheckedConf} = emqx_config:check_config(SchemaModule, FullRawConf), NewConf = maps:with(maps:keys(OldConf), CheckedConf), + _ = remove_from_local_if_cluster_change(ConfKeyPath, Opts), case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of {ok, Result0} -> case save_configs(ConfKeyPath, AppEnvs, NewConf, NewRawConf, OverrideConf, - UpdateArgs) of + UpdateArgs, Opts) of {ok, Result1} -> {ok, Result1#{post_config_update => Result0}}; Error -> Error @@ -253,8 +261,8 @@ call_post_config_update(Handlers, OldConf, NewConf, AppEnvs, UpdateReq, Result) false -> {ok, Result} end. -save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs) -> - case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf) of +save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs, Opts) -> + case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf, Opts) of ok -> {ok, return_change_result(ConfKeyPath, UpdateArgs)}; {error, Reason} -> {error, {save_configs, Reason}} end. @@ -269,16 +277,26 @@ merge_to_old_config(UpdateReq, RawConf) when is_map(UpdateReq), is_map(RawConf) merge_to_old_config(UpdateReq, _RawConf) -> {ok, UpdateReq}. +%% local-override.conf priority is higher than cluster-override.conf +%% If we want cluster to take effect, we must remove the local. +remove_from_local_if_cluster_change(BinKeyPath, Opts) -> + case maps:get(override, Opts, local) of + local -> ok; + cluster -> + Local = remove_from_override_config(BinKeyPath, Opts#{override_to => local}), + emqx_config:save_to_override_conf(Local, Opts) + end. + remove_from_override_config(_BinKeyPath, #{persistent := false}) -> undefined; -remove_from_override_config(BinKeyPath, _Opts) -> - OldConf = emqx_config:read_override_conf(), +remove_from_override_config(BinKeyPath, Opts) -> + OldConf = emqx_config:read_override_conf(Opts), emqx_map_lib:deep_remove(BinKeyPath, OldConf). update_override_config(_RawConf, #{persistent := false}) -> undefined; -update_override_config(RawConf, _Opts) -> - OldConf = emqx_config:read_override_conf(), +update_override_config(RawConf, Opts) -> + OldConf = emqx_config:read_override_conf(Opts), maps:merge(OldConf, RawConf). up_req({remove, _Opts}) -> '$remove'; diff --git a/apps/emqx/src/emqx_map_lib.erl b/apps/emqx/src/emqx_map_lib.erl index 6aa6606c0..729c5f13d 100644 --- a/apps/emqx/src/emqx_map_lib.erl +++ b/apps/emqx/src/emqx_map_lib.erl @@ -31,7 +31,7 @@ ]). -export_type([config_key/0, config_key_path/0]). --type config_key() :: atom() | binary(). +-type config_key() :: atom() | binary() | string(). -type config_key_path() :: [config_key()]. -type convert_fun() :: fun((...) -> {K1::any(), V1::any()} | drop). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 5958167cd..118cea5df 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -135,7 +135,7 @@ roots(low) -> , {"quota", sc(ref("quota"), #{})} - , {"plugins", %% TODO: move to emqx_machine_schema + , {"plugins", %% TODO: move to emqx_conf_schema sc(ref("plugins"), #{})} , {"stats", diff --git a/apps/emqx/test/emqx_channel_SUITE.erl b/apps/emqx/test/emqx_channel_SUITE.erl index 2b9051de0..10e7db9cf 100644 --- a/apps/emqx/test/emqx_channel_SUITE.erl +++ b/apps/emqx/test/emqx_channel_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). all() -> @@ -132,7 +133,9 @@ basic_conf() -> }. set_test_listenser_confs() -> - emqx_config:put(basic_conf()). + Conf = emqx_config:get([]), + emqx_config:put(basic_conf()), + Conf. %%-------------------------------------------------------------------- %% CT Callbacks @@ -174,10 +177,11 @@ end_per_suite(_Config) -> ]). init_per_testcase(_TestCase, Config) -> - set_test_listenser_confs(), - Config. + NewConf = set_test_listenser_confs(), + [{config, NewConf}|Config]. end_per_testcase(_TestCase, Config) -> + emqx_config:put(?config(config, Config)), Config. %%-------------------------------------------------------------------- @@ -283,7 +287,7 @@ t_handle_in_re_auth(_) -> ?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => undefined}}) ), - + Channel1 = channel(), ConnInfo = emqx_channel:info(conninfo, Channel1), Channel2 = emqx_channel:set_field(conninfo, ConnInfo#{conn_props => Properties}, Channel1), @@ -953,4 +957,3 @@ session(InitFields) when is_map(InitFields) -> quota() -> emqx_limiter:init(zone, [{conn_messages_routing, {5, 1}}, {overall_messages_routing, {10, 1}}]). - diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index f9cc2d0b0..127a0892c 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -146,9 +146,13 @@ load(App) -> start_app(App, Handler) -> start_app(App, app_schema(App), - app_path(App, filename:join(["etc", atom_to_list(App) ++ ".conf"])), + app_path(App, filename:join(["etc", app_conf_file(App)])), Handler). +app_conf_file(emqx_conf) -> "emqx.conf.all"; +app_conf_file(App) -> atom_to_list(App) ++ ".conf". + +%% TODO: get rid of cuttlefish app_schema(App) -> Mod = list_to_atom(atom_to_list(App) ++ "_schema"), true = is_list(Mod:roots()), @@ -166,6 +170,7 @@ start_app(App, Schema, ConfigFile, SpecAppConfig) -> RenderedConfigFile = render_config_file(ConfigFile, Vars), read_schema_configs(Schema, RenderedConfigFile), force_set_config_file_paths(App, [RenderedConfigFile]), + copy_certs(App, RenderedConfigFile), SpecAppConfig(App), case application:ensure_all_started(App) of {ok, _} -> ok; @@ -288,7 +293,7 @@ change_emqx_opts(SslType, MoreOpts) -> lists:map(fun(Listener) -> maybe_inject_listener_ssl_options(SslType, MoreOpts, Listener) end, Listeners), - application:set_env(emqx, listeners, NewListeners). + emqx_conf:update([listeners], NewListeners, #{}). maybe_inject_listener_ssl_options(SslType, MoreOpts, {sll, Port, Opts}) -> %% this clause is kept to be backward compatible @@ -409,8 +414,16 @@ catch_call(F) -> C : E : S -> {crashed, {C, E, S}} end. - +force_set_config_file_paths(emqx_conf, Paths) -> + application:set_env(emqx, config_files, Paths); force_set_config_file_paths(emqx, Paths) -> application:set_env(emqx, config_files, Paths); force_set_config_file_paths(_, _) -> ok. + +copy_certs(emqx_conf, Dest0) -> + Dest = filename:dirname(Dest0), + From = string:replace(Dest, "emqx_conf", "emqx"), + os:cmd( ["cp -rf ", From, "/certs ", Dest, "/"]), + ok; +copy_certs(_, _) -> ok. diff --git a/apps/emqx_authz/include/emqx_authz.hrl b/apps/emqx_authz/include/emqx_authz.hrl index fe4514614..c22b93c48 100644 --- a/apps/emqx_authz/include/emqx_authz.hrl +++ b/apps/emqx_authz/include/emqx_authz.hrl @@ -65,3 +65,5 @@ -define(AUTHZ_METRICS, ?METRICS(authz_metrics)). -define(AUTHZ_METRICS(K), ?METRICS(authz_metrics, K)). + +-define(CONF_KEY_PATH, [authorization, sources]). diff --git a/apps/emqx_authz/src/emqx_authz.erl b/apps/emqx_authz/src/emqx_authz.erl index ab49eb894..49570b8fe 100644 --- a/apps/emqx_authz/src/emqx_authz.erl +++ b/apps/emqx_authz/src/emqx_authz.erl @@ -38,7 +38,6 @@ -export([post_config_update/4, pre_config_update/2]). --define(CONF_KEY_PATH, [authorization, sources]). -spec(register_metrics() -> ok). register_metrics() -> @@ -46,8 +45,8 @@ register_metrics() -> init() -> ok = register_metrics(), - emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE), - Sources = emqx:get_config(?CONF_KEY_PATH, []), + emqx_conf:add_handler(?CONF_KEY_PATH, ?MODULE), + Sources = emqx_conf:get(?CONF_KEY_PATH, []), ok = check_dup_types(Sources), NSources = init_sources(Sources), ok = emqx_hooks:add('client.authorize', {?MODULE, authorize, [NSources]}, -1). diff --git a/apps/emqx_authz/src/emqx_authz_app.erl b/apps/emqx_authz/src/emqx_authz_app.erl index a5044443b..5874d5733 100644 --- a/apps/emqx_authz/src/emqx_authz_app.erl +++ b/apps/emqx_authz/src/emqx_authz_app.erl @@ -18,6 +18,7 @@ start(_StartType, _StartArgs) -> {ok, Sup}. stop(_State) -> + emqx_conf:remove_handler(?CONF_KEY_PATH), ok. %% internal functions diff --git a/apps/emqx_authz/test/emqx_authz_SUITE.erl b/apps/emqx_authz/test/emqx_authz_SUITE.erl index fd22bbc7a..d7df8eaa0 100644 --- a/apps/emqx_authz/test/emqx_authz_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_SUITE.erl @@ -133,7 +133,7 @@ t_update_source(_) -> , #{type := postgresql, enable := true} , #{type := redis, enable := true} , #{type := file, enable := true} - ], emqx:get_config([authorization, sources], [])), + ], emqx_conf:get([authorization, sources], [])), {ok, _} = emqx_authz:update({?CMD_REPLACE, http}, ?SOURCE1#{<<"enable">> := false}), {ok, _} = emqx_authz:update({?CMD_REPLACE, mongodb}, ?SOURCE2#{<<"enable">> := false}), @@ -148,7 +148,7 @@ t_update_source(_) -> , #{type := postgresql, enable := false} , #{type := redis, enable := false} , #{type := file, enable := false} - ], emqx:get_config([authorization, sources], [])), + ], emqx_conf:get([authorization, sources], [])), {ok, _} = emqx_authz:update(?CMD_REPLACE, []). diff --git a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl index 56bbf3a1c..558e5005c 100644 --- a/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl +++ b/apps/emqx_auto_subscribe/src/emqx_auto_subscribe.erl @@ -38,7 +38,7 @@ max_limit() -> ?MAX_AUTO_SUBSCRIBE. list() -> - format(emqx:get_config([auto_subscribe, topics], [])). + format(emqx_conf:get([auto_subscribe, topics], [])). update(Topics) -> update_(Topics). diff --git a/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_handler.erl b/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_handler.erl index 8e541eaec..b2b5f4473 100644 --- a/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_handler.erl +++ b/apps/emqx_auto_subscribe/src/topics_handler/emqx_auto_subscribe_handler.erl @@ -19,7 +19,7 @@ -spec(init() -> {Module :: atom(), Config :: term()}). init() -> - do_init(emqx:get_config([auto_subscribe], #{})). + do_init(emqx_conf:get([auto_subscribe], #{})). do_init(Config = #{topics := _Topics}) -> Options = emqx_auto_subscribe_internal:init(Config), diff --git a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl index 744267e74..0e5022533 100644 --- a/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl +++ b/apps/emqx_auto_subscribe/test/emqx_auto_subscribe_SUITE.erl @@ -44,7 +44,6 @@ all() -> init_per_suite(Config) -> mria:start(), application:stop(?APP), - meck:new(emqx_schema, [non_strict, passthrough, no_history, no_link]), meck:expect(emqx_schema, fields, fun("auto_subscribe") -> meck:passthrough(["auto_subscribe"]) ++ @@ -86,8 +85,7 @@ init_per_suite(Config) -> } ] }">>), - emqx_common_test_helpers:start_apps([emqx_dashboard], fun set_special_configs/1), - emqx_common_test_helpers:start_apps([?APP]), + emqx_common_test_helpers:start_apps([emqx_dashboard, ?APP], fun set_special_configs/1), Config. set_special_configs(emqx_dashboard) -> diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 351e6aeca..c07a5b842 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -51,7 +51,7 @@ reload_hook() -> unload_hook(), - Bridges = emqx:get_config([bridges], #{}), + Bridges = emqx_conf:get([bridges], #{}), lists:foreach(fun({_Type, Bridge}) -> lists:foreach(fun({_Name, BridgeConf}) -> load_hook(BridgeConf) @@ -124,7 +124,7 @@ perform_bridge_changes([{Action, MapConfs} | Tasks], Result0) -> perform_bridge_changes(Tasks, Result). load_bridges() -> - Bridges = emqx:get_config([bridges], #{}), + Bridges = emqx_conf:get([bridges], #{}), emqx_bridge_monitor:ensure_all_started(Bridges). resource_id(BridgeId) when is_binary(BridgeId) -> @@ -244,7 +244,7 @@ has_subscribe_local_topic(Channels) -> end, maps:to_list(Channels)). get_matched_channels(Topic) -> - Bridges = emqx:get_config([bridges], #{}), + Bridges = emqx_conf:get([bridges], #{}), maps:fold(fun %% TODO: also trigger 'message.publish' for mqtt bridges. (mqtt, _Conf, Acc0) -> Acc0; diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index 2f3601646..3fa8f12dd 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -23,12 +23,12 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_bridge_sup:start_link(), ok = emqx_bridge:load_bridges(), ok = emqx_bridge:reload_hook(), - emqx_config_handler:add_handler(emqx_bridge:config_key_path(), emqx_bridge), + emqx_conf:add_handler(emqx_bridge:config_key_path(), emqx_bridge), {ok, Sup}. stop(_State) -> - emqx_config_handler:remove_handler(emqx_bridge:config_key_path()), + emqx_conf:remove_handler(emqx_bridge:config_key_path()), ok = emqx_bridge:unload_hook(), ok. -%% internal functions \ No newline at end of file +%% internal functions diff --git a/apps/emqx_machine/etc/emqx_machine.conf b/apps/emqx_conf/etc/emqx_conf.conf similarity index 97% rename from apps/emqx_machine/etc/emqx_machine.conf rename to apps/emqx_conf/etc/emqx_conf.conf index eeab91154..fcb7a2947 100644 --- a/apps/emqx_machine/etc/emqx_machine.conf +++ b/apps/emqx_conf/etc/emqx_conf.conf @@ -93,28 +93,10 @@ node { backtrace_depth = 23 cluster_call { - ## Time interval to retry after a failed call - ## - ## @doc node.cluster_call.retry_interval - ## ValueType: Duration - ## Default: 1s retry_interval = 1s - ## Retain the maximum number of completed transactions (for queries) - ## - ## @doc node.cluster_call.max_history - ## ValueType: Integer - ## Range: [1, 500] - ## Default: 100 max_history = 100 - ## Time interval to clear completed but stale transactions. - ## Ensure that the number of completed transactions is less than the max_history - ## - ## @doc node.cluster_call.cleanup_interval - ## ValueType: Duration - ## Default: 5m cleanup_interval = 5m - } - + } } ##================================================================== diff --git a/apps/emqx_conf/include/emqx_conf.hrl b/apps/emqx_conf/include/emqx_conf.hrl new file mode 100644 index 000000000..82ad8264d --- /dev/null +++ b/apps/emqx_conf/include/emqx_conf.hrl @@ -0,0 +1,22 @@ + +-ifndef(EMQ_X_CONF_HRL). +-define(EMQ_X_CONF_HRL, true). + +-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). + +-define(CLUSTER_MFA, cluster_rpc_mfa). +-define(CLUSTER_COMMIT, cluster_rpc_commit). + +-record(cluster_rpc_mfa, { + tnx_id :: pos_integer(), + mfa :: mfa(), + created_at :: calendar:datetime(), + initiator :: node() +}). + +-record(cluster_rpc_commit, { + node :: node(), + tnx_id :: pos_integer() | '$1' +}). + +-endif. diff --git a/apps/emqx_conf/rebar.config b/apps/emqx_conf/rebar.config new file mode 100644 index 000000000..e0456112b --- /dev/null +++ b/apps/emqx_conf/rebar.config @@ -0,0 +1,7 @@ +{erl_opts, [debug_info]}. +{deps, []}. + +{shell, [ + % {config, "config/sys.config"}, + {apps, [emqx_conf]} +]}. diff --git a/apps/emqx_machine/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl similarity index 92% rename from apps/emqx_machine/src/emqx_cluster_rpc.erl rename to apps/emqx_conf/src/emqx_cluster_rpc.erl index 479561206..4187b35aa 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -19,6 +19,7 @@ %% API -export([start_link/0, mnesia/1]). -export([multicall/3, multicall/5, query/1, reset/0, status/0, skip_failed_commit/1]). +-export([get_node_tnx_id/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, handle_continue/2, code_change/3]). @@ -26,13 +27,12 @@ -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). --export([start_link/3]). -endif. -boot_mnesia({mnesia, [boot]}). -include_lib("emqx/include/logger.hrl"). --include("emqx_machine.hrl"). +-include("emqx_conf.hrl"). -define(CATCH_UP, catch_up). -define(TIMEOUT, timer:minutes(1)). @@ -43,13 +43,13 @@ mnesia(boot) -> ok = mria:create_table(?CLUSTER_MFA, [ {type, ordered_set}, - {rlog_shard, ?EMQX_MACHINE_SHARD}, + {rlog_shard, ?CLUSTER_RPC_SHARD}, {storage, disc_copies}, {record_name, cluster_rpc_mfa}, {attributes, record_info(fields, cluster_rpc_mfa)}]), ok = mria:create_table(?CLUSTER_COMMIT, [ {type, set}, - {rlog_shard, ?EMQX_MACHINE_SHARD}, + {rlog_shard, ?CLUSTER_RPC_SHARD}, {storage, disc_copies}, {record_name, cluster_rpc_commit}, {attributes, record_info(fields, cluster_rpc_commit)}]). @@ -87,7 +87,7 @@ multicall(M, F, A, RequireNum, Timeout) when RequireNum =:= all orelse RequireNu %% the initiate transaction must happened on core node %% make sure MFA(in the transaction) and the transaction on the same node %% don't need rpc again inside transaction. - case mria_status:upstream_node(?EMQX_MACHINE_SHARD) of + case mria_status:upstream_node(?CLUSTER_RPC_SHARD) of {ok, Node} -> gen_server:call({?MODULE, Node}, MFA, Timeout); disconnected -> {error, disconnected} end @@ -122,6 +122,13 @@ reset() -> gen_server:call(?MODULE, reset). status() -> transaction(fun trans_status/0, []). +-spec get_node_tnx_id(node()) -> integer(). +get_node_tnx_id(Node) -> + case mnesia:wread({?CLUSTER_COMMIT, Node}) of + [] -> -1; + [#cluster_rpc_commit{tnx_id = TnxId}] -> TnxId + end. + %% Regardless of what MFA is returned, consider it a success), %% then move to the next tnxId. %% if the next TnxId failed, need call the function again to skip. @@ -135,9 +142,12 @@ skip_failed_commit(Node) -> %% @private init([Node, RetryMs]) -> - _ = mria:wait_for_tables([?CLUSTER_MFA]), + _ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), - {ok, #{node => Node, retry_interval => RetryMs}, {continue, ?CATCH_UP}}. + State = #{node => Node, retry_interval => RetryMs}, + TnxId = emqx_app:get_init_tnx_id(), + ok = maybe_init_tnx_id(Node, TnxId), + {ok, State, {continue, ?CATCH_UP}}. %% @private handle_continue(?CATCH_UP, State) -> @@ -274,7 +284,7 @@ do_catch_up_in_one_trans(LatestId, Node) -> end. transaction(Func, Args) -> - mria:transaction(?EMQX_MACHINE_SHARD, Func, Args). + mria:transaction(?CLUSTER_RPC_SHARD, Func, Args). trans_status() -> mnesia:foldl(fun(Rec, Acc) -> @@ -363,4 +373,15 @@ commit_status_trans(Operator, TnxId) -> mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]). get_retry_ms() -> - application:get_env(emqx_machine, cluster_call_retry_interval, 1000). + emqx_conf:get(["node", "cluster_call", "retry_interval"], 1000). + +maybe_init_tnx_id(_Node, TnxId)when TnxId < 0 -> ok; +maybe_init_tnx_id(Node, TnxId) -> + {atomic, _} = transaction(fun init_node_tnx_id/2, [Node, TnxId]), + ok. + +init_node_tnx_id(Node, TnxId) -> + case mnesia:read(?CLUSTER_COMMIT, Node) of + [] -> commit(Node, TnxId); + _ -> ok + end. diff --git a/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl b/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl similarity index 91% rename from apps/emqx_machine/src/emqx_cluster_rpc_handler.erl rename to apps/emqx_conf/src/emqx_cluster_rpc_handler.erl index 2b1242fa7..ab2b24d27 100644 --- a/apps/emqx_machine/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl @@ -18,15 +18,15 @@ -behaviour(gen_server). -include_lib("emqx/include/logger.hrl"). --include("emqx_machine.hrl"). +-include("emqx_conf.hrl"). -export([start_link/0, start_link/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). start_link() -> - MaxHistory = application:get_env(emqx_machine, cluster_call_max_history, 100), - CleanupMs = application:get_env(emqx_machine, cluster_call_cleanup_interval, 5*60*1000), + MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100), + CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5*60*1000), start_link(MaxHistory, CleanupMs). start_link(MaxHistory, CleanupMs) -> @@ -49,7 +49,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> - case mria:transaction(?EMQX_MACHINE_SHARD, fun del_stale_mfa/1, [MaxHistory]) of + case mria:transaction(?CLUSTER_RPC_SHARD, fun del_stale_mfa/1, [MaxHistory]) of {atomic, ok} -> ok; Error -> ?LOG(error, "del_stale_cluster_rpc_mfa error:~p", [Error]) end, diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src new file mode 100644 index 000000000..563c9dc1d --- /dev/null +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -0,0 +1,10 @@ +{application, emqx_conf, + [{description, "EMQX configuration management"}, + {vsn, "0.1.0"}, + {registered, []}, + {mod, {emqx_conf_app, []}}, + {included_applications, [hocon]}, + {applications, [kernel, stdlib]}, + {env, []}, + {modules, []} + ]}. diff --git a/apps/emqx_conf/src/emqx_conf.erl b/apps/emqx_conf/src/emqx_conf.erl new file mode 100644 index 000000000..3ce3aa52b --- /dev/null +++ b/apps/emqx_conf/src/emqx_conf.erl @@ -0,0 +1,122 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_conf). + +-compile({no_auto_import, [get/1, get/2]}). + +-export([add_handler/2, remove_handler/1]). +-export([get/1, get/2, get_all/1]). +-export([get_by_node/2, get_by_node/3]). +-export([update/3, update/4]). +-export([remove/2, remove/3]). +-export([reset/2, reset/3]). + +%% for rpc +-export([get_node_and_config/1]). + +%% API +%% @doc Adds a new config handler to emqx_config_handler. +-spec add_handler(emqx_config:config_key_path(), module()) -> ok. +add_handler(ConfKeyPath, HandlerName) -> + emqx_config_handler:add_handler(ConfKeyPath, HandlerName). + +%% @doc remove config handler from emqx_config_handler. +-spec remove_handler(emqx_config:config_key_path()) -> ok. +remove_handler(ConfKeyPath) -> + emqx_config_handler:remove_handler(ConfKeyPath). + +-spec get(emqx_map_lib:config_key_path()) -> term(). +get(KeyPath) -> + emqx:get_config(KeyPath). + +-spec get(emqx_map_lib:config_key_path(), term()) -> term(). +get(KeyPath, Default) -> + emqx:get_config(KeyPath, Default). + +%% @doc Returns all values in the cluster. +-spec get_all(emqx_map_lib:config_key_path()) -> #{node() => term()}. +get_all(KeyPath) -> + {ResL, []} = rpc:multicall(?MODULE, get_node_and_config, [KeyPath], 5000), + maps:from_list(ResL). + +%% @doc Returns the specified node's KeyPath, or exception if not found +-spec get_by_node(node(), emqx_map_lib:config_key_path()) -> term(). +get_by_node(Node, KeyPath)when Node =:= node() -> + emqx:get_config(KeyPath); +get_by_node(Node, KeyPath) -> + rpc:call(Node, ?MODULE, get_by_node, [Node, KeyPath]). + +%% @doc Returns the specified node's KeyPath, or the default value if not found +-spec get_by_node(node(), emqx_map_lib:config_key_path(), term()) -> term(). +get_by_node(Node, KeyPath, Default)when Node =:= node() -> + emqx:get_config(KeyPath, Default); +get_by_node(Node, KeyPath, Default) -> + rpc:call(Node, ?MODULE, get_by_node, [Node, KeyPath, Default]). + +%% @doc Returns the specified node's KeyPath, or config_not_found if key path not found +-spec get_node_and_config(emqx_map_lib:config_key_path()) -> term(). +get_node_and_config(KeyPath) -> + {node(), emqx:get_config(KeyPath, config_not_found)}. + +%% @doc Update all value of key path in cluster-override.conf or local-override.conf. +-spec update(emqx_map_lib:config_key_path(), emqx_config:update_args(), + emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +update(KeyPath, UpdateReq, Opts0) -> + Args = [KeyPath, UpdateReq, Opts0], + {ok, _TnxId, Res} = emqx_cluster_rpc:multicall(emqx, update_config, Args), + Res. + +%% @doc Update the specified node's key path in local-override.conf. +-spec update(node(), emqx_map_lib:config_key_path(), emqx_config:update_args(), + emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +update(Node, KeyPath, UpdateReq, Opts0)when Node =:= node() -> + emqx:update_config(KeyPath, UpdateReq, Opts0#{override_to => local}); +update(Node, KeyPath, UpdateReq, Opts0) -> + rpc:call(Node, ?MODULE, update, [Node, KeyPath, UpdateReq, Opts0], 5000). + +%% @doc remove all value of key path in cluster-override.conf or local-override.conf. +-spec remove(emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +remove(KeyPath, Opts0) -> + Args = [KeyPath, Opts0], + {ok, _TnxId, Res} = emqx_cluster_rpc:multicall(emqx, remove_config, Args), + Res. + +%% @doc remove the specified node's key path in local-override.conf. +-spec remove(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +remove(Node, KeyPath, Opts) when Node =:= node() -> + emqx:remove_config(KeyPath, Opts#{override_to => local}); +remove(Node, KeyPath, Opts) -> + rpc:call(Node, ?MODULE, remove, [KeyPath, Opts]). + +%% @doc reset all value of key path in cluster-override.conf or local-override.conf. +-spec reset(emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +reset(KeyPath, Opts0) -> + Args = [KeyPath, Opts0], + {ok, _TnxId, Res} = emqx_cluster_rpc:multicall(emqx, reset_config, Args), + Res. + +%% @doc reset the specified node's key path in local-override.conf. +-spec reset(node(), emqx_map_lib:config_key_path(), emqx_config:update_opts()) -> + {ok, emqx_config:update_result()} | {error, emqx_config:update_error()}. +reset(Node, KeyPath, Opts) when Node =:= node() -> + emqx:reset_config(KeyPath, Opts#{override_to => local}); +reset(Node, KeyPath, Opts) -> + rpc:call(Node, ?MODULE, reset, [KeyPath, Opts]). diff --git a/apps/emqx_conf/src/emqx_conf_app.erl b/apps/emqx_conf/src/emqx_conf_app.erl new file mode 100644 index 000000000..0a33605fe --- /dev/null +++ b/apps/emqx_conf/src/emqx_conf_app.erl @@ -0,0 +1,97 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_app). + +-behaviour(application). + +-export([start/2, stop/1]). +-export([get_override_config_file/0]). + +-include_lib("emqx/include/logger.hrl"). +-include("emqx_conf.hrl"). + +start(_StartType, _StartArgs) -> + init_conf(), + emqx_conf_sup:start_link(). + +stop(_State) -> + ok. + +%% internal functions +init_conf() -> + {ok, TnxId} = copy_override_conf_from_core_node(), + emqx_app:set_init_tnx_id(TnxId), + emqx_config:init_load(emqx_conf_schema), + emqx_app:set_init_config_load_done(). + +copy_override_conf_from_core_node() -> + case nodes() of + [] -> %% The first core nodes is self. + ?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}), + {ok, -1}; + Nodes -> + {Results, Failed} = rpc:multicall(Nodes, ?MODULE, get_override_config_file, [], 20000), + {Ready, NotReady0} = lists:partition(fun(Res) -> element(1, Res) =:= ok end, Results), + NotReady = lists:filter(fun(Res) -> element(1, Res) =:= error end, NotReady0), + case (Failed =/= [] orelse NotReady =/= []) andalso Ready =/= [] of + true -> + Warning = #{nodes => Nodes, failed => Failed, not_ready => NotReady, + msg => "ignored_bad_nodes_when_copy_init_config"}, + ?SLOG(warning, Warning); + false -> ok + end, + case Ready of + [] -> + %% Other core nodes running but no one replicated it successfully. + ?SLOG(error, #{msg => "copy_overide_conf_from_core_node_failed", + nodes => Nodes, failed => Failed, not_ready => NotReady}), + {error, "core node not ready"}; + _ -> + SortFun = fun({ok, #{wall_clock := W1}}, {ok, #{wall_clock := W2}}) -> W1 > W2 end, + [{ok, Info} | _] = lists:sort(SortFun, Ready), + #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, + ?SLOG(debug, #{msg => "copy_overide_conf_from_core_node_success", node => Node}), + ok = emqx_config:save_to_override_conf(RawOverrideConf, #{override_to => cluster}), + {ok, TnxId} + end + end. + +get_override_config_file() -> + Node = node(), + case emqx_app:get_init_config_load_done() of + false -> {error, #{node => Node, msg => "init_conf_load_not_done"}}; + true -> + case mria_rlog:role() of + core -> + case erlang:whereis(emqx_config_handler) of + undefined -> {error, #{node => Node, msg => "emqx_config_handler_not_ready"}}; + _ -> + Fun = fun() -> + TnxId = emqx_cluster_rpc:get_node_tnx_id(Node), + WallClock = erlang:statistics(wall_clock), + Conf = emqx_config_handler:get_raw_cluster_override_conf(), + #{wall_clock => WallClock, conf => Conf, tnx_id => TnxId, node => Node} + end, + case mria:ro_transaction(?CLUSTER_RPC_SHARD, Fun) of + {atomic, Res} -> {ok, Res}; + {aborted, Reason} -> {error, #{node => Node, msg => Reason}} + end + end; + replicant -> + {ignore, #{node => Node}} + end + end. diff --git a/apps/emqx_machine/src/emqx_machine_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl similarity index 95% rename from apps/emqx_machine/src/emqx_machine_schema.erl rename to apps/emqx_conf/src/emqx_conf_schema.erl index a67b0ceeb..a187bc58c 100644 --- a/apps/emqx_machine/src/emqx_machine_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_machine_schema). +-module(emqx_conf_schema). -dialyzer(no_return). -dialyzer(no_match). @@ -279,7 +279,8 @@ fields("node") -> })} , {"data_dir", sc(string(), - #{ nullable => false + #{ nullable => false, + mapping => "emqx.data_dir" })} , {"config_files", sc(list(string()), @@ -288,8 +289,9 @@ fields("node") -> })} , {"global_gc_interval", sc(emqx_schema:duration(), - #{ default => "15m" - })} + #{ mapping => "emqx_machine.global_gc_interval" + , default => "15m" + })} , {"crash_dump_dir", sc(file(), #{ mapping => "vm_args.-env ERL_CRASH_DUMP" @@ -314,32 +316,34 @@ fields("node") -> #{ mapping => "emqx_machine.backtrace_depth" , default => 23 })} - , {"cluster_call", - sc(ref("cluster_call"), - #{} - )} , {"etc_dir", sc(string(), #{ desc => "`etc` dir for the node" } )} + , {"cluster_call", + sc(ref("cluster_call"), + #{ + } + )} ]; fields("cluster_call") -> [ {"retry_interval", sc(emqx_schema:duration(), - #{ mapping => "emqx_machine.retry_interval" - , default => "1s" - })} + #{ desc => "Time interval to retry after a failed call." + , default => "1s" + })} , {"max_history", sc(range(1, 500), - #{mapping => "emqx_machine.max_history", - default => 100 + #{ desc => "Retain the maximum number of completed transactions (for queries)." + , default => 100 })} , {"cleanup_interval", sc(emqx_schema:duration(), - #{mapping => "emqx_machine.cleanup_interval", - default => "5m" + #{ desc => "Time interval to clear completed but stale transactions. + Ensure that the number of completed transactions is less than the max_history." + , default => "5m" })} ]; @@ -507,7 +511,8 @@ translation("kernel") -> , {"logger", fun tr_logger/1}]; translation("emqx") -> [ {"config_files", fun tr_config_files/1} - , {"override_conf_file", fun tr_override_conf_fie/1} + , {"cluster_override_conf_file", fun tr_cluster_override_conf_file/1} + , {"local_override_conf_file", fun tr_local_override_conf_file/1} ]. tr_config_files(Conf) -> @@ -523,11 +528,17 @@ tr_config_files(Conf) -> end end. -tr_override_conf_fie(Conf) -> +tr_cluster_override_conf_file(Conf) -> + tr_override_conf_file(Conf, "cluster-override.conf"). + +tr_local_override_conf_file(Conf) -> + tr_override_conf_file(Conf, "local-override.conf"). + +tr_override_conf_file(Conf, Filename) -> DataDir = conf_get("node.data_dir", Conf), %% assert, this config is not nullable [_ | _] = DataDir, - filename:join([DataDir, "emqx_override.conf"]). + filename:join([DataDir, "configs", Filename]). tr_cluster__discovery(Conf) -> Strategy = conf_get("cluster.discovery_strategy", Conf), diff --git a/apps/emqx_conf/src/emqx_conf_sup.erl b/apps/emqx_conf/src/emqx_conf_sup.erl new file mode 100644 index 000000000..bcecb9798 --- /dev/null +++ b/apps/emqx_conf/src/emqx_conf_sup.erl @@ -0,0 +1,48 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_conf_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +-define(SERVER, ?MODULE). + +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +init([]) -> + SupFlags = #{strategy => one_for_all, + intensity => 10, + period => 100}, + ChildSpecs = + [ child_spec(emqx_cluster_rpc, []) + , child_spec(emqx_cluster_rpc_handler, []) + ], + {ok, {SupFlags, ChildSpecs}}. + +child_spec(Mod, Args) -> + #{ + id => Mod, + start => {Mod, start_link, Args}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [Mod] + }. diff --git a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl similarity index 94% rename from apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl rename to apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl index cf6c794a7..cb79151ce 100644 --- a/apps/emqx_machine/test/emqx_cluster_rpc_SUITE.erl +++ b/apps/emqx_conf/test/emqx_cluster_rpc_SUITE.erl @@ -19,8 +19,7 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx/include/emqx.hrl"). --include("emqx_machine.hrl"). +-include("emqx_conf.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -define(NODE1, emqx_cluster_rpc). @@ -40,13 +39,9 @@ suite() -> [{timetrap, {minutes, 3}}]. groups() -> []. init_per_suite(Config) -> - application:load(emqx), - application:load(emqx_machine), + application:load(emqx_conf), ok = ekka:start(), - ok = mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), - application:set_env(emqx_machine, cluster_call_max_history, 100), - application:set_env(emqx_machine, cluster_call_clean_interval, 1000), - application:set_env(emqx_machine, cluster_call_retry_interval, 900), + ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), meck:new(emqx_alarm, [non_strict, passthrough, no_link]), meck:expect(emqx_alarm, activate, 2, ok), meck:expect(emqx_alarm, deactivate, 2, ok), @@ -68,6 +63,7 @@ end_per_testcase(_Config) -> ok. t_base_test(_Config) -> + emqx_cluster_rpc:reset(), ?assertEqual(emqx_cluster_rpc:status(), {atomic, []}), Pid = self(), MFA = {M, F, A} = {?MODULE, echo, [Pid, test]}, @@ -181,6 +177,7 @@ t_skip_failed_commit(_Config) -> emqx_cluster_rpc:reset(), {atomic, []} = emqx_cluster_rpc:status(), {ok, 1, ok} = emqx_cluster_rpc:multicall(io, format, ["test~n"], all, 1000), + sleep(180), {atomic, List1} = emqx_cluster_rpc:status(), Node = node(), ?assertEqual([{Node, 1}, {{Node, ?NODE2}, 1}, {{Node, ?NODE3}, 1}], @@ -254,7 +251,7 @@ failed_on_other_recover_after_5_second(Pid, CreatedAt) -> end end. -sleep(Second) -> +sleep(Ms) -> receive _ -> ok - after Second -> timeout + after Ms -> timeout end. diff --git a/apps/emqx_machine/test/emqx_global_gc_SUITE.erl b/apps/emqx_conf/test/emqx_global_gc_SUITE.erl similarity index 100% rename from apps/emqx_machine/test/emqx_global_gc_SUITE.erl rename to apps/emqx_conf/test/emqx_global_gc_SUITE.erl diff --git a/apps/emqx_dashboard/src/emqx_dashboard.erl b/apps/emqx_dashboard/src/emqx_dashboard.erl index d4d5621e7..2a5066d50 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard.erl @@ -88,7 +88,7 @@ listeners() -> Name = listener_name(Protocol, Port), RanchOptions = ranch_opts(maps:without([protocol], ListenerOptions)), {Name, Protocol, Port, RanchOptions} - end || ListenerOptions <- emqx_config:get([emqx_dashboard, listeners], [])]. + end || ListenerOptions <- emqx_conf:get([emqx_dashboard, listeners], [])]. ranch_opts(RanchOptions) -> Keys = [ {ack_timeout, handshake_timeout} diff --git a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl index 2a7f2410a..e2ab0ad8f 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -205,7 +205,7 @@ add_default_user() -> add_default_user(binenv(default_username), binenv(default_password)). binenv(Key) -> - iolist_to_binary(emqx:get_config([emqx_dashboard, Key], "")). + iolist_to_binary(emqx_conf:get([emqx_dashboard, Key], "")). add_default_user(Username, Password) when ?EMPTY_KEY(Username) orelse ?EMPTY_KEY(Password) -> igonre; diff --git a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl index 0eb1e033b..ab5767229 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_collection.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_collection.erl @@ -55,7 +55,7 @@ get_collect() -> gen_server:call(whereis(?MODULE), get_collect). init([]) -> timer(next_interval(), collect), timer(get_today_remaining_seconds(), clear_expire_data), - ExpireInterval = emqx:get_config([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL), + ExpireInterval = emqx_conf:get([emqx_dashboard, monitor, interval], ?EXPIRE_INTERVAL), State = #{ count => count(), expire_interval => ExpireInterval, @@ -75,7 +75,7 @@ next_interval() -> (1000 * interval()) - (erlang:system_time(millisecond) rem (1000 * interval())) - 1. interval() -> - emqx:get_config([?APP, sample_interval], ?DEFAULT_INTERVAL). + emqx_conf:get([?APP, sample_interval], ?DEFAULT_INTERVAL). count() -> 60 div interval(). diff --git a/apps/emqx_dashboard/src/emqx_dashboard_token.erl b/apps/emqx_dashboard/src/emqx_dashboard_token.erl index 5a6771ae5..e0fa9c415 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_token.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_token.erl @@ -152,7 +152,7 @@ jwk(Username, Password, Salt) -> }. jwt_expiration_time() -> - ExpTime = emqx:get_config([emqx_dashboard, token_expired_time], ?EXPTIME), + ExpTime = emqx_conf:get([emqx_dashboard, token_expired_time], ?EXPTIME), erlang:system_time(millisecond) + ExpTime. salt() -> diff --git a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl index a9969ba4b..92f411da7 100644 --- a/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl +++ b/apps/emqx_dashboard/test/emqx_swagger_response_SUITE.erl @@ -339,7 +339,7 @@ schema("/ref/complicated_type") -> {maps, hoconsc:mk(map(), #{})}, {comma_separated_list, hoconsc:mk(emqx_schema:comma_separated_list(), #{})}, {comma_separated_atoms, hoconsc:mk(emqx_schema:comma_separated_atoms(), #{})}, - {log_level, hoconsc:mk(emqx_machine_schema:log_level(), #{})}, + {log_level, hoconsc:mk(emqx_conf_schema:log_level(), #{})}, {fix_integer, hoconsc:mk(typerefl:integer(100), #{})} ] }} diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index 60a6a2915..ea03a54f9 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -58,7 +58,7 @@ request_options() -> }. env(Key, Def) -> - emqx:get_config([exhook, Key], Def). + emqx_conf:get([exhook, Key], Def). %%-------------------------------------------------------------------- %% APIs diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index 839567d1e..acce25807 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -279,7 +279,7 @@ try_takeover(idle, DesireId, Msg, Channel) -> %% udp connection baseon the clientid call_session(handle_request, Msg, Channel); _ -> - case emqx:get_config([gateway, coap, authentication], undefined) of + case emqx_conf:get([gateway, coap, authentication], undefined) of undefined -> call_session(handle_request, Msg, Channel); _ -> diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl index 0fbc47cf8..13f1be240 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_session.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl @@ -216,7 +216,7 @@ mqtt_to_coap(MQTT, Token, SeqId) -> options = #{observe => SeqId}}. get_notify_type(#message{qos = Qos}) -> - case emqx:get_config([gateway, coap, notify_qos], non) of + case emqx_conf:get([gateway, coap, notify_qos], non) of qos -> case Qos of ?QOS_0 -> diff --git a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl index 85cf32c6d..d1d73e9b9 100644 --- a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl +++ b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl @@ -86,7 +86,7 @@ get_sub_opts(#coap_message{options = Opts} = Msg) -> #{qos := _} -> maps:merge(SubOpts, ?SUBOPTS); _ -> - CfgType = emqx:get_config([gateway, coap, subscribe_qos], ?QOS_0), + CfgType = emqx_conf:get([gateway, coap, subscribe_qos], ?QOS_0), maps:merge(SubOpts, ?SUBOPTS#{qos => type_to_qos(CfgType, Msg)}) end. @@ -115,7 +115,7 @@ get_publish_qos(Msg) -> #{<<"qos">> := QOS} -> erlang:binary_to_integer(QOS); _ -> - CfgType = emqx:get_config([gateway, coap, publish_qos], ?QOS_0), + CfgType = emqx_conf:get([gateway, coap, publish_qos], ?QOS_0), type_to_qos(CfgType, Msg) end. diff --git a/apps/emqx_gateway/src/emqx_gateway_app.erl b/apps/emqx_gateway/src/emqx_gateway_app.erl index 8b09f18a0..9d1c878e3 100644 --- a/apps/emqx_gateway/src/emqx_gateway_app.erl +++ b/apps/emqx_gateway/src/emqx_gateway_app.erl @@ -88,4 +88,4 @@ load_gateway_by_default([{Type, Confs}|More]) -> load_gateway_by_default(More). confs() -> - maps:to_list(emqx:get_config([gateway], #{})). + maps:to_list(emqx_conf:get([gateway], #{})). diff --git a/apps/emqx_gateway/src/emqx_gateway_conf.erl b/apps/emqx_gateway/src/emqx_gateway_conf.erl index 1d3500b09..557e46693 100644 --- a/apps/emqx_gateway/src/emqx_gateway_conf.erl +++ b/apps/emqx_gateway/src/emqx_gateway_conf.erl @@ -65,11 +65,11 @@ -spec load() -> ok. load() -> - emqx_config_handler:add_handler([gateway], ?MODULE). + emqx_conf:add_handler([gateway], ?MODULE). -spec unload() -> ok. unload() -> - emqx_config_handler:remove_handler([gateway]). + emqx_conf:remove_handler([gateway]). %%-------------------------------------------------------------------- %% APIs diff --git a/apps/emqx_machine/include/emqx_machine.hrl b/apps/emqx_machine/include/emqx_machine.hrl deleted file mode 100644 index cea62c5c3..000000000 --- a/apps/emqx_machine/include/emqx_machine.hrl +++ /dev/null @@ -1,37 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --ifndef(EMQ_X_CLUSTER_RPC_HRL). --define(EMQ_X_CLUSTER_RPC_HRL, true). - --define(CLUSTER_MFA, cluster_rpc_mfa). --define(CLUSTER_COMMIT, cluster_rpc_commit). - --define(EMQX_MACHINE_SHARD, emqx_machine_shard). - --record(cluster_rpc_mfa, { - tnx_id :: pos_integer(), - mfa :: mfa(), - created_at :: calendar:datetime(), - initiator :: node() -}). - --record(cluster_rpc_commit, { - node :: node(), - tnx_id :: pos_integer() | '$1' -}). - --endif. diff --git a/apps/emqx/src/emqx_global_gc.erl b/apps/emqx_machine/src/emqx_global_gc.erl similarity index 91% rename from apps/emqx/src/emqx_global_gc.erl rename to apps/emqx_machine/src/emqx_global_gc.erl index 5192508e5..87a2f9b5b 100644 --- a/apps/emqx/src/emqx_global_gc.erl +++ b/apps/emqx_machine/src/emqx_global_gc.erl @@ -18,8 +18,6 @@ -behaviour(gen_server). --include("types.hrl"). - -export([start_link/0, stop/0]). -export([run/0]). @@ -40,7 +38,7 @@ %% APIs %%-------------------------------------------------------------------- --spec(start_link() -> startlink_ret()). +-spec(start_link() -> {ok, pid()} | ignore | {error, term()}). start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -85,10 +83,11 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- ensure_timer(State) -> - case emqx:get_config([node, global_gc_interval]) of + case application:get_env(emqx_machine, global_gc_interval) of undefined -> State; - Interval -> TRef = emqx_misc:start_timer(Interval, run), - State#{timer := TRef} + {ok, Interval} -> + TRef = emqx_misc:start_timer(Interval, run), + State#{timer := TRef} end. run_gc() -> lists:foreach(fun do_gc/1, processes()). @@ -99,4 +98,3 @@ do_gc(Pid) -> -compile({inline, [is_waiting/1]}). is_waiting(Pid) -> {status, waiting} == process_info(Pid, status). - diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index 313f45dd3..fae21eece 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -22,7 +22,6 @@ ]). -include_lib("emqx/include/logger.hrl"). --include("emqx_machine.hrl"). %% @doc EMQ X boot entrypoint. start() -> @@ -33,11 +32,8 @@ start() -> os:set_signal(sigterm, handle) %% default is handle end, ok = set_backtrace_depth(), - ok = print_otp_version_warning(), - ok = load_config_files(), ekka:start(), - ok = mria_rlog:wait_for_shards([?EMQX_MACHINE_SHARD], infinity), - ok. + ok = print_otp_version_warning(). graceful_shutdown() -> emqx_machine_terminator:graceful_wait(). @@ -58,12 +54,3 @@ print_otp_version_warning() -> ?ULOG("WARNING: Running on Erlang/OTP version ~p. Recommended: 23~n", [?OTP_RELEASE]). -endif. % OTP_RELEASE > 22 - -load_config_files() -> - %% the app env 'config_files' for 'emqx` app should be set - %% in app.time.config by boot script before starting Erlang VM - ConfFiles = application:get_env(emqx, config_files, []), - %% emqx_machine_schema is a superset of emqx_schema - ok = emqx_config:init_load(emqx_machine_schema, ConfFiles), - %% to avoid config being loaded again when emqx app starts. - ok = emqx_app:set_init_config_load_done(). diff --git a/apps/emqx_machine/src/emqx_machine_boot.erl b/apps/emqx_machine/src/emqx_machine_boot.erl index ed3b21a9d..9fab5854e 100644 --- a/apps/emqx_machine/src/emqx_machine_boot.erl +++ b/apps/emqx_machine/src/emqx_machine_boot.erl @@ -50,8 +50,7 @@ start_autocluster() -> stop_apps() -> ?SLOG(notice, #{msg => "stopping_emqx_apps"}), _ = emqx_alarm_handler:unload(), - lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())), - emqx_machine_sup:stop_cluster_rpc(). + lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())). stop_one_app(App) -> ?SLOG(debug, #{msg => "stopping_app", app => App}), @@ -67,9 +66,6 @@ stop_one_app(App) -> ensure_apps_started() -> ?SLOG(notice, #{msg => "(re)starting_emqx_apps"}), - %% FIXME: Hack spawning the cluster RPC asynchronously to avoid a - %% deadlock somewhere in EMQ X startup - spawn_link(fun() -> emqx_machine_sup:start_cluster_rpc() end), lists:foreach(fun start_one_app/1, sorted_reboot_apps()). start_one_app(App) -> @@ -90,6 +86,7 @@ reboot_apps() -> , esockd , ranch , cowboy + , emqx_conf , emqx , emqx_prometheus , emqx_modules diff --git a/apps/emqx_machine/src/emqx_machine_sup.erl b/apps/emqx_machine/src/emqx_machine_sup.erl index bf5403d43..844cc171b 100644 --- a/apps/emqx_machine/src/emqx_machine_sup.erl +++ b/apps/emqx_machine/src/emqx_machine_sup.erl @@ -21,8 +21,6 @@ -behaviour(supervisor). -export([ start_link/0 - , stop_cluster_rpc/0 - , start_cluster_rpc/0 ]). -export([init/1]). @@ -30,33 +28,11 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -stop_cluster_rpc() -> - case whereis(?MODULE) of - undefined -> - ok; - _ -> - _ = supervisor:terminate_child(?MODULE, emqx_cluster_rpc_handler), - _ = supervisor:terminate_child(?MODULE, emqx_cluster_rpc), - ok - end. - -start_cluster_rpc() -> - case whereis(?MODULE) of - undefined -> - ok; - _ -> - ensure_running(emqx_cluster_rpc), - ensure_running(emqx_cluster_rpc_handler), - ok - end. - init([]) -> - GlobalGC = child_worker(emqx_global_gc, [], permanent), Terminator = child_worker(emqx_machine_terminator, [], transient), - ClusterRpc = child_worker(emqx_cluster_rpc, [], permanent), - ClusterHandler = child_worker(emqx_cluster_rpc_handler, [], permanent), BootApps = child_worker(emqx_machine_boot, post_boot, [], temporary), - Children = [GlobalGC, Terminator, ClusterRpc, ClusterHandler, BootApps], + GlobalGC = child_worker(emqx_global_gc, [], permanent), + Children = [Terminator, BootApps, GlobalGC], SupFlags = #{strategy => one_for_one, intensity => 100, period => 10 @@ -74,13 +50,3 @@ child_worker(M, Func, Args, Restart) -> type => worker, modules => [M] }. - -ensure_running(Id) -> - %% Assuming Id == locally registered name - case whereis(Id) of - undefined -> - _ = supervisor:restart_child(?MODULE, Id), - ok; - _ -> - ok - end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_configs.erl b/apps/emqx_management/src/emqx_mgmt_api_configs.erl index de2774cae..abf9d7cff 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_configs.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_configs.erl @@ -109,10 +109,10 @@ find_schema(Path) -> {Root, element(2, lists:keyfind(RootAtom, 1, Configs))} end. -%% we load all configs from emqx_machine_schema, some of them are defined as local ref -%% we need redirect to emqx_machine_schema. -%% such as hoconsc:ref("node") to hoconsc:ref(emqx_machine_schema, "node") -fields(Field) -> emqx_machine_schema:fields(Field). +%% we load all configs from emqx_conf_schema, some of them are defined as local ref +%% we need redirect to emqx_conf_schema. +%% such as hoconsc:ref("node") to hoconsc:ref(emqx_conf_schema, "node") +fields(Field) -> emqx_conf_schema:fields(Field). %%%============================================================================================== %% HTTP API Callbacks @@ -165,7 +165,7 @@ conf_path_from_querystr(Req) -> end. config_list(Exclude) -> - Roots = emqx_machine_schema:roots(), + Roots = emqx_conf_schema:roots(), lists:foldl(fun(Key, Acc) -> lists:delete(Key, Acc) end, Roots, Exclude). to_list(L) when is_list(L) -> L; diff --git a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl index 52c7a8709..4568bcd9a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_listeners.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_listeners.erl @@ -238,7 +238,7 @@ crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Conf}) -> case lists:filter(fun filter_errors/1, Results) of [{error, {invalid_listener_id, Id}} | _] -> {400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}}; - [{error, {emqx_machine_schema, _}} | _] -> + [{error, {emqx_conf_schema, _}} | _] -> {400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}}; [{error, {eaddrinuse, _}} | _] -> {400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}}; @@ -280,7 +280,7 @@ crud_listener_by_id_on_node(put, #{bindings := #{id := Id, node := Node}, body : {404, #{code => 'RESOURCE_NOT_FOUND', message => ?NODE_NOT_FOUND_OR_DOWN}}; {error, {invalid_listener_id, _}} -> {400, #{code => 'BAD_REQUEST', message => ?INVALID_LISTENER_PROTOCOL}}; - {error, {emqx_machine_schema, _}} -> + {error, {emqx_conf_schema, _}} -> {400, #{code => 'BAD_REQUEST', message => ?CONFIG_SCHEMA_ERROR}}; {error, {eaddrinuse, _}} -> {400, #{code => 'BAD_REQUEST', message => ?ADDR_PORT_INUSE}}; diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 401e2a10c..fd831c040 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -107,7 +107,7 @@ on_message_publish(Msg) -> -spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - Opts = emqx:get_config([delayed], #{}), + Opts = emqx_conf:get([delayed], #{}), gen_server:start_link({local, ?SERVER}, ?MODULE, [Opts], []). -spec(store(#delayed_message{}) -> ok | {error, atom()}). diff --git a/apps/emqx_modules/src/emqx_delayed_api.erl b/apps/emqx_modules/src/emqx_delayed_api.erl index 768ef4590..3cc2c7c21 100644 --- a/apps/emqx_modules/src/emqx_delayed_api.erl +++ b/apps/emqx_modules/src/emqx_delayed_api.erl @@ -177,7 +177,7 @@ delayed_message(delete, #{bindings := #{msgid := Id}}) -> %% internal function %%-------------------------------------------------------------------- get_status() -> - emqx:get_config([delayed], #{}). + emqx_conf:get([delayed], #{}). update_config(Config) -> case generate_config(Config) of diff --git a/apps/emqx_modules/src/emqx_event_message.erl b/apps/emqx_modules/src/emqx_event_message.erl index 842ddae04..ccdb75ccb 100644 --- a/apps/emqx_modules/src/emqx_event_message.erl +++ b/apps/emqx_modules/src/emqx_event_message.erl @@ -40,7 +40,7 @@ -endif. list() -> - emqx:get_config([event_message], #{}). + emqx_conf:get([event_message], #{}). update(Params) -> disable(), diff --git a/apps/emqx_modules/src/emqx_modules_app.erl b/apps/emqx_modules/src/emqx_modules_app.erl index 123431605..8804bbc4a 100644 --- a/apps/emqx_modules/src/emqx_modules_app.erl +++ b/apps/emqx_modules/src/emqx_modules_app.erl @@ -32,17 +32,17 @@ stop(_State) -> ok. maybe_enable_modules() -> - emqx:get_config([delayed, enable], true) andalso emqx_delayed:enable(), - emqx:get_config([telemetry, enable], true) andalso emqx_telemetry:enable(), - emqx:get_config([observer_cli, enable], true) andalso emqx_observer_cli:enable(), + emqx_conf:get([delayed, enable], true) andalso emqx_delayed:enable(), + emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:enable(), + emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:enable(), emqx_event_message:enable(), emqx_rewrite:enable(), emqx_topic_metrics:enable(). maybe_disable_modules() -> - emqx:get_config([delayed, enable], true) andalso emqx_delayed:disable(), - emqx:get_config([telemetry, enable], true) andalso emqx_telemetry:disable(), - emqx:get_config([observer_cli, enable], true) andalso emqx_observer_cli:disable(), + emqx_conf:get([delayed, enable], true) andalso emqx_delayed:disable(), + emqx_conf:get([telemetry, enable], true) andalso emqx_telemetry:disable(), + emqx_conf:get([observer_cli, enable], true) andalso emqx_observer_cli:disable(), emqx_event_message:disable(), emqx_rewrite:disable(), emqx_topic_metrics:disable(). diff --git a/apps/emqx_modules/src/emqx_rewrite.erl b/apps/emqx_modules/src/emqx_rewrite.erl index 1b057ca51..16009d46b 100644 --- a/apps/emqx_modules/src/emqx_rewrite.erl +++ b/apps/emqx_modules/src/emqx_rewrite.erl @@ -43,7 +43,7 @@ %%-------------------------------------------------------------------- enable() -> - Rules = emqx:get_config([rewrite], []), + Rules = emqx_conf:get([rewrite], []), register_hook(Rules). disable() -> diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index e81d1257d..61f00745e 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -116,7 +116,7 @@ disable() -> gen_server:call(?MODULE, disable). get_status() -> - emqx:get_config([telemetry, enable], true). + emqx_conf:get([telemetry, enable], true). get_uuid() -> gen_server:call(?MODULE, get_uuid). diff --git a/apps/emqx_modules/src/emqx_topic_metrics.erl b/apps/emqx_modules/src/emqx_topic_metrics.erl index 05c45f469..7ca14b921 100644 --- a/apps/emqx_modules/src/emqx_topic_metrics.erl +++ b/apps/emqx_modules/src/emqx_topic_metrics.erl @@ -146,7 +146,7 @@ on_message_dropped(#message{topic = Topic}, _, _) -> end. start_link() -> - Opts = emqx:get_config([topic_metrics], []), + Opts = emqx_conf:get([topic_metrics], []), gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). stop() -> diff --git a/apps/emqx_prometheus/src/emqx_prometheus_app.erl b/apps/emqx_prometheus/src/emqx_prometheus_app.erl index 4f954a792..380f8d8f6 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus_app.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus_app.erl @@ -34,9 +34,9 @@ stop(_State) -> ok. maybe_enable_prometheus() -> - case emqx:get_config([prometheus, enable], false) of + case emqx_conf:get([prometheus, enable], false) of true -> - emqx_prometheus_sup:start_child(?APP, emqx:get_config([prometheus], #{})); + emqx_prometheus_sup:start_child(?APP, emqx_conf:get([prometheus], #{})); false -> ok end. diff --git a/apps/emqx_psk/src/emqx_psk.erl b/apps/emqx_psk/src/emqx_psk.erl index 7de332cb9..ff89041ce 100644 --- a/apps/emqx_psk/src/emqx_psk.erl +++ b/apps/emqx_psk/src/emqx_psk.erl @@ -142,13 +142,13 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ get_config(enable) -> - emqx_config:get([psk, enable]); + emqx_conf:get([psk, enable]); get_config(init_file) -> - emqx_config:get([psk, init_file], undefined); + emqx_conf:get([psk, init_file], undefined); get_config(separator) -> - emqx_config:get([psk, separator], ?DEFAULT_DELIMITER); + emqx_conf:get([psk, separator], ?DEFAULT_DELIMITER); get_config(chunk_size) -> - emqx_config:get([psk, chunk_size]). + emqx_conf:get([psk, chunk_size]). import_psks(SrcFile) -> case file:open(SrcFile, [read, raw, binary, read_ahead]) of diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 2bb62e645..5d248e638 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -151,7 +151,7 @@ get_expiry_time(#message{headers = #{properties := #{'Message-Expiry-Interval' : timestamp = Ts}) -> Ts + Interval * 1000; get_expiry_time(#message{timestamp = Ts}) -> - Interval = emqx:get_config([?APP, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL), + Interval = emqx_conf:get([?APP, msg_expiry_interval], ?DEF_EXPIRY_INTERVAL), case Interval of 0 -> 0; _ -> Ts + Interval @@ -219,7 +219,7 @@ handle_cast(Msg, State) -> handle_info(clear_expired, #{context := Context} = State) -> Mod = get_backend_module(), Mod:clear_expired(Context), - Interval = emqx:get_config([?APP, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), + Interval = emqx_conf:get([?APP, msg_clear_interval], ?DEF_EXPIRY_INTERVAL), {noreply, State#{clear_timer := add_timer(Interval, clear_expired)}, hibernate}; handle_info(release_deliver_quota, #{context := Context, wait_quotas := Waits} = State) -> @@ -268,7 +268,7 @@ new_context(Id) -> #{context_id => Id}. is_too_big(Size) -> - Limit = emqx:get_config([?APP, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE), + Limit = emqx_conf:get([?APP, max_payload_size], ?DEF_MAX_PAYLOAD_SIZE), Limit > 0 andalso (Size > Limit). %% @private diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 8a6d78ff6..42f652d59 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -103,7 +103,7 @@ post_config_update(_Req, NewRules, OldRules, _AppEnvs) -> load_rules() -> maps_foreach(fun({Id, Rule}) -> {ok, _} = create_rule(Rule#{id => bin(Id)}) - end, emqx:get_config([rule_engine, rules], #{})). + end, emqx_conf:get([rule_engine, rules], #{})). -spec create_rule(map()) -> {ok, rule()} | {error, term()}. create_rule(Params = #{id := RuleId}) when is_binary(RuleId) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl index e9ca443ec..610244350 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_app.erl @@ -29,9 +29,9 @@ start(_Type, _Args) -> ok = emqx_rule_events:reload(), SupRet = emqx_rule_engine_sup:start_link(), ok = emqx_rule_engine:load_rules(), - emqx_config_handler:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine), + emqx_conf:add_handler(emqx_rule_engine:config_key_path(), emqx_rule_engine), SupRet. stop(_State) -> - emqx_config_handler:remove_handler(emqx_rule_engine:config_key_path()), + emqx_conf:remove_handler(emqx_rule_engine:config_key_path()), ok = emqx_rule_events:unload(). diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 56af13acc..a920b18ad 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -103,12 +103,12 @@ groups() -> %%------------------------------------------------------------------------------ init_per_suite(Config) -> - application:load(emqx_machine), - ok = emqx_common_test_helpers:start_apps([emqx_rule_engine]), + application:load(emqx_conf), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_rule_engine]), + emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]), ok. on_resource_create(_id, _) -> #{}. @@ -136,7 +136,6 @@ end_per_group(_Groupname, _Config) -> init_per_testcase(t_events, Config) -> init_events_counters(), - {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), SQL = "SELECT * FROM \"$events/client_connected\", " "\"$events/client_disconnected\", " "\"$events/session_subscribed\", " @@ -157,7 +156,6 @@ init_per_testcase(t_events, Config) -> ?assertMatch(#{id := <<"rule:t_events">>}, Rule), [{hook_points_rules, Rule} | Config]; init_per_testcase(_TestCase, Config) -> - emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), Config. end_per_testcase(t_events, Config) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl index 4b0f027f7..10ea55a69 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_SUITE.erl @@ -12,17 +12,16 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - application:load(emqx_machine), + application:load(emqx_conf), ok = emqx_config:init_load(emqx_rule_engine_schema, ?CONF_DEFAULT), - ok = emqx_common_test_helpers:start_apps([emqx_rule_engine]), + ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_rule_engine]), Config. end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_rule_engine]), + emqx_common_test_helpers:stop_apps([emqx_conf, emqx_rule_engine]), ok. init_per_testcase(_, Config) -> - {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000), Config. end_per_testcase(_, _Config) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl index e30966ab8..418e8dd0f 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_metrics_SUITE.erl @@ -40,13 +40,13 @@ groups() -> ]. init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx]), + emqx_common_test_helpers:start_apps([emqx_conf]), {ok, _} = emqx_rule_metrics:start_link(), Config. end_per_suite(_Config) -> catch emqx_rule_metrics:stop(), - emqx_common_test_helpers:stop_apps([emqx]), + emqx_common_test_helpers:stop_apps([emqx_conf]), ok. init_per_testcase(_, Config) -> diff --git a/apps/emqx_statsd/src/emqx_statsd_app.erl b/apps/emqx_statsd/src/emqx_statsd_app.erl index 4a5ff0496..820a86c04 100644 --- a/apps/emqx_statsd/src/emqx_statsd_app.erl +++ b/apps/emqx_statsd/src/emqx_statsd_app.erl @@ -32,9 +32,9 @@ stop(_) -> ok. maybe_enable_statsd() -> - case emqx:get_config([statsd, enable], false) of + case emqx_conf:get([statsd, enable], false) of true -> - emqx_statsd_sup:start_child(?APP, emqx:get_config([statsd], #{})); + emqx_statsd_sup:start_child(?APP, emqx_conf:get([statsd], #{})); false -> ok end. diff --git a/bin/emqx b/bin/emqx index 1be7996d1..23b991337 100755 --- a/bin/emqx +++ b/bin/emqx @@ -22,7 +22,7 @@ export REL_VSN RUNNER_SCRIPT="$RUNNER_BIN_DIR/$REL_NAME" CODE_LOADING_MODE="${CODE_LOADING_MODE:-embedded}" REL_DIR="$RUNNER_ROOT_DIR/releases/$REL_VSN" -SCHEMA_MOD=emqx_machine_schema +SCHEMA_MOD=emqx_conf_schema WHOAMI=$(whoami) @@ -244,7 +244,7 @@ generate_config() { ## ths command populates two files: app.