%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_exhook_handler).

-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_access_control.hrl").

%% Client Hooks
-export([
    on_client_connect/2,
    on_client_connack/3,
    on_client_connected/2,
    on_client_disconnected/3,
    on_client_authenticate/2,
    on_client_authorize/4,
    on_client_subscribe/3,
    on_client_unsubscribe/3
]).

%% Session Lifecycle Hooks
-export([
    on_session_created/2,
    on_session_subscribed/3,
    on_session_unsubscribed/3,
    on_session_resumed/2,
    on_session_discarded/2,
    on_session_takenover/2,
    on_session_terminated/3
]).

%% Message Hooks
-export([
    on_message_publish/1,
    on_message_dropped/3,
    on_message_delivered/2,
    on_message_acked/2
]).

%% Utils
-export([
    message/1,
    headers/1,
    stringfy/1,
    merge_responsed_bool/2,
    merge_responsed_message/2,
    assign_to_message/2,
    clientinfo/1,
    request_meta/0
]).

%% `cast/2' and `call_fold/3' are called unqualified throughout this module.
-import(
    emqx_exhook,
    [
        cast/2,
        call_fold/3
    ]
).

-elvis([{elvis_style, god_modules, disable}]).

%%--------------------------------------------------------------------
%% Clients
%%--------------------------------------------------------------------

%% Handler for the 'client.connect' hook point: converts the connection
%% info and the CONNECT properties into the request map and hands it to
%% emqx_exhook:cast/2.
on_client_connect(ConnInfo, Props) ->
    cast('client.connect', #{
        conninfo => conninfo(ConnInfo),
        props => properties(Props)
    }).
