emqx/apps/emqx_exhook/src/emqx_exhook_mgr.erl

636 lines
18 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2021-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Manage the server status and reload strategy
-module(emqx_exhook_mgr).
-behaviour(gen_server).
-include("emqx_exhook.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% APIs
-export([start_link/0]).
%% Mgmt API
-export([
list/0,
lookup/1,
enable/1,
disable/1,
server_info/1,
all_servers_info/0,
server_hooks_metrics/1
]).
%% Helper funcs
-export([
running/0,
server/1,
hooks/1,
init_ref_counter_table/0
]).
-export([
update_config/2,
pre_config_update/3,
post_config_update/5
]).
%% gen_server callbacks
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
-export([roots/0]).
%% Running servers
%% State of the manager process: all known servers keyed by name.
-type state() :: #{servers := servers()}.
-type server_options() :: map().
-type server_name() :: binary().
%% Status values stored in a server entry.
-type status() ::
    connected
    | connecting
    | disconnected
    | disabled.
-type server() :: #{
    status := status(),
    %% `undefined' whenever no reload timer is pending
    timer := reference() | undefined,
    order := integer(),
    %% include the content of server_options
    atom() => any()
}.
-type servers() :: #{server_name() => server()}.
%% Target position accepted by a `move' request.
-type position() ::
    front
    | rear
    | {before, binary()}
    | {'after', binary()}.
%% Timeout (ms) used for every gen_server:call/3 issued by this module.
-define(DEFAULT_TIMEOUT, 60000).
%% Interval between two `refresh_tick' messages.
-define(REFRESH_INTERVAL, timer:seconds(5)).
-export_type([servers/0, server/0]).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%% @doc Starts the manager and registers it locally under the module name.
-spec start_link() ->
    ignore
    | {ok, pid()}
    | {error, any()}.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%% @doc Asks the manager process for the `list' reply (server infos sorted
%% by their order).
list() ->
    Request = list,
    call(Request).
%% @doc Fetches the entry of the server called `Name' from the manager
%% process, or `not_found' when no such server is known.
-spec lookup(server_name()) -> not_found | server().
lookup(Name) ->
    Request = {lookup, Name},
    call(Request).
%% @doc Submits an `{enable, Name, true}' update for the
%% `[exhook, servers]' config path.
enable(Name) ->
    UpdateReq = {enable, Name, true},
    update_config([exhook, servers], UpdateReq).
%% @doc Submits an `{enable, Name, false}' update for the
%% `[exhook, servers]' config path.
disable(Name) ->
    UpdateReq = {enable, Name, false},
    update_config([exhook, servers], UpdateReq).
%% @doc Requests status and metrics of one server from the manager
%% process; the reply is `not_found' for an unknown name.
server_info(Name) ->
    call({server_info, Name}).
%% @doc Requests the status map and the metrics of all servers from the
%% manager process.
all_servers_info() ->
    call(all_servers_info).
%% @doc Requests the per-hook metrics of the server `Name' from the
%% manager process.
server_hooks_metrics(Name) ->
    call({server_hooks_metrics, Name}).
%% Synchronous request to the locally registered manager process, bounded
%% by ?DEFAULT_TIMEOUT.
call(Req) ->
    Timeout = ?DEFAULT_TIMEOUT,
    gen_server:call(?MODULE, Req, Timeout).
%% @doc Creates the public, named ETS table ?HOOKS_REF_COUNTER in the
%% calling process and returns the result of ets:new/2.
init_ref_counter_table() ->
    TableOpts = [named_table, public],
    ets:new(?HOOKS_REF_COUNTER, TableOpts).
%%=====================================================================
%% Hocon schema
%% @doc Schema roots of this module, delegated to
%% emqx_exhook_schema:server_config/0.
roots() ->
    Roots = emqx_exhook_schema:server_config(),
    Roots.
%% @doc Applies `UpdateReq' to `KeyPath' through emqx_conf:update/3 with
%% `override_to => cluster'.
%%
%% On success returns `{ok, Result}' where `Result' is the value this
%% module produced in its post_config_update callback; any other return of
%% emqx_conf:update/3 is passed through unchanged.
update_config(KeyPath, UpdateReq) ->
    Opts = #{override_to => cluster},
    case emqx_conf:update(KeyPath, UpdateReq, Opts) of
        {ok, Updated} ->
            #{post_config_update := #{?MODULE := HandlerResult}} = Updated,
            {ok, HandlerResult};
        Other ->
            Other
    end.
