The config handler phase2 (#5052)

* refator(config_handler): handle and validate the updates to raw_configs

* fix(hocon): update hocon to 0.8.0

* refactor(config_handler): check and apply envs only in top-level handler

* refactor(config_handler): update config from top level to bottom level

* refactor(emqx_data_bridge): move configs to emqx.conf

* fix(emqx_schema): remove the extra config path

* fix(config_handler): load the emqx.conf when starting emqx_config_handler

* fix(data_bridge): API not working

* feat(config_handler): save updated configs to emqx_override.conf

* fix(config_handler): cannot find the emqx.conf and emqx_override.conf

* fix(emqx_config): cannot find the correct path for etc dir

* fix(test): load load emqx_schema foreign refereced apps

* refactor(emqx_plugin): do not generate configs before load plugins

All configs (including the configs for plugins) now should go into
the `emqx.conf`.

* fix(tests): update the test cases for plugins

* fix(tests): don't include schema from apps when testing

* fix(tests): use emqx-ct-helper branch hocon
This commit is contained in:
Shawn 2021-06-25 11:47:18 +08:00 committed by zhanghongtong
parent 44412549ab
commit 36c7785fd0
20 changed files with 310 additions and 238 deletions

View File

@ -50,7 +50,7 @@ jobs:
printenv > .env
docker exec -i erlang sh -c "make ensure-rebar3"
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_ldap"
docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_ldap"
docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_ldap-ct"
- uses: actions/upload-artifact@v1
if: failure()
with:
@ -120,7 +120,7 @@ jobs:
printenv > .env
docker exec -i erlang sh -c "make ensure-rebar3"
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_mongo"
docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_mongo"
docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_mongo-ct"
- uses: actions/upload-artifact@v1
if: failure()
with:
@ -203,7 +203,7 @@ jobs:
printenv > .env
docker exec -i erlang sh -c "make ensure-rebar3"
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_mysql"
docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_mysql"
docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_mysql-ct"
- uses: actions/upload-artifact@v1
if: failure()
with:
@ -278,7 +278,7 @@ jobs:
printenv > .env
docker exec -i erlang sh -c "make ensure-rebar3"
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_pgsql"
docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_pgsql"
docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_pgsql-ct"
- uses: actions/upload-artifact@v1
if: failure()
with:
@ -399,7 +399,7 @@ jobs:
printenv > .env
docker exec -i erlang sh -c "make ensure-rebar3"
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_redis"
docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_redis"
docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_redis-ct"
- uses: actions/upload-artifact@v1
if: failure()
with:

View File

@ -203,6 +203,11 @@ node.cookie = "emqxsecretcookie"
## Value: Folder
node.data_dir = "{{ platform_data_dir }}"
## The config file dir for the node
##
## Value: Folder
node.etc_dir = "{{ platform_etc_dir }}"
## Heartbeat monitoring of an Erlang runtime system. Comment the line to disable
## heartbeat, or set the value as 'on'
##
@ -457,8 +462,8 @@ log.file = emqx.log
## and Erlang process message queue inspection.
##
## Value: Integer or 'unlimited' (without quotes)
## Default: 20
#log.max_depth = 20
## Default: 80
#log.max_depth = 80
## Log formatter
## Value: text | json

View File

@ -16,7 +16,7 @@
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.7.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.0"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
@ -28,9 +28,8 @@
{test,
[{deps,
[ meck
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.6.0"}}}
, {bbmustache,"1.10.0"}
, {emqx_ct_helpers, {git,"https://github.com/zmstone/emqx-ct-helpers", {branch,"hocon"}}}
, {emqx_ct_helpers, {git,"https://github.com/emqx/emqx-ct-helpers", {branch,"hocon"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}}
]},
{extra_src_dirs, [{"test",[recursive]}]}

BIN
apps/emqx/rebar3 Executable file

Binary file not shown.

View File

@ -23,6 +23,8 @@
, put/2
, deep_get/3
, deep_put/3
, safe_atom_key_map/1
, unsafe_atom_key_map/1
]).
-spec get() -> term().
@ -58,3 +60,21 @@ deep_put([], Map, Config) when is_map(Map) ->
deep_put([Key | KeyPath], Map, Config) ->
SubMap = deep_put(KeyPath, maps:get(Key, Map, #{}), Config),
Map#{Key => SubMap}.
unsafe_atom_key_map(Map) ->
covert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end).
safe_atom_key_map(Map) ->
covert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end).
covert_keys_to_atom(BinKeyMap, Conv) when is_map(BinKeyMap) ->
maps:fold(
fun(K, V, Acc) when is_binary(K) ->
Acc#{Conv(K) => covert_keys_to_atom(V, Conv)};
(K, V, Acc) when is_atom(K) ->
%% richmap keys
Acc#{K => covert_keys_to_atom(V, Conv)}
end, #{}, BinKeyMap);
covert_keys_to_atom(ListV, Conv) when is_list(ListV) ->
[covert_keys_to_atom(V, Conv) || V <- ListV];
covert_keys_to_atom(Val, _) -> Val.

View File

@ -17,13 +17,16 @@
%% And there are a top level config handler maintains the overall config map.
-module(emqx_config_handler).
-include("logger.hrl").
-behaviour(gen_server).
%% API functions
-export([ start_link/0
, start_handler/3
, add_handler/2
, update_config/2
, child_spec/3
, get_raw_config/0
, merge_to_old_config/2
]).
%% emqx_config_handler callbacks
@ -38,74 +41,74 @@
terminate/2,
code_change/3]).
-type config() :: term().
-type config_map() :: #{atom() => config()} | [config_map()].
-type handler_name() :: module() | top.
-type key_path() :: [atom()].
-define(MOD, {mod}).
-type update_request() :: term().
-type raw_config() :: hocon:config() | undefined.
-type config_key() :: atom().
-type handler_name() :: module().
-type config_key_path() :: [atom()].
-type handlers() :: #{config_key() => handlers(), ?MOD => handler_name()}.
-optional_callbacks([handle_update_config/2]).
-callback handle_update_config(config(), config_map()) -> config_map().
-callback handle_update_config(update_request(), raw_config()) -> update_request().
-record(state, {
handler_name :: handler_name(),
parent :: handler_name(),
key_path :: key_path()
}).
-type state() :: #{
handlers := handlers(),
raw_config := raw_config(),
atom() => term()
}.
start_link() ->
start_handler(?MODULE, top, []).
gen_server:start_link({local, ?MODULE}, ?MODULE, {}, []).
-spec start_handler(handler_name(), handler_name(), key_path()) ->
{ok, pid()} | {error, {already_started, pid()}} | {error, term()}.
start_handler(HandlerName, Parent, ConfKeyPath) ->
gen_server:start_link({local, HandlerName}, ?MODULE, {HandlerName, Parent, ConfKeyPath}, []).
-spec update_config(config_key_path(), update_request()) -> ok | {error, term()}.
update_config(ConfKeyPath, UpdateReq) ->
gen_server:call(?MODULE, {update_config, ConfKeyPath, UpdateReq}).
-spec child_spec(module(), handler_name(), key_path()) -> supervisor:child_spec().
child_spec(Mod, Parent, KeyPath) ->
#{id => Mod,
start => {?MODULE, start_handler, [Mod, Parent, KeyPath]},
restart => permanent,
type => worker,
modules => [?MODULE]}.
-spec add_handler(config_key_path(), handler_name()) -> ok.
add_handler(ConfKeyPath, HandlerName) ->
gen_server:call(?MODULE, {add_child, ConfKeyPath, HandlerName}).
-spec update_config(handler_name(), config()) -> ok.
update_config(top, Config) ->
emqx_config:put(Config),
save_config_to_disk(Config);
update_config(Handler, Config) ->
case is_process_alive(whereis(Handler)) of
true -> gen_server:cast(Handler, {handle_update_config, Config});
false -> error({not_alive, Handler})
end.
-spec get_raw_config() -> raw_config().
get_raw_config() ->
gen_server:call(?MODULE, get_raw_config).
%%============================================================================
%% callbacks of emqx_config_handler (the top-level handler)
handle_update_config(Config, undefined) ->
handle_update_config(Config, #{});
handle_update_config(Config, OldConfigMap) ->
%% simply merge the config to the overall config
maps:merge(OldConfigMap, Config).
init({HandlerName, Parent, ConfKeyPath}) ->
{ok, #state{handler_name = HandlerName, parent = Parent, key_path = ConfKeyPath}}.
-spec init(term()) -> {ok, state()}.
init(_) ->
{ok, RawConf} = hocon:load(emqx_conf_name(), #{format => richmap}),
{_MappedEnvs, Conf} = hocon_schema:map_translate(emqx_schema, RawConf, #{}),
ok = save_config_to_emqx_config(Conf),
{ok, #{raw_config => hocon_schema:richmap_to_map(RawConf),
handlers => #{?MOD => ?MODULE}}}.
handle_call(get_raw_config, _From, State = #{raw_config := RawConf}) ->
{reply, RawConf, State};
handle_call({add_child, ConfKeyPath, HandlerName}, _From,
State = #{handlers := Handlers}) ->
{reply, ok, State#{handlers =>
emqx_config:deep_put(ConfKeyPath, Handlers, #{?MOD => HandlerName})}};
handle_call({update_config, ConfKeyPath, UpdateReq}, _From,
#{raw_config := RawConf, handlers := Handlers} = State) ->
try {RootKeys, Conf} = do_update_config(ConfKeyPath, Handlers, RawConf, UpdateReq),
{reply, save_configs(RootKeys, Conf), State#{raw_config => Conf}}
catch
throw: Reason ->
{reply, {error, Reason}, State};
Error : Reason : ST ->
?LOG(error, "update config failed: ~p", [{Error, Reason, ST}]),
{reply, {error, Reason}, State}
end;
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast({handle_update_config, Config}, #state{handler_name = HandlerName,
parent = Parent, key_path = ConfKeyPath} = State) ->
%% accumulate the config map of this config handler
OldConfigMap = emqx_config:get(ConfKeyPath, undefined),
SubConfigMap = case erlang:function_exported(HandlerName, handle_update_config, 2) of
true -> HandlerName:handle_update_config(Config, OldConfigMap);
false -> wrap_sub_config(ConfKeyPath, Config)
end,
%% then notify the parent handler
update_config(Parent, SubConfigMap),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
@ -118,17 +121,100 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%============================================================================
save_config_to_disk(ConfigMap) ->
Filename = filename:join([emqx_data_dir(), "emqx_override.conf"]),
case file:write_file(Filename, jsx:encode(ConfigMap)) of
ok -> ok;
{error, Reason} -> logger:error("write to ~s failed, ~p", [Filename, Reason])
do_update_config([], Handlers, OldConf, UpdateReq) ->
call_handle_update_config(Handlers, OldConf, UpdateReq);
do_update_config([ConfKey | ConfKeyPath], Handlers, OldConf, UpdateReq) ->
SubOldConf = get_sub_config(ConfKey, OldConf),
case maps:find(ConfKey, Handlers) of
error -> throw({handler_not_found, ConfKey});
{ok, SubHandlers} ->
NewUpdateReq = do_update_config(ConfKeyPath, SubHandlers, SubOldConf, UpdateReq),
call_handle_update_config(Handlers, OldConf, #{bin(ConfKey) => NewUpdateReq})
end.
emqx_data_dir() ->
%emqx_config:get([node, data_dir])
"data".
get_sub_config(_, undefined) ->
undefined;
get_sub_config(ConfKey, OldConf) when is_map(OldConf) ->
maps:get(bin(ConfKey), OldConf, undefined);
get_sub_config(_, OldConf) ->
OldConf.
wrap_sub_config(ConfKeyPath, Config) ->
emqx_config:deep_put(ConfKeyPath, #{}, Config).
call_handle_update_config(Handlers, OldConf, UpdateReq) ->
HandlerName = maps:get(?MOD, Handlers, undefined),
case erlang:function_exported(HandlerName, handle_update_config, 2) of
true -> HandlerName:handle_update_config(UpdateReq, OldConf);
false -> UpdateReq %% the default behaviour is overwriting the old config
end.
%% callbacks for the top-level handler
handle_update_config(UpdateReq, OldConf) ->
FullRawConf = merge_to_old_config(UpdateReq, OldConf),
{maps:keys(UpdateReq), FullRawConf}.
%% default callback of config handlers
merge_to_old_config(UpdateReq, undefined) ->
merge_to_old_config(UpdateReq, #{});
merge_to_old_config(UpdateReq, RawConf) ->
maps:merge(RawConf, UpdateReq).
%%============================================================================
save_configs(RootKeys, Conf0) ->
{_MappedEnvs, Conf1} = hocon_schema:map_translate(emqx_schema, to_richmap(Conf0), #{}),
%save_config_to_app_env(MappedEnvs),
save_config_to_emqx_config(hocon_schema:richmap_to_map(Conf1)),
save_config_to_disk(RootKeys, Conf0).
%% We may need also support hot config update for the apps that use application envs.
%% If so uncomment the following lines to update the configs to application env
% save_config_to_app_env(MappedEnvs) ->
% lists:foreach(fun({AppName, Envs}) ->
% [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs]
% end, MappedEnvs).
save_config_to_emqx_config(Conf) ->
emqx_config:put(emqx_config:unsafe_atom_key_map(Conf)).
save_config_to_disk(RootKeys, Conf) ->
FileName = emqx_override_conf_name(),
OldConf = read_old_config(FileName),
%% We don't save the overall config to file, but only the sub configs
%% under RootKeys
write_new_config(FileName,
maps:merge(OldConf, maps:with(RootKeys, Conf))).
write_new_config(FileName, Conf) ->
case file:write_file(FileName, jsx:prettify(jsx:encode(Conf))) of
ok -> ok;
{error, Reason} ->
logger:error("write to ~s failed, ~p", [FileName, Reason]),
{error, Reason}
end.
read_old_config(FileName) ->
case file:read_file(FileName) of
{ok, Text} ->
try jsx:decode(Text, [{return_maps, true}]) of
Conf when is_map(Conf) -> Conf;
_ -> #{}
catch _Err : _Reason ->
#{}
end;
_ -> #{}
end.
emqx_conf_name() ->
filename:join([etc_dir(), "emqx.conf"]).
emqx_override_conf_name() ->
filename:join([emqx:get_env(data_dir), "emqx_override.conf"]).
etc_dir() ->
emqx:get_env(etc_dir).
to_richmap(Map) ->
{ok, RichMap} = hocon:binary(jsx:encode(Map), #{format => richmap}),
RichMap.
bin(A) when is_atom(A) -> list_to_binary(atom_to_list(A));
bin(B) when is_binary(B) -> B;
bin(S) when is_list(S) -> list_to_binary(S).

View File

@ -30,8 +30,6 @@
, reload/1
, list/0
, find_plugin/1
, generate_configs/1
, apply_configs/1
]).
-export([funlog/2]).
@ -173,15 +171,7 @@ load_ext_plugin(PluginDir) ->
?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]),
error({plugin_app_file_not_found, AppFile})
end,
ok = load_plugin_app(AppName, Ebin),
try
ok = generate_configs(AppName, PluginDir)
catch
throw : {conf_file_not_found, ConfFile} ->
%% this is maybe a dependency of an external plugin
?LOG(debug, "config_load_error_ignored for app=~p, path=~s", [AppName, ConfFile]),
ok
end.
load_plugin_app(AppName, Ebin).
load_plugin_app(AppName, Ebin) ->
_ = code:add_patha(Ebin),
@ -246,7 +236,6 @@ plugin(AppName, Type) ->
load_plugin(Name, Persistent) ->
try
ok = ?MODULE:generate_configs(Name),
case load_app(Name) of
ok ->
start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
@ -362,77 +351,5 @@ plugin_type(backend) -> backend;
plugin_type(bridge) -> bridge;
plugin_type(_) -> feature.
funlog(Key, Value) ->
?LOG(info, "~s = ~p", [string:join(Key, "."), Value]).
generate_configs(App) ->
PluginConfDir = emqx:get_env(plugins_etc_dir),
PluginSchemaDir = code:priv_dir(App),
generate_configs(App, PluginConfDir, PluginSchemaDir).
generate_configs(App, PluginDir) ->
PluginConfDir = filename:join([PluginDir, "etc"]),
PluginSchemaDir = filename:join([PluginDir, "priv"]),
generate_configs(App, PluginConfDir, PluginSchemaDir).
generate_configs(App, PluginConfDir, PluginSchemaDir) ->
ConfigFile = filename:join([PluginConfDir, App]) ++ ".config",
case filelib:is_file(ConfigFile) of
true ->
{ok, [Configs]} = file:consult(ConfigFile),
apply_configs(Configs);
false ->
SchemaFile = filename:join([PluginSchemaDir, App]) ++ ".schema",
case filelib:is_file(SchemaFile) of
true ->
AppsEnv = do_generate_configs(App),
apply_configs(AppsEnv);
false ->
SchemaMod = lists:concat([App, "_schema"]),
ConfName = filename:join([PluginConfDir, App]) ++ ".conf",
SchemaFile1 = filename:join([code:lib_dir(App), "ebin", SchemaMod]) ++ ".beam",
do_generate_hocon_configs(App, ConfName, SchemaFile1)
end
end.
do_generate_configs(App) ->
Name1 = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf",
Name2 = filename:join([code:lib_dir(App), "etc", App]) ++ ".conf",
ConfFile = case {filelib:is_file(Name1), filelib:is_file(Name2)} of
{true, _} -> Name1;
{false, true} -> Name2;
{false, false} -> error({config_not_found, [Name1, Name2]})
end,
SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema",
case filelib:is_file(SchemaFile) of
true ->
Schema = cuttlefish_schema:files([SchemaFile]),
Conf = cuttlefish_conf:file(ConfFile),
cuttlefish_generator:map(Schema, Conf, undefined, fun ?MODULE:funlog/2);
false ->
error({schema_not_found, SchemaFile})
end.
do_generate_hocon_configs(App, ConfName, SchemaFile) ->
SchemaMod = lists:concat([App, "_schema"]),
case {filelib:is_file(ConfName), filelib:is_file(SchemaFile)} of
{true, true} ->
{ok, RawConfig} = hocon:load(ConfName, #{format => richmap}),
Config = hocon_schema:check(list_to_atom(SchemaMod), RawConfig, #{atom_key => true,
return_plain => true}),
emqx_config_handler:update_config(emqx_config_handler, Config);
{true, false} ->
error({schema_not_found, [SchemaFile]});
{false, true} ->
error({config_not_found, [ConfName]});
{false, false} ->
error({conf_and_schema_not_found, [ConfName, SchemaFile]})
end.
apply_configs([]) ->
ok;
apply_configs([{App, Config} | More]) ->
lists:foreach(fun({Key, _}) -> application:unset_env(App, Key) end, application:get_all_env(App)),
lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Config),
apply_configs(More).

View File

@ -50,9 +50,22 @@
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
-export([ssl/2, tr_ssl/2, tr_password_hash/2]).
%% will be used by emqx_ct_helper to find the dependent apps
-export([includes/0]).
structs() -> ["cluster", "node", "rpc", "log", "lager",
"acl", "mqtt", "zone", "listener", "module", "broker",
"plugins", "sysmon", "os_mon", "vm_mon", "alarm", "telemetry"].
"plugins", "sysmon", "os_mon", "vm_mon", "alarm", "telemetry"]
++ includes().
-ifdef(TEST).
includes() ->[].
-else.
includes() ->
[ "emqx_data_bridge"
, "emqx_telemetry"
].
-endif.
fields("cluster") ->
[ {"name", t(atom(), "ekka.cluster_name", emqxcl)}
@ -119,6 +132,7 @@ fields("node") ->
override_env => "EMQX_NODE_COOKIE"
})}
, {"data_dir", t(string(), "emqx.data_dir", undefined)}
, {"etc_dir", t(string(), "emqx.etc_dir", undefined)}
, {"heartbeat", t(flag(), undefined, false)}
, {"async_threads", t(range(1, 1024), "vm_args.+A", undefined)}
, {"process_limit", t(integer(), "vm_args.+P", undefined)}
@ -163,7 +177,7 @@ fields("log") ->
, {"chars_limit", t(integer(), undefined, -1)}
, {"supervisor_reports", t(union([error, progress]), undefined, error)}
, {"max_depth", t(union([infinity, integer()]),
"kernel.error_logger_format_depth", 20)}
"kernel.error_logger_format_depth", 80)}
, {"formatter", t(union([text, json]), undefined, text)}
, {"single_line", t(boolean(), undefined, true)}
, {"rotation", ref("rotation")}
@ -467,8 +481,11 @@ fields("telemetry") ->
[ {"enabled", t(boolean(), undefined, false)}
, {"url", t(string(), undefined, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry")}
, {"report_interval", t(duration_s(), undefined, "7d")}
].
];
fields(ExtraField) ->
Mod = list_to_atom(ExtraField),
Mod:fields(ExtraField).
translations() -> ["ekka", "vm_args", "gen_rpc", "kernel", "emqx"].

View File

@ -22,14 +22,44 @@
, resource_id_to_name/1
, list_bridges/0
, is_bridge/1
, config_key_path/0
, update_config/1
]).
-export([structs/0, fields/1]).
%%======================================================================================
%% Hocon Schema Definitions
-define(BRIDGE_FIELDS(T),
[{name, hoconsc:t(typerefl:binary())},
{type, hoconsc:t(typerefl:atom(T))},
{config, hoconsc:t(hoconsc:ref(list_to_atom("emqx_connector_"++atom_to_list(T)), ""))}]).
-define(TYPES, [mysql, pgsql, mongo, redis, ldap]).
-define(BRIDGES, [hoconsc:ref(?MODULE, T) || T <- ?TYPES]).
structs() -> ["emqx_data_bridge"].
fields("emqx_data_bridge") ->
[{bridges, #{type => hoconsc:array(hoconsc:union(?BRIDGES)),
default => []}}];
fields(mysql) -> ?BRIDGE_FIELDS(mysql);
fields(pgsql) -> ?BRIDGE_FIELDS(pgsql);
fields(mongo) -> ?BRIDGE_FIELDS(mongo);
fields(redis) -> ?BRIDGE_FIELDS(redis);
fields(ldap) -> ?BRIDGE_FIELDS(ldap).
%%======================================================================================
load_bridges() ->
ConfFile = filename:join([emqx:get_env(plugins_etc_dir), ?MODULE]) ++ ".conf",
{ok, RawConfig} = hocon:load(ConfFile, #{format => richmap}),
#{emqx_data_bridge := #{bridges := Bridges}} =
hocon_schema:check(emqx_data_bridge_schema, RawConfig,
#{atom_key => true, return_plain => true}),
% ConfFile = filename:join([emqx:get_env(plugins_etc_dir), ?MODULE]) ++ ".conf",
% {ok, RawConfig} = hocon:load(ConfFile, #{format => richmap}),
% #{emqx_data_bridge := #{bridges := Bridges}} =
% hocon_schema:check(emqx_data_bridge_schema, RawConfig,
% #{atom_key => true, return_plain => true}),
Bridges = emqx_config:get([emqx_data_bridge, bridges], []),
emqx_data_bridge_monitor:ensure_all_started(Bridges).
resource_type(mysql) -> emqx_connector_mysql;
@ -57,3 +87,9 @@ is_bridge(#{id := <<"bridge:", _/binary>>}) ->
true;
is_bridge(_Data) ->
false.
config_key_path() ->
[emqx_data_bridge, bridges].
update_config(ConfigReq) ->
emqx_config_handler:update_config(config_key_path(), ConfigReq).

View File

@ -57,6 +57,8 @@
, delete_bridge/2
]).
-define(BRIDGE(N, T, C), #{<<"name">> => N, <<"type">> => T, <<"config">> => C}).
list_bridges(_Binding, _Params) ->
{200, #{code => 0, data => [format_api_reply(Data) ||
Data <- emqx_data_bridge:list_bridges()]}}.
@ -74,10 +76,9 @@ create_bridge(#{name := Name}, Params) ->
BridgeType = proplists:get_value(<<"type">>, Params),
case emqx_resource:check_and_create(
emqx_data_bridge:name_to_resource_id(Name),
emqx_data_bridge:resource_type(BridgeType), Config) of
emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of
{ok, Data} ->
update_config(),
{200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}};
update_config_and_reply(Name, BridgeType, Config, Data);
{error, already_created} ->
{400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
{error, Reason0} ->
@ -91,10 +92,9 @@ update_bridge(#{name := Name}, Params) ->
BridgeType = proplists:get_value(<<"type">>, Params),
case emqx_resource:check_and_update(
emqx_data_bridge:name_to_resource_id(Name),
emqx_data_bridge:resource_type(BridgeType), Config, []) of
emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config), []) of
{ok, Data} ->
update_config(),
{200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}};
update_config_and_reply(Name, BridgeType, Config, Data);
{error, not_found} ->
{400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}};
{error, Reason0} ->
@ -105,9 +105,7 @@ update_bridge(#{name := Name}, Params) ->
delete_bridge(#{name := Name}, _Params) ->
case emqx_resource:remove(emqx_data_bridge:name_to_resource_id(Name)) of
ok ->
update_config(),
{200, #{code => 0, data => #{}}};
ok -> delete_config_and_reply(Name);
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end.
@ -117,12 +115,28 @@ format_api_reply(#{resource_type := Type, id := Id, config := Conf, status := St
name => emqx_data_bridge:resource_id_to_name(Id),
config => Conf, status => Status}.
format_conf(#{resource_type := Type, id := Id, config := Conf}) ->
#{type => Type, name => emqx_data_bridge:resource_id_to_name(Id),
config => Conf}.
% format_conf(#{resource_type := Type, id := Id, config := Conf}) ->
% #{type => Type, name => emqx_data_bridge:resource_id_to_name(Id),
% config => Conf}.
get_all_configs() ->
[format_conf(Data) || Data <- emqx_data_bridge:list_bridges()].
% get_all_configs() ->
% [format_conf(Data) || Data <- emqx_data_bridge:list_bridges()].
update_config() ->
emqx_config_handler:update_config(emqx_data_bridge_config_handler, get_all_configs()).
update_config_and_reply(Name, BridgeType, Config, Data) ->
case emqx_data_bridge:update_config({update, ?BRIDGE(Name, BridgeType, Config)}) of
ok ->
{200, #{code => 0, data => format_api_reply(
emqx_resource_api:format_data(Data))}};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end.
delete_config_and_reply(Name) ->
case emqx_data_bridge:update_config({delete, Name}) of
ok -> {200, #{code => 0, data => #{}}};
{error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end.
atom(B) when is_binary(B) ->
list_to_existing_atom(binary_to_list(B)).

View File

@ -17,14 +17,26 @@
-behaviour(application).
-export([start/2, stop/1]).
-behaviour(emqx_config_handler).
-export([start/2, stop/1, handle_update_config/2]).
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_data_bridge_sup:start_link(),
ok = emqx_data_bridge:load_bridges(),
emqx_config_handler:add_handler(emqx_data_bridge:config_key_path(), ?MODULE),
{ok, Sup}.
stop(_State) ->
ok.
%% internal functions
handle_update_config({update, Bridge = #{<<"name">> := Name}}, OldConf) ->
[Bridge | remove_bridge(Name, OldConf)];
handle_update_config({delete, Name}, OldConf) ->
remove_bridge(Name, OldConf).
remove_bridge(_Name, undefined) ->
[];
remove_bridge(Name, OldConf) ->
[B || B = #{<<"name">> := Name0} <- OldConf, Name0 =/= Name].

View File

@ -1,23 +0,0 @@
-module(emqx_data_bridge_schema).
-export([structs/0, fields/1]).
-define(BRIDGE_FIELDS(T),
[{name, hoconsc:t(typerefl:binary())},
{type, hoconsc:t(typerefl:atom(T))},
{config, hoconsc:t(hoconsc:ref(list_to_atom("emqx_connector_"++atom_to_list(T)), ""))}]).
-define(TYPES, [mysql, pgsql, mongo, redis, ldap]).
-define(BRIDGES, [hoconsc:ref(T) || T <- ?TYPES]).
structs() -> [emqx_data_bridge].
fields(emqx_data_bridge) ->
[{bridges, #{type => hoconsc:array(hoconsc:union(?BRIDGES)),
default => []}}];
fields(mysql) -> ?BRIDGE_FIELDS(mysql);
fields(pgsql) -> ?BRIDGE_FIELDS(pgsql);
fields(mongo) -> ?BRIDGE_FIELDS(mongo);
fields(redis) -> ?BRIDGE_FIELDS(redis);
fields(ldap) -> ?BRIDGE_FIELDS(ldap).

View File

@ -35,13 +35,8 @@ init([]) ->
start => {emqx_data_bridge_monitor, start_link, []},
restart => permanent,
type => worker,
modules => [emqx_data_bridge_monitor]},
emqx_config_handler:child_spec(emqx_data_bridge_config_handler, emqx_config_handler,
config_key_path())
modules => [emqx_data_bridge_monitor]}
],
{ok, {SupFlags, ChildSpecs}}.
%% internal functions
config_key_path() ->
[emqx_data_bridge, bridges].

View File

@ -84,7 +84,7 @@ listener_name(Proto) ->
list_to_atom(atom_to_list(Proto) ++ ":management").
http_handlers() ->
Apps = [ App || {App, _, _} <- application:which_applications(),
Apps = [ App || {App, _, _} <- application:loaded_applications(),
case re:run(atom_to_list(App), "^emqx") of
{match,[{0,4}]} -> true;
_ -> false

View File

@ -77,7 +77,7 @@ emqx_resource:query(ResourceID, {sql, SQL}).
<<"keyfile">> => [],<<"password">> => "public",
<<"pool_size">> => 1,
<<"server">> => {{127,0,0,1},3306},
<<"ssl">> => false,<<"user">> => "root",
<<"ssl">> => false,<<"username">> => "root",
<<"verify">> => false},
id => <<"bridge:mysql-def">>,mod => emqx_connector_mysql,
state => #{poolname => 'bridge:mysql-def'},
@ -107,7 +107,7 @@ BridgeMySQL='{
"status": "started",
"config": {
"verify": false,
"user": "root",
"username": "root",
"ssl": false,
"server": "127.0.0.1:3306",
"pool_size": 1,
@ -135,7 +135,7 @@ BridgeMySQL='{
"status": "started",
"config": {
"verify": false,
"user": "root",
"username": "root",
"ssl": false,
"server": "127.0.0.1:3306",
"pool_size": 2,

View File

@ -16,6 +16,10 @@
-module(emqx_telemetry).
-include_lib("typerefl/include/types.hrl").
-behaviour(hocon_schema).
-behaviour(gen_server).
-include_lib("emqx/include/emqx.hrl").
@ -36,6 +40,9 @@
, stop/0
]).
-export([ structs/0
, fields/1]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
@ -90,6 +97,12 @@
-define(TELEMETRY, emqx_telemetry).
%%--------------------------------------------------------------------
structs() -> ["emqx_telemetry"].
fields("emqx_telemetry") ->
[{enabled, emqx_schema:t(boolean(), undefined, false)}].
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------

View File

@ -25,13 +25,6 @@
]).
start(_Type, _Args) ->
%% TODO
%% After the relevant code for building hocon configuration will be deleted
%% Get the configuration using emqx_config:get
ConfFile = filename:join([emqx:get_env(plugins_etc_dir), ?APP]) ++ ".conf",
{ok, RawConfig} = hocon:load(ConfFile),
Config = hocon_schema:check_plain(emqx_telemetry_schema, RawConfig, #{atom_key => true}),
emqx_config_handler:update_config(emqx_config_handler, Config),
Enabled = emqx_config:get([?APP, enabled], true),
emqx_telemetry_sup:start_link([{enabled, Enabled}]).

View File

@ -1,13 +0,0 @@
-module(emqx_telemetry_schema).
-include_lib("typerefl/include/types.hrl").
-behaviour(hocon_schema).
-export([ structs/0
, fields/1]).
structs() -> ["emqx_telemetry"].
fields("emqx_telemetry") ->
[{enabled, emqx_schema:t(boolean(), undefined, false)}].

View File

@ -53,7 +53,7 @@
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
, {getopt, "1.0.1"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.7.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.0"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.2.1"}}}
]}.

View File

@ -104,7 +104,8 @@ test_plugins() ->
test_deps() ->
[ {bbmustache, "1.10.0"}
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "2.0.0"}}}
%, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "2.0.0"}}}
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {branch, "hocon"}}}
, meck
].