refactor(exhook): refine the status of the exhook callback server

This commit is contained in:
firest 2022-04-15 16:58:05 +08:00
parent ed73001901
commit f20af91ba0
2 changed files with 165 additions and 268 deletions

View File

@ -170,7 +170,7 @@ fields(node_metrics) ->
fields(node_status) ->
[
{node, mk(string(), #{})},
{status, mk(enum([running, waiting, stopped, error]), #{})}
{status, mk(enum([connected, connecting, unconnected, disable, error]), #{})}
];
fields(hook_info) ->
[

View File

@ -63,21 +63,25 @@
-export([roots/0]).
%% Running servers
-type state() :: #{
running := servers(),
%% Wait to reload servers
waiting := servers(),
%% Marked stopped servers
stopped := servers(),
%% Timer references
trefs := map(),
orders := orders()
}.
-type state() :: #{servers := servers()}.
-type server_name() :: binary().
-type servers() :: #{server_name() => server()}.
-type server() :: server_options().
-type server_options() :: map().
-type server_name() :: binary().
-type status() ::
connected
| connecting
| unconnected
| disable.
-type server() :: #{
status := status(),
timer := reference(),
order := integer(),
%% include the content of server_options
atom() => any()
}.
-type servers() :: #{server_name() => server()}.
-type position() ::
front
@ -85,19 +89,10 @@
| {before, binary()}
| {'after', binary()}.
-type orders() :: #{server_name() => integer()}.
-type server_info() :: #{
name := server_name(),
status := running | waiting | stopped,
atom() => term()
}.
-define(DEFAULT_TIMEOUT, 60000).
-define(REFRESH_INTERVAL, timer:seconds(5)).
-export_type([servers/0, server/0, server_info/0]).
-export_type([servers/0, server/0]).
%%--------------------------------------------------------------------
%% APIs
@ -113,7 +108,7 @@ start_link() ->
list() ->
call(list).
-spec lookup(server_name()) -> not_found | server_info().
-spec lookup(server_name()) -> not_found | server().
lookup(Name) ->
call({lookup, Name}).
@ -195,104 +190,56 @@ init([]) ->
process_flag(trap_exit, true),
emqx_conf:add_handler([exhook, servers], ?MODULE),
ServerL = emqx:get_config([exhook, servers]),
{Waiting, Running, Stopped} = load_all_servers(ServerL),
Orders = reorder(ServerL),
Servers = load_all_servers(ServerL),
Servers2 = reorder(ServerL, Servers),
refresh_tick(),
{ok,
ensure_reload_timer(
#{
waiting => Waiting,
running => Running,
stopped => Stopped,
trefs => #{},
orders => Orders
}
)}.
{ok, #{servers => Servers2}}.
-spec load_all_servers(list(server_options())) -> {servers(), servers(), servers()}.
-spec load_all_servers(list(server_options())) -> servers().
load_all_servers(ServerL) ->
load_all_servers(ServerL, #{}, #{}, #{}).
load_all_servers(ServerL, #{}).
load_all_servers([#{name := Name} = Options | More], Waiting, Running, Stopped) ->
case emqx_exhook_server:load(Name, Options) of
{ok, ServerState} ->
save(Name, ServerState),
load_all_servers(More, Waiting, Running#{Name => Options}, Stopped);
disable ->
load_all_servers(More, Waiting, Running, Stopped#{Name => Options});
{_, _} ->
load_all_servers(More, Waiting#{Name => Options}, Running, Stopped)
end;
load_all_servers([], Waiting, Running, Stopped) ->
{Waiting, Running, Stopped}.
load_all_servers([#{name := Name} = Options | More], Servers) ->
{_, Server} = do_load_server(options_to_server(Options)),
load_all_servers(More, Servers#{Name => Server});
load_all_servers([], Servers) ->
Servers.
handle_call(
list,
_From,
State = #{
running := Running,
waiting := Waiting,
stopped := Stopped,
orders := Orders
}
State = #{servers := Servers}
) ->
R = get_servers_info(running, Running),
W = get_servers_info(waiting, Waiting),
S = get_servers_info(stopped, Stopped),
Servers = R ++ W ++ S,
OrderServers = sort_name_by_order(Servers, Orders),
Infos = get_servers_info(Servers),
OrderServers = sort_name_by_order(Infos, Servers),
{reply, OrderServers, State};
handle_call(
{update_config, {move, _Name, _Position}, NewConfL},
_From,
State
#{servers := Servers} = State
) ->
Orders = reorder(NewConfL),
{reply, ok, State#{orders := Orders}};
Servers2 = reorder(NewConfL, Servers),
{reply, ok, State#{servers := Servers2}};
handle_call({update_config, {delete, ToDelete}, _}, _From, State) ->
{ok,
#{
orders := Orders,
stopped := Stopped
} = State2} = do_unload_server(ToDelete, State),
State3 = State2#{
stopped := maps:remove(ToDelete, Stopped),
orders := maps:remove(ToDelete, Orders)
},
emqx_exhook_metrics:on_server_deleted(ToDelete),
{reply, ok, State3};
#{servers := Servers} = State2 = do_unload_server(ToDelete, State),
Servers2 = maps:remove(ToDelete, Servers),
{reply, ok, update_servers(Servers2, State2)};
handle_call(
{update_config, {add, RawConf}, NewConfL},
_From,
#{running := Running, waiting := Waitting, stopped := Stopped} = State
#{servers := Servers} = State
) ->
{_, #{name := Name} = Conf} = emqx_config:check_config(?MODULE, RawConf),
case emqx_exhook_server:load(Name, Conf) of
{ok, ServerState} ->
save(Name, ServerState),
State2 = State#{running := Running#{Name => Conf}};
disable ->
State2 = State#{stopped := Stopped#{Name => Conf}};
{_, _} ->
StateT = State#{waiting := Waitting#{Name => Conf}},
State2 = ensure_reload_timer(StateT)
end,
Orders = reorder(NewConfL),
{reply, ok, State2#{orders := Orders}};
{Result, Server} = do_load_server(options_to_server(Conf)),
Servers2 = Servers#{Name => Server},
Servers3 = reorder(NewConfL, Servers2),
{reply, Result, State#{servers := Servers3}};
handle_call({lookup, Name}, _From, State) ->
case where_is_server(Name, State) of
not_found ->
Result = not_found;
{Where, #{Name := Conf}} ->
Result = maps:merge(Conf, #{status => Where})
end,
{reply, Result, State};
{reply, where_is_server(Name, State), State};
handle_call({update_config, {update, Name, _Conf}, NewConfL}, _From, State) ->
{Result, State2} = restart_server(Name, NewConfL, State),
{reply, Result, State2};
@ -303,7 +250,7 @@ handle_call({server_info, Name}, _From, State) ->
case where_is_server(Name, State) of
not_found ->
Result = not_found;
{Status, _} ->
#{status := Status} ->
HooksMetrics = emqx_exhook_metrics:server_metrics(Name),
Result = #{
status => Status,
@ -314,25 +261,9 @@ handle_call({server_info, Name}, _From, State) ->
handle_call(
all_servers_info,
_From,
#{
running := Running,
waiting := Waiting,
stopped := Stopped
} = State
#{servers := Servers} = State
) ->
MakeStatus = fun(Status, Servers, Acc) ->
lists:foldl(
fun(Name, IAcc) -> IAcc#{Name => Status} end,
Acc,
maps:keys(Servers)
)
end,
Status = lists:foldl(
fun({Status, Servers}, Acc) -> MakeStatus(Status, Servers, Acc) end,
#{},
[{running, Running}, {waiting, Waiting}, {stopped, Stopped}]
),
Status = maps:map(fun(_Name, #{status := Status}) -> Status end, Servers),
Metrics = emqx_exhook_metrics:servers_metrics(),
Result = #{
@ -352,7 +283,7 @@ handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({timeout, _Ref, {reload, Name}}, State) ->
NState = do_load_server(Name, State),
{_, NState} = do_reload_server(Name, State),
{noreply, NState};
handle_info(refresh_tick, State) ->
refresh_tick(),
@ -361,14 +292,13 @@ handle_info(refresh_tick, State) ->
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, State = #{running := Running}) ->
terminate(_Reason, State = #{servers := Servers}) ->
_ = maps:fold(
fun(Name, _, AccIn) ->
{ok, NAccIn} = do_unload_server(Name, AccIn),
NAccIn
do_unload_server(Name, AccIn)
end,
State,
Running
Servers
),
_ = unload_exhooks(),
ok.
@ -386,131 +316,82 @@ unload_exhooks() ->
|| {Name, {M, F, _A}} <- ?ENABLED_HOOKS
].
-spec do_load_server(server_name(), state()) ->
{{error, term()}, state()}
| {ok, state()}.
do_load_server(Name, State = #{orders := Orders}) ->
case where_is_server(Name, State) of
not_found ->
{{error, not_found}, State};
{running, _} ->
{ok, State};
{Where, Map} ->
State2 = clean_reload_timer(Name, State),
{Options, Map2} = maps:take(Name, Map),
State3 = State2#{Where := Map2},
#{
running := Running,
waiting := Waiting,
stopped := Stopped
} = State3,
case emqx_exhook_server:load(Name, Options) of
do_load_server(#{name := Name} = Server) ->
case emqx_exhook_server:load(Name, Server) of
{ok, ServerState} ->
save(Name, ServerState),
update_order(Orders),
?SLOG(info, #{
msg => "load_exhook_callback_server_ok",
name => Name
}),
{ok, State3#{running := maps:put(Name, Options, Running)}};
{ok, Server#{status => connected}};
disable ->
{ok, State3#{stopped := Stopped#{Name => Options}}};
{load_error, _} ->
{ok, ensure_reload_timer(State3#{waiting := maps:put(Name, Options, Waiting)})};
{_, Reason} ->
{ok, set_disable(Server)};
{ErrorType, Reason} = Error ->
?SLOG(
warning,
error,
#{
msg => "failed_to_load_exhook_callback_server",
reason => Reason,
name => Name
}
),
{ok, State}
case ErrorType of
load_error ->
{ok, ensure_reload_timer(Server)};
_ ->
{Error, Server#{status => unconnected}}
end
end.
-spec do_unload_server(server_name(), state()) -> {ok, state()}.
do_unload_server(Name, #{stopped := Stopped} = State) ->
do_load_server(#{name := Name} = Server, #{servers := Servers} = State) ->
{Result, Server2} = do_load_server(Server),
{Result, update_servers(Servers#{Name => Server2}, State)}.
-spec do_reload_server(server_name(), state()) ->
{{error, term()}, state()}
| {ok, state()}.
do_reload_server(Name, State = #{servers := Servers}) ->
case where_is_server(Name, State) of
{stopped, _} ->
not_found ->
{{error, not_found}, State};
#{timer := undefined} ->
{ok, State};
{waiting, Waiting} ->
{Options, Waiting2} = maps:take(Name, Waiting),
{ok,
clean_reload_timer(
Name,
State#{
waiting := Waiting2,
stopped := maps:put(Name, Options, Stopped)
}
)};
{running, Running} ->
Service = server(Name),
Server ->
clean_reload_timer(Server),
do_load_server(Server, Servers)
end.
-spec do_unload_server(server_name(), state()) -> state().
do_unload_server(Name, #{servers := Servers} = State) ->
case where_is_server(Name, State) of
not_found ->
State;
#{status := disable} ->
State;
Server ->
clean_reload_timer(Server),
case server(Name) of
undefined ->
State;
Service ->
ok = unsave(Name),
ok = emqx_exhook_server:unload(Service),
{Options, Running2} = maps:take(Name, Running),
{ok, State#{
running := Running2,
stopped := maps:put(Name, Options, Stopped)
}};
not_found ->
{ok, State}
Servers2 = Servers#{Name := set_disable(Server)},
State#{servers := Servers2}
end
end.
-spec ensure_reload_timer(state()) -> state().
ensure_reload_timer(
State = #{
waiting := Waiting,
stopped := Stopped,
trefs := TRefs
}
) ->
Iter = maps:iterator(Waiting),
{Waitting2, Stopped2, TRefs2} =
ensure_reload_timer(maps:next(Iter), Waiting, Stopped, TRefs),
State#{
waiting := Waitting2,
stopped := Stopped2,
trefs := TRefs2
}.
ensure_reload_timer(none, Waiting, Stopped, TimerRef) ->
{Waiting, Stopped, TimerRef};
ensure_reload_timer(
{Name, #{auto_reconnect := Intv}, Iter},
Waiting,
Stopped,
TimerRef
) when is_integer(Intv) ->
Next = maps:next(Iter),
case maps:is_key(Name, TimerRef) of
true ->
ensure_reload_timer(Next, Waiting, Stopped, TimerRef);
_ ->
ensure_reload_timer(#{timer := Timer} = Server) when is_reference(Timer) ->
Server;
ensure_reload_timer(#{name := Name, auto_reconnect := Intv} = Server) when is_integer(Intv) ->
Ref = erlang:start_timer(Intv, self(), {reload, Name}),
TimerRef2 = maps:put(Name, Ref, TimerRef),
ensure_reload_timer(Next, Waiting, Stopped, TimerRef2)
end;
ensure_reload_timer({Name, Opts, Iter}, Waiting, Stopped, TimerRef) ->
ensure_reload_timer(
maps:next(Iter),
maps:remove(Name, Waiting),
maps:put(Name, Opts, Stopped),
TimerRef
).
Server#{status := connecting, timer := Ref};
ensure_reload_timer(Server) ->
Server#{status := unconnected}.
-spec clean_reload_timer(server_name(), state()) -> state().
clean_reload_timer(Name, State = #{trefs := TRefs}) ->
case maps:take(Name, TRefs) of
error ->
State;
{TRef, NTRefs} ->
_ = erlang:cancel_timer(TRef),
State#{trefs := NTRefs}
end.
-spec clean_reload_timer(server()) -> ok.
clean_reload_timer(#{timer := undefined}) ->
ok;
clean_reload_timer(#{timer := Timer}) ->
_ = erlang:cancel_timer(Timer),
ok.
-spec do_move(binary(), position(), list(server_options())) ->
not_found | list(server_options()).
@ -540,37 +421,32 @@ move_to([H | T], Position, Server, HeadL) ->
move_to([], _Position, _Server, _HeadL) ->
not_found.
-spec reorder(list(server_options())) -> orders().
reorder(ServerL) ->
Orders = reorder(ServerL, 1, #{}),
-spec reorder(list(server_options()), servers()) -> servers().
reorder(ServerL, Servers) ->
Orders = reorder(ServerL, 1, Servers),
update_order(Orders),
Orders.
reorder([#{name := Name} | T], Order, Orders) ->
reorder(T, Order + 1, Orders#{Name => Order});
reorder([], _Order, Orders) ->
Orders.
reorder([#{name := Name} | T], Order, Servers) ->
reorder(T, Order + 1, update_order(Name, Servers, Order));
reorder([], _Order, Servers) ->
Servers.
get_servers_info(Status, Map) ->
update_order(Name, Servers, Order) ->
Server = maps:get(Name, Servers),
Servers#{Name := Server#{order := Order}}.
get_servers_info(Svrs) ->
Fold = fun(Name, Conf, Acc) ->
[
maps:merge(Conf, #{
status => Status,
hooks => hooks(Name)
})
maps:merge(Conf, #{hooks => hooks(Name)})
| Acc
]
end,
maps:fold(Fold, [], Map).
maps:fold(Fold, [], Svrs).
where_is_server(Name, #{running := Running}) when is_map_key(Name, Running) ->
{running, Running};
where_is_server(Name, #{waiting := Waiting}) when is_map_key(Name, Waiting) ->
{waiting, Waiting};
where_is_server(Name, #{stopped := Stopped}) when is_map_key(Name, Stopped) ->
{stopped, Stopped};
where_is_server(_, _) ->
not_found.
where_is_server(Name, #{servers := Servers}) ->
maps:get(Name, Servers, not_found).
-type replace_fun() :: fun((server_options()) -> server_options()).
@ -599,10 +475,10 @@ restart_server(Name, ConfL, State) ->
case where_is_server(Name, State) of
not_found ->
{{error, not_found}, State};
{Where, Map} ->
State2 = State#{Where := Map#{Name := Conf}},
{ok, State3} = do_unload_server(Name, State2),
do_load_server(Name, State3)
Server ->
Server2 = maps:merge(Server, Conf),
State2 = do_unload_server(Name, State),
do_load_server(Server2, State2)
end
end.
@ -610,9 +486,11 @@ sort_name_by_order(Names, Orders) ->
lists:sort(
fun
(A, B) when is_binary(A) ->
maps:get(A, Orders) < maps:get(B, Orders);
emqx_map_lib:deep_get([A, order], Orders) <
emqx_map_lib:deep_get([B, order], Orders);
(#{name := A}, #{name := B}) ->
maps:get(A, Orders) < maps:get(B, Orders)
emqx_map_lib:deep_get([A, order], Orders) <
emqx_map_lib:deep_get([B, order], Orders)
end,
Names
).
@ -620,6 +498,16 @@ sort_name_by_order(Names, Orders) ->
refresh_tick() ->
erlang:send_after(?REFRESH_INTERVAL, self(), ?FUNCTION_NAME).
options_to_server(Options) ->
maps:merge(Options, #{status => unconnected, timer => undefined, order => 0}).
update_servers(Servers, State) ->
update_order(Servers),
State#{servers := Servers}.
set_disable(Server) ->
Server#{status := disable, timer := undefined}.
%%--------------------------------------------------------------------
%% Server state persistent
save(Name, ServerState) ->
@ -651,8 +539,17 @@ server(Name) ->
Service -> Service
end.
update_order(Orders) ->
update_order(Servers) ->
Running = running(),
Orders = maps:filter(
fun
(Name, #{status := connected}) ->
lists:member(Name, Running);
(_, _) ->
false
end,
Servers
),
Running2 = sort_name_by_order(Running, Orders),
persistent_term:put(?APP, Running2).