%% @doc Config handler callback: computes the new raw server list for an
%% update request.
%%
%% Handles `{add, Conf}', `{update, Name, Conf}', `{delete, Name}',
%% `{move, Name, Position}' and `{enable, Name, Enable}'. Returns
%% `{ok, NewConfList}'; throws `already_exists' when adding a name that is
%% already present and `not_found' when the targeted server is missing.
pre_config_update(_, {add, #{<<"name">> := Name} = Conf}, OldConf) ->
    HasSameName = fun(#{<<"name">> := Existing}) -> Existing =:= Name end,
    case lists:any(HasSameName, OldConf) of
        false ->
            WithCerts = maybe_write_certs(Conf),
            {ok, OldConf ++ [WithCerts]};
        true ->
            throw(already_exists)
    end;
pre_config_update(_, {update, Name, Conf}, OldConf) ->
    UseNewConf = fun(_) -> Conf end,
    found_with_certs(replace_conf(Name, UseNewConf, OldConf));
pre_config_update(_, {delete, ToDelete}, OldConf) ->
    found_or_throw(do_delete(ToDelete, OldConf));
pre_config_update(_, {move, Name, Position}, OldConf) ->
    found_or_throw(do_move(Name, Position, OldConf));
pre_config_update(_, {enable, Name, Enable}, OldConf) ->
    SetEnable = fun(Conf) -> Conf#{<<"enable">> => Enable} end,
    found_with_certs(replace_conf(Name, SetEnable, OldConf)).

%% Wraps a computed config list in `{ok, _}', or throws `not_found'.
found_or_throw(not_found) ->
    throw(not_found);
found_or_throw(NewConf) ->
    {ok, NewConf}.

%% Same as found_or_throw/1, but additionally runs maybe_write_certs/1 on
%% every entry of the resulting list.
found_with_certs(not_found) ->
    throw(not_found);
found_with_certs(NewConf) ->
    {ok, lists:map(fun maybe_write_certs/1, NewConf)}.
