%% emqx/apps/emqx_exhook/src/emqx_exhook_server.erl
%%
%% 500 lines
%% 16 KiB
%% Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exhook_server).
-include("emqx_exhook.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
%% The exhook proto version should be fixed as `v2` in EMQX v5.x
%% to make sure the exhook proto version is compatible
-define(PB_CLIENT_MOD, emqx_exhook_v_2_hook_provider_client).
%% Load/Unload
-export([
load/2,
unload/1
]).
%% APIs
-export([call/3]).
%% Infos
-export([
name/1,
hooks/1,
format/1,
failed_action/1
]).
-ifdef(TEST).
-export([hk2func/1]).
-endif.
%% State of one loaded exhook server, as assembled by load/2.
-type server() :: #{
%% Server name (equal to grpc client channel name)
name := binary(),
%% The function options: load/2 stores the request timeout and the
%% failed_action here, and they are merged into every gRPC call's options
options := map(),
%% gRPC channel pid (the value returned by
%% emqx_exhook_sup:start_grpc_client_channel/3)
channel := pid(),
%% Registered hook names and options
hookspec := #{hookpoint() => map()},
%% Metrics name prefix, built in load/2 as "exhook.<name>."
prefix := list()
}.
%% Hookpoints an exhook provider may register.
%% The same set of atoms is listed by available_hooks/0 and mapped to
%% gRPC function names by hk2func/1.
-type hookpoint() ::
'client.connect'
| 'client.connack'
| 'client.connected'
| 'client.disconnected'
| 'client.authenticate'
| 'client.authorize'
| 'client.subscribe'
| 'client.unsubscribe'
| 'session.created'
| 'session.subscribed'
| 'session.unsubscribed'
| 'session.resumed'
| 'session.discarded'
| 'session.takenover'
| 'session.terminated'
| 'message.publish'
| 'message.delivered'
| 'message.acked'
| 'message.dropped'.
-export_type([server/0, hookpoint/0]).
-dialyzer({nowarn_function, [inc_metrics/2]}).
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
%%--------------------------------------------------------------------
%% Load/Unload APIs
%%--------------------------------------------------------------------
%% @doc Load an exhook server.
%% Returns `disable' when the configuration has `enable := false'.
%% Otherwise the gRPC client channel is started, the provider is asked
%% which hooks it wants, and metrics plus hook callbacks are registered.
%% A failure of the provider handshake stops the channel again and is
%% reported as `{load_error, Reason}'; failures from channel_opts/1 or
%% from starting the channel are returned unchanged.
-spec load(binary(), map()) -> {ok, server()} | {error, term()} | {load_error, term()} | disable.
load(_Name, #{enable := false}) ->
    disable;
load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) ->
    ReqOpts = #{timeout => Timeout, failed_action => FailedAction},
    case channel_opts(Opts) of
        {ok, {SvrAddr, ClientOpts}} ->
            start_channel(Name, SvrAddr, ClientOpts, ReqOpts);
        Error ->
            Error
    end.

%% @private Start the gRPC client channel, then run the provider handshake.
start_channel(Name, SvrAddr, ClientOpts, ReqOpts) ->
    case emqx_exhook_sup:start_grpc_client_channel(Name, SvrAddr, ClientOpts) of
        {ok, ChannPoolPid} ->
            init_provider(Name, ChannPoolPid, ReqOpts);
        {error, _} = StartError ->
            StartError
    end.

%% @private Ask the provider for its hook specs and build the server state.
init_provider(Name, ChannPoolPid, ReqOpts) ->
    case do_init(Name, ReqOpts) of
        {ok, HookSpecs} ->
            %% Register metrics
            Prefix = lists:flatten(io_lib:format("exhook.~ts.", [Name])),
            ensure_metrics(Prefix, HookSpecs),
            %% Ensure hooks
            ensure_hooks(HookSpecs),
            Server = #{
                name => Name,
                options => ReqOpts,
                channel => ChannPoolPid,
                hookspec => HookSpecs,
                prefix => Prefix
            },
            {ok, Server};
        {error, Reason} ->
            %% The channel is already running at this point; tear it down.
            emqx_exhook_sup:stop_grpc_client_channel(Name),
            {load_error, Reason}
    end.
%% @private
%% Derive the gRPC server address and client options from the server
%% config. The URL must parse to an `http' or `https' scheme with both a
%% host and a port; anything else yields
%% `{error, {bad_server_url, URL, Parsed}}'.
channel_opts(Opts = #{url := URL, socket_options := SockOptsT}) ->
    BaseOpts = maps:merge(#{pool_size => erlang:system_info(schedulers)}, Opts),
    TransportOpts = maps:to_list(SockOptsT),
    case uri_string:parse(URL) of
        #{scheme := <<"http">>, host := Host, port := Port} ->
            GunOpts = #{transport_opts => TransportOpts},
            {ok, {format_http_uri("http", Host, Port), BaseOpts#{gun_opts => GunOpts}}};
        #{scheme := <<"https">>, host := Host, port := Port} ->
            GunOpts = #{
                transport => ssl,
                transport_opts => TransportOpts ++ ssl_transport_opts(Opts)
            },
            {ok, {format_http_uri("https", Host, Port), BaseOpts#{gun_opts => GunOpts}}};
        Parsed ->
            {error, {bad_server_url, URL, Parsed}}
    end.

%% @private
%% Certificate options for an https channel; empty when the `ssl' section
%% is missing or has `enable := false'.
ssl_transport_opts(Opts) ->
    case maps:get(ssl, Opts, undefined) of
        undefined ->
            [];
        #{enable := false} ->
            [];
        SslConf ->
            filter([
                {cacertfile, maps:get(cacertfile, SslConf, undefined)},
                {certfile, maps:get(certfile, SslConf, undefined)},
                {keyfile, maps:get(keyfile, SslConf, undefined)}
            ])
    end.
%% @private Render "Scheme://Host:Port" as a flat string.
format_http_uri(Scheme, Host, Port) ->
    Uri = io_lib:format("~ts://~ts:~w", [Scheme, Host, Port]),
    lists:flatten(Uri).
%% @private
%% Drop unset entries from an option list.
%% The caller passes `{Key, Value}' pairs whose value defaults to
%% `undefined'; comparing the whole tuple against `undefined' never
%% matched, so unset options were passed through. Both `{Key, undefined}'
%% pairs and bare `undefined' elements are removed; everything else is
%% kept in its original order.
filter(Ls) ->
    lists:filter(
        fun
            (undefined) -> false;
            ({_Key, undefined}) -> false;
            (_Set) -> true
        end,
        Ls
    ).
