feat(config): update config to the sub config handler

This commit is contained in:
Shawn 2021-06-15 18:47:43 +08:00 committed by turtleDeng
parent ccba714cd6
commit 378a2b7b9e
5 changed files with 66 additions and 37 deletions

View File

@ -20,7 +20,9 @@
-export([ get/0 -export([ get/0
, get/2 , get/2
, put/1 , put/1
, put/2
, deep_get/3 , deep_get/3
, deep_put/3
]). ]).
-spec get() -> term(). -spec get() -> term().
@ -31,6 +33,7 @@ get() ->
get(KeyPath, Default) -> get(KeyPath, Default) ->
deep_get(KeyPath, get(), Default). deep_get(KeyPath, get(), Default).
-spec deep_get([atom()], map(), term()) -> term().
deep_get([], Map, _Default) -> deep_get([], Map, _Default) ->
Map; Map;
deep_get([Key | KeyPath], Map, Default) when is_map(Map) -> deep_get([Key | KeyPath], Map, Default) when is_map(Map) ->
@ -41,5 +44,17 @@ deep_get([Key | KeyPath], Map, Default) when is_map(Map) ->
deep_get([_Key | _KeyPath], _Map, Default) -> deep_get([_Key | _KeyPath], _Map, Default) ->
Default. Default.
-spec put(term()) -> ok.
put(Config) -> put(Config) ->
persistent_term:put(?MODULE, Config). persistent_term:put(?MODULE, Config).
-spec put([atom()], term()) -> ok.
put(KeyPath, Config) ->
put(deep_put(KeyPath, get(), Config)).
-spec deep_put([atom()], map(), term()) -> ok.
deep_put([], Map, Config) when is_map(Map) ->
Config;
deep_put([Key | KeyPath], Map, Config) ->
SubMap = deep_put(KeyPath, maps:get(Key, Map, #{}), Config),
Map#{Key => SubMap}.

View File

@ -21,13 +21,13 @@
%% API functions %% API functions
-export([ start_link/0 -export([ start_link/0
, start_handler/2 , start_handler/3
, update_config/2 , update_config/2
, child_spec/2
]). ]).
%% emqx_config_handler callbacks %% emqx_config_handler callbacks
-export([ handle_update_config/2 -export([ handle_update_config/2
, config_key_path/0
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -39,24 +39,35 @@
code_change/3]). code_change/3]).
-type config() :: term(). -type config() :: term().
-type config_map() :: #{atom() => config()}. -type config_map() :: #{atom() => config()} | [config_map()].
-type handler_name() :: module(). -type handler_name() :: module().
-type key_path() :: [atom()]. -type key_path() :: [atom()].
%% the path of the config that maintained by the (sub) handler. -optional_callbacks([handle_update_config/2]).
-callback config_key_path() -> key_path().
-callback handle_update_config(config(), config_map()) -> config_map(). -callback handle_update_config(config(), config_map()) -> config_map().
-record(state, {handler_name :: handler_name(), parent :: handler_name()}). -record(state, {
handler_name :: handler_name(),
parent :: handler_name(),
key_path :: key_path()
}).
start_link() -> start_link() ->
start_handler(?MODULE, top). start_handler(?MODULE, top, []).
-spec start_handler(handler_name(), handler_name()) -> -spec start_handler(handler_name(), handler_name(), key_path()) ->
{ok, pid()} | {error, {already_started, pid()}} | {error, term()}. {ok, pid()} | {error, {already_started, pid()}} | {error, term()}.
start_handler(HandlerName, Parent) -> start_handler(HandlerName, Parent, ConfKeyPath) ->
gen_server:start_link({local, HandlerName}, ?MODULE, {HandlerName, Parent}, []). gen_server:start_link({local, HandlerName}, ?MODULE, {HandlerName, Parent, ConfKeyPath}, []).
-spec child_spec(module(), key_path()) -> supervisor:child_spec().
child_spec(Mod, KeyPath) ->
#{id => Mod,
start => {?MODULE, start_handler, [Mod, ?MODULE, KeyPath]},
restart => permanent,
type => worker,
modules => [?MODULE]}.
-spec update_config(handler_name(), config()) -> ok. -spec update_config(handler_name(), config()) -> ok.
update_config(top, Config) -> update_config(top, Config) ->
@ -67,28 +78,29 @@ update_config(Handler, Config) ->
%%============================================================================ %%============================================================================
%% callbacks of emqx_config_handler (the top-level handler) %% callbacks of emqx_config_handler (the top-level handler)
config_key_path() -> [].
handle_update_config(Config, undefined) -> handle_update_config(Config, undefined) ->
handle_update_config(Config, #{}); handle_update_config(Config, #{});
handle_update_config(Config, OldConfigMap) -> handle_update_config(Config, OldConfigMap) ->
%% simply merge the config to the overall config %% simply merge the config to the overall config
maps:merge(OldConfigMap, Config). maps:merge(OldConfigMap, Config).
init({HandlerName, Parent}) -> init({HandlerName, Parent, ConfKeyPath}) ->
{ok, #state{handler_name = HandlerName, parent = Parent}}. {ok, #state{handler_name = HandlerName, parent = Parent, key_path = ConfKeyPath}}.
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
Reply = ok, Reply = ok,
{reply, Reply, State}. {reply, Reply, State}.
handle_cast({handle_update_config, Config}, #state{handler_name = HandlerName, handle_cast({handle_update_config, Config}, #state{handler_name = HandlerName,
parent = Parent} = State) -> parent = Parent, key_path = ConfKeyPath} = State) ->
%% accumulate the config map of this config handler %% accumulate the config map of this config handler
OldConfigMap = emqx_config:get(HandlerName:config_key_path(), undefined), OldConfigMap = emqx_config:get(ConfKeyPath, undefined),
AccConfig = HandlerName:handle_update_config(Config, OldConfigMap), SubConfigMap = case erlang:function_exported(HandlerName, handle_update_config, 2) of
true -> HandlerName:handle_update_config(Config, OldConfigMap);
false -> wrap_sub_config(ConfKeyPath, Config)
end,
%% then notify the parent handler %% then notify the parent handler
update_config(Parent, AccConfig), update_config(Parent, SubConfigMap),
{noreply, State}; {noreply, State};
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
@ -114,3 +126,6 @@ save_config_to_disk(ConfigMap) ->
emqx_data_dir() -> emqx_data_dir() ->
%emqx_config:get([node, data_dir]) %emqx_config:get([node, data_dir])
"data". "data".
wrap_sub_config(ConfKeyPath, Config) ->
emqx_config:deep_put(ConfKeyPath, #{}, Config).

View File

@ -15,12 +15,6 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_data_bridge). -module(emqx_data_bridge).
-behaviour(emqx_config_handler).
-export([ config_key_path/0
, handle_update_config/2
]).
-export([ load_bridges/0 -export([ load_bridges/0
, resource_type/1 , resource_type/1
, bridge_type/1 , bridge_type/1
@ -52,14 +46,3 @@ is_bridge(#{id := <<"bridge:", _/binary>>}) ->
true; true;
is_bridge(_Data) -> is_bridge(_Data) ->
false. false.
%%============================================================================
config_key_path() -> [emqx_data_bridge, bridges].
handle_update_config(_Config, _OldConfigMap) ->
[format_conf(Data) || Data <- emqx_data_bridge:list_bridges()].
format_conf(#{resource_type := Type, id := Id, config := Conf}) ->
#{type => Type, name => emqx_data_bridge:resource_id_to_name(Id),
config => Conf}.

View File

@ -76,6 +76,7 @@ create_bridge(#{name := Name}, Params) ->
emqx_data_bridge:name_to_resource_id(Name), emqx_data_bridge:name_to_resource_id(Name),
emqx_data_bridge:resource_type(BridgeType), Config) of emqx_data_bridge:resource_type(BridgeType), Config) of
{ok, Data} -> {ok, Data} ->
emqx_config_handler:update_config(emqx_data_bridge, get_all_configs()),
{200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}}; {200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}};
{error, already_created} -> {error, already_created} ->
{400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}}; {400, #{code => 102, message => <<"bridge already created: ", Name/binary>>}};
@ -92,6 +93,7 @@ update_bridge(#{name := Name}, Params) ->
emqx_data_bridge:name_to_resource_id(Name), emqx_data_bridge:name_to_resource_id(Name),
emqx_data_bridge:resource_type(BridgeType), Config, []) of emqx_data_bridge:resource_type(BridgeType), Config, []) of
{ok, Data} -> {ok, Data} ->
emqx_config_handler:update_config(emqx_data_bridge, get_all_configs()),
{200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}}; {200, #{code => 0, data => format_api_reply(emqx_resource_api:format_data(Data))}};
{error, not_found} -> {error, not_found} ->
{400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}}; {400, #{code => 102, message => <<"bridge not_found: ", Name/binary>>}};
@ -103,7 +105,9 @@ update_bridge(#{name := Name}, Params) ->
delete_bridge(#{name := Name}, _Params) -> delete_bridge(#{name := Name}, _Params) ->
case emqx_resource:remove(emqx_data_bridge:name_to_resource_id(Name)) of case emqx_resource:remove(emqx_data_bridge:name_to_resource_id(Name)) of
ok -> {200, #{code => 0, data => #{}}}; ok ->
emqx_config_handler:update_config(emqx_data_bridge, get_all_configs()),
{200, #{code => 0, data => #{}}};
{error, Reason} -> {error, Reason} ->
{500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}} {500, #{code => 102, message => emqx_resource_api:stringnify(Reason)}}
end. end.
@ -112,3 +116,10 @@ format_api_reply(#{resource_type := Type, id := Id, config := Conf, status := St
#{type => emqx_data_bridge:bridge_type(Type), #{type => emqx_data_bridge:bridge_type(Type),
name => emqx_data_bridge:resource_id_to_name(Id), name => emqx_data_bridge:resource_id_to_name(Id),
config => Conf, status => Status}. config => Conf, status => Status}.
format_conf(#{resource_type := Type, id := Id, config := Conf}) ->
#{type => Type, name => emqx_data_bridge:resource_id_to_name(Id),
config => Conf}.
get_all_configs() ->
[format_conf(Data) || Data <- emqx_data_bridge:list_bridges()].

View File

@ -35,7 +35,12 @@ init([]) ->
start => {emqx_data_bridge_monitor, start_link, []}, start => {emqx_data_bridge_monitor, start_link, []},
restart => permanent, restart => permanent,
type => worker, type => worker,
modules => [emqx_data_bridge_monitor]}], modules => [emqx_data_bridge_monitor]},
emqx_config_handler:child_spec(emqx_data_bridge_config_handler, config_key_path())
],
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.
%% internal functions %% internal functions
config_key_path() ->
[emqx_data_bridge, bridges].