diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf index 42bd04f19..8769e9a2d 100644 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -2,43 +2,45 @@ ## EMQ X Hooks ##==================================================================== -exhook { - ## The default value or action will be returned, while the request to - ## the gRPC server failed or no available grpc server running. - ## - ## Default: deny - ## Value: ignore | deny - request_failed_action = deny +emqx_exhook { - ## The timeout to request grpc server + servers = [ + ##{ + ## name = default ## - ## Default: 5s - ## Value: Duration - request_timeout = 5s - ## Whether to automatically reconnect (initialize) the gRPC server - ## ## When gRPC is not available, exhook tries to request the gRPC service at ## that interval and reinitialize the list of mounted hooks. ## ## Default: false ## Value: false | Duration - auto_reconnect = 60s + ## auto_reconnect = 60s - ## The process pool size for gRPC client + ## The default value or action will be returned, while the request to + ## the gRPC server failed or no available grpc server running. ## - ## Default: Equals cpu cores - ## Value: Integer - #pool_size = 16 + ## Default: deny + ## Value: ignore | deny + ## failed_action = deny - servers = [ - # { name: "default" - # url: "http://127.0.0.1:9000" - # #ssl: { - # # cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" - # # certfile: "{{ platform_etc_dir }}/certs/cert.pem" - # # keyfile: "{{ platform_etc_dir }}/certs/key.pem" - # #} - # } + ## The timeout to request grpc server + ## + ## Default: 5s + ## Value: Duration + ## request_timeout = 5s + + ## url = "http://127.0.0.1:9000" + ## ssl { + ## cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" + ## certfile: "{{ platform_etc_dir }}/certs/cert.pem" + ## keyfile: "{{ platform_etc_dir }}/certs/key.pem" + ## } + ## + ## The process pool size for gRPC client + ## + ## Default: Equals cpu cores + ## Value: Integer + ## pool_size = 16 + ##} ] } diff --git a/apps/emqx_exhook/src/emqx_exhook.erl b/apps/emqx_exhook/src/emqx_exhook.erl index c6b02e716..60a16ffc0 100644 --- a/apps/emqx_exhook/src/emqx_exhook.erl +++ b/apps/emqx_exhook/src/emqx_exhook.erl @@ -19,90 +19,56 @@ -include("emqx_exhook.hrl"). -include_lib("emqx/include/logger.hrl"). - --export([ enable/1 - , disable/1 - , list/0 - ]). - -export([ cast/2 , call_fold/3 ]). -%%-------------------------------------------------------------------- -%% Mgmt APIs -%%-------------------------------------------------------------------- - --spec enable(binary()) -> ok | {error, term()}. -enable(Name) -> - with_mngr(fun(Pid) -> emqx_exhook_mngr:enable(Pid, Name) end). - --spec disable(binary()) -> ok | {error, term()}. -disable(Name) -> - with_mngr(fun(Pid) -> emqx_exhook_mngr:disable(Pid, Name) end). - --spec list() -> [atom() | string()]. -list() -> - with_mngr(fun(Pid) -> emqx_exhook_mngr:list(Pid) end). - -with_mngr(Fun) -> - case lists:keyfind(emqx_exhook_mngr, 1, - supervisor:which_children(emqx_exhook_sup)) of - {_, Pid, _, _} -> - Fun(Pid); - _ -> - {error, no_manager_svr} - end. - %%-------------------------------------------------------------------- %% Dispatch APIs %%-------------------------------------------------------------------- -spec cast(atom(), map()) -> ok. cast(Hookpoint, Req) -> - cast(Hookpoint, Req, emqx_exhook_mngr:running()). + cast(Hookpoint, Req, emqx_exhook_mgr:running()). cast(_, _, []) -> ok; cast(Hookpoint, Req, [ServerName|More]) -> %% XXX: Need a real asynchronous running _ = emqx_exhook_server:call(Hookpoint, Req, - emqx_exhook_mngr:server(ServerName)), + emqx_exhook_mgr:server(ServerName)), cast(Hookpoint, Req, More). --spec call_fold(atom(), term(), function()) - -> {ok, term()} - | {stop, term()}. +-spec call_fold(atom(), term(), function()) -> {ok, term()} + | {stop, term()}. call_fold(Hookpoint, Req, AccFun) -> - FailedAction = emqx_exhook_mngr:get_request_failed_action(), - ServerNames = emqx_exhook_mngr:running(), - case ServerNames == [] andalso FailedAction == deny of - true -> + case emqx_exhook_mgr:running() of + [] -> {stop, deny_action_result(Hookpoint, Req)}; - _ -> - call_fold(Hookpoint, Req, FailedAction, AccFun, ServerNames) + ServerNames -> + call_fold(Hookpoint, Req, AccFun, ServerNames) end. -call_fold(_, Req, _, _, []) -> +call_fold(_, Req, _, []) -> {ok, Req}; -call_fold(Hookpoint, Req, FailedAction, AccFun, [ServerName|More]) -> - Server = emqx_exhook_mngr:server(ServerName), +call_fold(Hookpoint, Req, AccFun, [ServerName|More]) -> + Server = emqx_exhook_mgr:server(ServerName), case emqx_exhook_server:call(Hookpoint, Req, Server) of {ok, Resp} -> case AccFun(Req, Resp) of {stop, NReq} -> {stop, NReq}; {ok, NReq} -> - call_fold(Hookpoint, NReq, FailedAction, AccFun, More); + call_fold(Hookpoint, NReq, AccFun, More); _ -> - call_fold(Hookpoint, Req, FailedAction, AccFun, More) + call_fold(Hookpoint, Req, AccFun, More) end; _ -> - case FailedAction of + case emqx_exhook_server:failed_action(Server) of + ignore -> + call_fold(Hookpoint, Req, AccFun, More); deny -> - {stop, deny_action_result(Hookpoint, Req)}; - _ -> - call_fold(Hookpoint, Req, FailedAction, AccFun, More) + {stop, deny_action_result(Hookpoint, Req)} end end. diff --git a/apps/emqx_exhook/src/emqx_exhook_api.erl b/apps/emqx_exhook/src/emqx_exhook_api.erl new file mode 100644 index 000000000..4684c6796 --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook_api.erl @@ -0,0 +1,281 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_api). + +-behaviour(minirest_api). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("emqx/include/logger.hrl"). + +-export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]). + +-export([exhooks/2, action_with_name/2, move/2]). + +-import(hoconsc, [mk/2, ref/1, enum/1, array/1]). +-import(emqx_dashboard_swagger, [schema_with_example/2, error_codes/2]). + +-define(TAGS, [<<"exhooks">>]). +-define(BAD_REQUEST, 'BAD_REQUEST'). +-define(BAD_RPC, 'BAD_RPC'). + +namespace() -> "exhook". + +api_spec() -> + emqx_dashboard_swagger:spec(?MODULE). + +paths() -> ["/exhooks", "/exhooks/:name", "/exhooks/:name/move"]. + +schema(("/exhooks")) -> + #{ + 'operationId' => exhooks, + get => #{tags => ?TAGS, + description => <<"List all servers">>, + responses => #{200 => mk(array(ref(detailed_server_info)), #{})} + }, + post => #{tags => ?TAGS, + description => <<"Add a servers">>, + 'requestBody' => server_conf_schema(), + responses => #{201 => mk(ref(detailed_server_info), #{}), + 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) + } + } + }; + +schema("/exhooks/:name") -> + #{'operationId' => action_with_name, + get => #{tags => ?TAGS, + description => <<"Get the detail information of server">>, + parameters => params_server_name_in_path(), + responses => #{200 => mk(ref(detailed_server_info), #{}), + 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>) + } + }, + put => #{tags => ?TAGS, + description => <<"Update the server">>, + parameters => params_server_name_in_path(), + 'requestBody' => server_conf_schema(), + responses => #{200 => <<>>, + 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), + 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) + } + }, + delete => #{tags => ?TAGS, + description => <<"Delete the server">>, + parameters => params_server_name_in_path(), + responses => #{204 => <<>>, + 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) } + } + }; + +schema("/exhooks/:name/move") -> + #{'operationId' => move, + post => #{tags => ?TAGS, + description => <<"Move the server">>, + parameters => params_server_name_in_path(), + 'requestBody' => mk(ref(move_req), #{}), + responses => #{200 => <<>>, + 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), + 500 => error_codes([?BAD_RPC], <<"Bad RPC">>) + } + } + }. + +fields(move_req) -> + [ + {position, mk(enum([top, bottom, before, 'after']), #{})}, + {related, mk(string(), #{desc => <<"Relative position of movement">>, + default => <<>>, + example => <<>> + })} + ]; + +fields(detailed_server_info) -> + [ {status, mk(enum([running, waiting, stopped]), #{})} + , {hooks, mk(array(string()), #{default => []})} + , {node_status, mk(ref(node_status), #{})} + ] ++ emqx_exhook_schema:server_config(); + +fields(node_status) -> + [ {node, mk(string(), #{})} + , {status, mk(enum([running, waiting, stopped, not_found, error]), #{})} + ]; + +fields(server_config) -> + emqx_exhook_schema:server_config(). + +params_server_name_in_path() -> + [{name, mk(string(), #{in => path, + required => true, + example => <<"default">>})} + ]. + +server_conf_schema() -> + schema_with_example(ref(server_config), + #{ name => "default" + , enable => true + , url => <<"http://127.0.0.1:8081">> + , request_timeout => "5s" + , failed_action => deny + , auto_reconnect => "60s" + , pool_size => 8 + , ssl => #{ enable => false + , cacertfile => <<"{{ platform_etc_dir }}/certs/cacert.pem">> + , certfile => <<"{{ platform_etc_dir }}/certs/cert.pem">> + , keyfile => <<"{{ platform_etc_dir }}/certs/key.pem">> + } + }). + + +exhooks(get, _) -> + ServerL = emqx_exhook_mgr:list(), + ServerL2 = nodes_all_server_status(ServerL), + {200, ServerL2}; + +exhooks(post, #{body := Body}) -> + case emqx_exhook_mgr:update_config([emqx_exhook, servers], {add, Body}) of + {ok, Result} -> + {201, Result}; + {error, Error} -> + {500, #{code => <<"BAD_RPC">>, + message => Error + }} + end. + +action_with_name(get, #{bindings := #{name := Name}}) -> + Result = emqx_exhook_mgr:lookup(Name), + NodeStatus = nodes_server_status(Name), + case Result of + not_found -> + {400, #{code => <<"BAD_REQUEST">>, + message => <<"Server not found">> + }}; + ServerInfo -> + {200, ServerInfo#{node_status => NodeStatus}} + end; + +action_with_name(put, #{bindings := #{name := Name}, body := Body}) -> + case emqx_exhook_mgr:update_config([emqx_exhook, servers], + {update, Name, Body}) of + {ok, not_found} -> + {400, #{code => <<"BAD_REQUEST">>, + message => <<"Server not found">> + }}; + {ok, {error, Reason}} -> + {400, #{code => <<"BAD_REQUEST">>, + message => unicode:characters_to_binary(io_lib:format("Error Reason:~p~n", [Reason])) + }}; + {ok, _} -> + {200}; + {error, Error} -> + {500, #{code => <<"BAD_RPC">>, + message => Error + }} + end; + +action_with_name(delete, #{bindings := #{name := Name}}) -> + case emqx_exhook_mgr:update_config([emqx_exhook, servers], + {delete, Name}) of + {ok, _} -> + {200}; + {error, Error} -> + {500, #{code => <<"BAD_RPC">>, + message => Error + }} + end. + +move(post, #{bindings := #{name := Name}, body := Body}) -> + #{<<"position">> := PositionT, <<"related">> := Related} = Body, + Position = erlang:binary_to_atom(PositionT), + case emqx_exhook_mgr:update_config([emqx_exhook, servers], + {move, Name, Position, Related}) of + {ok, ok} -> + {200}; + {ok, not_found} -> + {400, #{code => <<"BAD_REQUEST">>, + message => <<"Server not found">> + }}; + {error, Error} -> + {500, #{code => <<"BAD_RPC">>, + message => Error + }} + end. + +nodes_server_status(Name) -> + StatusL = call_cluster(emqx_exhook_mgr, server_status, [Name]), + + Handler = fun({Node, {error, _}}) -> + #{node => Node, + status => error + }; + ({Node, Status}) -> + #{node => Node, + status => Status + } + end, + + lists:map(Handler, StatusL). + +nodes_all_server_status(ServerL) -> + AllStatusL = call_cluster(emqx_exhook_mgr, all_servers_status, []), + + AggreMap = lists:foldl(fun(#{name := Name}, Acc) -> + Acc#{Name => []} + end, + #{}, + ServerL), + + AddToMap = fun(Servers, Node, Status, Map) -> + lists:foldl(fun(Name, Acc) -> + StatusL = maps:get(Name, Acc), + StatusL2 = [#{node => Node, + status => Status + } | StatusL], + Acc#{Name := StatusL2} + end, + Map, + Servers) + end, + + AggreMap2 = lists:foldl(fun({Node, #{running := Running, + waiting := Waiting, + stopped := Stopped}}, + Acc) -> + AddToMap(Stopped, Node, stopped, + AddToMap(Waiting, Node, waiting, + AddToMap(Running, Node, running, Acc))) + end, + AggreMap, + AllStatusL), + + Handler = fun(#{name := Name} = Server) -> + Server#{node_status => maps:get(Name, AggreMap2)} + end, + + lists:map(Handler, ServerL). + +call_cluster(Module, Fun, Args) -> + Nodes = mria_mnesia:running_nodes(), + [{Node, rpc_call(Node, Module, Fun, Args)} || Node <- Nodes]. + +rpc_call(Node, Module, Fun, Args) when Node =:= node() -> + erlang:apply(Module, Fun, Args); + +rpc_call(Node, Module, Fun, Args) -> + case rpc:call(Node, Module, Fun, Args) of + {badrpc, Reason} -> {error, Reason}; + Res -> Res + end. diff --git a/apps/emqx_exhook/src/emqx_exhook_cli.erl b/apps/emqx_exhook/src/emqx_exhook_cli.erl deleted file mode 100644 index a96cdb6cc..000000000 --- a/apps/emqx_exhook/src/emqx_exhook_cli.erl +++ /dev/null @@ -1,84 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook_cli). - --include("emqx_exhook.hrl"). - --export([cli/1]). - -cli(["server", "list"]) -> - if_enabled(fun() -> - ServerNames = emqx_exhook:list(), - [emqx_ctl:print("Server(~ts)~n", [format(Name)]) || Name <- ServerNames] - end); - -cli(["server", "enable", Name]) -> - if_enabled(fun() -> - print(emqx_exhook:enable(iolist_to_binary(Name))) - end); - -cli(["server", "disable", Name]) -> - if_enabled(fun() -> - print(emqx_exhook:disable(iolist_to_binary(Name))) - end); - -cli(["server", "stats"]) -> - if_enabled(fun() -> - [emqx_ctl:print("~-35s:~w~n", [Name, N]) || {Name, N} <- stats()] - end); - -cli(_) -> - emqx_ctl:usage([{"exhook server list", "List all running exhook server"}, - {"exhook server enable ", "Enable a exhook server in the configuration"}, - {"exhook server disable ", "Disable a exhook server"}, - {"exhook server stats", "Print exhook server statistic"}]). - -print(ok) -> - emqx_ctl:print("ok~n"); -print({error, Reason}) -> - emqx_ctl:print("~p~n", [Reason]). - -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - -if_enabled(Fun) -> - case lists:keymember(?APP, 1, application:which_applications()) of - true -> - Fun(); - _ -> hint() - end. - -hint() -> - emqx_ctl:print("Please './bin/emqx_ctl plugins load emqx_exhook' first.~n"). - -stats() -> - lists:usort(lists:foldr(fun({K, N}, Acc) -> - case atom_to_list(K) of - "exhook." ++ Key -> [{Key, N} | Acc]; - _ -> Acc - end - end, [], emqx_metrics:all())). - -format(Name) -> - case emqx_exhook_mngr:server(Name) of - undefined -> - lists:flatten( - io_lib:format("name=~ts, hooks=#{}, active=false", [Name])); - Server -> - emqx_exhook_server:format(Server) - end. diff --git a/apps/emqx_exhook/src/emqx_exhook_mgr.erl b/apps/emqx_exhook/src/emqx_exhook_mgr.erl new file mode 100644 index 000000000..b9d03ae7d --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook_mgr.erl @@ -0,0 +1,596 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Manage the server status and reload strategy +-module(emqx_exhook_mgr). + +-behaviour(gen_server). + +-include("emqx_exhook.hrl"). +-include_lib("emqx/include/logger.hrl"). + +%% APIs +-export([start_link/0]). + +%% Mgmt API +-export([ list/0 + , lookup/1 + , enable/1 + , disable/1 + , server_status/1 + , all_servers_status/0 + ]). + +%% Helper funcs +-export([ running/0 + , server/1 + , init_counter_table/0 + ]). + +-export([ update_config/2 + , pre_config_update/3 + , post_config_update/5 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-export([roots/0]). + +-type state() :: #{%% Running servers + running := servers(), + %% Wait to reload servers + waiting := servers(), + %% Marked stopped servers + stopped := servers(), + %% Timer references + trefs := map(), + orders := orders() + }. + +-type server_name() :: binary(). +-type servers() :: #{server_name() => server()}. +-type server() :: server_options(). +-type server_options() :: map(). + +-type move_direct() :: top + | bottom + | before + | 'after'. + +-type orders() :: #{server_name() => integer()}. + +-type server_info() :: #{name := server_name(), + status := running | waiting | stopped, + + atom() => term() + }. + +-define(DEFAULT_TIMEOUT, 60000). +-define(CNTER, emqx_exhook_counter). + +-export_type([server_info/0]). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +-spec start_link() -> ignore + | {ok, pid()} + | {error, any()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +list() -> + call(list). + +-spec lookup(server_name()) -> not_found | server_info(). +lookup(Name) -> + call({lookup, Name}). + +enable(Name) -> + update_config([emqx_exhook, servers], {enable, Name, true}). + +disable(Name) -> + update_config([emqx_exhook, servers], {enable, Name, false}). + +server_status(Name) -> + call({server_status, Name}). + +all_servers_status() -> + call(all_servers_status). + +call(Req) -> + gen_server:call(?MODULE, Req, ?DEFAULT_TIMEOUT). + +init_counter_table() -> + _ = ets:new(?CNTER, [named_table, public]). + +%%===================================================================== +%% Hocon schema +roots() -> + emqx_exhook_schema:server_config(). + +update_config(KeyPath, UpdateReq) -> + case emqx_conf:update(KeyPath, UpdateReq, #{override_to => cluster}) of + {ok, UpdateResult} -> + #{post_config_update := #{?MODULE := Result}} = UpdateResult, + {ok, Result}; + Error -> + Error + end. + +pre_config_update(_, {add, Conf}, OldConf) -> + {ok, OldConf ++ [Conf]}; + +pre_config_update(_, {update, Name, Conf}, OldConf) -> + case replace_conf(Name, fun(_) -> Conf end, OldConf) of + not_found -> {error, not_found}; + NewConf -> {ok, NewConf} + end; + +pre_config_update(_, {delete, ToDelete}, OldConf) -> + {ok, lists:dropwhile(fun(#{<<"name">> := Name}) -> Name =:= ToDelete end, + OldConf)}; + +pre_config_update(_, {move, Name, Position, Relate}, OldConf) -> + case do_move(Name, Position, Relate, OldConf) of + not_found -> {error, not_found}; + NewConf -> {ok, NewConf} + end; + +pre_config_update(_, {enable, Name, Enable}, OldConf) -> + case replace_conf(Name, + fun(Conf) -> Conf#{<<"enable">> => Enable} end, OldConf) of + not_found -> {error, not_found}; + NewConf -> + ct:pal(">>>> enable Name:~p Enable:~p, New:~p~n", [Name, Enable, NewConf]), + {ok, NewConf} + end. + +post_config_update(_KeyPath, UpdateReq, NewConf, _OldConf, _AppEnvs) -> + {ok, call({update_config, UpdateReq, NewConf})}. + +%%===================================================================== + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + process_flag(trap_exit, true), + emqx_conf:add_handler([emqx_exhook, servers], ?MODULE), + ServerL = emqx:get_config([emqx_exhook, servers]), + {Waiting, Running, Stopped} = load_all_servers(ServerL), + Orders = reorder(ServerL), + {ok, ensure_reload_timer( + #{waiting => Waiting, + running => Running, + stopped => Stopped, + trefs => #{}, + orders => Orders + })}. + +-spec load_all_servers(list(server_options())) -> {servers(), servers(), servers()}. +load_all_servers(ServerL) -> + load_all_servers(ServerL, #{}, #{}, #{}). + +load_all_servers([#{name := Name} = Options | More], Waiting, Running, Stopped) -> + case emqx_exhook_server:load(Name, Options) of + {ok, ServerState} -> + save(Name, ServerState), + load_all_servers(More, Waiting, Running#{Name => Options}, Stopped); + {error, _} -> + load_all_servers(More, Waiting#{Name => Options}, Running, Stopped); + disable -> + load_all_servers(More, Waiting, Running, Stopped#{Name => Options}) + end; + +load_all_servers([], Waiting, Running, Stopped) -> + {Waiting, Running, Stopped}. + +handle_call(list, _From, State = #{running := Running, + waiting := Waiting, + stopped := Stopped, + orders := Orders}) -> + + R = get_servers_info(running, Running), + W = get_servers_info(waiting, Waiting), + S = get_servers_info(stopped, Stopped), + + Servers = R ++ W ++ S, + OrderServers = sort_name_by_order(Servers, Orders), + + {reply, OrderServers, State}; + +handle_call({update_config, {move, _Name, _Direct, _Related}, NewConfL}, + _From, + State) -> + Orders = reorder(NewConfL), + {reply, ok, State#{orders := Orders}}; + +handle_call({update_config, {delete, ToDelete}, _}, _From, State) -> + {ok, #{orders := Orders, + stopped := Stopped + } = State2} = do_unload_server(ToDelete, State), + + State3 = State2#{stopped := maps:remove(ToDelete, Stopped), + orders := maps:remove(ToDelete, Orders) + }, + + {reply, ok, State3}; + +handle_call({update_config, {add, RawConf}, NewConfL}, + _From, + #{running := Running, waiting := Waitting, stopped := Stopped} = State) -> + {_, #{name := Name} = Conf} = emqx_config:check_config(?MODULE, RawConf), + + case emqx_exhook_server:load(Name, Conf) of + {ok, ServerState} -> + save(Name, ServerState), + Status = running, + Hooks = hooks(Name), + State2 = State#{running := Running#{Name => Conf}}; + {error, _} -> + Status = running, + Hooks = [], + StateT = State#{waiting := Waitting#{Name => Conf}}, + State2 = ensure_reload_timer(StateT); + disable -> + Status = stopped, + Hooks = [], + State2 = State#{stopped := Stopped#{Name => Conf}} + end, + Orders = reorder(NewConfL), + Resulte = maps:merge(Conf, #{status => Status, hooks => Hooks}), + {reply, Resulte, State2#{orders := Orders}}; + +handle_call({lookup, Name}, _From, State) -> + case where_is_server(Name, State) of + not_found -> + Result = not_found; + {Where, #{Name := Conf}} -> + Result = maps:merge(Conf, + #{ status => Where + , hooks => hooks(Name) + }) + end, + {reply, Result, State}; + +handle_call({update_config, {update, Name, _Conf}, NewConfL}, _From, State) -> + {Result, State2} = restart_server(Name, NewConfL, State), + {reply, Result, State2}; + +handle_call({update_config, {enable, Name, _Enable}, NewConfL}, _From, State) -> + {Result, State2} = restart_server(Name, NewConfL, State), + {reply, Result, State2}; + +handle_call({server_status, Name}, _From, State) -> + case where_is_server(Name, State) of + not_found -> + Result = not_found; + {Status, _} -> + Result = Status + end, + {reply, Result, State}; + +handle_call(all_servers_status, _From, #{running := Running, + waiting := Waiting, + stopped := Stopped} = State) -> + {reply, #{running => maps:keys(Running), + waiting => maps:keys(Waiting), + stopped => maps:keys(Stopped)}, State}; + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({timeout, _Ref, {reload, Name}}, State) -> + {Result, NState} = do_load_server(Name, State), + case Result of + ok -> + {noreply, NState}; + {error, not_found} -> + {noreply, NState}; + {error, Reason} -> + ?LOG(warning, "Failed to reload exhook callback server \"~ts\", " + "Reason: ~0p", [Name, Reason]), + {noreply, ensure_reload_timer(NState)} + end; + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, State = #{running := Running}) -> + _ = maps:fold(fun(Name, _, AccIn) -> + {ok, NAccIn} = do_unload_server(Name, AccIn), + NAccIn + end, State, Running), + _ = unload_exhooks(), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +unload_exhooks() -> + [emqx:unhook(Name, {M, F}) || + {Name, {M, F, _A}} <- ?ENABLED_HOOKS]. + +-spec do_load_server(server_name(), state()) -> {{error, not_found}, state()} + | {{error, already_started}, state()} + | {ok, state()}. +do_load_server(Name, State = #{orders := Orders}) -> + case where_is_server(Name, State) of + not_found -> + {{error, not_found}, State}; + {running, _} -> + {ok, State}; + {Where, Map} -> + State2 = clean_reload_timer(Name, State), + {Options, Map2} = maps:take(Name, Map), + State3 = State2#{Where := Map2}, + #{running := Running, + stopped := Stopped} = State3, + case emqx_exhook_server:load(Name, Options) of + {ok, ServerState} -> + save(Name, ServerState), + update_order(Orders), + ?LOG(info, "Load exhook callback server " + "\"~ts\" successfully!", [Name]), + {ok, State3#{running := maps:put(Name, Options, Running)}}; + {error, Reason} -> + {{error, Reason}, State}; + disable -> + {ok, State3#{stopped := Stopped#{Name => Options}}} + end + end. + +-spec do_unload_server(server_name(), state()) -> {ok, state()}. +do_unload_server(Name, #{stopped := Stopped} = State) -> + case where_is_server(Name, State) of + {stopped, _} -> {ok, State}; + {waiting, Waiting} -> + {Options, Waiting2} = maps:take(Name, Waiting), + {ok, clean_reload_timer(Name, + State#{waiting := Waiting2, + stopped := maps:put(Name, Options, Stopped) + } + )}; + {running, Running} -> + Service = server(Name), + ok = unsave(Name), + ok = emqx_exhook_server:unload(Service), + {Options, Running2} = maps:take(Name, Running), + {ok, State#{running := Running2, + stopped := maps:put(Name, Options, Stopped) + }}; + not_found -> {ok, State} + end. + +-spec ensure_reload_timer(state()) -> state(). +ensure_reload_timer(State = #{waiting := Waiting, + stopped := Stopped, + trefs := TRefs}) -> + Iter = maps:iterator(Waiting), + + {Waitting2, Stopped2, TRefs2} = + ensure_reload_timer(maps:next(Iter), Waiting, Stopped, TRefs), + + State#{waiting := Waitting2, + stopped := Stopped2, + trefs := TRefs2}. + +ensure_reload_timer(none, Waiting, Stopped, TimerRef) -> + {Waiting, Stopped, TimerRef}; + +ensure_reload_timer({Name, #{auto_reconnect := Intv}, Iter}, + Waiting, + Stopped, + TimerRef) -> + Next = maps:next(Iter), + case maps:is_key(Name, TimerRef) of + true -> + ensure_reload_timer(Next, Waiting, Stopped, TimerRef); + _ -> + Ref = erlang:start_timer(Intv, self(), {reload, Name}), + TimerRef2 = maps:put(Name, Ref, TimerRef), + ensure_reload_timer(Next, Waiting, Stopped, TimerRef2) + end; + +ensure_reload_timer({Name, Opts, Iter}, Waiting, Stopped, TimerRef) -> + ensure_reload_timer(maps:next(Iter), + maps:remove(Name, Waiting), + maps:put(Name, Opts, Stopped), + TimerRef). + +-spec clean_reload_timer(server_name(), state()) -> state(). +clean_reload_timer(Name, State = #{trefs := TRefs}) -> + case maps:take(Name, TRefs) of + error -> State; + {TRef, NTRefs} -> + _ = erlang:cancel_timer(TRef), + State#{trefs := NTRefs} + end. + +-spec do_move(binary(), move_direct(), binary(), list(server_options())) -> + not_found | list(server_options()). +do_move(Name, Direct, ToName, ConfL) -> + move(ConfL, Name, Direct, ToName, []). + +move([#{<<"name">> := Name} = Server | T], Name, Direct, ToName, HeadL) -> + move_to(Direct, ToName, Server, lists:reverse(HeadL) ++ T); + +move([Server | T], Name, Direct, ToName, HeadL) -> + move(T, Name, Direct, ToName, [Server | HeadL]); + +move([], _Name, _Direct, _ToName, _HeadL) -> + not_found. + +move_to(top, _, Server, ServerL) -> + [Server | ServerL]; + +move_to(bottom, _, Server, ServerL) -> + ServerL ++ [Server]; + +move_to(Direct, ToName, Server, ServerL) -> + move_to(ServerL, Direct, ToName, Server, []). + +move_to([#{<<"name">> := Name} | _] = T, before, Name, Server, HeadL) -> + lists:reverse(HeadL) ++ [Server | T]; + +move_to([#{<<"name">> := Name} = H | T], 'after', Name, Server, HeadL) -> + lists:reverse(HeadL) ++ [H, Server | T]; + +move_to([H | T], Direct, Name, Server, HeadL) -> + move_to(T, Direct, Name, Server, [H | HeadL]); + +move_to([], _Direct, _Name, _Server, _HeadL) -> + not_found. + +-spec reorder(list(server_options())) -> orders(). +reorder(ServerL) -> + Orders = reorder(ServerL, 1, #{}), + update_order(Orders), + Orders. + +reorder([#{name := Name} | T], Order, Orders) -> + reorder(T, Order + 1, Orders#{Name => Order}); + +reorder([], _Order, Orders) -> + Orders. + +get_servers_info(Status, Map) -> + Fold = fun(Name, Conf, Acc) -> + [maps:merge(Conf, #{status => Status, + hooks => hooks(Name)}) | Acc] + end, + maps:fold(Fold, [], Map). + + +where_is_server(Name, #{running := Running}) when is_map_key(Name, Running) -> + {running, Running}; + +where_is_server(Name, #{waiting := Waiting}) when is_map_key(Name, Waiting) -> + {waiting, Waiting}; + +where_is_server(Name, #{stopped := Stopped}) when is_map_key(Name, Stopped) -> + {stopped, Stopped}; + +where_is_server(_, _) -> + not_found. + +-type replace_fun() :: fun((server_options()) -> server_options()). + +-spec replace_conf(binary(), replace_fun(), list(server_options())) -> not_found + | list(server_options()). +replace_conf(Name, ReplaceFun, ConfL) -> + replace_conf(ConfL, Name, ReplaceFun, []). + +replace_conf([#{<<"name">> := Name} = H | T], Name, ReplaceFun, HeadL) -> + New = ReplaceFun(H), + lists:reverse(HeadL) ++ [New | T]; + +replace_conf([H | T], Name, ReplaceFun, HeadL) -> + replace_conf(T, Name, ReplaceFun, [H | HeadL]); + +replace_conf([], _, _, _) -> + not_found. + +-spec restart_server(binary(), list(server_options()), state()) -> {ok, state()} + | {{error, term()}, state()}. +restart_server(Name, ConfL, State) -> + case lists:search(fun(#{name := CName}) -> CName =:= Name end, ConfL) of + false -> + {{error, not_found}, State}; + {value, Conf} -> + case where_is_server(Name, State) of + not_found -> + {{error, not_found}, State}; + {Where, Map} -> + State2 = State#{Where := Map#{Name := Conf}}, + {ok, State3} = do_unload_server(Name, State2), + case do_load_server(Name, State3) of + {ok, State4} -> + {ok, State4}; + {Error, State4} -> + {Error, State4} + end + end + end. + +sort_name_by_order(Names, Orders) -> + lists:sort(fun(A, B) when is_binary(A) -> + maps:get(A, Orders) < maps:get(B, Orders); + (#{name := A}, #{name := B}) -> + maps:get(A, Orders) < maps:get(B, Orders) + end, + Names). +%%-------------------------------------------------------------------- +%% Server state persistent +save(Name, ServerState) -> + Saved = persistent_term:get(?APP, []), + persistent_term:put(?APP, lists:reverse([Name | Saved])), + persistent_term:put({?APP, Name}, ServerState). + +unsave(Name) -> + case persistent_term:get(?APP, []) of + [] -> + ok; + Saved -> + case lists:member(Name, Saved) of + false -> + ok; + true -> + persistent_term:put(?APP, lists:delete(Name, Saved)) + end + end, + persistent_term:erase({?APP, Name}), + ok. + +running() -> + persistent_term:get(?APP, []). + +server(Name) -> + case persistent_term:get({?APP, Name}, undefined) of + undefined -> undefined; + Service -> Service + end. + +update_order(Orders) -> + Running = running(), + Running2 = sort_name_by_order(Running, Orders), + persistent_term:put(?APP, Running2). + +hooks(Name) -> + case server(Name) of + undefined -> + []; + Service -> + emqx_exhook_server:hookpoints(Service) + end. diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl deleted file mode 100644 index cd2658f93..000000000 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ /dev/null @@ -1,329 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc Manage the server status and reload strategy --module(emqx_exhook_mngr). - --behaviour(gen_server). - --include("emqx_exhook.hrl"). --include_lib("emqx/include/logger.hrl"). - -%% APIs --export([start_link/3]). - -%% Mgmt API --export([ enable/2 - , disable/2 - , list/1 - ]). - -%% Helper funcs --export([ running/0 - , server/1 - , put_request_failed_action/1 - , get_request_failed_action/0 - , put_pool_size/1 - , get_pool_size/0 - ]). - -%% gen_server callbacks --export([ init/1 - , handle_call/3 - , handle_cast/2 - , handle_info/2 - , terminate/2 - , code_change/3 - ]). - --record(state, { - %% Running servers - running :: map(), %% XXX: server order? - %% Wait to reload servers - waiting :: map(), - %% Marked stopped servers - stopped :: map(), - %% Auto reconnect timer interval - auto_reconnect :: false | non_neg_integer(), - %% Request options - request_options :: grpc_client:options(), - %% Timer references - trefs :: map() - }). - --type servers() :: [{Name :: atom(), server_options()}]. - --type server_options() :: [ {scheme, http | https} - | {host, string()} - | {port, inet:port_number()} - ]. - --define(DEFAULT_TIMEOUT, 60000). - --define(CNTER, emqx_exhook_counter). - -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- - --spec start_link(servers(), false | non_neg_integer(), grpc_client:options()) - ->ignore - | {ok, pid()} - | {error, any()}. -start_link(Servers, AutoReconnect, ReqOpts) -> - gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []). - --spec enable(pid(), binary()) -> ok | {error, term()}. -enable(Pid, Name) -> - call(Pid, {load, Name}). - --spec disable(pid(), binary()) -> ok | {error, term()}. -disable(Pid, Name) -> - call(Pid, {unload, Name}). - -list(Pid) -> - call(Pid, list). - -call(Pid, Req) -> - gen_server:call(Pid, Req, ?DEFAULT_TIMEOUT). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%%-------------------------------------------------------------------- - -init([Servers, AutoReconnect, ReqOpts0]) -> - process_flag(trap_exit, true), - %% XXX: Due to the ExHook Module in the enterprise, - %% this process may start multiple times and they will share this table - try - _ = ets:new(?CNTER, [named_table, public]), ok - catch - error:badarg:_ -> - ok - end, - - %% put the global option - put_request_failed_action( - maps:get(request_failed_action, ReqOpts0, deny) - ), - put_pool_size( - maps:get(pool_size, ReqOpts0, erlang:system_info(schedulers)) - ), - - %% Load the hook servers - ReqOpts = maps:without([request_failed_action], ReqOpts0), - {Waiting, Running} = load_all_servers(Servers, ReqOpts), - {ok, ensure_reload_timer( - #state{waiting = Waiting, - running = Running, - stopped = #{}, - request_options = ReqOpts, - auto_reconnect = AutoReconnect, - trefs = #{} - } - )}. - -%% @private -load_all_servers(Servers, ReqOpts) -> - load_all_servers(Servers, ReqOpts, #{}, #{}). -load_all_servers([], _Request, Waiting, Running) -> - {Waiting, Running}; -load_all_servers([#{name := Name0} = Options0 | More], ReqOpts, Waiting, Running) -> - Name = iolist_to_binary(Name0), - Options = Options0#{name => Name}, - {NWaiting, NRunning} = - case emqx_exhook_server:load(Name, Options, ReqOpts) of - {ok, ServerState} -> - save(Name, ServerState), - {Waiting, Running#{Name => Options}}; - {error, _} -> - {Waiting#{Name => Options}, Running} - end, - load_all_servers(More, ReqOpts, NWaiting, NRunning). - -handle_call({load, Name}, _From, State) -> - {Result, NState} = do_load_server(Name, State), - {reply, Result, NState}; - -handle_call({unload, Name}, _From, State) -> - case do_unload_server(Name, State) of - {error, Reason} -> - {reply, {error, Reason}, State}; - {ok, NState} -> - {reply, ok, NState} - end; - -handle_call(list, _From, State = #state{ - running = Running, - waiting = Waiting, - stopped = Stopped}) -> - ServerNames = maps:keys(Running) - ++ maps:keys(Waiting) - ++ maps:keys(Stopped), - {reply, ServerNames, State}; - -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. - -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info({timeout, _Ref, {reload, Name}}, State) -> - {Result, NState} = do_load_server(Name, State), - case Result of - ok -> - {noreply, NState}; - {error, not_found} -> - {noreply, NState}; - {error, Reason} -> - ?SLOG(warning, #{msg => "failed_to_reload_exhook_callback_server", - server_name => Name, - reason => Reason}), - {noreply, ensure_reload_timer(NState)} - end; - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State = #state{running = Running}) -> - _ = maps:fold(fun(Name, _, AccIn) -> - case do_unload_server(Name, AccIn) of - {ok, NAccIn} -> NAccIn; - _ -> AccIn - end - end, State, Running), - _ = unload_exhooks(), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - -unload_exhooks() -> - [emqx:unhook(Name, {M, F}) || - {Name, {M, F, _A}} <- ?ENABLED_HOOKS]. - -do_load_server(Name, State0 = #state{ - waiting = Waiting, - running = Running, - stopped = Stopped, - request_options = ReqOpts}) -> - State = clean_reload_timer(Name, State0), - case maps:get(Name, Running, undefined) of - undefined -> - case maps:get(Name, Stopped, - maps:get(Name, Waiting, undefined)) of - undefined -> - {{error, not_found}, State}; - Options -> - case emqx_exhook_server:load(Name, Options, ReqOpts) of - {ok, ServerState} -> - save(Name, ServerState), - ?SLOG(info, #{msg => "load_exhook_callback_server_successfully", - server_name => Name}), - {ok, State#state{ - running = maps:put(Name, Options, Running), - waiting = maps:remove(Name, Waiting), - stopped = maps:remove(Name, Stopped) - } - }; - {error, Reason} -> - {{error, Reason}, State} - end - end; - _ -> - {{error, already_started}, State} - end. - -do_unload_server(Name, State = #state{running = Running, stopped = Stopped}) -> - case maps:take(Name, Running) of - error -> {error, not_running}; - {Options, NRunning} -> - ok = emqx_exhook_server:unload(server(Name)), - ok = unsave(Name), - {ok, State#state{running = NRunning, - stopped = maps:put(Name, Options, Stopped) - }} - end. - -ensure_reload_timer(State = #state{auto_reconnect = false}) -> - State; -ensure_reload_timer(State = #state{waiting = Waiting, - trefs = TRefs, - auto_reconnect = Intv}) -> - NRefs = maps:fold(fun(Name, _, AccIn) -> - case maps:get(Name, AccIn, undefined) of - undefined -> - Ref = erlang:start_timer(Intv, self(), {reload, Name}), - AccIn#{Name => Ref}; - _HasRef -> - AccIn - end - end, TRefs, Waiting), - State#state{trefs = NRefs}. - -clean_reload_timer(Name, State = #state{trefs = TRefs}) -> - case maps:take(Name, TRefs) of - error -> State; - {TRef, NTRefs} -> - _ = erlang:cancel_timer(TRef), - State#state{trefs = NTRefs} - end. - -%%-------------------------------------------------------------------- -%% Server state persistent - -put_request_failed_action(Val) -> - persistent_term:put({?APP, request_failed_action}, Val). - -get_request_failed_action() -> - persistent_term:get({?APP, request_failed_action}). - -put_pool_size(Val) -> - persistent_term:put({?APP, pool_size}, Val). - -get_pool_size() -> - %% Avoid the scenario that the parameter is not set after - %% the hot upgrade completed. - persistent_term:get({?APP, pool_size}, erlang:system_info(schedulers)). - -save(Name, ServerState) -> - Saved = persistent_term:get(?APP, []), - persistent_term:put(?APP, lists:reverse([Name | Saved])), - persistent_term:put({?APP, Name}, ServerState). - -unsave(Name) -> - case persistent_term:get(?APP, []) of - [] -> - persistent_term:erase(?APP); - Saved -> - persistent_term:put(?APP, lists:delete(Name, Saved)) - end, - persistent_term:erase({?APP, Name}), - ok. - -running() -> - persistent_term:get(?APP, []). - -server(Name) -> - case catch persistent_term:get({?APP, Name}) of - {'EXIT', {badarg,_}} -> undefined; - Service -> Service - end. diff --git a/apps/emqx_exhook/src/emqx_exhook_schema.erl b/apps/emqx_exhook/src/emqx_exhook_schema.erl index 21ca5c3f0..eb75a10ff 100644 --- a/apps/emqx_exhook/src/emqx_exhook_schema.erl +++ b/apps/emqx_exhook/src/emqx_exhook_schema.erl @@ -32,61 +32,58 @@ -reflect_type([duration/0]). --export([namespace/0, roots/0, fields/1]). +-export([namespace/0, roots/0, fields/1, server_config/0]). -namespace() -> exhook. +namespace() -> emqx_exhook. -roots() -> [exhook]. +roots() -> [emqx_exhook]. -fields(exhook) -> - [ {request_failed_action, - sc(hoconsc:enum([deny, ignore]), - #{default => deny})} - , {request_timeout, - sc(duration(), - #{default => "5s"})} - , {auto_reconnect, - sc(hoconsc:union([false, duration()]), - #{ default => "60s" - })} - , {pool_size, - sc(integer(), - #{ nullable => true - })} - , {servers, - sc(hoconsc:array(ref(servers)), +fields(emqx_exhook) -> + [{servers, + sc(hoconsc:array(ref(server)), #{default => []})} ]; -fields(servers) -> - [ {name, - sc(string(), - #{})} - , {url, - sc(string(), - #{})} +fields(server) -> + [ {name, sc(binary(), #{})} + , {enable, sc(boolean(), #{default => true})} + , {url, sc(binary(), #{})} + , {request_timeout, + sc(duration(), #{default => "5s"})} + , {failed_action, failed_action()} , {ssl, - sc(ref(ssl_conf), - #{})} + sc(ref(ssl_conf), #{})} + , {auto_reconnect, + sc(hoconsc:union([false, duration()]), + #{default => "60s"})} + , {pool_size, + sc(integer(), #{default => 8, example => 8})} ]; fields(ssl_conf) -> - [ {cacertfile, - sc(string(), - #{}) - } + [ {enable, sc(boolean(), #{default => true})} + , {cacertfile, + sc(binary(), + #{example => <<"{{ platform_etc_dir }}/certs/cacert.pem">>}) + } , {certfile, - sc(string(), - #{}) - } + sc(binary(), + #{example => <<"{{ platform_etc_dir }}/certs/cert.pem">>}) + } , {keyfile, - sc(string(), - #{})} + sc(binary(), + #{example => <<"{{ platform_etc_dir }}/certs/key.pem">>})} ]. %% types - sc(Type, Meta) -> Meta#{type => Type}. ref(Field) -> hoconsc:ref(?MODULE, Field). + +failed_action() -> + sc(hoconsc:enum([deny, ignore]), + #{default => deny}). + +server_config() -> + fields(server). diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index b66b30a26..fff3172d3 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -24,7 +24,7 @@ -define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). %% Load/Unload --export([ load/3 +-export([ load/2 , unload/1 ]). @@ -33,23 +33,24 @@ %% Infos -export([ name/1 + , hookpoints/1 , format/1 + , failed_action/1 ]). --record(server, { - %% Server name (equal to grpc client channel name) - name :: binary(), - %% The function options - options :: map(), - %% gRPC channel pid - channel :: pid(), - %% Registered hook names and options - hookspec :: #{hookpoint() => map()}, - %% Metrcis name prefix - prefix :: list() - }). --type server() :: #server{}. +-type server() :: #{%% Server name (equal to grpc client channel name) + name := binary(), + %% The function options + options := map(), + %% gRPC channel pid + channel := pid(), + %% Registered hook names and options + hookspec := #{hookpoint() => map()}, + %% Metrcis name prefix + prefix := list() + }. + -type hookpoint() :: 'client.connect' | 'client.connack' @@ -81,9 +82,13 @@ %% Load/Unload APIs %%-------------------------------------------------------------------- --spec load(binary(), map(), map()) -> {ok, server()} | {error, term()} . -load(Name, Opts0, ReqOpts) -> - {SvrAddr, ClientOpts} = channel_opts(Opts0), +-spec load(binary(), map()) -> {ok, server()} | {error, term()} | disable. +load(_Name, #{enable := false}) -> + disable; + +load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) -> + ReqOpts = #{timeout => Timeout, failed_action => FailedAction}, + {SvrAddr, ClientOpts} = channel_opts(Opts), case emqx_exhook_sup:start_grpc_client_channel( Name, SvrAddr, @@ -92,16 +97,15 @@ load(Name, Opts0, ReqOpts) -> case do_init(Name, ReqOpts) of {ok, HookSpecs} -> %% Reigster metrics - Prefix = lists:flatten( - io_lib:format("exhook.~ts.", [Name])), + Prefix = lists:flatten(io_lib:format("exhook.~ts.", [Name])), ensure_metrics(Prefix, HookSpecs), %% Ensure hooks ensure_hooks(HookSpecs), - {ok, #server{name = Name, - options = ReqOpts, - channel = _ChannPoolPid, - hookspec = HookSpecs, - prefix = Prefix }}; + {ok, #{name => Name, + options => ReqOpts, + channel => _ChannPoolPid, + hookspec => HookSpecs, + prefix => Prefix }}; {error, _} = E -> emqx_exhook_sup:stop_grpc_client_channel(Name), E end; @@ -110,14 +114,16 @@ load(Name, Opts0, ReqOpts) -> %% @private channel_opts(Opts = #{url := URL}) -> - ClientOpts = #{pool_size => emqx_exhook_mngr:get_pool_size()}, + ClientOpts = maps:merge(#{pool_size => erlang:system_info(schedulers)}, + Opts), case uri_string:parse(URL) of - #{scheme := "http", host := Host, port := Port} -> + #{scheme := <<"http">>, host := Host, port := Port} -> {format_http_uri("http", Host, Port), ClientOpts}; - #{scheme := "https", host := Host, port := Port} -> + #{scheme := <<"https">>, host := Host, port := Port} -> SslOpts = case maps:get(ssl, Opts, undefined) of undefined -> []; + #{enable := false} -> []; MapOpts -> filter( [{cacertfile, maps:get(cacertfile, MapOpts, undefined)}, @@ -131,8 +137,8 @@ channel_opts(Opts = #{url := URL}) -> transport_opts => SslOpts} }, {format_http_uri("https", Host, Port), NClientOpts}; - _ -> - error(bad_server_url) + Error -> + error({bad_server_url, URL, Error}) end. format_http_uri(Scheme, Host, Port) -> @@ -142,7 +148,7 @@ filter(Ls) -> [ E || E <- Ls, E /= undefined]. -spec unload(server()) -> ok. -unload(#server{name = Name, options = ReqOpts, hookspec = HookSpecs}) -> +unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) -> _ = do_deinit(Name, ReqOpts), _ = may_unload_hooks(HookSpecs), _ = emqx_exhook_sup:stop_grpc_client_channel(Name), @@ -155,7 +161,7 @@ do_deinit(Name, ReqOpts) -> do_init(ChannName, ReqOpts) -> %% BrokerInfo defined at: exhook.protos BrokerInfo = maps:with([version, sysdescr, uptime, datetime], - maps:from_list(emqx_sys:info())), + maps:from_list(emqx_sys:info())), Req = #{broker => BrokerInfo}, case do_call(ChannName, 'on_provider_loaded', Req, ReqOpts) of {ok, InitialResp} -> @@ -227,7 +233,7 @@ may_unload_hooks(HookSpecs) -> end end, maps:keys(HookSpecs)). -format(#server{name = Name, hookspec = Hooks}) -> +format(#{name := Name, hookspec := Hooks}) -> lists:flatten( io_lib:format("name=~ts, hooks=~0p, active=true", [Name, Hooks])). @@ -235,15 +241,17 @@ format(#server{name = Name, hookspec = Hooks}) -> %% APIs %%-------------------------------------------------------------------- -name(#server{name = Name}) -> +name(#{name := Name}) -> Name. --spec call(hookpoint(), map(), server()) - -> ignore - | {ok, Resp :: term()} - | {error, term()}. -call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts, - hookspec = Hooks, prefix = Prefix}) -> +hookpoints(#{hookspec := Hooks}) -> + maps:keys(Hooks). + +-spec call(hookpoint(), map(), server()) -> ignore + | {ok, Resp :: term()} + | {error, term()}. +call(Hookpoint, Req, #{name := ChannName, options := ReqOpts, + hookspec := Hooks, prefix := Prefix}) -> GrpcFunc = hk2func(Hookpoint), case maps:get(Hookpoint, Hooks, undefined) of undefined -> ignore; @@ -299,6 +307,9 @@ do_call(ChannName, Fun, Req, ReqOpts) -> {error, Reason} end. +failed_action(#{options := Opts}) -> + maps:get(failed_action, Opts). + %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index ca8d7c856..74b97ca1a 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -42,25 +42,10 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Mngr = ?CHILD(emqx_exhook_mngr, worker, - [servers(), auto_reconnect(), request_options()]), + _ = emqx_exhook_mgr:init_counter_table(), + Mngr = ?CHILD(emqx_exhook_mgr, worker, []), {ok, {{one_for_one, 10, 100}, [Mngr]}}. -servers() -> - env(servers, []). - -auto_reconnect() -> - env(auto_reconnect, 60000). - -request_options() -> - #{timeout => env(request_timeout, 5000), - request_failed_action => env(request_failed_action, deny), - pool_size => env(pool_size, erlang:system_info(schedulers)) - }. - -env(Key, Def) -> - emqx_conf:get([exhook, Key], Def). - %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index dd020ce85..a58ccd4bd 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -21,14 +21,14 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). -define(CONF_DEFAULT, <<" -exhook: { - servers: [ - { name: \"default\" - url: \"http://127.0.0.1:9000\" - } - ] +emqx_exhook +{servers = [ + {name = default, + url = \"http://127.0.0.1:9000\" + }] } ">>). @@ -39,27 +39,53 @@ exhook: { all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Cfg) -> + application:load(emqx_conf), + ok = ekka:start(), + ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), + meck:new(emqx_alarm, [non_strict, passthrough, no_link]), + meck:expect(emqx_alarm, activate, 3, ok), + meck:expect(emqx_alarm, deactivate, 3, ok), + _ = emqx_exhook_demo_svr:start(), ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT), emqx_common_test_helpers:start_apps([emqx_exhook]), Cfg. end_per_suite(_Cfg) -> + ekka:stop(), + mria:stop(), + mria_mnesia:delete_schema(), + meck:unload(emqx_alarm), + emqx_common_test_helpers:stop_apps([emqx_exhook]), emqx_exhook_demo_svr:stop(). +init_per_testcase(_, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(), + timer:sleep(200), + Config. + +end_per_testcase(_, Config) -> + case erlang:whereis(node()) of + undefined -> ok; + P -> + erlang:unlink(P), + erlang:exit(P, kill) + end, + Config. + %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- t_noserver_nohook(_) -> - emqx_exhook:disable(<<"default">>), + emqx_exhook_mgr:disable(<<"default">>), ?assertEqual([], ets:tab2list(emqx_hooks)), - ok = emqx_exhook:enable(<<"default">>), + {ok, _} = emqx_exhook_mgr:enable(<<"default">>), ?assertNotEqual([], ets:tab2list(emqx_hooks)). t_access_failed_if_no_server_running(_) -> - emqx_exhook:disable(<<"default">>), + emqx_exhook_mgr:disable(<<"default">>), ClientInfo = #{clientid => <<"user-id-1">>, username => <<"usera">>, peerhost => {127,0,0,1}, @@ -76,30 +102,7 @@ t_access_failed_if_no_server_running(_) -> Message = emqx_message:make(<<"t/1">>, <<"abc">>), ?assertMatch({stop, Message}, emqx_exhook_handler:on_message_publish(Message)), - emqx_exhook:enable(<<"default">>). - -t_cli_list(_) -> - meck_print(), - ?assertEqual( [[emqx_exhook_server:format(emqx_exhook_mngr:server(Name)) || Name <- emqx_exhook:list()]] - , emqx_exhook_cli:cli(["server", "list"]) - ), - unmeck_print(). - -t_cli_enable_disable(_) -> - meck_print(), - ?assertEqual([already_started], emqx_exhook_cli:cli(["server", "enable", "default"])), - ?assertEqual(ok, emqx_exhook_cli:cli(["server", "disable", "default"])), - ?assertEqual([["name=default, hooks=#{}, active=false"]], emqx_exhook_cli:cli(["server", "list"])), - - ?assertEqual([not_running], emqx_exhook_cli:cli(["server", "disable", "default"])), - ?assertEqual(ok, emqx_exhook_cli:cli(["server", "enable", "default"])), - unmeck_print(). - -t_cli_stats(_) -> - meck_print(), - _ = emqx_exhook_cli:cli(["server", "stats"]), - _ = emqx_exhook_cli:cli(x), - unmeck_print(). + emqx_exhook_mgr:enable(<<"default">>). %%-------------------------------------------------------------------- %% Utils @@ -115,13 +118,13 @@ unmeck_print() -> loaded_exhook_hookpoints() -> lists:filtermap(fun(E) -> - Name = element(2, E), - Callbacks = element(3, E), - case lists:any(fun is_exhook_callback/1, Callbacks) of - true -> {true, Name}; - _ -> false - end - end, ets:tab2list(emqx_hooks)). + Name = element(2, E), + Callbacks = element(3, E), + case lists:any(fun is_exhook_callback/1, Callbacks) of + true -> {true, Name}; + _ -> false + end + end, ets:tab2list(emqx_hooks)). is_exhook_callback(Cb) -> Action = element(2, Cb), diff --git a/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl new file mode 100644 index 000000000..6f4ead040 --- /dev/null +++ b/apps/emqx_exhook/test/emqx_exhook_api_SUITE.erl @@ -0,0 +1,197 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_api_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-define(HOST, "http://127.0.0.1:18083/"). +-define(API_VERSION, "v5"). +-define(BASE_PATH, "api"). +-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard). + +-define(CONF_DEFAULT, <<" +emqx_exhook {servers = [ + {name = default, + url = \"http://127.0.0.1:9000\" + } + ] + } +">>). + +all() -> + [t_list, t_get, t_add, t_move_1, t_move_2, t_delete, t_update]. + +init_per_suite(Config) -> + application:load(emqx_conf), + ok = ekka:start(), + ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), + meck:new(emqx_alarm, [non_strict, passthrough, no_link]), + meck:expect(emqx_alarm, activate, 3, ok), + meck:expect(emqx_alarm, deactivate, 3, ok), + + _ = emqx_exhook_demo_svr:start(), + ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT), + emqx_mgmt_api_test_util:init_suite([emqx_exhook]), + [Conf] = emqx:get_config([emqx_exhook, servers]), + [{template, Conf} | Config]. + +end_per_suite(Config) -> + ekka:stop(), + mria:stop(), + mria_mnesia:delete_schema(), + meck:unload(emqx_alarm), + + emqx_mgmt_api_test_util:end_suite([emqx_exhook]), + emqx_exhook_demo_svr:stop(), + emqx_exhook_demo_svr:stop(<<"test1">>), + Config. + +init_per_testcase(t_add, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(), + _ = emqx_exhook_demo_svr:start(<<"test1">>, 9001), + timer:sleep(200), + Config; + +init_per_testcase(_, Config) -> + {ok, _} = emqx_cluster_rpc:start_link(), + timer:sleep(200), + Config. + +end_per_testcase(_, Config) -> + case erlang:whereis(node()) of + undefined -> ok; + P -> + erlang:unlink(P), + erlang:exit(P, kill) + end, + Config. + +t_list(_) -> + {ok, Data} = request_api(get, api_path(["exhooks"]), "", + auth_header_()), + + List = decode_json(Data), + ?assertEqual(1, length(List)), + + [Svr] = List, + + ?assertMatch(#{name := <<"default">>, + status := <<"running">>}, Svr). + +t_get(_) -> + {ok, Data} = request_api(get, api_path(["exhooks", "default"]), "", + auth_header_()), + + Svr = decode_json(Data), + + ?assertMatch(#{name := <<"default">>, + status := <<"running">>}, Svr). + +t_add(Cfg) -> + Template = proplists:get_value(template, Cfg), + Instance = Template#{name => <<"test1">>, + url => "http://127.0.0.1:9001" + }, + {ok, Data} = request_api(post, api_path(["exhooks"]), "", + auth_header_(), Instance), + + Svr = decode_json(Data), + + ?assertMatch(#{name := <<"test1">>, + status := <<"running">>}, Svr), + + ?assertMatch([<<"default">>, <<"test1">>], emqx_exhook_mgr:running()). + +t_move_1(_) -> + Result = request_api(post, api_path(["exhooks", "default", "move"]), "", + auth_header_(), + #{position => bottom, related => <<>>}), + + ?assertMatch({ok, <<>>}, Result), + ?assertMatch([<<"test1">>, <<"default">>], emqx_exhook_mgr:running()). + +t_move_2(_) -> + Result = request_api(post, api_path(["exhooks", "default", "move"]), "", + auth_header_(), + #{position => before, related => <<"test1">>}), + + ?assertMatch({ok, <<>>}, Result), + ?assertMatch([<<"default">>, <<"test1">>], emqx_exhook_mgr:running()). + +t_delete(_) -> + Result = request_api(delete, api_path(["exhooks", "test1"]), "", + auth_header_()), + + ?assertMatch({ok, <<>>}, Result), + ?assertMatch([<<"default">>], emqx_exhook_mgr:running()). + +t_update(Cfg) -> + Template = proplists:get_value(template, Cfg), + Instance = Template#{enable => false}, + {ok, <<>>} = request_api(put, api_path(["exhooks", "default"]), "", + auth_header_(), Instance), + + ?assertMatch([], emqx_exhook_mgr:running()). + +decode_json(Data) -> + BinJosn = emqx_json:decode(Data, [return_maps]), + emqx_map_lib:unsafe_atom_key_map(BinJosn). + +request_api(Method, Url, Auth) -> + request_api(Method, Url, [], Auth, []). + +request_api(Method, Url, QueryParams, Auth) -> + request_api(Method, Url, QueryParams, Auth, []). + +request_api(Method, Url, QueryParams, Auth, []) -> + NewUrl = case QueryParams of + "" -> Url; + _ -> Url ++ "?" ++ QueryParams + end, + do_request_api(Method, {NewUrl, [Auth]}); +request_api(Method, Url, QueryParams, Auth, Body) -> + NewUrl = case QueryParams of + "" -> Url; + _ -> Url ++ "?" ++ QueryParams + end, + do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}). + +do_request_api(Method, Request)-> + case httpc:request(Method, Request, [], [{body_format, binary}]) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {ok, {{"HTTP/1.1", Code, _}, _, Return} } + when Code =:= 200 orelse Code =:= 204 orelse Code =:= 201 -> + {ok, Return}; + {ok, {Reason, _, _}} -> + {error, Reason} + end. + +auth_header_() -> + AppId = <<"admin">>, + AppSecret = <<"public">>, + auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)). + +auth_header_(User, Pass) -> + Encoded = base64:encode_to_string(lists:append([User,":",Pass])), + {"Authorization","Basic " ++ Encoded}. + +api_path(Parts)-> + ?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts). diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index b1e3801b2..9d1e084c2 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -20,7 +20,9 @@ %% -export([ start/0 + , start/2 , stop/0 + , stop/1 , take/0 , in/1 ]). @@ -57,39 +59,45 @@ %%-------------------------------------------------------------------- start() -> - Pid = spawn(fun mngr_main/0), - register(?MODULE, Pid), + start(?NAME, ?PORT). + +start(Name, Port) -> + Pid = spawn(fun() -> mgr_main(Name, Port) end), + register(to_atom_name(Name), Pid), {ok, Pid}. stop() -> - grpc:stop_server(?NAME), - ?MODULE ! stop. + stop(?NAME). + +stop(Name) -> + grpc:stop_server(Name), + to_atom_name(Name) ! stop. take() -> - ?MODULE ! {take, self()}, + to_atom_name(?NAME) ! {take, self()}, receive {value, V} -> V after 5000 -> error(timeout) end. in({FunName, Req}) -> - ?MODULE ! {in, FunName, Req}. + to_atom_name(?NAME) ! {in, FunName, Req}. -mngr_main() -> +mgr_main(Name, Port) -> application:ensure_all_started(grpc), Services = #{protos => [emqx_exhook_pb], services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr} }, Options = [], - Svr = grpc:start_server(?NAME, ?PORT, Services, Options), - mngr_loop([Svr, queue:new(), queue:new()]). + Svr = grpc:start_server(Name, Port, Services, Options), + mgr_loop([Svr, queue:new(), queue:new()]). -mngr_loop([Svr, Q, Takes]) -> +mgr_loop([Svr, Q, Takes]) -> receive {in, FunName, Req} -> {NQ1, NQ2} = reply(queue:in({FunName, Req}, Q), Takes), - mngr_loop([Svr, NQ1, NQ2]); + mgr_loop([Svr, NQ1, NQ2]); {take, From} -> {NQ1, NQ2} = reply(Q, queue:in(From, Takes)), - mngr_loop([Svr, NQ1, NQ2]); + mgr_loop([Svr, NQ1, NQ2]); stop -> exit(normal) end. @@ -105,12 +113,18 @@ reply(Q1, Q2) -> {NQ1, NQ2} end. +to_atom_name(Name) when is_atom(Name) -> + Name; + +to_atom_name(Name) -> + erlang:binary_to_atom(Name). + %%-------------------------------------------------------------------- %% callbacks %%-------------------------------------------------------------------- -spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()} + -> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. on_provider_loaded(Req, Md) -> diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index cbd7a2a2a..076fe7134 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -31,12 +31,11 @@ ]). -define(CONF_DEFAULT, <<" -exhook: { - servers: [ - { name: \"default\" - url: \"http://127.0.0.1:9000\" - } - ] +emqx_exhook +{servers = [ + {name = default, + url = \"http://127.0.0.1:9000\" + }] } ">>).