emqx/apps/emqx_lua_hook/src/emqx_lua_script.erl

347 lines
16 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lua_script).
-include("emqx_lua_hook.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-export([ register_on_message_publish/2
, register_on_client_connected/2
, register_on_client_disconnected/2
, register_on_client_subscribe/2
, register_on_client_unsubscribe/2
, register_on_message_acked/2
, register_on_message_delivered/2
, register_on_session_subscribed/2
, register_on_session_unsubscribed/2
, register_on_client_authenticate/2
, register_on_client_check_acl/2
, unregister_hooks/1
]).
-export([ on_client_connected/4
, on_client_disconnected/5
, on_client_authenticate/4
, on_client_check_acl/6
, on_client_subscribe/5
, on_client_unsubscribe/5
, on_session_subscribed/5
, on_session_unsubscribed/5
, on_message_publish/3
, on_message_delivered/4
, on_message_acked/4
]).
-define(EMPTY_USERNAME, <<"">>).
-define(HOOK_ADD(A, B), emqx:hook(A, B)).
-define(HOOK_DEL(A, B), emqx:unhook(A, B)).
register_on_client_connected(ScriptName, LuaState) ->
?HOOK_ADD('client.connected', {?MODULE, on_client_connected, [ScriptName, LuaState]}).
register_on_client_disconnected(ScriptName, LuaState) ->
?HOOK_ADD('client.disconnected', {?MODULE, on_client_disconnected, [ScriptName, LuaState]}).
register_on_client_authenticate(ScriptName, LuaState) ->
?HOOK_ADD('client.authenticate', {?MODULE, on_client_authenticate, [ScriptName, LuaState]}).
register_on_client_check_acl(ScriptName, LuaState) ->
?HOOK_ADD('client.check_acl', {?MODULE, on_client_check_acl, [ScriptName, LuaState]}).
register_on_client_subscribe(ScriptName, LuaState) ->
?HOOK_ADD('client.subscribe', {?MODULE, on_client_subscribe, [ScriptName, LuaState]}).
register_on_client_unsubscribe(ScriptName, LuaState) ->
?HOOK_ADD('client.unsubscribe', {?MODULE, on_client_unsubscribe, [ScriptName, LuaState]}).
register_on_session_subscribed(ScriptName, LuaState) ->
?HOOK_ADD('session.subscribed', {?MODULE, on_session_subscribed, [ScriptName, LuaState]}).
register_on_session_unsubscribed(ScriptName, LuaState) ->
?HOOK_ADD('session.unsubscribed', {?MODULE, on_session_unsubscribed, [ScriptName, LuaState]}).
register_on_message_publish(ScriptName, LuaState) ->
?HOOK_ADD('message.publish', {?MODULE, on_message_publish, [ScriptName, LuaState]}).
register_on_message_delivered(ScriptName, LuaState) ->
?HOOK_ADD('message.delivered', {?MODULE, on_message_delivered, [ScriptName, LuaState]}).
register_on_message_acked(ScriptName, LuaState) ->
?HOOK_ADD('message.acked', {?MODULE, on_message_acked, [ScriptName, LuaState]}).
unregister_hooks({ScriptName, LuaState}) ->
?HOOK_DEL('client.connected', {?MODULE, on_client_connected, [ScriptName, LuaState]}),
?HOOK_DEL('client.disconnected', {?MODULE, on_client_disconnected, [ScriptName, LuaState]}),
?HOOK_DEL('client.authenticate', {?MODULE, on_client_authenticate, [ScriptName, LuaState]}),
?HOOK_DEL('client.check_acl', {?MODULE, on_client_check_acl, [ScriptName, LuaState]}),
?HOOK_DEL('client.subscribe', {?MODULE, on_client_subscribe, [ScriptName, LuaState]}),
?HOOK_DEL('client.unsubscribe', {?MODULE, on_client_unsubscribe, [ScriptName, LuaState]}),
?HOOK_DEL('session.subscribed', {?MODULE, on_session_subscribed, [ScriptName, LuaState]}),
?HOOK_DEL('session.unsubscribed', {?MODULE, on_session_unsubscribed, [ScriptName, LuaState]}),
?HOOK_DEL('message.publish', {?MODULE, on_message_publish, [ScriptName, LuaState]}),
?HOOK_DEL('message.delivered', {?MODULE, on_message_delivered, [ScriptName, LuaState]}),
?HOOK_DEL('message.acked', {?MODULE, on_message_acked, [ScriptName, LuaState]}).
on_client_connected(ClientInfo = #{clientid := ClientId, username := Username},
ConnInfo, _ScriptName, LuaState) ->
?LOG(debug, "Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
[ClientId, ClientInfo, ConnInfo]),
case catch luerl:call_function([on_client_connected], [ClientId, Username], LuaState) of
{'EXIT', St} ->
?LOG(error, "Failed to execute function on_client_connected(), which has syntax error, St=~p", [St]),
ok;
{_Result, _St} ->
ok;
Other ->
?LOG(error, "Lua function on_client_connected() caught exception, ~p", [Other]),
ok
end.
on_client_disconnected(ClientInfo = #{clientid := ClientId, username := Username},
ReasonCode, ConnInfo, _ScriptName, LuaState) ->
?LOG(debug, "Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
[ClientId, ReasonCode, ClientInfo, ConnInfo]),
case catch luerl:call_function([on_client_disconnected], [ClientId, Username, ReasonCode], LuaState) of
{'EXIT', St} ->
?LOG(error, "Failed to execute function on_client_disconnected(), which has syntax error, St=~p", [St]),
ok;
{_Result, _St} ->
ok;
Other ->
?LOG(error, "Lua function on_client_disconnected() caught exception, ~p", [Other]),
ok
end.
on_client_authenticate(#{clientid := ClientId,
username := Username,
peerhost := Peerhost,
password := Password}, Result, _ScriptName, LuaState) ->
case catch luerl:call_function([on_client_authenticate],
[ClientId, Username, inet:ntoa(Peerhost), Password], LuaState) of
{'EXIT', St} ->
?LOG(error, "Failed to execute function on_client_authenticate(), which has syntax error, St=~p", [St]),
ok;
{[<<"ignore">>], _St} ->
ok;
{[<<"ok">>], _St} ->
{stop, Result#{auth_result => success}};
Other ->
?LOG(error, "Lua function on_client_authenticate() caught exception, ~p", [Other]),
ok
end.
on_client_check_acl(#{clientid := ClientId,
username := Username,
peerhost := Peerhost,
password := Password}, Topic, PubSub, _Result, _ScriptName, LuaState) ->
case catch luerl:call_function([on_client_check_acl], [ClientId, Username, inet:ntoa(Peerhost), Password, Topic, PubSub], LuaState) of
{'EXIT', St} ->
?LOG(error, "Failed to execute function on_client_check_acl(), which has syntax error, St=~p", [St]),
ok;
{[<<"ignore">>],_St} ->
ok;
{[<<"allow">>], _St} ->
{stop, allow};
{[<<"deny">>], _St} ->
{stop, deny};
Other ->
?LOG(error, "Lua function on_client_check_acl() caught exception, ~p", [Other]),
ok
end.
on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties, TopicFilters, _ScriptName, LuaState) ->
NewTopicFilters =
lists:foldr(fun(TopicFilter, Acc) ->
case on_client_subscribe_single(ClientId, Username, TopicFilter, LuaState) of
false ->
{Topic, Opts} = TopicFilter,
[{Topic, Opts#{deny_subscription => true}} | Acc];
NewTopicFilter ->
[NewTopicFilter | Acc]
end
end, [], TopicFilters),
case NewTopicFilters of
[] -> stop;
_ -> {ok, NewTopicFilters}
end.
on_client_subscribe_single(_ClientId, _Username, TopicFilter = {<<$$, _Rest/binary>>, _SubOpts}, _LuaState) ->
%% ignore topics starting with $
TopicFilter;
on_client_subscribe_single(ClientId, Username, TopicFilter = {Topic, SubOpts}, LuaState) ->
?LOG(debug, "hook client(~s/~s) will subscribe: ~p~n", [ClientId, Username, Topic]),
case catch luerl:call_function([on_client_subscribe], [ClientId, Username, Topic], LuaState) of
{'EXIT', St} ->
?LOG(error, "Failed to execute function on_client_subscribe(), which has syntax error, St=~p", [St]),
TopicFilter;
{[false], _St} ->
false; % cancel this topic's subscription
{[NewTopic], _St} ->
?LOG(debug, "LUA function on_client_subscribe() return ~p", [NewTopic]),
{NewTopic, SubOpts}; % modify topic
Other ->
?LOG(error, "Lua function on_client_subscribe() caught exception, ~p", [Other]),
TopicFilter
end.
on_client_unsubscribe(#{clientid := ClientId, username := Username}, _Properties, TopicFilters, _ScriptName, LuaState) ->
NewTopicFilters =
lists:foldr(fun(TopicFilter, Acc) ->
case on_client_unsubscribe_single(ClientId, Username, TopicFilter, LuaState) of
false -> Acc;
NewTopicFilter -> [NewTopicFilter | Acc]
end
end, [], TopicFilters),
case NewTopicFilters of
[] -> stop;
_ -> {ok, NewTopicFilters}
end.
on_client_unsubscribe_single(_ClientId, _Username, TopicFilter = {<<$$, _Rest/binary>>, _SubOpts}, _LuaState) ->
%% ignore topics starting with $
TopicFilter;
on_client_unsubscribe_single(ClientId, Username, TopicFilter = {Topic, SubOpts}, LuaState) ->
?LOG(debug, "hook client(~s/~s) unsubscribe ~p~n", [ClientId, Username, Topic]),
case catch luerl:call_function([on_client_unsubscribe], [ClientId, Username, Topic], LuaState) of
{'EXIT', St} ->
?LOG(error, "Failed to execute function on_client_unsubscribe(), which has syntax error, St=~p", [St]),
TopicFilter;
{[false], _St} ->
false; % cancel this topic's unsubscription
{[NewTopic], _} ->
?LOG(debug, "Lua function on_client_unsubscribe() return ~p", [NewTopic]),
{NewTopic, SubOpts}; % modify topic
Other ->
?LOG(error, "Topic=~p, lua function on_client_unsubscribe() caught exception, ~p", [Topic, Other]),
TopicFilter
end.
on_session_subscribed(#{}, <<$$, _Rest/binary>>, _SubOpts, _ScriptName, _LuaState) ->
%% ignore topics starting with $
ok;
on_session_subscribed(#{clientid := ClientId, username := Username},
Topic, SubOpts, _ScriptName, LuaState) ->
?LOG(debug, "Session(~s/s) subscribed ~s with subopts: ~p~n", [ClientId, Username, Topic, SubOpts]),
case catch luerl:call_function([on_session_subscribed], [ClientId, Username, Topic], LuaState) of
{'EXIT', St} ->
?LOG(error, "Failed to execute function on_session_subscribed(), which has syntax error, St=~p", [St]),
ok;
{_Result, _St} ->
ok;
Other ->
?LOG(error, "Topic=~p, lua function on_session_subscribed() caught exception, ~p", [Topic, Other]),
ok
end.
on_session_unsubscribed(#{}, <<$$, _Rest/binary>>, _SubOpts, _ScriptName, _LuaState) ->
%% ignore topics starting with $
ok;
on_session_unsubscribed(#{clientid := ClientId, username := Username},
Topic, _SubOpts, _ScriptName, LuaState) ->
?LOG(debug, "Session(~s/~s) unsubscribed ~s~n", [ClientId, Username, Topic]),
case catch luerl:call_function([on_session_unsubscribed], [ClientId, Username, Topic], LuaState) of
{'EXIT', St} ->
?LOG(error, "Failed to execute function on_session_unsubscribed(), which has syntax error, St=~p", [St]),
ok;
{_Result, _St} ->
ok;
Other ->
?LOG(error, "Topic=~p, lua function on_session_unsubscribed() caught exception, ~p", [Topic, Other]),
ok
end.
on_message_publish(Message = #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
%% ignore topics starting with $
{ok, Message};
on_message_publish(Message = #message{from = ClientId,
qos = QoS,
flags = Flags = #{retain := Retain},
topic = Topic,
payload = Payload,
headers = Headers},
_ScriptName, LuaState) ->
Username = maps:get(username, Headers, ?EMPTY_USERNAME),
?LOG(debug, "Publish ~s~n", [emqx_message:format(Message)]),
case catch luerl:call_function([on_message_publish], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
{'EXIT', St} ->
?LOG(error, "Failed to execute function on_message_publish(), which has syntax error, St=~p", [St]),
{ok, Message};
{[false], _St} ->
?LOG(debug, "Lua function on_message_publish() returned false, setting allow_publish header to false", []),
{stop, Message#message{headers = Headers#{allow_publish => false}}};
{[NewTopic, NewPayload, NewQos, NewRetain], _St} ->
?LOG(debug, "Lua function on_message_publish() returned ~p", [{NewTopic, NewPayload, NewQos, NewRetain}]),
{ok, Message#message{topic = NewTopic, payload = NewPayload,
qos = round(NewQos), flags = Flags#{retain => to_retain(NewRetain)}}};
Other ->
?LOG(error, "Topic=~p, lua function on_message_publish() caught exception, ~p", [Topic, Other]),
{ok, Message}
end.
on_message_delivered(#{}, #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
%% ignore topics starting with $
ok;
on_message_delivered(#{clientid := ClientId, username := Username},
Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = Flags = #{retain := Retain}},
_ScriptName, LuaState) ->
?LOG(debug, "Message delivered to client(~s): ~s~n",
[ClientId, emqx_message:format(Message)]),
case catch luerl:call_function([on_message_delivered], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
{'EXIT', St} ->
?LOG(error, "Failed to execute function on_message_delivered(), which has syntax error, St=~p", [St]),
ok;
{[false], _St} ->
ok;
{[NewTopic, NewPayload, NewQos, NewRetain], _St} ->
{ok, Message#message{topic = NewTopic, payload = NewPayload,
qos = round(NewQos), flags = Flags#{retain => to_retain(NewRetain)}}};
Other ->
?LOG(error, "Topic=~p, lua function on_message_delivered() caught exception, ~p", [Topic, Other]),
ok
end.
on_message_acked(#{}, #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
%% ignore topics starting with $
ok;
on_message_acked(#{clientid := ClientId, username := Username},
Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = #{retain := Retain}}, _ScriptName, LuaState) ->
?LOG(debug, "Message acked by client(~s): ~s~n",
[ClientId, emqx_message:format(Message)]),
case catch luerl:call_function([on_message_acked], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
{'EXIT', St} ->
?LOG(error, "Failed to execute function on_message_acked(), which has syntax error, St=~p", [St]),
ok;
{_Result, _St} ->
ok;
Other ->
?LOG(error, "Topic=~p, lua function on_message_acked() caught exception, ~p", [Topic, Other]),
ok
end.
to_retain(0) -> false;
to_retain(1) -> true;
to_retain("true") -> true;
to_retain("false") -> false;
to_retain(<<"true">>) -> true;
to_retain(<<"false">>) -> false;
to_retain(true) -> true;
to_retain(false) -> false;
to_retain(Num) when is_float(Num) ->
case round(Num) of 0 -> false; _ -> true end.