390 lines
15 KiB
Erlang
390 lines
15 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_web_hook).
|
|
|
|
-include_lib("emqx/include/emqx.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
|
|
-define(APP, emqx_web_hook).
|
|
|
|
-logger_header("[WebHook]").
|
|
|
|
-import(inet, [ntoa/1]).
|
|
|
|
%% APIs
|
|
-export([ register_metrics/0
|
|
, load/0
|
|
, unload/0
|
|
]).
|
|
|
|
%% Hooks callback
|
|
-export([ on_client_connect/3
|
|
, on_client_connack/4
|
|
, on_client_connected/3
|
|
, on_client_disconnected/4
|
|
, on_client_subscribe/4
|
|
, on_client_unsubscribe/4
|
|
]).
|
|
|
|
-export([ on_session_subscribed/4
|
|
, on_session_unsubscribed/4
|
|
, on_session_terminated/4
|
|
]).
|
|
-export([ on_message_publish/2
|
|
, on_message_delivered/3
|
|
, on_message_acked/3
|
|
]).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
register_metrics() ->
|
|
lists:foreach(fun emqx_metrics:ensure/1,
|
|
['webhook.client_connect',
|
|
'webhook.client_connack',
|
|
'webhook.client_connected',
|
|
'webhook.client_disconnected',
|
|
'webhook.client_subscribe',
|
|
'webhook.client_unsubscribe',
|
|
'webhook.session_subscribed',
|
|
'webhook.session_unsubscribed',
|
|
'webhook.session_terminated',
|
|
'webhook.message_publish',
|
|
'webhook.message_delivered',
|
|
'webhook.message_acked']).
|
|
|
|
load() ->
|
|
lists:foreach(
|
|
fun({Hook, Fun, Filter}) ->
|
|
emqx:hook(Hook, {?MODULE, Fun, [{Filter}]})
|
|
end, parse_rule(application:get_env(?APP, rules, []))).
|
|
|
|
unload() ->
|
|
lists:foreach(
|
|
fun({Hook, Fun, _Filter}) ->
|
|
emqx:unhook(Hook, {?MODULE, Fun})
|
|
end, parse_rule(application:get_env(?APP, rules, []))).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Client connect
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_client_connect(ConnInfo = #{clientid := ClientId, username := Username, peername := {Peerhost, _}}, _ConnProp, _Env) ->
|
|
emqx_metrics:inc('webhook.client_connect'),
|
|
Params = #{ action => client_connect
|
|
, node => node()
|
|
, clientid => ClientId
|
|
, username => maybe(Username)
|
|
, ipaddress => iolist_to_binary(ntoa(Peerhost))
|
|
, keepalive => maps:get(keepalive, ConnInfo)
|
|
, proto_ver => maps:get(proto_ver, ConnInfo)
|
|
},
|
|
send_http_request(ClientId, Params).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Client connack
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_client_connack(ConnInfo = #{clientid := ClientId, username := Username, peername := {Peerhost, _}}, Rc, _AckProp, _Env) ->
|
|
emqx_metrics:inc('webhook.client_connack'),
|
|
Params = #{ action => client_connack
|
|
, node => node()
|
|
, clientid => ClientId
|
|
, username => maybe(Username)
|
|
, ipaddress => iolist_to_binary(ntoa(Peerhost))
|
|
, keepalive => maps:get(keepalive, ConnInfo)
|
|
, proto_ver => maps:get(proto_ver, ConnInfo)
|
|
, connected_at => maps:get(connected_at, ConnInfo)
|
|
, conn_ack => Rc
|
|
},
|
|
send_http_request(ClientId, Params).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Client connected
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_client_connected(#{clientid := ClientId, username := Username, peerhost := Peerhost}, ConnInfo, _Env) ->
|
|
emqx_metrics:inc('webhook.client_connected'),
|
|
Params = #{ action => client_connected
|
|
, node => node()
|
|
, clientid => ClientId
|
|
, username => maybe(Username)
|
|
, ipaddress => iolist_to_binary(ntoa(Peerhost))
|
|
, keepalive => maps:get(keepalive, ConnInfo)
|
|
, proto_ver => maps:get(proto_ver, ConnInfo)
|
|
, connected_at => maps:get(connected_at, ConnInfo)
|
|
},
|
|
send_http_request(ClientId, Params).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Client disconnected
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_client_disconnected(ClientInfo, {shutdown, Reason}, ConnInfo, Env) when is_atom(Reason) ->
|
|
on_client_disconnected(ClientInfo, Reason, ConnInfo, Env);
|
|
on_client_disconnected(#{clientid := ClientId, username := Username}, Reason, ConnInfo, _Env) ->
|
|
emqx_metrics:inc('webhook.client_disconnected'),
|
|
Params = #{ action => client_disconnected
|
|
, node => node()
|
|
, clientid => ClientId
|
|
, username => maybe(Username)
|
|
, reason => stringfy(maybe(Reason))
|
|
, connected_at => maps:get(connected_at, ConnInfo)
|
|
, disconnected_at => maps:get(disconnected_at, ConnInfo, erlang:system_time(millisecond))
|
|
},
|
|
send_http_request(ClientId, Params).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Client subscribe
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties, TopicTable, {Filter}) ->
|
|
lists:foreach(fun({Topic, Opts}) ->
|
|
with_filter(
|
|
fun() ->
|
|
emqx_metrics:inc('webhook.client_subscribe'),
|
|
Params = #{ action => client_subscribe
|
|
, node => node()
|
|
, clientid => ClientId
|
|
, username => maybe(Username)
|
|
, topic => Topic
|
|
, opts => Opts
|
|
},
|
|
send_http_request(ClientId, Params)
|
|
end, Topic, Filter)
|
|
end, TopicTable).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Client unsubscribe
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_client_unsubscribe(#{clientid := ClientId, username := Username}, _Properties, TopicTable, {Filter}) ->
|
|
lists:foreach(fun({Topic, Opts}) ->
|
|
with_filter(
|
|
fun() ->
|
|
emqx_metrics:inc('webhook.client_unsubscribe'),
|
|
Params = #{ action => client_unsubscribe
|
|
, node => node()
|
|
, clientid => ClientId
|
|
, username => maybe(Username)
|
|
, topic => Topic
|
|
, opts => Opts
|
|
},
|
|
send_http_request(ClientId, Params)
|
|
end, Topic, Filter)
|
|
end, TopicTable).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Session subscribed
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_session_subscribed(#{clientid := ClientId, username := Username}, Topic, Opts, {Filter}) ->
|
|
with_filter(
|
|
fun() ->
|
|
emqx_metrics:inc('webhook.session_subscribed'),
|
|
Params = #{ action => session_subscribed
|
|
, node => node()
|
|
, clientid => ClientId
|
|
, username => maybe(Username)
|
|
, topic => Topic
|
|
, opts => Opts
|
|
},
|
|
send_http_request(ClientId, Params)
|
|
end, Topic, Filter).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Session unsubscribed
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_session_unsubscribed(#{clientid := ClientId, username := Username}, Topic, _Opts, {Filter}) ->
|
|
with_filter(
|
|
fun() ->
|
|
emqx_metrics:inc('webhook.session_unsubscribed'),
|
|
Params = #{ action => session_unsubscribed
|
|
, node => node()
|
|
, clientid => ClientId
|
|
, username => maybe(Username)
|
|
, topic => Topic
|
|
},
|
|
send_http_request(ClientId, Params)
|
|
end, Topic, Filter).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Session terminated
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_session_terminated(Info, {shutdown, Reason}, SessInfo, Env) when is_atom(Reason) ->
|
|
on_session_terminated(Info, Reason, SessInfo, Env);
|
|
on_session_terminated(#{clientid := ClientId, username := Username}, Reason, _SessInfo, _Env) ->
|
|
emqx_metrics:inc('webhook.session_terminated'),
|
|
Params = #{ action => session_terminated
|
|
, node => node()
|
|
, clientid => ClientId
|
|
, username => maybe(Username)
|
|
, reason => stringfy(maybe(Reason))
|
|
},
|
|
send_http_request(ClientId, Params).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Message publish
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_message_publish(Message = #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
|
|
{ok, Message};
|
|
on_message_publish(Message = #message{topic = Topic}, {Filter}) ->
|
|
with_filter(
|
|
fun() ->
|
|
emqx_metrics:inc('webhook.message_publish'),
|
|
{FromClientId, FromUsername} = parse_from(Message),
|
|
Params = #{ action => message_publish
|
|
, node => node()
|
|
, from_client_id => FromClientId
|
|
, from_username => FromUsername
|
|
, topic => Message#message.topic
|
|
, qos => Message#message.qos
|
|
, retain => emqx_message:get_flag(retain, Message)
|
|
, payload => encode_payload(Message#message.payload)
|
|
, ts => Message#message.timestamp
|
|
},
|
|
send_http_request(FromClientId, Params),
|
|
{ok, Message}
|
|
end, Message, Topic, Filter).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Message deliver
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_message_delivered(_ClientInfo,#message{topic = <<"$SYS/", _/binary>>}, _Env) ->
|
|
ok;
|
|
on_message_delivered(#{clientid := ClientId, username := Username},
|
|
Message = #message{topic = Topic}, {Filter}) ->
|
|
with_filter(
|
|
fun() ->
|
|
emqx_metrics:inc('webhook.message_delivered'),
|
|
{FromClientId, FromUsername} = parse_from(Message),
|
|
Params = #{ action => message_delivered
|
|
, node => node()
|
|
, clientid => ClientId
|
|
, username => maybe(Username)
|
|
, from_client_id => FromClientId
|
|
, from_username => FromUsername
|
|
, topic => Message#message.topic
|
|
, qos => Message#message.qos
|
|
, retain => emqx_message:get_flag(retain, Message)
|
|
, payload => encode_payload(Message#message.payload)
|
|
, ts => Message#message.timestamp
|
|
},
|
|
send_http_request(ClientId, Params)
|
|
end, Topic, Filter).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Message acked
|
|
%%--------------------------------------------------------------------
|
|
|
|
on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}, _Env) ->
|
|
ok;
|
|
on_message_acked(#{clientid := ClientId, username := Username},
|
|
Message = #message{topic = Topic}, {Filter}) ->
|
|
with_filter(
|
|
fun() ->
|
|
emqx_metrics:inc('webhook.message_acked'),
|
|
{FromClientId, FromUsername} = parse_from(Message),
|
|
Params = #{ action => message_acked
|
|
, node => node()
|
|
, clientid => ClientId
|
|
, username => maybe(Username)
|
|
, from_client_id => FromClientId
|
|
, from_username => FromUsername
|
|
, topic => Message#message.topic
|
|
, qos => Message#message.qos
|
|
, retain => emqx_message:get_flag(retain, Message)
|
|
, payload => encode_payload(Message#message.payload)
|
|
, ts => Message#message.timestamp
|
|
},
|
|
send_http_request(ClientId, Params)
|
|
end, Topic, Filter).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|
|
|
|
send_http_request(ClientID, Params) ->
|
|
{ok, Path} = application:get_env(?APP, path),
|
|
Headers = application:get_env(?APP, headers, []),
|
|
Body = emqx_json:encode(Params),
|
|
?LOG(debug, "Send to: ~0p, params: ~s", [Path, Body]),
|
|
case ehttpc:request({?APP, ClientID}, post, {Path, Headers, Body}) of
|
|
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
|
|
ok;
|
|
{ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 ->
|
|
ok;
|
|
{ok, StatusCode, _} ->
|
|
?LOG(warning, "HTTP request failed with status code: ~p", [StatusCode]),
|
|
ok;
|
|
{ok, StatusCode, _, _} ->
|
|
?LOG(warning, "HTTP request failed with status code: ~p", [StatusCode]),
|
|
ok;
|
|
{error, Reason} ->
|
|
?LOG(error, "HTTP request error: ~p", [Reason]),
|
|
ok
|
|
end.
|
|
|
|
parse_rule([]) ->
|
|
[];
|
|
parse_rule([{Rule, Conf} | Rules]) ->
|
|
Params = emqx_json:decode(iolist_to_binary(Conf)),
|
|
Action = proplists:get_value(<<"action">>, Params),
|
|
Filter = proplists:get_value(<<"topic">>, Params),
|
|
[{list_to_atom(Rule), binary_to_existing_atom(Action, utf8), Filter} | parse_rule(Rules)].
|
|
|
|
with_filter(Fun, _, undefined) ->
|
|
Fun(), ok;
|
|
with_filter(Fun, Topic, Filter) ->
|
|
case emqx_topic:match(Topic, Filter) of
|
|
true -> Fun(), ok;
|
|
false -> ok
|
|
end.
|
|
|
|
with_filter(Fun, _, _, undefined) ->
|
|
Fun();
|
|
with_filter(Fun, Msg, Topic, Filter) ->
|
|
case emqx_topic:match(Topic, Filter) of
|
|
true -> Fun();
|
|
false -> {ok, Msg}
|
|
end.
|
|
|
|
parse_from(Message) ->
|
|
{emqx_message:from(Message), maybe(emqx_message:get_header(username, Message))}.
|
|
|
|
encode_payload(Payload) ->
|
|
encode_payload(Payload, application:get_env(?APP, encoding_of_payload_field, plain)).
|
|
|
|
encode_payload(Payload, base62) -> emqx_base62:encode(Payload);
|
|
encode_payload(Payload, base64) -> base64:encode(Payload);
|
|
encode_payload(Payload, plain) -> Payload.
|
|
|
|
stringfy(Term) when is_binary(Term) ->
|
|
Term;
|
|
stringfy(Term) when is_atom(Term) ->
|
|
atom_to_binary(Term, utf8);
|
|
stringfy(Term) ->
|
|
unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
|
|
|
|
maybe(undefined) -> null;
|
|
maybe(Str) -> Str.
|