%% @doc Unload an exhook server: release its hook registrations, notify
%% the provider that it is being unloaded, then stop the gRPC client
%% channel. Results of the individual steps are ignored; always `ok'.
-spec unload(server()) -> ok.
unload(#{name := ChannName, options := CallOpts, hookspec := Registered}) ->
    _ = may_unload_hooks(Registered),
    _ = do_deinit(ChannName, CallOpts),
    _ = emqx_exhook_sup:stop_grpc_client_channel(ChannName),
    ok.
%% @private
%% Tell the provider it has been unloaded. The outcome of the call is
%% ignored.
do_deinit(Name, ReqOpts) ->
    %% Override the request timeout to deinit grpc server to
    %% avoid emqx_exhook_mgr force killed by upper supervisor
    ShutdownOpts = maps:put(timeout, ?SERVER_FORCE_SHUTDOWN_TIMEOUT, ReqOpts),
    _ = do_call(Name, undefined, 'on_provider_unloaded', #{}, ShutdownOpts),
    ok.
%% @private
%% Send `on_provider_loaded' with the broker info and resolve the hook
%% list from the reply. Any exception raised while resolving is logged
%% and turned into `{error, Reason}'.
do_init(ChannName, ReqOpts) ->
    %% BrokerInfo defined at: exhook.protos
    SysInfo = maps:from_list(emqx_sys:info()),
    BrokerInfo = maps:with([version, sysdescr, uptime, datetime], SysInfo),
    LoadedReq = #{broker => BrokerInfo},
    case do_call(ChannName, undefined, 'on_provider_loaded', LoadedReq, ReqOpts) of
        {ok, InitialResp} ->
            try resolve_hookspec(maps:get(hooks, InitialResp, [])) of
                HookSpecs ->
                    {ok, HookSpecs}
            catch
                _:Reason:Stk ->
                    ?SLOG(error, #{
                        msg => "failed_to_init_channel",
                        channel_name => ChannName,
                        reason => Reason,
                        stacktrace => Stk
                    }),
                    {error, Reason}
            end;
        {error, _} = CallError ->
            CallError
    end.
%% @private
%% Turn the provider's hook spec list into a `#{Hookpoint => Params}' map.
%% Specs without a name are skipped. A name that is not one of
%% available_hooks/0 raises `{unknown_hookpoint, Name}'. Only message
%% hooks carry a `topics' parameter.
resolve_hookspec(HookSpecs) when is_list(HookSpecs) ->
    MessageHooks = message_hooks(),
    AvailableHooks = available_hooks(),
    Resolve = fun(HookSpec, Acc) ->
        case maps:find(name, HookSpec) of
            error ->
                Acc;
            {ok, undefined} ->
                Acc;
            {ok, Name0} ->
                %% A failed conversion leaves a {Class, Why} tuple, which
                %% is never a member of AvailableHooks and is rejected below.
                Name =
                    try
                        binary_to_existing_atom(Name0, utf8)
                    catch
                        Class:Why -> {Class, Why}
                    end,
                lists:member(Name, AvailableHooks) orelse error({unknown_hookpoint, Name0}),
                Params =
                    case lists:member(Name, MessageHooks) of
                        true -> #{topics => maps:get(topics, HookSpec, [])};
                        false -> #{}
                    end,
                Acc#{Name => Params}
        end
    end,
    lists:foldr(Resolve, #{}, HookSpecs).
%% @private
%% Make sure a metric named Prefix ++ Hookpoint exists for every
%% registered hookpoint.
ensure_metrics(Prefix, HookSpecs) ->
    MetricNames = lists:map(
        fun(Hookpoint) -> list_to_atom(Prefix ++ atom_to_list(Hookpoint)) end,
        maps:keys(HookSpecs)
    ),
    lists:foreach(fun(Metric) -> emqx_metrics:ensure(Metric) end, MetricNames).
%% @private
%% Install the exhook callback for every registered hookpoint and bump
%% its reference counter.
ensure_hooks(HookSpecs) ->
    lists:foreach(fun ensure_hook/1, maps:keys(HookSpecs)).

%% @private
%% Hookpoints missing from ?ENABLED_HOOKS are logged and skipped.
ensure_hook(Hookpoint) ->
    case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
        {Hookpoint, {M, F, A}} ->
            emqx_hooks:put(Hookpoint, {M, F, A}, ?HP_EXHOOK),
            ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0});
        false ->
            ?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint})
    end.
