%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emqx_web_hook).

-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").

-define(APP, emqx_web_hook).

-logger_header("[WebHook]").

-import(inet, [ntoa/1]).

%% APIs
-export([ register_metrics/0
        , load/0
        , unload/0
        ]).

%% Hooks callback
-export([ on_client_connect/3
        , on_client_connack/4
        , on_client_connected/3
        , on_client_disconnected/4
        , on_client_subscribe/4
        , on_client_unsubscribe/4
        ]).

-export([ on_session_subscribed/4
        , on_session_unsubscribed/4
        , on_session_terminated/4
        ]).

-export([ on_message_publish/2
        , on_message_delivered/3
        , on_message_acked/3
        ]).

%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------

%% @doc Make sure every counter incremented by the hook callbacks in this
%% module has been passed to emqx_metrics:ensure/1. Returns `ok'.
register_metrics() ->
    Counters = [ 'webhook.client_connect'
               , 'webhook.client_connack'
               , 'webhook.client_connected'
               , 'webhook.client_disconnected'
               , 'webhook.client_subscribe'
               , 'webhook.client_unsubscribe'
               , 'webhook.session_subscribed'
               , 'webhook.session_unsubscribed'
               , 'webhook.session_terminated'
               , 'webhook.message_publish'
               , 'webhook.message_delivered'
               , 'webhook.message_acked'
               ],
    lists:foreach(fun(Counter) -> emqx_metrics:ensure(Counter) end, Counters).
%% @doc Register one hook per configured rule.
%% Each rule parsed from the `rules' application env yields
%% `{Hook, Fun, Filter}'; the callback is installed as
%% `{?MODULE, Fun, [{Filter}]}', so every callback receives the topic
%% filter wrapped in a 1-tuple as its last argument.
load() ->
    Rules = parse_rule(application:get_env(?APP, rules, [])),
    lists:foreach(
      fun({Hook, Fun, Filter}) ->
              emqx:hook(Hook, {?MODULE, Fun, [{Filter}]})
      end,
      Rules).

%% @doc Remove every hook that load/0 installs for the configured rules.
unload() ->
    Rules = parse_rule(application:get_env(?APP, rules, [])),
    lists:foreach(
      fun({Hook, Fun, _Filter}) ->
              emqx:unhook(Hook, {?MODULE, Fun})
      end,
      Rules).

%%--------------------------------------------------------------------
%% Client connect
%%--------------------------------------------------------------------

%% @doc Hook callback: report a `client_connect' event.
%% Counts the event, then posts the client id, username, peer address,
%% keepalive and protocol version taken from ConnInfo. `keepalive' and
%% `proto_ver' must be present in ConnInfo (maps:get/2 raises otherwise).
on_client_connect(ConnInfo = #{clientid := ClientId,
                               username := Username,
                               peername := {Peerhost, _}},
                  _ConnProp, _Env) ->
    emqx_metrics:inc('webhook.client_connect'),
    Params = #{ action => client_connect
              , node => node()
              , clientid => ClientId
              , username => maybe(Username)
              , ipaddress => iolist_to_binary(ntoa(Peerhost))
              , keepalive => maps:get(keepalive, ConnInfo)
              , proto_ver => maps:get(proto_ver, ConnInfo)
              },
    send_http_request(ClientId, Params).

%%--------------------------------------------------------------------
%% Client connack
%%--------------------------------------------------------------------

%% @doc Hook callback: report a `client_connack' event.
%% Rc is forwarded unchanged as `conn_ack'.
%%
%% `connected_at' is read with a `null' default instead of maps:get/2.
%% NOTE(review): this callback is also handed non-success result codes,
%% and ConnInfo presumably has no `connected_at' for a connection that
%% was rejected -- confirm against the broker's channel code. With
%% maps:get/2 that case raised `{badkey, connected_at}' and the event was
%% never delivered. `null' matches how maybe/1 reports absent values.
on_client_connack(ConnInfo = #{clientid := ClientId,
                               username := Username,
                               peername := {Peerhost, _}},
                  Rc, _AckProp, _Env) ->
    emqx_metrics:inc('webhook.client_connack'),
    Params = #{ action => client_connack
              , node => node()
              , clientid => ClientId
              , username => maybe(Username)
              , ipaddress => iolist_to_binary(ntoa(Peerhost))
              , keepalive => maps:get(keepalive, ConnInfo)
              , proto_ver => maps:get(proto_ver, ConnInfo)
              , connected_at => maps:get(connected_at, ConnInfo, null)
              , conn_ack => Rc
              },
    send_http_request(ClientId, Params).
%%--------------------------------------------------------------------
%% Client connected
%%--------------------------------------------------------------------

%% @doc Hook callback: report a `client_connected' event.
%% Identity and peer host come from the client-info map; keepalive,
%% protocol version and connect time are required keys of ConnInfo.
on_client_connected(#{username := Username,
                      peerhost := Peerhost,
                      clientid := ClientId},
                    ConnInfo, _Env) ->
    emqx_metrics:inc('webhook.client_connected'),
    IpAddress = iolist_to_binary(ntoa(Peerhost)),
    Keepalive = maps:get(keepalive, ConnInfo),
    ProtoVer = maps:get(proto_ver, ConnInfo),
    ConnectedAt = maps:get(connected_at, ConnInfo),
    send_http_request(ClientId,
                      #{action => client_connected,
                        node => node(),
                        clientid => ClientId,
                        username => maybe(Username),
                        ipaddress => IpAddress,
                        keepalive => Keepalive,
                        proto_ver => ProtoVer,
                        connected_at => ConnectedAt}).

