500 lines
16 KiB
Erlang
500 lines
16 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_exhook_server).
|
|
|
|
-include("emqx_exhook.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("emqx/include/emqx_hooks.hrl").
|
|
|
|
%% The exhook proto version should be fixed as `v2` in EMQX v5.x
|
|
%% to make sure the exhook proto version is compatible
|
|
-define(PB_CLIENT_MOD, emqx_exhook_v_2_hook_provider_client).
|
|
|
|
%% Load/Unload
|
|
-export([
|
|
load/2,
|
|
unload/1
|
|
]).
|
|
|
|
%% APIs
|
|
-export([call/3]).
|
|
|
|
%% Infos
|
|
-export([
|
|
name/1,
|
|
hooks/1,
|
|
format/1,
|
|
failed_action/1
|
|
]).
|
|
|
|
-ifdef(TEST).
|
|
-export([hk2func/1]).
|
|
-endif.
|
|
|
|
%% Server name (equal to grpc client channel name)
|
|
-type server() :: #{
|
|
name := binary(),
|
|
%% The function options
|
|
options := map(),
|
|
%% gRPC channel pid
|
|
channel := pid(),
|
|
%% Registered hook names and options
|
|
hookspec := #{hookpoint() => map()},
|
|
%% Metrcis name prefix
|
|
prefix := list()
|
|
}.
|
|
|
|
-type hookpoint() ::
|
|
'client.connect'
|
|
| 'client.connack'
|
|
| 'client.connected'
|
|
| 'client.disconnected'
|
|
| 'client.authenticate'
|
|
| 'client.authorize'
|
|
| 'client.subscribe'
|
|
| 'client.unsubscribe'
|
|
| 'session.created'
|
|
| 'session.subscribed'
|
|
| 'session.unsubscribed'
|
|
| 'session.resumed'
|
|
| 'session.discarded'
|
|
| 'session.takenover'
|
|
| 'session.terminated'
|
|
| 'message.publish'
|
|
| 'message.delivered'
|
|
| 'message.acked'
|
|
| 'message.dropped'.
|
|
|
|
-export_type([server/0, hookpoint/0]).
|
|
|
|
-dialyzer({nowarn_function, [inc_metrics/2]}).
|
|
|
|
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Load/Unload APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
-spec load(binary(), map()) -> {ok, server()} | {error, term()} | {load_error, term()} | disable.
|
|
load(_Name, #{enable := false}) ->
|
|
disable;
|
|
load(Name, #{request_timeout := Timeout, failed_action := FailedAction} = Opts) ->
|
|
ReqOpts = #{timeout => Timeout, failed_action => FailedAction},
|
|
case channel_opts(Opts) of
|
|
{ok, {SvrAddr, ClientOpts}} ->
|
|
case
|
|
emqx_exhook_sup:start_grpc_client_channel(
|
|
Name,
|
|
SvrAddr,
|
|
ClientOpts
|
|
)
|
|
of
|
|
{ok, _ChannPoolPid} ->
|
|
case do_init(Name, ReqOpts) of
|
|
{ok, HookSpecs} ->
|
|
%% Register metrics
|
|
Prefix = lists:flatten(io_lib:format("exhook.~ts.", [Name])),
|
|
ensure_metrics(Prefix, HookSpecs),
|
|
%% Ensure hooks
|
|
ensure_hooks(HookSpecs),
|
|
{ok, #{
|
|
name => Name,
|
|
options => ReqOpts,
|
|
channel => _ChannPoolPid,
|
|
hookspec => HookSpecs,
|
|
prefix => Prefix
|
|
}};
|
|
{error, Reason} ->
|
|
emqx_exhook_sup:stop_grpc_client_channel(Name),
|
|
{load_error, Reason}
|
|
end;
|
|
{error, _} = E ->
|
|
E
|
|
end;
|
|
Error ->
|
|
Error
|
|
end.
|
|
|
|
%% @private
|
|
channel_opts(Opts = #{url := URL, socket_options := SockOptsT}) ->
|
|
ClientOpts = maps:merge(
|
|
#{pool_size => erlang:system_info(schedulers)},
|
|
Opts
|
|
),
|
|
SockOpts = maps:to_list(SockOptsT),
|
|
case uri_string:parse(URL) of
|
|
#{scheme := <<"http">>, host := Host, port := Port} ->
|
|
NClientOpts = ClientOpts#{
|
|
gun_opts =>
|
|
#{transport_opts => SockOpts}
|
|
},
|
|
{ok, {format_http_uri("http", Host, Port), NClientOpts}};
|
|
#{scheme := <<"https">>, host := Host, port := Port} ->
|
|
SslOpts =
|
|
case maps:get(ssl, Opts, undefined) of
|
|
undefined ->
|
|
[];
|
|
#{enable := false} ->
|
|
[];
|
|
MapOpts ->
|
|
filter(
|
|
[
|
|
{cacertfile, maps:get(cacertfile, MapOpts, undefined)},
|
|
{certfile, maps:get(certfile, MapOpts, undefined)},
|
|
{keyfile, maps:get(keyfile, MapOpts, undefined)}
|
|
]
|
|
)
|
|
end,
|
|
NClientOpts = ClientOpts#{
|
|
gun_opts =>
|
|
#{
|
|
transport => ssl,
|
|
transport_opts => SockOpts ++ SslOpts
|
|
}
|
|
},
|
|
{ok, {format_http_uri("https", Host, Port), NClientOpts}};
|
|
Error ->
|
|
{error, {bad_server_url, URL, Error}}
|
|
end.
|
|
|
|
format_http_uri(Scheme, Host, Port) ->
|
|
lists:flatten(io_lib:format("~ts://~ts:~w", [Scheme, Host, Port])).
|
|
|
|
filter(Ls) ->
|
|
[E || E <- Ls, E /= undefined].
|
|
|
|
-spec unload(server()) -> ok.
|
|
unload(#{name := Name, options := ReqOpts, hookspec := HookSpecs}) ->
|
|
_ = may_unload_hooks(HookSpecs),
|
|
_ = do_deinit(Name, ReqOpts),
|
|
_ = emqx_exhook_sup:stop_grpc_client_channel(Name),
|
|
ok.
|
|
|
|
do_deinit(Name, ReqOpts) ->
|
|
%% Override the request timeout to deinit grpc server to
|
|
%% avoid emqx_exhook_mgr force killed by upper supervisor
|
|
NReqOpts = ReqOpts#{timeout => ?SERVER_FORCE_SHUTDOWN_TIMEOUT},
|
|
_ = do_call(Name, undefined, 'on_provider_unloaded', #{}, NReqOpts),
|
|
ok.
|
|
|
|
do_init(ChannName, ReqOpts) ->
|
|
%% BrokerInfo defined at: exhook.protos
|
|
BrokerInfo = maps:with(
|
|
[version, sysdescr, uptime, datetime],
|
|
maps:from_list(emqx_sys:info())
|
|
),
|
|
Req = #{broker => BrokerInfo},
|
|
case do_call(ChannName, undefined, 'on_provider_loaded', Req, ReqOpts) of
|
|
{ok, InitialResp} ->
|
|
try
|
|
{ok, resolve_hookspec(maps:get(hooks, InitialResp, []))}
|
|
catch
|
|
_:Reason:Stk ->
|
|
?SLOG(error, #{
|
|
msg => "failed_to_init_channel",
|
|
channel_name => ChannName,
|
|
reason => Reason,
|
|
stacktrace => Stk
|
|
}),
|
|
{error, Reason}
|
|
end;
|
|
{error, Reason} ->
|
|
{error, Reason}
|
|
end.
|
|
|
|
%% @private
|
|
resolve_hookspec(HookSpecs) when is_list(HookSpecs) ->
|
|
MessageHooks = message_hooks(),
|
|
AvailableHooks = available_hooks(),
|
|
lists:foldr(
|
|
fun(HookSpec, Acc) ->
|
|
case maps:get(name, HookSpec, undefined) of
|
|
undefined ->
|
|
Acc;
|
|
Name0 ->
|
|
Name =
|
|
try
|
|
binary_to_existing_atom(Name0, utf8)
|
|
catch
|
|
T:R:_ -> {T, R}
|
|
end,
|
|
case {lists:member(Name, AvailableHooks), lists:member(Name, MessageHooks)} of
|
|
{false, _} ->
|
|
error({unknown_hookpoint, Name0});
|
|
{true, false} ->
|
|
Acc#{Name => #{}};
|
|
{true, true} ->
|
|
Acc#{
|
|
Name => #{
|
|
topics => maps:get(topics, HookSpec, [])
|
|
}
|
|
}
|
|
end
|
|
end
|
|
end,
|
|
#{},
|
|
HookSpecs
|
|
).
|
|
|
|
ensure_metrics(Prefix, HookSpecs) ->
|
|
Keys = [
|
|
list_to_atom(Prefix ++ atom_to_list(Hookpoint))
|
|
|| Hookpoint <- maps:keys(HookSpecs)
|
|
],
|
|
lists:foreach(fun emqx_metrics:ensure/1, Keys).
|
|
|
|
ensure_hooks(HookSpecs) ->
|
|
lists:foreach(
|
|
fun(Hookpoint) ->
|
|
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
|
|
false ->
|
|
?SLOG(error, #{msg => "skipped_unknown_hookpoint", hookpoint => Hookpoint});
|
|
{Hookpoint, {M, F, A}} ->
|
|
emqx_hooks:put(Hookpoint, {M, F, A}, ?HP_EXHOOK),
|
|
ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
|
|
end
|
|
end,
|
|
maps:keys(HookSpecs)
|
|
).
|
|
|
|
may_unload_hooks(HookSpecs) ->
|
|
lists:foreach(
|
|
fun(Hookpoint) ->
|
|
case ets:update_counter(?HOOKS_REF_COUNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of
|
|
Cnt when Cnt =< 0 ->
|
|
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
|
|
{Hookpoint, {M, F, _A}} ->
|
|
emqx_hooks:del(Hookpoint, {M, F});
|
|
_ ->
|
|
ok
|
|
end,
|
|
ets:delete(?HOOKS_REF_COUNTER, Hookpoint);
|
|
_ ->
|
|
ok
|
|
end
|
|
end,
|
|
maps:keys(HookSpecs)
|
|
).
|
|
|
|
format(#{name := Name, hookspec := Hooks}) ->
|
|
lists:flatten(
|
|
io_lib:format("name=~ts, hooks=~0p, active=true", [Name, Hooks])
|
|
).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
name(#{name := Name}) ->
|
|
Name.
|
|
|
|
hooks(#{hookspec := Hooks}) ->
|
|
FoldFun = fun(Hook, Params, Acc) ->
|
|
[
|
|
#{
|
|
name => Hook,
|
|
params => Params
|
|
}
|
|
| Acc
|
|
]
|
|
end,
|
|
maps:fold(FoldFun, [], Hooks).
|
|
|
|
-spec call(hookpoint(), map(), server()) ->
|
|
ignore
|
|
| {ok, Resp :: term()}
|
|
| {error, term()}.
|
|
call(Hookpoint, Req, #{
|
|
name := ChannName,
|
|
options := ReqOpts,
|
|
hookspec := Hooks,
|
|
prefix := Prefix
|
|
}) ->
|
|
case maps:get(Hookpoint, Hooks, undefined) of
|
|
undefined ->
|
|
ignore;
|
|
Opts ->
|
|
NeedCall =
|
|
case lists:member(Hookpoint, message_hooks()) of
|
|
false ->
|
|
true;
|
|
_ ->
|
|
#{message := #{topic := Topic}} = Req,
|
|
match_topic_filter(Topic, maps:get(topics, Opts, []))
|
|
end,
|
|
case NeedCall of
|
|
false ->
|
|
ignore;
|
|
_ ->
|
|
inc_metrics(Prefix, Hookpoint),
|
|
GrpcFun = hk2func(Hookpoint),
|
|
do_call(ChannName, Hookpoint, GrpcFun, Req, ReqOpts)
|
|
end
|
|
end.
|
|
|
|
%% @private
|
|
inc_metrics(IncFun, Name) when is_function(IncFun) ->
|
|
%% BACKW: e4.2.0-e4.2.2
|
|
{env, [Prefix | _]} = erlang:fun_info(IncFun, env),
|
|
inc_metrics(Prefix, Name);
|
|
inc_metrics(Prefix, Name) when is_list(Prefix) ->
|
|
emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))).
|
|
|
|
-compile({inline, [match_topic_filter/2]}).
|
|
match_topic_filter(_, []) ->
|
|
true;
|
|
match_topic_filter(TopicName, TopicFilter) ->
|
|
lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
|
|
|
|
-ifdef(TEST).
|
|
-define(CALL_PB_CLIENT(ChanneName, Fun, Req, Options),
|
|
apply(?PB_CLIENT_MOD, Fun, [Req, #{<<"channel">> => ChannName}, Options])
|
|
).
|
|
-else.
|
|
-define(CALL_PB_CLIENT(ChanneName, Fun, Req, Options),
|
|
apply(?PB_CLIENT_MOD, Fun, [Req, Options])
|
|
).
|
|
-endif.
|
|
|
|
-spec do_call(binary(), atom(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
|
|
do_call(ChannName, Hookpoint, Fun, Req, ReqOpts) ->
|
|
NReq = Req#{meta => emqx_exhook_handler:request_meta()},
|
|
Options = ReqOpts#{
|
|
channel => ChannName,
|
|
key_dispatch => key_dispatch(NReq)
|
|
},
|
|
?SLOG(debug, #{
|
|
msg => "do_call",
|
|
module => ?PB_CLIENT_MOD,
|
|
function => Fun,
|
|
req => NReq,
|
|
options => Options
|
|
}),
|
|
case catch ?CALL_PB_CLIENT(ChanneName, Fun, NReq, Options) of
|
|
{ok, Resp, Metadata} ->
|
|
?SLOG(debug, #{msg => "do_call_ok", resp => Resp, metadata => Metadata}),
|
|
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:succeed/2),
|
|
{ok, Resp};
|
|
{error, {Code, Msg}, _Metadata} ->
|
|
?SLOG(error, #{
|
|
msg => "exhook_call_error",
|
|
module => ?PB_CLIENT_MOD,
|
|
function => Fun,
|
|
req => NReq,
|
|
options => Options,
|
|
code => Code,
|
|
packet => Msg
|
|
}),
|
|
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
|
|
{error, {Code, Msg}};
|
|
{error, Reason} ->
|
|
?SLOG(error, #{
|
|
msg => "exhook_call_error",
|
|
module => ?PB_CLIENT_MOD,
|
|
function => Fun,
|
|
req => NReq,
|
|
options => Options,
|
|
reason => Reason
|
|
}),
|
|
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
|
|
{error, Reason};
|
|
{'EXIT', {Reason, Stk}} ->
|
|
?SLOG(error, #{
|
|
msg => "exhook_call_exception",
|
|
module => ?PB_CLIENT_MOD,
|
|
function => Fun,
|
|
req => NReq,
|
|
options => Options,
|
|
stacktrace => Stk
|
|
}),
|
|
update_metrics(Hookpoint, ChannName, fun emqx_exhook_metrics:failed/2),
|
|
{error, Reason}
|
|
end.
|
|
|
|
update_metrics(undefined, _ChannName, _Fun) ->
|
|
ok;
|
|
update_metrics(Hookpoint, ChannName, Fun) ->
|
|
Fun(ChannName, Hookpoint).
|
|
|
|
failed_action(#{options := Opts}) ->
|
|
maps:get(failed_action, Opts).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal funcs
|
|
%%--------------------------------------------------------------------
|
|
|
|
-compile({inline, [hk2func/1]}).
|
|
hk2func('client.connect') -> 'on_client_connect';
|
|
hk2func('client.connack') -> 'on_client_connack';
|
|
hk2func('client.connected') -> 'on_client_connected';
|
|
hk2func('client.disconnected') -> 'on_client_disconnected';
|
|
hk2func('client.authenticate') -> 'on_client_authenticate';
|
|
hk2func('client.authorize') -> 'on_client_authorize';
|
|
hk2func('client.subscribe') -> 'on_client_subscribe';
|
|
hk2func('client.unsubscribe') -> 'on_client_unsubscribe';
|
|
hk2func('session.created') -> 'on_session_created';
|
|
hk2func('session.subscribed') -> 'on_session_subscribed';
|
|
hk2func('session.unsubscribed') -> 'on_session_unsubscribed';
|
|
hk2func('session.resumed') -> 'on_session_resumed';
|
|
hk2func('session.discarded') -> 'on_session_discarded';
|
|
hk2func('session.takenover') -> 'on_session_takenover';
|
|
hk2func('session.terminated') -> 'on_session_terminated';
|
|
hk2func('message.publish') -> 'on_message_publish';
|
|
hk2func('message.delivered') -> 'on_message_delivered';
|
|
hk2func('message.acked') -> 'on_message_acked';
|
|
hk2func('message.dropped') -> 'on_message_dropped'.
|
|
|
|
-compile({inline, [message_hooks/0]}).
|
|
message_hooks() ->
|
|
[
|
|
'message.publish',
|
|
'message.delivered',
|
|
'message.acked',
|
|
'message.dropped'
|
|
].
|
|
|
|
-compile({inline, [available_hooks/0]}).
|
|
available_hooks() ->
|
|
[
|
|
'client.connect',
|
|
'client.connack',
|
|
'client.connected',
|
|
'client.disconnected',
|
|
'client.authenticate',
|
|
'client.authorize',
|
|
'client.subscribe',
|
|
'client.unsubscribe',
|
|
'session.created',
|
|
'session.subscribed',
|
|
'session.unsubscribed',
|
|
'session.resumed',
|
|
'session.discarded',
|
|
'session.takenover',
|
|
'session.terminated'
|
|
| message_hooks()
|
|
].
|
|
|
|
%% @doc Get dispatch_key for each request
|
|
key_dispatch(_Req = #{clientinfo := #{clientid := ClientId}}) ->
|
|
ClientId;
|
|
key_dispatch(_Req = #{conninfo := #{clientid := ClientId}}) ->
|
|
ClientId;
|
|
key_dispatch(_Req = #{message := #{from := From}}) ->
|
|
From;
|
|
key_dispatch(_Req) ->
|
|
self().
|