%% @private
%% Decrement the reference counter of every registered hookpoint; a
%% hookpoint whose counter drops to zero or below has its callback
%% removed and its counter row deleted.
may_unload_hooks(HookSpecs) ->
    lists:foreach(fun release_hook/1, maps:keys(HookSpecs)).

%% @private
release_hook(Hookpoint) ->
    Remaining = ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, -1}, {Hookpoint, 0}),
    case Remaining =< 0 of
        true ->
            case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
                {Hookpoint, {M, F, _A}} ->
                    emqx_hooks:del(Hookpoint, {M, F});
                _ ->
                    ok
            end,
            ets:delete(?HOOKS_REF_COUNTER, Hookpoint);
        false ->
            ok
    end.
%% @doc One-line textual summary of a server: its name and hook specs.
format(#{name := Name, hookspec := Hooks}) ->
    Text = io_lib:format("name=~ts, hooks=~0p, active=true", [Name, Hooks]),
    lists:flatten(Text).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
%% @doc Name of the server.
name(#{name := ServerName}) ->
    ServerName.
%% @doc Registered hooks as a list of `#{name => Hookpoint, params => Params}'.
hooks(#{hookspec := HookSpecs}) ->
    maps:fold(
        fun(Hookpoint, Params, Collected) ->
            Entry = #{name => Hookpoint, params => Params},
            [Entry | Collected]
        end,
        [],
        HookSpecs
    ).
%% @doc Invoke the provider for a hookpoint.
%% Returns `ignore' when the server did not register the hookpoint, or
%% when a message hook's topic matches none of the registered topic
%% filters. Otherwise the per-hook metric is incremented and the result
%% of the gRPC call is returned.
-spec call(hookpoint(), map(), server()) ->
    ignore
    | {ok, Resp :: term()}
    | {error, term()}.
call(Hookpoint, Req, #{
    name := ChannName,
    options := ReqOpts,
    hookspec := Hooks,
    prefix := Prefix
}) ->
    case maps:get(Hookpoint, Hooks, undefined) of
        undefined ->
            ignore;
        HookOpts ->
            case needs_call(Hookpoint, Req, HookOpts) of
                false ->
                    ignore;
                _ ->
                    inc_metrics(Prefix, Hookpoint),
                    do_call(ChannName, Hookpoint, hk2func(Hookpoint), Req, ReqOpts)
            end
    end.

%% @private
%% Non-message hooks are always called; message hooks only when the
%% message topic passes the registered topic filters.
needs_call(Hookpoint, Req, HookOpts) ->
    case lists:member(Hookpoint, message_hooks()) of
        false ->
            true;
        _ ->
            #{message := #{topic := Topic}} = Req,
            match_topic_filter(Topic, maps:get(topics, HookOpts, []))
    end.
%% @private
%% Increment the metric named Prefix ++ Name.
%% The first argument is normally the prefix string; a fun is also
%% accepted, in which case the prefix is read from the head of the fun's
%% environment.
inc_metrics(IncFun, Name) when is_function(IncFun) ->
    %% BACKW: e4.2.0-e4.2.2
    {env, [CapturedPrefix | _]} = erlang:fun_info(IncFun, env),
    inc_metrics(CapturedPrefix, Name);
inc_metrics(Prefix, Name) when is_list(Prefix) ->
    MetricName = Prefix ++ atom_to_list(Name),
    emqx_metrics:inc(list_to_atom(MetricName)).
-compile({inline, [match_topic_filter/2]}).
%% @private
%% An empty filter list accepts every topic; otherwise the topic must
%% match at least one of the filters.
match_topic_filter(_TopicName, []) ->
    true;
match_topic_filter(TopicName, Filters) ->
    Matches = fun(Filter) -> emqx_topic:match(TopicName, Filter) end,
    lists:any(Matches, Filters).
%% Under TEST the client functions are called with an extra
%% `#{<<"channel">> => ChannName}' argument; otherwise the channel name
%% travels only inside Options.
-ifdef(TEST).
-define(CALL_PB_CLIENT(ChannName, Fun, Req, Options),
    apply(?PB_CLIENT_MOD, Fun, [Req, #{<<"channel">> => ChannName}, Options])
).
-else.
-define(CALL_PB_CLIENT(ChannName, Fun, Req, Options),
    apply(?PB_CLIENT_MOD, Fun, [Req, Options])
).
-endif.

%% @private
%% Perform one gRPC call to the provider.
%%
%% ChannName - gRPC client channel name, also put into the call options.
%% Hookpoint - hookpoint being served, or `undefined' for the provider
%%             lifecycle calls, in which case no metrics are updated.
%% Fun       - client function to apply on ?PB_CLIENT_MOD.
%% Req       - request map; the request meta is added under `meta'.
%% ReqOpts   - request options, extended with `channel' and `key_dispatch'.
%%
%% Returns `{ok, Resp}' on success and `{error, Reason}' otherwise. Any
%% exception raised by the client call (error, exit or throw) is logged
%% with its class, reason and stacktrace, counted as a failure, and
%% returned as `{error, Reason}' instead of crashing the caller.
-spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
    NReq = Req#{meta => emqx_exhook_handler:request_meta()},
    Options = ReqOpts#{
        channel => ChannName,
        key_dispatch => key_dispatch(NReq)
    },
    ?SLOG(debug, #{
        msg => "do_call",
        module => ?PB_CLIENT_MOD,
        function => Fun,
        req => NReq,
        options => Options
    }),
    %% Only the client call itself is protected; the result clauses below
    %% run outside the try.
    try ?CALL_PB_CLIENT(ChannName, Fun, NReq, Options) of
        {ok, Resp, Metadata} ->
            ?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}),
            update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2),
            {ok, Resp};
        {error, {Code, Msg}, _Metadata} ->
            ?SLOG(error, #{
                msg => "exhook_call_error",
                module => ?PB_CLIENT_MOD,
                function => Fun,
                req => NReq,
                options => Options,
                code => Code,
                packet => Msg
            }),
            update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
            {error, {Code, Msg}};
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "exhook_call_error",
                module => ?PB_CLIENT_MOD,
                function => Fun,
                req => NReq,
                options => Options,
                reason => Reason
            }),
            update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
            {error, Reason}
    catch
        Class:ExcReason:Stk ->
            ?SLOG(error, #{
                msg => "exhook_call_exception",
                module => ?PB_CLIENT_MOD,
                function => Fun,
                req => NReq,
                options => Options,
                kind => Class,
                reason => ExcReason,
                stacktrace => Stk
            }),
            update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
            {error, ExcReason}
    end.
%% @private
%% Report a call outcome through Report(ChannName, Hookpoint); calls made
%% without a hookpoint (`undefined') are not reported.
update_metrics(Hookpoint, ChannName, Report) ->
    case Hookpoint of
        undefined -> ok;
        _ -> Report(ChannName, Hookpoint)
    end.
%% @doc The `failed_action' stored in the server's request options.
failed_action(#{options := ReqOpts}) ->
    Action = maps:get(failed_action, ReqOpts),
    Action.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
-compile({inline, [hk2func/1]}).
%% Map a hookpoint to the name of the client function that call/3 passes
%% to do_call/5. Each name is `on_' followed by the hookpoint with its
%% `.' replaced by `_'. There is no clause for other atoms, so an unknown
%% hookpoint fails with function_clause.
hk2func('client.connect') -> 'on_client_connect';
hk2func('client.connack') -> 'on_client_connack';
hk2func('client.connected') -> 'on_client_connected';
hk2func('client.disconnected') -> 'on_client_disconnected';
hk2func('client.authenticate') -> 'on_client_authenticate';
hk2func('client.authorize') -> 'on_client_authorize';
hk2func('client.subscribe') -> 'on_client_subscribe';
hk2func('client.unsubscribe') -> 'on_client_unsubscribe';
hk2func('session.created') -> 'on_session_created';
hk2func('session.subscribed') -> 'on_session_subscribed';
hk2func('session.unsubscribed') -> 'on_session_unsubscribed';
hk2func('session.resumed') -> 'on_session_resumed';
hk2func('session.discarded') -> 'on_session_discarded';
hk2func('session.takenover') -> 'on_session_takenover';
hk2func('session.terminated') -> 'on_session_terminated';
hk2func('message.publish') -> 'on_message_publish';
hk2func('message.delivered') -> 'on_message_delivered';
hk2func('message.acked') -> 'on_message_acked';
hk2func('message.dropped') -> 'on_message_dropped'.
-compile({inline, [message_hooks/0]}).
%% Hookpoints whose requests carry a message. For these, resolve_hookspec/1
%% keeps the provider's `topics' list and call/3 checks the message topic
%% against it before calling the provider.
message_hooks() ->
[
'message.publish',
'message.delivered',
'message.acked',
'message.dropped'
].
-compile({inline, [available_hooks/0]}).
%% @private
%% Every hookpoint a provider is allowed to register: the client hooks,
%% the session hooks, then the message hooks.
available_hooks() ->
    ClientHooks = [
        'client.connect',
        'client.connack',
        'client.connected',
        'client.disconnected',
        'client.authenticate',
        'client.authorize',
        'client.subscribe',
        'client.unsubscribe'
    ],
    SessionHooks = [
        'session.created',
        'session.subscribed',
        'session.unsubscribed',
        'session.resumed',
        'session.discarded',
        'session.takenover',
        'session.terminated'
    ],
    ClientHooks ++ SessionHooks ++ message_hooks().
%% @doc Get dispatch_key for each request
%% The client id from `clientinfo' is preferred, then the one from
%% `conninfo', then the message's `from'; when none is present the
%% calling process' pid is used.
key_dispatch(Req) ->
    case Req of
        #{clientinfo := #{clientid := ClientId}} -> ClientId;
        #{conninfo := #{clientid := ClientId}} -> ClientId;
        #{message := #{from := From}} -> From;
        _ -> self()
    end.