%%--------------------------------------------------------------------
%% Client disconnected
%%--------------------------------------------------------------------

%% @doc Hook callback: report a `client_disconnected' event.
%% A `{shutdown, Atom}' reason is unwrapped to the bare atom first; any
%% reason is then rendered as a binary by stringfy/1. `disconnected_at'
%% falls back to the current system time in milliseconds when ConnInfo
%% does not carry it.
on_client_disconnected(ClientInfo, {shutdown, Cause}, ConnInfo, Env)
  when is_atom(Cause) ->
    on_client_disconnected(ClientInfo, Cause, ConnInfo, Env);
on_client_disconnected(#{username := Username, clientid := ClientId},
                       Reason, ConnInfo, _Env) ->
    emqx_metrics:inc('webhook.client_disconnected'),
    ReasonText = stringfy(maybe(Reason)),
    ConnectedAt = maps:get(connected_at, ConnInfo),
    DisconnectedAt = maps:get(disconnected_at, ConnInfo,
                              erlang:system_time(millisecond)),
    send_http_request(ClientId,
                      #{action => client_disconnected,
                        node => node(),
                        clientid => ClientId,
                        username => maybe(Username),
                        reason => ReasonText,
                        connected_at => ConnectedAt,
                        disconnected_at => DisconnectedAt}).
%%--------------------------------------------------------------------
%% Client subscribe
%%--------------------------------------------------------------------

%% @doc Hook callback: report one `client_subscribe' event per
%% `{Topic, Opts}' entry of TopicTable that passes the rule's filter
%% (see with_filter/3). Returns `ok'.
on_client_subscribe(#{username := Username, clientid := ClientId},
                    _Properties, TopicTable, {Filter}) ->
    ReportEntry =
        fun({Topic, Opts}) ->
                Report =
                    fun() ->
                            emqx_metrics:inc('webhook.client_subscribe'),
                            send_http_request(
                              ClientId,
                              #{action => client_subscribe,
                                node => node(),
                                clientid => ClientId,
                                username => maybe(Username),
                                topic => Topic,
                                opts => Opts})
                    end,
                with_filter(Report, Topic, Filter)
        end,
    lists:foreach(ReportEntry, TopicTable).

%%--------------------------------------------------------------------
%% Client unsubscribe
%%--------------------------------------------------------------------

