emqx/apps/emqx_exhook/src/emqx_exhook_server.erl

327 lines
12 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exhook_server).
-include("emqx_exhook.hrl").
-include_lib("emqx/include/logger.hrl").
-define(CNTER, emqx_exhook_counter).
-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
%% Load/Unload
-export([ load/3
, unload/1
]).
%% APIs
-export([call/3]).
%% Infos
-export([ name/1
, format/1
]).
-record(server, {
%% Server name (equal to grpc client channel name)
name :: binary(),
%% The function options
options :: map(),
%% gRPC channel pid
channel :: pid(),
%% Registered hook names and options
hookspec :: #{hookpoint() => map()},
%% Metrcis name prefix
prefix :: list()
}).
-type server() :: #server{}.
-type hookpoint() :: 'client.connect'
| 'client.connack'
| 'client.connected'
| 'client.disconnected'
| 'client.authenticate'
| 'client.authorize'
| 'client.subscribe'
| 'client.unsubscribe'
| 'session.created'
| 'session.subscribed'
| 'session.unsubscribed'
| 'session.resumed'
| 'session.discarded'
| 'session.takeovered'
| 'session.terminated'
| 'message.publish'
| 'message.delivered'
| 'message.acked'
| 'message.dropped'.
-export_type([server/0]).
-dialyzer({nowarn_function, [inc_metrics/2]}).
%%--------------------------------------------------------------------
%% Load/Unload APIs
%%--------------------------------------------------------------------
-spec load(binary(), map(), map()) -> {ok, server()} | {error, term()} .
load(Name, Opts0, ReqOpts) ->
{SvrAddr, ClientOpts} = channel_opts(Opts0),
case emqx_exhook_sup:start_grpc_client_channel(
Name,
SvrAddr,
ClientOpts) of
{ok, _ChannPoolPid} ->
case do_init(Name, ReqOpts) of
{ok, HookSpecs} ->
%% Reigster metrics
Prefix = lists:flatten(
io_lib:format("exhook.~ts.", [Name])),
ensure_metrics(Prefix, HookSpecs),
%% Ensure hooks
ensure_hooks(HookSpecs),
{ok, #server{name = Name,
options = ReqOpts,
channel = _ChannPoolPid,
hookspec = HookSpecs,
prefix = Prefix }};
{error, _} = E ->
emqx_exhook_sup:stop_grpc_client_channel(Name), E
end;
{error, _} = E -> E
end.
%% @private
channel_opts(Opts = #{url := URL}) ->
case uri_string:parse(URL) of
#{scheme := "http", host := Host, port := Port} ->
{format_http_uri("http", Host, Port), #{}};
#{scheme := "https", host := Host, port := Port} ->
SslOpts =
case maps:get(ssl, Opts, undefined) of
undefined -> [];
MapOpts ->
filter(
[{cacertfile, maps:get(cacertfile, MapOpts, undefined)},
{certfile, maps:get(certfile, MapOpts, undefined)},
{keyfile, maps:get(keyfile, MapOpts, undefined)}
])
end,
{format_http_uri("https", Host, Port),
#{gun_opts => #{transport => ssl, transport_opts => SslOpts}}};
_ ->
error(bad_server_url)
end.
format_http_uri(Scheme, Host, Port) ->
lists:flatten(io_lib:format("~ts://~ts:~w", [Scheme, Host, Port])).
filter(Ls) ->
[ E || E <- Ls, E /= undefined].
-spec unload(server()) -> ok.
unload(#server{name = Name, options = ReqOpts, hookspec = HookSpecs}) ->
_ = do_deinit(Name, ReqOpts),
_ = may_unload_hooks(HookSpecs),
_ = emqx_exhook_sup:stop_grpc_client_channel(Name),
ok.
do_deinit(Name, ReqOpts) ->
_ = do_call(Name, 'on_provider_unloaded', #{}, ReqOpts),
ok.
do_init(ChannName, ReqOpts) ->
%% BrokerInfo defined at: exhook.protos
BrokerInfo = maps:with([version, sysdescr, uptime, datetime],
maps:from_list(emqx_sys:info())),
Req = #{broker => BrokerInfo},
case do_call(ChannName, 'on_provider_loaded', Req, ReqOpts) of
{ok, InitialResp} ->
try
{ok, resolve_hookspec(maps:get(hooks, InitialResp, []))}
catch _:Reason:Stk ->
?LOG(error, "try to init ~p failed, reason: ~p, stacktrace: ~0p",
[ChannName, Reason, Stk]),
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
%% @private
resolve_hookspec(HookSpecs) when is_list(HookSpecs) ->
MessageHooks = message_hooks(),
AvailableHooks = available_hooks(),
lists:foldr(fun(HookSpec, Acc) ->
case maps:get(name, HookSpec, undefined) of
undefined -> Acc;
Name0 ->
Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end,
case lists:member(Name, AvailableHooks) of
true ->
case lists:member(Name, MessageHooks) of
true ->
Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}};
_ ->
Acc#{Name => #{}}
end;
_ -> error({unknown_hookpoint, Name})
end
end
end, #{}, HookSpecs).
ensure_metrics(Prefix, HookSpecs) ->
Keys = [list_to_atom(Prefix ++ atom_to_list(Hookpoint))
|| Hookpoint <- maps:keys(HookSpecs)],
lists:foreach(fun emqx_metrics:ensure/1, Keys).
ensure_hooks(HookSpecs) ->
lists:foreach(fun(Hookpoint) ->
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
false ->
?LOG(error, "Unknown name ~ts to hook, skip it!", [Hookpoint]);
{Hookpoint, {M, F, A}} ->
emqx_hooks:put(Hookpoint, {M, F, A}),
ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
end
end, maps:keys(HookSpecs)).
may_unload_hooks(HookSpecs) ->
lists:foreach(fun(Hookpoint) ->
case ets:update_counter(?CNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of
Cnt when Cnt =< 0 ->
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
{Hookpoint, {M, F, _A}} ->
emqx_hooks:del(Hookpoint, {M, F});
_ -> ok
end,
ets:delete(?CNTER, Hookpoint);
_ -> ok
end
end, maps:keys(HookSpecs)).
format(#server{name = Name, hookspec = Hooks}) ->
lists:flatten(
io_lib:format("name=~ts, hooks=~0p, active=true", [Name, Hooks])).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
name(#server{name = Name}) ->
Name.
-spec call(hookpoint(), map(), server())
-> ignore
| {ok, Resp :: term()}
| {error, term()}.
call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts,
hookspec = Hooks, prefix = Prefix}) ->
GrpcFunc = hk2func(Hookpoint),
case maps:get(Hookpoint, Hooks, undefined) of
undefined -> ignore;
Opts ->
NeedCall = case lists:member(Hookpoint, message_hooks()) of
false -> true;
_ ->
#{message := #{topic := Topic}} = Req,
match_topic_filter(Topic, maps:get(topics, Opts, []))
end,
case NeedCall of
false -> ignore;
_ ->
inc_metrics(Prefix, Hookpoint),
do_call(ChannName, GrpcFunc, Req, ReqOpts)
end
end.
%% @private
inc_metrics(IncFun, Name) when is_function(IncFun) ->
%% BACKW: e4.2.0-e4.2.2
{env, [Prefix|_]} = erlang:fun_info(IncFun, env),
inc_metrics(Prefix, Name);
inc_metrics(Prefix, Name) when is_list(Prefix) ->
emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))).
-compile({inline, [match_topic_filter/2]}).
match_topic_filter(_, []) ->
true;
match_topic_filter(TopicName, TopicFilter) ->
lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
-spec do_call(binary(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Fun, Req, ReqOpts) ->
Options = ReqOpts#{channel => ChannName},
?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]),
case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
{ok, Resp, _Metadata} ->
?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]),
{ok, Resp};
{error, {Code, Msg}, _Metadata} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Code, Msg]),
{error, {Code, Msg}};
{error, Reason} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Reason]),
{error, Reason};
{'EXIT', {Reason, Stk}} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p",
[?PB_CLIENT_MOD, Fun, Req, Options, Reason, Stk]),
{error, Reason}
end.
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------
-compile({inline, [hk2func/1]}).
hk2func('client.connect') -> 'on_client_connect';
hk2func('client.connack') -> 'on_client_connack';
hk2func('client.connected') -> 'on_client_connected';
hk2func('client.disconnected') -> 'on_client_disconnected';
hk2func('client.authenticate') -> 'on_client_authenticate';
hk2func('client.authorize') -> 'on_client_authorize';
hk2func('client.subscribe') -> 'on_client_subscribe';
hk2func('client.unsubscribe') -> 'on_client_unsubscribe';
hk2func('session.created') -> 'on_session_created';
hk2func('session.subscribed') -> 'on_session_subscribed';
hk2func('session.unsubscribed') -> 'on_session_unsubscribed';
hk2func('session.resumed') -> 'on_session_resumed';
hk2func('session.discarded') -> 'on_session_discarded';
hk2func('session.takeovered') -> 'on_session_takeovered';
hk2func('session.terminated') -> 'on_session_terminated';
hk2func('message.publish') -> 'on_message_publish';
hk2func('message.delivered') ->'on_message_delivered';
hk2func('message.acked') -> 'on_message_acked';
hk2func('message.dropped') ->'on_message_dropped'.
-compile({inline, [message_hooks/0]}).
message_hooks() ->
['message.publish', 'message.delivered',
'message.acked', 'message.dropped'].
-compile({inline, [available_hooks/0]}).
available_hooks() ->
['client.connect', 'client.connack', 'client.connected',
'client.disconnected', 'client.authenticate', 'client.authorize',
'client.subscribe', 'client.unsubscribe',
'session.created', 'session.subscribed', 'session.unsubscribed',
'session.resumed', 'session.discarded', 'session.takeovered',
'session.terminated' | message_hooks()].