%% Handler for the 'client.connack' hook point. `Rc' is rendered to a
%% binary with stringfy/1 before being placed in the request.
on_client_connack(ConnInfo, Rc, Props) ->
    cast('client.connack', #{
        conninfo => conninfo(ConnInfo),
        result_code => stringfy(Rc),
        props => properties(Props)
    }).

%% Handler for the 'client.connected' hook point; the connection info
%% argument is not used.
on_client_connected(ClientInfo, _ConnInfo) ->
    cast('client.connected', #{clientinfo => clientinfo(ClientInfo)}).

%% Handler for the 'client.disconnected' hook point; the reason term is
%% rendered to a binary with stringfy/1.
on_client_disconnected(ClientInfo, Reason, _ConnInfo) ->
    cast('client.disconnected', #{
        clientinfo => clientinfo(ClientInfo),
        reason => stringfy(Reason)
    }).

%% Handler for the 'client.authenticate' hook point.
%%
%% The incoming `AuthResult' is reduced to a boolean (`true' only when it
%% is exactly `ok') and folded through the registered servers with
%% merge_responsed_bool/2. A boolean verdict is mapped back to `ok' or
%% `{error, not_authorized}'; any other fold outcome leaves `AuthResult'
%% untouched and returns `{ok, AuthResult}'.
on_client_authenticate(ClientInfo, AuthResult) ->
    %% XXX: The boolean carries less information than the original
    %% `AuthResult' term, so the request lacks that detail, and so does
    %% the boolean that comes back from `call_fold'.
    Req = #{
        clientinfo => clientinfo(ClientInfo),
        result => AuthResult =:= ok
    },
    case call_fold('client.authenticate', Req, fun merge_responsed_bool/2) of
        {StopOrOk, #{result := true}} ->
            {StopOrOk, ok};
        {StopOrOk, #{result := false}} ->
            {StopOrOk, {error, not_authorized}};
        _ ->
            {ok, AuthResult}
    end.

%% Handler for the 'client.authorize' hook point.
%%
%% The current decision (`allow' under the `result' key of `Result',
%% anything else or missing counts as deny) is sent as a boolean together
%% with the action type and the topic. A boolean verdict from the fold is
%% returned as `#{result => allow | deny, from => exhook}'; any other
%% fold outcome returns `{ok, Result}' unchanged.
on_client_authorize(ClientInfo, Action, Topic, Result) ->
    Allowed = maps:get(result, Result, deny) =:= allow,
    %% TODO: Support full action in major release
    Type =
        case Action of
            ?authz_action(publish) -> 'PUBLISH';
            ?authz_action(subscribe) -> 'SUBSCRIBE'
        end,
    Req = #{
        clientinfo => clientinfo(ClientInfo),
        type => Type,
        topic => emqx_topic:get_shared_real_topic(Topic),
        result => Allowed
    },
    case call_fold('client.authorize', Req, fun merge_responsed_bool/2) of
        {StopOrOk, #{result := true}} ->
            {StopOrOk, #{result => allow, from => exhook}};
        {StopOrOk, #{result := false}} ->
            {StopOrOk, #{result => deny, from => exhook}};
        _ ->
            {ok, Result}
    end.

%% Handler for the 'client.subscribe' hook point.
on_client_subscribe(ClientInfo, Props, TopicFilters) ->
    cast('client.subscribe', #{
        clientinfo => clientinfo(ClientInfo),
        props => properties(Props),
        topic_filters => topicfilters(TopicFilters)
    }).
%% Handler for the 'client.unsubscribe' hook point.
on_client_unsubscribe(ClientInfo, Props, TopicFilters) ->
    cast('client.unsubscribe', #{
        clientinfo => clientinfo(ClientInfo),
        props => properties(Props),
        topic_filters => topicfilters(TopicFilters)
    }).

%%--------------------------------------------------------------------
%% Session
%%--------------------------------------------------------------------

%% The session hooks below only forward the converted client info (plus
%% topic/subopts/reason where the hook provides them); the session info
%% argument is ignored by all of them.

on_session_created(ClientInfo, _SessInfo) ->
    cast('session.created', #{clientinfo => clientinfo(ClientInfo)}).

on_session_subscribed(ClientInfo, Topic, SubOpts) ->
    cast('session.subscribed', #{
        clientinfo => clientinfo(ClientInfo),
        topic => emqx_topic:maybe_format_share(Topic),
        subopts => subopts(SubOpts)
    }).

on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
    %% no subopts when unsub
    cast('session.unsubscribed', #{
        clientinfo => clientinfo(ClientInfo),
        topic => emqx_topic:maybe_format_share(Topic)
    }).

on_session_resumed(ClientInfo, _SessInfo) ->
    cast('session.resumed', #{clientinfo => clientinfo(ClientInfo)}).

on_session_discarded(ClientInfo, _SessInfo) ->
    cast('session.discarded', #{clientinfo => clientinfo(ClientInfo)}).

on_session_takenover(ClientInfo, _SessInfo) ->
    cast('session.takenover', #{clientinfo => clientinfo(ClientInfo)}).

on_session_terminated(ClientInfo, Reason, _SessInfo) ->
    cast('session.terminated', #{
        clientinfo => clientinfo(ClientInfo),
        reason => stringfy(Reason)
    }).

%%--------------------------------------------------------------------
%% Message
%%--------------------------------------------------------------------

%% Handler for the 'message.publish' hook point.
%%
%% Messages whose topic starts with "$SYS/" are skipped and the handler
%% returns `ok'. For any other message the converted form is folded
%% through merge_responsed_message/2; when the fold yields a `message'
%% entry it is applied onto the original record with assign_to_message/2,
%% otherwise `{ok, Message}' is returned unchanged.
on_message_publish(#message{topic = <<"$SYS/", _/binary>>}) ->
    ok;
on_message_publish(Message) ->
    Folded = call_fold(
        'message.publish',
        #{message => message(Message)},
        fun ?MODULE:merge_responsed_message/2
    ),
    case Folded of
        {StopOrOk, #{message := Returned}} ->
            {StopOrOk, assign_to_message(Returned, Message)};
        _ ->
            {ok, Message}
    end.
%% Handler for the 'message.dropped' hook point; "$SYS/" messages are
%% skipped. The `_By' argument is not forwarded.
on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) ->
    ok;
on_message_dropped(Message, _By, Reason) ->
    Req = #{
        message => message(Message),
        reason => stringfy(Reason)
    },
    cast('message.dropped', Req).

%% Handler for the 'message.delivered' hook point; "$SYS/" messages are
%% skipped.
on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
    ok;
on_message_delivered(ClientInfo, Message) ->
    Req = #{
        clientinfo => clientinfo(ClientInfo),
        message => message(Message)
    },
    cast('message.delivered', Req).

%% Handler for the 'message.acked' hook point; "$SYS/" messages are
%% skipped.
on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) ->
    ok;
on_message_acked(ClientInfo, Message) ->
    Req = #{
        clientinfo => clientinfo(ClientInfo),
        message => message(Message)
    },
    cast('message.acked', Req).

%%--------------------------------------------------------------------
%% Types

%% Converts a properties map into a list of `#{name, value}' maps with
%% both sides rendered by stringfy/1. `undefined' yields an empty list.
properties(undefined) ->
    [];
properties(M) when is_map(M) ->
    maps:fold(
        fun(K, V, Acc) ->
            [
                #{
                    name => stringfy(K),
                    value => stringfy(V)
                }
                | Acc
            ]
        end,
        [],
        M
    ).

%% Builds the `conninfo' request field from a connection-info map.
%% `clientid', `peername' and `sockname' are required (a missing key is a
%% function_clause error); the remaining keys are optional.
conninfo(
    ConnInfo =
        #{
            clientid := ClientId,
            peername := {Peerhost, PeerPort},
            sockname := {_, SockPort}
        }
) ->
    Username = maps:get(username, ConnInfo, undefined),
    ProtoName = maps:get(proto_name, ConnInfo, undefined),
    ProtoVer = maps:get(proto_ver, ConnInfo, undefined),
    Keepalive = maps:get(keepalive, ConnInfo, 0),
    #{
        node => stringfy(node()),
        clientid => ClientId,
        username => undefined_to_empty(Username),
        peerhost => ntoa(Peerhost),
        peerport => PeerPort,
        sockport => SockPort,
        proto_name => ProtoName,
        proto_ver => stringfy(ProtoVer),
        keepalive => Keepalive
    }.

