fix(exhook): ignore the exhook server load error when config update

This commit is contained in:
firest 2022-04-15 11:16:11 +08:00
parent 21fe7f01ee
commit ed73001901
2 changed files with 28 additions and 38 deletions

View File

@ -218,10 +218,10 @@ load_all_servers([#{name := Name} = Options | More], Waiting, Running, Stopped)
{ok, ServerState} -> {ok, ServerState} ->
save(Name, ServerState), save(Name, ServerState),
load_all_servers(More, Waiting, Running#{Name => Options}, Stopped); load_all_servers(More, Waiting, Running#{Name => Options}, Stopped);
{error, _} ->
load_all_servers(More, Waiting#{Name => Options}, Running, Stopped);
disable -> disable ->
load_all_servers(More, Waiting, Running, Stopped#{Name => Options}) load_all_servers(More, Waiting, Running, Stopped#{Name => Options});
{_, _} ->
load_all_servers(More, Waiting#{Name => Options}, Running, Stopped)
end; end;
load_all_servers([], Waiting, Running, Stopped) -> load_all_servers([], Waiting, Running, Stopped) ->
{Waiting, Running, Stopped}. {Waiting, Running, Stopped}.
@ -277,11 +277,11 @@ handle_call(
{ok, ServerState} -> {ok, ServerState} ->
save(Name, ServerState), save(Name, ServerState),
State2 = State#{running := Running#{Name => Conf}}; State2 = State#{running := Running#{Name => Conf}};
{error, _} ->
StateT = State#{waiting := Waitting#{Name => Conf}},
State2 = ensure_reload_timer(StateT);
disable -> disable ->
State2 = State#{stopped := Stopped#{Name => Conf}} State2 = State#{stopped := Stopped#{Name => Conf}};
{_, _} ->
StateT = State#{waiting := Waitting#{Name => Conf}},
State2 = ensure_reload_timer(StateT)
end, end,
Orders = reorder(NewConfL), Orders = reorder(NewConfL),
{reply, ok, State2#{orders := Orders}}; {reply, ok, State2#{orders := Orders}};
@ -352,23 +352,8 @@ handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({timeout, _Ref, {reload, Name}}, State) -> handle_info({timeout, _Ref, {reload, Name}}, State) ->
{Result, NState} = do_load_server(Name, State), NState = do_load_server(Name, State),
case Result of {noreply, NState};
ok ->
{noreply, NState};
{error, not_found} ->
{noreply, NState};
{error, Reason} ->
?SLOG(
warning,
#{
msg => "failed_to_reload_exhook_callback_server",
reason => Reason,
name => Name
}
),
{noreply, ensure_reload_timer(NState)}
end;
handle_info(refresh_tick, State) -> handle_info(refresh_tick, State) ->
refresh_tick(), refresh_tick(),
emqx_exhook_metrics:update(?REFRESH_INTERVAL), emqx_exhook_metrics:update(?REFRESH_INTERVAL),
@ -402,8 +387,7 @@ unload_exhooks() ->
]. ].
-spec do_load_server(server_name(), state()) -> -spec do_load_server(server_name(), state()) ->
{{error, not_found}, state()} {{error, term()}, state()}
| {{error, already_started}, state()}
| {ok, state()}. | {ok, state()}.
do_load_server(Name, State = #{orders := Orders}) -> do_load_server(Name, State = #{orders := Orders}) ->
case where_is_server(Name, State) of case where_is_server(Name, State) of
@ -417,6 +401,7 @@ do_load_server(Name, State = #{orders := Orders}) ->
State3 = State2#{Where := Map2}, State3 = State2#{Where := Map2},
#{ #{
running := Running, running := Running,
waiting := Waiting,
stopped := Stopped stopped := Stopped
} = State3, } = State3,
case emqx_exhook_server:load(Name, Options) of case emqx_exhook_server:load(Name, Options) of
@ -428,10 +413,20 @@ do_load_server(Name, State = #{orders := Orders}) ->
name => Name name => Name
}), }),
{ok, State3#{running := maps:put(Name, Options, Running)}}; {ok, State3#{running := maps:put(Name, Options, Running)}};
{error, Reason} ->
{{error, Reason}, State};
disable -> disable ->
{ok, State3#{stopped := Stopped#{Name => Options}}} {ok, State3#{stopped := Stopped#{Name => Options}}};
{load_error, _} ->
{ok, ensure_reload_timer(State3#{waiting := maps:put(Name, Options, Waiting)})};
{_, Reason} ->
?SLOG(
warning,
#{
msg => "failed_to_load_exhook_callback_server",
reason => Reason,
name => Name
}
),
{ok, State}
end end
end. end.
@ -607,12 +602,7 @@ restart_server(Name, ConfL, State) ->
{Where, Map} -> {Where, Map} ->
State2 = State#{Where := Map#{Name := Conf}}, State2 = State#{Where := Map#{Name := Conf}},
{ok, State3} = do_unload_server(Name, State2), {ok, State3} = do_unload_server(Name, State2),
case do_load_server(Name, State3) of do_load_server(Name, State3)
{ok, State4} ->
{ok, State4};
{Error, State4} ->
{Error, State4}
end
end end
end. end.

View File

@ -86,7 +86,7 @@
%% Load/Unload APIs %% Load/Unload APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec load(binary(), map()) -> {ok, server()} | {error, term()} | disable. -spec load(binary(), map()) -> {ok, server()} | {error, term()} | {load_error, term()} | disable.
load(_Name, #{enable := false}) -> load(_Name, #{enable := false}) ->
disable; disable;
load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) -> load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) ->
@ -114,9 +114,9 @@ load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts)
hookspec => HookSpecs, hookspec => HookSpecs,
prefix => Prefix prefix => Prefix
}}; }};
{error, _} = E -> {error, Reason} ->
emqx_exhook_sup:stop_grpc_client_channel(Name), emqx_exhook_sup:stop_grpc_client_channel(Name),
E {load_error, Reason}
end; end;
{error, _} = E -> {error, _} = E ->
E E