The config handler phase2 (#5052)
* refator(config_handler): handle and validate the updates to raw_configs * fix(hocon): update hocon to 0.8.0 * refactor(config_handler): check and apply envs only in top-level handler * refactor(config_handler): update config from top level to bottom level * refactor(emqx_data_bridge): move configs to emqx.conf * fix(emqx_schema): remove the extra config path * fix(config_handler): load the emqx.conf when starting emqx_config_handler * fix(data_bridge): API not working * feat(config_handler): save updated configs to emqx_override.conf * fix(config_handler): cannot find the emqx.conf and emqx_override.conf * fix(emqx_config): cannot find the correct path for etc dir * fix(test): load load emqx_schema foreign refereced apps * refactor(emqx_plugin): do not generate configs before load plugins All configs (including the configs for plugins) now should go into the `emqx.conf`. * fix(tests): update the test cases for plugins * fix(tests): don't include schema from apps when testing * fix(tests): use emqx-ct-helper branch hocon
This commit is contained in:
parent
a42605b27b
commit
704af9f3b1
|
@ -50,7 +50,7 @@ jobs:
|
||||||
printenv > .env
|
printenv > .env
|
||||||
docker exec -i erlang sh -c "make ensure-rebar3"
|
docker exec -i erlang sh -c "make ensure-rebar3"
|
||||||
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_ldap"
|
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_ldap"
|
||||||
docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_ldap"
|
docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_ldap-ct"
|
||||||
- uses: actions/upload-artifact@v1
|
- uses: actions/upload-artifact@v1
|
||||||
if: failure()
|
if: failure()
|
||||||
with:
|
with:
|
||||||
|
@ -120,7 +120,7 @@ jobs:
|
||||||
printenv > .env
|
printenv > .env
|
||||||
docker exec -i erlang sh -c "make ensure-rebar3"
|
docker exec -i erlang sh -c "make ensure-rebar3"
|
||||||
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_mongo"
|
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_mongo"
|
||||||
docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_mongo"
|
docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_mongo-ct"
|
||||||
- uses: actions/upload-artifact@v1
|
- uses: actions/upload-artifact@v1
|
||||||
if: failure()
|
if: failure()
|
||||||
with:
|
with:
|
||||||
|
@ -203,7 +203,7 @@ jobs:
|
||||||
printenv > .env
|
printenv > .env
|
||||||
docker exec -i erlang sh -c "make ensure-rebar3"
|
docker exec -i erlang sh -c "make ensure-rebar3"
|
||||||
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_mysql"
|
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_mysql"
|
||||||
docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_mysql"
|
docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_mysql-ct"
|
||||||
- uses: actions/upload-artifact@v1
|
- uses: actions/upload-artifact@v1
|
||||||
if: failure()
|
if: failure()
|
||||||
with:
|
with:
|
||||||
|
@ -278,7 +278,7 @@ jobs:
|
||||||
printenv > .env
|
printenv > .env
|
||||||
docker exec -i erlang sh -c "make ensure-rebar3"
|
docker exec -i erlang sh -c "make ensure-rebar3"
|
||||||
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_pgsql"
|
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_pgsql"
|
||||||
docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_pgsql"
|
docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_pgsql-ct"
|
||||||
- uses: actions/upload-artifact@v1
|
- uses: actions/upload-artifact@v1
|
||||||
if: failure()
|
if: failure()
|
||||||
with:
|
with:
|
||||||
|
@ -399,7 +399,7 @@ jobs:
|
||||||
printenv > .env
|
printenv > .env
|
||||||
docker exec -i erlang sh -c "make ensure-rebar3"
|
docker exec -i erlang sh -c "make ensure-rebar3"
|
||||||
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_redis"
|
docker exec -i erlang sh -c "./rebar3 eunit --application=emqx_auth_redis"
|
||||||
docker exec --env-file .env -i erlang sh -c "./rebar3 ct --dir apps/emqx_auth_redis"
|
docker exec --env-file .env -i erlang sh -c "make apps/emqx_auth_redis-ct"
|
||||||
- uses: actions/upload-artifact@v1
|
- uses: actions/upload-artifact@v1
|
||||||
if: failure()
|
if: failure()
|
||||||
with:
|
with:
|
||||||
|
|
|
@ -203,6 +203,11 @@ node.cookie = "emqxsecretcookie"
|
||||||
## Value: Folder
|
## Value: Folder
|
||||||
node.data_dir = "{{ platform_data_dir }}"
|
node.data_dir = "{{ platform_data_dir }}"
|
||||||
|
|
||||||
|
## The config file dir for the node
|
||||||
|
##
|
||||||
|
## Value: Folder
|
||||||
|
node.etc_dir = "{{ platform_etc_dir }}"
|
||||||
|
|
||||||
## Heartbeat monitoring of an Erlang runtime system. Comment the line to disable
|
## Heartbeat monitoring of an Erlang runtime system. Comment the line to disable
|
||||||
## heartbeat, or set the value as 'on'
|
## heartbeat, or set the value as 'on'
|
||||||
##
|
##
|
||||||
|
@ -457,8 +462,8 @@ log.file = emqx.log
|
||||||
## and Erlang process message queue inspection.
|
## and Erlang process message queue inspection.
|
||||||
##
|
##
|
||||||
## Value: Integer or 'unlimited' (without quotes)
|
## Value: Integer or 'unlimited' (without quotes)
|
||||||
## Default: 20
|
## Default: 80
|
||||||
#log.max_depth = 20
|
#log.max_depth = 80
|
||||||
|
|
||||||
## Log formatter
|
## Log formatter
|
||||||
## Value: text | json
|
## Value: text | json
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
|
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
|
||||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
||||||
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon
|
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon
|
||||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.7.0"}}}
|
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.0"}}}
|
||||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
|
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.4"}}}
|
||||||
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
|
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
|
||||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
|
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
|
||||||
|
@ -28,9 +28,8 @@
|
||||||
{test,
|
{test,
|
||||||
[{deps,
|
[{deps,
|
||||||
[ meck
|
[ meck
|
||||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.6.0"}}}
|
|
||||||
, {bbmustache,"1.10.0"}
|
, {bbmustache,"1.10.0"}
|
||||||
, {emqx_ct_helpers, {git,"https://github.com/zmstone/emqx-ct-helpers", {branch,"hocon"}}}
|
, {emqx_ct_helpers, {git,"https://github.com/emqx/emqx-ct-helpers", {branch,"hocon"}}}
|
||||||
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}}
|
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.3.1"}}}
|
||||||
]},
|
]},
|
||||||
{extra_src_dirs, [{"test",[recursive]}]}
|
{extra_src_dirs, [{"test",[recursive]}]}
|
||||||
|
|
Binary file not shown.
|
@ -23,6 +23,8 @@
|
||||||
, put/2
|
, put/2
|
||||||
, deep_get/3
|
, deep_get/3
|
||||||
, deep_put/3
|
, deep_put/3
|
||||||
|
, safe_atom_key_map/1
|
||||||
|
, unsafe_atom_key_map/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-spec get() -> term().
|
-spec get() -> term().
|
||||||
|
@ -58,3 +60,21 @@ deep_put([], Map, Config) when is_map(Map) ->
|
||||||
deep_put([Key | KeyPath], Map, Config) ->
|
deep_put([Key | KeyPath], Map, Config) ->
|
||||||
SubMap = deep_put(KeyPath, maps:get(Key, Map, #{}), Config),
|
SubMap = deep_put(KeyPath, maps:get(Key, Map, #{}), Config),
|
||||||
Map#{Key => SubMap}.
|
Map#{Key => SubMap}.
|
||||||
|
|
||||||
|
unsafe_atom_key_map(Map) ->
|
||||||
|
covert_keys_to_atom(Map, fun(K) -> binary_to_atom(K, utf8) end).
|
||||||
|
|
||||||
|
safe_atom_key_map(Map) ->
|
||||||
|
covert_keys_to_atom(Map, fun(K) -> binary_to_existing_atom(K, utf8) end).
|
||||||
|
|
||||||
|
covert_keys_to_atom(BinKeyMap, Conv) when is_map(BinKeyMap) ->
|
||||||
|
maps:fold(
|
||||||
|
fun(K, V, Acc) when is_binary(K) ->
|
||||||
|
Acc#{Conv(K) => covert_keys_to_atom(V, Conv)};
|
||||||
|
(K, V, Acc) when is_atom(K) ->
|
||||||
|
%% richmap keys
|
||||||
|
Acc#{K => covert_keys_to_atom(V, Conv)}
|
||||||
|
end, #{}, BinKeyMap);
|
||||||
|
covert_keys_to_atom(ListV, Conv) when is_list(ListV) ->
|
||||||
|
[covert_keys_to_atom(V, Conv) || V <- ListV];
|
||||||
|
covert_keys_to_atom(Val, _) -> Val.
|
||||||
|
|
|
@ -17,13 +17,16 @@
|
||||||
%% And there are a top level config handler maintains the overall config map.
|
%% And there are a top level config handler maintains the overall config map.
|
||||||
-module(emqx_config_handler).
|
-module(emqx_config_handler).
|
||||||
|
|
||||||
|
-include("logger.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
%% API functions
|
%% API functions
|
||||||
-export([ start_link/0
|
-export([ start_link/0
|
||||||
, start_handler/3
|
, add_handler/2
|
||||||
, update_config/2
|
, update_config/2
|
||||||
, child_spec/3
|
, get_raw_config/0
|
||||||
|
, merge_to_old_config/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% emqx_config_handler callbacks
|
%% emqx_config_handler callbacks
|
||||||
|
@ -38,74 +41,74 @@
|
||||||
terminate/2,
|
terminate/2,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
|
||||||
-type config() :: term().
|
-define(MOD, {mod}).
|
||||||
-type config_map() :: #{atom() => config()} | [config_map()].
|
|
||||||
-type handler_name() :: module() | top.
|
-type update_request() :: term().
|
||||||
-type key_path() :: [atom()].
|
-type raw_config() :: hocon:config() | undefined.
|
||||||
|
-type config_key() :: atom().
|
||||||
|
-type handler_name() :: module().
|
||||||
|
-type config_key_path() :: [atom()].
|
||||||
|
-type handlers() :: #{config_key() => handlers(), ?MOD => handler_name()}.
|
||||||
|
|
||||||
-optional_callbacks([handle_update_config/2]).
|
-optional_callbacks([handle_update_config/2]).
|
||||||
|
|
||||||
-callback handle_update_config(config(), config_map()) -> config_map().
|
-callback handle_update_config(update_request(), raw_config()) -> update_request().
|
||||||
|
|
||||||
-record(state, {
|
-type state() :: #{
|
||||||
handler_name :: handler_name(),
|
handlers := handlers(),
|
||||||
parent :: handler_name(),
|
raw_config := raw_config(),
|
||||||
key_path :: key_path()
|
atom() => term()
|
||||||
}).
|
}.
|
||||||
|
|
||||||
start_link() ->
|
start_link() ->
|
||||||
start_handler(?MODULE, top, []).
|
gen_server:start_link({local, ?MODULE}, ?MODULE, {}, []).
|
||||||
|
|
||||||
-spec start_handler(handler_name(), handler_name(), key_path()) ->
|
-spec update_config(config_key_path(), update_request()) -> ok | {error, term()}.
|
||||||
{ok, pid()} | {error, {already_started, pid()}} | {error, term()}.
|
update_config(ConfKeyPath, UpdateReq) ->
|
||||||
start_handler(HandlerName, Parent, ConfKeyPath) ->
|
gen_server:call(?MODULE, {update_config, ConfKeyPath, UpdateReq}).
|
||||||
gen_server:start_link({local, HandlerName}, ?MODULE, {HandlerName, Parent, ConfKeyPath}, []).
|
|
||||||
|
|
||||||
-spec child_spec(module(), handler_name(), key_path()) -> supervisor:child_spec().
|
-spec add_handler(config_key_path(), handler_name()) -> ok.
|
||||||
child_spec(Mod, Parent, KeyPath) ->
|
add_handler(ConfKeyPath, HandlerName) ->
|
||||||
#{id => Mod,
|
gen_server:call(?MODULE, {add_child, ConfKeyPath, HandlerName}).
|
||||||
start => {?MODULE, start_handler, [Mod, Parent, KeyPath]},
|
|
||||||
restart => permanent,
|
|
||||||
type => worker,
|
|
||||||
modules => [?MODULE]}.
|
|
||||||
|
|
||||||
-spec update_config(handler_name(), config()) -> ok.
|
-spec get_raw_config() -> raw_config().
|
||||||
update_config(top, Config) ->
|
get_raw_config() ->
|
||||||
emqx_config:put(Config),
|
gen_server:call(?MODULE, get_raw_config).
|
||||||
save_config_to_disk(Config);
|
|
||||||
update_config(Handler, Config) ->
|
|
||||||
case is_process_alive(whereis(Handler)) of
|
|
||||||
true -> gen_server:cast(Handler, {handle_update_config, Config});
|
|
||||||
false -> error({not_alive, Handler})
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%============================================================================
|
%%============================================================================
|
||||||
%% callbacks of emqx_config_handler (the top-level handler)
|
|
||||||
handle_update_config(Config, undefined) ->
|
|
||||||
handle_update_config(Config, #{});
|
|
||||||
handle_update_config(Config, OldConfigMap) ->
|
|
||||||
%% simply merge the config to the overall config
|
|
||||||
maps:merge(OldConfigMap, Config).
|
|
||||||
|
|
||||||
init({HandlerName, Parent, ConfKeyPath}) ->
|
-spec init(term()) -> {ok, state()}.
|
||||||
{ok, #state{handler_name = HandlerName, parent = Parent, key_path = ConfKeyPath}}.
|
init(_) ->
|
||||||
|
{ok, RawConf} = hocon:load(emqx_conf_name(), #{format => richmap}),
|
||||||
|
{_MappedEnvs, Conf} = hocon_schema:map_translate(emqx_schema, RawConf, #{}),
|
||||||
|
ok = save_config_to_emqx_config(Conf),
|
||||||
|
{ok, #{raw_config => hocon_schema:richmap_to_map(RawConf),
|
||||||
|
handlers => #{?MOD => ?MODULE}}}.
|
||||||
|
|
||||||
|
handle_call(get_raw_config, _From, State = #{raw_config := RawConf}) ->
|
||||||
|
{reply, RawConf, State};
|
||||||
|
|
||||||
|
handle_call({add_child, ConfKeyPath, HandlerName}, _From,
|
||||||
|
State = #{handlers := Handlers}) ->
|
||||||
|
{reply, ok, State#{handlers =>
|
||||||
|
emqx_config:deep_put(ConfKeyPath, Handlers, #{?MOD => HandlerName})}};
|
||||||
|
|
||||||
|
handle_call({update_config, ConfKeyPath, UpdateReq}, _From,
|
||||||
|
#{raw_config := RawConf, handlers := Handlers} = State) ->
|
||||||
|
try {RootKeys, Conf} = do_update_config(ConfKeyPath, Handlers, RawConf, UpdateReq),
|
||||||
|
{reply, save_configs(RootKeys, Conf), State#{raw_config => Conf}}
|
||||||
|
catch
|
||||||
|
throw: Reason ->
|
||||||
|
{reply, {error, Reason}, State};
|
||||||
|
Error : Reason : ST ->
|
||||||
|
?LOG(error, "update config failed: ~p", [{Error, Reason, ST}]),
|
||||||
|
{reply, {error, Reason}, State}
|
||||||
|
end;
|
||||||
|
|
||||||
handle_call(_Request, _From, State) ->
|
handle_call(_Request, _From, State) ->
|
||||||
Reply = ok,
|
Reply = ok,
|
||||||
{reply, Reply, State}.
|
{reply, Reply, State}.
|
||||||
|
|
||||||
handle_cast({handle_update_config, Config}, #state{handler_name = HandlerName,
|
|
||||||
parent = Parent, key_path = ConfKeyPath} = State) ->
|
|
||||||
%% accumulate the config map of this config handler
|
|
||||||
OldConfigMap = emqx_config:get(ConfKeyPath, undefined),
|
|
||||||
SubConfigMap = case erlang:function_exported(HandlerName, handle_update_config, 2) of
|
|
||||||
true -> HandlerName:handle_update_config(Config, OldConfigMap);
|
|
||||||
false -> wrap_sub_config(ConfKeyPath, Config)
|
|
||||||
end,
|
|
||||||
%% then notify the parent handler
|
|
||||||
update_config(Parent, SubConfigMap),
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_cast(_Msg, State) ->
|
handle_cast(_Msg, State) ->
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
|
@ -118,17 +121,100 @@ terminate(_Reason, _State) ->
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%%============================================================================
|
do_update_config([], Handlers, OldConf, UpdateReq) ->
|
||||||
save_config_to_disk(ConfigMap) ->
|
call_handle_update_config(Handlers, OldConf, UpdateReq);
|
||||||
Filename = filename:join([emqx_data_dir(), "emqx_override.conf"]),
|
do_update_config([ConfKey | ConfKeyPath], Handlers, OldConf, UpdateReq) ->
|
||||||
case file:write_file(Filename, jsx:encode(ConfigMap)) of
|
SubOldConf = get_sub_config(ConfKey, OldConf),
|
||||||
ok -> ok;
|
case maps:find(ConfKey, Handlers) of
|
||||||
{error, Reason} -> logger:error("write to ~s failed, ~p", [Filename, Reason])
|
error -> throw({handler_not_found, ConfKey});
|
||||||
|
{ok, SubHandlers} ->
|
||||||
|
NewUpdateReq = do_update_config(ConfKeyPath, SubHandlers, SubOldConf, UpdateReq),
|
||||||
|
call_handle_update_config(Handlers, OldConf, #{bin(ConfKey) => NewUpdateReq})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
emqx_data_dir() ->
|
get_sub_config(_, undefined) ->
|
||||||
%emqx_config:get([node, data_dir])
|
undefined;
|
||||||
"data".
|
get_sub_config(ConfKey, OldConf) when is_map(OldConf) ->
|
||||||
|
maps:get(bin(ConfKey), OldConf, undefined);
|
||||||
|
get_sub_config(_, OldConf) ->
|
||||||
|
OldConf.
|
||||||
|
|
||||||
wrap_sub_config(ConfKeyPath, Config) ->
|
call_handle_update_config(Handlers, OldConf, UpdateReq) ->
|
||||||
emqx_config:deep_put(ConfKeyPath, #{}, Config).
|
HandlerName = maps:get(?MOD, Handlers, undefined),
|
||||||
|
case erlang:function_exported(HandlerName, handle_update_config, 2) of
|
||||||
|
true -> HandlerName:handle_update_config(UpdateReq, OldConf);
|
||||||
|
false -> UpdateReq %% the default behaviour is overwriting the old config
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% callbacks for the top-level handler
|
||||||
|
handle_update_config(UpdateReq, OldConf) ->
|
||||||
|
FullRawConf = merge_to_old_config(UpdateReq, OldConf),
|
||||||
|
{maps:keys(UpdateReq), FullRawConf}.
|
||||||
|
|
||||||
|
%% default callback of config handlers
|
||||||
|
merge_to_old_config(UpdateReq, undefined) ->
|
||||||
|
merge_to_old_config(UpdateReq, #{});
|
||||||
|
merge_to_old_config(UpdateReq, RawConf) ->
|
||||||
|
maps:merge(RawConf, UpdateReq).
|
||||||
|
|
||||||
|
%%============================================================================
|
||||||
|
save_configs(RootKeys, Conf0) ->
|
||||||
|
{_MappedEnvs, Conf1} = hocon_schema:map_translate(emqx_schema, to_richmap(Conf0), #{}),
|
||||||
|
%save_config_to_app_env(MappedEnvs),
|
||||||
|
save_config_to_emqx_config(hocon_schema:richmap_to_map(Conf1)),
|
||||||
|
save_config_to_disk(RootKeys, Conf0).
|
||||||
|
|
||||||
|
%% We may need also support hot config update for the apps that use application envs.
|
||||||
|
%% If so uncomment the following lines to update the configs to application env
|
||||||
|
% save_config_to_app_env(MappedEnvs) ->
|
||||||
|
% lists:foreach(fun({AppName, Envs}) ->
|
||||||
|
% [application:set_env(AppName, Par, Val) || {Par, Val} <- Envs]
|
||||||
|
% end, MappedEnvs).
|
||||||
|
|
||||||
|
save_config_to_emqx_config(Conf) ->
|
||||||
|
emqx_config:put(emqx_config:unsafe_atom_key_map(Conf)).
|
||||||
|
|
||||||
|
save_config_to_disk(RootKeys, Conf) ->
|
||||||
|
FileName = emqx_override_conf_name(),
|
||||||
|
OldConf = read_old_config(FileName),
|
||||||
|
%% We don't save the overall config to file, but only the sub configs
|
||||||
|
%% under RootKeys
|
||||||
|
write_new_config(FileName,
|
||||||
|
maps:merge(OldConf, maps:with(RootKeys, Conf))).
|
||||||
|
|
||||||
|
write_new_config(FileName, Conf) ->
|
||||||
|
case file:write_file(FileName, jsx:prettify(jsx:encode(Conf))) of
|
||||||
|
ok -> ok;
|
||||||
|
{error, Reason} ->
|
||||||
|
logger:error("write to ~s failed, ~p", [FileName, Reason]),
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
read_old_config(FileName) ->
|
||||||
|
case file:read_file(FileName) of
|
||||||
|
{ok, Text} ->
|
||||||
|
try jsx:decode(Text, [{return_maps, true}]) of
|
||||||
|
Conf when is_map(Conf) -> Conf;
|
||||||
|
_ -> #{}
|
||||||
|
catch _Err : _Reason ->
|
||||||
|
#{}
|
||||||
|
end;
|
||||||
|
_ -> #{}
|
||||||
|
end.
|
||||||
|
|
||||||
|
emqx_conf_name() ->
|
||||||
|
filename:join([etc_dir(), "emqx.conf"]).
|
||||||
|
|
||||||
|
emqx_override_conf_name() ->
|
||||||
|
filename:join([emqx:get_env(data_dir), "emqx_override.conf"]).
|
||||||
|
|
||||||
|
etc_dir() ->
|
||||||
|
emqx:get_env(etc_dir).
|
||||||
|
|
||||||
|
to_richmap(Map) ->
|
||||||
|
{ok, RichMap} = hocon:binary(jsx:encode(Map), #{format => richmap}),
|
||||||
|
RichMap.
|
||||||
|
|
||||||
|
bin(A) when is_atom(A) -> list_to_binary(atom_to_list(A));
|
||||||
|
bin(B) when is_binary(B) -> B;
|
||||||
|
bin(S) when is_list(S) -> list_to_binary(S).
|
||||||
|
|
|
@ -30,8 +30,6 @@
|
||||||
, reload/1
|
, reload/1
|
||||||
, list/0
|
, list/0
|
||||||
, find_plugin/1
|
, find_plugin/1
|
||||||
, generate_configs/1
|
|
||||||
, apply_configs/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([funlog/2]).
|
-export([funlog/2]).
|
||||||
|
@ -173,15 +171,7 @@ load_ext_plugin(PluginDir) ->
|
||||||
?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]),
|
?LOG(alert, "plugin_app_file_not_found: ~s", [AppFile]),
|
||||||
error({plugin_app_file_not_found, AppFile})
|
error({plugin_app_file_not_found, AppFile})
|
||||||
end,
|
end,
|
||||||
ok = load_plugin_app(AppName, Ebin),
|
load_plugin_app(AppName, Ebin).
|
||||||
try
|
|
||||||
ok = generate_configs(AppName, PluginDir)
|
|
||||||
catch
|
|
||||||
throw : {conf_file_not_found, ConfFile} ->
|
|
||||||
%% this is maybe a dependency of an external plugin
|
|
||||||
?LOG(debug, "config_load_error_ignored for app=~p, path=~s", [AppName, ConfFile]),
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
load_plugin_app(AppName, Ebin) ->
|
load_plugin_app(AppName, Ebin) ->
|
||||||
_ = code:add_patha(Ebin),
|
_ = code:add_patha(Ebin),
|
||||||
|
@ -246,7 +236,6 @@ plugin(AppName, Type) ->
|
||||||
|
|
||||||
load_plugin(Name, Persistent) ->
|
load_plugin(Name, Persistent) ->
|
||||||
try
|
try
|
||||||
ok = ?MODULE:generate_configs(Name),
|
|
||||||
case load_app(Name) of
|
case load_app(Name) of
|
||||||
ok ->
|
ok ->
|
||||||
start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
|
start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
|
||||||
|
@ -362,77 +351,5 @@ plugin_type(backend) -> backend;
|
||||||
plugin_type(bridge) -> bridge;
|
plugin_type(bridge) -> bridge;
|
||||||
plugin_type(_) -> feature.
|
plugin_type(_) -> feature.
|
||||||
|
|
||||||
|
|
||||||
funlog(Key, Value) ->
|
funlog(Key, Value) ->
|
||||||
?LOG(info, "~s = ~p", [string:join(Key, "."), Value]).
|
?LOG(info, "~s = ~p", [string:join(Key, "."), Value]).
|
||||||
|
|
||||||
generate_configs(App) ->
|
|
||||||
PluginConfDir = emqx:get_env(plugins_etc_dir),
|
|
||||||
PluginSchemaDir = code:priv_dir(App),
|
|
||||||
generate_configs(App, PluginConfDir, PluginSchemaDir).
|
|
||||||
|
|
||||||
generate_configs(App, PluginDir) ->
|
|
||||||
PluginConfDir = filename:join([PluginDir, "etc"]),
|
|
||||||
PluginSchemaDir = filename:join([PluginDir, "priv"]),
|
|
||||||
generate_configs(App, PluginConfDir, PluginSchemaDir).
|
|
||||||
|
|
||||||
generate_configs(App, PluginConfDir, PluginSchemaDir) ->
|
|
||||||
ConfigFile = filename:join([PluginConfDir, App]) ++ ".config",
|
|
||||||
case filelib:is_file(ConfigFile) of
|
|
||||||
true ->
|
|
||||||
{ok, [Configs]} = file:consult(ConfigFile),
|
|
||||||
apply_configs(Configs);
|
|
||||||
false ->
|
|
||||||
SchemaFile = filename:join([PluginSchemaDir, App]) ++ ".schema",
|
|
||||||
case filelib:is_file(SchemaFile) of
|
|
||||||
true ->
|
|
||||||
AppsEnv = do_generate_configs(App),
|
|
||||||
apply_configs(AppsEnv);
|
|
||||||
false ->
|
|
||||||
SchemaMod = lists:concat([App, "_schema"]),
|
|
||||||
ConfName = filename:join([PluginConfDir, App]) ++ ".conf",
|
|
||||||
SchemaFile1 = filename:join([code:lib_dir(App), "ebin", SchemaMod]) ++ ".beam",
|
|
||||||
do_generate_hocon_configs(App, ConfName, SchemaFile1)
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_generate_configs(App) ->
|
|
||||||
Name1 = filename:join([emqx:get_env(plugins_etc_dir), App]) ++ ".conf",
|
|
||||||
Name2 = filename:join([code:lib_dir(App), "etc", App]) ++ ".conf",
|
|
||||||
ConfFile = case {filelib:is_file(Name1), filelib:is_file(Name2)} of
|
|
||||||
{true, _} -> Name1;
|
|
||||||
{false, true} -> Name2;
|
|
||||||
{false, false} -> error({config_not_found, [Name1, Name2]})
|
|
||||||
end,
|
|
||||||
SchemaFile = filename:join([code:priv_dir(App), App]) ++ ".schema",
|
|
||||||
case filelib:is_file(SchemaFile) of
|
|
||||||
true ->
|
|
||||||
Schema = cuttlefish_schema:files([SchemaFile]),
|
|
||||||
Conf = cuttlefish_conf:file(ConfFile),
|
|
||||||
cuttlefish_generator:map(Schema, Conf, undefined, fun ?MODULE:funlog/2);
|
|
||||||
false ->
|
|
||||||
error({schema_not_found, SchemaFile})
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_generate_hocon_configs(App, ConfName, SchemaFile) ->
|
|
||||||
SchemaMod = lists:concat([App, "_schema"]),
|
|
||||||
case {filelib:is_file(ConfName), filelib:is_file(SchemaFile)} of
|
|
||||||
{true, true} ->
|
|
||||||
{ok, RawConfig} = hocon:load(ConfName, #{format => richmap}),
|
|
||||||
Config = hocon_schema:check(list_to_atom(SchemaMod), RawConfig, #{atom_key => true,
|
|
||||||
return_plain => true}),
|
|
||||||
emqx_config_handler:update_config(emqx_config_handler, Config);
|
|
||||||
{true, false} ->
|
|
||||||
error({schema_not_found, [SchemaFile]});
|
|
||||||
{false, true} ->
|
|
||||||
error({config_not_found, [ConfName]});
|
|
||||||
{false, false} ->
|
|
||||||
error({conf_and_schema_not_found, [ConfName, SchemaFile]})
|
|
||||||
end.
|
|
||||||
|
|
||||||
apply_configs([]) ->
|
|
||||||
ok;
|
|
||||||
apply_configs([{App, Config} | More]) ->
|
|
||||||
lists:foreach(fun({Key, _}) -> application:unset_env(App, Key) end, application:get_all_env(App)),
|
|
||||||
lists:foreach(fun({Key, Val}) -> application:set_env(App, Key, Val) end, Config),
|
|
||||||
apply_configs(More).
|
|
||||||
|
|
|
@ -50,9 +50,22 @@
|
||||||
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
|
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
|
||||||
-export([ssl/2, tr_ssl/2, tr_password_hash/2]).
|
-export([ssl/2, tr_ssl/2, tr_password_hash/2]).
|
||||||
|
|
||||||
|
%% will be used by emqx_ct_helper to find the dependent apps
|
||||||
|
-export([includes/0]).
|
||||||
|
|
||||||
structs() -> ["cluster", "node", "rpc", "log", "lager",
|
structs() -> ["cluster", "node", "rpc", "log", "lager",
|
||||||
"acl", "mqtt", "zone", "listener", "module", "broker",
|
"acl", "mqtt", "zone", "listener", "module", "broker",
|
||||||
"plugins", "sysmon", "os_mon", "vm_mon", "alarm", "telemetry"].
|
"plugins", "sysmon", "os_mon", "vm_mon", "alarm", "telemetry"]
|
||||||
|
++ includes().
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
includes() ->[].
|
||||||
|
-else.
|
||||||
|
includes() ->
|
||||||
|
[ "emqx_data_bridge"
|
||||||
|
, "emqx_telemetry"
|
||||||
|
].
|
||||||
|
-endif.
|
||||||
|
|
||||||
fields("cluster") ->
|
fields("cluster") ->
|
||||||
[ {"name", t(atom(), "ekka.cluster_name", emqxcl)}
|
[ {"name", t(atom(), "ekka.cluster_name", emqxcl)}
|
||||||
|
@ -119,6 +132,7 @@ fields("node") ->
|
||||||
override_env => "EMQX_NODE_COOKIE"
|
override_env => "EMQX_NODE_COOKIE"
|
||||||
})}
|
})}
|
||||||
, {"data_dir", t(string(), "emqx.data_dir", undefined)}
|
, {"data_dir", t(string(), "emqx.data_dir", undefined)}
|
||||||
|
, {"etc_dir", t(string(), "emqx.etc_dir", undefined)}
|
||||||
, {"heartbeat", t(flag(), undefined, false)}
|
, {"heartbeat", t(flag(), undefined, false)}
|
||||||
, {"async_threads", t(range(1, 1024), "vm_args.+A", undefined)}
|
, {"async_threads", t(range(1, 1024), "vm_args.+A", undefined)}
|
||||||
, {"process_limit", t(integer(), "vm_args.+P", undefined)}
|
, {"process_limit", t(integer(), "vm_args.+P", undefined)}
|
||||||
|
@ -163,7 +177,7 @@ fields("log") ->
|
||||||
, {"chars_limit", t(integer(), undefined, -1)}
|
, {"chars_limit", t(integer(), undefined, -1)}
|
||||||
, {"supervisor_reports", t(union([error, progress]), undefined, error)}
|
, {"supervisor_reports", t(union([error, progress]), undefined, error)}
|
||||||
, {"max_depth", t(union([infinity, integer()]),
|
, {"max_depth", t(union([infinity, integer()]),
|
||||||
"kernel.error_logger_format_depth", 20)}
|
"kernel.error_logger_format_depth", 80)}
|
||||||
, {"formatter", t(union([text, json]), undefined, text)}
|
, {"formatter", t(union([text, json]), undefined, text)}
|
||||||
, {"single_line", t(boolean(), undefined, true)}
|
, {"single_line", t(boolean(), undefined, true)}
|
||||||
, {"rotation", ref("rotation")}
|
, {"rotation", ref("rotation")}
|
||||||
|
@ -467,8 +481,11 @@ fields("telemetry") ->
|
||||||
[ {"enabled", t(boolean(), undefined, false)}
|
[ {"enabled", t(boolean(), undefined, false)}
|
||||||
, {"url", t(string(), undefined, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry")}
|
, {"url", t(string(), undefined, "https://telemetry-emqx-io.bigpar.vercel.app/api/telemetry")}
|
||||||
, {"report_interval", t(duration_s(), undefined, "7d")}
|
, {"report_interval", t(duration_s(), undefined, "7d")}
|
||||||
].
|
];
|
||||||
|
|
||||||
|
fields(ExtraField) ->
|
||||||
|
Mod = list_to_atom(ExtraField),
|
||||||
|
Mod:fields(ExtraField).
|
||||||
|
|
||||||
translations() -> ["ekka", "vm_args", "gen_rpc", "kernel", "emqx"].
|
translations() -> ["ekka", "vm_args", "gen_rpc", "kernel", "emqx"].
|
||||||
|
|
||||||
|
|
|
@ -22,14 +22,44 @@
|
||||||
, resource_id_to_name/1
|
, resource_id_to_name/1
|
||||||
, list_bridges/0
|
, list_bridges/0
|
||||||
, is_bridge/1
|
, is_bridge/1
|
||||||
|
, config_key_path/0
|
||||||
|
, update_config/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([structs/0, fields/1]).
|
||||||
|
|
||||||
|
%%======================================================================================
|
||||||
|
%% Hocon Schema Definitions
|
||||||
|
|
||||||
|
-define(BRIDGE_FIELDS(T),
|
||||||
|
[{name, hoconsc:t(typerefl:binary())},
|
||||||
|
{type, hoconsc:t(typerefl:atom(T))},
|
||||||
|
{config, hoconsc:t(hoconsc:ref(list_to_atom("emqx_connector_"++atom_to_list(T)), ""))}]).
|
||||||
|
|
||||||
|
-define(TYPES, [mysql, pgsql, mongo, redis, ldap]).
|
||||||
|
-define(BRIDGES, [hoconsc:ref(?MODULE, T) || T <- ?TYPES]).
|
||||||
|
|
||||||
|
structs() -> ["emqx_data_bridge"].
|
||||||
|
|
||||||
|
fields("emqx_data_bridge") ->
|
||||||
|
[{bridges, #{type => hoconsc:array(hoconsc:union(?BRIDGES)),
|
||||||
|
default => []}}];
|
||||||
|
|
||||||
|
fields(mysql) -> ?BRIDGE_FIELDS(mysql);
|
||||||
|
fields(pgsql) -> ?BRIDGE_FIELDS(pgsql);
|
||||||
|
fields(mongo) -> ?BRIDGE_FIELDS(mongo);
|
||||||
|
fields(redis) -> ?BRIDGE_FIELDS(redis);
|
||||||
|
fields(ldap) -> ?BRIDGE_FIELDS(ldap).
|
||||||
|
|
||||||
|
%%======================================================================================
|
||||||
|
|
||||||
load_bridges() ->
|
load_bridges() ->
|
||||||
ConfFile = filename:join([emqx:get_env(plugins_etc_dir), ?MODULE]) ++ ".conf",
|
% ConfFile = filename:join([emqx:get_env(plugins_etc_dir), ?MODULE]) ++ ".conf",
|
||||||
{ok, RawConfig} = hocon:load(ConfFile, #{format => richmap}),
|
% {ok, RawConfig} = hocon:load(ConfFile, #{format => richmap}),
|
||||||
#{emqx_data_bridge := #{bridges := Bridges}} =
|
% #{emqx_data_bridge := #{bridges := Bridges}} =
|
||||||
hocon_schema:check(emqx_data_bridge_schema, RawConfig,
|
% hocon_schema:check(emqx_data_bridge_schema, RawConfig,
|
||||||
#{atom_key => true, return_plain => true}),
|
% #{atom_key => true, return_plain => true}),
|
||||||
|
Bridges = emqx_config:get([emqx_data_bridge, bridges], []),
|
||||||
emqx_data_bridge_monitor:ensure_all_started(Bridges).
|
emqx_data_bridge_monitor:ensure_all_started(Bridges).
|
||||||
|
|
||||||
resource_type(mysql) -> emqx_connector_mysql;
|
resource_type(mysql) -> emqx_connector_mysql;
|
||||||
|
@ -57,3 +87,9 @@ is_bridge(#{id := <<"bridge:", _/binary>>}) ->
|
||||||
true;
|
true;
|
||||||
is_bridge(_Data) ->
|
is_bridge(_Data) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
|
config_key_path() ->
|
||||||
|
[emqx_data_bridge, bridges].
|
||||||
|
|
||||||
|
update_config(ConfigReq) ->
|
||||||
|
emqx_config_handler:update_config(config_key_path(), ConfigReq).
|
||||||
|
|
|
@ -57,6 +57,8 @@
|
||||||
, delete_bridge/2
|
, delete_bridge/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-define(BRIDGE(N, T, C), #{<<"name">> => N, <<"type">> => T, <<"config">> => C}).
|
||||||
|
|
||||||
list_bridges(_Binding, _Params) ->
|
list_bridges(_Binding, _Params) ->
|
||||||
{200, #{code => 0, data => [format_api_reply(Data) ||
|
{200, #{code => 0, data => [format_api_reply(Data) ||
|
||||||
Data <- emqx_data_bridge:list_bridges()]}}.
|
Data <- emqx_data_bridge:list_bridges()]}}.
|
||||||
|
@ -74,10 +76,9 @@ create_bridge(#{name := Name}, Params) ->
|
||||||
BridgeType = proplists:get_value(<<"type">>, Params),
|
BridgeType = proplists:get_value(<<"type">>, Params),
|
||||||
case emqx_resource:check_and_create(
|
case emqx_resource:check_and_create(
|
||||||
emqx_data_bridge:name_to_resource_id(Name),
|
emqx_data_bridge:name_to_resource_id(Name),
|
||||||
emqx_data_bridge:resource_type(BridgeType), Config) of
|
emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config)) of
|
||||||
{ok, Data} ->
|
{ok, Data} ->
|
||||||
update_config(),
|
update_config_and_reply(Name, BridgeType, Config, Data);
|
||||||
{200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}};
|
|
||||||
{error, already_created} ->
|
{error, already_created} ->
|
||||||
{400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
|
{400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
|
||||||
{error, Reason0} ->
|
{error, Reason0} ->
|
||||||
|
@ -91,10 +92,9 @@ update_bridge(#{name := Name}, Params) ->
|
||||||
BridgeType = proplists:get_value(<<"type">>, Params),
|
BridgeType = proplists:get_value(<<"type">>, Params),
|
||||||
case emqx_resource:check_and_update(
|
case emqx_resource:check_and_update(
|
||||||
emqx_data_bridge:name_to_resource_id(Name),
|
emqx_data_bridge:name_to_resource_id(Name),
|
||||||
emqx_data_bridge:resource_type(BridgeType), Config, []) of
|
emqx_data_bridge:resource_type(atom(BridgeType)), maps:from_list(Config), []) of
|
||||||
{ok, Data} ->
|
{ok, Data} ->
|
||||||
update_config(),
|
update_config_and_reply(Name, BridgeType, Config, Data);
|
||||||
{200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}};
|
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
{400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}};
|
{400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}};
|
||||||
{error, Reason0} ->
|
{error, Reason0} ->
|
||||||
|
@ -105,9 +105,7 @@ update_bridge(#{name := Name}, Params) ->
|
||||||
|
|
||||||
delete_bridge(#{name := Name}, _Params) ->
|
delete_bridge(#{name := Name}, _Params) ->
|
||||||
case emqx_resource:remove(emqx_data_bridge:name_to_resource_id(Name)) of
|
case emqx_resource:remove(emqx_data_bridge:name_to_resource_id(Name)) of
|
||||||
ok ->
|
ok -> delete_config_and_reply(Name);
|
||||||
update_config(),
|
|
||||||
{200, #{code => 0, data => #{}}};
|
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
|
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
|
||||||
end.
|
end.
|
||||||
|
@ -117,12 +115,28 @@ format_api_reply(#{resource_type := Type, id := Id, config := Conf, status := St
|
||||||
name => emqx_data_bridge:resource_id_to_name(Id),
|
name => emqx_data_bridge:resource_id_to_name(Id),
|
||||||
config => Conf, status => Status}.
|
config => Conf, status => Status}.
|
||||||
|
|
||||||
format_conf(#{resource_type := Type, id := Id, config := Conf}) ->
|
% format_conf(#{resource_type := Type, id := Id, config := Conf}) ->
|
||||||
#{type => Type, name => emqx_data_bridge:resource_id_to_name(Id),
|
% #{type => Type, name => emqx_data_bridge:resource_id_to_name(Id),
|
||||||
config => Conf}.
|
% config => Conf}.
|
||||||
|
|
||||||
get_all_configs() ->
|
% get_all_configs() ->
|
||||||
[format_conf(Data) || Data <- emqx_data_bridge:list_bridges()].
|
% [format_conf(Data) || Data <- emqx_data_bridge:list_bridges()].
|
||||||
|
|
||||||
update_config() ->
|
update_config_and_reply(Name, BridgeType, Config, Data) ->
|
||||||
emqx_config_handler:update_config(emqx_data_bridge_config_handler, get_all_configs()).
|
case emqx_data_bridge:update_config({update, ?BRIDGE(Name, BridgeType, Config)}) of
|
||||||
|
ok ->
|
||||||
|
{200, #{code => 0, data => format_api_reply(
|
||||||
|
emqx_resource_api:format_data(Data))}};
|
||||||
|
{error, Reason} ->
|
||||||
|
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
delete_config_and_reply(Name) ->
|
||||||
|
case emqx_data_bridge:update_config({delete, Name}) of
|
||||||
|
ok -> {200, #{code => 0, data => #{}}};
|
||||||
|
{error, Reason} ->
|
||||||
|
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
atom(B) when is_binary(B) ->
|
||||||
|
list_to_existing_atom(binary_to_list(B)).
|
||||||
|
|
|
@ -17,14 +17,26 @@
|
||||||
|
|
||||||
-behaviour(application).
|
-behaviour(application).
|
||||||
|
|
||||||
-export([start/2, stop/1]).
|
-behaviour(emqx_config_handler).
|
||||||
|
|
||||||
|
-export([start/2, stop/1, handle_update_config/2]).
|
||||||
|
|
||||||
start(_StartType, _StartArgs) ->
|
start(_StartType, _StartArgs) ->
|
||||||
{ok, Sup} = emqx_data_bridge_sup:start_link(),
|
{ok, Sup} = emqx_data_bridge_sup:start_link(),
|
||||||
ok = emqx_data_bridge:load_bridges(),
|
ok = emqx_data_bridge:load_bridges(),
|
||||||
|
emqx_config_handler:add_handler(emqx_data_bridge:config_key_path(), ?MODULE),
|
||||||
{ok, Sup}.
|
{ok, Sup}.
|
||||||
|
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% internal functions
|
%% internal functions
|
||||||
|
handle_update_config({update, Bridge = #{<<"name">> := Name}}, OldConf) ->
|
||||||
|
[Bridge | remove_bridge(Name, OldConf)];
|
||||||
|
handle_update_config({delete, Name}, OldConf) ->
|
||||||
|
remove_bridge(Name, OldConf).
|
||||||
|
|
||||||
|
remove_bridge(_Name, undefined) ->
|
||||||
|
[];
|
||||||
|
remove_bridge(Name, OldConf) ->
|
||||||
|
[B || B = #{<<"name">> := Name0} <- OldConf, Name0 =/= Name].
|
||||||
|
|
|
@ -1,23 +0,0 @@
|
||||||
-module(emqx_data_bridge_schema).
|
|
||||||
|
|
||||||
-export([structs/0, fields/1]).
|
|
||||||
|
|
||||||
-define(BRIDGE_FIELDS(T),
|
|
||||||
[{name, hoconsc:t(typerefl:binary())},
|
|
||||||
{type, hoconsc:t(typerefl:atom(T))},
|
|
||||||
{config, hoconsc:t(hoconsc:ref(list_to_atom("emqx_connector_"++atom_to_list(T)), ""))}]).
|
|
||||||
|
|
||||||
-define(TYPES, [mysql, pgsql, mongo, redis, ldap]).
|
|
||||||
-define(BRIDGES, [hoconsc:ref(T) || T <- ?TYPES]).
|
|
||||||
|
|
||||||
structs() -> [emqx_data_bridge].
|
|
||||||
|
|
||||||
fields(emqx_data_bridge) ->
|
|
||||||
[{bridges, #{type => hoconsc:array(hoconsc:union(?BRIDGES)),
|
|
||||||
default => []}}];
|
|
||||||
|
|
||||||
fields(mysql) -> ?BRIDGE_FIELDS(mysql);
|
|
||||||
fields(pgsql) -> ?BRIDGE_FIELDS(pgsql);
|
|
||||||
fields(mongo) -> ?BRIDGE_FIELDS(mongo);
|
|
||||||
fields(redis) -> ?BRIDGE_FIELDS(redis);
|
|
||||||
fields(ldap) -> ?BRIDGE_FIELDS(ldap).
|
|
|
@ -35,13 +35,8 @@ init([]) ->
|
||||||
start => {emqx_data_bridge_monitor, start_link, []},
|
start => {emqx_data_bridge_monitor, start_link, []},
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
type => worker,
|
type => worker,
|
||||||
modules => [emqx_data_bridge_monitor]},
|
modules => [emqx_data_bridge_monitor]}
|
||||||
emqx_config_handler:child_spec(emqx_data_bridge_config_handler, emqx_config_handler,
|
|
||||||
config_key_path())
|
|
||||||
],
|
],
|
||||||
{ok, {SupFlags, ChildSpecs}}.
|
{ok, {SupFlags, ChildSpecs}}.
|
||||||
|
|
||||||
%% internal functions
|
%% internal functions
|
||||||
|
|
||||||
config_key_path() ->
|
|
||||||
[emqx_data_bridge, bridges].
|
|
||||||
|
|
|
@ -84,7 +84,7 @@ listener_name(Proto) ->
|
||||||
list_to_atom(atom_to_list(Proto) ++ ":management").
|
list_to_atom(atom_to_list(Proto) ++ ":management").
|
||||||
|
|
||||||
http_handlers() ->
|
http_handlers() ->
|
||||||
Apps = [ App || {App, _, _} <- application:which_applications(),
|
Apps = [ App || {App, _, _} <- application:loaded_applications(),
|
||||||
case re:run(atom_to_list(App), "^emqx") of
|
case re:run(atom_to_list(App), "^emqx") of
|
||||||
{match,[{0,4}]} -> true;
|
{match,[{0,4}]} -> true;
|
||||||
_ -> false
|
_ -> false
|
||||||
|
|
|
@ -77,7 +77,7 @@ emqx_resource:query(ResourceID, {sql, SQL}).
|
||||||
<<"keyfile">> => [],<<"password">> => "public",
|
<<"keyfile">> => [],<<"password">> => "public",
|
||||||
<<"pool_size">> => 1,
|
<<"pool_size">> => 1,
|
||||||
<<"server">> => {{127,0,0,1},3306},
|
<<"server">> => {{127,0,0,1},3306},
|
||||||
<<"ssl">> => false,<<"user">> => "root",
|
<<"ssl">> => false,<<"username">> => "root",
|
||||||
<<"verify">> => false},
|
<<"verify">> => false},
|
||||||
id => <<"bridge:mysql-def">>,mod => emqx_connector_mysql,
|
id => <<"bridge:mysql-def">>,mod => emqx_connector_mysql,
|
||||||
state => #{poolname => 'bridge:mysql-def'},
|
state => #{poolname => 'bridge:mysql-def'},
|
||||||
|
@ -107,7 +107,7 @@ BridgeMySQL='{
|
||||||
"status": "started",
|
"status": "started",
|
||||||
"config": {
|
"config": {
|
||||||
"verify": false,
|
"verify": false,
|
||||||
"user": "root",
|
"username": "root",
|
||||||
"ssl": false,
|
"ssl": false,
|
||||||
"server": "127.0.0.1:3306",
|
"server": "127.0.0.1:3306",
|
||||||
"pool_size": 1,
|
"pool_size": 1,
|
||||||
|
@ -135,7 +135,7 @@ BridgeMySQL='{
|
||||||
"status": "started",
|
"status": "started",
|
||||||
"config": {
|
"config": {
|
||||||
"verify": false,
|
"verify": false,
|
||||||
"user": "root",
|
"username": "root",
|
||||||
"ssl": false,
|
"ssl": false,
|
||||||
"server": "127.0.0.1:3306",
|
"server": "127.0.0.1:3306",
|
||||||
"pool_size": 2,
|
"pool_size": 2,
|
||||||
|
|
|
@ -16,6 +16,10 @@
|
||||||
|
|
||||||
-module(emqx_telemetry).
|
-module(emqx_telemetry).
|
||||||
|
|
||||||
|
-include_lib("typerefl/include/types.hrl").
|
||||||
|
|
||||||
|
-behaviour(hocon_schema).
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
@ -36,6 +40,9 @@
|
||||||
, stop/0
|
, stop/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([ structs/0
|
||||||
|
, fields/1]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([ init/1
|
-export([ init/1
|
||||||
, handle_call/3
|
, handle_call/3
|
||||||
|
@ -90,6 +97,12 @@
|
||||||
|
|
||||||
-define(TELEMETRY, emqx_telemetry).
|
-define(TELEMETRY, emqx_telemetry).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
structs() -> ["emqx_telemetry"].
|
||||||
|
|
||||||
|
fields("emqx_telemetry") ->
|
||||||
|
[{enabled, emqx_schema:t(boolean(), undefined, false)}].
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -25,13 +25,6 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
start(_Type, _Args) ->
|
start(_Type, _Args) ->
|
||||||
%% TODO
|
|
||||||
%% After the relevant code for building hocon configuration will be deleted
|
|
||||||
%% Get the configuration using emqx_config:get
|
|
||||||
ConfFile = filename:join([emqx:get_env(plugins_etc_dir), ?APP]) ++ ".conf",
|
|
||||||
{ok, RawConfig} = hocon:load(ConfFile),
|
|
||||||
Config = hocon_schema:check_plain(emqx_telemetry_schema, RawConfig, #{atom_key => true}),
|
|
||||||
emqx_config_handler:update_config(emqx_config_handler, Config),
|
|
||||||
Enabled = emqx_config:get([?APP, enabled], true),
|
Enabled = emqx_config:get([?APP, enabled], true),
|
||||||
emqx_telemetry_sup:start_link([{enabled, Enabled}]).
|
emqx_telemetry_sup:start_link([{enabled, Enabled}]).
|
||||||
|
|
||||||
|
|
|
@ -1,13 +0,0 @@
|
||||||
-module(emqx_telemetry_schema).
|
|
||||||
|
|
||||||
-include_lib("typerefl/include/types.hrl").
|
|
||||||
|
|
||||||
-behaviour(hocon_schema).
|
|
||||||
|
|
||||||
-export([ structs/0
|
|
||||||
, fields/1]).
|
|
||||||
|
|
||||||
structs() -> ["emqx_telemetry"].
|
|
||||||
|
|
||||||
fields("emqx_telemetry") ->
|
|
||||||
[{enabled, emqx_schema:t(boolean(), undefined, false)}].
|
|
|
@ -53,7 +53,7 @@
|
||||||
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
|
, {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
|
||||||
, {getopt, "1.0.1"}
|
, {getopt, "1.0.1"}
|
||||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
|
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
|
||||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.7.0"}}}
|
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.0"}}}
|
||||||
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.2.1"}}}
|
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.2.1"}}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
|
|
@ -104,7 +104,8 @@ test_plugins() ->
|
||||||
|
|
||||||
test_deps() ->
|
test_deps() ->
|
||||||
[ {bbmustache, "1.10.0"}
|
[ {bbmustache, "1.10.0"}
|
||||||
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "2.0.0"}}}
|
%, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "2.0.0"}}}
|
||||||
|
, {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {branch, "hocon"}}}
|
||||||
, meck
|
, meck
|
||||||
].
|
].
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue