diff --git a/apps/emqx_exhook/.gitignore b/apps/emqx_exhook/.gitignore new file mode 100644 index 000000000..da1f0db23 --- /dev/null +++ b/apps/emqx_exhook/.gitignore @@ -0,0 +1,29 @@ +.rebar3 +_* +.eunit +*.o +*.beam +*.plt +*.swp +*.swo +.erlang.cookie +ebin +log +erl_crash.dump +.rebar +logs +_build +.idea +*.iml +rebar3.crashdump +*~ +rebar.lock +data/ +*.conf.rendered +*.pyc +.DS_Store +*.class +Mnesia.nonode@nohost/ +src/emqx_exhook_pb.erl +src/emqx_exhook_v_1_hook_provider_client.erl +src/emqx_exhook_v_1_hook_provider_bhvr.erl diff --git a/apps/emqx_gateway/src/exhook/README.md b/apps/emqx_exhook/README.md similarity index 100% rename from apps/emqx_gateway/src/exhook/README.md rename to apps/emqx_exhook/README.md diff --git a/apps/emqx_exhook/docs/design-cn.md b/apps/emqx_exhook/docs/design-cn.md new file mode 100644 index 000000000..6686e96e3 --- /dev/null +++ b/apps/emqx_exhook/docs/design-cn.md @@ -0,0 +1,116 @@ +# 设计 + +## 动机 + +在 EMQ X Broker v4.1-v4.2 中,我们发布了 2 个插件来扩展 emqx 的编程能力: + +1. `emqx-extension-hook` 提供了使用 Java, Python 向 Broker 挂载钩子的功能 +2. `emqx-exproto` 提供了使用 Java,Python 编写用户自定义协议接入插件的功能 + +但在后续的支持中发现许多难以处理的问题: + +1. 有大量的编程语言需要支持,需要编写和维护如 Go, JavaScript, Lua.. 等语言的驱动。 +2. `erlport` 使用的操作系统的管道进行通信,这让用户代码只能部署在和 emqx 同一个操作系统上。部署方式受到了极大的限制。 +3. 用户程序的启动参数直接打包到 Broker 中,导致用户开发无法实时的进行调试,单步跟踪等。 +4. `erlport` 会占用 `stdin` `stdout`。 + +因此,我们计划重构这部分的实现,其中主要的内容是: +1. 使用 `gRPC` 替换 `erlport`。 +2. 将 `emqx-extension-hook` 重命名为 `emqx-exhook` + + +旧版本的设计:[emqx-extension-hook design in v4.2.0](https://github.com/emqx/emqx-exhook/blob/v4.2.0/docs/design.md) + +## 设计 + +架构如下: + +``` + EMQ X ++========================+ +========+==========+ +| ExHook | | | | +| +----------------+ | gRPC | gRPC | User's | +| | gRPC Client | ------------------> | Server | Codes | +| +----------------+ | (HTTP/2) | | | +| | | | | ++========================+ +========+==========+ +``` + +`emqx-exhook` 通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。 + + +和 emqx 原生的钩子一致,emqx-exhook 也按照链式的方式执行: + + + +### gRPC 服务示例 + +用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中: + +```protobuff +syntax = "proto3"; + +package emqx.exhook.v1; + +service HookProvider { + + rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {}; + + rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {}; + + rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {}; + + rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {}; + + rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {}; + + rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {}; + + rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {}; + + rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {}; + + rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {}; + + rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {}; + + rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {}; + + rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {}; + + rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {}; + + rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {}; + + rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {}; + + rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {}; + + rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {}; + + rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {}; + + rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {}; + + rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {}; + + rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {}; +} +``` + +### 配置文件示例 + +``` +## 配置 gRPC 服务地址 (HTTP) +## +## s1 为服务器的名称 +exhook.server.s1.url = http://127.0.0.1:9001 + +## 配置 gRPC 服务地址 (HTTPS) +## +## s2 为服务器名称 +exhook.server.s2.url = https://127.0.0.1:9002 +exhook.server.s2.cacertfile = ca.pem +exhook.server.s2.certfile = cert.pem +exhook.server.s2.keyfile = key.pem +``` diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf new file mode 100644 index 000000000..3ed499e28 --- /dev/null +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -0,0 +1,14 @@ +##==================================================================== +## EMQ X Hooks +##==================================================================== + +exhook: { + server.default: { + url: "http://127.0.0.1:9000" + #ssl: { + # cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" + # certfile: "{{ platform_etc_dir }}/certs/cert.pem" + # keyfile: "{{ platform_etc_dir }}/certs/key.pem" + #} + } +} diff --git a/apps/emqx_gateway/src/exhook/include/emqx_exhook.hrl b/apps/emqx_exhook/include/emqx_exhook.hrl similarity index 96% rename from apps/emqx_gateway/src/exhook/include/emqx_exhook.hrl rename to apps/emqx_exhook/include/emqx_exhook.hrl index 64131735e..7301fdcbb 100644 --- a/apps/emqx_gateway/src/exhook/include/emqx_exhook.hrl +++ b/apps/emqx_exhook/include/emqx_exhook.hrl @@ -25,7 +25,7 @@ , {'client.connected', {emqx_exhook_handler, on_client_connected, []}} , {'client.disconnected', {emqx_exhook_handler, on_client_disconnected, []}} , {'client.authenticate', {emqx_exhook_handler, on_client_authenticate, []}} - , {'client.authorize', {emqx_exhook_handler, on_client_authorize, []}} + , {'client.check_acl', {emqx_exhook_handler, on_client_check_acl, []}} , {'client.subscribe', {emqx_exhook_handler, on_client_subscribe, []}} , {'client.unsubscribe', {emqx_exhook_handler, on_client_unsubscribe, []}} , {'session.created', {emqx_exhook_handler, on_session_created, []}} diff --git a/apps/emqx_exhook/priv/emqx_exhook.schema b/apps/emqx_exhook/priv/emqx_exhook.schema new file mode 100644 index 000000000..e5481a3dd --- /dev/null +++ b/apps/emqx_exhook/priv/emqx_exhook.schema @@ -0,0 +1,38 @@ +%%-*- mode: erlang -*- + +{mapping, "exhook.server.$name.url", "emqx_exhook.servers", [ + {datatype, string} +]}. + +{mapping, "exhook.server.$name.ssl.cacertfile", "emqx_exhook.servers", [ + {datatype, string} +]}. + +{mapping, "exhook.server.$name.ssl.certfile", "emqx_exhook.servers", [ + {datatype, string} +]}. + +{mapping, "exhook.server.$name.ssl.keyfile", "emqx_exhook.servers", [ + {datatype, string} +]}. + +{translation, "emqx_exhook.servers", fun(Conf) -> + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + ServerOptions = fun(Prefix) -> + case http_uri:parse(cuttlefish:conf_get(Prefix ++ ".url", Conf)) of + {ok, {http, _, Host, Port, _, _}} -> + [{scheme, http}, {host, Host}, {port, Port}]; + {ok, {https, _, Host, Port, _, _}} -> + [{scheme, https}, {host, Host}, {port, Port}, + {ssl_options, + Filter([{ssl, true}, + {certfile, cuttlefish:conf_get(Prefix ++ ".ssl.certfile", Conf, undefined)}, + {keyfile, cuttlefish:conf_get(Prefix ++ ".ssl.keyfile", Conf, undefined)}, + {cacertfile, cuttlefish:conf_get(Prefix ++ ".ssl.cacertfile", Conf, undefined)} + ])}]; + _ -> error(invalid_server_options) + end + end, + [{list_to_atom(Name), ServerOptions("exhook.server." ++ Name)} + || {["exhook", "server", Name, "url"], _} <- cuttlefish_variable:filter_by_prefix("exhook.server", Conf)] +end}. diff --git a/apps/emqx_gateway/etc/priv/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto similarity index 99% rename from apps/emqx_gateway/etc/priv/exhook.proto rename to apps/emqx_exhook/priv/protos/exhook.proto index 97a011352..d8a8ef918 100644 --- a/apps/emqx_gateway/etc/priv/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -127,14 +127,14 @@ message ClientAuthorizeRequest { ClientInfo clientinfo = 1; - enum AuthzReqType { + enum AuthorizeReqType { PUBLISH = 0; SUBSCRIBE = 1; } - AuthzReqType type = 2; + AuthorizeReqType type = 2; string topic = 3; diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config new file mode 100644 index 000000000..883aad9bd --- /dev/null +++ b/apps/emqx_exhook/rebar.config @@ -0,0 +1,49 @@ +%%-*- mode: erlang -*- +{plugins, + [rebar3_proper, + {grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}} +]}. + +{deps, + [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}} +]}. + +{grpc, + [{protos, ["priv/protos"]}, + {gpb_opts, [{module_name_prefix, "emqx_"}, + {module_name_suffix, "_pb"}]} +]}. + +{provider_hooks, + [{pre, [{compile, {grpc, gen}}, + {clean, {grpc, clean}}]} +]}. + +{edoc_opts, [{preprocess, true}]}. + +{erl_opts, [warn_unused_vars, + warn_shadow_vars, + warn_unused_import, + warn_obsolete_guard, + debug_info, + {parse_transform}]}. + +{xref_checks, [undefined_function_calls, undefined_functions, + locals_not_used, deprecated_function_calls, + warnings_as_errors, deprecated_functions]}. +{xref_ignores, [emqx_exhook_pb]}. + +{cover_enabled, true}. +{cover_opts, [verbose]}. +{cover_export_enabled, true}. +{cover_excl_mods, [emqx_exhook_pb, + emqx_exhook_v_1_hook_provider_bhvr, + emqx_exhook_v_1_hook_provider_client]}. + +{profiles, + [{test, + [{deps, + [{emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.3.1"}}} + ]} + ]} +]}. diff --git a/apps/emqx_gateway/src/exhook/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src similarity index 94% rename from apps/emqx_gateway/src/exhook/emqx_exhook.app.src rename to apps/emqx_exhook/src/emqx_exhook.app.src index e703cfe5e..39c408e20 100644 --- a/apps/emqx_gateway/src/exhook/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,6 +1,6 @@ {application, emqx_exhook, [{description, "EMQ X Extension for Hook"}, - {vsn, "4.3.2"}, + {vsn, "4.3.3"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src new file mode 100644 index 000000000..1da31dbbc --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -0,0 +1,33 @@ +%% -*-: erlang -*- +{VSN, + [ + {"4.3.2", [ + {load_module, emqx_exhook_app, brutal_purge, soft_purge, []} + ]}, + {"4.3.1", [ + {load_module, emqx_exhook_app, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_server, brutal_purge, soft_purge, []} + ]}, + {"4.3.0", [ + {load_module, emqx_exhook_app, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_server, brutal_purge, soft_purge, []} + ]}, + {<<".*">>, []} + ], + [ + {"4.3.2", [ + {load_module, emqx_exhook_app, brutal_purge, soft_purge, []} + ]}, + {"4.3.1", [ + {load_module, emqx_exhook_app, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_server, brutal_purge, soft_purge, []} + ]}, + {"4.3.0", [ + {load_module, emqx_exhook_app, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_pb, brutal_purge, soft_purge, []}, + {load_module, emqx_exhook_server, brutal_purge, soft_purge, []} + ]}, + {<<".*">>, []} + ] +}. diff --git a/apps/emqx_gateway/src/exhook/emqx_exhook.erl b/apps/emqx_exhook/src/emqx_exhook.erl similarity index 95% rename from apps/emqx_gateway/src/exhook/emqx_exhook.erl rename to apps/emqx_exhook/src/emqx_exhook.erl index 3d977d2f1..e993858cf 100644 --- a/apps/emqx_gateway/src/exhook/emqx_exhook.erl +++ b/apps/emqx_exhook/src/emqx_exhook.erl @@ -16,7 +16,7 @@ -module(emqx_exhook). --include("src/exhook/include/emqx_exhook.hrl"). +-include("emqx_exhook.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -40,13 +40,13 @@ list() -> [server(Name) || Name <- running()]. --spec enable(atom()|string(), list()) -> ok | {error, term()}. -enable(Name, Opts) -> +-spec enable(atom()|string(), map()) -> ok | {error, term()}. +enable(Name, Options) -> case lists:member(Name, running()) of true -> {error, already_started}; _ -> - case emqx_exhook_server:load(Name, Opts) of + case emqx_exhook_server:load(Name, Options) of {ok, ServiceState} -> save(Name, ServiceState); {error, Reason} -> diff --git a/apps/emqx_gateway/src/exhook/emqx_exhook_app.erl b/apps/emqx_exhook/src/emqx_exhook_app.erl similarity index 94% rename from apps/emqx_gateway/src/exhook/emqx_exhook_app.erl rename to apps/emqx_exhook/src/emqx_exhook_app.erl index d4621f85e..b057ce463 100644 --- a/apps/emqx_gateway/src/exhook/emqx_exhook_app.erl +++ b/apps/emqx_exhook/src/emqx_exhook_app.erl @@ -18,7 +18,7 @@ -behaviour(application). --include("src/exhook/include/emqx_exhook.hrl"). +-include("emqx_exhook.hrl"). -emqx_plugin(extension). @@ -67,9 +67,10 @@ stop(_State) -> %%-------------------------------------------------------------------- load_all_servers() -> - lists:foreach(fun({Name, Options}) -> + _ = maps:map(fun(Name, Options) -> load_server(Name, Options) - end, application:get_env(?APP, servers, [])). + end, emqx_config:get([exhook, server])), + ok. unload_all_servers() -> emqx_exhook:disable_all(). diff --git a/apps/emqx_gateway/src/exhook/emqx_exhook_cli.erl b/apps/emqx_exhook/src/emqx_exhook_cli.erl similarity index 98% rename from apps/emqx_gateway/src/exhook/emqx_exhook_cli.erl rename to apps/emqx_exhook/src/emqx_exhook_cli.erl index efce962d9..a8dc43b16 100644 --- a/apps/emqx_gateway/src/exhook/emqx_exhook_cli.erl +++ b/apps/emqx_exhook/src/emqx_exhook_cli.erl @@ -16,7 +16,7 @@ -module(emqx_exhook_cli). --include("src/exhook/include/emqx_exhook.hrl"). +-include("emqx_exhook.hrl"). -export([cli/1]). diff --git a/apps/emqx_gateway/src/exhook/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl similarity index 99% rename from apps/emqx_gateway/src/exhook/emqx_exhook_handler.erl rename to apps/emqx_exhook/src/emqx_exhook_handler.erl index 2bb2dc7f5..d2fb84bab 100644 --- a/apps/emqx_gateway/src/exhook/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -16,7 +16,7 @@ -module(emqx_exhook_handler). --include("src/exhook/include/emqx_exhook.hrl"). +-include("emqx_exhook.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -87,6 +87,7 @@ on_client_disconnected(ClientInfo, Reason, _ConnInfo) -> }, cast('client.disconnected', Req). +%% FIXME: `AuthResult` on_client_authenticate(ClientInfo, AuthResult) -> %% XXX: Bool is missing more information about the atom of the result %% So, the `Req` has missed detailed info too. diff --git a/apps/emqx_exhook/src/emqx_exhook_schema.erl b/apps/emqx_exhook/src/emqx_exhook_schema.erl new file mode 100644 index 000000000..867c8e6df --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook_schema.erl @@ -0,0 +1,62 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_schema). + +-dialyzer(no_return). +-dialyzer(no_match). +-dialyzer(no_contracts). +-dialyzer(no_unused). +-dialyzer(no_fail_call). + +-include_lib("typerefl/include/types.hrl"). + +-behaviour(hocon_schema). + +-export([structs/0, fields/1]). +-export([t/1, t/3, t/4, ref/1]). + +structs() -> [server]. + +fields(server) -> + [{"$name", t(ref(server_structs))}]; + +fields(server_structs) -> + [ {url, t(string(), "emqx_exhook.url", "")} + , {ssl, t(ref(ssl_conf_group))} + ]; + +fields(ssl_conf_group) -> + [ {cacertfile, string()} + , {certfile, string()} + , {keyfile, string()} + ]. + +%% types + +t(Type) -> #{type => Type}. + +t(Type, Mapping, Default) -> + hoconsc:t(Type, #{mapping => Mapping, default => Default}). + +t(Type, Mapping, Default, OverrideEnv) -> + hoconsc:t(Type, #{ mapping => Mapping + , default => Default + , override_env => OverrideEnv + }). + +ref(Field) -> + hoconsc:ref(?MODULE, Field). diff --git a/apps/emqx_gateway/src/exhook/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl similarity index 90% rename from apps/emqx_gateway/src/exhook/emqx_exhook_server.erl rename to apps/emqx_exhook/src/emqx_exhook_server.erl index 79bb9d32b..5bda76d13 100644 --- a/apps/emqx_gateway/src/exhook/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -16,7 +16,7 @@ -module(emqx_exhook_server). --include("src/exhook/include/emqx_exhook.hrl"). +-include("emqx_exhook.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -74,13 +74,17 @@ -export_type([server/0]). +-type options() :: #{ url := uri_string:uri_string() + , ssl => map() + }. + -dialyzer({nowarn_function, [inc_metrics/2]}). %%-------------------------------------------------------------------- %% Load/Unload APIs %%-------------------------------------------------------------------- --spec load(atom(), list()) -> {ok, server()} | {error, term()} . +-spec load(atom(), options()) -> {ok, server()} | {error, term()} . load(Name0, Opts0) -> Name = to_list(Name0), {SvrAddr, ClientOpts} = channel_opts(Opts0), @@ -117,20 +121,27 @@ to_list(Name) when is_list(Name) -> Name. %% @private -channel_opts(Opts) -> - Scheme = proplists:get_value(scheme, Opts), - Host = proplists:get_value(host, Opts), - Port = proplists:get_value(port, Opts), - SvrAddr = format_http_uri(Scheme, Host, Port), - ClientOpts = case Scheme of - https -> - SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])), - #{gun_opts => - #{transport => ssl, - transport_opts => SslOpts}}; - _ -> #{} - end, - {SvrAddr, ClientOpts}. +channel_opts(Opts = #{url := URL}) -> + io:format("~p~n", [Opts]), + case uri_string:parse(URL) of + #{scheme := <<"http">>, host := Host, port := Port} -> + {format_http_uri("http", Host, Port), #{}}; + #{scheme := <<"https">>, host := Host, port := Port} -> + SslOpts = + case maps:get(ssl, Opts, undefined) of + undefined -> []; + MapOpts -> + filter( + [{cacertfile, maps:get(cacertfile, MapOpts, undefined)}, + {certfile, maps:get(certfile, MapOpts, undefined)}, + {keyfile, maps:get(keyfile, MapOpts, undefined)} + ]) + end, + {format_http_uri("https", Host, Port), + #{gun_opts => #{transport => ssl, transport_opts => SslOpts}}}; + _ -> + error(bad_server_url) + end. format_http_uri(Scheme, Host0, Port) -> Host = case is_tuple(Host0) of @@ -139,6 +150,9 @@ format_http_uri(Scheme, Host0, Port) -> end, lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])). +filter(Ls) -> + [ E || E <- Ls, E /= undefined]. + -spec unload(server()) -> ok. unload(#server{name = Name, hookspec = HookSpecs}) -> _ = do_deinit(Name), diff --git a/apps/emqx_gateway/src/exhook/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl similarity index 100% rename from apps/emqx_gateway/src/exhook/emqx_exhook_sup.erl rename to apps/emqx_exhook/src/emqx_exhook_sup.erl diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl new file mode 100644 index 000000000..5d5a396a5 --- /dev/null +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -0,0 +1,96 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Cfg) -> + _ = emqx_exhook_demo_svr:start(), + emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1), + Cfg. + +end_per_suite(_Cfg) -> + emqx_ct_helpers:stop_apps([emqx_exhook]), + emqx_exhook_demo_svr:stop(). + +set_special_cfgs(emqx) -> + application:set_env(emqx, allow_anonymous, false), + application:set_env(emqx, enable_acl_cache, false), + application:set_env(emqx, plugins_loaded_file, undefined), + application:set_env(emqx, modules_loaded_file, undefined); +set_special_cfgs(emqx_exhook) -> + ok. + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_noserver_nohook(_) -> + emqx_exhook:disable(default), + ?assertEqual([], ets:tab2list(emqx_hooks)), + + Opts = proplists:get_value( + default, + application:get_env(emqx_exhook, servers, []) + ), + ok = emqx_exhook:enable(default, Opts), + ?assertNotEqual([], ets:tab2list(emqx_hooks)). + +t_cli_list(_) -> + meck_print(), + ?assertEqual( [[emqx_exhook_server:format(Svr) || Svr <- emqx_exhook:list()]] + , emqx_exhook_cli:cli(["server", "list"]) + ), + unmeck_print(). + +t_cli_enable_disable(_) -> + meck_print(), + ?assertEqual([already_started], emqx_exhook_cli:cli(["server", "enable", "default"])), + ?assertEqual(ok, emqx_exhook_cli:cli(["server", "disable", "default"])), + ?assertEqual([], emqx_exhook_cli:cli(["server", "list"])), + + ?assertEqual([not_running], emqx_exhook_cli:cli(["server", "disable", "default"])), + ?assertEqual(ok, emqx_exhook_cli:cli(["server", "enable", "default"])), + unmeck_print(). + +t_cli_stats(_) -> + meck_print(), + _ = emqx_exhook_cli:cli(["server", "stats"]), + _ = emqx_exhook_cli:cli(x), + unmeck_print(). + +%%-------------------------------------------------------------------- +%% Utils +%%-------------------------------------------------------------------- + +meck_print() -> + meck:new(emqx_ctl, [passthrough, no_history, no_link]), + meck:expect(emqx_ctl, print, fun(_) -> ok end), + meck:expect(emqx_ctl, print, fun(_, Args) -> Args end). + +unmeck_print() -> + meck:unload(emqx_ctl). diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl new file mode 100644 index 000000000..c2db04dd4 --- /dev/null +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -0,0 +1,339 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_demo_svr). + +-behavior(emqx_exhook_v_1_hook_provider_bhvr). + +%% +-export([ start/0 + , stop/0 + , take/0 + , in/1 + ]). + +%% gRPC server HookProvider callbacks +-export([ on_provider_loaded/2 + , on_provider_unloaded/2 + , on_client_connect/2 + , on_client_connack/2 + , on_client_connected/2 + , on_client_disconnected/2 + , on_client_authenticate/2 + , on_client_check_acl/2 + , on_client_subscribe/2 + , on_client_unsubscribe/2 + , on_session_created/2 + , on_session_subscribed/2 + , on_session_unsubscribed/2 + , on_session_resumed/2 + , on_session_discarded/2 + , on_session_takeovered/2 + , on_session_terminated/2 + , on_message_publish/2 + , on_message_delivered/2 + , on_message_dropped/2 + , on_message_acked/2 + ]). + +-define(PORT, 9000). +-define(NAME, ?MODULE). + +%%-------------------------------------------------------------------- +%% Server APIs +%%-------------------------------------------------------------------- + +start() -> + Pid = spawn(fun mngr_main/0), + register(?MODULE, Pid), + {ok, Pid}. + +stop() -> + grpc:stop_server(?NAME), + ?MODULE ! stop. + +take() -> + ?MODULE ! {take, self()}, + receive {value, V} -> V + after 5000 -> error(timeout) end. + +in({FunName, Req}) -> + ?MODULE ! {in, FunName, Req}. + +mngr_main() -> + application:ensure_all_started(grpc), + Services = #{protos => [emqx_exhook_pb], + services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr} + }, + Options = [], + Svr = grpc:start_server(?NAME, ?PORT, Services, Options), + mngr_loop([Svr, queue:new(), queue:new()]). + +mngr_loop([Svr, Q, Takes]) -> + receive + {in, FunName, Req} -> + {NQ1, NQ2} = reply(queue:in({FunName, Req}, Q), Takes), + mngr_loop([Svr, NQ1, NQ2]); + {take, From} -> + {NQ1, NQ2} = reply(Q, queue:in(From, Takes)), + mngr_loop([Svr, NQ1, NQ2]); + stop -> + exit(normal) + end. + +reply(Q1, Q2) -> + case queue:len(Q1) =:= 0 orelse + queue:len(Q2) =:= 0 of + true -> {Q1, Q2}; + _ -> + {{value, {Name, V}}, NQ1} = queue:out(Q1), + {{value, From}, NQ2} = queue:out(Q2), + From ! {value, {Name, V}}, + {NQ1, NQ2} + end. + +%%-------------------------------------------------------------------- +%% callbacks +%%-------------------------------------------------------------------- + +-spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. + +on_provider_loaded(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{hooks => [ + #{name => <<"client.connect">>}, + #{name => <<"client.connack">>}, + #{name => <<"client.connected">>}, + #{name => <<"client.disconnected">>}, + #{name => <<"client.authenticate">>}, + #{name => <<"client.check_acl">>}, + #{name => <<"client.subscribe">>}, + #{name => <<"client.unsubscribe">>}, + #{name => <<"session.created">>}, + #{name => <<"session.subscribed">>}, + #{name => <<"session.unsubscribed">>}, + #{name => <<"session.resumed">>}, + #{name => <<"session.discarded">>}, + #{name => <<"session.takeovered">>}, + #{name => <<"session.terminated">>}, + #{name => <<"message.publish">>}, + #{name => <<"message.delivered">>}, + #{name => <<"message.acked">>}, + #{name => <<"message.dropped">>}]}, Md}. +-spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_provider_unloaded(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_client_connect(emqx_exhook_pb:client_connect_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_client_connect(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_client_connack(emqx_exhook_pb:client_connack_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_client_connack(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_client_connected(emqx_exhook_pb:client_connected_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_client_connected(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_client_disconnected(emqx_exhook_pb:client_disconnected_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_client_disconnected(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_client_authenticate(emqx_exhook_pb:client_authenticate_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_client_authenticate(#{clientinfo := #{username := Username}} = Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + %% some cases for testing + case Username of + <<"baduser">> -> + {ok, #{type => 'STOP_AND_RETURN', + value => {bool_result, false}}, Md}; + <<"gooduser">> -> + {ok, #{type => 'STOP_AND_RETURN', + value => {bool_result, true}}, Md}; + <<"normaluser">> -> + {ok, #{type => 'CONTINUE', + value => {bool_result, true}}, Md}; + _ -> + {ok, #{type => 'IGNORE'}, Md} + end. + +-spec on_client_check_acl(emqx_exhook_pb:client_check_acl_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_client_check_acl(#{clientinfo := #{username := Username}} = Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + %% some cases for testing + case Username of + <<"baduser">> -> + {ok, #{type => 'STOP_AND_RETURN', + value => {bool_result, false}}, Md}; + <<"gooduser">> -> + {ok, #{type => 'STOP_AND_RETURN', + value => {bool_result, true}}, Md}; + <<"normaluser">> -> + {ok, #{type => 'CONTINUE', + value => {bool_result, true}}, Md}; + _ -> + {ok, #{type => 'IGNORE'}, Md} + end. + +-spec on_client_subscribe(emqx_exhook_pb:client_subscribe_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_client_subscribe(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_client_unsubscribe(emqx_exhook_pb:client_unsubscribe_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_client_unsubscribe(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_session_created(emqx_exhook_pb:session_created_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_session_created(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_session_subscribed(emqx_exhook_pb:session_subscribed_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_session_subscribed(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_session_unsubscribed(emqx_exhook_pb:session_unsubscribed_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_session_unsubscribed(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_session_resumed(emqx_exhook_pb:session_resumed_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_session_resumed(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_session_discarded(emqx_exhook_pb:session_discarded_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_session_discarded(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_session_takeovered(emqx_exhook_pb:session_takeovered_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_session_takeovered(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_session_terminated(emqx_exhook_pb:session_terminated_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_session_terminated(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_message_publish(emqx_exhook_pb:message_publish_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + %% some cases for testing + case From of + <<"baduser">> -> + NMsg = Msg#{qos => 0, + topic => <<"">>, + payload => <<"">> + }, + {ok, #{type => 'STOP_AND_RETURN', + value => {message, NMsg}}, Md}; + <<"gooduser">> -> + NMsg = Msg#{topic => From, + payload => From}, + {ok, #{type => 'STOP_AND_RETURN', + value => {message, NMsg}}, Md}; + _ -> + {ok, #{type => 'IGNORE'}, Md} + end. + +-spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_message_delivered(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_message_dropped(emqx_exhook_pb:message_dropped_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_message_dropped(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. + +-spec on_message_acked(emqx_exhook_pb:message_acked_request(), grpc:metadata()) + -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} + | {error, grpc_cowboy_h:error_response()}. +on_message_acked(Req, Md) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Md}. diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl new file mode 100644 index 000000000..24f45c8b0 --- /dev/null +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -0,0 +1,531 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_exhook_hooks). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-import(emqx_ct_proper_types, + [ conninfo/0 + , clientinfo/0 + , sessioninfo/0 + , message/0 + , connack_return_code/0 + , topictab/0 + , topic/0 + , subopts/0 + ]). + +-define(ALL(Vars, Types, Exprs), + ?SETUP(fun() -> + State = do_setup(), + fun() -> do_teardown(State) end + end, ?FORALL(Vars, Types, Exprs))). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_client_connect() -> + ?ALL({ConnInfo, ConnProps}, + {conninfo(), conn_properties()}, + begin + ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]), + {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{props => properties(ConnProps), + conninfo => from_conninfo(ConnInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_connack() -> + ?ALL({ConnInfo, Rc, AckProps}, + {conninfo(), connack_return_code(), ack_properties()}, + begin + ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]), + {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{props => properties(AckProps), + result_code => atom_to_binary(Rc, utf8), + conninfo => from_conninfo(ConnInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_authenticate() -> + ?ALL({ClientInfo0, AuthResult}, + {clientinfo(), authresult()}, + begin + ClientInfo = inject_magic_into(username, ClientInfo0), + OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult), + ExpectedAuthResult = case maps:get(username, ClientInfo) of + <<"baduser">> -> + AuthResult#{ + auth_result => not_authorized, + anonymous => false}; + <<"gooduser">> -> + AuthResult#{ + auth_result => success, + anonymous => false}; + <<"normaluser">> -> + AuthResult#{ + auth_result => success, + anonymous => false}; + _ -> + case maps:get(auth_result, AuthResult) of + success -> + #{auth_result => success, + anonymous => false}; + _ -> + #{auth_result => not_authorized, + anonymous => false} + end + end, + ?assertEqual(ExpectedAuthResult, OutAuthResult), + + {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{result => authresult_to_bool(AuthResult), + clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_check_acl() -> + ?ALL({ClientInfo0, PubSub, Topic, Result}, + {clientinfo(), oneof([publish, subscribe]), + topic(), oneof([allow, deny])}, + begin + ClientInfo = inject_magic_into(username, ClientInfo0), + OutResult = emqx_hooks:run_fold( + 'client.check_acl', + [ClientInfo, PubSub, Topic], + Result), + ExpectedOutResult = case maps:get(username, ClientInfo) of + <<"baduser">> -> deny; + <<"gooduser">> -> allow; + <<"normaluser">> -> allow; + _ -> Result + end, + ?assertEqual(ExpectedOutResult, OutResult), + + {'on_client_check_acl', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{result => aclresult_to_bool(Result), + type => pubsub_to_enum(PubSub), + topic => Topic, + clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_connected() -> + ?ALL({ClientInfo, ConnInfo}, + {clientinfo(), conninfo()}, + begin + ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]), + {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_disconnected() -> + ?ALL({ClientInfo, Reason, ConnInfo}, + {clientinfo(), shutdown_reason(), conninfo()}, + begin + ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]), + {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{reason => stringfy(Reason), + clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_subscribe() -> + ?ALL({ClientInfo, SubProps, TopicTab}, + {clientinfo(), sub_properties(), topictab()}, + begin + ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]), + {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{props => properties(SubProps), + topic_filters => topicfilters(TopicTab), + clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_unsubscribe() -> + ?ALL({ClientInfo, UnSubProps, TopicTab}, + {clientinfo(), unsub_properties(), topictab()}, + begin + ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]), + {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{props => properties(UnSubProps), + topic_filters => topicfilters(TopicTab), + clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_created() -> + ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + begin + ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]), + {'on_session_created', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_subscribed() -> + ?ALL({ClientInfo, Topic, SubOpts}, + {clientinfo(), topic(), subopts()}, + begin + ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), + {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{topic => Topic, + subopts => subopts(SubOpts), + clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_unsubscribed() -> + ?ALL({ClientInfo, Topic, SubOpts}, + {clientinfo(), topic(), subopts()}, + begin + ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]), + {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{topic => Topic, + clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_resumed() -> + ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + begin + ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]), + {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_discared() -> + ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + begin + ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]), + {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_takeovered() -> + ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + begin + ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]), + {'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_terminated() -> + ?ALL({ClientInfo, Reason, SessInfo}, + {clientinfo(), shutdown_reason(), sessioninfo()}, + begin + ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]), + {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{reason => stringfy(Reason), + clientinfo => from_clientinfo(ClientInfo) + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_message_publish() -> + ?ALL(Msg0, message(), + begin + Msg = emqx_message:from_map( + inject_magic_into(from, emqx_message:to_map(Msg0))), + OutMsg= emqx_hooks:run_fold('message.publish', [], Msg), + case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of + true -> + ?assertEqual(Msg, OutMsg), + skip; + _ -> + ExpectedOutMsg = case emqx_message:from(Msg) of + <<"baduser">> -> + MsgMap = emqx_message:to_map(Msg), + emqx_message:from_map( + MsgMap#{qos => 0, + topic => <<"">>, + payload => <<"">> + }); + <<"gooduser">> = From -> + MsgMap = emqx_message:to_map(Msg), + emqx_message:from_map( + MsgMap#{topic => From, + payload => From + }); + _ -> Msg + end, + ?assertEqual(ExpectedOutMsg, OutMsg), + + {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{message => from_message(Msg) + }, + ?assertEqual(Expected, Resp) + end, + true + end). + +prop_message_dropped() -> + ?ALL({Msg, By, Reason}, {message(), hardcoded, shutdown_reason()}, + begin + ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]), + case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of + true -> skip; + _ -> + {'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{reason => stringfy(Reason), + message => from_message(Msg) + }, + ?assertEqual(Expected, Resp) + end, + true + end). + +prop_message_delivered() -> + ?ALL({ClientInfo, Msg}, {clientinfo(), message()}, + begin + ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]), + case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of + true -> skip; + _ -> + {'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => from_clientinfo(ClientInfo), + message => from_message(Msg) + }, + ?assertEqual(Expected, Resp) + end, + true + end). + +prop_message_acked() -> + ?ALL({ClientInfo, Msg}, {clientinfo(), message()}, + begin + ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), + case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of + true -> skip; + _ -> + {'on_message_acked', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => from_clientinfo(ClientInfo), + message => from_message(Msg) + }, + ?assertEqual(Expected, Resp) + end, + true + end). + +nodestr() -> + stringfy(node()). + +peerhost(#{peername := {Host, _}}) -> + ntoa(Host). + +sockport(#{sockname := {_, Port}}) -> + Port. + +%% copied from emqx_exhook + +ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> + list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256})); +ntoa(IP) -> + list_to_binary(inet_parse:ntoa(IP)). + +maybe(undefined) -> <<>>; +maybe(B) -> B. + +properties(undefined) -> []; +properties(M) when is_map(M) -> + maps:fold(fun(K, V, Acc) -> + [#{name => stringfy(K), + value => stringfy(V)} | Acc] + end, [], M). + +topicfilters(Tfs) when is_list(Tfs) -> + [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. + +%% @private +stringfy(Term) when is_binary(Term) -> + Term; +stringfy(Term) when is_integer(Term) -> + integer_to_binary(Term); +stringfy(Term) when is_atom(Term) -> + atom_to_binary(Term, utf8); +stringfy(Term) -> + unicode:characters_to_binary((io_lib:format("~0p", [Term]))). + +subopts(SubOpts) -> + #{qos => maps:get(qos, SubOpts, 0), + rh => maps:get(rh, SubOpts, 0), + rap => maps:get(rap, SubOpts, 0), + nl => maps:get(nl, SubOpts, 0), + share => maps:get(share, SubOpts, <<>>) + }. + +authresult_to_bool(AuthResult) -> + maps:get(auth_result, AuthResult, undefined) == success. + +aclresult_to_bool(Result) -> + Result == allow. + +pubsub_to_enum(publish) -> 'PUBLISH'; +pubsub_to_enum(subscribe) -> 'SUBSCRIBE'. + +from_conninfo(ConnInfo) -> + #{node => nodestr(), + clientid => maps:get(clientid, ConnInfo), + username => maybe(maps:get(username, ConnInfo, <<>>)), + peerhost => peerhost(ConnInfo), + sockport => sockport(ConnInfo), + proto_name => maps:get(proto_name, ConnInfo), + proto_ver => stringfy(maps:get(proto_ver, ConnInfo)), + keepalive => maps:get(keepalive, ConnInfo) + }. + +from_clientinfo(ClientInfo) -> + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true), + cn => maybe(maps:get(cn, ClientInfo, <<>>)), + dn => maybe(maps:get(dn, ClientInfo, <<>>)) + }. + +from_message(Msg) -> + #{node => nodestr(), + id => emqx_guid:to_hexstr(emqx_message:id(Msg)), + qos => emqx_message:qos(Msg), + from => stringfy(emqx_message:from(Msg)), + topic => emqx_message:topic(Msg), + payload => emqx_message:payload(Msg), + timestamp => emqx_message:timestamp(Msg) + }. + +%%-------------------------------------------------------------------- +%% Helper +%%-------------------------------------------------------------------- + +do_setup() -> + logger:set_primary_config(#{level => warning}), + _ = emqx_exhook_demo_svr:start(), + emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1), + %% waiting first loaded event + {'on_provider_loaded', _} = emqx_exhook_demo_svr:take(), + ok. + +do_teardown(_) -> + emqx_ct_helpers:stop_apps([emqx_exhook]), + %% waiting last unloaded event + {'on_provider_unloaded', _} = emqx_exhook_demo_svr:take(), + _ = emqx_exhook_demo_svr:stop(), + logger:set_primary_config(#{level => notice}), + timer:sleep(2000), + ok. + +set_special_cfgs(emqx) -> + application:set_env(emqx, allow_anonymous, false), + application:set_env(emqx, enable_acl_cache, false), + application:set_env(emqx, modules_loaded_file, undefined), + application:set_env(emqx, plugins_loaded_file, + emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); +set_special_cfgs(emqx_exhook) -> + ok. + +%%-------------------------------------------------------------------- +%% Generators +%%-------------------------------------------------------------------- + +conn_properties() -> + #{}. + +ack_properties() -> + #{}. + +sub_properties() -> + #{}. + +unsub_properties() -> + #{}. + +shutdown_reason() -> + oneof([utf8(), {shutdown, emqx_ct_proper_types:limited_atom()}]). + +authresult() -> + ?LET(RC, connack_return_code(), #{auth_result => RC}). + +inject_magic_into(Key, Object) -> + case castspell() of + muggles -> Object; + Spell -> + Object#{Key => Spell} + end. + +castspell() -> + L = [<<"baduser">>, <<"gooduser">>, <<"normaluser">>, muggles], + lists:nth(rand:uniform(length(L)), L). diff --git a/apps/emqx_gateway/etc/emqx_exhook.conf b/apps/emqx_gateway/etc/emqx_exhook.conf deleted file mode 100644 index b2758e705..000000000 --- a/apps/emqx_gateway/etc/emqx_exhook.conf +++ /dev/null @@ -1,15 +0,0 @@ -##==================================================================== -## EMQ X Hooks -##==================================================================== - -##-------------------------------------------------------------------- -## Server Address - -## The gRPC server url -## -## exhook.server.$name.url = url() -exhook.server.default.url = "http://127.0.0.1:9000" - -#exhook.server.default.ssl.cacertfile = "{{ platform_etc_dir }}/certs/cacert.pem" -#exhook.server.default.ssl.certfile = "{{ platform_etc_dir }}/certs/cert.pem" -#exhook.server.default.ssl.keyfile = "{{ platform_etc_dir }}/certs/key.pem" diff --git a/apps/emqx_gateway/src/exhook/prop_exhook_hooks.erl b/apps/emqx_gateway/src/exhook/prop_exhook_hooks.erl deleted file mode 100644 index cb2ab8d11..000000000 --- a/apps/emqx_gateway/src/exhook/prop_exhook_hooks.erl +++ /dev/null @@ -1,531 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(prop_exhook_hooks). - -% -include_lib("proper/include/proper.hrl"). -% -include_lib("eunit/include/eunit.hrl"). - -% -import(emqx_ct_proper_types, -% [ conninfo/0 -% , clientinfo/0 -% , sessioninfo/0 -% , message/0 -% , connack_return_code/0 -% , topictab/0 -% , topic/0 -% , subopts/0 -% ]). - -% -define(ALL(Vars, Types, Exprs), -% ?SETUP(fun() -> -% State = do_setup(), -% fun() -> do_teardown(State) end -% end, ?FORALL(Vars, Types, Exprs))). - -% %%-------------------------------------------------------------------- -% %% Properties -% %%-------------------------------------------------------------------- - -% prop_client_connect() -> -% ?ALL({ConnInfo, ConnProps}, -% {conninfo(), conn_properties()}, -% begin -% ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]), -% {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{props => properties(ConnProps), -% conninfo => from_conninfo(ConnInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_client_connack() -> -% ?ALL({ConnInfo, Rc, AckProps}, -% {conninfo(), connack_return_code(), ack_properties()}, -% begin -% ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]), -% {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{props => properties(AckProps), -% result_code => atom_to_binary(Rc, utf8), -% conninfo => from_conninfo(ConnInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_client_authenticate() -> -% ?ALL({ClientInfo0, AuthResult}, -% {clientinfo(), authresult()}, -% begin -% ClientInfo = inject_magic_into(username, ClientInfo0), -% OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult), -% ExpectedAuthResult = case maps:get(username, ClientInfo) of -% <<"baduser">> -> -% AuthResult#{ -% auth_result => not_authorized, -% anonymous => false}; -% <<"gooduser">> -> -% AuthResult#{ -% auth_result => success, -% anonymous => false}; -% <<"normaluser">> -> -% AuthResult#{ -% auth_result => success, -% anonymous => false}; -% _ -> -% case maps:get(auth_result, AuthResult) of -% success -> -% #{auth_result => success, -% anonymous => false}; -% _ -> -% #{auth_result => not_authorized, -% anonymous => false} -% end -% end, -% ?assertEqual(ExpectedAuthResult, OutAuthResult), - -% {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{result => authresult_to_bool(AuthResult), -% clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_client_authorize() -> -% ?ALL({ClientInfo0, PubSub, Topic, Result}, -% {clientinfo(), oneof([publish, subscribe]), -% topic(), oneof([allow, deny])}, -% begin -% ClientInfo = inject_magic_into(username, ClientInfo0), -% OutResult = emqx_hooks:run_fold( -% 'client.authorize', -% [ClientInfo, PubSub, Topic], -% Result), -% ExpectedOutResult = case maps:get(username, ClientInfo) of -% <<"baduser">> -> deny; -% <<"gooduser">> -> allow; -% <<"normaluser">> -> allow; -% _ -> Result -% end, -% ?assertEqual(ExpectedOutResult, OutResult), - -% {'on_client_authorize', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{result => authzresult_to_bool(Result), -% type => pubsub_to_enum(PubSub), -% topic => Topic, -% clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_client_connected() -> -% ?ALL({ClientInfo, ConnInfo}, -% {clientinfo(), conninfo()}, -% begin -% ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]), -% {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_client_disconnected() -> -% ?ALL({ClientInfo, Reason, ConnInfo}, -% {clientinfo(), shutdown_reason(), conninfo()}, -% begin -% ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]), -% {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{reason => stringfy(Reason), -% clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_client_subscribe() -> -% ?ALL({ClientInfo, SubProps, TopicTab}, -% {clientinfo(), sub_properties(), topictab()}, -% begin -% ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]), -% {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{props => properties(SubProps), -% topic_filters => topicfilters(TopicTab), -% clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_client_unsubscribe() -> -% ?ALL({ClientInfo, UnSubProps, TopicTab}, -% {clientinfo(), unsub_properties(), topictab()}, -% begin -% ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]), -% {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{props => properties(UnSubProps), -% topic_filters => topicfilters(TopicTab), -% clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_session_created() -> -% ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, -% begin -% ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]), -% {'on_session_created', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_session_subscribed() -> -% ?ALL({ClientInfo, Topic, SubOpts}, -% {clientinfo(), topic(), subopts()}, -% begin -% ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), -% {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{topic => Topic, -% subopts => subopts(SubOpts), -% clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_session_unsubscribed() -> -% ?ALL({ClientInfo, Topic, SubOpts}, -% {clientinfo(), topic(), subopts()}, -% begin -% ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]), -% {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{topic => Topic, -% clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_session_resumed() -> -% ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, -% begin -% ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]), -% {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_session_discared() -> -% ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, -% begin -% ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]), -% {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_session_takeovered() -> -% ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, -% begin -% ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]), -% {'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_session_terminated() -> -% ?ALL({ClientInfo, Reason, SessInfo}, -% {clientinfo(), shutdown_reason(), sessioninfo()}, -% begin -% ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]), -% {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{reason => stringfy(Reason), -% clientinfo => from_clientinfo(ClientInfo) -% }, -% ?assertEqual(Expected, Resp), -% true -% end). - -% prop_message_publish() -> -% ?ALL(Msg0, message(), -% begin -% Msg = emqx_message:from_map( -% inject_magic_into(from, emqx_message:to_map(Msg0))), -% OutMsg= emqx_hooks:run_fold('message.publish', [], Msg), -% case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of -% true -> -% ?assertEqual(Msg, OutMsg), -% skip; -% _ -> -% ExpectedOutMsg = case emqx_message:from(Msg) of -% <<"baduser">> -> -% MsgMap = emqx_message:to_map(Msg), -% emqx_message:from_map( -% MsgMap#{qos => 0, -% topic => <<"">>, -% payload => <<"">> -% }); -% <<"gooduser">> = From -> -% MsgMap = emqx_message:to_map(Msg), -% emqx_message:from_map( -% MsgMap#{topic => From, -% payload => From -% }); -% _ -> Msg -% end, -% ?assertEqual(ExpectedOutMsg, OutMsg), - -% {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{message => from_message(Msg) -% }, -% ?assertEqual(Expected, Resp) -% end, -% true -% end). - -% prop_message_dropped() -> -% ?ALL({Msg, By, Reason}, {message(), hardcoded, shutdown_reason()}, -% begin -% ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]), -% case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of -% true -> skip; -% _ -> -% {'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{reason => stringfy(Reason), -% message => from_message(Msg) -% }, -% ?assertEqual(Expected, Resp) -% end, -% true -% end). - -% prop_message_delivered() -> -% ?ALL({ClientInfo, Msg}, {clientinfo(), message()}, -% begin -% ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]), -% case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of -% true -> skip; -% _ -> -% {'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{clientinfo => from_clientinfo(ClientInfo), -% message => from_message(Msg) -% }, -% ?assertEqual(Expected, Resp) -% end, -% true -% end). - -% prop_message_acked() -> -% ?ALL({ClientInfo, Msg}, {clientinfo(), message()}, -% begin -% ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), -% case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of -% true -> skip; -% _ -> -% {'on_message_acked', Resp} = emqx_exhook_demo_svr:take(), -% Expected = -% #{clientinfo => from_clientinfo(ClientInfo), -% message => from_message(Msg) -% }, -% ?assertEqual(Expected, Resp) -% end, -% true -% end). - -% nodestr() -> -% stringfy(node()). - -% peerhost(#{peername := {Host, _}}) -> -% ntoa(Host). - -% sockport(#{sockname := {_, Port}}) -> -% Port. - -% %% copied from emqx_exhook - -% ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> -% list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256})); -% ntoa(IP) -> -% list_to_binary(inet_parse:ntoa(IP)). - -% maybe(undefined) -> <<>>; -% maybe(B) -> B. - -% properties(undefined) -> []; -% properties(M) when is_map(M) -> -% maps:fold(fun(K, V, Acc) -> -% [#{name => stringfy(K), -% value => stringfy(V)} | Acc] -% end, [], M). - -% topicfilters(Tfs) when is_list(Tfs) -> -% [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. - -% %% @private -% stringfy(Term) when is_binary(Term) -> -% Term; -% stringfy(Term) when is_integer(Term) -> -% integer_to_binary(Term); -% stringfy(Term) when is_atom(Term) -> -% atom_to_binary(Term, utf8); -% stringfy(Term) -> -% unicode:characters_to_binary((io_lib:format("~0p", [Term]))). - -% subopts(SubOpts) -> -% #{qos => maps:get(qos, SubOpts, 0), -% rh => maps:get(rh, SubOpts, 0), -% rap => maps:get(rap, SubOpts, 0), -% nl => maps:get(nl, SubOpts, 0), -% share => maps:get(share, SubOpts, <<>>) -% }. - -% authresult_to_bool(AuthResult) -> -% maps:get(auth_result, AuthResult, undefined) == success. - -% authzresult_to_bool(Result) -> -% Result == allow. - -% pubsub_to_enum(publish) -> 'PUBLISH'; -% pubsub_to_enum(subscribe) -> 'SUBSCRIBE'. - -% from_conninfo(ConnInfo) -> -% #{node => nodestr(), -% clientid => maps:get(clientid, ConnInfo), -% username => maybe(maps:get(username, ConnInfo, <<>>)), -% peerhost => peerhost(ConnInfo), -% sockport => sockport(ConnInfo), -% proto_name => maps:get(proto_name, ConnInfo), -% proto_ver => stringfy(maps:get(proto_ver, ConnInfo)), -% keepalive => maps:get(keepalive, ConnInfo) -% }. - -% from_clientinfo(ClientInfo) -> -% #{node => nodestr(), -% clientid => maps:get(clientid, ClientInfo), -% username => maybe(maps:get(username, ClientInfo, <<>>)), -% password => maybe(maps:get(password, ClientInfo, <<>>)), -% peerhost => ntoa(maps:get(peerhost, ClientInfo)), -% sockport => maps:get(sockport, ClientInfo), -% protocol => stringfy(maps:get(protocol, ClientInfo)), -% mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), -% is_superuser => maps:get(is_superuser, ClientInfo, false), -% anonymous => maps:get(anonymous, ClientInfo, true), -% cn => maybe(maps:get(cn, ClientInfo, <<>>)), -% dn => maybe(maps:get(dn, ClientInfo, <<>>)) -% }. - -% from_message(Msg) -> -% #{node => nodestr(), -% id => emqx_guid:to_hexstr(emqx_message:id(Msg)), -% qos => emqx_message:qos(Msg), -% from => stringfy(emqx_message:from(Msg)), -% topic => emqx_message:topic(Msg), -% payload => emqx_message:payload(Msg), -% timestamp => emqx_message:timestamp(Msg) -% }. - -% %%-------------------------------------------------------------------- -% %% Helper -% %%-------------------------------------------------------------------- - -% do_setup() -> -% logger:set_primary_config(#{level => warning}), -% _ = emqx_exhook_demo_svr:start(), -% emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1), -% %% waiting first loaded event -% {'on_provider_loaded', _} = emqx_exhook_demo_svr:take(), -% ok. - -% do_teardown(_) -> -% emqx_ct_helpers:stop_apps([emqx_exhook]), -% %% waiting last unloaded event -% {'on_provider_unloaded', _} = emqx_exhook_demo_svr:take(), -% _ = emqx_exhook_demo_svr:stop(), -% logger:set_primary_config(#{level => notice}), -% timer:sleep(2000), -% ok. - -% set_special_cfgs(emqx) -> -% application:set_env(emqx, allow_anonymous, false), -% application:set_env(emqx, enable_authz_cache, false), -% application:set_env(emqx, modules_loaded_file, undefined), -% application:set_env(emqx, plugins_loaded_file, -% emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); -% set_special_cfgs(emqx_exhook) -> -% ok. - -% %%-------------------------------------------------------------------- -% %% Generators -% %%-------------------------------------------------------------------- - -% conn_properties() -> -% #{}. - -% ack_properties() -> -% #{}. - -% sub_properties() -> -% #{}. - -% unsub_properties() -> -% #{}. - -% shutdown_reason() -> -% oneof([utf8(), {shutdown, emqx_ct_proper_types:limited_atom()}]). - -% authresult() -> -% ?LET(RC, connack_return_code(), #{auth_result => RC}). - -% inject_magic_into(Key, Object) -> -% case castspell() of -% muggles -> Object; -% Spell -> -% Object#{Key => Spell} -% end. - -% castspell() -> -% L = [<<"baduser">>, <<"gooduser">>, <<"normaluser">>, muggles], -% lists:nth(rand:uniform(length(L)), L). diff --git a/apps/emqx_gateway/src/exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_gateway/src/exhook/test/emqx_exhook_SUITE.erl deleted file mode 100644 index 5dc29e6f1..000000000 --- a/apps/emqx_gateway/src/exhook/test/emqx_exhook_SUITE.erl +++ /dev/null @@ -1,97 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook_SUITE). - -% -compile(export_all). -% -compile(nowarn_export_all). - - -% -include_lib("eunit/include/eunit.hrl"). -% -include_lib("common_test/include/ct.hrl"). - -% %%-------------------------------------------------------------------- -% %% Setups -% %%-------------------------------------------------------------------- - -% all() -> emqx_ct:all(?MODULE). - -% init_per_suite(Cfg) -> -% _ = emqx_exhook_demo_svr:start(), -% emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1), -% Cfg. - -% end_per_suite(_Cfg) -> -% emqx_ct_helpers:stop_apps([emqx_exhook]), -% emqx_exhook_demo_svr:stop(). - -% set_special_cfgs(emqx) -> -% application:set_env(emqx, allow_anonymous, false), -% application:set_env(emqx, enable_authz_cache, false), -% application:set_env(emqx, plugins_loaded_file, undefined), -% application:set_env(emqx, modules_loaded_file, undefined); -% set_special_cfgs(emqx_exhook) -> -% ok. - -% %%-------------------------------------------------------------------- -% %% Test cases -% %%-------------------------------------------------------------------- - -% t_noserver_nohook(_) -> -% emqx_exhook:disable(default), -% ?assertEqual([], ets:tab2list(emqx_hooks)), - -% Opts = proplists:get_value( -% default, -% application:get_env(emqx_exhook, servers, []) -% ), -% ok = emqx_exhook:enable(default, Opts), -% ?assertNotEqual([], ets:tab2list(emqx_hooks)). - -% t_cli_list(_) -> -% meck_print(), -% ?assertEqual( [[emqx_exhook_server:format(Svr) || Svr <- emqx_exhook:list()]] -% , emqx_exhook_cli:cli(["server", "list"]) -% ), -% unmeck_print(). - -% t_cli_enable_disable(_) -> -% meck_print(), -% ?assertEqual([already_started], emqx_exhook_cli:cli(["server", "enable", "default"])), -% ?assertEqual(ok, emqx_exhook_cli:cli(["server", "disable", "default"])), -% ?assertEqual([], emqx_exhook_cli:cli(["server", "list"])), - -% ?assertEqual([not_running], emqx_exhook_cli:cli(["server", "disable", "default"])), -% ?assertEqual(ok, emqx_exhook_cli:cli(["server", "enable", "default"])), -% unmeck_print(). - -% t_cli_stats(_) -> -% meck_print(), -% _ = emqx_exhook_cli:cli(["server", "stats"]), -% _ = emqx_exhook_cli:cli(x), -% unmeck_print(). - -% %%-------------------------------------------------------------------- -% %% Utils -% %%-------------------------------------------------------------------- - -% meck_print() -> -% meck:new(emqx_ctl, [passthrough, no_history, no_link]), -% meck:expect(emqx_ctl, print, fun(_) -> ok end), -% meck:expect(emqx_ctl, print, fun(_, Args) -> Args end). - -% unmeck_print() -> -% meck:unload(emqx_ctl). diff --git a/apps/emqx_gateway/src/exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_gateway/src/exhook/test/emqx_exhook_demo_svr.erl deleted file mode 100644 index bcc8865ab..000000000 --- a/apps/emqx_gateway/src/exhook/test/emqx_exhook_demo_svr.erl +++ /dev/null @@ -1,339 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook_demo_svr). - -% -behavior(emqx_exhook_v_1_hook_provider_bhvr). - -% %% -% -export([ start/0 -% , stop/0 -% , take/0 -% , in/1 -% ]). - -% %% gRPC server HookProvider callbacks -% -export([ on_provider_loaded/2 -% , on_provider_unloaded/2 -% , on_client_connect/2 -% , on_client_connack/2 -% , on_client_connected/2 -% , on_client_disconnected/2 -% , on_client_authenticate/2 -% , on_client_authorize/2 -% , on_client_subscribe/2 -% , on_client_unsubscribe/2 -% , on_session_created/2 -% , on_session_subscribed/2 -% , on_session_unsubscribed/2 -% , on_session_resumed/2 -% , on_session_discarded/2 -% , on_session_takeovered/2 -% , on_session_terminated/2 -% , on_message_publish/2 -% , on_message_delivered/2 -% , on_message_dropped/2 -% , on_message_acked/2 -% ]). - -% -define(PORT, 9000). -% -define(NAME, ?MODULE). - -% %%-------------------------------------------------------------------- -% %% Server APIs -% %%-------------------------------------------------------------------- - -% start() -> -% Pid = spawn(fun mngr_main/0), -% register(?MODULE, Pid), -% {ok, Pid}. - -% stop() -> -% grpc:stop_server(?NAME), -% ?MODULE ! stop. - -% take() -> -% ?MODULE ! {take, self()}, -% receive {value, V} -> V -% after 5000 -> error(timeout) end. - -% in({FunName, Req}) -> -% ?MODULE ! {in, FunName, Req}. - -% mngr_main() -> -% application:ensure_all_started(grpc), -% Services = #{protos => [emqx_exhook_pb], -% services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr} -% }, -% Options = [], -% Svr = grpc:start_server(?NAME, ?PORT, Services, Options), -% mngr_loop([Svr, queue:new(), queue:new()]). - -% mngr_loop([Svr, Q, Takes]) -> -% receive -% {in, FunName, Req} -> -% {NQ1, NQ2} = reply(queue:in({FunName, Req}, Q), Takes), -% mngr_loop([Svr, NQ1, NQ2]); -% {take, From} -> -% {NQ1, NQ2} = reply(Q, queue:in(From, Takes)), -% mngr_loop([Svr, NQ1, NQ2]); -% stop -> -% exit(normal) -% end. - -% reply(Q1, Q2) -> -% case queue:len(Q1) =:= 0 orelse -% queue:len(Q2) =:= 0 of -% true -> {Q1, Q2}; -% _ -> -% {{value, {Name, V}}, NQ1} = queue:out(Q1), -% {{value, From}, NQ2} = queue:out(Q2), -% From ! {value, {Name, V}}, -% {NQ1, NQ2} -% end. - -% %%-------------------------------------------------------------------- -% %% callbacks -% %%-------------------------------------------------------------------- - -% -spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. - -% on_provider_loaded(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{hooks => [ -% #{name => <<"client.connect">>}, -% #{name => <<"client.connack">>}, -% #{name => <<"client.connected">>}, -% #{name => <<"client.disconnected">>}, -% #{name => <<"client.authenticate">>}, -% #{name => <<"client.authorize">>}, -% #{name => <<"client.subscribe">>}, -% #{name => <<"client.unsubscribe">>}, -% #{name => <<"session.created">>}, -% #{name => <<"session.subscribed">>}, -% #{name => <<"session.unsubscribed">>}, -% #{name => <<"session.resumed">>}, -% #{name => <<"session.discarded">>}, -% #{name => <<"session.takeovered">>}, -% #{name => <<"session.terminated">>}, -% #{name => <<"message.publish">>}, -% #{name => <<"message.delivered">>}, -% #{name => <<"message.acked">>}, -% #{name => <<"message.dropped">>}]}, Md}. -% -spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_provider_unloaded(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_client_connect(emqx_exhook_pb:client_connect_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_client_connect(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_client_connack(emqx_exhook_pb:client_connack_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_client_connack(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_client_connected(emqx_exhook_pb:client_connected_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_client_connected(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_client_disconnected(emqx_exhook_pb:client_disconnected_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_client_disconnected(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_client_authenticate(emqx_exhook_pb:client_authenticate_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_client_authenticate(#{clientinfo := #{username := Username}} = Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% %% some cases for testing -% case Username of -% <<"baduser">> -> -% {ok, #{type => 'STOP_AND_RETURN', -% value => {bool_result, false}}, Md}; -% <<"gooduser">> -> -% {ok, #{type => 'STOP_AND_RETURN', -% value => {bool_result, true}}, Md}; -% <<"normaluser">> -> -% {ok, #{type => 'CONTINUE', -% value => {bool_result, true}}, Md}; -% _ -> -% {ok, #{type => 'IGNORE'}, Md} -% end. - -% -spec on_client_authorize(emqx_exhook_pb:client_authorize_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_client_authorize(#{clientinfo := #{username := Username}} = Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% %% some cases for testing -% case Username of -% <<"baduser">> -> -% {ok, #{type => 'STOP_AND_RETURN', -% value => {bool_result, false}}, Md}; -% <<"gooduser">> -> -% {ok, #{type => 'STOP_AND_RETURN', -% value => {bool_result, true}}, Md}; -% <<"normaluser">> -> -% {ok, #{type => 'CONTINUE', -% value => {bool_result, true}}, Md}; -% _ -> -% {ok, #{type => 'IGNORE'}, Md} -% end. - -% -spec on_client_subscribe(emqx_exhook_pb:client_subscribe_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_client_subscribe(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_client_unsubscribe(emqx_exhook_pb:client_unsubscribe_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_client_unsubscribe(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_session_created(emqx_exhook_pb:session_created_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_session_created(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_session_subscribed(emqx_exhook_pb:session_subscribed_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_session_subscribed(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_session_unsubscribed(emqx_exhook_pb:session_unsubscribed_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_session_unsubscribed(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_session_resumed(emqx_exhook_pb:session_resumed_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_session_resumed(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_session_discarded(emqx_exhook_pb:session_discarded_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_session_discarded(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_session_takeovered(emqx_exhook_pb:session_takeovered_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_session_takeovered(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_session_terminated(emqx_exhook_pb:session_terminated_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_session_terminated(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_message_publish(emqx_exhook_pb:message_publish_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% %% some cases for testing -% case From of -% <<"baduser">> -> -% NMsg = Msg#{qos => 0, -% topic => <<"">>, -% payload => <<"">> -% }, -% {ok, #{type => 'STOP_AND_RETURN', -% value => {message, NMsg}}, Md}; -% <<"gooduser">> -> -% NMsg = Msg#{topic => From, -% payload => From}, -% {ok, #{type => 'STOP_AND_RETURN', -% value => {message, NMsg}}, Md}; -% _ -> -% {ok, #{type => 'IGNORE'}, Md} -% end. - -% -spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_message_delivered(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_message_dropped(emqx_exhook_pb:message_dropped_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_message_dropped(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. - -% -spec on_message_acked(emqx_exhook_pb:message_acked_request(), grpc:metadata()) -% -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} -% | {error, grpc_cowboy_h:error_response()}. -% on_message_acked(Req, Md) -> -% ?MODULE:in({?FUNCTION_NAME, Req}), -% %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), -% {ok, #{}, Md}. diff --git a/rebar.config.erl b/rebar.config.erl index 5eca3094a..4098bb7ab 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -275,6 +275,7 @@ relx_apps(ReleaseType) -> , emqx_authn , emqx_authz , emqx_gateway + , {emqx_exhook, load} , emqx_data_bridge , emqx_rule_engine , emqx_rule_actions