chore(plugins): rm emqx-lua-hook plugin
This commit is contained in:
parent
1f7291380b
commit
d0d14f7a02
|
@ -1,19 +0,0 @@
|
|||
deps/
|
||||
ebin/
|
||||
_rel/
|
||||
.erlang.mk/
|
||||
*.d
|
||||
data/
|
||||
*.iml
|
||||
.idea/
|
||||
logs/
|
||||
*.beam
|
||||
.DS_Store
|
||||
erlang.mk
|
||||
_build/
|
||||
rebar.lock
|
||||
rebar3.crashdump
|
||||
bbmustache/
|
||||
*.conf.rendered
|
||||
.rebar3
|
||||
*.swp
|
|
@ -1,338 +0,0 @@
|
|||
|
||||
# emqx-lua-hook
|
||||
|
||||
This plugin makes it possible to write hooks in lua scripts.
|
||||
|
||||
Lua virtual machine is implemented by [luerl](https://github.com/rvirding/luerl) which supports Lua 5.2. Following features may not work properly:
|
||||
* label and goto
|
||||
* tail-call optimisation in return
|
||||
* only limited standard libraries
|
||||
* proper handling of `__metatable`
|
||||
|
||||
For the supported functions, please refer to luerl's [project page](https://github.com/rvirding/luerl).
|
||||
|
||||
Lua scripts are stored in 'data/scripts' directory, and will be loaded automatically. If a script is changed during runtime, it should be reloaded to take effect.
|
||||
|
||||
Each lua script could export several functions binding with emqx hooks, triggered by message publish, topic subscribe, client connect, etc. Different lua scripts may export same type function, binding with a same event. But their order being triggered is not guaranteed.
|
||||
|
||||
To start this plugin, run following command:
|
||||
|
||||
```shell
|
||||
bin/emqx_ctl plugins load emqx_lua_hook
|
||||
```
|
||||
|
||||
|
||||
## NOTE
|
||||
|
||||
* Since lua VM is run on erlang VM, its performance is poor. Please do NOT write long or complicated lua scripts which may degrade entire system.
|
||||
* It's hard to debug lua script in emqx environment. Recommended to unit test your lua script in your host first. If everything is OK, deploy it to emqx 'data/scripts' directory.
|
||||
* Global variable will lost its value for each call. Do NOT use global variable in lua scripts.
|
||||
|
||||
|
||||
# Example
|
||||
|
||||
Suppose your emqx is installed in /emqx, and the lua script directory should be /emqx/data/scripts.
|
||||
|
||||
Make a new file called "test.lua" and put following code into this file:
|
||||
|
||||
```lua
|
||||
function on_message_publish(clientid, username, topic, payload, qos, retain)
|
||||
return topic, "hello", qos, retain
|
||||
end
|
||||
|
||||
function register_hook()
|
||||
return "on_message_publish"
|
||||
end
|
||||
```
|
||||
|
||||
Execute following command to start emq-lua-hook and load scripts in 'data/scripts' directory.
|
||||
|
||||
```
|
||||
/emqx/bin/emqx_ctl plugins load emqx_lua_hook
|
||||
```
|
||||
|
||||
Now let's take a look at what will happend.
|
||||
|
||||
- Start a mqtt client, such as mqtt.fx.
|
||||
- Subscribe a topic="a/b".
|
||||
- Send a message, topic="a/b", payload="123"
|
||||
- Subscriber will get a message with topic="a/b" and payload="hello". test.lua modifies the payload.
|
||||
|
||||
If there are "test1.lua", "test2.lua" and "test3.lua" in /emqx/data/scripts, all these files will be loaded once emq-lua-hook get started.
|
||||
|
||||
If test2.lua has been changed, restart emq-lua-hook to reload all scripts, or execute following command to reload test2.lua only:
|
||||
|
||||
```
|
||||
/emqx/bin/emqx_ctl luahook reload test2.lua
|
||||
```
|
||||
|
||||
|
||||
# Hook API
|
||||
|
||||
You can find all example codes in the `examples.lua` file.
|
||||
|
||||
## on_client_connected
|
||||
|
||||
```lua
|
||||
function on_client_connected(clientId, userName, returncode)
|
||||
return 0
|
||||
end
|
||||
```
|
||||
This API is called after a mqtt client has establish a connection with broker.
|
||||
|
||||
### Input
|
||||
* clientid : a string, mqtt client id.
|
||||
* username : a string mqtt username
|
||||
* returncode : a string, has following values
|
||||
- success : Connection accepted
|
||||
- Others is failed reason
|
||||
|
||||
### Output
|
||||
Needless
|
||||
|
||||
## on_client_disconnected
|
||||
|
||||
```lua
|
||||
function on_client_disconnected(clientId, username, error)
|
||||
return
|
||||
end
|
||||
```
|
||||
This API is called after a mqtt client has disconnected.
|
||||
|
||||
### Input
|
||||
* clientId : a string, mqtt client id.
|
||||
* username : a string mqtt username
|
||||
* error : a string, denote the disconnection reason.
|
||||
|
||||
### Output
|
||||
Needless
|
||||
|
||||
## on_client_subscribe
|
||||
|
||||
```lua
|
||||
function on_client_subscribe(clientId, username, topic)
|
||||
-- do your job here
|
||||
if some_condition then
|
||||
return new_topic
|
||||
else
|
||||
return false
|
||||
end
|
||||
end
|
||||
```
|
||||
This API is called before mqtt engine process client's subscribe command. It is possible to change topic or cancel it.
|
||||
|
||||
### Input
|
||||
* clientid : a string, mqtt client id.
|
||||
* username : a string mqtt username
|
||||
* topic : a string, mqtt message's topic
|
||||
|
||||
### Output
|
||||
* new_topic : a string, change mqtt message's topic
|
||||
* false : cancel subscription
|
||||
|
||||
|
||||
## on_client_unsubscribe
|
||||
|
||||
```lua
|
||||
function on_client_unsubscribe(clientId, username, topic)
|
||||
-- do your job here
|
||||
if some_condition then
|
||||
return new_topic
|
||||
else
|
||||
return false
|
||||
end
|
||||
end
|
||||
```
|
||||
This API is called before mqtt engine process client's unsubscribe command. It is possible to change topic or cancel it.
|
||||
|
||||
### Input
|
||||
* clientid : a string, mqtt client id.
|
||||
* username : a string mqtt username
|
||||
* topic : a string, mqtt message's topic
|
||||
|
||||
### Output
|
||||
* new_topic : a string, change mqtt message's topic
|
||||
* false : cancel unsubscription
|
||||
|
||||
|
||||
## on_session_subscribed
|
||||
|
||||
```lua
|
||||
function on_session_subscribed(ClientId, Username, Topic)
|
||||
return
|
||||
end
|
||||
```
|
||||
This API is called after a subscription has been done.
|
||||
|
||||
### Input
|
||||
* clientid : a string, mqtt client id.
|
||||
* username : a string mqtt username
|
||||
* topic : a string, mqtt's topic filter.
|
||||
|
||||
### Output
|
||||
Needless
|
||||
|
||||
|
||||
## on_session_unsubscribed
|
||||
|
||||
```lua
|
||||
function on_session_unsubscribed(clientid, username, topic)
|
||||
return
|
||||
end
|
||||
```
|
||||
This API is called after a unsubscription has been done.
|
||||
|
||||
### Input
|
||||
* clientid : a string, mqtt client id.
|
||||
* username : a string mqtt username
|
||||
* topic : a string, mqtt's topic filter.
|
||||
|
||||
### Output
|
||||
Needless
|
||||
|
||||
## on_message_delivered
|
||||
|
||||
```lua
|
||||
function on_message_delivered(clientid, username, topic, payload, qos, retain)
|
||||
-- do your job here
|
||||
return topic, payload, qos, retain
|
||||
end
|
||||
```
|
||||
This API is called after a message has been pushed to mqtt clients.
|
||||
|
||||
### Input
|
||||
* clientId : a string, mqtt client id.
|
||||
* username : a string mqtt username
|
||||
* topic : a string, mqtt message's topic
|
||||
* payload : a string, mqtt message's payload
|
||||
* qos : a number, mqtt message's QOS (0, 1, 2)
|
||||
* retain : a boolean, mqtt message's retain flag
|
||||
|
||||
### Output
|
||||
Needless
|
||||
|
||||
## on_message_acked
|
||||
|
||||
```lua
|
||||
function on_message_acked(clientId, username, topic, payload, qos, retain)
|
||||
return
|
||||
end
|
||||
```
|
||||
This API is called after a message has been acknowledged.
|
||||
|
||||
### Input
|
||||
* clientId : a string, mqtt client id.
|
||||
* username : a string mqtt username
|
||||
* topic : a string, mqtt message's topic
|
||||
* payload : a string, mqtt message's payload
|
||||
* qos : a number, mqtt message's QOS (0, 1, 2)
|
||||
* retain : a boolean, mqtt message's retain flag
|
||||
|
||||
### Output
|
||||
Needless
|
||||
|
||||
## on_message_publish
|
||||
|
||||
```lua
|
||||
function on_message_publish(clientid, username, topic, payload, qos, retain)
|
||||
-- do your job here
|
||||
if some_condition then
|
||||
return new_topic, new_payload, new_qos, new_retain
|
||||
else
|
||||
return false
|
||||
end
|
||||
end
|
||||
```
|
||||
This API is called before publishing message into mqtt engine. It's possible to change message or cancel publish in this API.
|
||||
|
||||
### Input
|
||||
* clientid : a string, mqtt client id of publisher.
|
||||
* username : a string, mqtt username of publisher
|
||||
* topic : a string, mqtt message's topic
|
||||
* payload : a string, mqtt message's payload
|
||||
* qos : a number, mqtt message's QOS (0, 1, 2)
|
||||
* retain : a boolean, mqtt message's retain flag
|
||||
|
||||
### Output
|
||||
* new_topic : a string, change mqtt message's topic
|
||||
* new_payload : a string, change mqtt message's payload
|
||||
* new_qos : a number, change mqtt message's QOS
|
||||
* new_retain : a boolean, change mqtt message's retain flag
|
||||
* false : cancel publishing this mqtt message
|
||||
|
||||
## register_hook
|
||||
|
||||
```lua
|
||||
function register_hook()
|
||||
return "hook_name"
|
||||
end
|
||||
|
||||
-- Or register multiple callbacks
|
||||
|
||||
function register_hook()
|
||||
return "hook_name1", "hook_name2", ... , "hook_nameX"
|
||||
end
|
||||
```
|
||||
|
||||
This API exports hook(s) implemented in its lua script.
|
||||
|
||||
### Output
|
||||
* hook_name must be a string, which is equal to the hook API(s) implemented. Possible values:
|
||||
- "on_client_connected"
|
||||
- "on_client_disconnected"
|
||||
- "on_client_subscribe"
|
||||
- "on_client_unsubscribe"
|
||||
- "on_session_subscribed"
|
||||
- "on_session_unsubscribed"
|
||||
- "on_message_delivered"
|
||||
- "on_message_acked"
|
||||
- "on_message_publish"
|
||||
|
||||
# management command
|
||||
|
||||
## load
|
||||
|
||||
```shell
|
||||
emqx_ctl luahook load script_name
|
||||
```
|
||||
This command will load lua file "script_name" in 'data/scripts' directory, into emqx hook.
|
||||
|
||||
## unload
|
||||
|
||||
```shell
|
||||
emqx_ctl luahook unload script_name
|
||||
```
|
||||
This command will unload lua file "script_name" out of emqx hook.
|
||||
|
||||
## reload
|
||||
|
||||
```shell
|
||||
emqx_ctl luahook reload script_name
|
||||
```
|
||||
This command will reload lua file "script_name" in 'data/scripts'. It is useful if a lua script has been modified and apply it immediately.
|
||||
|
||||
## enable
|
||||
|
||||
```shell
|
||||
emqx_ctl luahook enable script_name
|
||||
```
|
||||
This command will rename lua file "script_name.x" to "script_name", and load it immediately.
|
||||
|
||||
## disable
|
||||
|
||||
```shell
|
||||
emqx_ctl luahook disable script_name
|
||||
```
|
||||
This command will unload this script, and rename lua file "script_name" to "script_name.x", which will not be loaded during next boot.
|
||||
|
||||
|
||||
License
|
||||
-------
|
||||
|
||||
Apache License Version 2.0
|
||||
|
||||
Author
|
||||
------
|
||||
|
||||
EMQ X Team.
|
||||
|
|
@ -1,4 +0,0 @@
|
|||
##--------------------------------------------------------------------
|
||||
## EMQ X Lua Hook
|
||||
##--------------------------------------------------------------------
|
||||
|
|
@ -1,71 +0,0 @@
|
|||
--
|
||||
-- Given all funcation names needed register to system
|
||||
--
|
||||
function register_hook()
|
||||
return "on_client_connected",
|
||||
"on_client_disconnected",
|
||||
"on_client_subscribe",
|
||||
"on_client_unsubscribe",
|
||||
"on_session_subscribed",
|
||||
"on_session_unsubscribed",
|
||||
"on_message_delivered",
|
||||
"on_message_acked",
|
||||
"on_message_publish"
|
||||
end
|
||||
|
||||
----------------------------------------------------------------------
|
||||
-- Callback Functions
|
||||
|
||||
function on_client_connected(clientid, username, returncode)
|
||||
print("Lua: on_client_connected - " .. clientid)
|
||||
-- do your job here
|
||||
return
|
||||
end
|
||||
|
||||
function on_client_disconnected(clientid, username, reason)
|
||||
print("Lua: on_client_disconnected - " .. clientid)
|
||||
-- do your job here
|
||||
return
|
||||
end
|
||||
|
||||
function on_client_subscribe(clientid, username, topic)
|
||||
print("Lua: on_client_subscribe - " .. clientid)
|
||||
-- do your job here
|
||||
return topic
|
||||
end
|
||||
|
||||
function on_client_unsubscribe(clientid, username, topic)
|
||||
print("Lua: on_client_unsubscribe - " .. clientid)
|
||||
-- do your job here
|
||||
return topic
|
||||
end
|
||||
|
||||
function on_session_subscribed(clientid, username, topic)
|
||||
print("Lua: on_session_subscribed - " .. clientid)
|
||||
-- do your job here
|
||||
return
|
||||
end
|
||||
|
||||
function on_session_unsubscribed(clientid, username, topic)
|
||||
print("Lua: on_session_unsubscribed - " .. clientid)
|
||||
-- do your job here
|
||||
return
|
||||
end
|
||||
|
||||
function on_message_delivered(clientid, username, topic, payload, qos, retain)
|
||||
print("Lua: on_message_delivered - " .. clientid)
|
||||
-- do your job here
|
||||
return topic, payload, qos, retain
|
||||
end
|
||||
|
||||
function on_message_acked(clientid, username, topic, payload, qos, retain)
|
||||
print("Lua: on_message_acked- " .. clientid)
|
||||
-- do your job here
|
||||
return
|
||||
end
|
||||
|
||||
function on_message_publish(clientid, username, topic, payload, qos, retain)
|
||||
print("Lua: on_message_publish - " .. clientid)
|
||||
-- do your job here
|
||||
return topic, payload, qos, retain
|
||||
end
|
|
@ -1,18 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-define(LOG(Level, Format, Args), emqx_logger:Level("Lua Hook: " ++ Format, Args)).
|
||||
|
|
@ -1 +0,0 @@
|
|||
|
|
@ -1,21 +0,0 @@
|
|||
{deps,
|
||||
[{luerl, {git, "https://github.com/emqx/luerl", {tag, "v0.3.1"}}}
|
||||
]}.
|
||||
|
||||
{edoc_opts, [{preprocess, true}]}.
|
||||
{erl_opts, [warn_unused_vars,
|
||||
warn_shadow_vars,
|
||||
warn_unused_import,
|
||||
warn_obsolete_guard,
|
||||
debug_info,
|
||||
compressed,
|
||||
{parse_transform}
|
||||
]}.
|
||||
{overrides, [{add, [{erl_opts, [compressed]}]}]}.
|
||||
|
||||
{xref_checks, [undefined_function_calls, undefined_functions,
|
||||
locals_not_used, deprecated_function_calls,
|
||||
warnings_as_errors, deprecated_functions]}.
|
||||
{cover_enabled, true}.
|
||||
{cover_opts, [verbose]}.
|
||||
{cover_export_enabled, true}.
|
|
@ -1,14 +0,0 @@
|
|||
{application, emqx_lua_hook,
|
||||
[{description, "EMQ X Lua Hooks"},
|
||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel,stdlib]},
|
||||
{mod, {emqx_lua_hook_app,[]}},
|
||||
{env,[]},
|
||||
{licenses, ["Apache-2.0"]},
|
||||
{maintainers, ["EMQ X Team <contact@emqx.io>"]},
|
||||
{links, [{"Homepage", "https://emqx.io/"},
|
||||
{"Github", "https://github.com/emqx/emqx-lua-hook"}
|
||||
]}
|
||||
]}.
|
|
@ -1,199 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lua_hook).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("emqx_lua_hook.hrl").
|
||||
-include_lib("luerl/src/luerl.hrl").
|
||||
|
||||
-export([ start_link/0
|
||||
, stop/0
|
||||
]).
|
||||
|
||||
-export([ load_scripts/0
|
||||
, unload_scripts/0
|
||||
, load_script/1
|
||||
, unload_script/1
|
||||
]).
|
||||
|
||||
-export([ init/1
|
||||
, handle_call/3
|
||||
, handle_cast/2
|
||||
, handle_info/2
|
||||
, terminate/2
|
||||
, code_change/3
|
||||
]).
|
||||
|
||||
-export([lua_dir/0]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-record(state, {loaded_scripts = []}).
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, {}, []).
|
||||
|
||||
stop() ->
|
||||
gen_server:call(?SERVER, stop).
|
||||
|
||||
load_scripts() ->
|
||||
gen_server:call(?SERVER, load_scripts).
|
||||
|
||||
unload_scripts() ->
|
||||
gen_server:call(?SERVER, unload_scrips).
|
||||
|
||||
load_script(ScriptName) ->
|
||||
gen_server:call(?SERVER, {load_script, ScriptName}).
|
||||
|
||||
unload_script(ScriptName) ->
|
||||
gen_server:call(?SERVER, {unload_script, ScriptName}).
|
||||
|
||||
lua_dir() ->
|
||||
filename:join([emqx:get_env(data_dir, "data"), "scripts"]).
|
||||
|
||||
%%-----------------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%-----------------------------------------------------------------------------
|
||||
|
||||
init({}) ->
|
||||
{ok, #state{}}.
|
||||
|
||||
handle_call(stop, _From, State) ->
|
||||
{stop, normal, ok, State};
|
||||
|
||||
handle_call(load_scripts, _From, State) ->
|
||||
{reply, ok, State#state{loaded_scripts = do_loadall()}, hibernate};
|
||||
|
||||
handle_call(unload_scrips, _From, State=#state{loaded_scripts = Scripts}) ->
|
||||
do_unloadall(Scripts),
|
||||
{reply, ok, State#state{loaded_scripts = []}, hibernate};
|
||||
|
||||
handle_call({load_script, ScriptName}, _From, State=#state{loaded_scripts = Scripts}) ->
|
||||
{Ret, NewScripts} = case do_load(ScriptName) of
|
||||
error -> {error, Scripts};
|
||||
{ScriptName, LuaState} ->
|
||||
case lists:member({ScriptName, LuaState}, Scripts) of
|
||||
true -> {ok, Scripts};
|
||||
false -> {ok, lists:append([{ScriptName, LuaState}], Scripts)}
|
||||
end
|
||||
end,
|
||||
{reply, Ret, State#state{loaded_scripts = NewScripts}, hibernate};
|
||||
|
||||
handle_call({unload_script, ScriptName}, _From, State=#state{loaded_scripts = Scripts}) ->
|
||||
case proplists:get_all_values(ScriptName, Scripts) of
|
||||
[] ->
|
||||
{reply, ok, State, hibernate};
|
||||
LuaStates ->
|
||||
lists:foreach(fun(LuaState) ->
|
||||
% Unload first! If this gen_server has been crashed, loaded_scripts will be empty
|
||||
do_unload({ScriptName, LuaState})
|
||||
end, LuaStates),
|
||||
NewScripts = proplists:delete(ScriptName, Scripts),
|
||||
{reply, ok, State#state{loaded_scripts = NewScripts}, hibernate}
|
||||
end;
|
||||
|
||||
handle_call(Request, From, State) ->
|
||||
?LOG(error, "Unknown Request=~p from ~p", [Request, From]),
|
||||
{reply, ignored, State, hibernate}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?LOG(error, "unexpected cast: ~p", [Msg]),
|
||||
{noreply, State, hibernate}.
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?LOG(error, "unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, #state{loaded_scripts = Scripts}) ->
|
||||
do_unloadall(Scripts),
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%% ------------------------------------------------------------------
|
||||
%% Internal Function Definitions
|
||||
%% ------------------------------------------------------------------
|
||||
|
||||
do_loadall() ->
|
||||
FileList = filelib:wildcard(filename:join([lua_dir(), "*.lua"])),
|
||||
List = [do_load(X) || X <- FileList],
|
||||
[X || X <- List, is_tuple(X)].
|
||||
|
||||
do_load(FileName) ->
|
||||
case catch luerl:dofile(FileName) of
|
||||
{'EXIT', St00} ->
|
||||
?LOG(error, "Failed to load lua script ~p due to error ~p", [FileName, St00]),
|
||||
error;
|
||||
{_Ret, St0=#luerl{}} ->
|
||||
case catch luerl:call_function([register_hook], [], St0) of
|
||||
{'EXIT', St1} ->
|
||||
?LOG(error, "Failed to execute register_hook function in lua script ~p, which has syntax error, St1=~p", [FileName, St1]),
|
||||
error;
|
||||
{Ret1, St1} ->
|
||||
?LOG(debug, "Register lua script ~p", [FileName]),
|
||||
_ = do_register_hooks(Ret1, FileName, St1),
|
||||
{FileName, St1};
|
||||
Other ->
|
||||
?LOG(error, "Failed to load lua script ~p, register_hook() raise exception ~p", [FileName, Other]),
|
||||
error
|
||||
end;
|
||||
Exception ->
|
||||
?LOG(error, "Failed to load lua script ~p with error ~p", [FileName, Exception]),
|
||||
error
|
||||
end.
|
||||
|
||||
do_register(<<"on_message_publish">>, ScriptName, St) ->
|
||||
emqx_lua_script:register_on_message_publish(ScriptName, St);
|
||||
do_register(<<"on_message_delivered">>, ScriptName, St) ->
|
||||
emqx_lua_script:register_on_message_delivered(ScriptName, St);
|
||||
do_register(<<"on_message_acked">>, ScriptName, St) ->
|
||||
emqx_lua_script:register_on_message_acked(ScriptName, St);
|
||||
do_register(<<"on_client_connected">>, ScriptName, St) ->
|
||||
emqx_lua_script:register_on_client_connected(ScriptName, St);
|
||||
do_register(<<"on_client_subscribe">>, ScriptName, St) ->
|
||||
emqx_lua_script:register_on_client_subscribe(ScriptName, St);
|
||||
do_register(<<"on_client_unsubscribe">>, ScriptName, St) ->
|
||||
emqx_lua_script:register_on_client_unsubscribe(ScriptName, St);
|
||||
do_register(<<"on_client_disconnected">>, ScriptName, St) ->
|
||||
emqx_lua_script:register_on_client_disconnected(ScriptName, St);
|
||||
do_register(<<"on_session_subscribed">>, ScriptName, St) ->
|
||||
emqx_lua_script:register_on_session_subscribed(ScriptName, St);
|
||||
do_register(<<"on_client_authenticate">>, ScriptName, St) ->
|
||||
emqx_lua_script:register_on_client_authenticate(ScriptName, St);
|
||||
do_register(<<"on_client_check_acl">>, ScriptName, St) ->
|
||||
emqx_lua_script:register_on_client_check_acl(ScriptName, St);
|
||||
do_register(Hook, ScriptName, _St) ->
|
||||
?LOG(error, "Discard unknown hook ~p ScriptName=~p", [Hook, ScriptName]).
|
||||
|
||||
do_register_hooks([], _ScriptName, _St) ->
|
||||
ok;
|
||||
do_register_hooks([H|T], ScriptName, St) ->
|
||||
_ = do_register(H, ScriptName, St),
|
||||
do_register_hooks(T, ScriptName, St);
|
||||
do_register_hooks(Hook = <<$o, $n, _Rest/binary>>, ScriptName, St) ->
|
||||
do_register(Hook, ScriptName, St);
|
||||
do_register_hooks(Hook, ScriptName, _St) ->
|
||||
?LOG(error, "Discard unknown hook type ~p from ~p", [Hook, ScriptName]).
|
||||
|
||||
do_unloadall(Scripts) ->
|
||||
lists:foreach(fun do_unload/1, Scripts).
|
||||
|
||||
do_unload(Script) ->
|
||||
emqx_lua_script:unregister_hooks(Script),
|
||||
ok.
|
|
@ -1,40 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lua_hook_app).
|
||||
|
||||
-behaviour(application).
|
||||
|
||||
-emqx_plugin(?MODULE).
|
||||
|
||||
-export([ start/2
|
||||
, stop/1
|
||||
, prep_stop/1
|
||||
]).
|
||||
|
||||
start(_Type, _Args) ->
|
||||
{ok, Sup} = emqx_lua_hook_sup:start_link(),
|
||||
emqx_lua_hook:load_scripts(),
|
||||
emqx_lua_hook_cli:load(),
|
||||
{ok, Sup}.
|
||||
|
||||
prep_stop(State) ->
|
||||
emqx_lua_hook:unload_scripts(),
|
||||
emqx_lua_hook_cli:unload(),
|
||||
State.
|
||||
|
||||
stop(_State) ->
|
||||
ok.
|
|
@ -1,88 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lua_hook_cli).
|
||||
|
||||
-export([ load/0
|
||||
, cmd/1
|
||||
, unload/0
|
||||
]).
|
||||
|
||||
-include("emqx_lua_hook.hrl").
|
||||
-include_lib("luerl/src/luerl.hrl").
|
||||
|
||||
-define(PRINT(Format, Args), io:format(Format, Args)).
|
||||
-define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~s~n", [Cmd, Descr])).
|
||||
-define(USAGE(CmdList), [?PRINT_CMD(Cmd, Descr) || {Cmd, Descr} <- CmdList]).
|
||||
|
||||
load() ->
|
||||
emqx_ctl:register_command(luahook, {?MODULE, cmd}, []).
|
||||
|
||||
unload() ->
|
||||
emqx_ctl:unregister_command(luahook).
|
||||
|
||||
cmd(["load", Script]) ->
|
||||
case emqx_lua_hook:load_script(fullname(Script)) of
|
||||
ok -> emqx_ctl:print("Load ~p successfully~n", [Script]);
|
||||
error -> emqx_ctl:print("Load ~p error~n", [Script])
|
||||
end;
|
||||
|
||||
cmd(["reload", Script]) ->
|
||||
FullName = fullname(Script),
|
||||
emqx_lua_hook:unload_script(FullName),
|
||||
case emqx_lua_hook:load_script(FullName) of
|
||||
ok -> emqx_ctl:print("Reload ~p successfully~n", [Script]);
|
||||
error -> emqx_ctl:print("Reload ~p error~n", [Script])
|
||||
end;
|
||||
|
||||
cmd(["unload", Script]) ->
|
||||
emqx_lua_hook:unload_script(fullname(Script)),
|
||||
emqx_ctl:print("Unload ~p successfully~n", [Script]);
|
||||
|
||||
cmd(["enable", Script]) ->
|
||||
FullName = fullname(Script),
|
||||
case file:rename(fullnamedisable(Script), FullName) of
|
||||
ok -> case emqx_lua_hook:load_script(FullName) of
|
||||
ok ->
|
||||
emqx_ctl:print("Enable ~p successfully~n", [Script]);
|
||||
error ->
|
||||
emqx_ctl:print("Fail to enable ~p~n", [Script])
|
||||
end;
|
||||
{error, Reason} ->
|
||||
emqx_ctl:print("Fail to enable ~p due to ~p~n", [Script, Reason])
|
||||
end;
|
||||
|
||||
cmd(["disable", Script]) ->
|
||||
FullName = fullname(Script),
|
||||
emqx_lua_hook:unload_script(FullName),
|
||||
case file:rename(FullName, fullnamedisable(Script)) of
|
||||
ok ->
|
||||
emqx_ctl:print("Disable ~p successfully~n", [Script]);
|
||||
{error, Reason} ->
|
||||
emqx_ctl:print("Fail to disable ~p due to ~p~n", [Script, Reason])
|
||||
end;
|
||||
|
||||
cmd(_) ->
|
||||
emqx_ctl:usage([{"luahook load <Script>", "load lua script into hook"},
|
||||
{"luahook unload <Script>", "unload lua script from hook"},
|
||||
{"luahook reload <Script>", "reload lua script into hook"},
|
||||
{"luahook enable <Script>", "enable lua script and load it into hook"},
|
||||
{"luahook disable <Script>", "unload lua script out of hook and disable it"}]).
|
||||
|
||||
fullname(Script) ->
|
||||
filename:join([emqx_lua_hook:lua_dir(), Script]).
|
||||
fullnamedisable(Script) ->
|
||||
fullname(Script)++".x".
|
|
@ -1,35 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lua_hook_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([init/1]).
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init(_Args) ->
|
||||
{ok, {{one_for_one, 10, 3600},
|
||||
[#{id => lua_hook,
|
||||
start => {emqx_lua_hook, start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 5000,
|
||||
type => worker,
|
||||
modules => [emqx_lua_hook]}]}}.
|
||||
|
|
@ -1,342 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lua_script).
|
||||
|
||||
-include("emqx_lua_hook.hrl").
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
|
||||
-export([ register_on_message_publish/2
|
||||
, register_on_client_connected/2
|
||||
, register_on_client_disconnected/2
|
||||
, register_on_client_subscribe/2
|
||||
, register_on_client_unsubscribe/2
|
||||
, register_on_message_acked/2
|
||||
, register_on_message_delivered/2
|
||||
, register_on_session_subscribed/2
|
||||
, register_on_session_unsubscribed/2
|
||||
, register_on_client_authenticate/2
|
||||
, register_on_client_check_acl/2
|
||||
, unregister_hooks/1
|
||||
]).
|
||||
|
||||
-export([ on_client_connected/4
|
||||
, on_client_disconnected/5
|
||||
, on_client_authenticate/4
|
||||
, on_client_check_acl/6
|
||||
, on_client_subscribe/5
|
||||
, on_client_unsubscribe/5
|
||||
, on_session_subscribed/5
|
||||
, on_session_unsubscribed/5
|
||||
, on_message_publish/3
|
||||
, on_message_delivered/4
|
||||
, on_message_acked/4
|
||||
]).
|
||||
|
||||
-define(EMPTY_USERNAME, <<"">>).
|
||||
|
||||
-define(HOOK_ADD(A, B), emqx:hook(A, B)).
|
||||
-define(HOOK_DEL(A, B), emqx:unhook(A, B)).
|
||||
|
||||
register_on_client_connected(ScriptName, LuaState) ->
|
||||
?HOOK_ADD('client.connected', {?MODULE, on_client_connected, [ScriptName, LuaState]}).
|
||||
|
||||
register_on_client_disconnected(ScriptName, LuaState) ->
|
||||
?HOOK_ADD('client.disconnected', {?MODULE, on_client_disconnected, [ScriptName, LuaState]}).
|
||||
|
||||
register_on_client_authenticate(ScriptName, LuaState) ->
|
||||
?HOOK_ADD('client.authenticate', {?MODULE, on_client_authenticate, [ScriptName, LuaState]}).
|
||||
|
||||
register_on_client_check_acl(ScriptName, LuaState) ->
|
||||
?HOOK_ADD('client.check_acl', {?MODULE, on_client_check_acl, [ScriptName, LuaState]}).
|
||||
|
||||
register_on_client_subscribe(ScriptName, LuaState) ->
|
||||
?HOOK_ADD('client.subscribe', {?MODULE, on_client_subscribe, [ScriptName, LuaState]}).
|
||||
|
||||
register_on_client_unsubscribe(ScriptName, LuaState) ->
|
||||
?HOOK_ADD('client.unsubscribe', {?MODULE, on_client_unsubscribe, [ScriptName, LuaState]}).
|
||||
|
||||
register_on_session_subscribed(ScriptName, LuaState) ->
|
||||
?HOOK_ADD('session.subscribed', {?MODULE, on_session_subscribed, [ScriptName, LuaState]}).
|
||||
|
||||
register_on_session_unsubscribed(ScriptName, LuaState) ->
|
||||
?HOOK_ADD('session.unsubscribed', {?MODULE, on_session_unsubscribed, [ScriptName, LuaState]}).
|
||||
|
||||
register_on_message_publish(ScriptName, LuaState) ->
|
||||
?HOOK_ADD('message.publish', {?MODULE, on_message_publish, [ScriptName, LuaState]}).
|
||||
|
||||
register_on_message_delivered(ScriptName, LuaState) ->
|
||||
?HOOK_ADD('message.delivered', {?MODULE, on_message_delivered, [ScriptName, LuaState]}).
|
||||
|
||||
register_on_message_acked(ScriptName, LuaState) ->
|
||||
?HOOK_ADD('message.acked', {?MODULE, on_message_acked, [ScriptName, LuaState]}).
|
||||
|
||||
unregister_hooks({ScriptName, LuaState}) ->
|
||||
?HOOK_DEL('client.connected', {?MODULE, on_client_connected, [ScriptName, LuaState]}),
|
||||
?HOOK_DEL('client.disconnected', {?MODULE, on_client_disconnected, [ScriptName, LuaState]}),
|
||||
?HOOK_DEL('client.authenticate', {?MODULE, on_client_authenticate, [ScriptName, LuaState]}),
|
||||
?HOOK_DEL('client.check_acl', {?MODULE, on_client_check_acl, [ScriptName, LuaState]}),
|
||||
?HOOK_DEL('client.subscribe', {?MODULE, on_client_subscribe, [ScriptName, LuaState]}),
|
||||
?HOOK_DEL('client.unsubscribe', {?MODULE, on_client_unsubscribe, [ScriptName, LuaState]}),
|
||||
?HOOK_DEL('session.subscribed', {?MODULE, on_session_subscribed, [ScriptName, LuaState]}),
|
||||
?HOOK_DEL('session.unsubscribed', {?MODULE, on_session_unsubscribed, [ScriptName, LuaState]}),
|
||||
?HOOK_DEL('message.publish', {?MODULE, on_message_publish, [ScriptName, LuaState]}),
|
||||
?HOOK_DEL('message.delivered', {?MODULE, on_message_delivered, [ScriptName, LuaState]}),
|
||||
?HOOK_DEL('message.acked', {?MODULE, on_message_acked, [ScriptName, LuaState]}).
|
||||
|
||||
on_client_connected(ClientInfo = #{clientid := ClientId, username := Username},
|
||||
ConnInfo, _ScriptName, LuaState) ->
|
||||
?LOG(debug, "Client(~s) connected, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
|
||||
[ClientId, ClientInfo, ConnInfo]),
|
||||
case catch luerl:call_function([on_client_connected], [ClientId, Username], LuaState) of
|
||||
{'EXIT', St} ->
|
||||
?LOG(error, "Failed to execute function on_client_connected(), which has syntax error, St=~p", [St]),
|
||||
ok;
|
||||
{_Result, _St} ->
|
||||
ok;
|
||||
Other ->
|
||||
?LOG(error, "Lua function on_client_connected() caught exception, ~p", [Other]),
|
||||
ok
|
||||
end.
|
||||
|
||||
on_client_disconnected(ClientInfo = #{clientid := ClientId, username := Username},
|
||||
ReasonCode, ConnInfo, _ScriptName, LuaState) ->
|
||||
?LOG(debug, "Client(~s) disconnected due to ~p, ClientInfo:~n~p~n, ConnInfo:~n~p~n",
|
||||
[ClientId, ReasonCode, ClientInfo, ConnInfo]),
|
||||
case catch luerl:call_function([on_client_disconnected], [ClientId, Username, ReasonCode], LuaState) of
|
||||
{'EXIT', St} ->
|
||||
?LOG(error, "Failed to execute function on_client_disconnected(), which has syntax error, St=~p", [St]),
|
||||
ok;
|
||||
{_Result, _St} ->
|
||||
ok;
|
||||
Other ->
|
||||
?LOG(error, "Lua function on_client_disconnected() caught exception, ~p", [Other]),
|
||||
ok
|
||||
end.
|
||||
|
||||
on_client_authenticate(#{clientid := ClientId,
|
||||
username := Username,
|
||||
peerhost := Peerhost,
|
||||
password := Password}, Result, _ScriptName, LuaState) ->
|
||||
case catch luerl:call_function([on_client_authenticate],
|
||||
[ClientId, Username, inet:ntoa(Peerhost), Password], LuaState) of
|
||||
{'EXIT', St} ->
|
||||
?LOG(error, "Failed to execute function on_client_authenticate(), which has syntax error, St=~p", [St]),
|
||||
ok;
|
||||
{[<<"ignore">>], _St} ->
|
||||
ok;
|
||||
{[<<"ok">>], _St} ->
|
||||
{stop, Result#{auth_result => success}};
|
||||
Other ->
|
||||
?LOG(error, "Lua function on_client_authenticate() caught exception, ~p", [Other]),
|
||||
ok
|
||||
end.
|
||||
|
||||
on_client_check_acl(#{clientid := ClientId,
|
||||
username := Username,
|
||||
peerhost := Peerhost,
|
||||
password := Password}, Topic, PubSub, _Result, _ScriptName, LuaState) ->
|
||||
case catch luerl:call_function([on_client_check_acl], [ClientId, Username, inet:ntoa(Peerhost), Password, Topic, PubSub], LuaState) of
|
||||
{'EXIT', St} ->
|
||||
?LOG(error, "Failed to execute function on_client_check_acl(), which has syntax error, St=~p", [St]),
|
||||
ok;
|
||||
{[<<"ignore">>],_St} ->
|
||||
ok;
|
||||
{[<<"allow">>], _St} ->
|
||||
{stop, allow};
|
||||
{[<<"deny">>], _St} ->
|
||||
{stop, deny};
|
||||
Other ->
|
||||
?LOG(error, "Lua function on_client_check_acl() caught exception, ~p", [Other]),
|
||||
ok
|
||||
end.
|
||||
|
||||
on_client_subscribe(#{clientid := ClientId, username := Username}, _Properties, TopicFilters, _ScriptName, LuaState) ->
|
||||
NewTopicFilters =
|
||||
lists:foldr(fun(TopicFilter, Acc) ->
|
||||
case on_client_subscribe_single(ClientId, Username, TopicFilter, LuaState) of
|
||||
false -> Acc;
|
||||
NewTopicFilter -> [NewTopicFilter | Acc]
|
||||
end
|
||||
end, [], TopicFilters),
|
||||
case NewTopicFilters of
|
||||
[] -> stop;
|
||||
_ -> {ok, NewTopicFilters}
|
||||
end.
|
||||
|
||||
on_client_subscribe_single(_ClientId, _Username, TopicFilter = {<<$$, _Rest/binary>>, _SubOpts}, _LuaState) ->
|
||||
%% ignore topics starting with $
|
||||
TopicFilter;
|
||||
on_client_subscribe_single(ClientId, Username, TopicFilter = {Topic, SubOpts}, LuaState) ->
|
||||
?LOG(debug, "hook client(~s/~s) will subscribe: ~p~n", [ClientId, Username, Topic]),
|
||||
case catch luerl:call_function([on_client_subscribe], [ClientId, Username, Topic], LuaState) of
|
||||
{'EXIT', St} ->
|
||||
?LOG(error, "Failed to execute function on_client_subscribe(), which has syntax error, St=~p", [St]),
|
||||
TopicFilter;
|
||||
{[false], _St} ->
|
||||
false; % cancel this topic's subscription
|
||||
{[NewTopic], _St} ->
|
||||
?LOG(debug, "LUA function on_client_subscribe() return ~p", [NewTopic]),
|
||||
{NewTopic, SubOpts}; % modify topic
|
||||
Other ->
|
||||
?LOG(error, "Lua function on_client_subscribe() caught exception, ~p", [Other]),
|
||||
TopicFilter
|
||||
end.
|
||||
|
||||
on_client_unsubscribe(#{clientid := ClientId, username := Username}, _Properties, TopicFilters, _ScriptName, LuaState) ->
|
||||
NewTopicFilters =
|
||||
lists:foldr(fun(TopicFilter, Acc) ->
|
||||
case on_client_unsubscribe_single(ClientId, Username, TopicFilter, LuaState) of
|
||||
false -> Acc;
|
||||
NewTopicFilter -> [NewTopicFilter | Acc]
|
||||
end
|
||||
end, [], TopicFilters),
|
||||
case NewTopicFilters of
|
||||
[] -> stop;
|
||||
_ -> {ok, NewTopicFilters}
|
||||
end.
|
||||
|
||||
on_client_unsubscribe_single(_ClientId, _Username, TopicFilter = {<<$$, _Rest/binary>>, _SubOpts}, _LuaState) ->
|
||||
%% ignore topics starting with $
|
||||
TopicFilter;
|
||||
on_client_unsubscribe_single(ClientId, Username, TopicFilter = {Topic, SubOpts}, LuaState) ->
|
||||
?LOG(debug, "hook client(~s/~s) unsubscribe ~p~n", [ClientId, Username, Topic]),
|
||||
case catch luerl:call_function([on_client_unsubscribe], [ClientId, Username, Topic], LuaState) of
|
||||
{'EXIT', St} ->
|
||||
?LOG(error, "Failed to execute function on_client_unsubscribe(), which has syntax error, St=~p", [St]),
|
||||
TopicFilter;
|
||||
{[false], _St} ->
|
||||
false; % cancel this topic's unsubscription
|
||||
{[NewTopic], _} ->
|
||||
?LOG(debug, "Lua function on_client_unsubscribe() return ~p", [NewTopic]),
|
||||
{NewTopic, SubOpts}; % modify topic
|
||||
Other ->
|
||||
?LOG(error, "Topic=~p, lua function on_client_unsubscribe() caught exception, ~p", [Topic, Other]),
|
||||
TopicFilter
|
||||
end.
|
||||
|
||||
on_session_subscribed(#{}, <<$$, _Rest/binary>>, _SubOpts, _ScriptName, _LuaState) ->
|
||||
%% ignore topics starting with $
|
||||
ok;
|
||||
on_session_subscribed(#{clientid := ClientId, username := Username},
|
||||
Topic, SubOpts, _ScriptName, LuaState) ->
|
||||
?LOG(debug, "Session(~s/s) subscribed ~s with subopts: ~p~n", [ClientId, Username, Topic, SubOpts]),
|
||||
case catch luerl:call_function([on_session_subscribed], [ClientId, Username, Topic], LuaState) of
|
||||
{'EXIT', St} ->
|
||||
?LOG(error, "Failed to execute function on_session_subscribed(), which has syntax error, St=~p", [St]),
|
||||
ok;
|
||||
{_Result, _St} ->
|
||||
ok;
|
||||
Other ->
|
||||
?LOG(error, "Topic=~p, lua function on_session_subscribed() caught exception, ~p", [Topic, Other]),
|
||||
ok
|
||||
end.
|
||||
|
||||
on_session_unsubscribed(#{}, <<$$, _Rest/binary>>, _SubOpts, _ScriptName, _LuaState) ->
|
||||
%% ignore topics starting with $
|
||||
ok;
|
||||
on_session_unsubscribed(#{clientid := ClientId, username := Username},
|
||||
Topic, _SubOpts, _ScriptName, LuaState) ->
|
||||
?LOG(debug, "Session(~s/~s) unsubscribed ~s~n", [ClientId, Username, Topic]),
|
||||
case catch luerl:call_function([on_session_unsubscribed], [ClientId, Username, Topic], LuaState) of
|
||||
{'EXIT', St} ->
|
||||
?LOG(error, "Failed to execute function on_session_unsubscribed(), which has syntax error, St=~p", [St]),
|
||||
ok;
|
||||
{_Result, _St} ->
|
||||
ok;
|
||||
Other ->
|
||||
?LOG(error, "Topic=~p, lua function on_session_unsubscribed() caught exception, ~p", [Topic, Other]),
|
||||
ok
|
||||
end.
|
||||
|
||||
on_message_publish(Message = #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
|
||||
%% ignore topics starting with $
|
||||
{ok, Message};
|
||||
on_message_publish(Message = #message{from = ClientId,
|
||||
qos = QoS,
|
||||
flags = Flags = #{retain := Retain},
|
||||
topic = Topic,
|
||||
payload = Payload,
|
||||
headers = Headers},
|
||||
_ScriptName, LuaState) ->
|
||||
Username = maps:get(username, Headers, ?EMPTY_USERNAME),
|
||||
?LOG(debug, "Publish ~s~n", [emqx_message:format(Message)]),
|
||||
case catch luerl:call_function([on_message_publish], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
|
||||
{'EXIT', St} ->
|
||||
?LOG(error, "Failed to execute function on_message_publish(), which has syntax error, St=~p", [St]),
|
||||
{ok, Message};
|
||||
{[false], _St} ->
|
||||
{stop, Message};
|
||||
{[NewTopic, NewPayload, NewQos, NewRetain], _St} ->
|
||||
?LOG(debug, "Lua function on_message_publish() return ~p", [{NewTopic, NewPayload, NewQos, NewRetain}]),
|
||||
{ok, Message#message{topic = NewTopic, payload = NewPayload,
|
||||
qos = round(NewQos), flags = Flags#{retain => to_retain(NewRetain)}}};
|
||||
Other ->
|
||||
?LOG(error, "Topic=~p, lua function on_message_publish caught exception, ~p", [Topic, Other]),
|
||||
{ok, Message}
|
||||
end.
|
||||
|
||||
on_message_delivered(#{}, #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
|
||||
%% ignore topics starting with $
|
||||
ok;
|
||||
on_message_delivered(#{clientid := ClientId, username := Username},
|
||||
Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = Flags = #{retain := Retain}},
|
||||
_ScriptName, LuaState) ->
|
||||
?LOG(debug, "Message delivered to client(~s): ~s~n",
|
||||
[ClientId, emqx_message:format(Message)]),
|
||||
case catch luerl:call_function([on_message_delivered], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
|
||||
{'EXIT', St} ->
|
||||
?LOG(error, "Failed to execute function on_message_delivered(), which has syntax error, St=~p", [St]),
|
||||
ok;
|
||||
{[false], _St} ->
|
||||
ok;
|
||||
{[NewTopic, NewPayload, NewQos, NewRetain], _St} ->
|
||||
{ok, Message#message{topic = NewTopic, payload = NewPayload,
|
||||
qos = round(NewQos), flags = Flags#{retain => to_retain(NewRetain)}}};
|
||||
Other ->
|
||||
?LOG(error, "Topic=~p, lua function on_message_delivered() caught exception, ~p", [Topic, Other]),
|
||||
ok
|
||||
end.
|
||||
|
||||
on_message_acked(#{}, #message{topic = <<$$, _Rest/binary>>}, _ScriptName, _LuaState) ->
|
||||
%% ignore topics starting with $
|
||||
ok;
|
||||
on_message_acked(#{clientid := ClientId, username := Username},
|
||||
Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = #{retain := Retain}}, _ScriptName, LuaState) ->
|
||||
?LOG(debug, "Message acked by client(~s): ~s~n",
|
||||
[ClientId, emqx_message:format(Message)]),
|
||||
case catch luerl:call_function([on_message_acked], [ClientId, Username, Topic, Payload, QoS, Retain], LuaState) of
|
||||
{'EXIT', St} ->
|
||||
?LOG(error, "Failed to execute function on_message_acked(), which has syntax error, St=~p", [St]),
|
||||
ok;
|
||||
{_Result, _St} ->
|
||||
ok;
|
||||
Other ->
|
||||
?LOG(error, "Topic=~p, lua function on_message_acked() caught exception, ~p", [Topic, Other]),
|
||||
ok
|
||||
end.
|
||||
|
||||
to_retain(0) -> false;
|
||||
to_retain(1) -> true;
|
||||
to_retain("true") -> true;
|
||||
to_retain("false") -> false;
|
||||
to_retain(<<"true">>) -> true;
|
||||
to_retain(<<"false">>) -> false;
|
||||
to_retain(true) -> true;
|
||||
to_retain(false) -> false;
|
||||
to_retain(Num) when is_float(Num) ->
|
||||
case round(Num) of 0 -> false; _ -> true end.
|
|
@ -1,693 +0,0 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_lua_hook_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
all() ->
|
||||
[case01, case02, case03, case04,
|
||||
case11, case12, case13,
|
||||
case21, case22,
|
||||
case31, case32,
|
||||
case41, case42, case43,
|
||||
case51, case52, case53,
|
||||
case61, case62,
|
||||
case71, case72, case73,
|
||||
case81, case82, case83,
|
||||
case101,
|
||||
case110, case111, case112, case113, case114, case115,
|
||||
case201, case202, case203, case204, case205,
|
||||
case301, case302
|
||||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:start_apps([emqx_lua_hook], fun set_special_configs/1),
|
||||
Config.
|
||||
|
||||
end_per_suite(Config) ->
|
||||
emqx_ct_helpers:stop_apps([emqx_lua_hook]),
|
||||
Config.
|
||||
|
||||
set_special_configs(emqx) ->
|
||||
application:set_env(emqx, modules, []);
|
||||
set_special_configs(_App) ->
|
||||
ok.
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
ok = filelib:ensure_dir(filename:join([emqx_lua_hook:lua_dir(), "a"])),
|
||||
emqx_lua_hook:start_link(),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, _Config) ->
|
||||
emqx_lua_hook:stop(),
|
||||
AllScripts = filelib:wildcard(filename:join([emqx_lua_hook:lua_dir(), "*"])),
|
||||
[file:delete(Filename) || Filename <- AllScripts].
|
||||
|
||||
case01(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_publish(ClientId, Username, topic, payload, qos, retain)"
|
||||
"\n return topic, \"hello\", qos, retain"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{from = <<"myclient">>, qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{username => <<"tester">>}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg#message{payload = <<"hello">>}, Ret).
|
||||
|
||||
case02(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_publish(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return false" % return false to stop hook
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{from = <<"myclient">>, qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{username => <<"tester">>}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg, Ret).
|
||||
|
||||
case03(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_publish(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return 9/0" % this code has fatal error
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{from = <<"myclient">>, qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{username => <<"tester">>}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg, Ret).
|
||||
|
||||
case04(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_publish(clientid, username, topic, payload, qos, retain)"
|
||||
"\n if clientid == \"broker\" then"
|
||||
"\n return topic, \"hello broker\", qos, retain"
|
||||
"\n else"
|
||||
"\n return false" % return false to stop hook
|
||||
"\n end"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{from = <<"broker">>, qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{username => <<"tester">>}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg#message{payload = <<"hello broker">>}, Ret).
|
||||
|
||||
case11(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_delivered(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return false"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_delivered\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run_fold('message.delivered', [#{clientid => <<"myclient">>, username => <<"myuser">>}], Msg),
|
||||
?assertEqual(Msg, Ret),
|
||||
ok = file:delete(ScriptName).
|
||||
|
||||
case12(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_delivered(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return topic, \"hello broker\", qos, retain"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_delivered\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run_fold('message.delivered', [#{clientid => <<"myclient">>, username => <<"myuser">>}], Msg),
|
||||
?assertEqual(Msg#message{payload = <<"hello broker">>}, Ret).
|
||||
|
||||
case13(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_delivered(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return 9/0" % this code has fatal error
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_delivered\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run_fold('message.delivered', [#{clientid => <<"myclient">>, username => <<"myuser">>}], Msg),
|
||||
?assertEqual(Msg, Ret).
|
||||
|
||||
case21(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_acked(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return true"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_acked\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run('message.acked', [#{clientid => <<"myclient">>, username => <<"myuser">>}, Msg]),
|
||||
?assertEqual(ok, Ret).
|
||||
|
||||
case22(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_acked(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return 9/0" % this code has fatal error
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_acked\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run('message.acked', [#{clientid => <<"myclient">>, username => <<"myuser">>}, Msg]),
|
||||
?assertEqual(ok, Ret),
|
||||
ok = file:delete(ScriptName).
|
||||
|
||||
case31(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_connected(clientid, username)"
|
||||
"\n return 0"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_connected\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
?assertEqual(ok,
|
||||
emqx_hooks:run('client.connected',
|
||||
[#{clientid => <<"myclient">>, username => <<"tester">>}, #{}])).
|
||||
|
||||
case32(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_connected(clientid, username)"
|
||||
"\n return 9/0" % this code has fatal error
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_connected\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
?assertEqual(ok,
|
||||
emqx_hooks:run('client.connected',
|
||||
[#{clientid => <<"myclient">>, username => <<"tester">>}, #{}])).
|
||||
|
||||
case41(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_subscribe(clientid, username, topic)"
|
||||
"\n if topic == \"a/b/c\" then"
|
||||
"\n topic = \"a1/b1/c1\";"
|
||||
"\n end"
|
||||
"\n return topic"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_subscribe\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
TopicTable = [{<<"a/b/c">>, [qos, 1]}, {<<"d/+/e">>, [{qos, 2}]}],
|
||||
Ret = emqx_hooks:run_fold('client.subscribe',[#{clientid => <<"myclient">>, username => <<"myuser">>}, #{}], TopicTable),
|
||||
?assertEqual([{<<"a1/b1/c1">>, [qos, 1]}, {<<"d/+/e">>, [{qos, 2}]}], Ret).
|
||||
|
||||
case42(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_subscribe(clientid, username, topic)"
|
||||
"\n return false" % return false to stop hook
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_subscribe\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
TopicTable = [{<<"a/b/c">>, [qos, 1]}, {<<"d/+/e">>, [{qos, 2}]}],
|
||||
Ret = emqx_hooks:run_fold('client.subscribe',[#{clientid => <<"myclient">>, username => <<"myuser">>}, #{}], TopicTable),
|
||||
?assertEqual(TopicTable, Ret).
|
||||
|
||||
case43(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_subscribe(clientid, username, topic)"
|
||||
"\n return 9/0" % this code has fatal error
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_subscribe\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
TopicTable = [{<<"a/b/c">>, [qos, 1]}, {<<"d/+/e">>, [{qos, 2}]}],
|
||||
Ret = emqx_hooks:run_fold('client.subscribe',[#{clientid => <<"myclient">>, username => <<"myuser">>}, #{}], TopicTable),
|
||||
?assertEqual(TopicTable, Ret).
|
||||
|
||||
case51(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_unsubscribe(clientid, username, topic)"
|
||||
"\n if topic == \"a/b/c\" then"
|
||||
"\n topic = \"a1/b1/c1\";"
|
||||
"\n end"
|
||||
"\n return topic"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_unsubscribe\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
TopicTable = [{<<"a/b/c">>, [qos, 1]}, {<<"d/+/e">>, [{qos, 2}]}],
|
||||
Ret = emqx_hooks:run_fold('client.unsubscribe',[#{clientid => <<"myclient">>, username => <<"myuser">>}, #{}], TopicTable),
|
||||
?assertEqual([{<<"a1/b1/c1">>, [qos, 1]}, {<<"d/+/e">>, [{qos, 2}]}], Ret).
|
||||
|
||||
case52(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_unsubscribe(clientid, username, topic)"
|
||||
"\n return false" % return false to stop hook
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_unsubscribe\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
TopicTable = [{<<"a/b/c">>, [qos, 1]}, {<<"d/+/e">>, [{qos, 2}]}],
|
||||
Ret = emqx_hooks:run_fold('client.unsubscribe',[#{clientid => <<"myclient">>, username => <<"myuser">>}, #{}], TopicTable),
|
||||
?assertEqual(TopicTable, Ret).
|
||||
|
||||
case53(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_unsubscribe(clientid, username, topic)"
|
||||
"\n return 9/0" % this code has fatal error
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_unsubscribe\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
TopicTable = [{<<"a/b/c">>, [qos, 1]}, {<<"d/+/e">>, [{qos, 2}]}],
|
||||
Ret = emqx_hooks:run_fold('client.unsubscribe',[#{clientid => <<"myclient">>, username => <<"myuser">>}, #{}], TopicTable),
|
||||
?assertEqual(TopicTable, Ret).
|
||||
|
||||
case61(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_disconnected(clientid, username, reasoncode)"
|
||||
"\n return 0"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_disconnected\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
?assertEqual(ok,
|
||||
emqx_hooks:run('client.disconnected',
|
||||
[#{clientid => <<"myclient">>, username => <<"tester">>}, 0])).
|
||||
|
||||
case62(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_disconnected(clientid, username, reasoncode)"
|
||||
"\n return 9/0" % this code has fatal error
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_disconnected\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
?assertEqual(ok,
|
||||
emqx_hooks:run('client.disconnected',
|
||||
[#{clientid => <<"myclient">>, username => <<"tester">>}, 0])).
|
||||
|
||||
case71(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_session_subscribed(clientid, username, topic)"
|
||||
"\n return 0"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_session_subscribed\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Topic = <<"a/b/c">>,
|
||||
Ret = emqx_hooks:run('session.subscribed',[#{clientid => <<"myclient">>, username => <<"myuser">>}, Topic, #{first => false}]),
|
||||
?assertEqual(ok, Ret).
|
||||
|
||||
case72(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_session_subscribed(clientid, username, topic)"
|
||||
"\n return false" % return false to stop hook
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_session_subscribed\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Topic = <<"a/b/c">>,
|
||||
Ret = emqx_hooks:run('session.subscribed',[#{clientid => <<"myclient">>, username => <<"myuser">>}, Topic, #{first => false}]),
|
||||
?assertEqual(ok, Ret).
|
||||
|
||||
case73(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_session_subscribed(clientid, username, topic)"
|
||||
"\n return 9/0" % this code has fatal error
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_session_subscribed\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Topic = <<"a/b/c">>,
|
||||
Ret = emqx_hooks:run('session.subscribed',[#{clientid => <<"myclient">>, username => <<"myuser">>}, Topic, #{first => false}]),
|
||||
?assertEqual(ok, Ret).
|
||||
|
||||
case81(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_session_unsubscribed(clientid, username, topic)"
|
||||
"\n return 0"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_session_unsubscribed\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Topic = <<"a/b/c">>,
|
||||
Ret = emqx_hooks:run('session.unsubscribed',[#{clientid => <<"myclient">>, username => <<"myuser">>}, Topic, #{first => false}]),
|
||||
?assertEqual(ok, Ret).
|
||||
|
||||
case82(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_session_unsubscribed(clientid, username, topic)"
|
||||
"\n return false" % return false to stop hook
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_session_unsubscribed\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Topic = <<"a/b/c">>,
|
||||
Ret = emqx_hooks:run('session.unsubscribed',[#{clientid => <<"myclient">>, username => <<"myuser">>}, Topic, #{first => false}]),
|
||||
?assertEqual(ok, Ret).
|
||||
|
||||
case83(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_session_unsubscribed(clientid, username, topic)"
|
||||
"\n return 9/0" % this code has fatal error
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_session_unsubscribed\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Topic = <<"a/b/c">>,
|
||||
Ret = emqx_hooks:run('session.unsubscribed',[#{clientid => <<"myclient">>, username => <<"myuser">>}, Topic, #{first => false}]),
|
||||
?assertEqual(ok, Ret).
|
||||
|
||||
case101(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
ScriptName2 = filename:join([emqx_lua_hook:lua_dir(), "mn.lua"]),
|
||||
Code = "function on_message_publish(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return topic, \"hello\", qos, retain"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code),
|
||||
|
||||
Code2 = "function on_client_subscribe(clientid, username, topic)"
|
||||
"\n if topic == \"a/b/c\" then"
|
||||
"\n topic = \"a1/b1/c1\";"
|
||||
"\n end"
|
||||
"\n return topic"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_subscribe\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName2, Code2), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}}),
|
||||
?assertEqual(#message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"hello">>, headers = #{}}, Ret),
|
||||
|
||||
TopicTable = [{<<"a/b/c">>, [qos, 1]}, {<<"d/+/e">>, [{qos, 2}]}],
|
||||
Ret2 = emqx_hooks:run_fold('client.subscribe',[#{clientid => <<"myclient">>, username => <<"myuser">>}, #{}], TopicTable),
|
||||
?assertEqual([{<<"a1/b1/c1">>, [qos, 1]}, {<<"d/+/e">>, [{qos, 2}]}], Ret2).
|
||||
|
||||
case110(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_publish(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return \"changed/topic\", \"hello\", qos, retain"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg#message{topic = <<"changed/topic">>, payload = <<"hello">>}, Ret).
|
||||
|
||||
case111(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = " function on_message_publish(topic, payload, qos, retain)"
|
||||
"\n return \"changed/topic\", \"hello\", qos, retain"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
emqx_ctl:run_command(["luahook", "unload", ScriptName]),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg, Ret).
|
||||
|
||||
case112(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = " function on_message_publish(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return \"changed/topic\", \"hello\", qos, retain"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
emqx_ctl:run_command(["luahook", "unload", "abc.lua"]),
|
||||
timer:sleep(100),
|
||||
emqx_ctl:run_command(["luahook", "load", "abc.lua"]),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg#message{topic = <<"changed/topic">>, payload = <<"hello">>}, Ret).
|
||||
|
||||
case113(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
ScriptDisabled = ScriptName ++ ".x",
|
||||
Code = "function on_message_publish(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return \"changed/topic\", \"hello\", qos, retain"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code),
|
||||
file:delete(ScriptDisabled),
|
||||
emqx_ctl:run_command(["luahook", "disable", "abc.lua"]), % this command will rename "abc.lua" to "abc.lua.x"
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg, Ret),
|
||||
true = filelib:is_file(ScriptDisabled).
|
||||
|
||||
case114(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua.x"]), % disabled script
|
||||
Code = "function on_message_publish(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return \"changed/topic\", \"hello\", qos, retain"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
emqx_ctl:run_command(["luahook", "enable", "abc.lua"]),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg#message{topic = <<"changed/topic">>, payload = <<"hello">>}, Ret).
|
||||
|
||||
case115(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_publish(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return \"changed/topic\", \"hello\", qos, retain"
|
||||
"\nend"
|
||||
"\n"
|
||||
"function on_client_subscribe(ClientId, Username, Topic)"
|
||||
"\n return \"play/football\""
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\", \"on_client_subscribe\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
emqx_ctl:run_command(["luahook", "reload", "abc.lua"]),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg#message{topic = <<"changed/topic">>, payload = <<"hello">>}, Ret),
|
||||
|
||||
TopicTable = [{<<"d/+/e">>, [{qos, 2}]}],
|
||||
Ret2 = emqx_hooks:run_fold('client.subscribe',[#{clientid => <<"myclient">>, username => <<"myuser">>}, #{}], TopicTable),
|
||||
?assertEqual([{<<"play/football">>, [{qos, 2}]}], Ret2).
|
||||
|
||||
case201(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_session_subscribed(clientid, username, topic)"
|
||||
"\n return 0"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction on_session_subscribed1()" % register_hook() is missing
|
||||
"\n return \"on_session_subscribed\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Topic = <<"a/b/c">>,
|
||||
Ret = emqx_hooks:run('session.subscribed',[#{clientid => <<"myclient">>, username => <<"myuser">>}, Topic, #{first => false}]),
|
||||
?assertEqual(ok, Ret).
|
||||
|
||||
case202(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function abc(clientid, username, topic)"
|
||||
"\n return 0"
|
||||
"\nend"
|
||||
"\n"
|
||||
"\n9/0", % error code
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
Topic = <<"a/b/c">>,
|
||||
Ret = emqx_hooks:run('session.subscribed',[#{clientid => <<"myclient">>, username => <<"myuser">>}, Topic, #{first => false}]),
|
||||
?assertEqual(ok, Ret).
|
||||
|
||||
case203(_Config) ->
|
||||
file:del_dir(emqx_lua_hook:lua_dir()), % if this dir is not exist, what will happen?
|
||||
emqx_lua_hook:load_scripts(),
|
||||
|
||||
Topic = <<"a/b/c">>,
|
||||
Ret = emqx_hooks:run('session.subscribed',[#{clientid => <<"myclient">>, username => <<"myuser">>}, Topic, #{first => false}]),
|
||||
?assertEqual(ok, Ret).
|
||||
|
||||
case204(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_publish(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return topic, payload .. \"_Z\", qos, retain"
|
||||
"\nend"
|
||||
"\n"
|
||||
"function on_client_subscribe(ClientId, Username, Topic)"
|
||||
"\n return \"play/football\""
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\", \"on_client_subscribe\", \"on_message_publish\"" % if 2 on_message_publish() are registered, what will happend?
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg#message{payload = <<"123_Z">>}, Ret).
|
||||
|
||||
case205(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_message_publish(clientid, username, topic, payload, qos, retain)"
|
||||
"\n return topic, \"hello\", qos, retain"
|
||||
"\nend_with_error" %% syntax error
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_message_publish\", \"on_client_subscribe\", \"on_message_publish\"" % if 2 on_message_publish() are registered, what will happend?
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
Msg = #message{qos = 2, flags = #{retain => true}, topic = <<"a/b/c">>, payload = <<"123">>, headers = #{}},
|
||||
Ret = emqx_hooks:run_fold('message.publish',[], Msg),
|
||||
?assertEqual(Msg, Ret).
|
||||
|
||||
case301(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_authenticate(clientid, username, peerhost, password)"
|
||||
"\n return \"ok\""
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_authenticate\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
|
||||
ClientInfo = #{clientid => undefined,
|
||||
username => <<"test">>,
|
||||
peerhost => {127, 0, 0, 1},
|
||||
password => <<"mqtt">>
|
||||
},
|
||||
Result = #{auth_result => success, anonymous => true},
|
||||
?assertEqual(Result#{auth_result => success},
|
||||
emqx_hooks:run_fold('client.authenticate', [ClientInfo], Result)).
|
||||
|
||||
case302(_Config) ->
|
||||
ScriptName = filename:join([emqx_lua_hook:lua_dir(), "abc.lua"]),
|
||||
Code = "function on_client_check_acl(clientid, username, peerhost, password, topic, pubsub)"
|
||||
"\n return \"allow\""
|
||||
"\nend"
|
||||
"\n"
|
||||
"\nfunction register_hook()"
|
||||
"\n return \"on_client_check_acl\""
|
||||
"\nend",
|
||||
ok = file:write_file(ScriptName, Code), ok = emqx_lua_hook:load_scripts(),
|
||||
ClientInfo = #{clientid => undefined,
|
||||
username => <<"test">>,
|
||||
peerhost => {127, 0, 0, 1},
|
||||
password => <<"mqtt">>
|
||||
},
|
||||
?assertEqual(allow, emqx_hooks:run_fold('client.check_acl',
|
||||
[ClientInfo, publish, <<"mytopic">>], deny)).
|
|
@ -263,8 +263,7 @@ relx_apps(ReleaseType) ->
|
|||
++ [{N, load} || N <- relx_plugin_apps(ReleaseType)].
|
||||
|
||||
relx_apps_per_rel(cloud) ->
|
||||
[ luerl
|
||||
, xmerl
|
||||
[ xmerl
|
||||
| [{observer, load} || is_app(observer)]
|
||||
];
|
||||
relx_apps_per_rel(edge) ->
|
||||
|
@ -297,7 +296,6 @@ relx_plugin_apps(ReleaseType) ->
|
|||
|
||||
relx_plugin_apps_per_rel(cloud) ->
|
||||
[ emqx_lwm2m
|
||||
, emqx_lua_hook
|
||||
, emqx_exhook
|
||||
, emqx_exproto
|
||||
, emqx_prometheus
|
||||
|
|
Loading…
Reference in New Issue