emqx/apps/emqx_exhook/src/emqx_exhook_handler.erl

371 lines
12 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exhook_handler).
-include("emqx_exhook.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-logger_header("[ExHook]").
-export([ on_client_connect/2
, on_client_connack/3
, on_client_connected/2
, on_client_disconnected/3
, on_client_authenticate/2
, on_client_check_acl/4
, on_client_subscribe/3
, on_client_unsubscribe/3
]).
%% Session Lifecircle Hooks
-export([ on_session_created/2
, on_session_subscribed/3
, on_session_unsubscribed/3
, on_session_resumed/2
, on_session_discarded/2
, on_session_takeovered/2
, on_session_terminated/3
]).
-export([ on_message_publish/1
, on_message_dropped/3
, on_message_delivered/2
, on_message_acked/2
]).
%% Utils
-export([ message/1
, headers/1
, stringfy/1
, merge_responsed_bool/2
, merge_responsed_message/2
, assign_to_message/2
, clientinfo/1
]).
-import(emqx_exhook,
[ cast/2
, call_fold/3
]).
-elvis([{elvis_style, god_modules, disable}]).
-define(STOP_OR_OK(Res),
(Res =:= ok orelse Res =:= stop)).
%%--------------------------------------------------------------------
%% Clients
%%--------------------------------------------------------------------
on_client_connect(ConnInfo, Props) ->
Req = #{conninfo => conninfo(ConnInfo),
props => properties(Props)
},
cast('client.connect', Req).
on_client_connack(ConnInfo, Rc, Props) ->
Req = #{conninfo => conninfo(ConnInfo),
result_code => stringfy(Rc),
props => properties(Props)},
cast('client.connack', Req).
on_client_connected(ClientInfo, _ConnInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo)},
cast('client.connected', Req).
on_client_disconnected(ClientInfo, Reason, _ConnInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo),
reason => stringfy(Reason)
},
cast('client.disconnected', Req).
on_client_authenticate(ClientInfo, AuthResult) ->
%% XXX: Bool is missing more information about the atom of the result
%% So, the `Req` has missed detailed info too.
%%
%% The return value of `call_fold` just a bool, that has missed
%% detailed info too.
%%
Bool = maps:get(auth_result, AuthResult, undefined) == success,
Req = #{clientinfo => clientinfo(ClientInfo),
result => Bool
},
case call_fold('client.authenticate', Req,
fun merge_responsed_bool/2) of
{StopOrOk, #{result := Result0}}
when is_boolean(Result0) andalso ?STOP_OR_OK(StopOrOk) ->
Result = case Result0 of true -> success; _ -> not_authorized end,
{StopOrOk, AuthResult#{auth_result => Result, anonymous => false}};
ignore ->
{ok, AuthResult}
end.
on_client_check_acl(ClientInfo, PubSub, Topic, Result) ->
Bool = Result == allow,
Type = case PubSub of
publish -> 'PUBLISH';
subscribe -> 'SUBSCRIBE'
end,
Req = #{clientinfo => clientinfo(ClientInfo),
type => Type,
topic => Topic,
result => Bool
},
case call_fold('client.check_acl', Req,
fun merge_responsed_bool/2) of
{StopOrOk, #{result := Result0}}
when is_boolean(Result0) andalso ?STOP_OR_OK(StopOrOk) ->
NResult = case Result0 of true -> allow; _ -> deny end,
{StopOrOk, NResult};
ignore -> {ok, Result}
end.
on_client_subscribe(ClientInfo, Props, TopicFilters) ->
Req = #{clientinfo => clientinfo(ClientInfo),
props => properties(Props),
topic_filters => topicfilters(TopicFilters)
},
cast('client.subscribe', Req).
on_client_unsubscribe(ClientInfo, Props, TopicFilters) ->
Req = #{clientinfo => clientinfo(ClientInfo),
props => properties(Props),
topic_filters => topicfilters(TopicFilters)
},
cast('client.unsubscribe', Req).
%%--------------------------------------------------------------------
%% Session
%%--------------------------------------------------------------------
on_session_created(ClientInfo, _SessInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo)},
cast('session.created', Req).
on_session_subscribed(ClientInfo, Topic, SubOpts) ->
Req = #{clientinfo => clientinfo(ClientInfo),
topic => Topic,
subopts => maps:with([qos, share, rh, rap, nl], SubOpts)
},
cast('session.subscribed', Req).
on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
Req = #{clientinfo => clientinfo(ClientInfo),
topic => Topic
},
cast('session.unsubscribed', Req).
on_session_resumed(ClientInfo, _SessInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo)},
cast('session.resumed', Req).
on_session_discarded(ClientInfo, _SessInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo)},
cast('session.discarded', Req).
on_session_takeovered(ClientInfo, _SessInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo)},
cast('session.takeovered', Req).
on_session_terminated(ClientInfo, Reason, _SessInfo) ->
Req = #{clientinfo => clientinfo(ClientInfo),
reason => stringfy(Reason)},
cast('session.terminated', Req).
%%--------------------------------------------------------------------
%% Message
%%--------------------------------------------------------------------
on_message_publish(#message{topic = <<"$SYS/", _/binary>>}) ->
ok;
on_message_publish(Message) ->
Req = #{message => message(Message)},
case call_fold('message.publish', Req,
fun emqx_exhook_handler:merge_responsed_message/2) of
{StopOrOk, #{message := NMessage}}
when ?STOP_OR_OK(StopOrOk) ->
{StopOrOk, assign_to_message(NMessage, Message)};
ignore ->
{ok, Message}
end.
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) ->
ok;
on_message_dropped(Message, _By, Reason) ->
Req = #{message => message(Message),
reason => stringfy(Reason)
},
cast('message.dropped', Req).
on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
ok;
on_message_delivered(ClientInfo, Message) ->
Req = #{clientinfo => clientinfo(ClientInfo),
message => message(Message)
},
cast('message.delivered', Req).
on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
ok;
on_message_acked(ClientInfo, Message) ->
Req = #{clientinfo => clientinfo(ClientInfo),
message => message(Message)
},
cast('message.acked', Req).
%%--------------------------------------------------------------------
%% Types
properties(undefined) -> [];
properties(M) when is_map(M) ->
maps:fold(fun(K, V, Acc) ->
[#{name => stringfy(K),
value => stringfy(V)} | Acc]
end, [], M).
conninfo(_ConnInfo =
#{clientid := ClientId, username := Username, peername := {Peerhost, _},
sockname := {_, SockPort}, proto_name := ProtoName, proto_ver := ProtoVer,
keepalive := Keepalive}) ->
#{node => stringfy(node()),
clientid => ClientId,
username => maybe(Username),
peerhost => ntoa(Peerhost),
sockport => SockPort,
proto_name => ProtoName,
proto_ver => stringfy(ProtoVer),
keepalive => Keepalive}.
clientinfo(ClientInfo =
#{clientid := ClientId, username := Username, peerhost := PeerHost,
sockport := SockPort, protocol := Protocol, mountpoint := Mountpoiont}) ->
#{node => stringfy(node()),
clientid => ClientId,
username => maybe(Username),
password => maybe(maps:get(password, ClientInfo, undefined)),
peerhost => ntoa(PeerHost),
sockport => SockPort,
protocol => stringfy(Protocol),
mountpoint => maybe(Mountpoiont),
is_superuser => maps:get(is_superuser, ClientInfo, false),
anonymous => maps:get(anonymous, ClientInfo, true),
cn => maybe(maps:get(cn, ClientInfo, undefined)),
dn => maybe(maps:get(dn, ClientInfo, undefined))}.
message(#message{id = Id, qos = Qos, from = From, topic = Topic,
payload = Payload, timestamp = Ts, headers = Headers}) ->
#{node => stringfy(node()),
id => emqx_guid:to_hexstr(Id),
qos => Qos,
from => stringfy(From),
topic => Topic,
payload => Payload,
timestamp => Ts,
headers => headers(Headers)
}.
headers(Headers) ->
Ls = [username, protocol, peerhost, allow_publish],
maps:fold(
fun
(_, undefined, Acc) ->
Acc; %% Ignore undefined value
(K, V, Acc) ->
case lists:member(K, Ls) of
true ->
Acc#{atom_to_binary(K) => bin(K, V)};
_ ->
Acc
end
end, #{}, Headers).
bin(K, V) when K == username;
K == protocol;
K == allow_publish ->
bin(V);
bin(peerhost, V) ->
bin(inet:ntoa(V)).
bin(V) when is_binary(V) -> V;
bin(V) when is_atom(V) -> atom_to_binary(V);
bin(V) when is_list(V) -> iolist_to_binary(V).
assign_to_message(InMessage = #{qos := Qos, topic := Topic,
payload := Payload}, Message) ->
NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload},
enrich_header(maps:get(headers, InMessage, #{}), NMsg).
enrich_header(Headers, Message) ->
case maps:get(<<"allow_publish">>, Headers, undefined) of
<<"false">> ->
emqx_message:set_header(allow_publish, false, Message);
<<"true">> ->
emqx_message:set_header(allow_publish, true, Message);
_ ->
Message
end.
topicfilters(Tfs) when is_list(Tfs) ->
[#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
ntoa(IP) ->
list_to_binary(inet_parse:ntoa(IP)).
maybe(undefined) -> <<>>;
maybe(B) -> B.
%% @private
stringfy(Term) when is_binary(Term) ->
Term;
stringfy(Term) when is_integer(Term) ->
integer_to_binary(Term);
stringfy(Term) when is_atom(Term) ->
atom_to_binary(Term, utf8);
stringfy(Term) when is_list(Term) ->
list_to_binary(Term);
stringfy(Term) ->
unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
%%--------------------------------------------------------------------
%% Acc funcs
%% see exhook.proto
merge_responsed_bool(_Req, #{type := 'IGNORE'}) ->
ignore;
merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}})
when is_boolean(NewBool) ->
{ret(Type), Req#{result => NewBool}};
merge_responsed_bool(_Req, Resp) ->
?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
ignore.
merge_responsed_message(_Req, #{type := 'IGNORE'}) ->
ignore;
merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) ->
{ret(Type), Req#{message => NMessage}};
merge_responsed_message(_Req, Resp) ->
?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
ignore.
ret('CONTINUE') -> ok;
ret('STOP_AND_RETURN') -> stop.