%% @doc Hook callback: report one `client_unsubscribe' event per
%% `{Topic, Opts}' entry of TopicTable that passes the rule's filter
%% (see with_filter/3). Returns `ok'.
on_client_unsubscribe(#{username := Username, clientid := ClientId},
                      _Properties, TopicTable, {Filter}) ->
    ReportEntry =
        fun({Topic, Opts}) ->
                Report =
                    fun() ->
                            emqx_metrics:inc('webhook.client_unsubscribe'),
                            send_http_request(
                              ClientId,
                              #{action => client_unsubscribe,
                                node => node(),
                                clientid => ClientId,
                                username => maybe(Username),
                                topic => Topic,
                                opts => Opts})
                    end,
                with_filter(Report, Topic, Filter)
        end,
    lists:foreach(ReportEntry, TopicTable).

%%--------------------------------------------------------------------
%% Session subscribed
%%--------------------------------------------------------------------

%% @doc Hook callback: report a `session_subscribed' event for Topic,
%% including the subscription options, when Topic passes the rule's
%% filter (see with_filter/3). Returns `ok'.
on_session_subscribed(#{username := Username, clientid := ClientId},
                      Topic, Opts, {Filter}) ->
    Report =
        fun() ->
                emqx_metrics:inc('webhook.session_subscribed'),
                send_http_request(
                  ClientId,
                  #{action => session_subscribed,
                    node => node(),
                    clientid => ClientId,
                    username => maybe(Username),
                    topic => Topic,
                    opts => Opts})
        end,
    with_filter(Report, Topic, Filter).
%%--------------------------------------------------------------------
%% Session unsubscribed
%%--------------------------------------------------------------------

%% @doc Hook callback: report a `session_unsubscribed' event for Topic
%% when Topic passes the rule's filter (see with_filter/3). The
%% subscription options are not forwarded. Returns `ok'.
on_session_unsubscribed(#{username := Username, clientid := ClientId},
                        Topic, _Opts, {Filter}) ->
    Report =
        fun() ->
                emqx_metrics:inc('webhook.session_unsubscribed'),
                send_http_request(
                  ClientId,
                  #{action => session_unsubscribed,
                    node => node(),
                    clientid => ClientId,
                    username => maybe(Username),
                    topic => Topic})
        end,
    with_filter(Report, Topic, Filter).

%%--------------------------------------------------------------------
%% Session terminated
%%--------------------------------------------------------------------

%% @doc Hook callback: report a `session_terminated' event.
%% A `{shutdown, Atom}' reason is unwrapped to the bare atom first; the
%% reason is then rendered as a binary by stringfy/1.
on_session_terminated(Info, {shutdown, Cause}, SessInfo, Env)
  when is_atom(Cause) ->
    on_session_terminated(Info, Cause, SessInfo, Env);
on_session_terminated(#{username := Username, clientid := ClientId},
                      Reason, _SessInfo, _Env) ->
    emqx_metrics:inc('webhook.session_terminated'),
    ReasonText = stringfy(maybe(Reason)),
    send_http_request(ClientId,
                      #{action => session_terminated,
                        node => node(),
                        clientid => ClientId,
                        username => maybe(Username),
                        reason => ReasonText}).
%%--------------------------------------------------------------------
%% Message publish
%%--------------------------------------------------------------------

%% @doc Hook callback: report a `message_publish' event.
%% Messages on `$SYS/' topics are passed through untouched. Other
%% messages are reported only when their topic passes the rule's filter
%% (see with_filter/4). The result is always `{ok, Message}' with the
%% message unchanged.
on_message_publish(#message{topic = <<"$SYS/", _/binary>>} = Message, _Env) ->
    {ok, Message};
on_message_publish(#message{topic = Topic} = Message, {Filter}) ->
    Report =
        fun() ->
                emqx_metrics:inc('webhook.message_publish'),
                {FromClientId, FromUsername} = parse_from(Message),
                #message{qos = QoS, payload = Payload, timestamp = Ts} = Message,
                Retain = emqx_message:get_flag(retain, Message),
                Encoded = encode_payload(Payload),
                send_http_request(
                  FromClientId,
                  #{action => message_publish,
                    node => node(),
                    from_client_id => FromClientId,
                    from_username => FromUsername,
                    topic => Topic,
                    qos => QoS,
                    retain => Retain,
                    payload => Encoded,
                    ts => Ts}),
                {ok, Message}
        end,
    with_filter(Report, Message, Topic, Filter).

%%--------------------------------------------------------------------
%% Message delivered
%%--------------------------------------------------------------------

%% @doc Hook callback: report a `message_delivered' event for the
%% receiving client. `$SYS/' messages are ignored; other messages are
%% reported only when their topic passes the rule's filter
%% (see with_filter/3). Returns `ok'.
on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    ok;
on_message_delivered(#{username := Username, clientid := ClientId},
                     #message{topic = Topic} = Message, {Filter}) ->
    Report =
        fun() ->
                emqx_metrics:inc('webhook.message_delivered'),
                {FromClientId, FromUsername} = parse_from(Message),
                #message{qos = QoS, payload = Payload, timestamp = Ts} = Message,
                Retain = emqx_message:get_flag(retain, Message),
                Encoded = encode_payload(Payload),
                send_http_request(
                  ClientId,
                  #{action => message_delivered,
                    node => node(),
                    clientid => ClientId,
                    username => maybe(Username),
                    from_client_id => FromClientId,
                    from_username => FromUsername,
                    topic => Topic,
                    qos => QoS,
                    retain => Retain,
                    payload => Encoded,
                    ts => Ts})
        end,
    with_filter(Report, Topic, Filter).
%%--------------------------------------------------------------------
%% Message acked
%%--------------------------------------------------------------------

%% @doc Hook callback: report a `message_acked' event for the
%% acknowledging client. `$SYS/' messages are ignored; other messages
%% are reported only when their topic passes the rule's filter
%% (see with_filter/3). Returns `ok'.
on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
    ok;
on_message_acked(#{username := Username, clientid := ClientId},
                 #message{topic = Topic} = Message, {Filter}) ->
    Report =
        fun() ->
                emqx_metrics:inc('webhook.message_acked'),
                {FromClientId, FromUsername} = parse_from(Message),
                #message{qos = QoS, payload = Payload, timestamp = Ts} = Message,
                Retain = emqx_message:get_flag(retain, Message),
                Encoded = encode_payload(Payload),
                send_http_request(
                  ClientId,
                  #{action => message_acked,
                    node => node(),
                    clientid => ClientId,
                    username => maybe(Username),
                    from_client_id => FromClientId,
                    from_username => FromUsername,
                    topic => Topic,
                    qos => QoS,
                    retain => Retain,
                    payload => Encoded,
                    ts => Ts})
        end,
    with_filter(Report, Topic, Filter).

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

%% POST Params, JSON-encoded, to the configured `path' with the
%% configured `headers'. The request is issued through ehttpc using
%% `{?APP, ClientID}' as the pool/worker key.
%%
%% Delivery is best effort: non-2xx statuses and transport errors are
%% logged and the function still returns `ok'. A missing `path' setting
%% fails the `{ok, Path}' match.
send_http_request(ClientID, Params) ->
    {ok, Path} = application:get_env(?APP, path),
    Headers = application:get_env(?APP, headers, []),
    Body = emqx_json:encode(Params),
    ?LOG(debug, "Send to: ~0p, params: ~s", [Path, Body]),
    Request = {Path, Headers, Body},
    case ehttpc:request({?APP, ClientID}, post, Request) of
        {error, Reason} ->
            ?LOG(error, "HTTP request error: ~p", [Reason]),
            ok;
        {ok, Code, _RespHeaders} when Code >= 200, Code < 300 ->
            ok;
        {ok, Code, _RespHeaders, _RespBody} when Code >= 200, Code < 300 ->
            ok;
        {ok, Code, _RespHeaders} ->
            ?LOG(warning, "HTTP request failed with status code: ~p", [Code]),
            ok;
        {ok, Code, _RespHeaders, _RespBody} ->
            ?LOG(warning, "HTTP request failed with status code: ~p", [Code]),
            ok
    end.
%% Turn the configured `[{Rule, Conf}]' list into `{Hook, Fun, Filter}'
%% triples. Conf is JSON text; its "action" names the callback function
%% and its optional "topic" is the topic filter (`undefined' when absent).
parse_rule(Rules) ->
    lists:map(fun parse_one_rule/1, Rules).

%% Parse a single `{Rule, Conf}' entry.
%% The action is converted with binary_to_existing_atom/2, so an action
%% name that is not already a known atom raises instead of creating one.
parse_one_rule({Rule, Conf}) ->
    Params = emqx_json:decode(iolist_to_binary(Conf)),
    Action = proplists:get_value(<<"action">>, Params),
    Filter = proplists:get_value(<<"topic">>, Params),
    {list_to_atom(Rule), binary_to_existing_atom(Action, utf8), Filter}.

%% Run Fun for its side effects when Topic is selected by Filter;
%% always returns `ok'.
with_filter(Fun, Topic, Filter) ->
    case topic_selected(Topic, Filter) of
        true ->
            Fun(),
            ok;
        false ->
            ok
    end.

%% Return Fun() when Topic is selected by Filter, otherwise `{ok, Msg}'.
with_filter(Fun, Msg, Topic, Filter) ->
    case topic_selected(Topic, Filter) of
        true -> Fun();
        false -> {ok, Msg}
    end.

%% An `undefined' filter selects every topic; any other filter is
%% delegated to emqx_topic:match/2.
topic_selected(_Topic, undefined) ->
    true;
topic_selected(Topic, Filter) ->
    emqx_topic:match(Topic, Filter).

%% Publisher identity of a message: `{From, Username}', with a missing
%% username header reported as `null'.
parse_from(Message) ->
    From = emqx_message:from(Message),
    Username = maybe(emqx_message:get_header(username, Message)),
    {From, Username}.

%% Encode a payload using the `encoding_of_payload_field' setting
%% (`plain' when unset).
encode_payload(Payload) ->
    Encoding = application:get_env(?APP, encoding_of_payload_field, plain),
    encode_payload(Payload, Encoding).

%% Only `plain', `base64' and `base62' are accepted encodings.
encode_payload(Payload, plain) ->
    Payload;
encode_payload(Payload, base64) ->
    base64:encode(Payload);
encode_payload(Payload, base62) ->
    emqx_base62:encode(Payload).

%% Render a term as a binary: binaries are returned as-is, atoms are
%% converted by name, everything else is printed with "~0p".
stringfy(Bin) when is_binary(Bin) ->
    Bin;
stringfy(Atom) when is_atom(Atom) ->
    atom_to_binary(Atom, utf8);
stringfy(Other) ->
    Printed = io_lib:format("~0p", [Other]),
    unicode:characters_to_binary(Printed).

%% Map `undefined' to `null'; any other value passes through unchanged.
%% NOTE(review): `maybe' is a reserved word when the maybe_expr feature
%% is enabled (the default from OTP 27); this function and all of its
%% callers will need renaming before building on such a release.
maybe(undefined) ->
    null;
maybe(Value) ->
    Value.