From 3cef377b33d8cd1523e93a4c9fa1f5cb5fa1d606 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 11 Aug 2021 19:41:07 +0800 Subject: [PATCH] feat(exhook): make request_failed_action working --- apps/emqx_exhook/etc/emqx_exhook.conf | 38 +++++++++++++++++ apps/emqx_exhook/src/emqx_exhook.erl | 47 ++++++++++++++++----- apps/emqx_exhook/src/emqx_exhook_mngr.erl | 16 ++++++- apps/emqx_exhook/src/emqx_exhook_sup.erl | 11 +++-- apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 20 +++++++++ 5 files changed, 117 insertions(+), 15 deletions(-) create mode 100644 apps/emqx_exhook/etc/emqx_exhook.conf diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf new file mode 100644 index 000000000..4df798fed --- /dev/null +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -0,0 +1,38 @@ +##==================================================================== +## EMQ X Hooks +##==================================================================== + +exhook: { + ## The default value or action will be returned, while the request to + ## the gRPC server failed or no available grpc server running. + ## + ## Default: deny + ## Value: ignore | deny + request_failed_action: deny + + ## The timeout to request grpc server + ## + ## Default: 5s + ## Value: Duration + request_timeout: 5s + + ## Whether to automatically reconnect (initialize) the gRPC server + ## + ## When gRPC is not available, exhook tries to request the gRPC service at + ## that interval and reinitialize the list of mounted hooks. + ## + ## Default: false + ## Value: false | Duration + auto_reconnect: 60s + + servers: [ + # { name: "default" + # url: "http://127.0.0.1:9000" + # #ssl: { + # # cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" + # # certfile: "{{ platform_etc_dir }}/certs/cert.pem" + # # keyfile: "{{ platform_etc_dir }}/certs/key.pem" + # #} + # } + ] +} diff --git a/apps/emqx_exhook/src/emqx_exhook.erl b/apps/emqx_exhook/src/emqx_exhook.erl index 008c3d241..8558a91ad 100644 --- a/apps/emqx_exhook/src/emqx_exhook.erl +++ b/apps/emqx_exhook/src/emqx_exhook.erl @@ -64,29 +64,54 @@ cast(Hookpoint, Req) -> cast(_, _, []) -> ok; -cast(Hookpoint, Req, [ServiceName|More]) -> +cast(Hookpoint, Req, [ServerName|More]) -> %% XXX: Need a real asynchronous running _ = emqx_exhook_server:call(Hookpoint, Req, - emqx_exhook_mngr:server(ServiceName)), + emqx_exhook_mngr:server(ServerName)), cast(Hookpoint, Req, More). -spec call_fold(atom(), term(), function()) -> {ok, term()} | {stop, term()}. call_fold(Hookpoint, Req, AccFun) -> - call_fold(Hookpoint, Req, AccFun, emqx_exhook_mngr:running()). + FailedAction = emqx_exhook_mngr:get_request_failed_action(), + ServerNames = emqx_exhook_mngr:running(), + case ServerNames == [] andalso FailedAction == deny of + true -> + {stop, deny_action_result(Hookpoint, Req)}; + _ -> + call_fold(Hookpoint, Req, FailedAction, AccFun, ServerNames) + end. -call_fold(_, Req, _, []) -> +call_fold(_, Req, _, _, []) -> {ok, Req}; -call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) -> - case emqx_exhook_server:call(Hookpoint, Req, - emqx_exhook_mngr:server(ServiceName)) of +call_fold(Hookpoint, Req, FailedAction, AccFun, [ServerName|More]) -> + Server = emqx_exhook_mngr:server(ServerName), + case emqx_exhook_server:call(Hookpoint, Req, Server) of {ok, Resp} -> case AccFun(Req, Resp) of - {stop, NReq} -> {stop, NReq}; - {ok, NReq} -> call_fold(Hookpoint, NReq, AccFun, More); - _ -> call_fold(Hookpoint, Req, AccFun, More) + {stop, NReq} -> + {stop, NReq}; + {ok, NReq} -> + call_fold(Hookpoint, NReq, FailedAction, AccFun, More); + _ -> + call_fold(Hookpoint, Req, FailedAction, AccFun, More) end; _ -> - call_fold(Hookpoint, Req, AccFun, More) + case FailedAction of + deny -> + {stop, deny_action_result(Hookpoint, Req)}; + _ -> + call_fold(Hookpoint, Req, FailedAction, AccFun, More) + end end. + +%% XXX: Hard-coded the deny response +deny_action_result('client.authenticate', _) -> + #{result => false}; +deny_action_result('client.check_acl', _) -> + #{result => false}; +deny_action_result('message.publish', Msg) -> + %% TODO: Not support to deny a message + %% maybe we can put the 'allow_publish' into message header + Msg. diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index e5796cd7e..cadd5eb37 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -34,6 +34,8 @@ %% Helper funcs -export([ running/0 , server/1 + , put_request_failed_action/1 + , get_request_failed_action/0 ]). %% gen_server callbacks @@ -100,7 +102,7 @@ call(Pid, Req) -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([Servers, AutoReconnect, ReqOpts]) -> +init([Servers, AutoReconnect, ReqOpts0]) -> process_flag(trap_exit, true), %% XXX: Due to the ExHook Module in the enterprise, %% this process may start multiple times and they will share this table @@ -111,7 +113,13 @@ init([Servers, AutoReconnect, ReqOpts]) -> ok end, + %% put the global option + put_request_failed_action( + maps:get(request_failed_action, ReqOpts0, deny) + ), + %% Load the hook servers + ReqOpts = maps:without([request_failed_action], ReqOpts0), {Waiting, Running} = load_all_servers(Servers, ReqOpts), {ok, ensure_reload_timer( #state{waiting = Waiting, @@ -272,6 +280,12 @@ clean_reload_timer(Name, State = #state{trefs = TRefs}) -> %%-------------------------------------------------------------------- %% Server state persistent +put_request_failed_action(Val) -> + persistent_term:put({?APP, request_failed_action}, Val). + +get_request_failed_action() -> + persistent_term:get({?APP, request_failed_action}). + save(Name, ServerState) -> Saved = persistent_term:get(?APP, []), persistent_term:put(?APP, lists:reverse([Name | Saved])), diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index 6afed3d80..e9c405de0 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -47,13 +47,18 @@ init([]) -> {ok, {{one_for_one, 10, 100}, [Mngr]}}. servers() -> - application:get_env(emqx_exhook, servers, []). + env(servers, []). auto_reconnect() -> - application:get_env(emqx_exhook, auto_reconnect, 60000). + env(auto_reconnect, 60000). request_options() -> - #{timeout => application:get_env(emqx_exhook, request_timeout, 5000)}. + #{timeout => env(request_timeout, 5000), + request_failed_action => env(request_failed_action, deny) + }. + +env(Key, Def) -> + application:get_env(emqx_exhook, Key, Def). %%-------------------------------------------------------------------- %% APIs diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index 52664f57a..88964487c 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -58,6 +58,26 @@ t_noserver_nohook(_) -> ok = emqx_exhook:enable(default), ?assertNotEqual([], ets:tab2list(emqx_hooks)). +t_access_failed_if_no_server_running(_) -> + emqx_exhook:disable(default), + ClientInfo = #{clientid => <<"user-id-1">>, + username => <<"usera">>, + peerhost => {127,0,0,1}, + sockport => 1883, + protocol => mqtt, + mountpoint => undefined + }, + ?assertMatch({stop, #{auth_result := not_authorized}}, + emqx_exhook_handler:on_client_authenticate(ClientInfo, #{auth_result => success})), + + ?assertMatch({stop, deny}, + emqx_exhook_handler:on_client_check_acl(ClientInfo, publish, <<"t/1">>, allow)), + + Message = emqx_message:make(<<"t/1">>, <<"abc">>), + ?assertMatch({stop, Message}, + emqx_exhook_handler:on_message_publish(Message)), + emqx_exhook:enable(default). + t_cli_list(_) -> meck_print(), ?assertEqual( [[emqx_exhook_server:format(emqx_exhook_mngr:server(Name)) || Name <- emqx_exhook:list()]]