%% Builds the `clientinfo' request field from a client-info map.
%% The keys matched in the head are required; `password', `is_superuser',
%% `anonymous', `cn' and `dn' are optional. Optional/nullable text fields
%% that are `undefined' are sent as an empty binary.
clientinfo(
    ClientInfo =
        #{
            clientid := ClientId,
            username := Username,
            peerhost := PeerHost,
            peerport := PeerPort,
            sockport := SockPort,
            protocol := Protocol,
            mountpoint := Mountpoint
        }
) ->
    #{
        node => stringfy(node()),
        clientid => ClientId,
        username => undefined_to_empty(Username),
        password => undefined_to_empty(maps:get(password, ClientInfo, undefined)),
        peerhost => ntoa(PeerHost),
        peerport => PeerPort,
        sockport => SockPort,
        protocol => stringfy(Protocol),
        mountpoint => undefined_to_empty(Mountpoint),
        is_superuser => maps:get(is_superuser, ClientInfo, false),
        anonymous => maps:get(anonymous, ClientInfo, true),
        cn => undefined_to_empty(maps:get(cn, ClientInfo, undefined)),
        dn => undefined_to_empty(maps:get(dn, ClientInfo, undefined))
    }.

%% Converts a `#message{}' record into the request map: the id is
%% hex-encoded with emqx_guid:to_hexstr/1, `from' is rendered with
%% stringfy/1 and the headers are filtered by headers/1.
message(#message{
    id = Id,
    qos = Qos,
    from = From,
    topic = Topic,
    payload = Payload,
    timestamp = Ts,
    headers = Headers
}) ->
    #{
        node => stringfy(node()),
        id => emqx_guid:to_hexstr(Id),
        qos => Qos,
        from => stringfy(From),
        topic => Topic,
        payload => Payload,
        timestamp => Ts,
        headers => headers(Headers)
    }.

%% Keeps only the `username', `protocol', `peerhost' and `allow_publish'
%% headers, drops those whose value is `undefined', and returns a map
%% with binary keys and binary values.
headers(Headers) ->
    Forwarded = maps:with([username, protocol, peerhost, allow_publish], Headers),
    maps:fold(
        fun
            (_Key, undefined, Acc) ->
                %% Ignore undefined value
                Acc;
            (Key, Value, Acc) ->
                Acc#{atom_to_binary(Key) => bin(Key, Value)}
        end,
        #{},
        Forwarded
    ).

%% Renders one forwarded header value as a binary; `peerhost' goes
%% through inet:ntoa/1 first.
bin(K, V) when
    K == username;
    K == protocol;
    K == allow_publish
->
    bin(V);
bin(peerhost, V) ->
    bin(inet:ntoa(V)).

%% Binary, atom (including booleans) or iolist to binary. Any other term
%% is a function_clause error.
bin(V) when is_binary(V) -> V;
bin(V) when is_atom(V) -> atom_to_binary(V);
bin(V) when is_list(V) -> iolist_to_binary(V).

