feat(exhook): make request_failed_action working
This commit is contained in:
parent
6574fc4f14
commit
3cef377b33
|
@ -0,0 +1,38 @@
|
|||
##====================================================================
|
||||
## EMQ X Hooks
|
||||
##====================================================================
|
||||
|
||||
exhook: {
|
||||
## The default value or action will be returned, while the request to
|
||||
## the gRPC server failed or no available grpc server running.
|
||||
##
|
||||
## Default: deny
|
||||
## Value: ignore | deny
|
||||
request_failed_action: deny
|
||||
|
||||
## The timeout to request grpc server
|
||||
##
|
||||
## Default: 5s
|
||||
## Value: Duration
|
||||
request_timeout: 5s
|
||||
|
||||
## Whether to automatically reconnect (initialize) the gRPC server
|
||||
##
|
||||
## When gRPC is not available, exhook tries to request the gRPC service at
|
||||
## that interval and reinitialize the list of mounted hooks.
|
||||
##
|
||||
## Default: false
|
||||
## Value: false | Duration
|
||||
auto_reconnect: 60s
|
||||
|
||||
servers: [
|
||||
# { name: "default"
|
||||
# url: "http://127.0.0.1:9000"
|
||||
# #ssl: {
|
||||
# # cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem"
|
||||
# # certfile: "{{ platform_etc_dir }}/certs/cert.pem"
|
||||
# # keyfile: "{{ platform_etc_dir }}/certs/key.pem"
|
||||
# #}
|
||||
# }
|
||||
]
|
||||
}
|
|
@ -64,29 +64,54 @@ cast(Hookpoint, Req) ->
|
|||
|
||||
cast(_, _, []) ->
|
||||
ok;
|
||||
cast(Hookpoint, Req, [ServiceName|More]) ->
|
||||
cast(Hookpoint, Req, [ServerName|More]) ->
|
||||
%% XXX: Need a real asynchronous running
|
||||
_ = emqx_exhook_server:call(Hookpoint, Req,
|
||||
emqx_exhook_mngr:server(ServiceName)),
|
||||
emqx_exhook_mngr:server(ServerName)),
|
||||
cast(Hookpoint, Req, More).
|
||||
|
||||
-spec call_fold(atom(), term(), function())
|
||||
-> {ok, term()}
|
||||
| {stop, term()}.
|
||||
call_fold(Hookpoint, Req, AccFun) ->
|
||||
call_fold(Hookpoint, Req, AccFun, emqx_exhook_mngr:running()).
|
||||
FailedAction = emqx_exhook_mngr:get_request_failed_action(),
|
||||
ServerNames = emqx_exhook_mngr:running(),
|
||||
case ServerNames == [] andalso FailedAction == deny of
|
||||
true ->
|
||||
{stop, deny_action_result(Hookpoint, Req)};
|
||||
_ ->
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, ServerNames)
|
||||
end.
|
||||
|
||||
call_fold(_, Req, _, []) ->
|
||||
call_fold(_, Req, _, _, []) ->
|
||||
{ok, Req};
|
||||
call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) ->
|
||||
case emqx_exhook_server:call(Hookpoint, Req,
|
||||
emqx_exhook_mngr:server(ServiceName)) of
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, [ServerName|More]) ->
|
||||
Server = emqx_exhook_mngr:server(ServerName),
|
||||
case emqx_exhook_server:call(Hookpoint, Req, Server) of
|
||||
{ok, Resp} ->
|
||||
case AccFun(Req, Resp) of
|
||||
{stop, NReq} -> {stop, NReq};
|
||||
{ok, NReq} -> call_fold(Hookpoint, NReq, AccFun, More);
|
||||
_ -> call_fold(Hookpoint, Req, AccFun, More)
|
||||
{stop, NReq} ->
|
||||
{stop, NReq};
|
||||
{ok, NReq} ->
|
||||
call_fold(Hookpoint, NReq, FailedAction, AccFun, More);
|
||||
_ ->
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, More)
|
||||
end;
|
||||
_ ->
|
||||
call_fold(Hookpoint, Req, AccFun, More)
|
||||
case FailedAction of
|
||||
deny ->
|
||||
{stop, deny_action_result(Hookpoint, Req)};
|
||||
_ ->
|
||||
call_fold(Hookpoint, Req, FailedAction, AccFun, More)
|
||||
end
|
||||
end.
|
||||
|
||||
%% XXX: Hard-coded the deny response
|
||||
deny_action_result('client.authenticate', _) ->
|
||||
#{result => false};
|
||||
deny_action_result('client.check_acl', _) ->
|
||||
#{result => false};
|
||||
deny_action_result('message.publish', Msg) ->
|
||||
%% TODO: Not support to deny a message
|
||||
%% maybe we can put the 'allow_publish' into message header
|
||||
Msg.
|
||||
|
|
|
@ -34,6 +34,8 @@
|
|||
%% Helper funcs
|
||||
-export([ running/0
|
||||
, server/1
|
||||
, put_request_failed_action/1
|
||||
, get_request_failed_action/0
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
|
@ -100,7 +102,7 @@ call(Pid, Req) ->
|
|||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([Servers, AutoReconnect, ReqOpts]) ->
|
||||
init([Servers, AutoReconnect, ReqOpts0]) ->
|
||||
process_flag(trap_exit, true),
|
||||
%% XXX: Due to the ExHook Module in the enterprise,
|
||||
%% this process may start multiple times and they will share this table
|
||||
|
@ -111,7 +113,13 @@ init([Servers, AutoReconnect, ReqOpts]) ->
|
|||
ok
|
||||
end,
|
||||
|
||||
%% put the global option
|
||||
put_request_failed_action(
|
||||
maps:get(request_failed_action, ReqOpts0, deny)
|
||||
),
|
||||
|
||||
%% Load the hook servers
|
||||
ReqOpts = maps:without([request_failed_action], ReqOpts0),
|
||||
{Waiting, Running} = load_all_servers(Servers, ReqOpts),
|
||||
{ok, ensure_reload_timer(
|
||||
#state{waiting = Waiting,
|
||||
|
@ -272,6 +280,12 @@ clean_reload_timer(Name, State = #state{trefs = TRefs}) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Server state persistent
|
||||
|
||||
put_request_failed_action(Val) ->
|
||||
persistent_term:put({?APP, request_failed_action}, Val).
|
||||
|
||||
get_request_failed_action() ->
|
||||
persistent_term:get({?APP, request_failed_action}).
|
||||
|
||||
save(Name, ServerState) ->
|
||||
Saved = persistent_term:get(?APP, []),
|
||||
persistent_term:put(?APP, lists:reverse([Name | Saved])),
|
||||
|
|
|
@ -47,13 +47,18 @@ init([]) ->
|
|||
{ok, {{one_for_one, 10, 100}, [Mngr]}}.
|
||||
|
||||
servers() ->
|
||||
application:get_env(emqx_exhook, servers, []).
|
||||
env(servers, []).
|
||||
|
||||
auto_reconnect() ->
|
||||
application:get_env(emqx_exhook, auto_reconnect, 60000).
|
||||
env(auto_reconnect, 60000).
|
||||
|
||||
request_options() ->
|
||||
#{timeout => application:get_env(emqx_exhook, request_timeout, 5000)}.
|
||||
#{timeout => env(request_timeout, 5000),
|
||||
request_failed_action => env(request_failed_action, deny)
|
||||
}.
|
||||
|
||||
env(Key, Def) ->
|
||||
application:get_env(emqx_exhook, Key, Def).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% APIs
|
||||
|
|
|
@ -58,6 +58,26 @@ t_noserver_nohook(_) ->
|
|||
ok = emqx_exhook:enable(default),
|
||||
?assertNotEqual([], ets:tab2list(emqx_hooks)).
|
||||
|
||||
t_access_failed_if_no_server_running(_) ->
|
||||
emqx_exhook:disable(default),
|
||||
ClientInfo = #{clientid => <<"user-id-1">>,
|
||||
username => <<"usera">>,
|
||||
peerhost => {127,0,0,1},
|
||||
sockport => 1883,
|
||||
protocol => mqtt,
|
||||
mountpoint => undefined
|
||||
},
|
||||
?assertMatch({stop, #{auth_result := not_authorized}},
|
||||
emqx_exhook_handler:on_client_authenticate(ClientInfo, #{auth_result => success})),
|
||||
|
||||
?assertMatch({stop, deny},
|
||||
emqx_exhook_handler:on_client_check_acl(ClientInfo, publish, <<"t/1">>, allow)),
|
||||
|
||||
Message = emqx_message:make(<<"t/1">>, <<"abc">>),
|
||||
?assertMatch({stop, Message},
|
||||
emqx_exhook_handler:on_message_publish(Message)),
|
||||
emqx_exhook:enable(default).
|
||||
|
||||
t_cli_list(_) ->
|
||||
meck_print(),
|
||||
?assertEqual( [[emqx_exhook_server:format(emqx_exhook_mngr:server(Name)) || Name <- emqx_exhook:list()]]
|
||||
|
|
Loading…
Reference in New Issue