refactor(exhook): adapt to the hocon schmea

This commit is contained in:
JianBo He 2021-07-31 11:29:25 +08:00
parent 58b39361b3
commit 879c191e41
25 changed files with 1353 additions and 1011 deletions

29
apps/emqx_exhook/.gitignore vendored Normal file
View File

@ -0,0 +1,29 @@
.rebar3
_*
.eunit
*.o
*.beam
*.plt
*.swp
*.swo
.erlang.cookie
ebin
log
erl_crash.dump
.rebar
logs
_build
.idea
*.iml
rebar3.crashdump
*~
rebar.lock
data/
*.conf.rendered
*.pyc
.DS_Store
*.class
Mnesia.nonode@nohost/
src/emqx_exhook_pb.erl
src/emqx_exhook_v_1_hook_provider_client.erl
src/emqx_exhook_v_1_hook_provider_bhvr.erl

View File

@ -0,0 +1,116 @@
# 设计
## 动机
在 EMQ X Broker v4.1-v4.2 中,我们发布了 2 个插件来扩展 emqx 的编程能力:
1. `emqx-extension-hook` 提供了使用 Java, Python 向 Broker 挂载钩子的功能
2. `emqx-exproto` 提供了使用 JavaPython 编写用户自定义协议接入插件的功能
但在后续的支持中发现许多难以处理的问题:
1. 有大量的编程语言需要支持,需要编写和维护如 Go, JavaScript, Lua.. 等语言的驱动。
2. `erlport` 使用的操作系统的管道进行通信,这让用户代码只能部署在和 emqx 同一个操作系统上。部署方式受到了极大的限制。
3. 用户程序的启动参数直接打包到 Broker 中,导致用户开发无法实时的进行调试,单步跟踪等。
4. `erlport` 会占用 `stdin` `stdout`
因此,我们计划重构这部分的实现,其中主要的内容是:
1. 使用 `gRPC` 替换 `erlport`
2. 将 `emqx-extension-hook` 重命名为 `emqx-exhook`
旧版本的设计:[emqx-extension-hook design in v4.2.0](https://github.com/emqx/emqx-exhook/blob/v4.2.0/docs/design.md)
## 设计
架构如下:
```
EMQ X
+========================+ +========+==========+
| ExHook | | | |
| +----------------+ | gRPC | gRPC | User's |
| | gRPC Client | ------------------> | Server | Codes |
| +----------------+ | (HTTP/2) | | |
| | | | |
+========================+ +========+==========+
```
`emqx-exhook` 通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。
和 emqx 原生的钩子一致emqx-exhook 也按照链式的方式执行:
<img src="https://docs.emqx.net/broker/latest/cn/advanced/assets/chain_of_responsiblity.png" style="zoom:50%;" />
### gRPC 服务示例
用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中:
```protobuff
syntax = "proto3";
package emqx.exhook.v1;
service HookProvider {
rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};
rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
}
```
### 配置文件示例
```
## 配置 gRPC 服务地址 (HTTP)
##
## s1 为服务器的名称
exhook.server.s1.url = http://127.0.0.1:9001
## 配置 gRPC 服务地址 (HTTPS)
##
## s2 为服务器名称
exhook.server.s2.url = https://127.0.0.1:9002
exhook.server.s2.cacertfile = ca.pem
exhook.server.s2.certfile = cert.pem
exhook.server.s2.keyfile = key.pem
```

View File

@ -0,0 +1,14 @@
##====================================================================
## EMQ X Hooks
##====================================================================
exhook: {
server.default: {
url: "http://127.0.0.1:9000"
#ssl: {
# cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem"
# certfile: "{{ platform_etc_dir }}/certs/cert.pem"
# keyfile: "{{ platform_etc_dir }}/certs/key.pem"
#}
}
}

View File

@ -25,7 +25,7 @@
, {'client.connected', {emqx_exhook_handler, on_client_connected, []}}
, {'client.disconnected', {emqx_exhook_handler, on_client_disconnected, []}}
, {'client.authenticate', {emqx_exhook_handler, on_client_authenticate, []}}
, {'client.authorize', {emqx_exhook_handler, on_client_authorize, []}}
, {'client.check_acl', {emqx_exhook_handler, on_client_check_acl, []}}
, {'client.subscribe', {emqx_exhook_handler, on_client_subscribe, []}}
, {'client.unsubscribe', {emqx_exhook_handler, on_client_unsubscribe, []}}
, {'session.created', {emqx_exhook_handler, on_session_created, []}}

View File

@ -0,0 +1,38 @@
%%-*- mode: erlang -*-
{mapping, "exhook.server.$name.url", "emqx_exhook.servers", [
{datatype, string}
]}.
{mapping, "exhook.server.$name.ssl.cacertfile", "emqx_exhook.servers", [
{datatype, string}
]}.
{mapping, "exhook.server.$name.ssl.certfile", "emqx_exhook.servers", [
{datatype, string}
]}.
{mapping, "exhook.server.$name.ssl.keyfile", "emqx_exhook.servers", [
{datatype, string}
]}.
{translation, "emqx_exhook.servers", fun(Conf) ->
Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
ServerOptions = fun(Prefix) ->
case http_uri:parse(cuttlefish:conf_get(Prefix ++ ".url", Conf)) of
{ok, {http, _, Host, Port, _, _}} ->
[{scheme, http}, {host, Host}, {port, Port}];
{ok, {https, _, Host, Port, _, _}} ->
[{scheme, https}, {host, Host}, {port, Port},
{ssl_options,
Filter([{ssl, true},
{certfile, cuttlefish:conf_get(Prefix ++ ".ssl.certfile", Conf, undefined)},
{keyfile, cuttlefish:conf_get(Prefix ++ ".ssl.keyfile", Conf, undefined)},
{cacertfile, cuttlefish:conf_get(Prefix ++ ".ssl.cacertfile", Conf, undefined)}
])}];
_ -> error(invalid_server_options)
end
end,
[{list_to_atom(Name), ServerOptions("exhook.server." ++ Name)}
|| {["exhook", "server", Name, "url"], _} <- cuttlefish_variable:filter_by_prefix("exhook.server", Conf)]
end}.

View File

@ -127,14 +127,14 @@ message ClientAuthorizeRequest {
ClientInfo clientinfo = 1;
enum AuthzReqType {
enum AuthorizeReqType {
PUBLISH = 0;
SUBSCRIBE = 1;
}
AuthzReqType type = 2;
AuthorizeReqType type = 2;
string topic = 3;

View File

@ -0,0 +1,49 @@
%%-*- mode: erlang -*-
{plugins,
[rebar3_proper,
{grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}}
]}.
{deps,
[{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}}
]}.
{grpc,
[{protos, ["priv/protos"]},
{gpb_opts, [{module_name_prefix, "emqx_"},
{module_name_suffix, "_pb"}]}
]}.
{provider_hooks,
[{pre, [{compile, {grpc, gen}},
{clean, {grpc, clean}}]}
]}.
{edoc_opts, [{preprocess, true}]}.
{erl_opts, [warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
warnings_as_errors, deprecated_functions]}.
{xref_ignores, [emqx_exhook_pb]}.
{cover_enabled, true}.
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
{cover_excl_mods, [emqx_exhook_pb,
emqx_exhook_v_1_hook_provider_bhvr,
emqx_exhook_v_1_hook_provider_client]}.
{profiles,
[{test,
[{deps,
[{emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.3.1"}}}
]}
]}
]}.

View File

@ -1,6 +1,6 @@
{application, emqx_exhook,
[{description, "EMQ X Extension for Hook"},
{vsn, "4.3.2"},
{vsn, "4.3.3"},
{modules, []},
{registered, []},
{mod, {emqx_exhook_app, []}},

View File

@ -0,0 +1,33 @@
%% -*-: erlang -*-
{VSN,
[
{"4.3.2", [
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []}
]},
{"4.3.1", [
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
]},
{"4.3.0", [
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
[
{"4.3.2", [
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []}
]},
{"4.3.1", [
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
]},
{"4.3.0", [
{load_module, emqx_exhook_app, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_pb, brutal_purge, soft_purge, []},
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
]
}.

View File

@ -16,7 +16,7 @@
-module(emqx_exhook).
-include("src/exhook/include/emqx_exhook.hrl").
-include("emqx_exhook.hrl").
-include_lib("emqx/include/logger.hrl").
@ -40,13 +40,13 @@
list() ->
[server(Name) || Name <- running()].
-spec enable(atom()|string(), list()) -> ok | {error, term()}.
enable(Name, Opts) ->
-spec enable(atom()|string(), map()) -> ok | {error, term()}.
enable(Name, Options) ->
case lists:member(Name, running()) of
true ->
{error, already_started};
_ ->
case emqx_exhook_server:load(Name, Opts) of
case emqx_exhook_server:load(Name, Options) of
{ok, ServiceState} ->
save(Name, ServiceState);
{error, Reason} ->

View File

@ -18,7 +18,7 @@
-behaviour(application).
-include("src/exhook/include/emqx_exhook.hrl").
-include("emqx_exhook.hrl").
-emqx_plugin(extension).
@ -67,9 +67,10 @@ stop(_State) ->
%%--------------------------------------------------------------------
load_all_servers() ->
lists:foreach(fun({Name, Options}) ->
_ = maps:map(fun(Name, Options) ->
load_server(Name, Options)
end, application:get_env(?APP, servers, [])).
end, emqx_config:get([exhook, server])),
ok.
unload_all_servers() ->
emqx_exhook:disable_all().

View File

@ -16,7 +16,7 @@
-module(emqx_exhook_cli).
-include("src/exhook/include/emqx_exhook.hrl").
-include("emqx_exhook.hrl").
-export([cli/1]).

View File

@ -16,7 +16,7 @@
-module(emqx_exhook_handler).
-include("src/exhook/include/emqx_exhook.hrl").
-include("emqx_exhook.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
@ -87,6 +87,7 @@ on_client_disconnected(ClientInfo, Reason, _ConnInfo) ->
},
cast('client.disconnected', Req).
%% FIXME: `AuthResult`
on_client_authenticate(ClientInfo, AuthResult) ->
%% XXX: Bool is missing more information about the atom of the result
%% So, the `Req` has missed detailed info too.

View File

@ -0,0 +1,62 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exhook_schema).
-dialyzer(no_return).
-dialyzer(no_match).
-dialyzer(no_contracts).
-dialyzer(no_unused).
-dialyzer(no_fail_call).
-include_lib("typerefl/include/types.hrl").
-behaviour(hocon_schema).
-export([structs/0, fields/1]).
-export([t/1, t/3, t/4, ref/1]).
structs() -> [server].
fields(server) ->
[{"$name", t(ref(server_structs))}];
fields(server_structs) ->
[ {url, t(string(), "emqx_exhook.url", "")}
, {ssl, t(ref(ssl_conf_group))}
];
fields(ssl_conf_group) ->
[ {cacertfile, string()}
, {certfile, string()}
, {keyfile, string()}
].
%% types
t(Type) -> #{type => Type}.
t(Type, Mapping, Default) ->
hoconsc:t(Type, #{mapping => Mapping, default => Default}).
t(Type, Mapping, Default, OverrideEnv) ->
hoconsc:t(Type, #{ mapping => Mapping
, default => Default
, override_env => OverrideEnv
}).
ref(Field) ->
hoconsc:ref(?MODULE, Field).

View File

@ -16,7 +16,7 @@
-module(emqx_exhook_server).
-include("src/exhook/include/emqx_exhook.hrl").
-include("emqx_exhook.hrl").
-include_lib("emqx/include/logger.hrl").
@ -74,13 +74,17 @@
-export_type([server/0]).
-type options() :: #{ url := uri_string:uri_string()
, ssl => map()
}.
-dialyzer({nowarn_function, [inc_metrics/2]}).
%%--------------------------------------------------------------------
%% Load/Unload APIs
%%--------------------------------------------------------------------
-spec load(atom(), list()) -> {ok, server()} | {error, term()} .
-spec load(atom(), options()) -> {ok, server()} | {error, term()} .
load(Name0, Opts0) ->
Name = to_list(Name0),
{SvrAddr, ClientOpts} = channel_opts(Opts0),
@ -117,20 +121,27 @@ to_list(Name) when is_list(Name) ->
Name.
%% @private
channel_opts(Opts) ->
Scheme = proplists:get_value(scheme, Opts),
Host = proplists:get_value(host, Opts),
Port = proplists:get_value(port, Opts),
SvrAddr = format_http_uri(Scheme, Host, Port),
ClientOpts = case Scheme of
https ->
SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
#{gun_opts =>
#{transport => ssl,
transport_opts => SslOpts}};
_ -> #{}
end,
{SvrAddr, ClientOpts}.
channel_opts(Opts = #{url := URL}) ->
io:format("~p~n", [Opts]),
case uri_string:parse(URL) of
#{scheme := <<"http">>, host := Host, port := Port} ->
{format_http_uri("http", Host, Port), #{}};
#{scheme := <<"https">>, host := Host, port := Port} ->
SslOpts =
case maps:get(ssl, Opts, undefined) of
undefined -> [];
MapOpts ->
filter(
[{cacertfile, maps:get(cacertfile, MapOpts, undefined)},
{certfile, maps:get(certfile, MapOpts, undefined)},
{keyfile, maps:get(keyfile, MapOpts, undefined)}
])
end,
{format_http_uri("https", Host, Port),
#{gun_opts => #{transport => ssl, transport_opts => SslOpts}}};
_ ->
error(bad_server_url)
end.
format_http_uri(Scheme, Host0, Port) ->
Host = case is_tuple(Host0) of
@ -139,6 +150,9 @@ format_http_uri(Scheme, Host0, Port) ->
end,
lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])).
filter(Ls) ->
[ E || E <- Ls, E /= undefined].
-spec unload(server()) -> ok.
unload(#server{name = Name, hookspec = HookSpecs}) ->
_ = do_deinit(Name),

View File

@ -0,0 +1,96 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exhook_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
all() -> emqx_ct:all(?MODULE).
init_per_suite(Cfg) ->
_ = emqx_exhook_demo_svr:start(),
emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1),
Cfg.
end_per_suite(_Cfg) ->
emqx_ct_helpers:stop_apps([emqx_exhook]),
emqx_exhook_demo_svr:stop().
set_special_cfgs(emqx) ->
application:set_env(emqx, allow_anonymous, false),
application:set_env(emqx, enable_acl_cache, false),
application:set_env(emqx, plugins_loaded_file, undefined),
application:set_env(emqx, modules_loaded_file, undefined);
set_special_cfgs(emqx_exhook) ->
ok.
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_noserver_nohook(_) ->
emqx_exhook:disable(default),
?assertEqual([], ets:tab2list(emqx_hooks)),
Opts = proplists:get_value(
default,
application:get_env(emqx_exhook, servers, [])
),
ok = emqx_exhook:enable(default, Opts),
?assertNotEqual([], ets:tab2list(emqx_hooks)).
t_cli_list(_) ->
meck_print(),
?assertEqual( [[emqx_exhook_server:format(Svr) || Svr <- emqx_exhook:list()]]
, emqx_exhook_cli:cli(["server", "list"])
),
unmeck_print().
t_cli_enable_disable(_) ->
meck_print(),
?assertEqual([already_started], emqx_exhook_cli:cli(["server", "enable", "default"])),
?assertEqual(ok, emqx_exhook_cli:cli(["server", "disable", "default"])),
?assertEqual([], emqx_exhook_cli:cli(["server", "list"])),
?assertEqual([not_running], emqx_exhook_cli:cli(["server", "disable", "default"])),
?assertEqual(ok, emqx_exhook_cli:cli(["server", "enable", "default"])),
unmeck_print().
t_cli_stats(_) ->
meck_print(),
_ = emqx_exhook_cli:cli(["server", "stats"]),
_ = emqx_exhook_cli:cli(x),
unmeck_print().
%%--------------------------------------------------------------------
%% Utils
%%--------------------------------------------------------------------
meck_print() ->
meck:new(emqx_ctl, [passthrough, no_history, no_link]),
meck:expect(emqx_ctl, print, fun(_) -> ok end),
meck:expect(emqx_ctl, print, fun(_, Args) -> Args end).
unmeck_print() ->
meck:unload(emqx_ctl).

View File

@ -0,0 +1,339 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exhook_demo_svr).
-behavior(emqx_exhook_v_1_hook_provider_bhvr).
%%
-export([ start/0
, stop/0
, take/0
, in/1
]).
%% gRPC server HookProvider callbacks
-export([ on_provider_loaded/2
, on_provider_unloaded/2
, on_client_connect/2
, on_client_connack/2
, on_client_connected/2
, on_client_disconnected/2
, on_client_authenticate/2
, on_client_check_acl/2
, on_client_subscribe/2
, on_client_unsubscribe/2
, on_session_created/2
, on_session_subscribed/2
, on_session_unsubscribed/2
, on_session_resumed/2
, on_session_discarded/2
, on_session_takeovered/2
, on_session_terminated/2
, on_message_publish/2
, on_message_delivered/2
, on_message_dropped/2
, on_message_acked/2
]).
-define(PORT, 9000).
-define(NAME, ?MODULE).
%%--------------------------------------------------------------------
%% Server APIs
%%--------------------------------------------------------------------
start() ->
Pid = spawn(fun mngr_main/0),
register(?MODULE, Pid),
{ok, Pid}.
stop() ->
grpc:stop_server(?NAME),
?MODULE ! stop.
take() ->
?MODULE ! {take, self()},
receive {value, V} -> V
after 5000 -> error(timeout) end.
in({FunName, Req}) ->
?MODULE ! {in, FunName, Req}.
mngr_main() ->
application:ensure_all_started(grpc),
Services = #{protos => [emqx_exhook_pb],
services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr}
},
Options = [],
Svr = grpc:start_server(?NAME, ?PORT, Services, Options),
mngr_loop([Svr, queue:new(), queue:new()]).
mngr_loop([Svr, Q, Takes]) ->
receive
{in, FunName, Req} ->
{NQ1, NQ2} = reply(queue:in({FunName, Req}, Q), Takes),
mngr_loop([Svr, NQ1, NQ2]);
{take, From} ->
{NQ1, NQ2} = reply(Q, queue:in(From, Takes)),
mngr_loop([Svr, NQ1, NQ2]);
stop ->
exit(normal)
end.
reply(Q1, Q2) ->
case queue:len(Q1) =:= 0 orelse
queue:len(Q2) =:= 0 of
true -> {Q1, Q2};
_ ->
{{value, {Name, V}}, NQ1} = queue:out(Q1),
{{value, From}, NQ2} = queue:out(Q2),
From ! {value, {Name, V}},
{NQ1, NQ2}
end.
%%--------------------------------------------------------------------
%% callbacks
%%--------------------------------------------------------------------
-spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_provider_loaded(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{hooks => [
#{name => <<"client.connect">>},
#{name => <<"client.connack">>},
#{name => <<"client.connected">>},
#{name => <<"client.disconnected">>},
#{name => <<"client.authenticate">>},
#{name => <<"client.check_acl">>},
#{name => <<"client.subscribe">>},
#{name => <<"client.unsubscribe">>},
#{name => <<"session.created">>},
#{name => <<"session.subscribed">>},
#{name => <<"session.unsubscribed">>},
#{name => <<"session.resumed">>},
#{name => <<"session.discarded">>},
#{name => <<"session.takeovered">>},
#{name => <<"session.terminated">>},
#{name => <<"message.publish">>},
#{name => <<"message.delivered">>},
#{name => <<"message.acked">>},
#{name => <<"message.dropped">>}]}, Md}.
-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_provider_unloaded(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_connect(emqx_exhook_pb:client_connect_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_connect(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_connack(emqx_exhook_pb:client_connack_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_connack(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_connected(emqx_exhook_pb:client_connected_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_connected(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_disconnected(emqx_exhook_pb:client_disconnected_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_disconnected(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_authenticate(emqx_exhook_pb:client_authenticate_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_authenticate(#{clientinfo := #{username := Username}} = Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
%% some cases for testing
case Username of
<<"baduser">> ->
{ok, #{type => 'STOP_AND_RETURN',
value => {bool_result, false}}, Md};
<<"gooduser">> ->
{ok, #{type => 'STOP_AND_RETURN',
value => {bool_result, true}}, Md};
<<"normaluser">> ->
{ok, #{type => 'CONTINUE',
value => {bool_result, true}}, Md};
_ ->
{ok, #{type => 'IGNORE'}, Md}
end.
-spec on_client_check_acl(emqx_exhook_pb:client_check_acl_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_check_acl(#{clientinfo := #{username := Username}} = Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
%% some cases for testing
case Username of
<<"baduser">> ->
{ok, #{type => 'STOP_AND_RETURN',
value => {bool_result, false}}, Md};
<<"gooduser">> ->
{ok, #{type => 'STOP_AND_RETURN',
value => {bool_result, true}}, Md};
<<"normaluser">> ->
{ok, #{type => 'CONTINUE',
value => {bool_result, true}}, Md};
_ ->
{ok, #{type => 'IGNORE'}, Md}
end.
-spec on_client_subscribe(emqx_exhook_pb:client_subscribe_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_subscribe(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_client_unsubscribe(emqx_exhook_pb:client_unsubscribe_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_client_unsubscribe(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_created(emqx_exhook_pb:session_created_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_created(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_subscribed(emqx_exhook_pb:session_subscribed_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_subscribed(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_unsubscribed(emqx_exhook_pb:session_unsubscribed_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_unsubscribed(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_resumed(emqx_exhook_pb:session_resumed_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_resumed(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_discarded(emqx_exhook_pb:session_discarded_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_discarded(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_takeovered(emqx_exhook_pb:session_takeovered_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_takeovered(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_session_terminated(emqx_exhook_pb:session_terminated_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_session_terminated(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_message_publish(emqx_exhook_pb:message_publish_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
%% some cases for testing
case From of
<<"baduser">> ->
NMsg = Msg#{qos => 0,
topic => <<"">>,
payload => <<"">>
},
{ok, #{type => 'STOP_AND_RETURN',
value => {message, NMsg}}, Md};
<<"gooduser">> ->
NMsg = Msg#{topic => From,
payload => From},
{ok, #{type => 'STOP_AND_RETURN',
value => {message, NMsg}}, Md};
_ ->
{ok, #{type => 'IGNORE'}, Md}
end.
-spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_delivered(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_message_dropped(emqx_exhook_pb:message_dropped_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_dropped(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.
-spec on_message_acked(emqx_exhook_pb:message_acked_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.
on_message_acked(Req, Md) ->
?MODULE:in({?FUNCTION_NAME, Req}),
%io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
{ok, #{}, Md}.

View File

@ -0,0 +1,531 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(prop_exhook_hooks).
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqx_ct_proper_types,
[ conninfo/0
, clientinfo/0
, sessioninfo/0
, message/0
, connack_return_code/0
, topictab/0
, topic/0
, subopts/0
]).
-define(ALL(Vars, Types, Exprs),
?SETUP(fun() ->
State = do_setup(),
fun() -> do_teardown(State) end
end, ?FORALL(Vars, Types, Exprs))).
%%--------------------------------------------------------------------
%% Properties
%%--------------------------------------------------------------------
prop_client_connect() ->
?ALL({ConnInfo, ConnProps},
{conninfo(), conn_properties()},
begin
ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]),
{'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{props => properties(ConnProps),
conninfo => from_conninfo(ConnInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_client_connack() ->
?ALL({ConnInfo, Rc, AckProps},
{conninfo(), connack_return_code(), ack_properties()},
begin
ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]),
{'on_client_connack', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{props => properties(AckProps),
result_code => atom_to_binary(Rc, utf8),
conninfo => from_conninfo(ConnInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_client_authenticate() ->
?ALL({ClientInfo0, AuthResult},
{clientinfo(), authresult()},
begin
ClientInfo = inject_magic_into(username, ClientInfo0),
OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult),
ExpectedAuthResult = case maps:get(username, ClientInfo) of
<<"baduser">> ->
AuthResult#{
auth_result => not_authorized,
anonymous => false};
<<"gooduser">> ->
AuthResult#{
auth_result => success,
anonymous => false};
<<"normaluser">> ->
AuthResult#{
auth_result => success,
anonymous => false};
_ ->
case maps:get(auth_result, AuthResult) of
success ->
#{auth_result => success,
anonymous => false};
_ ->
#{auth_result => not_authorized,
anonymous => false}
end
end,
?assertEqual(ExpectedAuthResult, OutAuthResult),
{'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{result => authresult_to_bool(AuthResult),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_client_check_acl() ->
?ALL({ClientInfo0, PubSub, Topic, Result},
{clientinfo(), oneof([publish, subscribe]),
topic(), oneof([allow, deny])},
begin
ClientInfo = inject_magic_into(username, ClientInfo0),
OutResult = emqx_hooks:run_fold(
'client.check_acl',
[ClientInfo, PubSub, Topic],
Result),
ExpectedOutResult = case maps:get(username, ClientInfo) of
<<"baduser">> -> deny;
<<"gooduser">> -> allow;
<<"normaluser">> -> allow;
_ -> Result
end,
?assertEqual(ExpectedOutResult, OutResult),
{'on_client_check_acl', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{result => aclresult_to_bool(Result),
type => pubsub_to_enum(PubSub),
topic => Topic,
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_client_connected() ->
?ALL({ClientInfo, ConnInfo},
{clientinfo(), conninfo()},
begin
ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
{'on_client_connected', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_client_disconnected() ->
?ALL({ClientInfo, Reason, ConnInfo},
{clientinfo(), shutdown_reason(), conninfo()},
begin
ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]),
{'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{reason => stringfy(Reason),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_client_subscribe() ->
?ALL({ClientInfo, SubProps, TopicTab},
{clientinfo(), sub_properties(), topictab()},
begin
ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]),
{'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{props => properties(SubProps),
topic_filters => topicfilters(TopicTab),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_client_unsubscribe() ->
?ALL({ClientInfo, UnSubProps, TopicTab},
{clientinfo(), unsub_properties(), topictab()},
begin
ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]),
{'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{props => properties(UnSubProps),
topic_filters => topicfilters(TopicTab),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_session_created() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
begin
ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]),
{'on_session_created', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_session_subscribed() ->
?ALL({ClientInfo, Topic, SubOpts},
{clientinfo(), topic(), subopts()},
begin
ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
{'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{topic => Topic,
subopts => subopts(SubOpts),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_session_unsubscribed() ->
?ALL({ClientInfo, Topic, SubOpts},
{clientinfo(), topic(), subopts()},
begin
ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]),
{'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{topic => Topic,
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_session_resumed() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
begin
ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]),
{'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_session_discared() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
begin
ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]),
{'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_session_takeovered() ->
?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
begin
ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]),
{'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_session_terminated() ->
?ALL({ClientInfo, Reason, SessInfo},
{clientinfo(), shutdown_reason(), sessioninfo()},
begin
ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]),
{'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{reason => stringfy(Reason),
clientinfo => from_clientinfo(ClientInfo)
},
?assertEqual(Expected, Resp),
true
end).
prop_message_publish() ->
?ALL(Msg0, message(),
begin
Msg = emqx_message:from_map(
inject_magic_into(from, emqx_message:to_map(Msg0))),
OutMsg= emqx_hooks:run_fold('message.publish', [], Msg),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
true ->
?assertEqual(Msg, OutMsg),
skip;
_ ->
ExpectedOutMsg = case emqx_message:from(Msg) of
<<"baduser">> ->
MsgMap = emqx_message:to_map(Msg),
emqx_message:from_map(
MsgMap#{qos => 0,
topic => <<"">>,
payload => <<"">>
});
<<"gooduser">> = From ->
MsgMap = emqx_message:to_map(Msg),
emqx_message:from_map(
MsgMap#{topic => From,
payload => From
});
_ -> Msg
end,
?assertEqual(ExpectedOutMsg, OutMsg),
{'on_message_publish', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{message => from_message(Msg)
},
?assertEqual(Expected, Resp)
end,
true
end).
prop_message_dropped() ->
?ALL({Msg, By, Reason}, {message(), hardcoded, shutdown_reason()},
begin
ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
true -> skip;
_ ->
{'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{reason => stringfy(Reason),
message => from_message(Msg)
},
?assertEqual(Expected, Resp)
end,
true
end).
prop_message_delivered() ->
?ALL({ClientInfo, Msg}, {clientinfo(), message()},
begin
ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
true -> skip;
_ ->
{'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo),
message => from_message(Msg)
},
?assertEqual(Expected, Resp)
end,
true
end).
prop_message_acked() ->
?ALL({ClientInfo, Msg}, {clientinfo(), message()},
begin
ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
true -> skip;
_ ->
{'on_message_acked', Resp} = emqx_exhook_demo_svr:take(),
Expected =
#{clientinfo => from_clientinfo(ClientInfo),
message => from_message(Msg)
},
?assertEqual(Expected, Resp)
end,
true
end).
nodestr() ->
stringfy(node()).
peerhost(#{peername := {Host, _}}) ->
ntoa(Host).
sockport(#{sockname := {_, Port}}) ->
Port.
%% copied from emqx_exhook
ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
ntoa(IP) ->
list_to_binary(inet_parse:ntoa(IP)).
maybe(undefined) -> <<>>;
maybe(B) -> B.
properties(undefined) -> [];
properties(M) when is_map(M) ->
maps:fold(fun(K, V, Acc) ->
[#{name => stringfy(K),
value => stringfy(V)} | Acc]
end, [], M).
topicfilters(Tfs) when is_list(Tfs) ->
[#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
%% @private
stringfy(Term) when is_binary(Term) ->
Term;
stringfy(Term) when is_integer(Term) ->
integer_to_binary(Term);
stringfy(Term) when is_atom(Term) ->
atom_to_binary(Term, utf8);
stringfy(Term) ->
unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
subopts(SubOpts) ->
#{qos => maps:get(qos, SubOpts, 0),
rh => maps:get(rh, SubOpts, 0),
rap => maps:get(rap, SubOpts, 0),
nl => maps:get(nl, SubOpts, 0),
share => maps:get(share, SubOpts, <<>>)
}.
authresult_to_bool(AuthResult) ->
maps:get(auth_result, AuthResult, undefined) == success.
aclresult_to_bool(Result) ->
Result == allow.
pubsub_to_enum(publish) -> 'PUBLISH';
pubsub_to_enum(subscribe) -> 'SUBSCRIBE'.
from_conninfo(ConnInfo) ->
#{node => nodestr(),
clientid => maps:get(clientid, ConnInfo),
username => maybe(maps:get(username, ConnInfo, <<>>)),
peerhost => peerhost(ConnInfo),
sockport => sockport(ConnInfo),
proto_name => maps:get(proto_name, ConnInfo),
proto_ver => stringfy(maps:get(proto_ver, ConnInfo)),
keepalive => maps:get(keepalive, ConnInfo)
}.
from_clientinfo(ClientInfo) ->
#{node => nodestr(),
clientid => maps:get(clientid, ClientInfo),
username => maybe(maps:get(username, ClientInfo, <<>>)),
password => maybe(maps:get(password, ClientInfo, <<>>)),
peerhost => ntoa(maps:get(peerhost, ClientInfo)),
sockport => maps:get(sockport, ClientInfo),
protocol => stringfy(maps:get(protocol, ClientInfo)),
mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
is_superuser => maps:get(is_superuser, ClientInfo, false),
anonymous => maps:get(anonymous, ClientInfo, true),
cn => maybe(maps:get(cn, ClientInfo, <<>>)),
dn => maybe(maps:get(dn, ClientInfo, <<>>))
}.
from_message(Msg) ->
#{node => nodestr(),
id => emqx_guid:to_hexstr(emqx_message:id(Msg)),
qos => emqx_message:qos(Msg),
from => stringfy(emqx_message:from(Msg)),
topic => emqx_message:topic(Msg),
payload => emqx_message:payload(Msg),
timestamp => emqx_message:timestamp(Msg)
}.
%%--------------------------------------------------------------------
%% Helper
%%--------------------------------------------------------------------
do_setup() ->
logger:set_primary_config(#{level => warning}),
_ = emqx_exhook_demo_svr:start(),
emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1),
%% waiting first loaded event
{'on_provider_loaded', _} = emqx_exhook_demo_svr:take(),
ok.
do_teardown(_) ->
emqx_ct_helpers:stop_apps([emqx_exhook]),
%% waiting last unloaded event
{'on_provider_unloaded', _} = emqx_exhook_demo_svr:take(),
_ = emqx_exhook_demo_svr:stop(),
logger:set_primary_config(#{level => notice}),
timer:sleep(2000),
ok.
set_special_cfgs(emqx) ->
application:set_env(emqx, allow_anonymous, false),
application:set_env(emqx, enable_acl_cache, false),
application:set_env(emqx, modules_loaded_file, undefined),
application:set_env(emqx, plugins_loaded_file,
emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
set_special_cfgs(emqx_exhook) ->
ok.
%%--------------------------------------------------------------------
%% Generators
%%--------------------------------------------------------------------
conn_properties() ->
#{}.
ack_properties() ->
#{}.
sub_properties() ->
#{}.
unsub_properties() ->
#{}.
shutdown_reason() ->
oneof([utf8(), {shutdown, emqx_ct_proper_types:limited_atom()}]).
authresult() ->
?LET(RC, connack_return_code(), #{auth_result => RC}).
inject_magic_into(Key, Object) ->
case castspell() of
muggles -> Object;
Spell ->
Object#{Key => Spell}
end.
castspell() ->
L = [<<"baduser">>, <<"gooduser">>, <<"normaluser">>, muggles],
lists:nth(rand:uniform(length(L)), L).

View File

@ -1,15 +0,0 @@
##====================================================================
## EMQ X Hooks
##====================================================================
##--------------------------------------------------------------------
## Server Address
## The gRPC server url
##
## exhook.server.$name.url = url()
exhook.server.default.url = "http://127.0.0.1:9000"
#exhook.server.default.ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem"
#exhook.server.default.ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem"
#exhook.server.default.ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem"

View File

@ -1,531 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(prop_exhook_hooks).
% -include_lib("proper/include/proper.hrl").
% -include_lib("eunit/include/eunit.hrl").
% -import(emqx_ct_proper_types,
% [ conninfo/0
% , clientinfo/0
% , sessioninfo/0
% , message/0
% , connack_return_code/0
% , topictab/0
% , topic/0
% , subopts/0
% ]).
% -define(ALL(Vars, Types, Exprs),
% ?SETUP(fun() ->
% State = do_setup(),
% fun() -> do_teardown(State) end
% end, ?FORALL(Vars, Types, Exprs))).
% %%--------------------------------------------------------------------
% %% Properties
% %%--------------------------------------------------------------------
% prop_client_connect() ->
% ?ALL({ConnInfo, ConnProps},
% {conninfo(), conn_properties()},
% begin
% ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]),
% {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{props => properties(ConnProps),
% conninfo => from_conninfo(ConnInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_client_connack() ->
% ?ALL({ConnInfo, Rc, AckProps},
% {conninfo(), connack_return_code(), ack_properties()},
% begin
% ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]),
% {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{props => properties(AckProps),
% result_code => atom_to_binary(Rc, utf8),
% conninfo => from_conninfo(ConnInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_client_authenticate() ->
% ?ALL({ClientInfo0, AuthResult},
% {clientinfo(), authresult()},
% begin
% ClientInfo = inject_magic_into(username, ClientInfo0),
% OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult),
% ExpectedAuthResult = case maps:get(username, ClientInfo) of
% <<"baduser">> ->
% AuthResult#{
% auth_result => not_authorized,
% anonymous => false};
% <<"gooduser">> ->
% AuthResult#{
% auth_result => success,
% anonymous => false};
% <<"normaluser">> ->
% AuthResult#{
% auth_result => success,
% anonymous => false};
% _ ->
% case maps:get(auth_result, AuthResult) of
% success ->
% #{auth_result => success,
% anonymous => false};
% _ ->
% #{auth_result => not_authorized,
% anonymous => false}
% end
% end,
% ?assertEqual(ExpectedAuthResult, OutAuthResult),
% {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{result => authresult_to_bool(AuthResult),
% clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_client_authorize() ->
% ?ALL({ClientInfo0, PubSub, Topic, Result},
% {clientinfo(), oneof([publish, subscribe]),
% topic(), oneof([allow, deny])},
% begin
% ClientInfo = inject_magic_into(username, ClientInfo0),
% OutResult = emqx_hooks:run_fold(
% 'client.authorize',
% [ClientInfo, PubSub, Topic],
% Result),
% ExpectedOutResult = case maps:get(username, ClientInfo) of
% <<"baduser">> -> deny;
% <<"gooduser">> -> allow;
% <<"normaluser">> -> allow;
% _ -> Result
% end,
% ?assertEqual(ExpectedOutResult, OutResult),
% {'on_client_authorize', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{result => authzresult_to_bool(Result),
% type => pubsub_to_enum(PubSub),
% topic => Topic,
% clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_client_connected() ->
% ?ALL({ClientInfo, ConnInfo},
% {clientinfo(), conninfo()},
% begin
% ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
% {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_client_disconnected() ->
% ?ALL({ClientInfo, Reason, ConnInfo},
% {clientinfo(), shutdown_reason(), conninfo()},
% begin
% ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]),
% {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{reason => stringfy(Reason),
% clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_client_subscribe() ->
% ?ALL({ClientInfo, SubProps, TopicTab},
% {clientinfo(), sub_properties(), topictab()},
% begin
% ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]),
% {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{props => properties(SubProps),
% topic_filters => topicfilters(TopicTab),
% clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_client_unsubscribe() ->
% ?ALL({ClientInfo, UnSubProps, TopicTab},
% {clientinfo(), unsub_properties(), topictab()},
% begin
% ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]),
% {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{props => properties(UnSubProps),
% topic_filters => topicfilters(TopicTab),
% clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_session_created() ->
% ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
% begin
% ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]),
% {'on_session_created', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_session_subscribed() ->
% ?ALL({ClientInfo, Topic, SubOpts},
% {clientinfo(), topic(), subopts()},
% begin
% ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
% {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{topic => Topic,
% subopts => subopts(SubOpts),
% clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_session_unsubscribed() ->
% ?ALL({ClientInfo, Topic, SubOpts},
% {clientinfo(), topic(), subopts()},
% begin
% ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]),
% {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{topic => Topic,
% clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_session_resumed() ->
% ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
% begin
% ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]),
% {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_session_discared() ->
% ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
% begin
% ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]),
% {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_session_takeovered() ->
% ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
% begin
% ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]),
% {'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_session_terminated() ->
% ?ALL({ClientInfo, Reason, SessInfo},
% {clientinfo(), shutdown_reason(), sessioninfo()},
% begin
% ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]),
% {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{reason => stringfy(Reason),
% clientinfo => from_clientinfo(ClientInfo)
% },
% ?assertEqual(Expected, Resp),
% true
% end).
% prop_message_publish() ->
% ?ALL(Msg0, message(),
% begin
% Msg = emqx_message:from_map(
% inject_magic_into(from, emqx_message:to_map(Msg0))),
% OutMsg= emqx_hooks:run_fold('message.publish', [], Msg),
% case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
% true ->
% ?assertEqual(Msg, OutMsg),
% skip;
% _ ->
% ExpectedOutMsg = case emqx_message:from(Msg) of
% <<"baduser">> ->
% MsgMap = emqx_message:to_map(Msg),
% emqx_message:from_map(
% MsgMap#{qos => 0,
% topic => <<"">>,
% payload => <<"">>
% });
% <<"gooduser">> = From ->
% MsgMap = emqx_message:to_map(Msg),
% emqx_message:from_map(
% MsgMap#{topic => From,
% payload => From
% });
% _ -> Msg
% end,
% ?assertEqual(ExpectedOutMsg, OutMsg),
% {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{message => from_message(Msg)
% },
% ?assertEqual(Expected, Resp)
% end,
% true
% end).
% prop_message_dropped() ->
% ?ALL({Msg, By, Reason}, {message(), hardcoded, shutdown_reason()},
% begin
% ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]),
% case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
% true -> skip;
% _ ->
% {'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{reason => stringfy(Reason),
% message => from_message(Msg)
% },
% ?assertEqual(Expected, Resp)
% end,
% true
% end).
% prop_message_delivered() ->
% ?ALL({ClientInfo, Msg}, {clientinfo(), message()},
% begin
% ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]),
% case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
% true -> skip;
% _ ->
% {'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{clientinfo => from_clientinfo(ClientInfo),
% message => from_message(Msg)
% },
% ?assertEqual(Expected, Resp)
% end,
% true
% end).
% prop_message_acked() ->
% ?ALL({ClientInfo, Msg}, {clientinfo(), message()},
% begin
% ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]),
% case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of
% true -> skip;
% _ ->
% {'on_message_acked', Resp} = emqx_exhook_demo_svr:take(),
% Expected =
% #{clientinfo => from_clientinfo(ClientInfo),
% message => from_message(Msg)
% },
% ?assertEqual(Expected, Resp)
% end,
% true
% end).
% nodestr() ->
% stringfy(node()).
% peerhost(#{peername := {Host, _}}) ->
% ntoa(Host).
% sockport(#{sockname := {_, Port}}) ->
% Port.
% %% copied from emqx_exhook
% ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
% list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
% ntoa(IP) ->
% list_to_binary(inet_parse:ntoa(IP)).
% maybe(undefined) -> <<>>;
% maybe(B) -> B.
% properties(undefined) -> [];
% properties(M) when is_map(M) ->
% maps:fold(fun(K, V, Acc) ->
% [#{name => stringfy(K),
% value => stringfy(V)} | Acc]
% end, [], M).
% topicfilters(Tfs) when is_list(Tfs) ->
% [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
% %% @private
% stringfy(Term) when is_binary(Term) ->
% Term;
% stringfy(Term) when is_integer(Term) ->
% integer_to_binary(Term);
% stringfy(Term) when is_atom(Term) ->
% atom_to_binary(Term, utf8);
% stringfy(Term) ->
% unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
% subopts(SubOpts) ->
% #{qos => maps:get(qos, SubOpts, 0),
% rh => maps:get(rh, SubOpts, 0),
% rap => maps:get(rap, SubOpts, 0),
% nl => maps:get(nl, SubOpts, 0),
% share => maps:get(share, SubOpts, <<>>)
% }.
% authresult_to_bool(AuthResult) ->
% maps:get(auth_result, AuthResult, undefined) == success.
% authzresult_to_bool(Result) ->
% Result == allow.
% pubsub_to_enum(publish) -> 'PUBLISH';
% pubsub_to_enum(subscribe) -> 'SUBSCRIBE'.
% from_conninfo(ConnInfo) ->
% #{node => nodestr(),
% clientid => maps:get(clientid, ConnInfo),
% username => maybe(maps:get(username, ConnInfo, <<>>)),
% peerhost => peerhost(ConnInfo),
% sockport => sockport(ConnInfo),
% proto_name => maps:get(proto_name, ConnInfo),
% proto_ver => stringfy(maps:get(proto_ver, ConnInfo)),
% keepalive => maps:get(keepalive, ConnInfo)
% }.
% from_clientinfo(ClientInfo) ->
% #{node => nodestr(),
% clientid => maps:get(clientid, ClientInfo),
% username => maybe(maps:get(username, ClientInfo, <<>>)),
% password => maybe(maps:get(password, ClientInfo, <<>>)),
% peerhost => ntoa(maps:get(peerhost, ClientInfo)),
% sockport => maps:get(sockport, ClientInfo),
% protocol => stringfy(maps:get(protocol, ClientInfo)),
% mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
% is_superuser => maps:get(is_superuser, ClientInfo, false),
% anonymous => maps:get(anonymous, ClientInfo, true),
% cn => maybe(maps:get(cn, ClientInfo, <<>>)),
% dn => maybe(maps:get(dn, ClientInfo, <<>>))
% }.
% from_message(Msg) ->
% #{node => nodestr(),
% id => emqx_guid:to_hexstr(emqx_message:id(Msg)),
% qos => emqx_message:qos(Msg),
% from => stringfy(emqx_message:from(Msg)),
% topic => emqx_message:topic(Msg),
% payload => emqx_message:payload(Msg),
% timestamp => emqx_message:timestamp(Msg)
% }.
% %%--------------------------------------------------------------------
% %% Helper
% %%--------------------------------------------------------------------
% do_setup() ->
% logger:set_primary_config(#{level => warning}),
% _ = emqx_exhook_demo_svr:start(),
% emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1),
% %% waiting first loaded event
% {'on_provider_loaded', _} = emqx_exhook_demo_svr:take(),
% ok.
% do_teardown(_) ->
% emqx_ct_helpers:stop_apps([emqx_exhook]),
% %% waiting last unloaded event
% {'on_provider_unloaded', _} = emqx_exhook_demo_svr:take(),
% _ = emqx_exhook_demo_svr:stop(),
% logger:set_primary_config(#{level => notice}),
% timer:sleep(2000),
% ok.
% set_special_cfgs(emqx) ->
% application:set_env(emqx, allow_anonymous, false),
% application:set_env(emqx, enable_authz_cache, false),
% application:set_env(emqx, modules_loaded_file, undefined),
% application:set_env(emqx, plugins_loaded_file,
% emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
% set_special_cfgs(emqx_exhook) ->
% ok.
% %%--------------------------------------------------------------------
% %% Generators
% %%--------------------------------------------------------------------
% conn_properties() ->
% #{}.
% ack_properties() ->
% #{}.
% sub_properties() ->
% #{}.
% unsub_properties() ->
% #{}.
% shutdown_reason() ->
% oneof([utf8(), {shutdown, emqx_ct_proper_types:limited_atom()}]).
% authresult() ->
% ?LET(RC, connack_return_code(), #{auth_result => RC}).
% inject_magic_into(Key, Object) ->
% case castspell() of
% muggles -> Object;
% Spell ->
% Object#{Key => Spell}
% end.
% castspell() ->
% L = [<<"baduser">>, <<"gooduser">>, <<"normaluser">>, muggles],
% lists:nth(rand:uniform(length(L)), L).

View File

@ -1,97 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exhook_SUITE).
% -compile(export_all).
% -compile(nowarn_export_all).
% -include_lib("eunit/include/eunit.hrl").
% -include_lib("common_test/include/ct.hrl").
% %%--------------------------------------------------------------------
% %% Setups
% %%--------------------------------------------------------------------
% all() -> emqx_ct:all(?MODULE).
% init_per_suite(Cfg) ->
% _ = emqx_exhook_demo_svr:start(),
% emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1),
% Cfg.
% end_per_suite(_Cfg) ->
% emqx_ct_helpers:stop_apps([emqx_exhook]),
% emqx_exhook_demo_svr:stop().
% set_special_cfgs(emqx) ->
% application:set_env(emqx, allow_anonymous, false),
% application:set_env(emqx, enable_authz_cache, false),
% application:set_env(emqx, plugins_loaded_file, undefined),
% application:set_env(emqx, modules_loaded_file, undefined);
% set_special_cfgs(emqx_exhook) ->
% ok.
% %%--------------------------------------------------------------------
% %% Test cases
% %%--------------------------------------------------------------------
% t_noserver_nohook(_) ->
% emqx_exhook:disable(default),
% ?assertEqual([], ets:tab2list(emqx_hooks)),
% Opts = proplists:get_value(
% default,
% application:get_env(emqx_exhook, servers, [])
% ),
% ok = emqx_exhook:enable(default, Opts),
% ?assertNotEqual([], ets:tab2list(emqx_hooks)).
% t_cli_list(_) ->
% meck_print(),
% ?assertEqual( [[emqx_exhook_server:format(Svr) || Svr <- emqx_exhook:list()]]
% , emqx_exhook_cli:cli(["server", "list"])
% ),
% unmeck_print().
% t_cli_enable_disable(_) ->
% meck_print(),
% ?assertEqual([already_started], emqx_exhook_cli:cli(["server", "enable", "default"])),
% ?assertEqual(ok, emqx_exhook_cli:cli(["server", "disable", "default"])),
% ?assertEqual([], emqx_exhook_cli:cli(["server", "list"])),
% ?assertEqual([not_running], emqx_exhook_cli:cli(["server", "disable", "default"])),
% ?assertEqual(ok, emqx_exhook_cli:cli(["server", "enable", "default"])),
% unmeck_print().
% t_cli_stats(_) ->
% meck_print(),
% _ = emqx_exhook_cli:cli(["server", "stats"]),
% _ = emqx_exhook_cli:cli(x),
% unmeck_print().
% %%--------------------------------------------------------------------
% %% Utils
% %%--------------------------------------------------------------------
% meck_print() ->
% meck:new(emqx_ctl, [passthrough, no_history, no_link]),
% meck:expect(emqx_ctl, print, fun(_) -> ok end),
% meck:expect(emqx_ctl, print, fun(_, Args) -> Args end).
% unmeck_print() ->
% meck:unload(emqx_ctl).

View File

@ -1,339 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_exhook_demo_svr).
% -behavior(emqx_exhook_v_1_hook_provider_bhvr).
% %%
% -export([ start/0
% , stop/0
% , take/0
% , in/1
% ]).
% %% gRPC server HookProvider callbacks
% -export([ on_provider_loaded/2
% , on_provider_unloaded/2
% , on_client_connect/2
% , on_client_connack/2
% , on_client_connected/2
% , on_client_disconnected/2
% , on_client_authenticate/2
% , on_client_authorize/2
% , on_client_subscribe/2
% , on_client_unsubscribe/2
% , on_session_created/2
% , on_session_subscribed/2
% , on_session_unsubscribed/2
% , on_session_resumed/2
% , on_session_discarded/2
% , on_session_takeovered/2
% , on_session_terminated/2
% , on_message_publish/2
% , on_message_delivered/2
% , on_message_dropped/2
% , on_message_acked/2
% ]).
% -define(PORT, 9000).
% -define(NAME, ?MODULE).
% %%--------------------------------------------------------------------
% %% Server APIs
% %%--------------------------------------------------------------------
% start() ->
% Pid = spawn(fun mngr_main/0),
% register(?MODULE, Pid),
% {ok, Pid}.
% stop() ->
% grpc:stop_server(?NAME),
% ?MODULE ! stop.
% take() ->
% ?MODULE ! {take, self()},
% receive {value, V} -> V
% after 5000 -> error(timeout) end.
% in({FunName, Req}) ->
% ?MODULE ! {in, FunName, Req}.
% mngr_main() ->
% application:ensure_all_started(grpc),
% Services = #{protos => [emqx_exhook_pb],
% services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr}
% },
% Options = [],
% Svr = grpc:start_server(?NAME, ?PORT, Services, Options),
% mngr_loop([Svr, queue:new(), queue:new()]).
% mngr_loop([Svr, Q, Takes]) ->
% receive
% {in, FunName, Req} ->
% {NQ1, NQ2} = reply(queue:in({FunName, Req}, Q), Takes),
% mngr_loop([Svr, NQ1, NQ2]);
% {take, From} ->
% {NQ1, NQ2} = reply(Q, queue:in(From, Takes)),
% mngr_loop([Svr, NQ1, NQ2]);
% stop ->
% exit(normal)
% end.
% reply(Q1, Q2) ->
% case queue:len(Q1) =:= 0 orelse
% queue:len(Q2) =:= 0 of
% true -> {Q1, Q2};
% _ ->
% {{value, {Name, V}}, NQ1} = queue:out(Q1),
% {{value, From}, NQ2} = queue:out(Q2),
% From ! {value, {Name, V}},
% {NQ1, NQ2}
% end.
% %%--------------------------------------------------------------------
% %% callbacks
% %%--------------------------------------------------------------------
% -spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_provider_loaded(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{hooks => [
% #{name => <<"client.connect">>},
% #{name => <<"client.connack">>},
% #{name => <<"client.connected">>},
% #{name => <<"client.disconnected">>},
% #{name => <<"client.authenticate">>},
% #{name => <<"client.authorize">>},
% #{name => <<"client.subscribe">>},
% #{name => <<"client.unsubscribe">>},
% #{name => <<"session.created">>},
% #{name => <<"session.subscribed">>},
% #{name => <<"session.unsubscribed">>},
% #{name => <<"session.resumed">>},
% #{name => <<"session.discarded">>},
% #{name => <<"session.takeovered">>},
% #{name => <<"session.terminated">>},
% #{name => <<"message.publish">>},
% #{name => <<"message.delivered">>},
% #{name => <<"message.acked">>},
% #{name => <<"message.dropped">>}]}, Md}.
% -spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_provider_unloaded(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_client_connect(emqx_exhook_pb:client_connect_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_client_connect(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_client_connack(emqx_exhook_pb:client_connack_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_client_connack(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_client_connected(emqx_exhook_pb:client_connected_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_client_connected(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_client_disconnected(emqx_exhook_pb:client_disconnected_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_client_disconnected(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_client_authenticate(emqx_exhook_pb:client_authenticate_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_client_authenticate(#{clientinfo := #{username := Username}} = Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% %% some cases for testing
% case Username of
% <<"baduser">> ->
% {ok, #{type => 'STOP_AND_RETURN',
% value => {bool_result, false}}, Md};
% <<"gooduser">> ->
% {ok, #{type => 'STOP_AND_RETURN',
% value => {bool_result, true}}, Md};
% <<"normaluser">> ->
% {ok, #{type => 'CONTINUE',
% value => {bool_result, true}}, Md};
% _ ->
% {ok, #{type => 'IGNORE'}, Md}
% end.
% -spec on_client_authorize(emqx_exhook_pb:client_authorize_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_client_authorize(#{clientinfo := #{username := Username}} = Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% %% some cases for testing
% case Username of
% <<"baduser">> ->
% {ok, #{type => 'STOP_AND_RETURN',
% value => {bool_result, false}}, Md};
% <<"gooduser">> ->
% {ok, #{type => 'STOP_AND_RETURN',
% value => {bool_result, true}}, Md};
% <<"normaluser">> ->
% {ok, #{type => 'CONTINUE',
% value => {bool_result, true}}, Md};
% _ ->
% {ok, #{type => 'IGNORE'}, Md}
% end.
% -spec on_client_subscribe(emqx_exhook_pb:client_subscribe_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_client_subscribe(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_client_unsubscribe(emqx_exhook_pb:client_unsubscribe_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_client_unsubscribe(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_session_created(emqx_exhook_pb:session_created_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_session_created(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_session_subscribed(emqx_exhook_pb:session_subscribed_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_session_subscribed(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_session_unsubscribed(emqx_exhook_pb:session_unsubscribed_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_session_unsubscribed(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_session_resumed(emqx_exhook_pb:session_resumed_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_session_resumed(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_session_discarded(emqx_exhook_pb:session_discarded_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_session_discarded(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_session_takeovered(emqx_exhook_pb:session_takeovered_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_session_takeovered(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_session_terminated(emqx_exhook_pb:session_terminated_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_session_terminated(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_message_publish(emqx_exhook_pb:message_publish_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% %% some cases for testing
% case From of
% <<"baduser">> ->
% NMsg = Msg#{qos => 0,
% topic => <<"">>,
% payload => <<"">>
% },
% {ok, #{type => 'STOP_AND_RETURN',
% value => {message, NMsg}}, Md};
% <<"gooduser">> ->
% NMsg = Msg#{topic => From,
% payload => From},
% {ok, #{type => 'STOP_AND_RETURN',
% value => {message, NMsg}}, Md};
% _ ->
% {ok, #{type => 'IGNORE'}, Md}
% end.
% -spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_message_delivered(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_message_dropped(emqx_exhook_pb:message_dropped_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_message_dropped(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.
% -spec on_message_acked(emqx_exhook_pb:message_acked_request(), grpc:metadata())
% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
% | {error, grpc_cowboy_h:error_response()}.
% on_message_acked(Req, Md) ->
% ?MODULE:in({?FUNCTION_NAME, Req}),
% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
% {ok, #{}, Md}.

View File

@ -275,6 +275,7 @@ relx_apps(ReleaseType) ->
, emqx_authn
, emqx_authz
, emqx_gateway
, {emqx_exhook, load}
, emqx_data_bridge
, emqx_rule_engine
, emqx_rule_actions