%% Applies the `qos', `topic' and `payload' of a returned message map
%% onto the original `#message{}' record, then applies the returned
%% headers (if any) via enrich_header/2.
assign_to_message(
    InMessage = #{
        qos := Qos,
        topic := Topic,
        payload := Payload
    },
    Message
) ->
    NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload},
    enrich_header(maps:get(headers, InMessage, #{}), NMsg).

%% Only the <<"allow_publish">> header is honoured, and only when its
%% value is exactly <<"true">> or <<"false">>; anything else leaves the
%% message untouched.
enrich_header(Headers, Message) ->
    case maps:get(<<"allow_publish">>, Headers, undefined) of
        <<"false">> ->
            emqx_message:set_header(allow_publish, false, Message);
        <<"true">> ->
            emqx_message:set_header(allow_publish, true, Message);
        _ ->
            Message
    end.

%% Converts a list of `{Topic, SubOpts}' pairs into request maps.
topicfilters(Tfs) when is_list(Tfs) ->
    [
        #{name => emqx_topic:maybe_format_share(Topic), subopts => subopts(SubOpts)}
     || {Topic, SubOpts} <- Tfs
    ].

%% Picks the four subscription options out of `SubOpts', each defaulting
%% to 0 when absent.
subopts(SubOpts) ->
    #{
        qos => maps:get(qos, SubOpts, 0),
        rh => maps:get(rh, SubOpts, 0),
        rap => maps:get(rap, SubOpts, 0),
        nl => maps:get(nl, SubOpts, 0)
    }.

%% Formats an IP address tuple as a binary. An IPv4-mapped IPv6 address
%% (::ffff:a.b.c.d) is unpacked and printed in dotted IPv4 form.
ntoa({0, 0, 0, 0, 0, 16#ffff, AB, CD}) ->
    list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
ntoa(IP) ->
    list_to_binary(inet_parse:ntoa(IP)).

%% Replaces `undefined' with an empty binary and passes every other value
%% through. (Not named `maybe': that atom is a reserved word when the
%% `maybe_expr' feature is enabled, so an unquoted `maybe(...)' would not
%% parse there.)
undefined_to_empty(undefined) -> <<>>;
undefined_to_empty(B) -> B.

%% @private
%% Renders a term as a binary: binaries pass through, integers and atoms
%% are converted directly, everything else is printed with "~0p".
stringfy(Term) when is_binary(Term) ->
    Term;
stringfy(Term) when is_integer(Term) ->
    integer_to_binary(Term);
stringfy(Term) when is_atom(Term) ->
    atom_to_binary(Term, utf8);
stringfy(Term) ->
    unicode:characters_to_binary((io_lib:format("~0p", [Term]))).

%%--------------------------------------------------------------------
%% Acc funcs

%% see exhook.proto
%% Fold step for boolean-valued hooks: 'IGNORE' responses and responses
%% of an unexpected shape return `ignore' (the latter after logging a
%% warning); a `{bool_result, Bool}' value replaces `result' in the
%% request and is tagged `ok' or `stop' according to the response type.
merge_responsed_bool(_Req, #{type := 'IGNORE'}) ->
    ignore;
merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}}) when
    is_boolean(NewBool)
->
    {ret(Type), Req#{result => NewBool}};
merge_responsed_bool(_Req, Resp) ->
    ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}),
    ignore.

%% Fold step for message-valued hooks; same structure as
%% merge_responsed_bool/2 but the `{message, NMessage}' value replaces
%% `message' in the request.
merge_responsed_message(_Req, #{type := 'IGNORE'}) ->
    ignore;
merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) ->
    {ret(Type), Req#{message => NMessage}};
merge_responsed_message(_Req, Resp) ->
    ?SLOG(warning, #{msg => "unknown_responsed_value", resp => Resp}),
    ignore.

%% Maps a response type onto the tag used in the fold-step result.
ret('CONTINUE') -> ok;
ret('STOP_AND_RETURN') -> stop.

%% Metadata describing this node, built from node() and emqx_sys.
request_meta() ->
    #{
        node => stringfy(node()),
        version => emqx_sys:version(),
        sysdescr => emqx_sys:sysdescr(),
        cluster_name => emqx_sys:cluster_name()
    }.