%% @doc Config handler callback run after the update: forwards the request
%% and the new config to the manager process, then cleans up SSL files via
%% try_clear_ssl_files/3. Returns `{ok, Reply}' with the manager's reply.
post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) ->
    Reply = call({update_config, UpdateReq, NewConf}),
    _ = try_clear_ssl_files(UpdateReq, NewConf, OldConf),
    {ok, Reply}.
%%=====================================================================
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
%% gen_server init: traps exits, registers this module as config handler
%% for `[exhook, servers]', loads every configured server, assigns their
%% order and arms the first refresh tick.
init([]) ->
    process_flag(trap_exit, true),
    emqx_conf:add_handler([exhook, servers], ?MODULE),
    ConfList = emqx:get_config([exhook, servers]),
    Loaded = load_all_servers(ConfList),
    Ordered = reorder(ConfList, Loaded),
    refresh_tick(),
    {ok, #{servers => Ordered}}.
%% Tries to load every server of the option list and returns the resulting
%% entries keyed by server name. The load result itself is ignored; only
%% the server entry returned by do_load_server/1 is kept.
-spec load_all_servers(list(server_options())) -> servers().
load_all_servers(ServerL) ->
    load_all_servers(ServerL, #{}).

load_all_servers([], Loaded) ->
    Loaded;
load_all_servers([#{name := Name} = Options | Rest], Loaded) ->
    Initial = options_to_server(Options),
    {_, Server} = do_load_server(Initial),
    load_all_servers(Rest, maps:put(Name, Server, Loaded)).
%% gen_server call handler.
%%
%% Requests:
%%   list                              - server infos sorted by order
%%   {lookup, Name}                    - server entry or `not_found'
%%   {server_info, Name}               - status + metrics, or `not_found'
%%   all_servers_info                  - status map + metrics of all servers
%%   {server_hooks_metrics, Name}      - per-hook metrics of one server
%%   {update_config, Req, NewConfL}    - apply a config change
%%                                       (move / delete / add / update / enable)
%% Any other request is answered with `ok'.
handle_call(list, _From, #{servers := Servers} = State) ->
    Sorted = sort_name_by_order(get_servers_info(Servers), Servers),
    {reply, Sorted, State};
handle_call(
    {update_config, {move, _Name, _Position}, NewConfL},
    _From,
    #{servers := Servers} = State
) ->
    Reordered = reorder(NewConfL, Servers),
    {reply, ok, State#{servers := Reordered}};
handle_call({update_config, {delete, ToDelete}, _}, _From, State) ->
    emqx_exhook_metrics:on_server_deleted(ToDelete),
    Unloaded = do_unload_server(ToDelete, State),
    #{servers := Servers} = Unloaded,
    Remaining = maps:remove(ToDelete, Servers),
    {reply, ok, update_servers(Remaining, Unloaded)};
handle_call(
    {update_config, {add, RawConf}, NewConfL},
    _From,
    #{servers := Servers} = State
) ->
    {_, #{name := Name} = Conf} = emqx_config:check_config(?MODULE, RawConf),
    {Result, Server} = do_load_server(options_to_server(Conf)),
    %% the new entry has to be in the map before positions are recomputed
    Reordered = reorder(NewConfL, Servers#{Name => Server}),
    {reply, Result, State#{servers := Reordered}};
handle_call({lookup, Name}, _From, State) ->
    Found = where_is_server(Name, State),
    {reply, Found, State};
handle_call({update_config, {update, Name, _Conf}, NewConfL}, _From, State) ->
    {Result, NewState} = restart_server(Name, NewConfL, State),
    {reply, Result, NewState};
handle_call({update_config, {enable, Name, _Enable}, NewConfL}, _From, State) ->
    {Result, NewState} = restart_server(Name, NewConfL, State),
    {reply, Result, NewState};
handle_call({server_info, Name}, _From, State) ->
    Info =
        case where_is_server(Name, State) of
            not_found ->
                not_found;
            #{status := Status} ->
                HooksMetrics = emqx_exhook_metrics:server_metrics(Name),
                #{status => Status, metrics => HooksMetrics}
        end,
    {reply, Info, State};
handle_call(all_servers_info, _From, #{servers := Servers} = State) ->
    StatusOf = fun(_Name, #{status := S}) -> S end,
    StatusMap = maps:map(StatusOf, Servers),
    Metrics = emqx_exhook_metrics:servers_metrics(),
    {reply, #{status => StatusMap, metrics => Metrics}, State};
handle_call({server_hooks_metrics, Name}, _From, State) ->
    {reply, emqx_exhook_metrics:hooks_metrics(Name), State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
%% gen_server cast handler: every cast is ignored and the state is kept.
handle_cast(_Cast, State) ->
    {noreply, State}.
%% gen_server info handler.
%%   {timeout, _, {reload, Name}} - a reload timer fired: try to load the
%%                                  server again
%%   refresh_tick                 - re-arm the tick and let
%%                                  emqx_exhook_metrics update itself
%% Everything else is dropped.
handle_info({timeout, _Ref, {reload, Name}}, State) ->
    {_Result, Reloaded} = do_reload_server(Name, State),
    {noreply, Reloaded};
handle_info(refresh_tick, State) ->
    refresh_tick(),
    emqx_exhook_metrics:update(?REFRESH_INTERVAL),
    {noreply, State};
handle_info(_Other, State) ->
    {noreply, State}.
%% gen_server terminate: removes the exhook callbacks from emqx_hooks,
%% unloads every known server and emits the `exhook_mgr_terminated'
%% trace point.
terminate(Reason, #{servers := Servers} = State) ->
    _ = unload_exhooks(),
    UnloadOne = fun(Name, _Server, StateAcc) ->
        do_unload_server(Name, StateAcc)
    end,
    _ = maps:fold(UnloadOne, State, Servers),
    ?tp(info, exhook_mgr_terminated, #{reason => Reason, servers => Servers}),
    ok.
%% gen_server code_change: the state needs no conversion.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
%% Deletes the `{M, F}' callback of every entry in ?ENABLED_HOOKS from
%% emqx_hooks and returns the list of emqx_hooks:del/2 results. Entries
%% that do not have the `{Name, {M, F, A}}' shape are skipped.
unload_exhooks() ->
    DelHook = fun
        ({Name, {M, F, _A}}) -> {true, emqx_hooks:del(Name, {M, F})};
        (_) -> false
    end,
    lists:filtermap(DelHook, ?ENABLED_HOOKS).
%% Tries to load the callback server described by `Server'.
%%
%% Returns `{Result, UpdatedServer}':
%%   * load succeeded     -> `{ok, _}', server state saved, status `connected'
%%   * server is disabled -> `{ok, _}', entry marked via set_disable/1
%%   * `load_error'       -> `{ok, _}', entry returned by ensure_reload_timer/1
%%   * any other error    -> `{Error, _}', status `disconnected'
%% Every error is logged.
do_load_server(#{name := Name} = Server) ->
    case emqx_exhook_server:load(Name, Server) of
        {ok, ServerState} ->
            save(Name, ServerState),
            %% Drop the reference of a reload timer that led to this load.
            %% With a stale reference left in place, a late `{reload, Name}'
            %% message would pass the `timer := undefined' check in
            %% do_reload_server/2 and load the connected server again.
            {ok, Server#{status => connected, timer => undefined}};
        disable ->
            {ok, set_disable(Server)};
        {ErrorType, Reason} = Error ->
            ?SLOG(
                error,
                #{
                    msg => "failed_to_load_exhook_callback_server",
                    reason => Reason,
                    name => Name
                }
            ),
            case ErrorType of
                load_error ->
                    {ok, ensure_reload_timer(Server)};
                _ ->
                    {Error, Server#{status => disconnected}}
            end
    end.
%% Loads `Server' via do_load_server/1 and stores the resulting entry in
%% the manager state; returns `{Result, NewState}'.
do_load_server(#{name := Name} = Server, #{servers := Servers} = State) ->
    {Result, Loaded} = do_load_server(Server),
    NewServers = maps:put(Name, Loaded, Servers),
    {Result, update_servers(NewServers, State)}.
%% Handles an expired reload timer for `Name'. Unknown servers yield
%% `{error, not_found}', servers without a pending timer are left alone,
%% and every other server gets its timer cancelled and is loaded again.
-spec do_reload_server(server_name(), state()) ->
    {{error, term()}, state()}
    | {ok, state()}.
do_reload_server(Name, State) ->
    case where_is_server(Name, State) of
        #{timer := undefined} ->
            {ok, State};
        not_found ->
            {{error, not_found}, State};
        Pending ->
            clean_reload_timer(Pending),
            do_load_server(Pending, State)
    end.
%% Unloads the server `Name'. Unknown or already disabled servers leave
%% the state untouched. Otherwise a pending reload timer is cancelled and,
%% when a saved server state exists, it is removed, unloaded and the entry
%% is marked as disabled.
-spec do_unload_server(server_name(), state()) -> state().
do_unload_server(Name, #{servers := _} = State) ->
    case where_is_server(Name, State) of
        not_found ->
            State;
        #{status := disabled} ->
            State;
        Server ->
            clean_reload_timer(Server),
            unload_saved(server(Name), Name, Server, State)
    end.

%% Second half of do_unload_server/2: acts only if a server state was saved.
unload_saved(undefined, _Name, _Server, State) ->
    State;
unload_saved(Service, Name, Server, #{servers := Servers} = State) ->
    ok = unsave(Name),
    ok = emqx_exhook_server:unload(Service),
    Disabled = set_disable(Server),
    State#{servers := Servers#{Name := Disabled}}.
%% When `auto_reconnect' is an integer, (re)starts a timer that delivers
%% `{reload, Name}' to this process after that many milliseconds and marks
%% the server `connecting'. Otherwise the server is marked `disconnected'.
ensure_reload_timer(#{name := Name, auto_reconnect := Intv} = Server) when is_integer(Intv) ->
    ok = clean_reload_timer(Server),
    TimerRef = erlang:start_timer(Intv, self(), {reload, Name}),
    Server#{status := connecting, timer := TimerRef};
ensure_reload_timer(NoReconnect) ->
    NoReconnect#{status := disconnected}.
%% Cancels the reload timer stored in the server entry, if there is one.
-spec clean_reload_timer(server()) -> ok.
clean_reload_timer(#{timer := Timer}) ->
    case Timer of
        undefined ->
            ok;
        _ ->
            _ = erlang:cancel_timer(Timer),
            ok
    end.
%% Moves the config entry called `Name' to `Position' within `ConfL';
%% returns the new list or `not_found'.
-spec do_move(binary(), position(), list(server_options())) ->
    not_found | list(server_options()).
do_move(Name, Position, ConfL) ->
    Skipped = [],
    move(ConfL, Name, Position, Skipped).
%% Takes the first entry named `Name' out of the list and hands it,
%% together with the remaining entries, to move_to/3. `HeadL' holds
%% already skipped entries in reverse order. Returns `not_found' when no
%% entry has that name.
move(ConfL, Name, Position, HeadL) ->
    IsOther = fun
        (#{<<"name">> := N}) -> N =/= Name;
        (_) -> true
    end,
    case lists:splitwith(IsOther, ConfL) of
        {_, []} ->
            not_found;
        {Before, [Server | After]} ->
            move_to(Position, Server, lists:reverse(HeadL) ++ Before ++ After)
    end.
%% Inserts `Server' into `ServerL' at the requested position: at the
%% front, at the rear, or relative to another entry (see move_to/4).
move_to(?CMD_MOVE_FRONT, Server, ServerL) ->
    [Server] ++ ServerL;
move_to(?CMD_MOVE_REAR, Server, ServerL) ->
    lists:append(ServerL, [Server]);
move_to(Position, Server, ServerL) ->
    move_to(ServerL, Position, Server, []).

%% Walks the list until the entry referenced by the position is found and
%% inserts `Server' right before or right after it. `Seen' collects the
%% visited entries in reverse order. Returns `not_found' when the
%% referenced entry does not exist.
move_to([#{<<"name">> := Name} | _] = Rest, ?CMD_MOVE_BEFORE(Name), Server, Seen) ->
    lists:reverse(Seen, [Server | Rest]);
move_to([#{<<"name">> := Name} = Anchor | Rest], ?CMD_MOVE_AFTER(Name), Server, Seen) ->
    lists:reverse(Seen, [Anchor, Server | Rest]);
move_to([Other | Rest], Position, Server, Seen) ->
    move_to(Rest, Position, Server, [Other | Seen]);
move_to([], _Position, _Server, _Seen) ->
    not_found.
%% Removes every entry named `ToDelete' from the config list; returns
%% `not_found' when no entry carries that name.
-spec do_delete(binary(), list(server_options())) ->
    not_found | list(server_options()).
do_delete(ToDelete, OldConf) ->
    Keep = fun(#{<<"name">> := Name}) -> Name =/= ToDelete end,
    case lists:filter(Keep, OldConf) of
        %% nothing was filtered out, so the name is not present
        OldConf -> not_found;
        Kept -> Kept
    end.
%% Assigns each server its 1-based position in `ServerL' as `order',
%% refreshes the persisted running list accordingly and returns the
%% updated servers map.
-spec reorder(list(server_options()), servers()) -> servers().
reorder(ServerL, Servers) ->
    FirstPosition = 1,
    Reordered = reorder(ServerL, FirstPosition, Servers),
    update_order(Reordered),
    Reordered.

reorder([], _Position, Servers) ->
    Servers;
reorder([#{name := Name} | Rest], Position, Servers) ->
    Positioned = update_order(Name, Servers, Position),
    reorder(Rest, Position + 1, Positioned).
%% Sets the `order' field of the server `Name'. Both the server and its
%% `order' key must already exist.
update_order(Name, Servers, Order) ->
    SetOrder = fun(Server) -> Server#{order := Order} end,
    maps:update_with(Name, SetOrder, Servers).
%% Returns one map per server: the server entry extended with a `hooks'
%% key holding the result of hooks/1 for that server.
get_servers_info(Svrs) ->
    AddHooks = fun(Name, Conf, Acc) ->
        [Conf#{hooks => hooks(Name)} | Acc]
    end,
    maps:fold(AddHooks, [], Svrs).
%% Looks up the entry of `Name' in the manager state; `not_found' when the
%% name is unknown.
where_is_server(Name, #{servers := Servers}) ->
    case maps:find(Name, Servers) of
        {ok, Server} -> Server;
        error -> not_found
    end.
-type replace_fun() :: fun((server_options()) -> server_options()).
%% Replaces the first entry named `Name' by the result of applying
%% `ReplaceFun' to it; returns `not_found' when no entry has that name.
-spec replace_conf(binary(), replace_fun(), list(server_options())) ->
    not_found
    | list(server_options()).
replace_conf(Name, ReplaceFun, ConfL) ->
    replace_conf(ConfL, Name, ReplaceFun, []).

%% `HeadL' holds already skipped entries in reverse order.
replace_conf(ConfL, Name, ReplaceFun, HeadL) ->
    IsOther = fun
        (#{<<"name">> := N}) -> N =/= Name;
        (_) -> true
    end,
    case lists:splitwith(IsOther, ConfL) of
        {_, []} ->
            not_found;
        {Before, [Old | After]} ->
            New = ReplaceFun(Old),
            lists:reverse(HeadL) ++ Before ++ [New | After]
    end.
%% Restarts the server `Name' with the options found for it in `ConfL':
%% the current entry is merged with the new options, the server is
%% unloaded and then loaded again. Returns `{error, not_found}' when the
%% name is missing from either the config list or the state.
-spec restart_server(binary(), list(server_options()), state()) ->
    {ok, state()}
    | {{error, term()}, state()}.
restart_server(Name, ConfL, State) ->
    HasName = fun(#{name := CName}) -> CName =:= Name end,
    case {lists:search(HasName, ConfL), where_is_server(Name, State)} of
        {false, _} ->
            {{error, not_found}, State};
        {_, not_found} ->
            {{error, not_found}, State};
        {{value, Conf}, Server} ->
            Merged = maps:merge(Server, Conf),
            Unloaded = do_unload_server(Name, State),
            do_load_server(Merged, Unloaded)
    end.
%% Sorts `Names' ascending by the `order' stored for each name in
%% `Orders'. Elements may be plain binary names or maps with a `name' key.
sort_name_by_order(Names, Orders) ->
    OrderOf = fun(Name) -> emqx_map_lib:deep_get([Name, order], Orders) end,
    Before = fun
        (A, B) when is_binary(A) ->
            OrderOf(A) < OrderOf(B);
        (#{name := A}, #{name := B}) ->
            OrderOf(A) < OrderOf(B)
    end,
    lists:sort(Before, Names).
%% Schedules a `refresh_tick' message to the calling process after
%% ?REFRESH_INTERVAL.
refresh_tick() ->
    erlang:send_after(?REFRESH_INTERVAL, self(), refresh_tick).
%% Turns server options into an initial server entry: status
%% `disconnected', no timer, order 0. These three keys override values of
%% the same name in `Options'.
options_to_server(Options) ->
    Options#{status => disconnected, timer => undefined, order => 0}.
%% Refreshes the persisted running list from `Servers' and stores
%% `Servers' in the state.
update_servers(Servers, State) ->
    _ = update_order(Servers),
    maps:update(servers, Servers, State).
%% Marks a server entry as disabled and clears its timer field. Both keys
%% must already be present in the entry.
set_disable(Server) ->
    NoTimer = Server#{timer := undefined},
    NoTimer#{status := disabled}.
%%--------------------------------------------------------------------
%% Server state persistent
%% Stores the state of a loaded server in persistent_term and records its
%% name in the list of running servers kept under ?APP.
%%
%% The name is appended, so the relative order of the names already
%% recorded stays as it is; a name that is already recorded is not added a
%% second time.
save(Name, ServerState) ->
    Saved = persistent_term:get(?APP, []),
    case lists:member(Name, Saved) of
        true ->
            ok;
        false ->
            persistent_term:put(?APP, Saved ++ [Name])
    end,
    persistent_term:put({?APP, Name}, ServerState).
%% Removes `Name' from the persisted list of running servers (if present)
%% and erases its saved server state.
unsave(Name) ->
    Saved = persistent_term:get(?APP, []),
    case lists:member(Name, Saved) of
        true ->
            persistent_term:put(?APP, lists:delete(Name, Saved));
        false ->
            ok
    end,
    persistent_term:erase({?APP, Name}),
    ok.
%% @doc Names of the servers currently recorded as running; `[]' when
%% nothing has been saved.
running() ->
    Default = [],
    persistent_term:get(?APP, Default).
%% @doc Saved state of the server `Name', or `undefined' when none is
%% stored.
server(Name) ->
    persistent_term:get({?APP, Name}, undefined).
%% Re-sorts the persisted list of running server names by the `order' of
%% the matching entries in `Servers'. Only entries that are `connected'
%% and listed as running are used as the source of order values.
update_order(Servers) ->
    Running = running(),
    IsRunning = fun(Name, Server) ->
        case Server of
            #{status := connected} -> lists:member(Name, Running);
            _ -> false
        end
    end,
    Orders = maps:filter(IsRunning, Servers),
    Sorted = sort_name_by_order(Running, Orders),
    persistent_term:put(?APP, Sorted).
%% @doc Hooks reported by emqx_exhook_server:hooks/1 for the saved state
%% of `Name'; `[]' when no state is saved for that server.
hooks(Name) ->
    Service = server(Name),
    if
        Service =:= undefined -> [];
        true -> emqx_exhook_server:hooks(Service)
    end.
%% Passes the `ssl' section of a raw server config (or `undefined' when it
%% is absent) to emqx_tls_lib:ensure_ssl_files/2 and returns the config
%% with the resulting SSL settings. On failure the reason is logged and
%% `{bad_ssl_config, Reason}' is thrown.
maybe_write_certs(#{<<"name">> := Name} = Conf) ->
    Dir = ssl_file_path(Name),
    RawSSL = maps:get(<<"ssl">>, Conf, undefined),
    case emqx_tls_lib:ensure_ssl_files(Dir, RawSSL) of
        {error, Reason} ->
            ?SLOG(error, Reason#{msg => "bad_ssl_config"}),
            throw({bad_ssl_config, Reason});
        {ok, SSL} ->
            new_ssl_source(Conf, SSL)
    end.
%% Path used for the SSL files of the server `Name': "exhook" joined with
%% the server name.
ssl_file_path(Name) ->
    filename:join("exhook", Name).
%% Stores `SSL' under the `<<"ssl">>' key of `Source'; an `undefined' SSL
%% value leaves `Source' unchanged.
new_ssl_source(Source, SSL) ->
    case SSL of
        undefined -> Source;
        _ -> Source#{<<"ssl">> => SSL}
    end.
%% Calls emqx_tls_lib:delete_ssl_files/3 for the server touched by a
%% config request: for `delete' with `undefined' as the new SSL config,
%% for `update' / `enable' with the server's new and old SSL configs.
%% Other requests do nothing.
try_clear_ssl_files({delete, Name}, _NewConf, OldConfs) ->
    Removed = find_server_ssl_cfg(Name, OldConfs),
    emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), undefined, Removed);
try_clear_ssl_files({Op, Name, _}, NewConfs, OldConfs) when
    Op =:= update orelse Op =:= enable
->
    Current = find_server_ssl_cfg(Name, NewConfs),
    Previous = find_server_ssl_cfg(Name, OldConfs),
    emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), Current, Previous);
try_clear_ssl_files(_Req, _NewConf, _OldConf) ->
    ok.
%% Finds the first config whose `name' equals `Name'; returns
%% `{value, Conf}' or `false'. Elements without a `name' key are skipped.
search_server_cfg(Name, Confs) ->
    IsNamed = fun
        (#{name := SvrName}) -> SvrName =:= Name;
        (_) -> false
    end,
    lists:search(IsNamed, Confs).
%% Returns the `ssl' value of the config named `Name'; `undefined' when
%% the server is not in the list or has no `ssl' key.
find_server_ssl_cfg(Name, Confs) ->
    case search_server_cfg(Name, Confs) of
        false ->
            undefined;
        {value, Conf} ->
            maps:get(ssl, Conf, undefined)
    end.