refactor(emqx_exhook): refactore exhook and add api module
This commit is contained in:
parent
a9b443ae34
commit
7c9c7b6a60
|
@ -2,43 +2,45 @@
|
|||
## EMQ X Hooks
|
||||
##====================================================================
|
||||
|
||||
exhook {
|
||||
## The default value or action will be returned, while the request to
|
||||
## the gRPC server failed or no available grpc server running.
|
||||
##
|
||||
## Default: deny
|
||||
## Value: ignore | deny
|
||||
request_failed_action = deny
|
||||
emqx_exhook {
|
||||
|
||||
## The timeout to request grpc server
|
||||
servers = [
|
||||
##{
|
||||
## name = default
|
||||
##
|
||||
## Default: 5s
|
||||
## Value: Duration
|
||||
request_timeout = 5s
|
||||
|
||||
## Whether to automatically reconnect (initialize) the gRPC server
|
||||
##
|
||||
## When gRPC is not available, exhook tries to request the gRPC service at
|
||||
## that interval and reinitialize the list of mounted hooks.
|
||||
##
|
||||
## Default: false
|
||||
## Value: false | Duration
|
||||
auto_reconnect = 60s
|
||||
## auto_reconnect = 60s
|
||||
|
||||
## The process pool size for gRPC client
|
||||
## The default value or action will be returned, while the request to
|
||||
## the gRPC server failed or no available grpc server running.
|
||||
##
|
||||
## Default: Equals cpu cores
|
||||
## Value: Integer
|
||||
#pool_size = 16
|
||||
## Default: deny
|
||||
## Value: ignore | deny
|
||||
## failed_action = deny
|
||||
|
||||
servers = [
|
||||
# { name: "default"
|
||||
# url: "http://127.0.0.1:9000"
|
||||
# #ssl: {
|
||||
# # cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||
# # certfile: "{{ platform_etc_dir }}/certs/cert.pem"
|
||||
# # keyfile: "{{ platform_etc_dir }}/certs/key.pem"
|
||||
# #}
|
||||
# }
|
||||
## The timeout to request grpc server
|
||||
##
|
||||
## Default: 5s
|
||||
## Value: Duration
|
||||
## request_timeout = 5s
|
||||
|
||||
## url = "http://127.0.0.1:9000"
|
||||
## ssl {
|
||||
## cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||
## certfile: "{{ platform_etc_dir }}/certs/cert.pem"
|
||||
## keyfile: "{{ platform_etc_dir }}/certs/key.pem"
|
||||
## }
|
||||
##
|
||||
## The process pool size for gRPC client
|
||||
##
|
||||
## Default: Equals cpu cores
|
||||
## Value: Integer
|
||||
## pool_size = 16
|
||||
##}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -19,90 +19,56 @@
|
|||
-include("emqx_exhook.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
|
||||
-export([ enable/1
|
||||
, disable/1
|
||||
, list/0
|
||||
]).
|
||||
|
||||
-export([ cast/2
|
||||
, call_fold/3
|
||||
]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Mgmt APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec enable(binary()) -> ok | {error, term()}.
|
||||
enable(Name) ->
|
||||
with_mngr(fun(Pid) -> emqx_exhook_mngr:enable(Pid, Name) end).
|
||||
|
||||
-spec disable(binary()) -> ok | {error, term()}.
|
||||
disable(Name) ->
|
||||
with_mngr(fun(Pid) -> emqx_exhook_mngr:disable(Pid, Name) end).
|
||||
|
||||
-spec list() -> [atom() | string()].
|
||||
list() ->
|
||||
with_mngr(fun(Pid) -> emqx_exhook_mngr:list(Pid) end).
|
||||
|
||||
with_mngr(Fun) ->
|
||||
case lists:keyfind(emqx_exhook_mngr, 1,
|
||||
supervisor:which_children(emqx_exhook_sup)) of
|
||||
{_, Pid, _, _} ->
|
||||
Fun(Pid);
|
||||
_ ->
|
||||
{error, no_manager_svr}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Dispatch APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec cast(atom(), map()) -> ok.
|
||||
cast(Hookpoint, Req) ->
|
||||
cast(Hookpoint, Req, emqx_exhook_mngr:running()).
|
||||
cast(Hookpoint, Req, emqx_exhook_mgr:running()).
|
||||
|
||||
cast(_, _, []) ->
|
||||
ok;
|
||||
cast(Hookpoint, Req, [ServerName|More]) ->
|
||||
%% XXX: Need a real asynchronous running
|
||||
_ = emqx_exhook_server:call(Hookpoint, Req,
|
||||
emqx_exhook_mngr:server(ServerName)),
|
||||
emqx_exhook_mgr:server(ServerName)),
|
||||
cast(Hookpoint, Req, More).
|
||||
|
||||
-spec call_fold(atom(), term(), function())
|
||||
-> {ok, term()}
|
||||
| {stop, term()}.
|
||||
-spec call_fold(atom(), term(), function()) -> {ok, term()}
|
||||
| {stop, term()}.
|
||||
call_fold(Hookpoint, Req, AccFun) ->
|
||||
FailedAction = emqx_exhook_mngr:get_request_failed_action(),
|
||||
ServerNames = emqx_exhook_mngr:running(),
|
||||
case ServerNames == [] andalso FailedAction == deny of
|
||||
true ->
|
||||
case emqx_exhook_mgr:running() of
|
||||
[] ->
|
||||
{stop, deny_action_result(Hookpoint, Req)};
|
||||
_ ->
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, ServerNames)
|
||||
ServerNames ->
|
||||
call_fold(Hookpoint, Req, AccFun, ServerNames)
|
||||
end.
|
||||
|
||||
call_fold(_, Req, _, _, []) ->
|
||||
call_fold(_, Req, _, []) ->
|
||||
{ok, Req};
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, [ServerName|More]) ->
|
||||
Server = emqx_exhook_mngr:server(ServerName),
|
||||
call_fold(Hookpoint, Req, AccFun, [ServerName|More]) ->
|
||||
Server = emqx_exhook_mgr:server(ServerName),
|
||||
case emqx_exhook_server:call(Hookpoint, Req, Server) of
|
||||
{ok, Resp} ->
|
||||
case AccFun(Req, Resp) of
|
||||
{stop, NReq} ->
|
||||
{stop, NReq};
|
||||
{ok, NReq} ->
|
||||
call_fold(Hookpoint, NReq, FailedAction, AccFun, More);
|
||||
call_fold(Hookpoint, NReq, AccFun, More);
|
||||
_ ->
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, More)
|
||||
call_fold(Hookpoint, Req, AccFun, More)
|
||||
end;
|
||||
_ ->
|
||||
case FailedAction of
|
||||
case emqx_exhook_server:failed_action(Server) of
|
||||
ignore ->
|
||||
call_fold(Hookpoint, Req, AccFun, More);
|
||||
deny ->
|
||||
{stop, deny_action_result(Hookpoint, Req)};
|
||||
_ ->
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, More)
|
||||
{stop, deny_action_result(Hookpoint, Req)}
|
||||
end
|
||||
end.
|
||||
|
||||
|
|
|
@ -0,0 +1,281 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_exhook_api).
|
||||
|
||||
-behaviour(minirest_api).
|
||||
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
-export([api_spec/0, paths/0, schema/1, fields/1, namespace/0]).
|
||||
|
||||
-export([exhooks/2, action_with_name/2, move/2]).
|
||||
|
||||
-import(hoconsc, [mk/2, ref/1, enum/1, array/1]).
|
||||
-import(emqx_dashboard_swagger, [schema_with_example/2, error_codes/2]).
|
||||
|
||||
-define(TAGS, [<<"exhooks">>]).
|
||||
-define(BAD_REQUEST, 'BAD_REQUEST').
|
||||
-define(BAD_RPC, 'BAD_RPC').
|
||||
|
||||
namespace() -> "exhook".
|
||||
|
||||
api_spec() ->
|
||||
emqx_dashboard_swagger:spec(?MODULE).
|
||||
|
||||
paths() -> ["/exhooks", "/exhooks/:name", "/exhooks/:name/move"].
|
||||
|
||||
schema(("/exhooks")) ->
|
||||
#{
|
||||
'operationId' => exhooks,
|
||||
get => #{tags => ?TAGS,
|
||||
description => <<"List all servers">>,
|
||||
responses => #{200 => mk(array(ref(detailed_server_info)), #{})}
|
||||
},
|
||||
post => #{tags => ?TAGS,
|
||||
description => <<"Add a servers">>,
|
||||
'requestBody' => server_conf_schema(),
|
||||
responses => #{201 => mk(ref(detailed_server_info), #{}),
|
||||
500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
schema("/exhooks/:name") ->
|
||||
#{'operationId' => action_with_name,
|
||||
get => #{tags => ?TAGS,
|
||||
description => <<"Get the detail information of server">>,
|
||||
parameters => params_server_name_in_path(),
|
||||
responses => #{200 => mk(ref(detailed_server_info), #{}),
|
||||
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>)
|
||||
}
|
||||
},
|
||||
put => #{tags => ?TAGS,
|
||||
description => <<"Update the server">>,
|
||||
parameters => params_server_name_in_path(),
|
||||
'requestBody' => server_conf_schema(),
|
||||
responses => #{200 => <<>>,
|
||||
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
|
||||
500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
|
||||
}
|
||||
},
|
||||
delete => #{tags => ?TAGS,
|
||||
description => <<"Delete the server">>,
|
||||
parameters => params_server_name_in_path(),
|
||||
responses => #{204 => <<>>,
|
||||
500 => error_codes([?BAD_RPC], <<"Bad RPC">>) }
|
||||
}
|
||||
};
|
||||
|
||||
schema("/exhooks/:name/move") ->
|
||||
#{'operationId' => move,
|
||||
post => #{tags => ?TAGS,
|
||||
description => <<"Move the server">>,
|
||||
parameters => params_server_name_in_path(),
|
||||
'requestBody' => mk(ref(move_req), #{}),
|
||||
responses => #{200 => <<>>,
|
||||
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
|
||||
500 => error_codes([?BAD_RPC], <<"Bad RPC">>)
|
||||
}
|
||||
}
|
||||
}.
|
||||
|
||||
fields(move_req) ->
|
||||
[
|
||||
{position, mk(enum([top, bottom, before, 'after']), #{})},
|
||||
{related, mk(string(), #{desc => <<"Relative position of movement">>,
|
||||
default => <<>>,
|
||||
example => <<>>
|
||||
})}
|
||||
];
|
||||
|
||||
fields(detailed_server_info) ->
|
||||
[ {status, mk(enum([running, waiting, stopped]), #{})}
|
||||
, {hooks, mk(array(string()), #{default => []})}
|
||||
, {node_status, mk(ref(node_status), #{})}
|
||||
] ++ emqx_exhook_schema:server_config();
|
||||
|
||||
fields(node_status) ->
|
||||
[ {node, mk(string(), #{})}
|
||||
, {status, mk(enum([running, waiting, stopped, not_found, error]), #{})}
|
||||
];
|
||||
|
||||
fields(server_config) ->
|
||||
emqx_exhook_schema:server_config().
|
||||
|
||||
params_server_name_in_path() ->
|
||||
[{name, mk(string(), #{in => path,
|
||||
required => true,
|
||||
example => <<"default">>})}
|
||||
].
|
||||
|
||||
server_conf_schema() ->
|
||||
schema_with_example(ref(server_config),
|
||||
#{ name => "default"
|
||||
, enable => true
|
||||
, url => <<"http://127.0.0.1:8081">>
|
||||
, request_timeout => "5s"
|
||||
, failed_action => deny
|
||||
, auto_reconnect => "60s"
|
||||
, pool_size => 8
|
||||
, ssl => #{ enable => false
|
||||
, cacertfile => <<"{{ platform_etc_dir }}/certs/cacert.pem">>
|
||||
, certfile => <<"{{ platform_etc_dir }}/certs/cert.pem">>
|
||||
, keyfile => <<"{{ platform_etc_dir }}/certs/key.pem">>
|
||||
}
|
||||
}).
|
||||
|
||||
|
||||
exhooks(get, _) ->
|
||||
ServerL = emqx_exhook_mgr:list(),
|
||||
ServerL2 = nodes_all_server_status(ServerL),
|
||||
{200, ServerL2};
|
||||
|
||||
exhooks(post, #{body := Body}) ->
|
||||
case emqx_exhook_mgr:update_config([emqx_exhook, servers], {add, Body}) of
|
||||
{ok, Result} ->
|
||||
{201, Result};
|
||||
{error, Error} ->
|
||||
{500, #{code => <<"BAD_RPC">>,
|
||||
message => Error
|
||||
}}
|
||||
end.
|
||||
|
||||
action_with_name(get, #{bindings := #{name := Name}}) ->
|
||||
Result = emqx_exhook_mgr:lookup(Name),
|
||||
NodeStatus = nodes_server_status(Name),
|
||||
case Result of
|
||||
not_found ->
|
||||
{400, #{code => <<"BAD_REQUEST">>,
|
||||
message => <<"Server not found">>
|
||||
}};
|
||||
ServerInfo ->
|
||||
{200, ServerInfo#{node_status => NodeStatus}}
|
||||
end;
|
||||
|
||||
action_with_name(put, #{bindings := #{name := Name}, body := Body}) ->
|
||||
case emqx_exhook_mgr:update_config([emqx_exhook, servers],
|
||||
{update, Name, Body}) of
|
||||
{ok, not_found} ->
|
||||
{400, #{code => <<"BAD_REQUEST">>,
|
||||
message => <<"Server not found">>
|
||||
}};
|
||||
{ok, {error, Reason}} ->
|
||||
{400, #{code => <<"BAD_REQUEST">>,
|
||||
message => unicode:characters_to_binary(io_lib:format("Error Reason:~p~n", [Reason]))
|
||||
}};
|
||||
{ok, _} ->
|
||||
{200};
|
||||
{error, Error} ->
|
||||
{500, #{code => <<"BAD_RPC">>,
|
||||
message => Error
|
||||
}}
|
||||
end;
|
||||
|
||||
action_with_name(delete, #{bindings := #{name := Name}}) ->
|
||||
case emqx_exhook_mgr:update_config([emqx_exhook, servers],
|
||||
{delete, Name}) of
|
||||
{ok, _} ->
|
||||
{200};
|
||||
{error, Error} ->
|
||||
{500, #{code => <<"BAD_RPC">>,
|
||||
message => Error
|
||||
}}
|
||||
end.
|
||||
|
||||
move(post, #{bindings := #{name := Name}, body := Body}) ->
|
||||
#{<<"position">> := PositionT, <<"related">> := Related} = Body,
|
||||
Position = erlang:binary_to_atom(PositionT),
|
||||
case emqx_exhook_mgr:update_config([emqx_exhook, servers],
|
||||
{move, Name, Position, Related}) of
|
||||
{ok, ok} ->
|
||||
{200};
|
||||
{ok, not_found} ->
|
||||
{400, #{code => <<"BAD_REQUEST">>,
|
||||
message => <<"Server not found">>
|
||||
}};
|
||||
{error, Error} ->
|
||||
{500, #{code => <<"BAD_RPC">>,
|
||||
message => Error
|
||||
}}
|
||||
end.
|
||||
|
||||
nodes_server_status(Name) ->
|
||||
StatusL = call_cluster(emqx_exhook_mgr, server_status, [Name]),
|
||||
|
||||
Handler = fun({Node, {error, _}}) ->
|
||||
#{node => Node,
|
||||
status => error
|
||||
};
|
||||
({Node, Status}) ->
|
||||
#{node => Node,
|
||||
status => Status
|
||||
}
|
||||
end,
|
||||
|
||||
lists:map(Handler, StatusL).
|
||||
|
||||
nodes_all_server_status(ServerL) ->
|
||||
AllStatusL = call_cluster(emqx_exhook_mgr, all_servers_status, []),
|
||||
|
||||
AggreMap = lists:foldl(fun(#{name := Name}, Acc) ->
|
||||
Acc#{Name => []}
|
||||
end,
|
||||
#{},
|
||||
ServerL),
|
||||
|
||||
AddToMap = fun(Servers, Node, Status, Map) ->
|
||||
lists:foldl(fun(Name, Acc) ->
|
||||
StatusL = maps:get(Name, Acc),
|
||||
StatusL2 = [#{node => Node,
|
||||
status => Status
|
||||
} | StatusL],
|
||||
Acc#{Name := StatusL2}
|
||||
end,
|
||||
Map,
|
||||
Servers)
|
||||
end,
|
||||
|
||||
AggreMap2 = lists:foldl(fun({Node, #{running := Running,
|
||||
waiting := Waiting,
|
||||
stopped := Stopped}},
|
||||
Acc) ->
|
||||
AddToMap(Stopped, Node, stopped,
|
||||
AddToMap(Waiting, Node, waiting,
|
||||
AddToMap(Running, Node, running, Acc)))
|
||||
end,
|
||||
AggreMap,
|
||||
AllStatusL),
|
||||
|
||||
Handler = fun(#{name := Name} = Server) ->
|
||||
Server#{node_status => maps:get(Name, AggreMap2)}
|
||||
end,
|
||||
|
||||
lists:map(Handler, ServerL).
|
||||
|
||||
call_cluster(Module, Fun, Args) ->
|
||||
Nodes = mria_mnesia:running_nodes(),
|
||||
[{Node, rpc_call(Node, Module, Fun, Args)} || Node <- Nodes].
|
||||
|
||||
rpc_call(Node, Module, Fun, Args) when Node =:= node() ->
|
||||
erlang:apply(Module, Fun, Args);
|
||||
|
||||
rpc_call(Node, Module, Fun, Args) ->
|
||||
case rpc:call(Node, Module, Fun, Args) of
|
||||
{badrpc, Reason} -> {error, Reason};
|
||||
Res -> Res
|
||||
end.
|
|
@ -1,84 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_exhook_cli).
|
||||
|
||||
-include("emqx_exhook.hrl").
|
||||
|
||||
-export([cli/1]).
|
||||
|
||||
cli(["server", "list"]) ->
|
||||
if_enabled(fun() ->
|
||||
ServerNames = emqx_exhook:list(),
|
||||
[emqx_ctl:print("Server(~ts)~n", [format(Name)]) || Name <- ServerNames]
|
||||
end);
|
||||
|
||||
cli(["server", "enable", Name]) ->
|
||||
if_enabled(fun() ->
|
||||
print(emqx_exhook:enable(iolist_to_binary(Name)))
|
||||
end);
|
||||
|
||||
cli(["server", "disable", Name]) ->
|
||||
if_enabled(fun() ->
|
||||
print(emqx_exhook:disable(iolist_to_binary(Name)))
|
||||
end);
|
||||
|
||||
cli(["server", "stats"]) ->
|
||||
if_enabled(fun() ->
|
||||
[emqx_ctl:print("~-35s:~w~n", [Name, N]) || {Name, N} <- stats()]
|
||||
end);
|
||||
|
||||
cli(_) ->
|
||||
emqx_ctl:usage([{"exhook server list", "List all running exhook server"},
|
||||
{"exhook server enable <Name>", "Enable a exhook server in the configuration"},
|
||||
{"exhook server disable <Name>", "Disable a exhook server"},
|
||||
{"exhook server stats", "Print exhook server statistic"}]).
|
||||
|
||||
print(ok) ->
|
||||
emqx_ctl:print("ok~n");
|
||||
print({error, Reason}) ->
|
||||
emqx_ctl:print("~p~n", [Reason]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal funcs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
if_enabled(Fun) ->
|
||||
case lists:keymember(?APP, 1, application:which_applications()) of
|
||||
true ->
|
||||
Fun();
|
||||
_ -> hint()
|
||||
end.
|
||||
|
||||
hint() ->
|
||||
emqx_ctl:print("Please './bin/emqx_ctl plugins load emqx_exhook' first.~n").
|
||||
|
||||
stats() ->
|
||||
lists:usort(lists:foldr(fun({K, N}, Acc) ->
|
||||
case atom_to_list(K) of
|
||||
"exhook." ++ Key -> [{Key, N} | Acc];
|
||||
_ -> Acc
|
||||
end
|
||||
end, [], emqx_metrics:all())).
|
||||
|
||||
format(Name) ->
|
||||
case emqx_exhook_mngr:server(Name) of
|
||||
undefined ->
|
||||
lists:flatten(
|
||||
io_lib:format("name=~ts, hooks=#{}, active=false", [Name]));
|
||||
Server ->
|
||||
emqx_exhook_server:format(Server)
|
||||
end.
|
|
@ -0,0 +1,596 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Manage the server status and reload strategy
|
||||
-module(emqx_exhook_mgr).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx_exhook.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%% APIs
|
||||
-export([start_link/0]).
|
||||
|
||||
%% Mgmt API
|
||||
-export([ list/0
|
||||
, lookup/1
|
||||
, enable/1
|
||||
, disable/1
|
||||
, server_status/1
|
||||
, all_servers_status/0
|
||||
]).
|
||||
|
||||
%% Helper funcs
|
||||
-export([ running/0
|
||||
, server/1
|
||||
, init_counter_table/0
|
||||
]).
|
||||
|
||||
-export([ update_config/2
|
||||
, pre_config_update/3
|
||||
, post_config_update/5
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([ init/1
|
||||
, handle_call/3
|
||||
, handle_cast/2
|
||||
, handle_info/2
|
||||
, terminate/2
|
||||
, code_change/3
|
||||
]).
|
||||
|
||||
-export([roots/0]).
|
||||
|
||||
-type state() :: #{%% Running servers
|
||||
running := servers(),
|
||||
%% Wait to reload servers
|
||||
waiting := servers(),
|
||||
%% Marked stopped servers
|
||||
stopped := servers(),
|
||||
%% Timer references
|
||||
trefs := map(),
|
||||
orders := orders()
|
||||
}.
|
||||
|
||||
-type server_name() :: binary().
|
||||
-type servers() :: #{server_name() => server()}.
|
||||
-type server() :: server_options().
|
||||
-type server_options() :: map().
|
||||
|
||||
-type move_direct() :: top
|
||||
| bottom
|
||||
| before
|
||||
| 'after'.
|
||||
|
||||
-type orders() :: #{server_name() => integer()}.
|
||||
|
||||
-type server_info() :: #{name := server_name(),
|
||||
status := running | waiting | stopped,
|
||||
|
||||
atom() => term()
|
||||
}.
|
||||
|
||||
-define(DEFAULT_TIMEOUT, 60000).
|
||||
-define(CNTER, emqx_exhook_counter).
|
||||
|
||||
-export_type([server_info/0]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec start_link() -> ignore
|
||||
| {ok, pid()}
|
||||
| {error, any()}.
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
list() ->
|
||||
call(list).
|
||||
|
||||
-spec lookup(server_name()) -> not_found | server_info().
|
||||
lookup(Name) ->
|
||||
call({lookup, Name}).
|
||||
|
||||
enable(Name) ->
|
||||
update_config([emqx_exhook, servers], {enable, Name, true}).
|
||||
|
||||
disable(Name) ->
|
||||
update_config([emqx_exhook, servers], {enable, Name, false}).
|
||||
|
||||
server_status(Name) ->
|
||||
call({server_status, Name}).
|
||||
|
||||
all_servers_status() ->
|
||||
call(all_servers_status).
|
||||
|
||||
call(Req) ->
|
||||
gen_server:call(?MODULE, Req, ?DEFAULT_TIMEOUT).
|
||||
|
||||
init_counter_table() ->
|
||||
_ = ets:new(?CNTER, [named_table, public]).
|
||||
|
||||
%%=====================================================================
|
||||
%% Hocon schema
|
||||
roots() ->
|
||||
emqx_exhook_schema:server_config().
|
||||
|
||||
update_config(KeyPath, UpdateReq) ->
|
||||
case emqx_conf:update(KeyPath, UpdateReq, #{override_to => cluster}) of
|
||||
{ok, UpdateResult} ->
|
||||
#{post_config_update := #{?MODULE := Result}} = UpdateResult,
|
||||
{ok, Result};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
pre_config_update(_, {add, Conf}, OldConf) ->
|
||||
{ok, OldConf ++ [Conf]};
|
||||
|
||||
pre_config_update(_, {update, Name, Conf}, OldConf) ->
|
||||
case replace_conf(Name, fun(_) -> Conf end, OldConf) of
|
||||
not_found -> {error, not_found};
|
||||
NewConf -> {ok, NewConf}
|
||||
end;
|
||||
|
||||
pre_config_update(_, {delete, ToDelete}, OldConf) ->
|
||||
{ok, lists:dropwhile(fun(#{<<"name">> := Name}) -> Name =:= ToDelete end,
|
||||
OldConf)};
|
||||
|
||||
pre_config_update(_, {move, Name, Position, Relate}, OldConf) ->
|
||||
case do_move(Name, Position, Relate, OldConf) of
|
||||
not_found -> {error, not_found};
|
||||
NewConf -> {ok, NewConf}
|
||||
end;
|
||||
|
||||
pre_config_update(_, {enable, Name, Enable}, OldConf) ->
|
||||
case replace_conf(Name,
|
||||
fun(Conf) -> Conf#{<<"enable">> => Enable} end, OldConf) of
|
||||
not_found -> {error, not_found};
|
||||
NewConf ->
|
||||
ct:pal(">>>> enable Name:~p Enable:~p, New:~p~n", [Name, Enable, NewConf]),
|
||||
{ok, NewConf}
|
||||
end.
|
||||
|
||||
post_config_update(_KeyPath, UpdateReq, NewConf, _OldConf, _AppEnvs) ->
|
||||
{ok, call({update_config, UpdateReq, NewConf})}.
|
||||
|
||||
%%=====================================================================
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
process_flag(trap_exit, true),
|
||||
emqx_conf:add_handler([emqx_exhook, servers], ?MODULE),
|
||||
ServerL = emqx:get_config([emqx_exhook, servers]),
|
||||
{Waiting, Running, Stopped} = load_all_servers(ServerL),
|
||||
Orders = reorder(ServerL),
|
||||
{ok, ensure_reload_timer(
|
||||
#{waiting => Waiting,
|
||||
running => Running,
|
||||
stopped => Stopped,
|
||||
trefs => #{},
|
||||
orders => Orders
|
||||
})}.
|
||||
|
||||
-spec load_all_servers(list(server_options())) -> {servers(), servers(), servers()}.
|
||||
load_all_servers(ServerL) ->
|
||||
load_all_servers(ServerL, #{}, #{}, #{}).
|
||||
|
||||
load_all_servers([#{name := Name} = Options | More], Waiting, Running, Stopped) ->
|
||||
case emqx_exhook_server:load(Name, Options) of
|
||||
{ok, ServerState} ->
|
||||
save(Name, ServerState),
|
||||
load_all_servers(More, Waiting, Running#{Name => Options}, Stopped);
|
||||
{error, _} ->
|
||||
load_all_servers(More, Waiting#{Name => Options}, Running, Stopped);
|
||||
disable ->
|
||||
load_all_servers(More, Waiting, Running, Stopped#{Name => Options})
|
||||
end;
|
||||
|
||||
load_all_servers([], Waiting, Running, Stopped) ->
|
||||
{Waiting, Running, Stopped}.
|
||||
|
||||
handle_call(list, _From, State = #{running := Running,
|
||||
waiting := Waiting,
|
||||
stopped := Stopped,
|
||||
orders := Orders}) ->
|
||||
|
||||
R = get_servers_info(running, Running),
|
||||
W = get_servers_info(waiting, Waiting),
|
||||
S = get_servers_info(stopped, Stopped),
|
||||
|
||||
Servers = R ++ W ++ S,
|
||||
OrderServers = sort_name_by_order(Servers, Orders),
|
||||
|
||||
{reply, OrderServers, State};
|
||||
|
||||
handle_call({update_config, {move, _Name, _Direct, _Related}, NewConfL},
|
||||
_From,
|
||||
State) ->
|
||||
Orders = reorder(NewConfL),
|
||||
{reply, ok, State#{orders := Orders}};
|
||||
|
||||
handle_call({update_config, {delete, ToDelete}, _}, _From, State) ->
|
||||
{ok, #{orders := Orders,
|
||||
stopped := Stopped
|
||||
} = State2} = do_unload_server(ToDelete, State),
|
||||
|
||||
State3 = State2#{stopped := maps:remove(ToDelete, Stopped),
|
||||
orders := maps:remove(ToDelete, Orders)
|
||||
},
|
||||
|
||||
{reply, ok, State3};
|
||||
|
||||
handle_call({update_config, {add, RawConf}, NewConfL},
|
||||
_From,
|
||||
#{running := Running, waiting := Waitting, stopped := Stopped} = State) ->
|
||||
{_, #{name := Name} = Conf} = emqx_config:check_config(?MODULE, RawConf),
|
||||
|
||||
case emqx_exhook_server:load(Name, Conf) of
|
||||
{ok, ServerState} ->
|
||||
save(Name, ServerState),
|
||||
Status = running,
|
||||
Hooks = hooks(Name),
|
||||
State2 = State#{running := Running#{Name => Conf}};
|
||||
{error, _} ->
|
||||
Status = running,
|
||||
Hooks = [],
|
||||
StateT = State#{waiting := Waitting#{Name => Conf}},
|
||||
State2 = ensure_reload_timer(StateT);
|
||||
disable ->
|
||||
Status = stopped,
|
||||
Hooks = [],
|
||||
State2 = State#{stopped := Stopped#{Name => Conf}}
|
||||
end,
|
||||
Orders = reorder(NewConfL),
|
||||
Resulte = maps:merge(Conf, #{status => Status, hooks => Hooks}),
|
||||
{reply, Resulte, State2#{orders := Orders}};
|
||||
|
||||
handle_call({lookup, Name}, _From, State) ->
|
||||
case where_is_server(Name, State) of
|
||||
not_found ->
|
||||
Result = not_found;
|
||||
{Where, #{Name := Conf}} ->
|
||||
Result = maps:merge(Conf,
|
||||
#{ status => Where
|
||||
, hooks => hooks(Name)
|
||||
})
|
||||
end,
|
||||
{reply, Result, State};
|
||||
|
||||
handle_call({update_config, {update, Name, _Conf}, NewConfL}, _From, State) ->
|
||||
{Result, State2} = restart_server(Name, NewConfL, State),
|
||||
{reply, Result, State2};
|
||||
|
||||
handle_call({update_config, {enable, Name, _Enable}, NewConfL}, _From, State) ->
|
||||
{Result, State2} = restart_server(Name, NewConfL, State),
|
||||
{reply, Result, State2};
|
||||
|
||||
handle_call({server_status, Name}, _From, State) ->
|
||||
case where_is_server(Name, State) of
|
||||
not_found ->
|
||||
Result = not_found;
|
||||
{Status, _} ->
|
||||
Result = Status
|
||||
end,
|
||||
{reply, Result, State};
|
||||
|
||||
handle_call(all_servers_status, _From, #{running := Running,
|
||||
waiting := Waiting,
|
||||
stopped := Stopped} = State) ->
|
||||
{reply, #{running => maps:keys(Running),
|
||||
waiting => maps:keys(Waiting),
|
||||
stopped => maps:keys(Stopped)}, State};
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
Reply = ok,
|
||||
{reply, Reply, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({timeout, _Ref, {reload, Name}}, State) ->
|
||||
{Result, NState} = do_load_server(Name, State),
|
||||
case Result of
|
||||
ok ->
|
||||
{noreply, NState};
|
||||
{error, not_found} ->
|
||||
{noreply, NState};
|
||||
{error, Reason} ->
|
||||
?LOG(warning, "Failed to reload exhook callback server \"~ts\", "
|
||||
"Reason: ~0p", [Name, Reason]),
|
||||
{noreply, ensure_reload_timer(NState)}
|
||||
end;
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, State = #{running := Running}) ->
|
||||
_ = maps:fold(fun(Name, _, AccIn) ->
|
||||
{ok, NAccIn} = do_unload_server(Name, AccIn),
|
||||
NAccIn
|
||||
end, State, Running),
|
||||
_ = unload_exhooks(),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal funcs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
unload_exhooks() ->
|
||||
[emqx:unhook(Name, {M, F}) ||
|
||||
{Name, {M, F, _A}} <- ?ENABLED_HOOKS].
|
||||
|
||||
-spec do_load_server(server_name(), state()) -> {{error, not_found}, state()}
|
||||
| {{error, already_started}, state()}
|
||||
| {ok, state()}.
|
||||
do_load_server(Name, State = #{orders := Orders}) ->
|
||||
case where_is_server(Name, State) of
|
||||
not_found ->
|
||||
{{error, not_found}, State};
|
||||
{running, _} ->
|
||||
{ok, State};
|
||||
{Where, Map} ->
|
||||
State2 = clean_reload_timer(Name, State),
|
||||
{Options, Map2} = maps:take(Name, Map),
|
||||
State3 = State2#{Where := Map2},
|
||||
#{running := Running,
|
||||
stopped := Stopped} = State3,
|
||||
case emqx_exhook_server:load(Name, Options) of
|
||||
{ok, ServerState} ->
|
||||
save(Name, ServerState),
|
||||
update_order(Orders),
|
||||
?LOG(info, "Load exhook callback server "
|
||||
"\"~ts\" successfully!", [Name]),
|
||||
{ok, State3#{running := maps:put(Name, Options, Running)}};
|
||||
{error, Reason} ->
|
||||
{{error, Reason}, State};
|
||||
disable ->
|
||||
{ok, State3#{stopped := Stopped#{Name => Options}}}
|
||||
end
|
||||
end.
|
||||
|
||||
-spec do_unload_server(server_name(), state()) -> {ok, state()}.
|
||||
do_unload_server(Name, #{stopped := Stopped} = State) ->
|
||||
case where_is_server(Name, State) of
|
||||
{stopped, _} -> {ok, State};
|
||||
{waiting, Waiting} ->
|
||||
{Options, Waiting2} = maps:take(Name, Waiting),
|
||||
{ok, clean_reload_timer(Name,
|
||||
State#{waiting := Waiting2,
|
||||
stopped := maps:put(Name, Options, Stopped)
|
||||
}
|
||||
)};
|
||||
{running, Running} ->
|
||||
Service = server(Name),
|
||||
ok = unsave(Name),
|
||||
ok = emqx_exhook_server:unload(Service),
|
||||
{Options, Running2} = maps:take(Name, Running),
|
||||
{ok, State#{running := Running2,
|
||||
stopped := maps:put(Name, Options, Stopped)
|
||||
}};
|
||||
not_found -> {ok, State}
|
||||
end.
|
||||
|
||||
-spec ensure_reload_timer(state()) -> state().
|
||||
ensure_reload_timer(State = #{waiting := Waiting,
|
||||
stopped := Stopped,
|
||||
trefs := TRefs}) ->
|
||||
Iter = maps:iterator(Waiting),
|
||||
|
||||
{Waitting2, Stopped2, TRefs2} =
|
||||
ensure_reload_timer(maps:next(Iter), Waiting, Stopped, TRefs),
|
||||
|
||||
State#{waiting := Waitting2,
|
||||
stopped := Stopped2,
|
||||
trefs := TRefs2}.
|
||||
|
||||
ensure_reload_timer(none, Waiting, Stopped, TimerRef) ->
|
||||
{Waiting, Stopped, TimerRef};
|
||||
|
||||
ensure_reload_timer({Name, #{auto_reconnect := Intv}, Iter},
|
||||
Waiting,
|
||||
Stopped,
|
||||
TimerRef) ->
|
||||
Next = maps:next(Iter),
|
||||
case maps:is_key(Name, TimerRef) of
|
||||
true ->
|
||||
ensure_reload_timer(Next, Waiting, Stopped, TimerRef);
|
||||
_ ->
|
||||
Ref = erlang:start_timer(Intv, self(), {reload, Name}),
|
||||
TimerRef2 = maps:put(Name, Ref, TimerRef),
|
||||
ensure_reload_timer(Next, Waiting, Stopped, TimerRef2)
|
||||
end;
|
||||
|
||||
ensure_reload_timer({Name, Opts, Iter}, Waiting, Stopped, TimerRef) ->
|
||||
ensure_reload_timer(maps:next(Iter),
|
||||
maps:remove(Name, Waiting),
|
||||
maps:put(Name, Opts, Stopped),
|
||||
TimerRef).
|
||||
|
||||
-spec clean_reload_timer(server_name(), state()) -> state().
|
||||
clean_reload_timer(Name, State = #{trefs := TRefs}) ->
|
||||
case maps:take(Name, TRefs) of
|
||||
error -> State;
|
||||
{TRef, NTRefs} ->
|
||||
_ = erlang:cancel_timer(TRef),
|
||||
State#{trefs := NTRefs}
|
||||
end.
|
||||
|
||||
-spec do_move(binary(), move_direct(), binary(), list(server_options())) ->
|
||||
not_found | list(server_options()).
|
||||
do_move(Name, Direct, ToName, ConfL) ->
|
||||
move(ConfL, Name, Direct, ToName, []).
|
||||
|
||||
move([#{<<"name">> := Name} = Server | T], Name, Direct, ToName, HeadL) ->
|
||||
move_to(Direct, ToName, Server, lists:reverse(HeadL) ++ T);
|
||||
|
||||
move([Server | T], Name, Direct, ToName, HeadL) ->
|
||||
move(T, Name, Direct, ToName, [Server | HeadL]);
|
||||
|
||||
move([], _Name, _Direct, _ToName, _HeadL) ->
|
||||
not_found.
|
||||
|
||||
move_to(top, _, Server, ServerL) ->
|
||||
[Server | ServerL];
|
||||
|
||||
move_to(bottom, _, Server, ServerL) ->
|
||||
ServerL ++ [Server];
|
||||
|
||||
move_to(Direct, ToName, Server, ServerL) ->
|
||||
move_to(ServerL, Direct, ToName, Server, []).
|
||||
|
||||
move_to([#{<<"name">> := Name} | _] = T, before, Name, Server, HeadL) ->
|
||||
lists:reverse(HeadL) ++ [Server | T];
|
||||
|
||||
move_to([#{<<"name">> := Name} = H | T], 'after', Name, Server, HeadL) ->
|
||||
lists:reverse(HeadL) ++ [H, Server | T];
|
||||
|
||||
move_to([H | T], Direct, Name, Server, HeadL) ->
|
||||
move_to(T, Direct, Name, Server, [H | HeadL]);
|
||||
|
||||
move_to([], _Direct, _Name, _Server, _HeadL) ->
|
||||
not_found.
|
||||
|
||||
-spec reorder(list(server_options())) -> orders().
|
||||
reorder(ServerL) ->
|
||||
Orders = reorder(ServerL, 1, #{}),
|
||||
update_order(Orders),
|
||||
Orders.
|
||||
|
||||
reorder([#{name := Name} | T], Order, Orders) ->
|
||||
reorder(T, Order + 1, Orders#{Name => Order});
|
||||
|
||||
reorder([], _Order, Orders) ->
|
||||
Orders.
|
||||
|
||||
get_servers_info(Status, Map) ->
|
||||
Fold = fun(Name, Conf, Acc) ->
|
||||
[maps:merge(Conf, #{status => Status,
|
||||
hooks => hooks(Name)}) | Acc]
|
||||
end,
|
||||
maps:fold(Fold, [], Map).
|
||||
|
||||
|
||||
where_is_server(Name, #{running := Running}) when is_map_key(Name, Running) ->
|
||||
{running, Running};
|
||||
|
||||
where_is_server(Name, #{waiting := Waiting}) when is_map_key(Name, Waiting) ->
|
||||
{waiting, Waiting};
|
||||
|
||||
where_is_server(Name, #{stopped := Stopped}) when is_map_key(Name, Stopped) ->
|
||||
{stopped, Stopped};
|
||||
|
||||
where_is_server(_, _) ->
|
||||
not_found.
|
||||
|
||||
-type replace_fun() :: fun((server_options()) -> server_options()).
|
||||
|
||||
-spec replace_conf(binary(), replace_fun(), list(server_options())) -> not_found
|
||||
| list(server_options()).
|
||||
replace_conf(Name, ReplaceFun, ConfL) ->
|
||||
replace_conf(ConfL, Name, ReplaceFun, []).
|
||||
|
||||
replace_conf([#{<<"name">> := Name} = H | T], Name, ReplaceFun, HeadL) ->
|
||||
New = ReplaceFun(H),
|
||||
lists:reverse(HeadL) ++ [New | T];
|
||||
|
||||
replace_conf([H | T], Name, ReplaceFun, HeadL) ->
|
||||
replace_conf(T, Name, ReplaceFun, [H | HeadL]);
|
||||
|
||||
replace_conf([], _, _, _) ->
|
||||
not_found.
|
||||
|
||||
-spec restart_server(binary(), list(server_options()), state()) -> {ok, state()}
|
||||
| {{error, term()}, state()}.
|
||||
restart_server(Name, ConfL, State) ->
|
||||
case lists:search(fun(#{name := CName}) -> CName =:= Name end, ConfL) of
|
||||
false ->
|
||||
{{error, not_found}, State};
|
||||
{value, Conf} ->
|
||||
case where_is_server(Name, State) of
|
||||
not_found ->
|
||||
{{error, not_found}, State};
|
||||
{Where, Map} ->
|
||||
State2 = State#{Where := Map#{Name := Conf}},
|
||||
{ok, State3} = do_unload_server(Name, State2),
|
||||
case do_load_server(Name, State3) of
|
||||
{ok, State4} ->
|
||||
{ok, State4};
|
||||
{Error, State4} ->
|
||||
{Error, State4}
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
sort_name_by_order(Names, Orders) ->
|
||||
lists:sort(fun(A, B) when is_binary(A) ->
|
||||
maps:get(A, Orders) < maps:get(B, Orders);
|
||||
(#{name := A}, #{name := B}) ->
|
||||
maps:get(A, Orders) < maps:get(B, Orders)
|
||||
end,
|
||||
Names).
|
||||
%%--------------------------------------------------------------------
|
||||
%% Server state persistent
|
||||
save(Name, ServerState) ->
|
||||
Saved = persistent_term:get(?APP, []),
|
||||
persistent_term:put(?APP, lists:reverse([Name | Saved])),
|
||||
persistent_term:put({?APP, Name}, ServerState).
|
||||
|
||||
unsave(Name) ->
|
||||
case persistent_term:get(?APP, []) of
|
||||
[] ->
|
||||
ok;
|
||||
Saved ->
|
||||
case lists:member(Name, Saved) of
|
||||
false ->
|
||||
ok;
|
||||
true ->
|
||||
persistent_term:put(?APP, lists:delete(Name, Saved))
|
||||
end
|
||||
end,
|
||||
persistent_term:erase({?APP, Name}),
|
||||
ok.
|
||||
|
||||
running() ->
|
||||
persistent_term:get(?APP, []).
|
||||
|
||||
server(Name) ->
|
||||
case persistent_term:get({?APP, Name}, undefined) of
|
||||
undefined -> undefined;
|
||||
Service -> Service
|
||||
end.
|
||||
|
||||
update_order(Orders) ->
|
||||
Running = running(),
|
||||
Running2 = sort_name_by_order(Running, Orders),
|
||||
persistent_term:put(?APP, Running2).
|
||||
|
||||
hooks(Name) ->
|
||||
case server(Name) of
|
||||
undefined ->
|
||||
[];
|
||||
Service ->
|
||||
emqx_exhook_server:hookpoints(Service)
|
||||
end.
|
|
@ -1,329 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
%% @doc Manage the server status and reload strategy
|
||||
-module(emqx_exhook_mngr).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx_exhook.hrl").
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
|
||||
%% APIs
|
||||
-export([start_link/3]).
|
||||
|
||||
%% Mgmt API
|
||||
-export([ enable/2
|
||||
, disable/2
|
||||
, list/1
|
||||
]).
|
||||
|
||||
%% Helper funcs
|
||||
-export([ running/0
|
||||
, server/1
|
||||
, put_request_failed_action/1
|
||||
, get_request_failed_action/0
|
||||
, put_pool_size/1
|
||||
, get_pool_size/0
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([ init/1
|
||||
, handle_call/3
|
||||
, handle_cast/2
|
||||
, handle_info/2
|
||||
, terminate/2
|
||||
, code_change/3
|
||||
]).
|
||||
|
||||
-record(state, {
|
||||
%% Running servers
|
||||
running :: map(), %% XXX: server order?
|
||||
%% Wait to reload servers
|
||||
waiting :: map(),
|
||||
%% Marked stopped servers
|
||||
stopped :: map(),
|
||||
%% Auto reconnect timer interval
|
||||
auto_reconnect :: false | non_neg_integer(),
|
||||
%% Request options
|
||||
request_options :: grpc_client:options(),
|
||||
%% Timer references
|
||||
trefs :: map()
|
||||
}).
|
||||
|
||||
-type servers() :: [{Name :: atom(), server_options()}].
|
||||
|
||||
-type server_options() :: [ {scheme, http | https}
|
||||
| {host, string()}
|
||||
| {port, inet:port_number()}
|
||||
].
|
||||
|
||||
-define(DEFAULT_TIMEOUT, 60000).
|
||||
|
||||
-define(CNTER, emqx_exhook_counter).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec start_link(servers(), false | non_neg_integer(), grpc_client:options())
|
||||
->ignore
|
||||
| {ok, pid()}
|
||||
| {error, any()}.
|
||||
start_link(Servers, AutoReconnect, ReqOpts) ->
|
||||
gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []).
|
||||
|
||||
-spec enable(pid(), binary()) -> ok | {error, term()}.
|
||||
enable(Pid, Name) ->
|
||||
call(Pid, {load, Name}).
|
||||
|
||||
-spec disable(pid(), binary()) -> ok | {error, term()}.
|
||||
disable(Pid, Name) ->
|
||||
call(Pid, {unload, Name}).
|
||||
|
||||
list(Pid) ->
|
||||
call(Pid, list).
|
||||
|
||||
call(Pid, Req) ->
|
||||
gen_server:call(Pid, Req, ?DEFAULT_TIMEOUT).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([Servers, AutoReconnect, ReqOpts0]) ->
|
||||
process_flag(trap_exit, true),
|
||||
%% XXX: Due to the ExHook Module in the enterprise,
|
||||
%% this process may start multiple times and they will share this table
|
||||
try
|
||||
_ = ets:new(?CNTER, [named_table, public]), ok
|
||||
catch
|
||||
error:badarg:_ ->
|
||||
ok
|
||||
end,
|
||||
|
||||
%% put the global option
|
||||
put_request_failed_action(
|
||||
maps:get(request_failed_action, ReqOpts0, deny)
|
||||
),
|
||||
put_pool_size(
|
||||
maps:get(pool_size, ReqOpts0, erlang:system_info(schedulers))
|
||||
),
|
||||
|
||||
%% Load the hook servers
|
||||
ReqOpts = maps:without([request_failed_action], ReqOpts0),
|
||||
{Waiting, Running} = load_all_servers(Servers, ReqOpts),
|
||||
{ok, ensure_reload_timer(
|
||||
#state{waiting = Waiting,
|
||||
running = Running,
|
||||
stopped = #{},
|
||||
request_options = ReqOpts,
|
||||
auto_reconnect = AutoReconnect,
|
||||
trefs = #{}
|
||||
}
|
||||
)}.
|
||||
|
||||
%% @private
|
||||
load_all_servers(Servers, ReqOpts) ->
|
||||
load_all_servers(Servers, ReqOpts, #{}, #{}).
|
||||
load_all_servers([], _Request, Waiting, Running) ->
|
||||
{Waiting, Running};
|
||||
load_all_servers([#{name := Name0} = Options0 | More], ReqOpts, Waiting, Running) ->
|
||||
Name = iolist_to_binary(Name0),
|
||||
Options = Options0#{name => Name},
|
||||
{NWaiting, NRunning} =
|
||||
case emqx_exhook_server:load(Name, Options, ReqOpts) of
|
||||
{ok, ServerState} ->
|
||||
save(Name, ServerState),
|
||||
{Waiting, Running#{Name => Options}};
|
||||
{error, _} ->
|
||||
{Waiting#{Name => Options}, Running}
|
||||
end,
|
||||
load_all_servers(More, ReqOpts, NWaiting, NRunning).
|
||||
|
||||
handle_call({load, Name}, _From, State) ->
|
||||
{Result, NState} = do_load_server(Name, State),
|
||||
{reply, Result, NState};
|
||||
|
||||
handle_call({unload, Name}, _From, State) ->
|
||||
case do_unload_server(Name, State) of
|
||||
{error, Reason} ->
|
||||
{reply, {error, Reason}, State};
|
||||
{ok, NState} ->
|
||||
{reply, ok, NState}
|
||||
end;
|
||||
|
||||
handle_call(list, _From, State = #state{
|
||||
running = Running,
|
||||
waiting = Waiting,
|
||||
stopped = Stopped}) ->
|
||||
ServerNames = maps:keys(Running)
|
||||
++ maps:keys(Waiting)
|
||||
++ maps:keys(Stopped),
|
||||
{reply, ServerNames, State};
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
Reply = ok,
|
||||
{reply, Reply, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({timeout, _Ref, {reload, Name}}, State) ->
|
||||
{Result, NState} = do_load_server(Name, State),
|
||||
case Result of
|
||||
ok ->
|
||||
{noreply, NState};
|
||||
{error, not_found} ->
|
||||
{noreply, NState};
|
||||
{error, Reason} ->
|
||||
?SLOG(warning, #{msg => "failed_to_reload_exhook_callback_server",
|
||||
server_name => Name,
|
||||
reason => Reason}),
|
||||
{noreply, ensure_reload_timer(NState)}
|
||||
end;
|
||||
|
||||
handle_info(_Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, State = #state{running = Running}) ->
|
||||
_ = maps:fold(fun(Name, _, AccIn) ->
|
||||
case do_unload_server(Name, AccIn) of
|
||||
{ok, NAccIn} -> NAccIn;
|
||||
_ -> AccIn
|
||||
end
|
||||
end, State, Running),
|
||||
_ = unload_exhooks(),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal funcs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
unload_exhooks() ->
|
||||
[emqx:unhook(Name, {M, F}) ||
|
||||
{Name, {M, F, _A}} <- ?ENABLED_HOOKS].
|
||||
|
||||
do_load_server(Name, State0 = #state{
|
||||
waiting = Waiting,
|
||||
running = Running,
|
||||
stopped = Stopped,
|
||||
request_options = ReqOpts}) ->
|
||||
State = clean_reload_timer(Name, State0),
|
||||
case maps:get(Name, Running, undefined) of
|
||||
undefined ->
|
||||
case maps:get(Name, Stopped,
|
||||
maps:get(Name, Waiting, undefined)) of
|
||||
undefined ->
|
||||
{{error, not_found}, State};
|
||||
Options ->
|
||||
case emqx_exhook_server:load(Name, Options, ReqOpts) of
|
||||
{ok, ServerState} ->
|
||||
save(Name, ServerState),
|
||||
?SLOG(info, #{msg => "load_exhook_callback_server_successfully",
|
||||
server_name => Name}),
|
||||
{ok, State#state{
|
||||
running = maps:put(Name, Options, Running),
|
||||
waiting = maps:remove(Name, Waiting),
|
||||
stopped = maps:remove(Name, Stopped)
|
||||
}
|
||||
};
|
||||
{error, Reason} ->
|
||||
{{error, Reason}, State}
|
||||
end
|
||||
end;
|
||||
_ ->
|
||||
{{error, already_started}, State}
|
||||
end.
|
||||
|
||||
do_unload_server(Name, State = #state{running = Running, stopped = Stopped}) ->
|
||||
case maps:take(Name, Running) of
|
||||
error -> {error, not_running};
|
||||
{Options, NRunning} ->
|
||||
ok = emqx_exhook_server:unload(server(Name)),
|
||||
ok = unsave(Name),
|
||||
{ok, State#state{running = NRunning,
|
||||
stopped = maps:put(Name, Options, Stopped)
|
||||
}}
|
||||
end.
|
||||
|
||||
ensure_reload_timer(State = #state{auto_reconnect = false}) ->
|
||||
State;
|
||||
ensure_reload_timer(State = #state{waiting = Waiting,
|
||||
trefs = TRefs,
|
||||
auto_reconnect = Intv}) ->
|
||||
NRefs = maps:fold(fun(Name, _, AccIn) ->
|
||||
case maps:get(Name, AccIn, undefined) of
|
||||
undefined ->
|
||||
Ref = erlang:start_timer(Intv, self(), {reload, Name}),
|
||||
AccIn#{Name => Ref};
|
||||
_HasRef ->
|
||||
AccIn
|
||||
end
|
||||
end, TRefs, Waiting),
|
||||
State#state{trefs = NRefs}.
|
||||
|
||||
clean_reload_timer(Name, State = #state{trefs = TRefs}) ->
|
||||
case maps:take(Name, TRefs) of
|
||||
error -> State;
|
||||
{TRef, NTRefs} ->
|
||||
_ = erlang:cancel_timer(TRef),
|
||||
State#state{trefs = NTRefs}
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Server state persistent
|
||||
|
||||
put_request_failed_action(Val) ->
|
||||
persistent_term:put({?APP, request_failed_action}, Val).
|
||||
|
||||
get_request_failed_action() ->
|
||||
persistent_term:get({?APP, request_failed_action}).
|
||||
|
||||
put_pool_size(Val) ->
|
||||
persistent_term:put({?APP, pool_size}, Val).
|
||||
|
||||
get_pool_size() ->
|
||||
%% Avoid the scenario that the parameter is not set after
|
||||
%% the hot upgrade completed.
|
||||
persistent_term:get({?APP, pool_size}, erlang:system_info(schedulers)).
|
||||
|
||||
save(Name, ServerState) ->
|
||||
Saved = persistent_term:get(?APP, []),
|
||||
persistent_term:put(?APP, lists:reverse([Name | Saved])),
|
||||
persistent_term:put({?APP, Name}, ServerState).
|
||||
|
||||
unsave(Name) ->
|
||||
case persistent_term:get(?APP, []) of
|
||||
[] ->
|
||||
persistent_term:erase(?APP);
|
||||
Saved ->
|
||||
persistent_term:put(?APP, lists:delete(Name, Saved))
|
||||
end,
|
||||
persistent_term:erase({?APP, Name}),
|
||||
ok.
|
||||
|
||||
running() ->
|
||||
persistent_term:get(?APP, []).
|
||||
|
||||
server(Name) ->
|
||||
case catch persistent_term:get({?APP, Name}) of
|
||||
{'EXIT', {badarg,_}} -> undefined;
|
||||
Service -> Service
|
||||
end.
|
|
@ -32,61 +32,58 @@
|
|||
|
||||
-reflect_type([duration/0]).
|
||||
|
||||
-export([namespace/0, roots/0, fields/1]).
|
||||
-export([namespace/0, roots/0, fields/1, server_config/0]).
|
||||
|
||||
namespace() -> exhook.
|
||||
namespace() -> emqx_exhook.
|
||||
|
||||
roots() -> [exhook].
|
||||
roots() -> [emqx_exhook].
|
||||
|
||||
fields(exhook) ->
|
||||
[ {request_failed_action,
|
||||
sc(hoconsc:enum([deny, ignore]),
|
||||
#{default => deny})}
|
||||
, {request_timeout,
|
||||
sc(duration(),
|
||||
#{default => "5s"})}
|
||||
, {auto_reconnect,
|
||||
sc(hoconsc:union([false, duration()]),
|
||||
#{ default => "60s"
|
||||
})}
|
||||
, {pool_size,
|
||||
sc(integer(),
|
||||
#{ nullable => true
|
||||
})}
|
||||
, {servers,
|
||||
sc(hoconsc:array(ref(servers)),
|
||||
fields(emqx_exhook) ->
|
||||
[{servers,
|
||||
sc(hoconsc:array(ref(server)),
|
||||
#{default => []})}
|
||||
];
|
||||
|
||||
fields(servers) ->
|
||||
[ {name,
|
||||
sc(string(),
|
||||
#{})}
|
||||
, {url,
|
||||
sc(string(),
|
||||
#{})}
|
||||
fields(server) ->
|
||||
[ {name, sc(binary(), #{})}
|
||||
, {enable, sc(boolean(), #{default => true})}
|
||||
, {url, sc(binary(), #{})}
|
||||
, {request_timeout,
|
||||
sc(duration(), #{default => "5s"})}
|
||||
, {failed_action, failed_action()}
|
||||
, {ssl,
|
||||
sc(ref(ssl_conf),
|
||||
#{})}
|
||||
sc(ref(ssl_conf), #{})}
|
||||
, {auto_reconnect,
|
||||
sc(hoconsc:union([false, duration()]),
|
||||
#{default => "60s"})}
|
||||
, {pool_size,
|
||||
sc(integer(), #{default => 8, example => 8})}
|
||||
];
|
||||
|
||||
fields(ssl_conf) ->
|
||||
[ {cacertfile,
|
||||
sc(string(),
|
||||
#{})
|
||||
}
|
||||
[ {enable, sc(boolean(), #{default => true})}
|
||||
, {cacertfile,
|
||||
sc(binary(),
|
||||
#{example => <<"{{ platform_etc_dir }}/certs/cacert.pem">>})
|
||||
}
|
||||
, {certfile,
|
||||
sc(string(),
|
||||
#{})
|
||||
}
|
||||
sc(binary(),
|
||||
#{example => <<"{{ platform_etc_dir }}/certs/cert.pem">>})
|
||||
}
|
||||
, {keyfile,
|
||||
sc(string(),
|
||||
#{})}
|
||||
sc(binary(),
|
||||
#{example => <<"{{ platform_etc_dir }}/certs/key.pem">>})}
|
||||
].
|
||||
|
||||
%% types
|
||||
|
||||
sc(Type, Meta) -> Meta#{type => Type}.
|
||||
|
||||
ref(Field) ->
|
||||
hoconsc:ref(?MODULE, Field).
|
||||
|
||||
failed_action() ->
|
||||
sc(hoconsc:enum([deny, ignore]),
|
||||
#{default => deny}).
|
||||
|
||||
server_config() ->
|
||||
fields(server).
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
|
||||
|
||||
%% Load/Unload
|
||||
-export([ load/3
|
||||
-export([ load/2
|
||||
, unload/1
|
||||
]).
|
||||
|
||||
|
@ -33,23 +33,24 @@
|
|||
|
||||
%% Infos
|
||||
-export([ name/1
|
||||
, hookpoints/1
|
||||
, format/1
|
||||
, failed_action/1
|
||||
]).
|
||||
|
||||
-record(server, {
|
||||
%% Server name (equal to grpc client channel name)
|
||||
name :: binary(),
|
||||
%% The function options
|
||||
options :: map(),
|
||||
%% gRPC channel pid
|
||||
channel :: pid(),
|
||||
%% Registered hook names and options
|
||||
hookspec :: #{hookpoint() => map()},
|
||||
%% Metrcis name prefix
|
||||
prefix :: list()
|
||||
}).
|
||||
|
||||
-type server() :: #server{}.
|
||||
-type server() :: #{%% Server name (equal to grpc client channel name)
|
||||
name := binary(),
|
||||
%% The function options
|
||||
options := map(),
|
||||
%% gRPC channel pid
|
||||
channel := pid(),
|
||||
%% Registered hook names and options
|
||||
hookspec := #{hookpoint() => map()},
|
||||
%% Metrcis name prefix
|
||||
prefix := list()
|
||||
}.
|
||||
|
||||
|
||||
-type hookpoint() :: 'client.connect'
|
||||
| 'client.connack'
|
||||
|
@ -81,9 +82,13 @@
|
|||
%% Load/Unload APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec load(binary(), map(), map()) -> {ok, server()} | {error, term()} .
|
||||
load(Name, Opts0, ReqOpts) ->
|
||||
{SvrAddr, ClientOpts} = channel_opts(Opts0),
|
||||
-spec load(binary(), map()) -> {ok, server()} | {error, term()} | disable.
|
||||
load(_Name, #{enable := false}) ->
|
||||
disable;
|
||||
|
||||
load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) ->
|
||||
ReqOpts = #{timeout => Timeout, failed_action => FailedAction},
|
||||
{SvrAddr, ClientOpts} = channel_opts(Opts),
|
||||
case emqx_exhook_sup:start_grpc_client_channel(
|
||||
Name,
|
||||
SvrAddr,
|
||||
|
@ -92,16 +97,15 @@ load(Name, Opts0, ReqOpts) ->
|
|||
case do_init(Name, ReqOpts) of
|
||||
{ok, HookSpecs} ->
|
||||
%% Reigster metrics
|
||||
Prefix = lists:flatten(
|
||||
io_lib:format("exhook.~ts.", [Name])),
|
||||
Prefix = lists:flatten(io_lib:format("exhook.~ts.", [Name])),
|
||||
ensure_metrics(Prefix, HookSpecs),
|
||||
%% Ensure hooks
|
||||
ensure_hooks(HookSpecs),
|
||||
{ok, #server{name = Name,
|
||||
options = ReqOpts,
|
||||
channel = _ChannPoolPid,
|
||||
hookspec = HookSpecs,
|
||||
prefix = Prefix }};
|
||||
{ok, #{name => Name,
|
||||
options => ReqOpts,
|
||||
channel => _ChannPoolPid,
|
||||
hookspec => HookSpecs,
|
||||
prefix => Prefix }};
|
||||
{error, _} = E ->
|
||||
emqx_exhook_sup:stop_grpc_client_channel(Name), E
|
||||
end;
|
||||
|
@ -110,14 +114,16 @@ load(Name, Opts0, ReqOpts) ->
|
|||
|
||||
%% @private
|
||||
channel_opts(Opts = #{url := URL}) ->
|
||||
ClientOpts = #{pool_size => emqx_exhook_mngr:get_pool_size()},
|
||||
ClientOpts = maps:merge(#{pool_size => erlang:system_info(schedulers)},
|
||||
Opts),
|
||||
case uri_string:parse(URL) of
|
||||
#{scheme := "http", host := Host, port := Port} ->
|
||||
#{scheme := <<"http">>, host := Host, port := Port} ->
|
||||
{format_http_uri("http", Host, Port), ClientOpts};
|
||||
#{scheme := "https", host := Host, port := Port} ->
|
||||
#{scheme := <<"https">>, host := Host, port := Port} ->
|
||||
SslOpts =
|
||||
case maps:get(ssl, Opts, undefined) of
|
||||
undefined -> [];
|
||||
#{enable := false} -> [];
|
||||
MapOpts ->
|
||||
filter(
|
||||
[{cacertfile, maps:get(cacertfile, MapOpts, undefined)},
|
||||
|
@ -131,8 +137,8 @@ channel_opts(Opts = #{url := URL}) ->
|
|||
transport_opts => SslOpts}
|
||||
},
|
||||
{format_http_uri("https", Host, Port), NClientOpts};
|
||||
_ ->
|
||||
error(bad_server_url)
|
||||
Error ->
|
||||
error({bad_server_url, URL, Error})
|
||||
end.
|
||||
|
||||
format_http_uri(Scheme, Host, Port) ->
|
||||
|
@ -142,7 +148,7 @@ filter(Ls) ->
|
|||
[ E || E <- Ls, E /= undefined].
|
||||
|
||||
-spec unload(server()) -> ok.
|
||||
unload(#server{name = Name, options = ReqOpts, hookspec = HookSpecs}) ->
|
||||
unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) ->
|
||||
_ = do_deinit(Name, ReqOpts),
|
||||
_ = may_unload_hooks(HookSpecs),
|
||||
_ = emqx_exhook_sup:stop_grpc_client_channel(Name),
|
||||
|
@ -155,7 +161,7 @@ do_deinit(Name, ReqOpts) ->
|
|||
do_init(ChannName, ReqOpts) ->
|
||||
%% BrokerInfo defined at: exhook.protos
|
||||
BrokerInfo = maps:with([version, sysdescr, uptime, datetime],
|
||||
maps:from_list(emqx_sys:info())),
|
||||
maps:from_list(emqx_sys:info())),
|
||||
Req = #{broker => BrokerInfo},
|
||||
case do_call(ChannName, 'on_provider_loaded', Req, ReqOpts) of
|
||||
{ok, InitialResp} ->
|
||||
|
@ -227,7 +233,7 @@ may_unload_hooks(HookSpecs) ->
|
|||
end
|
||||
end, maps:keys(HookSpecs)).
|
||||
|
||||
format(#server{name = Name, hookspec = Hooks}) ->
|
||||
format(#{name := Name, hookspec := Hooks}) ->
|
||||
lists:flatten(
|
||||
io_lib:format("name=~ts, hooks=~0p, active=true", [Name, Hooks])).
|
||||
|
||||
|
@ -235,15 +241,17 @@ format(#server{name = Name, hookspec = Hooks}) ->
|
|||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
name(#server{name = Name}) ->
|
||||
name(#{name := Name}) ->
|
||||
Name.
|
||||
|
||||
-spec call(hookpoint(), map(), server())
|
||||
-> ignore
|
||||
| {ok, Resp :: term()}
|
||||
| {error, term()}.
|
||||
call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts,
|
||||
hookspec = Hooks, prefix = Prefix}) ->
|
||||
hookpoints(#{hookspec := Hooks}) ->
|
||||
maps:keys(Hooks).
|
||||
|
||||
-spec call(hookpoint(), map(), server()) -> ignore
|
||||
| {ok, Resp :: term()}
|
||||
| {error, term()}.
|
||||
call(Hookpoint, Req, #{name := ChannName, options := ReqOpts,
|
||||
hookspec := Hooks, prefix := Prefix}) ->
|
||||
GrpcFunc = hk2func(Hookpoint),
|
||||
case maps:get(Hookpoint, Hooks, undefined) of
|
||||
undefined -> ignore;
|
||||
|
@ -299,6 +307,9 @@ do_call(ChannName, Fun, Req, ReqOpts) ->
|
|||
{error, Reason}
|
||||
end.
|
||||
|
||||
failed_action(#{options := Opts}) ->
|
||||
maps:get(failed_action, Opts).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal funcs
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -42,25 +42,10 @@ start_link() ->
|
|||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
Mngr = ?CHILD(emqx_exhook_mngr, worker,
|
||||
[servers(), auto_reconnect(), request_options()]),
|
||||
_ = emqx_exhook_mgr:init_counter_table(),
|
||||
Mngr = ?CHILD(emqx_exhook_mgr, worker, []),
|
||||
{ok, {{one_for_one, 10, 100}, [Mngr]}}.
|
||||
|
||||
servers() ->
|
||||
env(servers, []).
|
||||
|
||||
auto_reconnect() ->
|
||||
env(auto_reconnect, 60000).
|
||||
|
||||
request_options() ->
|
||||
#{timeout => env(request_timeout, 5000),
|
||||
request_failed_action => env(request_failed_action, deny),
|
||||
pool_size => env(pool_size, erlang:system_info(schedulers))
|
||||
}.
|
||||
|
||||
env(Key, Def) ->
|
||||
emqx_conf:get([exhook, Key], Def).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -21,14 +21,14 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
|
||||
|
||||
-define(CONF_DEFAULT, <<"
|
||||
exhook: {
|
||||
servers: [
|
||||
{ name: \"default\"
|
||||
url: \"http://127.0.0.1:9000\"
|
||||
}
|
||||
]
|
||||
emqx_exhook
|
||||
{servers = [
|
||||
{name = default,
|
||||
url = \"http://127.0.0.1:9000\"
|
||||
}]
|
||||
}
|
||||
">>).
|
||||
|
||||
|
@ -39,27 +39,53 @@ exhook: {
|
|||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Cfg) ->
|
||||
application:load(emqx_conf),
|
||||
ok = ekka:start(),
|
||||
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
|
||||
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
|
||||
meck:expect(emqx_alarm, activate, 3, ok),
|
||||
meck:expect(emqx_alarm, deactivate, 3, ok),
|
||||
|
||||
_ = emqx_exhook_demo_svr:start(),
|
||||
ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT),
|
||||
emqx_common_test_helpers:start_apps([emqx_exhook]),
|
||||
Cfg.
|
||||
|
||||
end_per_suite(_Cfg) ->
|
||||
ekka:stop(),
|
||||
mria:stop(),
|
||||
mria_mnesia:delete_schema(),
|
||||
meck:unload(emqx_alarm),
|
||||
|
||||
emqx_common_test_helpers:stop_apps([emqx_exhook]),
|
||||
emqx_exhook_demo_svr:stop().
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
{ok, _} = emqx_cluster_rpc:start_link(),
|
||||
timer:sleep(200),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, Config) ->
|
||||
case erlang:whereis(node()) of
|
||||
undefined -> ok;
|
||||
P ->
|
||||
erlang:unlink(P),
|
||||
erlang:exit(P, kill)
|
||||
end,
|
||||
Config.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test cases
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
t_noserver_nohook(_) ->
|
||||
emqx_exhook:disable(<<"default">>),
|
||||
emqx_exhook_mgr:disable(<<"default">>),
|
||||
?assertEqual([], ets:tab2list(emqx_hooks)),
|
||||
ok = emqx_exhook:enable(<<"default">>),
|
||||
{ok, _} = emqx_exhook_mgr:enable(<<"default">>),
|
||||
?assertNotEqual([], ets:tab2list(emqx_hooks)).
|
||||
|
||||
t_access_failed_if_no_server_running(_) ->
|
||||
emqx_exhook:disable(<<"default">>),
|
||||
emqx_exhook_mgr:disable(<<"default">>),
|
||||
ClientInfo = #{clientid => <<"user-id-1">>,
|
||||
username => <<"usera">>,
|
||||
peerhost => {127,0,0,1},
|
||||
|
@ -76,30 +102,7 @@ t_access_failed_if_no_server_running(_) ->
|
|||
Message = emqx_message:make(<<"t/1">>, <<"abc">>),
|
||||
?assertMatch({stop, Message},
|
||||
emqx_exhook_handler:on_message_publish(Message)),
|
||||
emqx_exhook:enable(<<"default">>).
|
||||
|
||||
t_cli_list(_) ->
|
||||
meck_print(),
|
||||
?assertEqual( [[emqx_exhook_server:format(emqx_exhook_mngr:server(Name)) || Name <- emqx_exhook:list()]]
|
||||
, emqx_exhook_cli:cli(["server", "list"])
|
||||
),
|
||||
unmeck_print().
|
||||
|
||||
t_cli_enable_disable(_) ->
|
||||
meck_print(),
|
||||
?assertEqual([already_started], emqx_exhook_cli:cli(["server", "enable", "default"])),
|
||||
?assertEqual(ok, emqx_exhook_cli:cli(["server", "disable", "default"])),
|
||||
?assertEqual([["name=default, hooks=#{}, active=false"]], emqx_exhook_cli:cli(["server", "list"])),
|
||||
|
||||
?assertEqual([not_running], emqx_exhook_cli:cli(["server", "disable", "default"])),
|
||||
?assertEqual(ok, emqx_exhook_cli:cli(["server", "enable", "default"])),
|
||||
unmeck_print().
|
||||
|
||||
t_cli_stats(_) ->
|
||||
meck_print(),
|
||||
_ = emqx_exhook_cli:cli(["server", "stats"]),
|
||||
_ = emqx_exhook_cli:cli(x),
|
||||
unmeck_print().
|
||||
emqx_exhook_mgr:enable(<<"default">>).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Utils
|
||||
|
@ -115,13 +118,13 @@ unmeck_print() ->
|
|||
|
||||
loaded_exhook_hookpoints() ->
|
||||
lists:filtermap(fun(E) ->
|
||||
Name = element(2, E),
|
||||
Callbacks = element(3, E),
|
||||
case lists:any(fun is_exhook_callback/1, Callbacks) of
|
||||
true -> {true, Name};
|
||||
_ -> false
|
||||
end
|
||||
end, ets:tab2list(emqx_hooks)).
|
||||
Name = element(2, E),
|
||||
Callbacks = element(3, E),
|
||||
case lists:any(fun is_exhook_callback/1, Callbacks) of
|
||||
true -> {true, Name};
|
||||
_ -> false
|
||||
end
|
||||
end, ets:tab2list(emqx_hooks)).
|
||||
|
||||
is_exhook_callback(Cb) ->
|
||||
Action = element(2, Cb),
|
||||
|
|
|
@ -0,0 +1,197 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_exhook_api_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(HOST, "http://127.0.0.1:18083/").
|
||||
-define(API_VERSION, "v5").
|
||||
-define(BASE_PATH, "api").
|
||||
-define(CLUSTER_RPC_SHARD, emqx_cluster_rpc_shard).
|
||||
|
||||
-define(CONF_DEFAULT, <<"
|
||||
emqx_exhook {servers = [
|
||||
{name = default,
|
||||
url = \"http://127.0.0.1:9000\"
|
||||
}
|
||||
]
|
||||
}
|
||||
">>).
|
||||
|
||||
all() ->
|
||||
[t_list, t_get, t_add, t_move_1, t_move_2, t_delete, t_update].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
application:load(emqx_conf),
|
||||
ok = ekka:start(),
|
||||
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
|
||||
meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
|
||||
meck:expect(emqx_alarm, activate, 3, ok),
|
||||
meck:expect(emqx_alarm, deactivate, 3, ok),
|
||||
|
||||
_ = emqx_exhook_demo_svr:start(),
|
||||
ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT),
|
||||
emqx_mgmt_api_test_util:init_suite([emqx_exhook]),
|
||||
[Conf] = emqx:get_config([emqx_exhook, servers]),
|
||||
[{template, Conf} | Config].
|
||||
|
||||
end_per_suite(Config) ->
|
||||
ekka:stop(),
|
||||
mria:stop(),
|
||||
mria_mnesia:delete_schema(),
|
||||
meck:unload(emqx_alarm),
|
||||
|
||||
emqx_mgmt_api_test_util:end_suite([emqx_exhook]),
|
||||
emqx_exhook_demo_svr:stop(),
|
||||
emqx_exhook_demo_svr:stop(<<"test1">>),
|
||||
Config.
|
||||
|
||||
init_per_testcase(t_add, Config) ->
|
||||
{ok, _} = emqx_cluster_rpc:start_link(),
|
||||
_ = emqx_exhook_demo_svr:start(<<"test1">>, 9001),
|
||||
timer:sleep(200),
|
||||
Config;
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
{ok, _} = emqx_cluster_rpc:start_link(),
|
||||
timer:sleep(200),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, Config) ->
|
||||
case erlang:whereis(node()) of
|
||||
undefined -> ok;
|
||||
P ->
|
||||
erlang:unlink(P),
|
||||
erlang:exit(P, kill)
|
||||
end,
|
||||
Config.
|
||||
|
||||
t_list(_) ->
|
||||
{ok, Data} = request_api(get, api_path(["exhooks"]), "",
|
||||
auth_header_()),
|
||||
|
||||
List = decode_json(Data),
|
||||
?assertEqual(1, length(List)),
|
||||
|
||||
[Svr] = List,
|
||||
|
||||
?assertMatch(#{name := <<"default">>,
|
||||
status := <<"running">>}, Svr).
|
||||
|
||||
t_get(_) ->
|
||||
{ok, Data} = request_api(get, api_path(["exhooks", "default"]), "",
|
||||
auth_header_()),
|
||||
|
||||
Svr = decode_json(Data),
|
||||
|
||||
?assertMatch(#{name := <<"default">>,
|
||||
status := <<"running">>}, Svr).
|
||||
|
||||
t_add(Cfg) ->
|
||||
Template = proplists:get_value(template, Cfg),
|
||||
Instance = Template#{name => <<"test1">>,
|
||||
url => "http://127.0.0.1:9001"
|
||||
},
|
||||
{ok, Data} = request_api(post, api_path(["exhooks"]), "",
|
||||
auth_header_(), Instance),
|
||||
|
||||
Svr = decode_json(Data),
|
||||
|
||||
?assertMatch(#{name := <<"test1">>,
|
||||
status := <<"running">>}, Svr),
|
||||
|
||||
?assertMatch([<<"default">>, <<"test1">>], emqx_exhook_mgr:running()).
|
||||
|
||||
t_move_1(_) ->
|
||||
Result = request_api(post, api_path(["exhooks", "default", "move"]), "",
|
||||
auth_header_(),
|
||||
#{position => bottom, related => <<>>}),
|
||||
|
||||
?assertMatch({ok, <<>>}, Result),
|
||||
?assertMatch([<<"test1">>, <<"default">>], emqx_exhook_mgr:running()).
|
||||
|
||||
t_move_2(_) ->
|
||||
Result = request_api(post, api_path(["exhooks", "default", "move"]), "",
|
||||
auth_header_(),
|
||||
#{position => before, related => <<"test1">>}),
|
||||
|
||||
?assertMatch({ok, <<>>}, Result),
|
||||
?assertMatch([<<"default">>, <<"test1">>], emqx_exhook_mgr:running()).
|
||||
|
||||
t_delete(_) ->
|
||||
Result = request_api(delete, api_path(["exhooks", "test1"]), "",
|
||||
auth_header_()),
|
||||
|
||||
?assertMatch({ok, <<>>}, Result),
|
||||
?assertMatch([<<"default">>], emqx_exhook_mgr:running()).
|
||||
|
||||
t_update(Cfg) ->
|
||||
Template = proplists:get_value(template, Cfg),
|
||||
Instance = Template#{enable => false},
|
||||
{ok, <<>>} = request_api(put, api_path(["exhooks", "default"]), "",
|
||||
auth_header_(), Instance),
|
||||
|
||||
?assertMatch([], emqx_exhook_mgr:running()).
|
||||
|
||||
decode_json(Data) ->
|
||||
BinJosn = emqx_json:decode(Data, [return_maps]),
|
||||
emqx_map_lib:unsafe_atom_key_map(BinJosn).
|
||||
|
||||
request_api(Method, Url, Auth) ->
|
||||
request_api(Method, Url, [], Auth, []).
|
||||
|
||||
request_api(Method, Url, QueryParams, Auth) ->
|
||||
request_api(Method, Url, QueryParams, Auth, []).
|
||||
|
||||
request_api(Method, Url, QueryParams, Auth, []) ->
|
||||
NewUrl = case QueryParams of
|
||||
"" -> Url;
|
||||
_ -> Url ++ "?" ++ QueryParams
|
||||
end,
|
||||
do_request_api(Method, {NewUrl, [Auth]});
|
||||
request_api(Method, Url, QueryParams, Auth, Body) ->
|
||||
NewUrl = case QueryParams of
|
||||
"" -> Url;
|
||||
_ -> Url ++ "?" ++ QueryParams
|
||||
end,
|
||||
do_request_api(Method, {NewUrl, [Auth], "application/json", emqx_json:encode(Body)}).
|
||||
|
||||
do_request_api(Method, Request)->
|
||||
case httpc:request(Method, Request, [], [{body_format, binary}]) of
|
||||
{error, socket_closed_remotely} ->
|
||||
{error, socket_closed_remotely};
|
||||
{ok, {{"HTTP/1.1", Code, _}, _, Return} }
|
||||
when Code =:= 200 orelse Code =:= 204 orelse Code =:= 201 ->
|
||||
{ok, Return};
|
||||
{ok, {Reason, _, _}} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
auth_header_() ->
|
||||
AppId = <<"admin">>,
|
||||
AppSecret = <<"public">>,
|
||||
auth_header_(binary_to_list(AppId), binary_to_list(AppSecret)).
|
||||
|
||||
auth_header_(User, Pass) ->
|
||||
Encoded = base64:encode_to_string(lists:append([User,":",Pass])),
|
||||
{"Authorization","Basic " ++ Encoded}.
|
||||
|
||||
api_path(Parts)->
|
||||
?HOST ++ filename:join([?BASE_PATH, ?API_VERSION] ++ Parts).
|
|
@ -20,7 +20,9 @@
|
|||
|
||||
%%
|
||||
-export([ start/0
|
||||
, start/2
|
||||
, stop/0
|
||||
, stop/1
|
||||
, take/0
|
||||
, in/1
|
||||
]).
|
||||
|
@ -57,39 +59,45 @@
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
start() ->
|
||||
Pid = spawn(fun mngr_main/0),
|
||||
register(?MODULE, Pid),
|
||||
start(?NAME, ?PORT).
|
||||
|
||||
start(Name, Port) ->
|
||||
Pid = spawn(fun() -> mgr_main(Name, Port) end),
|
||||
register(to_atom_name(Name), Pid),
|
||||
{ok, Pid}.
|
||||
|
||||
stop() ->
|
||||
grpc:stop_server(?NAME),
|
||||
?MODULE ! stop.
|
||||
stop(?NAME).
|
||||
|
||||
stop(Name) ->
|
||||
grpc:stop_server(Name),
|
||||
to_atom_name(Name) ! stop.
|
||||
|
||||
take() ->
|
||||
?MODULE ! {take, self()},
|
||||
to_atom_name(?NAME) ! {take, self()},
|
||||
receive {value, V} -> V
|
||||
after 5000 -> error(timeout) end.
|
||||
|
||||
in({FunName, Req}) ->
|
||||
?MODULE ! {in, FunName, Req}.
|
||||
to_atom_name(?NAME) ! {in, FunName, Req}.
|
||||
|
||||
mngr_main() ->
|
||||
mgr_main(Name, Port) ->
|
||||
application:ensure_all_started(grpc),
|
||||
Services = #{protos => [emqx_exhook_pb],
|
||||
services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr}
|
||||
},
|
||||
Options = [],
|
||||
Svr = grpc:start_server(?NAME, ?PORT, Services, Options),
|
||||
mngr_loop([Svr, queue:new(), queue:new()]).
|
||||
Svr = grpc:start_server(Name, Port, Services, Options),
|
||||
mgr_loop([Svr, queue:new(), queue:new()]).
|
||||
|
||||
mngr_loop([Svr, Q, Takes]) ->
|
||||
mgr_loop([Svr, Q, Takes]) ->
|
||||
receive
|
||||
{in, FunName, Req} ->
|
||||
{NQ1, NQ2} = reply(queue:in({FunName, Req}, Q), Takes),
|
||||
mngr_loop([Svr, NQ1, NQ2]);
|
||||
mgr_loop([Svr, NQ1, NQ2]);
|
||||
{take, From} ->
|
||||
{NQ1, NQ2} = reply(Q, queue:in(From, Takes)),
|
||||
mngr_loop([Svr, NQ1, NQ2]);
|
||||
mgr_loop([Svr, NQ1, NQ2]);
|
||||
stop ->
|
||||
exit(normal)
|
||||
end.
|
||||
|
@ -105,12 +113,18 @@ reply(Q1, Q2) ->
|
|||
{NQ1, NQ2}
|
||||
end.
|
||||
|
||||
to_atom_name(Name) when is_atom(Name) ->
|
||||
Name;
|
||||
|
||||
to_atom_name(Name) ->
|
||||
erlang:binary_to_atom(Name).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata())
|
||||
-> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
|
||||
-> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
|
||||
| {error, grpc_cowboy_h:error_response()}.
|
||||
|
||||
on_provider_loaded(Req, Md) ->
|
||||
|
|
|
@ -31,12 +31,11 @@
|
|||
]).
|
||||
|
||||
-define(CONF_DEFAULT, <<"
|
||||
exhook: {
|
||||
servers: [
|
||||
{ name: \"default\"
|
||||
url: \"http://127.0.0.1:9000\"
|
||||
}
|
||||
]
|
||||
emqx_exhook
|
||||
{servers = [
|
||||
{name = default,
|
||||
url = \"http://127.0.0.1:9000\"
|
||||
}]
|
||||
}
|
||||
">>).
|
||||
|
||||
|
|
Loading…
Reference in New Issue