chore(exhook): change the name to binary type
This commit is contained in:
parent
b3fac24c5e
commit
4921c00a19
|
@ -33,7 +33,7 @@
|
||||||
%% Mgmt APIs
|
%% Mgmt APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec enable(atom()|string()) -> ok | {error, term()}.
|
-spec enable(binary()) -> ok | {error, term()}.
|
||||||
enable(Name) ->
|
enable(Name) ->
|
||||||
with_mngr(fun(Pid) -> emqx_exhook_mngr:enable(Pid, Name) end).
|
with_mngr(fun(Pid) -> emqx_exhook_mngr:enable(Pid, Name) end).
|
||||||
|
|
||||||
|
@ -109,7 +109,7 @@ call_fold(Hookpoint, Req, FailedAction, AccFun, [ServerName|More]) ->
|
||||||
%% XXX: Hard-coded the deny response
|
%% XXX: Hard-coded the deny response
|
||||||
deny_action_result('client.authenticate', _) ->
|
deny_action_result('client.authenticate', _) ->
|
||||||
#{result => false};
|
#{result => false};
|
||||||
deny_action_result('client.check_acl', _) ->
|
deny_action_result('client.authorize', _) ->
|
||||||
#{result => false};
|
#{result => false};
|
||||||
deny_action_result('message.publish', Msg) ->
|
deny_action_result('message.publish', Msg) ->
|
||||||
%% TODO: Not support to deny a message
|
%% TODO: Not support to deny a message
|
||||||
|
|
|
@ -28,12 +28,12 @@ cli(["server", "list"]) ->
|
||||||
|
|
||||||
cli(["server", "enable", Name]) ->
|
cli(["server", "enable", Name]) ->
|
||||||
if_enabled(fun() ->
|
if_enabled(fun() ->
|
||||||
print(emqx_exhook:enable(list_to_existing_atom(Name)))
|
print(emqx_exhook:enable(iolist_to_binary(Name)))
|
||||||
end);
|
end);
|
||||||
|
|
||||||
cli(["server", "disable", Name]) ->
|
cli(["server", "disable", Name]) ->
|
||||||
if_enabled(fun() ->
|
if_enabled(fun() ->
|
||||||
print(emqx_exhook:disable(list_to_existing_atom(Name)))
|
print(emqx_exhook:disable(iolist_to_binary(Name)))
|
||||||
end);
|
end);
|
||||||
|
|
||||||
cli(["server", "stats"]) ->
|
cli(["server", "stats"]) ->
|
||||||
|
@ -52,14 +52,6 @@ print(ok) ->
|
||||||
print({error, Reason}) ->
|
print({error, Reason}) ->
|
||||||
emqx_ctl:print("~p~n", [Reason]).
|
emqx_ctl:print("~p~n", [Reason]).
|
||||||
|
|
||||||
find_server_options(Name) ->
|
|
||||||
Ls = emqx_config:get([exhook, servers]),
|
|
||||||
case [ E || E = #{name := N} <- Ls, N =:= Name] of
|
|
||||||
[] -> undefined;
|
|
||||||
[Options] ->
|
|
||||||
maps:remove(name, Options)
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal funcs
|
%% Internal funcs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -85,7 +77,8 @@ stats() ->
|
||||||
format(Name) ->
|
format(Name) ->
|
||||||
case emqx_exhook_mngr:server(Name) of
|
case emqx_exhook_mngr:server(Name) of
|
||||||
undefined ->
|
undefined ->
|
||||||
io_lib:format("name=~s, hooks=#{}, active=false", [Name]);
|
lists:flatten(
|
||||||
|
io_lib:format("name=~s, hooks=#{}, active=false", [Name]));
|
||||||
Server ->
|
Server ->
|
||||||
emqx_exhook_server:format(Server)
|
emqx_exhook_server:format(Server)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -84,11 +84,11 @@
|
||||||
start_link(Servers, AutoReconnect, ReqOpts) ->
|
start_link(Servers, AutoReconnect, ReqOpts) ->
|
||||||
gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []).
|
gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []).
|
||||||
|
|
||||||
-spec enable(pid(), atom()|string()) -> ok | {error, term()}.
|
-spec enable(pid(), binary()) -> ok | {error, term()}.
|
||||||
enable(Pid, Name) ->
|
enable(Pid, Name) ->
|
||||||
call(Pid, {load, Name}).
|
call(Pid, {load, Name}).
|
||||||
|
|
||||||
-spec disable(pid(), atom()|string()) -> ok | {error, term()}.
|
-spec disable(pid(), binary()) -> ok | {error, term()}.
|
||||||
disable(Pid, Name) ->
|
disable(Pid, Name) ->
|
||||||
call(Pid, {unload, Name}).
|
call(Pid, {unload, Name}).
|
||||||
|
|
||||||
|
@ -136,7 +136,9 @@ load_all_servers(Servers, ReqOpts) ->
|
||||||
load_all_servers(Servers, ReqOpts, #{}, #{}).
|
load_all_servers(Servers, ReqOpts, #{}, #{}).
|
||||||
load_all_servers([], _Request, Waiting, Running) ->
|
load_all_servers([], _Request, Waiting, Running) ->
|
||||||
{Waiting, Running};
|
{Waiting, Running};
|
||||||
load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) ->
|
load_all_servers([#{name := Name0} = Options0|More], ReqOpts, Waiting, Running) ->
|
||||||
|
Name = iolist_to_binary(Name0),
|
||||||
|
Options = Options0#{name => Name},
|
||||||
{NWaiting, NRunning} =
|
{NWaiting, NRunning} =
|
||||||
case emqx_exhook_server:load(Name, Options, ReqOpts) of
|
case emqx_exhook_server:load(Name, Options, ReqOpts) of
|
||||||
{ok, ServerState} ->
|
{ok, ServerState} ->
|
||||||
|
|
|
@ -26,10 +26,24 @@
|
||||||
|
|
||||||
-behaviour(hocon_schema).
|
-behaviour(hocon_schema).
|
||||||
|
|
||||||
|
-type duration() :: integer().
|
||||||
|
|
||||||
|
-typerefl_from_string({duration/0, emqx_schema, to_duration}).
|
||||||
|
|
||||||
|
-reflect_type([duration/0]).
|
||||||
|
|
||||||
-export([structs/0, fields/1]).
|
-export([structs/0, fields/1]).
|
||||||
|
|
||||||
-export([t/1, t/3, t/4, ref/1]).
|
-export([t/1, t/3, t/4, ref/1]).
|
||||||
|
|
||||||
structs() -> [servers].
|
structs() -> [exhook].
|
||||||
|
|
||||||
|
fields(exhook) ->
|
||||||
|
[ {request_failed_action, t(union([deny, ignore]), undefined, deny)}
|
||||||
|
, {request_timeout, t(duration(), undefined, "5s")}
|
||||||
|
, {auto_reconnect, t(union([false, duration()]), undefined, "60s")}
|
||||||
|
, {servers, t(hoconsc:array(ref(servers)), undefined, [])}
|
||||||
|
];
|
||||||
|
|
||||||
fields(servers) ->
|
fields(servers) ->
|
||||||
[ {name, string()}
|
[ {name, string()}
|
||||||
|
|
|
@ -38,7 +38,7 @@
|
||||||
|
|
||||||
-record(server, {
|
-record(server, {
|
||||||
%% Server name (equal to grpc client channel name)
|
%% Server name (equal to grpc client channel name)
|
||||||
name :: server_name(),
|
name :: binary(),
|
||||||
%% The function options
|
%% The function options
|
||||||
options :: map(),
|
options :: map(),
|
||||||
%% gRPC channel pid
|
%% gRPC channel pid
|
||||||
|
@ -49,7 +49,6 @@
|
||||||
prefix :: list()
|
prefix :: list()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-type server_name() :: string().
|
|
||||||
-type server() :: #server{}.
|
-type server() :: #server{}.
|
||||||
|
|
||||||
-type hookpoint() :: 'client.connect'
|
-type hookpoint() :: 'client.connect'
|
||||||
|
@ -84,9 +83,8 @@
|
||||||
%% Load/Unload APIs
|
%% Load/Unload APIs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-spec load(atom(), options(), map()) -> {ok, server()} | {error, term()} .
|
-spec load(binary(), options(), map()) -> {ok, server()} | {error, term()} .
|
||||||
load(Name0, Opts0, ReqOpts) ->
|
load(Name, Opts0, ReqOpts) ->
|
||||||
Name = to_list(Name0),
|
|
||||||
{SvrAddr, ClientOpts} = channel_opts(Opts0),
|
{SvrAddr, ClientOpts} = channel_opts(Opts0),
|
||||||
case emqx_exhook_sup:start_grpc_client_channel(
|
case emqx_exhook_sup:start_grpc_client_channel(
|
||||||
Name,
|
Name,
|
||||||
|
@ -112,20 +110,12 @@ load(Name0, Opts0, ReqOpts) ->
|
||||||
{error, _} = E -> E
|
{error, _} = E -> E
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
|
||||||
to_list(Name) when is_atom(Name) ->
|
|
||||||
atom_to_list(Name);
|
|
||||||
to_list(Name) when is_binary(Name) ->
|
|
||||||
binary_to_list(Name);
|
|
||||||
to_list(Name) when is_list(Name) ->
|
|
||||||
Name.
|
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
channel_opts(Opts = #{url := URL}) ->
|
channel_opts(Opts = #{url := URL}) ->
|
||||||
case uri_string:parse(URL) of
|
case uri_string:parse(URL) of
|
||||||
#{scheme := <<"http">>, host := Host, port := Port} ->
|
#{scheme := "http", host := Host, port := Port} ->
|
||||||
{format_http_uri("http", Host, Port), #{}};
|
{format_http_uri("http", Host, Port), #{}};
|
||||||
#{scheme := <<"https">>, host := Host, port := Port} ->
|
#{scheme := "https", host := Host, port := Port} ->
|
||||||
SslOpts =
|
SslOpts =
|
||||||
case maps:get(ssl, Opts, undefined) of
|
case maps:get(ssl, Opts, undefined) of
|
||||||
undefined -> [];
|
undefined -> [];
|
||||||
|
@ -230,7 +220,8 @@ may_unload_hooks(HookSpecs) ->
|
||||||
end, maps:keys(HookSpecs)).
|
end, maps:keys(HookSpecs)).
|
||||||
|
|
||||||
format(#server{name = Name, hookspec = Hooks}) ->
|
format(#server{name = Name, hookspec = Hooks}) ->
|
||||||
io_lib:format("name=~s, hooks=~0p, active=true", [Name, Hooks]).
|
lists:flatten(
|
||||||
|
io_lib:format("name=~s, hooks=~0p, active=true", [Name, Hooks])).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
|
|
|
@ -58,7 +58,7 @@ request_options() ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
env(Key, Def) ->
|
env(Key, Def) ->
|
||||||
application:get_env(emqx_exhook, Key, Def).
|
emqx_config:get([exhook, Key], Def).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% APIs
|
%% APIs
|
||||||
|
|
|
@ -53,13 +53,13 @@ end_per_suite(_Cfg) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
t_noserver_nohook(_) ->
|
t_noserver_nohook(_) ->
|
||||||
emqx_exhook:disable(default),
|
emqx_exhook:disable(<<"default">>),
|
||||||
?assertEqual([], ets:tab2list(emqx_hooks)),
|
?assertEqual([], ets:tab2list(emqx_hooks)),
|
||||||
ok = emqx_exhook:enable(default),
|
ok = emqx_exhook:enable(<<"default">>),
|
||||||
?assertNotEqual([], ets:tab2list(emqx_hooks)).
|
?assertNotEqual([], ets:tab2list(emqx_hooks)).
|
||||||
|
|
||||||
t_access_failed_if_no_server_running(_) ->
|
t_access_failed_if_no_server_running(_) ->
|
||||||
emqx_exhook:disable(default),
|
emqx_exhook:disable(<<"default">>),
|
||||||
ClientInfo = #{clientid => <<"user-id-1">>,
|
ClientInfo = #{clientid => <<"user-id-1">>,
|
||||||
username => <<"usera">>,
|
username => <<"usera">>,
|
||||||
peerhost => {127,0,0,1},
|
peerhost => {127,0,0,1},
|
||||||
|
@ -67,16 +67,16 @@ t_access_failed_if_no_server_running(_) ->
|
||||||
protocol => mqtt,
|
protocol => mqtt,
|
||||||
mountpoint => undefined
|
mountpoint => undefined
|
||||||
},
|
},
|
||||||
?assertMatch({stop, #{auth_result := not_authorized}},
|
?assertMatch({stop, {error, not_authorized}},
|
||||||
emqx_exhook_handler:on_client_authenticate(ClientInfo, #{auth_result => success})),
|
emqx_exhook_handler:on_client_authenticate(ClientInfo, #{auth_result => success})),
|
||||||
|
|
||||||
?assertMatch({stop, deny},
|
?assertMatch({stop, deny},
|
||||||
emqx_exhook_handler:on_client_check_acl(ClientInfo, publish, <<"t/1">>, allow)),
|
emqx_exhook_handler:on_client_authorize(ClientInfo, publish, <<"t/1">>, allow)),
|
||||||
|
|
||||||
Message = emqx_message:make(<<"t/1">>, <<"abc">>),
|
Message = emqx_message:make(<<"t/1">>, <<"abc">>),
|
||||||
?assertMatch({stop, Message},
|
?assertMatch({stop, Message},
|
||||||
emqx_exhook_handler:on_message_publish(Message)),
|
emqx_exhook_handler:on_message_publish(Message)),
|
||||||
emqx_exhook:enable(default).
|
emqx_exhook:enable(<<"default">>).
|
||||||
|
|
||||||
t_cli_list(_) ->
|
t_cli_list(_) ->
|
||||||
meck_print(),
|
meck_print(),
|
||||||
|
|
Loading…
Reference in New Issue