Merge pull request #7632 from lafirest/fix/exhook_cfg_update

fix(exhook):  refine the status of the exhook callback server
This commit is contained in:
lafirest 2022-04-18 16:34:42 +08:00 committed by GitHub
commit 1be1ecd979
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 200 additions and 309 deletions

View File

@ -170,7 +170,7 @@ fields(node_metrics) ->
fields(node_status) ->
[
{node, mk(string(), #{})},
{status, mk(enum([running, waiting, stopped, error]), #{})}
{status, mk(enum([connected, connecting, unconnected, disable, error]), #{})}
];
fields(hook_info) ->
[

View File

@ -63,21 +63,25 @@
-export([roots/0]).
%% Running servers
-type state() :: #{
running := servers(),
%% Wait to reload servers
waiting := servers(),
%% Marked stopped servers
stopped := servers(),
%% Timer references
trefs := map(),
orders := orders()
}.
-type state() :: #{servers := servers()}.
-type server_name() :: binary().
-type servers() :: #{server_name() => server()}.
-type server() :: server_options().
-type server_options() :: map().
-type server_name() :: binary().
%% Connection status kept for every callback server:
%%   connected   - emqx_exhook_server:load/2 returned {ok, _} (see do_load_server/1)
%%   connecting  - a reload timer was started by ensure_reload_timer/1
%%   unconnected - initial value from options_to_server/1; also set when a load
%%                 fails and no reload timer is started
%%   disable     - set by set_disable/1 (load returned `disable', or the server
%%                 was unloaded)
-type status() ::
connected
| connecting
| unconnected
| disable.
%% Runtime entry of one callback server: the configured server options plus
%% the bookkeeping fields added by options_to_server/1.
-type server() :: #{
    status := status(),
    %% `undefined' while no reload timer is pending (this is the value written
    %% by options_to_server/1 and set_disable/1)
    timer := reference() | undefined,
    order := integer(),
    %% include the content of server_options
    atom() => any()
}.
-type servers() :: #{server_name() => server()}.
-type position() ::
front
@ -85,19 +89,10 @@
| {before, binary()}
| {'after', binary()}.
-type orders() :: #{server_name() => integer()}.
-type server_info() :: #{
name := server_name(),
status := running | waiting | stopped,
atom() => term()
}.
-define(DEFAULT_TIMEOUT, 60000).
-define(REFRESH_INTERVAL, timer:seconds(5)).
-export_type([servers/0, server/0, server_info/0]).
-export_type([servers/0, server/0]).
%%--------------------------------------------------------------------
%% APIs
@ -113,7 +108,7 @@ start_link() ->
%% List the callback servers known to this manager.
%% Sends the `list' request through call/1 (presumably a gen_server call to
%% this process - confirm against call/1); the `list' clause of handle_call/3
%% replies with the per-server info maps sorted by their order.
list() ->
call(list).
-spec lookup(server_name()) -> not_found | server_info().
%% Look up one callback server by name.
%% Sends `{lookup, Name}' through call/1; the matching handle_call/3 clause
%% replies with the stored server map, or `not_found' when no server with
%% that name is registered.
-spec lookup(server_name()) -> not_found | server().
lookup(Name) ->
call({lookup, Name}).
@ -195,104 +190,56 @@ init([]) ->
process_flag(trap_exit, true),
emqx_conf:add_handler([exhook, servers], ?MODULE),
ServerL = emqx:get_config([exhook, servers]),
{Waiting, Running, Stopped} = load_all_servers(ServerL),
Orders = reorder(ServerL),
Servers = load_all_servers(ServerL),
Servers2 = reorder(ServerL, Servers),
refresh_tick(),
{ok,
ensure_reload_timer(
#{
waiting => Waiting,
running => Running,
stopped => Stopped,
trefs => #{},
orders => Orders
}
)}.
{ok, #{servers => Servers2}}.
-spec load_all_servers(list(server_options())) -> {servers(), servers(), servers()}.
-spec load_all_servers(list(server_options())) -> servers().
load_all_servers(ServerL) ->
load_all_servers(ServerL, #{}, #{}, #{}).
load_all_servers(ServerL, #{}).
load_all_servers([#{name := Name} = Options | More], Waiting, Running, Stopped) ->
case emqx_exhook_server:load(Name, Options) of
{ok, ServerState} ->
save(Name, ServerState),
load_all_servers(More, Waiting, Running#{Name => Options}, Stopped);
{error, _} ->
load_all_servers(More, Waiting#{Name => Options}, Running, Stopped);
disable ->
load_all_servers(More, Waiting, Running, Stopped#{Name => Options})
end;
load_all_servers([], Waiting, Running, Stopped) ->
{Waiting, Running, Stopped}.
%% Load every configured server in list order and collect the resulting
%% server entries, keyed by name, on top of the accumulator `Servers'.
%% The load result itself is ignored here; only the updated entry is kept.
load_all_servers(ServerL, Servers) ->
    LoadOne = fun(#{name := Name} = Options, Acc) ->
        {_, Loaded} = do_load_server(options_to_server(Options)),
        Acc#{Name => Loaded}
    end,
    lists:foldl(LoadOne, Servers, ServerL).
handle_call(
list,
_From,
State = #{
running := Running,
waiting := Waiting,
stopped := Stopped,
orders := Orders
}
State = #{servers := Servers}
) ->
R = get_servers_info(running, Running),
W = get_servers_info(waiting, Waiting),
S = get_servers_info(stopped, Stopped),
Servers = R ++ W ++ S,
OrderServers = sort_name_by_order(Servers, Orders),
Infos = get_servers_info(Servers),
OrderServers = sort_name_by_order(Infos, Servers),
{reply, OrderServers, State};
handle_call(
{update_config, {move, _Name, _Position}, NewConfL},
_From,
State
#{servers := Servers} = State
) ->
Orders = reorder(NewConfL),
{reply, ok, State#{orders := Orders}};
Servers2 = reorder(NewConfL, Servers),
{reply, ok, State#{servers := Servers2}};
handle_call({update_config, {delete, ToDelete}, _}, _From, State) ->
{ok,
#{
orders := Orders,
stopped := Stopped
} = State2} = do_unload_server(ToDelete, State),
State3 = State2#{
stopped := maps:remove(ToDelete, Stopped),
orders := maps:remove(ToDelete, Orders)
},
emqx_exhook_metrics:on_server_deleted(ToDelete),
{reply, ok, State3};
#{servers := Servers} = State2 = do_unload_server(ToDelete, State),
Servers2 = maps:remove(ToDelete, Servers),
{reply, ok, update_servers(Servers2, State2)};
handle_call(
{update_config, {add, RawConf}, NewConfL},
_From,
#{running := Running, waiting := Waitting, stopped := Stopped} = State
#{servers := Servers} = State
) ->
{_, #{name := Name} = Conf} = emqx_config:check_config(?MODULE, RawConf),
case emqx_exhook_server:load(Name, Conf) of
{ok, ServerState} ->
save(Name, ServerState),
State2 = State#{running := Running#{Name => Conf}};
{error, _} ->
StateT = State#{waiting := Waitting#{Name => Conf}},
State2 = ensure_reload_timer(StateT);
disable ->
State2 = State#{stopped := Stopped#{Name => Conf}}
end,
Orders = reorder(NewConfL),
{reply, ok, State2#{orders := Orders}};
{Result, Server} = do_load_server(options_to_server(Conf)),
Servers2 = Servers#{Name => Server},
Servers3 = reorder(NewConfL, Servers2),
{reply, Result, State#{servers := Servers3}};
handle_call({lookup, Name}, _From, State) ->
case where_is_server(Name, State) of
not_found ->
Result = not_found;
{Where, #{Name := Conf}} ->
Result = maps:merge(Conf, #{status => Where})
end,
{reply, Result, State};
{reply, where_is_server(Name, State), State};
handle_call({update_config, {update, Name, _Conf}, NewConfL}, _From, State) ->
{Result, State2} = restart_server(Name, NewConfL, State),
{reply, Result, State2};
@ -303,7 +250,7 @@ handle_call({server_info, Name}, _From, State) ->
case where_is_server(Name, State) of
not_found ->
Result = not_found;
{Status, _} ->
#{status := Status} ->
HooksMetrics = emqx_exhook_metrics:server_metrics(Name),
Result = #{
status => Status,
@ -314,25 +261,9 @@ handle_call({server_info, Name}, _From, State) ->
handle_call(
all_servers_info,
_From,
#{
running := Running,
waiting := Waiting,
stopped := Stopped
} = State
#{servers := Servers} = State
) ->
MakeStatus = fun(Status, Servers, Acc) ->
lists:foldl(
fun(Name, IAcc) -> IAcc#{Name => Status} end,
Acc,
maps:keys(Servers)
)
end,
Status = lists:foldl(
fun({Status, Servers}, Acc) -> MakeStatus(Status, Servers, Acc) end,
#{},
[{running, Running}, {waiting, Waiting}, {stopped, Stopped}]
),
Status = maps:map(fun(_Name, #{status := Status}) -> Status end, Servers),
Metrics = emqx_exhook_metrics:servers_metrics(),
Result = #{
@ -352,23 +283,8 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({timeout, _Ref, {reload, Name}}, State) ->
{Result, NState} = do_load_server(Name, State),
case Result of
ok ->
{noreply, NState};
{error, not_found} ->
{noreply, NState};
{error, Reason} ->
?SLOG(
warning,
#{
msg => "failed_to_reload_exhook_callback_server",
reason => Reason,
name => Name
}
),
{noreply, ensure_reload_timer(NState)}
end;
{_, NState} = do_reload_server(Name, State),
{noreply, NState};
handle_info(refresh_tick, State) ->
refresh_tick(),
emqx_exhook_metrics:update(?REFRESH_INTERVAL),
@ -376,14 +292,13 @@ handle_info(refresh_tick, State) ->
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, State = #{running := Running}) ->
terminate(_Reason, State = #{servers := Servers}) ->
_ = maps:fold(
fun(Name, _, AccIn) ->
{ok, NAccIn} = do_unload_server(Name, AccIn),
NAccIn
do_unload_server(Name, AccIn)
end,
State,
Running
Servers
),
_ = unload_exhooks(),
ok.
@ -401,122 +316,83 @@ unload_exhooks() ->
|| {Name, {M, F, _A}} <- ?ENABLED_HOOKS
].
-spec do_load_server(server_name(), state()) ->
{{error, not_found}, state()}
| {{error, already_started}, state()}
| {ok, state()}.
do_load_server(Name, State = #{orders := Orders}) ->
case where_is_server(Name, State) of
not_found ->
{{error, not_found}, State};
{running, _} ->
{ok, State};
{Where, Map} ->
State2 = clean_reload_timer(Name, State),
{Options, Map2} = maps:take(Name, Map),
State3 = State2#{Where := Map2},
#{
running := Running,
stopped := Stopped
} = State3,
case emqx_exhook_server:load(Name, Options) of
{ok, ServerState} ->
save(Name, ServerState),
update_order(Orders),
?SLOG(info, #{
msg => "load_exhook_callback_server_ok",
name => Name
}),
{ok, State3#{running := maps:put(Name, Options, Running)}};
{error, Reason} ->
{{error, Reason}, State};
disable ->
{ok, State3#{stopped := Stopped#{Name => Options}}}
%% Try to load a single callback server and return `{Result, Server}' where
%% `Server' carries the refreshed status:
%%   - load ok        -> server state is saved, status `connected'
%%   - `disable'      -> entry marked disabled via set_disable/1
%%   - `load_error'   -> logged, reload timer ensured, result is still `ok'
%%   - any other pair -> logged, status `unconnected', the error is returned
do_load_server(#{name := Name} = Server) ->
    case emqx_exhook_server:load(Name, Server) of
        {ok, ServerState} ->
            save(Name, ServerState),
            {ok, Server#{status => connected}};
        disable ->
            {ok, set_disable(Server)};
        {FailureKind, Reason} = Failure ->
            ?SLOG(
                error,
                #{
                    msg => "failed_to_load_exhook_callback_server",
                    reason => Reason,
                    name => Name
                }
            ),
            load_failure_result(FailureKind, Failure, Server)
    end.

%% Map a failed load onto the `{Result, Server}' pair returned by do_load_server/1.
load_failure_result(load_error, _Failure, Server) ->
    {ok, ensure_reload_timer(Server)};
load_failure_result(_FailureKind, Failure, Server) ->
    {Failure, Server#{status => unconnected}}.
-spec do_unload_server(server_name(), state()) -> {ok, state()}.
do_unload_server(Name, #{stopped := Stopped} = State) ->
%% Load `Server' and store the resulting entry in the manager state.
%% Returns the load result together with the updated state.
do_load_server(#{name := Name} = Server, #{servers := Servers} = State) ->
    {Result, Loaded} = do_load_server(Server),
    NewServers = maps:put(Name, Loaded, Servers),
    {Result, update_servers(NewServers, State)}.
%% Retry loading the server `Name' after its reload timer fired.
%%
%% Returns `{{error, not_found}, State}' when no such server is registered and
%% `{ok, State}' (unchanged) when the server has no pending reload timer.
%% Otherwise the timer is cancelled and forgotten, the server is loaded again
%% and the `{Result, State}' pair of do_load_server/2 is returned.
-spec do_reload_server(server_name(), state()) ->
    {{error, term()}, state()}
    | {ok, state()}.
do_reload_server(Name, State) ->
    case where_is_server(Name, State) of
        not_found ->
            {{error, not_found}, State};
        #{timer := undefined} ->
            {ok, State};
        Server ->
            clean_reload_timer(Server),
            %% Drop the cancelled/expired reference: ensure_reload_timer/1
            %% leaves an entry untouched while `timer' is a reference, so a
            %% stale one would prevent any further retry from being scheduled.
            %% do_load_server/2 takes the whole manager state as its second
            %% argument, not the bare servers map.
            do_load_server(Server#{timer := undefined}, State)
    end.
-spec ensure_reload_timer(state()) -> state().
ensure_reload_timer(
State = #{
waiting := Waiting,
stopped := Stopped,
trefs := TRefs
}
) ->
Iter = maps:iterator(Waiting),
{Waitting2, Stopped2, TRefs2} =
ensure_reload_timer(maps:next(Iter), Waiting, Stopped, TRefs),
State#{
waiting := Waitting2,
stopped := Stopped2,
trefs := TRefs2
}.
ensure_reload_timer(none, Waiting, Stopped, TimerRef) ->
{Waiting, Stopped, TimerRef};
ensure_reload_timer(
{Name, #{auto_reconnect := Intv}, Iter},
Waiting,
Stopped,
TimerRef
) when is_integer(Intv) ->
Next = maps:next(Iter),
case maps:is_key(Name, TimerRef) of
true ->
ensure_reload_timer(Next, Waiting, Stopped, TimerRef);
_ ->
Ref = erlang:start_timer(Intv, self(), {reload, Name}),
TimerRef2 = maps:put(Name, Ref, TimerRef),
ensure_reload_timer(Next, Waiting, Stopped, TimerRef2)
end;
ensure_reload_timer({Name, Opts, Iter}, Waiting, Stopped, TimerRef) ->
ensure_reload_timer(
maps:next(Iter),
maps:remove(Name, Waiting),
maps:put(Name, Opts, Stopped),
TimerRef
).
-spec clean_reload_timer(server_name(), state()) -> state().
clean_reload_timer(Name, State = #{trefs := TRefs}) ->
case maps:take(Name, TRefs) of
error ->
-spec do_unload_server(server_name(), state()) -> state().
do_unload_server(Name, #{servers := Servers} = State) ->
case where_is_server(Name, State) of
not_found ->
State;
{TRef, NTRefs} ->
_ = erlang:cancel_timer(TRef),
State#{trefs := NTRefs}
#{status := disable} ->
State;
Server ->
clean_reload_timer(Server),
case server(Name) of
undefined ->
State;
Service ->
ok = unsave(Name),
ok = emqx_exhook_server:unload(Service),
Servers2 = Servers#{Name := set_disable(Server)},
State#{servers := Servers2}
end
end.
%% Make sure a server that failed to load has a reload timer when it is
%% configured for automatic reconnection.
%%   - timer already a reference      -> entry returned unchanged
%%   - integer `auto_reconnect'       -> start a `{reload, Name}' timer, status `connecting'
%%   - anything else                  -> status `unconnected'
ensure_reload_timer(#{timer := Timer} = Server) when is_reference(Timer) ->
    Server;
ensure_reload_timer(Server) ->
    case Server of
        #{name := Name, auto_reconnect := Interval} when is_integer(Interval) ->
            TimerRef = erlang:start_timer(Interval, self(), {reload, Name}),
            Server#{status := connecting, timer := TimerRef};
        _ ->
            Server#{status := unconnected}
    end.
-spec clean_reload_timer(server()) -> ok.
%% Cancel the server's reload timer, if it has one. Always returns `ok';
%% the server map itself is not modified.
clean_reload_timer(#{timer := Timer}) ->
    case Timer of
        undefined ->
            ok;
        _ ->
            _ = erlang:cancel_timer(Timer),
            ok
    end.
-spec do_move(binary(), position(), list(server_options())) ->
not_found | list(server_options()).
do_move(Name, Position, ConfL) ->
@ -545,37 +421,32 @@ move_to([H | T], Position, Server, HeadL) ->
move_to([], _Position, _Server, _HeadL) ->
not_found.
-spec reorder(list(server_options())) -> orders().
reorder(ServerL) ->
Orders = reorder(ServerL, 1, #{}),
-spec reorder(list(server_options()), servers()) -> servers().
reorder(ServerL, Servers) ->
Orders = reorder(ServerL, 1, Servers),
update_order(Orders),
Orders.
reorder([#{name := Name} | T], Order, Orders) ->
reorder(T, Order + 1, Orders#{Name => Order});
reorder([], _Order, Orders) ->
Orders.
reorder([#{name := Name} | T], Order, Servers) ->
reorder(T, Order + 1, update_order(Name, Servers, Order));
reorder([], _Order, Servers) ->
Servers.
get_servers_info(Status, Map) ->
%% Set the `order' field of the server stored under `Name'.
%% Fails with `{badkey, Name}' when the server is not present.
update_order(Name, Servers, Order) ->
    SetOrder = fun(Server) -> Server#{order := Order} end,
    maps:update_with(Name, SetOrder, Servers).
get_servers_info(Svrs) ->
Fold = fun(Name, Conf, Acc) ->
[
maps:merge(Conf, #{
status => Status,
hooks => hooks(Name)
})
maps:merge(Conf, #{hooks => hooks(Name)})
| Acc
]
end,
maps:fold(Fold, [], Map).
maps:fold(Fold, [], Svrs).
where_is_server(Name, #{running := Running}) when is_map_key(Name, Running) ->
{running, Running};
where_is_server(Name, #{waiting := Waiting}) when is_map_key(Name, Waiting) ->
{waiting, Waiting};
where_is_server(Name, #{stopped := Stopped}) when is_map_key(Name, Stopped) ->
{stopped, Stopped};
where_is_server(_, _) ->
not_found.
%% Fetch the entry of server `Name' from the manager state, or `not_found'
%% when no server with that name is registered.
where_is_server(Name, #{servers := Servers}) ->
    case maps:find(Name, Servers) of
        {ok, Server} -> Server;
        error -> not_found
    end.
-type replace_fun() :: fun((server_options()) -> server_options()).
@ -604,15 +475,10 @@ restart_server(Name, ConfL, State) ->
case where_is_server(Name, State) of
not_found ->
{{error, not_found}, State};
{Where, Map} ->
State2 = State#{Where := Map#{Name := Conf}},
{ok, State3} = do_unload_server(Name, State2),
case do_load_server(Name, State3) of
{ok, State4} ->
{ok, State4};
{Error, State4} ->
{Error, State4}
end
Server ->
Server2 = maps:merge(Server, Conf),
State2 = do_unload_server(Name, State),
do_load_server(Server2, State2)
end
end.
@ -620,9 +486,11 @@ sort_name_by_order(Names, Orders) ->
lists:sort(
fun
(A, B) when is_binary(A) ->
maps:get(A, Orders) < maps:get(B, Orders);
emqx_map_lib:deep_get([A, order], Orders) <
emqx_map_lib:deep_get([B, order], Orders);
(#{name := A}, #{name := B}) ->
maps:get(A, Orders) < maps:get(B, Orders)
emqx_map_lib:deep_get([A, order], Orders) <
emqx_map_lib:deep_get([B, order], Orders)
end,
Names
).
@ -630,6 +498,16 @@ sort_name_by_order(Names, Orders) ->
%% Schedule the next `refresh_tick' message to this process after
%% ?REFRESH_INTERVAL milliseconds.
refresh_tick() ->
    Self = self(),
    erlang:send_after(?REFRESH_INTERVAL, Self, refresh_tick).
%% Turn configured server options into a server entry by (re)setting the
%% bookkeeping fields: status `unconnected', no timer, order 0.
options_to_server(Options) ->
    Options#{status => unconnected, timer => undefined, order => 0}.
%% Store a new servers map in the state, first refreshing the ordered list
%% of running servers through update_order/1.
update_servers(Servers, State) ->
    _ = update_order(Servers),
    maps:update(servers, Servers, State).
%% Mark a server entry as disabled and forget its reload timer.
%% Both `status' and `timer' must already be present in the entry.
set_disable(Server) ->
    Disabled = maps:update(status, disable, Server),
    maps:update(timer, undefined, Disabled).
%%--------------------------------------------------------------------
%% Server state persistent
save(Name, ServerState) ->
@ -661,8 +539,17 @@ server(Name) ->
Service -> Service
end.
update_order(Orders) ->
%% Re-sort the list of running server names by the `order' of their entries
%% and publish it in the ?APP persistent term. Only entries whose status is
%% `connected' and whose name is in the running list are used for sorting.
update_order(Servers) ->
    Running = running(),
    IsRunningAndConnected = fun
        (Name, #{status := connected}) -> lists:member(Name, Running);
        (_, _) -> false
    end,
    Connected = maps:filter(IsRunningAndConnected, Servers),
    Sorted = sort_name_by_order(Running, Connected),
    persistent_term:put(?APP, Sorted).

View File

@ -86,40 +86,44 @@
%% Load/Unload APIs
%%--------------------------------------------------------------------
-spec load(binary(), map()) -> {ok, server()} | {error, term()} | disable.
-spec load(binary(), map()) -> {ok, server()} | {error, term()} | {load_error, term()} | disable.
load(_Name, #{enable := false}) ->
disable;
load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) ->
ReqOpts = #{timeout => Timeout, failed_action => FailedAction},
{SvrAddr, ClientOpts} = channel_opts(Opts),
case
emqx_exhook_sup:start_grpc_client_channel(
Name,
SvrAddr,
ClientOpts
)
of
{ok, _ChannPoolPid} ->
case do_init(Name, ReqOpts) of
{ok, HookSpecs} ->
%% Register metrics
Prefix = lists:flatten(io_lib:format("exhook.~ts.", [Name])),
ensure_metrics(Prefix, HookSpecs),
%% Ensure hooks
ensure_hooks(HookSpecs),
{ok, #{
name => Name,
options => ReqOpts,
channel => _ChannPoolPid,
hookspec => HookSpecs,
prefix => Prefix
}};
case channel_opts(Opts) of
{ok, {SvrAddr, ClientOpts}} ->
case
emqx_exhook_sup:start_grpc_client_channel(
Name,
SvrAddr,
ClientOpts
)
of
{ok, _ChannPoolPid} ->
case do_init(Name, ReqOpts) of
{ok, HookSpecs} ->
%% Register metrics
Prefix = lists:flatten(io_lib:format("exhook.~ts.", [Name])),
ensure_metrics(Prefix, HookSpecs),
%% Ensure hooks
ensure_hooks(HookSpecs),
{ok, #{
name => Name,
options => ReqOpts,
channel => _ChannPoolPid,
hookspec => HookSpecs,
prefix => Prefix
}};
{error, Reason} ->
emqx_exhook_sup:stop_grpc_client_channel(Name),
{load_error, Reason}
end;
{error, _} = E ->
emqx_exhook_sup:stop_grpc_client_channel(Name),
E
end;
{error, _} = E ->
E
Error ->
Error
end.
%% @private
@ -130,7 +134,7 @@ channel_opts(Opts = #{url := URL}) ->
),
case uri_string:parse(URL) of
#{scheme := <<"http">>, host := Host, port := Port} ->
{format_http_uri("http", Host, Port), ClientOpts};
{ok, {format_http_uri("http", Host, Port), ClientOpts}};
#{scheme := <<"https">>, host := Host, port := Port} ->
SslOpts =
case maps:get(ssl, Opts, undefined) of
@ -154,9 +158,9 @@ channel_opts(Opts = #{url := URL}) ->
transport_opts => SslOpts
}
},
{format_http_uri("https", Host, Port), NClientOpts};
{ok, {format_http_uri("https", Host, Port), NClientOpts}};
Error ->
error({bad_server_url, URL, Error})
{error, {bad_server_url, URL, Error}}
end.
format_http_uri(Scheme, Host, Port) ->