feat(exhook): make request_failed_action working

This commit is contained in:
JianBo He 2021-08-11 19:41:07 +08:00
parent 7ec8dc21a6
commit 66f69e7693
6 changed files with 82 additions and 18 deletions

View File

@ -5,9 +5,9 @@
## The default value or action will be returned, while the request to
## the gRPC server failed or no available grpc server running.
##
## Default: ignore
## Default: deny
## Value: ignore | deny
#exhook.request_failed_action = ignore
#exhook.request_failed_action = deny
## The timeout to request grpc server
##

View File

@ -1,7 +1,7 @@
%%-*- mode: erlang -*-
{mapping, "exhook.request_failed_action", "emqx_exhook.request_failed_action", [
{default, "ignore"},
{default, "deny"},
{datatype, {enum, [ignore, deny]}}
]}.

View File

@ -65,29 +65,54 @@ cast(Hookpoint, Req) ->
cast(_, _, []) ->
ok;
cast(Hookpoint, Req, [ServiceName|More]) ->
cast(Hookpoint, Req, [ServerName|More]) ->
%% XXX: Need a real asynchronous running
_ = emqx_exhook_server:call(Hookpoint, Req,
emqx_exhook_mngr:server(ServiceName)),
emqx_exhook_mngr:server(ServerName)),
cast(Hookpoint, Req, More).
-spec call_fold(atom(), term(), function())
-> {ok, term()}
| {stop, term()}.
call_fold(Hookpoint, Req, AccFun) ->
call_fold(Hookpoint, Req, AccFun, emqx_exhook_mngr:running()).
FailedAction = emqx_exhook_mngr:get_request_failed_action(),
ServerNames = emqx_exhook_mngr:running(),
case ServerNames == [] andalso FailedAction == deny of
true ->
{stop, deny_action_result(Hookpoint, Req)};
_ ->
call_fold(Hookpoint, Req, FailedAction, AccFun, ServerNames)
end.
call_fold(_, Req, _, []) ->
call_fold(_, Req, _, _, []) ->
{ok, Req};
call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) ->
case emqx_exhook_server:call(Hookpoint, Req,
emqx_exhook_mngr:server(ServiceName)) of
call_fold(Hookpoint, Req, FailedAction, AccFun, [ServerName|More]) ->
Server = emqx_exhook_mngr:server(ServerName),
case emqx_exhook_server:call(Hookpoint, Req, Server) of
{ok, Resp} ->
case AccFun(Req, Resp) of
{stop, NReq} -> {stop, NReq};
{ok, NReq} -> call_fold(Hookpoint, NReq, AccFun, More);
_ -> call_fold(Hookpoint, Req, AccFun, More)
{stop, NReq} ->
{stop, NReq};
{ok, NReq} ->
call_fold(Hookpoint, NReq, FailedAction, AccFun, More);
_ ->
call_fold(Hookpoint, Req, FailedAction, AccFun, More)
end;
_ ->
call_fold(Hookpoint, Req, AccFun, More)
case FailedAction of
deny ->
{stop, deny_action_result(Hookpoint, Req)};
_ ->
call_fold(Hookpoint, Req, FailedAction, AccFun, More)
end
end.
%% XXX: Hard-coded the deny response
deny_action_result('client.authenticate', _) ->
#{result => false};
deny_action_result('client.check_acl', _) ->
#{result => false};
deny_action_result('message.publish', Msg) ->
%% TODO: Not support to deny a message
%% maybe we can put the 'allow_publish' into message header
Msg.

View File

@ -34,6 +34,8 @@
%% Helper funcs
-export([ running/0
, server/1
, put_request_failed_action/1
, get_request_failed_action/0
]).
%% gen_server callbacks
@ -100,7 +102,7 @@ call(Pid, Req) ->
%% gen_server callbacks
%%--------------------------------------------------------------------
init([Servers, AutoReconnect, ReqOpts]) ->
init([Servers, AutoReconnect, ReqOpts0]) ->
process_flag(trap_exit, true),
%% XXX: Due to the ExHook Module in the enterprise,
%% this process may start multiple times and they will share this table
@ -111,7 +113,13 @@ init([Servers, AutoReconnect, ReqOpts]) ->
ok
end,
%% put the global option
put_request_failed_action(
maps:get(request_failed_action, ReqOpts0, deny)
),
%% Load the hook servers
ReqOpts = maps:without([request_failed_action], ReqOpts0),
{Waiting, Running} = load_all_servers(Servers, ReqOpts),
{ok, ensure_reload_timer(
#state{waiting = Waiting,
@ -272,6 +280,12 @@ clean_reload_timer(Name, State = #state{trefs = TRefs}) ->
%%--------------------------------------------------------------------
%% Server state persistent
put_request_failed_action(Val) ->
persistent_term:put({?APP, request_failed_action}, Val).
get_request_failed_action() ->
persistent_term:get({?APP, request_failed_action}).
save(Name, ServerState) ->
Saved = persistent_term:get(?APP, []),
persistent_term:put(?APP, lists:reverse([Name | Saved])),

View File

@ -47,13 +47,18 @@ init([]) ->
{ok, {{one_for_one, 10, 100}, [Mngr]}}.
servers() ->
application:get_env(emqx_exhook, servers, []).
env(servers, []).
auto_reconnect() ->
application:get_env(emqx_exhook, auto_reconnect, 60000).
env(auto_reconnect, 60000).
request_options() ->
#{timeout => application:get_env(emqx_exhook, request_timeout, 5000)}.
#{timeout => env(request_timeout, 5000),
request_failed_action => env(request_failed_action, deny)
}.
env(Key, Def) ->
application:get_env(emqx_exhook, Key, Def).
%%--------------------------------------------------------------------
%% APIs

View File

@ -55,6 +55,26 @@ t_noserver_nohook(_) ->
ok = emqx_exhook:enable(default),
?assertNotEqual([], ets:tab2list(emqx_hooks)).
t_access_failed_if_no_server_running(_) ->
emqx_exhook:disable(default),
ClientInfo = #{clientid => <<"user-id-1">>,
username => <<"usera">>,
peerhost => {127,0,0,1},
sockport => 1883,
protocol => mqtt,
mountpoint => undefined
},
?assertMatch({stop, #{auth_result := not_authorized}},
emqx_exhook_handler:on_client_authenticate(ClientInfo, #{auth_result => success})),
?assertMatch({stop, deny},
emqx_exhook_handler:on_client_check_acl(ClientInfo, publish, <<"t/1">>, allow)),
Message = emqx_message:make(<<"t/1">>, <<"abc">>),
?assertMatch({stop, Message},
emqx_exhook_handler:on_message_publish(Message)),
emqx_exhook:enable(default).
t_cli_list(_) ->
meck_print(),
?assertEqual( [[emqx_exhook_server:format(emqx_exhook_mngr:server(Name)) || Name <- emqx_exhook:list()]]