diff --git a/.github/workflows/run_cts_tests.yaml b/.github/workflows/run_cts_tests.yaml index bfa1d1394..487d8fae7 100644 --- a/.github/workflows/run_cts_tests.yaml +++ b/.github/workflows/run_cts_tests.yaml @@ -50,7 +50,7 @@ jobs: printenv > .env docker exec -i erlang sh -c "make ensure-rebar3" docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_ldap" - docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_ldap" + docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_ldap-ct" - uses: actions/upload-artifact@v1 if: failure() with: @@ -120,7 +120,7 @@ jobs: printenv > .env docker exec -i erlang sh -c "make ensure-rebar3" docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_mongo" - docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_mongo" + docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_mongo-ct" - uses: actions/upload-artifact@v1 if: failure() with: @@ -203,7 +203,7 @@ jobs: printenv > .env docker exec -i erlang sh -c "make ensure-rebar3" docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_mysql" - docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_mysql" + docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_mysql-ct" - uses: actions/upload-artifact@v1 if: failure() with: @@ -278,7 +278,7 @@ jobs: printenv > .env docker exec -i erlang sh -c "make ensure-rebar3" docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_pgsql" - docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_pgsql" + docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_pgsql-ct" - uses: actions/upload-artifact@v1 if: failure() with: @@ -399,7 +399,7 @@ jobs: printenv > .env docker exec -i erlang sh -c "make ensure-rebar3" docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_redis" - docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_redis" + docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_redis-ct" - uses: actions/upload-artifact@v1 if: failure() with: diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index f6627bf1c..d2b5fd11d 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -203,6 +203,11 @@ node.cookie = "emqxsecretcookie" ## Value: Folder node.data_dir = "{{ platform_data_dir }}" +## The config file dir for the node +## +## Value: Folder +node.etc_dir = "{{ platform_etc_dir }}" + ## Heartbeat monitoring of an Erlang runtime system. Comment the line to disable ## heartbeat, or set the value as 'on' ## @@ -457,8 +462,8 @@ log.file = emqx.log ## and Erlang process message queue inspection. ## ## Value: Integer or 'unlimited' (without quotes) -## Default: 20 -#log.max_depth = 20 +## Default: 80 +#log.max_depth = 80 ## Log formatter ## Value: text | json diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 5d48e06a3..22f31a345 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -16,7 +16,7 @@ , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.7.0"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.0"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}} , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}} @@ -28,9 +28,8 @@ {test, [{deps, [ meck - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.6.0"}}} , {bbmustache,"1.10.0"} - , {emqx_ct_helpers, {git,"https://github.com/zmstone/emqx-ct-helpers", {branch,"hocon"}}} + , {emqx_ct_helpers, {git,"https://github.com/emqx/emqx-ct-helpers", {branch,"hocon"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}} ]}, {extra_src_dirs, [{"test",[recursive]}]} diff --git a/apps/emqx/rebar3 b/apps/emqx/rebar3 new file mode 100755 index 000000000..edb85b3c9 Binary files /dev/null and b/apps/emqx/rebar3 differ diff --git a/apps/emqx/src/emqx_config.erl b/apps/emqx/src/emqx_config.erl index d05486328..61423f9af 100644 --- a/apps/emqx/src/emqx_config.erl +++ b/apps/emqx/src/emqx_config.erl @@ -23,6 +23,8 @@ , put/2 , deep_get/3 , deep_put/3 + , safe_atom_key_map/1 + , unsafe_atom_key_map/1 ]). -spec get() -> term(). @@ -58,3 +60,21 @@ deep_put([], Map, Config) when is_map(Map) -> deep_put([Key | KeyPath], Map, Config) -> SubMap = deep_put(KeyPath, maps:get(Key, Map, #{}), Config), Map#{Key => SubMap}. + +unsafe_atom_key_map(Map) -> + covert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end). + +safe_atom_key_map(Map) -> + covert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end). + +covert_keys_to_atom(BinKeyMap, Conv) when is_map(BinKeyMap) -> + maps:fold( + fun(K, V, Acc) when is_binary(K) -> + Acc#{Conv(K) => covert_keys_to_atom(V, Conv)}; + (K, V, Acc) when is_atom(K) -> + %% richmap keys + Acc#{K => covert_keys_to_atom(V, Conv)} + end, #{}, BinKeyMap); +covert_keys_to_atom(ListV, Conv) when is_list(ListV) -> + [covert_keys_to_atom(V, Conv) || V <- ListV]; +covert_keys_to_atom(Val, _) -> Val. diff --git a/apps/emqx/src/emqx_config_handler.erl b/apps/emqx/src/emqx_config_handler.erl index 248847177..20aa0b5f8 100644 --- a/apps/emqx/src/emqx_config_handler.erl +++ b/apps/emqx/src/emqx_config_handler.erl @@ -17,13 +17,16 @@ %% And there are a top level config handler maintains the overall config map. -module(emqx_config_handler). +-include("logger.hrl"). + -behaviour(gen_server). %% API functions -export([ start_link/0 - , start_handler/3 + , add_handler/2 , update_config/2 - , child_spec/3 + , get_raw_config/0 + , merge_to_old_config/2 ]). %% emqx_config_handler callbacks @@ -38,74 +41,74 @@ terminate/2, code_change/3]). --type config() :: term(). --type config_map() :: #{atom() => config()} | [config_map()]. --type handler_name() :: module() | top. --type key_path() :: [atom()]. +-define(MOD, {mod}). + +-type update_request() :: term(). +-type raw_config() :: hocon:config() | undefined. +-type config_key() :: atom(). +-type handler_name() :: module(). +-type config_key_path() :: [atom()]. +-type handlers() :: #{config_key() => handlers(), ?MOD => handler_name()}. -optional_callbacks([handle_update_config/2]). --callback handle_update_config(config(), config_map()) -> config_map(). +-callback handle_update_config(update_request(), raw_config()) -> update_request(). --record(state, { - handler_name :: handler_name(), - parent :: handler_name(), - key_path :: key_path() -}). +-type state() :: #{ + handlers := handlers(), + raw_config := raw_config(), + atom() => term() +}. start_link() -> - start_handler(?MODULE, top, []). + gen_server:start_link({local, ?MODULE}, ?MODULE, {}, []). --spec start_handler(handler_name(), handler_name(), key_path()) -> - {ok, pid()} | {error, {already_started, pid()}} | {error, term()}. -start_handler(HandlerName, Parent, ConfKeyPath) -> - gen_server:start_link({local, HandlerName}, ?MODULE, {HandlerName, Parent, ConfKeyPath}, []). +-spec update_config(config_key_path(), update_request()) -> ok | {error, term()}. +update_config(ConfKeyPath, UpdateReq) -> + gen_server:call(?MODULE, {update_config, ConfKeyPath, UpdateReq}). --spec child_spec(module(), handler_name(), key_path()) -> supervisor:child_spec(). -child_spec(Mod, Parent, KeyPath) -> - #{id => Mod, - start => {?MODULE, start_handler, [Mod, Parent, KeyPath]}, - restart => permanent, - type => worker, - modules => [?MODULE]}. +-spec add_handler(config_key_path(), handler_name()) -> ok. +add_handler(ConfKeyPath, HandlerName) -> + gen_server:call(?MODULE, {add_child, ConfKeyPath, HandlerName}). --spec update_config(handler_name(), config()) -> ok. -update_config(top, Config) -> - emqx_config:put(Config), - save_config_to_disk(Config); -update_config(Handler, Config) -> - case is_process_alive(whereis(Handler)) of - true -> gen_server:cast(Handler, {handle_update_config, Config}); - false -> error({not_alive, Handler}) - end. +-spec get_raw_config() -> raw_config(). +get_raw_config() -> + gen_server:call(?MODULE, get_raw_config). %%============================================================================ -%% callbacks of emqx_config_handler (the top-level handler) -handle_update_config(Config, undefined) -> - handle_update_config(Config, #{}); -handle_update_config(Config, OldConfigMap) -> - %% simply merge the config to the overall config - maps:merge(OldConfigMap, Config). -init({HandlerName, Parent, ConfKeyPath}) -> - {ok, #state{handler_name = HandlerName, parent = Parent, key_path = ConfKeyPath}}. +-spec init(term()) -> {ok, state()}. +init(_) -> + {ok, RawConf} = hocon:load(emqx_conf_name(), #{format => richmap}), + {_MappedEnvs, Conf} = hocon_schema:map_translate(emqx_schema, RawConf, #{}), + ok = save_config_to_emqx_config(Conf), + {ok, #{raw_config => hocon_schema:richmap_to_map(RawConf), + handlers => #{?MOD => ?MODULE}}}. + +handle_call(get_raw_config, _From, State = #{raw_config := RawConf}) -> + {reply, RawConf, State}; + +handle_call({add_child, ConfKeyPath, HandlerName}, _From, + State = #{handlers := Handlers}) -> + {reply, ok, State#{handlers => + emqx_config:deep_put(ConfKeyPath, Handlers, #{?MOD => HandlerName})}}; + +handle_call({update_config, ConfKeyPath, UpdateReq}, _From, + #{raw_config := RawConf, handlers := Handlers} = State) -> + try {RootKeys, Conf} = do_update_config(ConfKeyPath, Handlers, RawConf, UpdateReq), + {reply, save_configs(RootKeys, Conf), State#{raw_config => Conf}} + catch + throw: Reason -> + {reply, {error, Reason}, State}; + Error : Reason : ST -> + ?LOG(error, "update config failed: ~p", [{Error, Reason, ST}]), + {reply, {error, Reason}, State} + end; handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. -handle_cast({handle_update_config, Config}, #state{handler_name = HandlerName, - parent = Parent, key_path = ConfKeyPath} = State) -> - %% accumulate the config map of this config handler - OldConfigMap = emqx_config:get(ConfKeyPath, undefined), - SubConfigMap = case erlang:function_exported(HandlerName, handle_update_config, 2) of - true -> HandlerName:handle_update_config(Config, OldConfigMap); - false -> wrap_sub_config(ConfKeyPath, Config) - end, - %% then notify the parent handler - update_config(Parent, SubConfigMap), - {noreply, State}; - handle_cast(_Msg, State) -> {noreply, State}. @@ -118,17 +121,100 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%============================================================================ -save_config_to_disk(ConfigMap) -> - Filename = filename:join([emqx_data_dir(), "emqx_override.conf"]), - case file:write_file(Filename, jsx:encode(ConfigMap)) of - ok -> ok; - {error, Reason} -> logger:error("write to ~s failed, ~p", [Filename, Reason]) +do_update_config([], Handlers, OldConf, UpdateReq) -> + call_handle_update_config(Handlers, OldConf, UpdateReq); +do_update_config([ConfKey | ConfKeyPath], Handlers, OldConf, UpdateReq) -> + SubOldConf = get_sub_config(ConfKey, OldConf), + case maps:find(ConfKey, Handlers) of + error -> throw({handler_not_found, ConfKey}); + {ok, SubHandlers} -> + NewUpdateReq = do_update_config(ConfKeyPath, SubHandlers, SubOldConf, UpdateReq), + call_handle_update_config(Handlers, OldConf, #{bin(ConfKey) => NewUpdateReq}) end. -emqx_data_dir() -> - %emqx_config:get([node, data_dir]) - "data". +get_sub_config(_, undefined) -> + undefined; +get_sub_config(ConfKey, OldConf) when is_map(OldConf) -> + maps:get(bin(ConfKey), OldConf, undefined); +get_sub_config(_, OldConf) -> + OldConf. -wrap_sub_config(ConfKeyPath, Config) -> - emqx_config:deep_put(ConfKeyPath, #{}, Config). +call_handle_update_config(Handlers, OldConf, UpdateReq) -> + HandlerName = maps:get(?MOD, Handlers, undefined), + case erlang:function_exported(HandlerName, handle_update_config, 2) of + true -> HandlerName:handle_update_config(UpdateReq, OldConf); + false -> UpdateReq %% the default behaviour is overwriting the old config + end. + +%% callbacks for the top-level handler +handle_update_config(UpdateReq, OldConf) -> + FullRawConf = merge_to_old_config(UpdateReq, OldConf), + {maps:keys(UpdateReq), FullRawConf}. + +%% default callback of config handlers +merge_to_old_config(UpdateReq, undefined) -> + merge_to_old_config(UpdateReq, #{}); +merge_to_old_config(UpdateReq, RawConf) -> + maps:merge(RawConf, UpdateReq). + +%%============================================================================ +save_configs(RootKeys, Conf0) -> + {_MappedEnvs, Conf1} = hocon_schema:map_translate(emqx_schema, to_richmap(Conf0), #{}), + %save_config_to_app_env(MappedEnvs), + save_config_to_emqx_config(hocon_schema:richmap_to_map(Conf1)), + save_config_to_disk(RootKeys, Conf0). + +%% We may need also support hot config update for the apps that use application envs. +%% If so uncomment the following lines to update the configs to application env +% save_config_to_app_env(MappedEnvs) -> +% lists:foreach(fun({AppName, Envs}) -> +% [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs] +% end, MappedEnvs). + +save_config_to_emqx_config(Conf) -> + emqx_config:put(emqx_config:unsafe_atom_key_map(Conf)). + +save_config_to_disk(RootKeys, Conf) -> + FileName = emqx_override_conf_name(), + OldConf = read_old_config(FileName), + %% We don't save the overall config to file, but only the sub configs + %% under RootKeys + write_new_config(FileName, + maps:merge(OldConf, maps:with(RootKeys, Conf))). + +write_new_config(FileName, Conf) -> + case file:write_file(FileName, jsx:prettify(jsx:encode(Conf))) of + ok -> ok; + {error, Reason} -> + logger:error("write to ~s failed, ~p", [FileName, Reason]), + {error, Reason} + end. + +read_old_config(FileName) -> + case file:read_file(FileName) of + {ok, Text} -> + try jsx:decode(Text, [{return_maps, true}]) of + Conf when is_map(Conf) -> Conf; + _ -> #{} + catch _Err : _Reason -> + #{} + end; + _ -> #{} + end. + +emqx_conf_name() -> + filename:join([etc_dir(), "emqx.conf"]). + +emqx_override_conf_name() -> + filename:join([emqx:get_env(data_dir), "emqx_override.conf"]). + +etc_dir() -> + emqx:get_env(etc_dir). + +to_richmap(Map) -> + {ok, RichMap} = hocon:binary(jsx:encode(Map), #{format => richmap}), + RichMap. + +bin(A) when is_atom(A) -> list_to_binary(atom_to_list(A)); +bin(B) when is_binary(B) -> B; +bin(S) when is_list(S) -> list_to_binary(S). diff --git a/apps/emqx/src/emqx_plugins.erl b/apps/emqx/src/emqx_plugins.erl index bda537f47..2f39edcb4 100644 --- a/apps/emqx/src/emqx_plugins.erl +++ b/apps/emqx/src/emqx_plugins.erl @@ -30,8 +30,6 @@ , reload/1 , list/0 , find_plugin/1 - , generate_configs/1 - , apply_configs/1 ]). -export([funlog/2]). @@ -173,15 +171,7 @@ load_ext_plugin(PluginDir) -> ?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]), error({plugin_app_file_not_found, AppFile}) end, - ok = load_plugin_app(AppName, Ebin), - try - ok = generate_configs(AppName, PluginDir) - catch - throw : {conf_file_not_found, ConfFile} -> - %% this is maybe a dependency of an external plugin - ?LOG(debug, "config_load_error_ignored for app=~p, path=~s", [AppName, ConfFile]), - ok - end. + load_plugin_app(AppName, Ebin). load_plugin_app(AppName, Ebin) -> _ = code:add_patha(Ebin), @@ -246,7 +236,6 @@ plugin(AppName, Type) -> load_plugin(Name, Persistent) -> try - ok = ?MODULE:generate_configs(Name), case load_app(Name) of ok -> start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end); @@ -362,77 +351,5 @@ plugin_type(backend) -> backend; plugin_type(bridge) -> bridge; plugin_type(_) -> feature. - funlog(Key, Value) -> ?LOG(info, "~s = ~p", [string:join(Key, "."), Value]). - -generate_configs(App) -> - PluginConfDir = emqx:get_env(plugins_etc_dir), - PluginSchemaDir = code:priv_dir(App), - generate_configs(App, PluginConfDir, PluginSchemaDir). - -generate_configs(App, PluginDir) -> - PluginConfDir = filename:join([PluginDir, "etc"]), - PluginSchemaDir = filename:join([PluginDir, "priv"]), - generate_configs(App, PluginConfDir, PluginSchemaDir). - -generate_configs(App, PluginConfDir, PluginSchemaDir) -> - ConfigFile = filename:join([PluginConfDir, App]) ++ ".config", - case filelib:is_file(ConfigFile) of - true -> - {ok, [Configs]} = file:consult(ConfigFile), - apply_configs(Configs); - false -> - SchemaFile = filename:join([PluginSchemaDir, App]) ++ ".schema", - case filelib:is_file(SchemaFile) of - true -> - AppsEnv = do_generate_configs(App), - apply_configs(AppsEnv); - false -> - SchemaMod = lists:concat([App, "_schema"]), - ConfName = filename:join([PluginConfDir, App]) ++ ".conf", - SchemaFile1 = filename:join([code:lib_dir(App), "ebin", SchemaMod]) ++ ".beam", - do_generate_hocon_configs(App, ConfName, SchemaFile1) - end - end. - -do_generate_configs(App) -> - Name1 = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf", - Name2 = filename:join([code:lib_dir(App), "etc", App]) ++ ".conf", - ConfFile = case {filelib:is_file(Name1), filelib:is_file(Name2)} of - {true, _} -> Name1; - {false, true} -> Name2; - {false, false} -> error({config_not_found, [Name1, Name2]}) - end, - SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema", - case filelib:is_file(SchemaFile) of - true -> - Schema = cuttlefish_schema:files([SchemaFile]), - Conf = cuttlefish_conf:file(ConfFile), - cuttlefish_generator:map(Schema, Conf, undefined, fun ?MODULE:funlog/2); - false -> - error({schema_not_found, SchemaFile}) - end. - -do_generate_hocon_configs(App, ConfName, SchemaFile) -> - SchemaMod = lists:concat([App, "_schema"]), - case {filelib:is_file(ConfName), filelib:is_file(SchemaFile)} of - {true, true} -> - {ok, RawConfig} = hocon:load(ConfName, #{format => richmap}), - Config = hocon_schema:check(list_to_atom(SchemaMod), RawConfig, #{atom_key => true, - return_plain => true}), - emqx_config_handler:update_config(emqx_config_handler, Config); - {true, false} -> - error({schema_not_found, [SchemaFile]}); - {false, true} -> - error({config_not_found, [ConfName]}); - {false, false} -> - error({conf_and_schema_not_found, [ConfName, SchemaFile]}) - end. - -apply_configs([]) -> - ok; -apply_configs([{App, Config} | More]) -> - lists:foreach(fun({Key, _}) -> application:unset_env(App, Key) end, application:get_all_env(App)), - lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Config), - apply_configs(More). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index cb631ea22..d090f26f1 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -50,9 +50,22 @@ -export([conf_get/2, conf_get/3, keys/2, filter/1]). -export([ssl/2, tr_ssl/2, tr_password_hash/2]). +%% will be used by emqx_ct_helper to find the dependent apps +-export([includes/0]). + structs() -> ["cluster", "node", "rpc", "log", "lager", "acl", "mqtt", "zone", "listener", "module", "broker", - "plugins", "sysmon", "os_mon", "vm_mon", "alarm", "telemetry"]. + "plugins", "sysmon", "os_mon", "vm_mon", "alarm", "telemetry"] + ++ includes(). + +-ifdef(TEST). +includes() ->[]. +-else. +includes() -> + [ "emqx_data_bridge" + , "emqx_telemetry" + ]. +-endif. fields("cluster") -> [ {"name", t(atom(), "ekka.cluster_name", emqxcl)} @@ -119,6 +132,7 @@ fields("node") -> override_env => "EMQX_NODE_COOKIE" })} , {"data_dir", t(string(), "emqx.data_dir", undefined)} + , {"etc_dir", t(string(), "emqx.etc_dir", undefined)} , {"heartbeat", t(flag(), undefined, false)} , {"async_threads", t(range(1, 1024), "vm_args.+A", undefined)} , {"process_limit", t(integer(), "vm_args.+P", undefined)} @@ -163,7 +177,7 @@ fields("log") -> , {"chars_limit", t(integer(), undefined, -1)} , {"supervisor_reports", t(union([error, progress]), undefined, error)} , {"max_depth", t(union([infinity, integer()]), - "kernel.error_logger_format_depth", 20)} + "kernel.error_logger_format_depth", 80)} , {"formatter", t(union([text, json]), undefined, text)} , {"single_line", t(boolean(), undefined, true)} , {"rotation", ref("rotation")} @@ -467,8 +481,11 @@ fields("telemetry") -> [ {"enabled", t(boolean(), undefined, false)} , {"url", t(string(), undefined, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry")} , {"report_interval", t(duration_s(), undefined, "7d")} - ]. + ]; +fields(ExtraField) -> + Mod = list_to_atom(ExtraField), + Mod:fields(ExtraField). translations() -> ["ekka", "vm_args", "gen_rpc", "kernel", "emqx"]. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge.erl b/apps/emqx_data_bridge/src/emqx_data_bridge.erl index 2c5b52d8e..fe2900b7f 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge.erl @@ -22,14 +22,44 @@ , resource_id_to_name/1 , list_bridges/0 , is_bridge/1 + , config_key_path/0 + , update_config/1 ]). +-export([structs/0, fields/1]). + +%%====================================================================================== +%% Hocon Schema Definitions + +-define(BRIDGE_FIELDS(T), + [{name, hoconsc:t(typerefl:binary())}, + {type, hoconsc:t(typerefl:atom(T))}, + {config, hoconsc:t(hoconsc:ref(list_to_atom("emqx_connector_"++atom_to_list(T)), ""))}]). + +-define(TYPES, [mysql, pgsql, mongo, redis, ldap]). +-define(BRIDGES, [hoconsc:ref(?MODULE, T) || T <- ?TYPES]). + +structs() -> ["emqx_data_bridge"]. + +fields("emqx_data_bridge") -> + [{bridges, #{type => hoconsc:array(hoconsc:union(?BRIDGES)), + default => []}}]; + +fields(mysql) -> ?BRIDGE_FIELDS(mysql); +fields(pgsql) -> ?BRIDGE_FIELDS(pgsql); +fields(mongo) -> ?BRIDGE_FIELDS(mongo); +fields(redis) -> ?BRIDGE_FIELDS(redis); +fields(ldap) -> ?BRIDGE_FIELDS(ldap). + +%%====================================================================================== + load_bridges() -> - ConfFile = filename:join([emqx:get_env(plugins_etc_dir), ?MODULE]) ++ ".conf", - {ok, RawConfig} = hocon:load(ConfFile, #{format => richmap}), - #{emqx_data_bridge := #{bridges := Bridges}} = - hocon_schema:check(emqx_data_bridge_schema, RawConfig, - #{atom_key => true, return_plain => true}), + % ConfFile = filename:join([emqx:get_env(plugins_etc_dir), ?MODULE]) ++ ".conf", + % {ok, RawConfig} = hocon:load(ConfFile, #{format => richmap}), + % #{emqx_data_bridge := #{bridges := Bridges}} = + % hocon_schema:check(emqx_data_bridge_schema, RawConfig, + % #{atom_key => true, return_plain => true}), + Bridges = emqx_config:get([emqx_data_bridge, bridges], []), emqx_data_bridge_monitor:ensure_all_started(Bridges). resource_type(mysql) -> emqx_connector_mysql; @@ -57,3 +87,9 @@ is_bridge(#{id := <<"bridge:", _/binary>>}) -> true; is_bridge(_Data) -> false. + +config_key_path() -> + [emqx_data_bridge, bridges]. + +update_config(ConfigReq) -> + emqx_config_handler:update_config(config_key_path(), ConfigReq). diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl index c0992fc46..7b1b4981d 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_api.erl @@ -57,6 +57,8 @@ , delete_bridge/2 ]). +-define(BRIDGE(N, T, C), #{<<"name">> => N, <<"type">> => T, <<"config">> => C}). + list_bridges(_Binding, _Params) -> {200, #{code => 0, data => [format_api_reply(Data) || Data <- emqx_data_bridge:list_bridges()]}}. @@ -74,10 +76,9 @@ create_bridge(#{name := Name}, Params) -> BridgeType = proplists:get_value(<<"type">>, Params), case emqx_resource:check_and_create( emqx_data_bridge:name_to_resource_id(Name), - emqx_data_bridge:resource_type(BridgeType), Config) of + emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of {ok, Data} -> - update_config(), - {200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}}; + update_config_and_reply(Name, BridgeType, Config, Data); {error, already_created} -> {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; {error, Reason0} -> @@ -91,10 +92,9 @@ update_bridge(#{name := Name}, Params) -> BridgeType = proplists:get_value(<<"type">>, Params), case emqx_resource:check_and_update( emqx_data_bridge:name_to_resource_id(Name), - emqx_data_bridge:resource_type(BridgeType), Config, []) of + emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config), []) of {ok, Data} -> - update_config(), - {200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}}; + update_config_and_reply(Name, BridgeType, Config, Data); {error, not_found} -> {400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}}; {error, Reason0} -> @@ -105,9 +105,7 @@ update_bridge(#{name := Name}, Params) -> delete_bridge(#{name := Name}, _Params) -> case emqx_resource:remove(emqx_data_bridge:name_to_resource_id(Name)) of - ok -> - update_config(), - {200, #{code => 0, data => #{}}}; + ok -> delete_config_and_reply(Name); {error, Reason} -> {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} end. @@ -117,12 +115,28 @@ format_api_reply(#{resource_type := Type, id := Id, config := Conf, status := St name => emqx_data_bridge:resource_id_to_name(Id), config => Conf, status => Status}. -format_conf(#{resource_type := Type, id := Id, config := Conf}) -> - #{type => Type, name => emqx_data_bridge:resource_id_to_name(Id), - config => Conf}. +% format_conf(#{resource_type := Type, id := Id, config := Conf}) -> +% #{type => Type, name => emqx_data_bridge:resource_id_to_name(Id), +% config => Conf}. -get_all_configs() -> - [format_conf(Data) || Data <- emqx_data_bridge:list_bridges()]. +% get_all_configs() -> +% [format_conf(Data) || Data <- emqx_data_bridge:list_bridges()]. -update_config() -> - emqx_config_handler:update_config(emqx_data_bridge_config_handler, get_all_configs()). +update_config_and_reply(Name, BridgeType, Config, Data) -> + case emqx_data_bridge:update_config({update, ?BRIDGE(Name, BridgeType, Config)}) of + ok -> + {200, #{code => 0, data => format_api_reply( + emqx_resource_api:format_data(Data))}}; + {error, Reason} -> + {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} + end. + +delete_config_and_reply(Name) -> + case emqx_data_bridge:update_config({delete, Name}) of + ok -> {200, #{code => 0, data => #{}}}; + {error, Reason} -> + {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} + end. + +atom(B) when is_binary(B) -> + list_to_existing_atom(binary_to_list(B)). diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl index 90ab8056f..967791643 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_app.erl @@ -17,14 +17,26 @@ -behaviour(application). --export([start/2, stop/1]). +-behaviour(emqx_config_handler). + +-export([start/2, stop/1, handle_update_config/2]). start(_StartType, _StartArgs) -> {ok, Sup} = emqx_data_bridge_sup:start_link(), ok = emqx_data_bridge:load_bridges(), + emqx_config_handler:add_handler(emqx_data_bridge:config_key_path(), ?MODULE), {ok, Sup}. stop(_State) -> ok. %% internal functions +handle_update_config({update, Bridge = #{<<"name">> := Name}}, OldConf) -> + [Bridge | remove_bridge(Name, OldConf)]; +handle_update_config({delete, Name}, OldConf) -> + remove_bridge(Name, OldConf). + +remove_bridge(_Name, undefined) -> + []; +remove_bridge(Name, OldConf) -> + [B || B = #{<<"name">> := Name0} <- OldConf, Name0 =/= Name]. diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_schema.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_schema.erl deleted file mode 100644 index 2c974029c..000000000 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_schema.erl +++ /dev/null @@ -1,23 +0,0 @@ --module(emqx_data_bridge_schema). - --export([structs/0, fields/1]). - --define(BRIDGE_FIELDS(T), - [{name, hoconsc:t(typerefl:binary())}, - {type, hoconsc:t(typerefl:atom(T))}, - {config, hoconsc:t(hoconsc:ref(list_to_atom("emqx_connector_"++atom_to_list(T)), ""))}]). - --define(TYPES, [mysql, pgsql, mongo, redis, ldap]). --define(BRIDGES, [hoconsc:ref(T) || T <- ?TYPES]). - -structs() -> [emqx_data_bridge]. - -fields(emqx_data_bridge) -> - [{bridges, #{type => hoconsc:array(hoconsc:union(?BRIDGES)), - default => []}}]; - -fields(mysql) -> ?BRIDGE_FIELDS(mysql); -fields(pgsql) -> ?BRIDGE_FIELDS(pgsql); -fields(mongo) -> ?BRIDGE_FIELDS(mongo); -fields(redis) -> ?BRIDGE_FIELDS(redis); -fields(ldap) -> ?BRIDGE_FIELDS(ldap). diff --git a/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl b/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl index 3ac1af7b2..a699a72a0 100644 --- a/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl +++ b/apps/emqx_data_bridge/src/emqx_data_bridge_sup.erl @@ -35,13 +35,8 @@ init([]) -> start => {emqx_data_bridge_monitor, start_link, []}, restart => permanent, type => worker, - modules => [emqx_data_bridge_monitor]}, - emqx_config_handler:child_spec(emqx_data_bridge_config_handler, emqx_config_handler, - config_key_path()) + modules => [emqx_data_bridge_monitor]} ], {ok, {SupFlags, ChildSpecs}}. %% internal functions - -config_key_path() -> - [emqx_data_bridge, bridges]. diff --git a/apps/emqx_management/src/emqx_mgmt_http.erl b/apps/emqx_management/src/emqx_mgmt_http.erl index b902e486e..4b927a982 100644 --- a/apps/emqx_management/src/emqx_mgmt_http.erl +++ b/apps/emqx_management/src/emqx_mgmt_http.erl @@ -84,7 +84,7 @@ listener_name(Proto) -> list_to_atom(atom_to_list(Proto) ++ ":management"). http_handlers() -> - Apps = [ App || {App, _, _} <- application:which_applications(), + Apps = [ App || {App, _, _} <- application:loaded_applications(), case re:run(atom_to_list(App), "^emqx") of {match,[{0,4}]} -> true; _ -> false diff --git a/apps/emqx_resource/examples/demo.md b/apps/emqx_resource/examples/demo.md index 76b9dee19..0a15be7c5 100644 --- a/apps/emqx_resource/examples/demo.md +++ b/apps/emqx_resource/examples/demo.md @@ -77,7 +77,7 @@ emqx_resource:query(ResourceID, {sql, SQL}). <<"keyfile">> => [],<<"password">> => "public", <<"pool_size">> => 1, <<"server">> => {{127,0,0,1},3306}, - <<"ssl">> => false,<<"user">> => "root", + <<"ssl">> => false,<<"username">> => "root", <<"verify">> => false}, id => <<"bridge:mysql-def">>,mod => emqx_connector_mysql, state => #{poolname => 'bridge:mysql-def'}, @@ -107,7 +107,7 @@ BridgeMySQL='{ "status": "started", "config": { "verify": false, - "user": "root", + "username": "root", "ssl": false, "server": "127.0.0.1:3306", "pool_size": 1, @@ -135,7 +135,7 @@ BridgeMySQL='{ "status": "started", "config": { "verify": false, - "user": "root", + "username": "root", "ssl": false, "server": "127.0.0.1:3306", "pool_size": 2, diff --git a/apps/emqx_telemetry/src/emqx_telemetry.erl b/apps/emqx_telemetry/src/emqx_telemetry.erl index ea4017dc9..dba22e85b 100644 --- a/apps/emqx_telemetry/src/emqx_telemetry.erl +++ b/apps/emqx_telemetry/src/emqx_telemetry.erl @@ -16,6 +16,10 @@ -module(emqx_telemetry). +-include_lib("typerefl/include/types.hrl"). + +-behaviour(hocon_schema). + -behaviour(gen_server). -include_lib("emqx/include/emqx.hrl"). @@ -36,6 +40,9 @@ , stop/0 ]). +-export([ structs/0 + , fields/1]). + %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -90,6 +97,12 @@ -define(TELEMETRY, emqx_telemetry). +%%-------------------------------------------------------------------- +structs() -> ["emqx_telemetry"]. + +fields("emqx_telemetry") -> + [{enabled, emqx_schema:t(boolean(), undefined, false)}]. + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- diff --git a/apps/emqx_telemetry/src/emqx_telemetry_app.erl b/apps/emqx_telemetry/src/emqx_telemetry_app.erl index 8a5b34627..89a30393b 100644 --- a/apps/emqx_telemetry/src/emqx_telemetry_app.erl +++ b/apps/emqx_telemetry/src/emqx_telemetry_app.erl @@ -25,13 +25,6 @@ ]). start(_Type, _Args) -> - %% TODO - %% After the relevant code for building hocon configuration will be deleted - %% Get the configuration using emqx_config:get - ConfFile = filename:join([emqx:get_env(plugins_etc_dir), ?APP]) ++ ".conf", - {ok, RawConfig} = hocon:load(ConfFile), - Config = hocon_schema:check_plain(emqx_telemetry_schema, RawConfig, #{atom_key => true}), - emqx_config_handler:update_config(emqx_config_handler, Config), Enabled = emqx_config:get([?APP, enabled], true), emqx_telemetry_sup:start_link([{enabled, Enabled}]). diff --git a/apps/emqx_telemetry/src/emqx_telemetry_schema.erl b/apps/emqx_telemetry/src/emqx_telemetry_schema.erl deleted file mode 100644 index 1a60ac946..000000000 --- a/apps/emqx_telemetry/src/emqx_telemetry_schema.erl +++ /dev/null @@ -1,13 +0,0 @@ --module(emqx_telemetry_schema). - --include_lib("typerefl/include/types.hrl"). - --behaviour(hocon_schema). - --export([ structs/0 - , fields/1]). - -structs() -> ["emqx_telemetry"]. - -fields("emqx_telemetry") -> - [{enabled, emqx_schema:t(boolean(), undefined, false)}]. diff --git a/rebar.config b/rebar.config index d2288c5ce..915931709 100644 --- a/rebar.config +++ b/rebar.config @@ -53,7 +53,7 @@ , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1 , {getopt, "1.0.1"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.7.0"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.0"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.2.1"}}} ]}. diff --git a/rebar.config.erl b/rebar.config.erl index 56987485e..8ce3aa217 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -104,7 +104,8 @@ test_plugins() -> test_deps() -> [ {bbmustache, "1.10.0"} - , {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "2.0.0"}}} + %, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "2.0.0"}}} + , {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {branch, "hocon"}}} , meck ].