diff --git a/apps/emqx_exhook/src/emqx_exhook_api.erl b/apps/emqx_exhook/src/emqx_exhook_api.erl index 27f8203ba..cf460d546 100644 --- a/apps/emqx_exhook/src/emqx_exhook_api.erl +++ b/apps/emqx_exhook/src/emqx_exhook_api.erl @@ -170,7 +170,7 @@ fields(node_metrics) -> fields(node_status) -> [ {node, mk(string(), #{})}, - {status, mk(enum([running, waiting, stopped, error]), #{})} + {status, mk(enum([connected, connecting, unconnected, disable, error]), #{})} ]; fields(hook_info) -> [ diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl index f11b91ae4..1ccebb816 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mgr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -63,21 +63,25 @@ -export([roots/0]). %% Running servers --type state() :: #{ - running := servers(), - %% Wait to reload servers - waiting := servers(), - %% Marked stopped servers - stopped := servers(), - %% Timer references - trefs := map(), - orders := orders() -}. +-type state() :: #{servers := servers()}. --type server_name() :: binary(). --type servers() :: #{server_name() => server()}. --type server() :: server_options(). -type server_options() :: map(). +-type server_name() :: binary(). + +-type status() :: + connected + | connecting + | unconnected + | disable. + +-type server() :: #{ + status := status(), + timer := reference(), + order := integer(), + %% include the content of server_options + atom() => any() +}. +-type servers() :: #{server_name() => server()}. -type position() :: front @@ -85,19 +89,10 @@ | {before, binary()} | {'after', binary()}. --type orders() :: #{server_name() => integer()}. - --type server_info() :: #{ - name := server_name(), - status := running | waiting | stopped, - - atom() => term() -}. - -define(DEFAULT_TIMEOUT, 60000). -define(REFRESH_INTERVAL, timer:seconds(5)). --export_type([servers/0, server/0, server_info/0]). +-export_type([servers/0, server/0]). %%-------------------------------------------------------------------- %% APIs @@ -113,7 +108,7 @@ start_link() -> list() -> call(list). --spec lookup(server_name()) -> not_found | server_info(). +-spec lookup(server_name()) -> not_found | server(). lookup(Name) -> call({lookup, Name}). @@ -195,104 +190,56 @@ init([]) -> process_flag(trap_exit, true), emqx_conf:add_handler([exhook, servers], ?MODULE), ServerL = emqx:get_config([exhook, servers]), - {Waiting, Running, Stopped} = load_all_servers(ServerL), - Orders = reorder(ServerL), + Servers = load_all_servers(ServerL), + Servers2 = reorder(ServerL, Servers), refresh_tick(), - {ok, - ensure_reload_timer( - #{ - waiting => Waiting, - running => Running, - stopped => Stopped, - trefs => #{}, - orders => Orders - } - )}. + {ok, #{servers => Servers2}}. --spec load_all_servers(list(server_options())) -> {servers(), servers(), servers()}. +-spec load_all_servers(list(server_options())) -> servers(). load_all_servers(ServerL) -> - load_all_servers(ServerL, #{}, #{}, #{}). + load_all_servers(ServerL, #{}). -load_all_servers([#{name := Name} = Options | More], Waiting, Running, Stopped) -> - case emqx_exhook_server:load(Name, Options) of - {ok, ServerState} -> - save(Name, ServerState), - load_all_servers(More, Waiting, Running#{Name => Options}, Stopped); - {error, _} -> - load_all_servers(More, Waiting#{Name => Options}, Running, Stopped); - disable -> - load_all_servers(More, Waiting, Running, Stopped#{Name => Options}) - end; -load_all_servers([], Waiting, Running, Stopped) -> - {Waiting, Running, Stopped}. +load_all_servers([#{name := Name} = Options | More], Servers) -> + {_, Server} = do_load_server(options_to_server(Options)), + load_all_servers(More, Servers#{Name => Server}); +load_all_servers([], Servers) -> + Servers. handle_call( list, _From, - State = #{ - running := Running, - waiting := Waiting, - stopped := Stopped, - orders := Orders - } + State = #{servers := Servers} ) -> - R = get_servers_info(running, Running), - W = get_servers_info(waiting, Waiting), - S = get_servers_info(stopped, Stopped), - - Servers = R ++ W ++ S, - OrderServers = sort_name_by_order(Servers, Orders), - + Infos = get_servers_info(Servers), + OrderServers = sort_name_by_order(Infos, Servers), {reply, OrderServers, State}; handle_call( {update_config, {move, _Name, _Position}, NewConfL}, _From, - State + #{servers := Servers} = State ) -> - Orders = reorder(NewConfL), - {reply, ok, State#{orders := Orders}}; + Servers2 = reorder(NewConfL, Servers), + {reply, ok, State#{servers := Servers2}}; handle_call({update_config, {delete, ToDelete}, _}, _From, State) -> - {ok, - #{ - orders := Orders, - stopped := Stopped - } = State2} = do_unload_server(ToDelete, State), - - State3 = State2#{ - stopped := maps:remove(ToDelete, Stopped), - orders := maps:remove(ToDelete, Orders) - }, - emqx_exhook_metrics:on_server_deleted(ToDelete), - {reply, ok, State3}; + #{servers := Servers} = State2 = do_unload_server(ToDelete, State), + + Servers2 = maps:remove(ToDelete, Servers), + + {reply, ok, update_servers(Servers2, State2)}; handle_call( {update_config, {add, RawConf}, NewConfL}, _From, - #{running := Running, waiting := Waitting, stopped := Stopped} = State + #{servers := Servers} = State ) -> {_, #{name := Name} = Conf} = emqx_config:check_config(?MODULE, RawConf), - - case emqx_exhook_server:load(Name, Conf) of - {ok, ServerState} -> - save(Name, ServerState), - State2 = State#{running := Running#{Name => Conf}}; - {error, _} -> - StateT = State#{waiting := Waitting#{Name => Conf}}, - State2 = ensure_reload_timer(StateT); - disable -> - State2 = State#{stopped := Stopped#{Name => Conf}} - end, - Orders = reorder(NewConfL), - {reply, ok, State2#{orders := Orders}}; + {Result, Server} = do_load_server(options_to_server(Conf)), + Servers2 = Servers#{Name => Server}, + Servers3 = reorder(NewConfL, Servers2), + {reply, Result, State#{servers := Servers3}}; handle_call({lookup, Name}, _From, State) -> - case where_is_server(Name, State) of - not_found -> - Result = not_found; - {Where, #{Name := Conf}} -> - Result = maps:merge(Conf, #{status => Where}) - end, - {reply, Result, State}; + {reply, where_is_server(Name, State), State}; handle_call({update_config, {update, Name, _Conf}, NewConfL}, _From, State) -> {Result, State2} = restart_server(Name, NewConfL, State), {reply, Result, State2}; @@ -303,7 +250,7 @@ handle_call({server_info, Name}, _From, State) -> case where_is_server(Name, State) of not_found -> Result = not_found; - {Status, _} -> + #{status := Status} -> HooksMetrics = emqx_exhook_metrics:server_metrics(Name), Result = #{ status => Status, @@ -314,25 +261,9 @@ handle_call({server_info, Name}, _From, State) -> handle_call( all_servers_info, _From, - #{ - running := Running, - waiting := Waiting, - stopped := Stopped - } = State + #{servers := Servers} = State ) -> - MakeStatus = fun(Status, Servers, Acc) -> - lists:foldl( - fun(Name, IAcc) -> IAcc#{Name => Status} end, - Acc, - maps:keys(Servers) - ) - end, - Status = lists:foldl( - fun({Status, Servers}, Acc) -> MakeStatus(Status, Servers, Acc) end, - #{}, - [{running, Running}, {waiting, Waiting}, {stopped, Stopped}] - ), - + Status = maps:map(fun(_Name, #{status := Status}) -> Status end, Servers), Metrics = emqx_exhook_metrics:servers_metrics(), Result = #{ @@ -352,23 +283,8 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({timeout, _Ref, {reload, Name}}, State) -> - {Result, NState} = do_load_server(Name, State), - case Result of - ok -> - {noreply, NState}; - {error, not_found} -> - {noreply, NState}; - {error, Reason} -> - ?SLOG( - warning, - #{ - msg => "failed_to_reload_exhook_callback_server", - reason => Reason, - name => Name - } - ), - {noreply, ensure_reload_timer(NState)} - end; + {_, NState} = do_reload_server(Name, State), + {noreply, NState}; handle_info(refresh_tick, State) -> refresh_tick(), emqx_exhook_metrics:update(?REFRESH_INTERVAL), @@ -376,14 +292,13 @@ handle_info(refresh_tick, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, State = #{running := Running}) -> +terminate(_Reason, State = #{servers := Servers}) -> _ = maps:fold( fun(Name, _, AccIn) -> - {ok, NAccIn} = do_unload_server(Name, AccIn), - NAccIn + do_unload_server(Name, AccIn) end, State, - Running + Servers ), _ = unload_exhooks(), ok. @@ -401,122 +316,83 @@ unload_exhooks() -> || {Name, {M, F, _A}} <- ?ENABLED_HOOKS ]. --spec do_load_server(server_name(), state()) -> - {{error, not_found}, state()} - | {{error, already_started}, state()} - | {ok, state()}. -do_load_server(Name, State = #{orders := Orders}) -> - case where_is_server(Name, State) of - not_found -> - {{error, not_found}, State}; - {running, _} -> - {ok, State}; - {Where, Map} -> - State2 = clean_reload_timer(Name, State), - {Options, Map2} = maps:take(Name, Map), - State3 = State2#{Where := Map2}, - #{ - running := Running, - stopped := Stopped - } = State3, - case emqx_exhook_server:load(Name, Options) of - {ok, ServerState} -> - save(Name, ServerState), - update_order(Orders), - ?SLOG(info, #{ - msg => "load_exhook_callback_server_ok", - name => Name - }), - {ok, State3#{running := maps:put(Name, Options, Running)}}; - {error, Reason} -> - {{error, Reason}, State}; - disable -> - {ok, State3#{stopped := Stopped#{Name => Options}}} +do_load_server(#{name := Name} = Server) -> + case emqx_exhook_server:load(Name, Server) of + {ok, ServerState} -> + save(Name, ServerState), + {ok, Server#{status => connected}}; + disable -> + {ok, set_disable(Server)}; + {ErrorType, Reason} = Error -> + ?SLOG( + error, + #{ + msg => "failed_to_load_exhook_callback_server", + reason => Reason, + name => Name + } + ), + case ErrorType of + load_error -> + {ok, ensure_reload_timer(Server)}; + _ -> + {Error, Server#{status => unconnected}} end end. --spec do_unload_server(server_name(), state()) -> {ok, state()}. -do_unload_server(Name, #{stopped := Stopped} = State) -> +do_load_server(#{name := Name} = Server, #{servers := Servers} = State) -> + {Result, Server2} = do_load_server(Server), + {Result, update_servers(Servers#{Name => Server2}, State)}. + +-spec do_reload_server(server_name(), state()) -> + {{error, term()}, state()} + | {ok, state()}. +do_reload_server(Name, State = #{servers := Servers}) -> case where_is_server(Name, State) of - {stopped, _} -> - {ok, State}; - {waiting, Waiting} -> - {Options, Waiting2} = maps:take(Name, Waiting), - {ok, - clean_reload_timer( - Name, - State#{ - waiting := Waiting2, - stopped := maps:put(Name, Options, Stopped) - } - )}; - {running, Running} -> - Service = server(Name), - ok = unsave(Name), - ok = emqx_exhook_server:unload(Service), - {Options, Running2} = maps:take(Name, Running), - {ok, State#{ - running := Running2, - stopped := maps:put(Name, Options, Stopped) - }}; not_found -> - {ok, State} + {{error, not_found}, State}; + #{timer := undefined} -> + {ok, State}; + Server -> + clean_reload_timer(Server), + do_load_server(Server, Servers) end. --spec ensure_reload_timer(state()) -> state(). -ensure_reload_timer( - State = #{ - waiting := Waiting, - stopped := Stopped, - trefs := TRefs - } -) -> - Iter = maps:iterator(Waiting), - - {Waitting2, Stopped2, TRefs2} = - ensure_reload_timer(maps:next(Iter), Waiting, Stopped, TRefs), - - State#{ - waiting := Waitting2, - stopped := Stopped2, - trefs := TRefs2 - }. - -ensure_reload_timer(none, Waiting, Stopped, TimerRef) -> - {Waiting, Stopped, TimerRef}; -ensure_reload_timer( - {Name, #{auto_reconnect := Intv}, Iter}, - Waiting, - Stopped, - TimerRef -) when is_integer(Intv) -> - Next = maps:next(Iter), - case maps:is_key(Name, TimerRef) of - true -> - ensure_reload_timer(Next, Waiting, Stopped, TimerRef); - _ -> - Ref = erlang:start_timer(Intv, self(), {reload, Name}), - TimerRef2 = maps:put(Name, Ref, TimerRef), - ensure_reload_timer(Next, Waiting, Stopped, TimerRef2) - end; -ensure_reload_timer({Name, Opts, Iter}, Waiting, Stopped, TimerRef) -> - ensure_reload_timer( - maps:next(Iter), - maps:remove(Name, Waiting), - maps:put(Name, Opts, Stopped), - TimerRef - ). - --spec clean_reload_timer(server_name(), state()) -> state(). -clean_reload_timer(Name, State = #{trefs := TRefs}) -> - case maps:take(Name, TRefs) of - error -> +-spec do_unload_server(server_name(), state()) -> state(). +do_unload_server(Name, #{servers := Servers} = State) -> + case where_is_server(Name, State) of + not_found -> State; - {TRef, NTRefs} -> - _ = erlang:cancel_timer(TRef), - State#{trefs := NTRefs} + #{status := disable} -> + State; + Server -> + clean_reload_timer(Server), + case server(Name) of + undefined -> + State; + Service -> + ok = unsave(Name), + ok = emqx_exhook_server:unload(Service), + Servers2 = Servers#{Name := set_disable(Server)}, + State#{servers := Servers2} + end end. +ensure_reload_timer(#{timer := Timer} = Server) when is_reference(Timer) -> + Server; +ensure_reload_timer(#{name := Name, auto_reconnect := Intv} = Server) when is_integer(Intv) -> + Ref = erlang:start_timer(Intv, self(), {reload, Name}), + Server#{status := connecting, timer := Ref}; +ensure_reload_timer(Server) -> + Server#{status := unconnected}. + +-spec clean_reload_timer(server()) -> ok. +clean_reload_timer(#{timer := undefined}) -> + ok; +clean_reload_timer(#{timer := Timer}) -> + _ = erlang:cancel_timer(Timer), + ok. + -spec do_move(binary(), position(), list(server_options())) -> not_found | list(server_options()). do_move(Name, Position, ConfL) -> @@ -545,37 +421,32 @@ move_to([H | T], Position, Server, HeadL) -> move_to([], _Position, _Server, _HeadL) -> not_found. --spec reorder(list(server_options())) -> orders(). -reorder(ServerL) -> - Orders = reorder(ServerL, 1, #{}), +-spec reorder(list(server_options()), servers()) -> servers(). +reorder(ServerL, Servers) -> + Orders = reorder(ServerL, 1, Servers), update_order(Orders), Orders. -reorder([#{name := Name} | T], Order, Orders) -> - reorder(T, Order + 1, Orders#{Name => Order}); -reorder([], _Order, Orders) -> - Orders. +reorder([#{name := Name} | T], Order, Servers) -> + reorder(T, Order + 1, update_order(Name, Servers, Order)); +reorder([], _Order, Servers) -> + Servers. -get_servers_info(Status, Map) -> +update_order(Name, Servers, Order) -> + Server = maps:get(Name, Servers), + Servers#{Name := Server#{order := Order}}. + +get_servers_info(Svrs) -> Fold = fun(Name, Conf, Acc) -> [ - maps:merge(Conf, #{ - status => Status, - hooks => hooks(Name) - }) + maps:merge(Conf, #{hooks => hooks(Name)}) | Acc ] end, - maps:fold(Fold, [], Map). + maps:fold(Fold, [], Svrs). -where_is_server(Name, #{running := Running}) when is_map_key(Name, Running) -> - {running, Running}; -where_is_server(Name, #{waiting := Waiting}) when is_map_key(Name, Waiting) -> - {waiting, Waiting}; -where_is_server(Name, #{stopped := Stopped}) when is_map_key(Name, Stopped) -> - {stopped, Stopped}; -where_is_server(_, _) -> - not_found. +where_is_server(Name, #{servers := Servers}) -> + maps:get(Name, Servers, not_found). -type replace_fun() :: fun((server_options()) -> server_options()). @@ -604,15 +475,10 @@ restart_server(Name, ConfL, State) -> case where_is_server(Name, State) of not_found -> {{error, not_found}, State}; - {Where, Map} -> - State2 = State#{Where := Map#{Name := Conf}}, - {ok, State3} = do_unload_server(Name, State2), - case do_load_server(Name, State3) of - {ok, State4} -> - {ok, State4}; - {Error, State4} -> - {Error, State4} - end + Server -> + Server2 = maps:merge(Server, Conf), + State2 = do_unload_server(Name, State), + do_load_server(Server2, State2) end end. @@ -620,9 +486,11 @@ sort_name_by_order(Names, Orders) -> lists:sort( fun (A, B) when is_binary(A) -> - maps:get(A, Orders) < maps:get(B, Orders); + emqx_map_lib:deep_get([A, order], Orders) < + emqx_map_lib:deep_get([B, order], Orders); (#{name := A}, #{name := B}) -> - maps:get(A, Orders) < maps:get(B, Orders) + emqx_map_lib:deep_get([A, order], Orders) < + emqx_map_lib:deep_get([B, order], Orders) end, Names ). @@ -630,6 +498,16 @@ sort_name_by_order(Names, Orders) -> refresh_tick() -> erlang:send_after(?REFRESH_INTERVAL, self(), ?FUNCTION_NAME). +options_to_server(Options) -> + maps:merge(Options, #{status => unconnected, timer => undefined, order => 0}). + +update_servers(Servers, State) -> + update_order(Servers), + State#{servers := Servers}. + +set_disable(Server) -> + Server#{status := disable, timer := undefined}. + %%-------------------------------------------------------------------- %% Server state persistent save(Name, ServerState) -> @@ -661,8 +539,17 @@ server(Name) -> Service -> Service end. -update_order(Orders) -> +update_order(Servers) -> Running = running(), + Orders = maps:filter( + fun + (Name, #{status := connected}) -> + lists:member(Name, Running); + (_, _) -> + false + end, + Servers + ), Running2 = sort_name_by_order(Running, Orders), persistent_term:put(?APP, Running2). diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index b804d1478..36f5f403a 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -86,40 +86,44 @@ %% Load/Unload APIs %%-------------------------------------------------------------------- --spec load(binary(), map()) -> {ok, server()} | {error, term()} | disable. +-spec load(binary(), map()) -> {ok, server()} | {error, term()} | {load_error, term()} | disable. load(_Name, #{enable := false}) -> disable; load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) -> ReqOpts = #{timeout => Timeout, failed_action => FailedAction}, - {SvrAddr, ClientOpts} = channel_opts(Opts), - case - emqx_exhook_sup:start_grpc_client_channel( - Name, - SvrAddr, - ClientOpts - ) - of - {ok, _ChannPoolPid} -> - case do_init(Name, ReqOpts) of - {ok, HookSpecs} -> - %% Register metrics - Prefix = lists:flatten(io_lib:format("exhook.~ts.", [Name])), - ensure_metrics(Prefix, HookSpecs), - %% Ensure hooks - ensure_hooks(HookSpecs), - {ok, #{ - name => Name, - options => ReqOpts, - channel => _ChannPoolPid, - hookspec => HookSpecs, - prefix => Prefix - }}; + case channel_opts(Opts) of + {ok, {SvrAddr, ClientOpts}} -> + case + emqx_exhook_sup:start_grpc_client_channel( + Name, + SvrAddr, + ClientOpts + ) + of + {ok, _ChannPoolPid} -> + case do_init(Name, ReqOpts) of + {ok, HookSpecs} -> + %% Register metrics + Prefix = lists:flatten(io_lib:format("exhook.~ts.", [Name])), + ensure_metrics(Prefix, HookSpecs), + %% Ensure hooks + ensure_hooks(HookSpecs), + {ok, #{ + name => Name, + options => ReqOpts, + channel => _ChannPoolPid, + hookspec => HookSpecs, + prefix => Prefix + }}; + {error, Reason} -> + emqx_exhook_sup:stop_grpc_client_channel(Name), + {load_error, Reason} + end; {error, _} = E -> - emqx_exhook_sup:stop_grpc_client_channel(Name), E end; - {error, _} = E -> - E + Error -> + Error end. %% @private @@ -130,7 +134,7 @@ channel_opts(Opts = #{url := URL}) -> ), case uri_string:parse(URL) of #{scheme := <<"http">>, host := Host, port := Port} -> - {format_http_uri("http", Host, Port), ClientOpts}; + {ok, {format_http_uri("http", Host, Port), ClientOpts}}; #{scheme := <<"https">>, host := Host, port := Port} -> SslOpts = case maps:get(ssl, Opts, undefined) of @@ -154,9 +158,9 @@ channel_opts(Opts = #{url := URL}) -> transport_opts => SslOpts } }, - {format_http_uri("https", Host, Port), NClientOpts}; + {ok, {format_http_uri("https", Host, Port), NClientOpts}}; Error -> - error({bad_server_url, URL, Error}) + {error, {bad_server_url, URL, Error}} end. format_http_uri(Scheme, Host, Port) ->