From 485bffadd68b2344c7d48540ba99d5b308d50ca9 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 16 Oct 2020 10:17:22 +0800 Subject: [PATCH] refactor(exhook): improve the exhook implementation --- .gitignore | 2 +- apps/emqx_exhook/.gitignore | 3 + apps/emqx_exhook/README.md | 39 ++ apps/emqx_exhook/docs/design.md | 116 ++++ apps/emqx_exhook/include/emqx_exhook.hrl | 22 + apps/emqx_exhook/priv/emqx_exhook.schema | 38 ++ apps/emqx_exhook/priv/protos/exhook.proto | 395 +++++++++++++ apps/emqx_exhook/rebar.config | 47 ++ apps/emqx_exhook/src/emqx_exhook.app.src | 12 + apps/emqx_exhook/src/emqx_exhook.erl | 134 +++++ apps/emqx_exhook/src/emqx_exhook_app.erl | 117 ++++ apps/emqx_exhook/src/emqx_exhook_cli.erl | 80 +++ apps/emqx_exhook/src/emqx_exhook_handler.erl | 288 ++++++++++ apps/emqx_exhook/src/emqx_exhook_server.erl | 274 +++++++++ apps/emqx_exhook/src/emqx_exhook_sup.erl | 74 +++ apps/emqx_exhook/test/emqx_exhook_SUITE.erl | 53 ++ .../emqx_exhook/test/emqx_exhook_demo_svr.erl | 299 ++++++++++ .../test/props/prop_exhook_hooks.erl | 535 ++++++++++++++++++ etc/emqx_cloud.d/emqx_exhook.conf | 15 + rebar.config | 243 ++++++-- 20 files changed, 2743 insertions(+), 43 deletions(-) create mode 100644 apps/emqx_exhook/.gitignore create mode 100644 apps/emqx_exhook/README.md create mode 100644 apps/emqx_exhook/docs/design.md create mode 100644 apps/emqx_exhook/include/emqx_exhook.hrl create mode 100644 apps/emqx_exhook/priv/emqx_exhook.schema create mode 100644 apps/emqx_exhook/priv/protos/exhook.proto create mode 100644 apps/emqx_exhook/rebar.config create mode 100644 apps/emqx_exhook/src/emqx_exhook.app.src create mode 100644 apps/emqx_exhook/src/emqx_exhook.erl create mode 100644 apps/emqx_exhook/src/emqx_exhook_app.erl create mode 100644 apps/emqx_exhook/src/emqx_exhook_cli.erl create mode 100644 apps/emqx_exhook/src/emqx_exhook_handler.erl create mode 100644 apps/emqx_exhook/src/emqx_exhook_server.erl create mode 100644 apps/emqx_exhook/src/emqx_exhook_sup.erl create mode 100644 apps/emqx_exhook/test/emqx_exhook_SUITE.erl create mode 100644 apps/emqx_exhook/test/emqx_exhook_demo_svr.erl create mode 100644 apps/emqx_exhook/test/props/prop_exhook_hooks.erl create mode 100644 etc/emqx_cloud.d/emqx_exhook.conf diff --git a/.gitignore b/.gitignore index b23eeafe3..f4900cf4e 100644 --- a/.gitignore +++ b/.gitignore @@ -41,4 +41,4 @@ erlang.mk *.coverdata etc/emqx.conf.rendered Mnesia.*/ -*.DS_Store +*.DS_Store \ No newline at end of file diff --git a/apps/emqx_exhook/.gitignore b/apps/emqx_exhook/.gitignore new file mode 100644 index 000000000..520db35b8 --- /dev/null +++ b/apps/emqx_exhook/.gitignore @@ -0,0 +1,3 @@ +src/emqx_exhook_pb.erl +src/emqx_exhook_v_1_hook_provider_bhvr.erl +src/emqx_exhook_v_1_hook_provider_client.erl diff --git a/apps/emqx_exhook/README.md b/apps/emqx_exhook/README.md new file mode 100644 index 000000000..216c39275 --- /dev/null +++ b/apps/emqx_exhook/README.md @@ -0,0 +1,39 @@ +# emqx_exhook + +The `emqx_exhook` extremly enhance the extensibility for EMQ X. It allow using an others programming language to mount the hooks intead of erlang. + +## Feature + +- [x] Based on gRPC, it brings a very wide range of applicability +- [x] Allows you to use the return value to extend emqx behavior. + +## Architecture + +``` +EMQ X Third-party Runtime ++========================+ +========+==========+ +| ExHook | | | | +| +----------------+ | gRPC | gRPC | User's | +| | gPRC Client | ------------------> | Server | Codes | +| +----------------+ | (HTTP/2) | | | +| | | | | ++========================+ +========+==========+ +``` + +## Usage + +### gRPC service + +See: `priv/protos/exhook.proto` + +### CLI + +## Example + +## Recommended gRPC Framework + +See: https://github.com/grpc-ecosystem/awesome-grpc + +## Thanks + +- [grpcbox](https://github.com/tsloughter/grpcbox) diff --git a/apps/emqx_exhook/docs/design.md b/apps/emqx_exhook/docs/design.md new file mode 100644 index 000000000..671e240cc --- /dev/null +++ b/apps/emqx_exhook/docs/design.md @@ -0,0 +1,116 @@ +# 设计 + +## 动机 + +在 EMQ X Broker v4.1-v4.2 中,我们发布了 2 个插件来扩展 emqx 的编程能力: + +1. `emqx-extension-hook` 提供了使用 Java, Python 向 Broker 挂载钩子的功能 +2. `emqx-exproto` 提供了使用 Java,Python 编写用户自定义协议接入插件的功能 + +但在后续的支持中发现许多难以处理的问题: + +1. 有大量的编程语言需要支持,需要编写和维护如 Go, JavaScript, Lua.. 等语言的驱动。 +2. `erlport` 使用的操作系统的管道进行通信,这让用户代码只能部署在和 emqx 同一个操作系统上。部署方式受到了极大的限制。 +3. 用户程序的启动参数直接打包到 Broker 中,导致用户开发无法实时的进行调试,单步跟踪等。 +4. `erlport` 会占用 `stdin` `stdout`。 + +因此,我们计划重构这部分的实现,其中主要的内容是: +1. 使用 `gRPC` 替换 `erlport`。 +2. 将 `emqx-extension-hook` 重命名为 `emqx-exhook` + + +旧版本的设计参考:[emqx-extension-hook design in v4.2.0](https://github.com/emqx/emqx-exhook/blob/v4.2.0/docs/design.md) + +## 设计 + +架构如下: + +``` + EMQ X ++========================+ +========+==========+ +| ExHook | | | | +| +----------------+ | gRPC | gRPC | User's | +| | gRPC Client | ------------------> | Server | Codes | +| +----------------+ | (HTTP/2) | | | +| | | | | ++========================+ +========+==========+ +``` + +`emqx-exhook` 通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。 + + +和 emqx 原生的钩子一致,emqx-exhook 也支持链式的方式计算和返回: + + + +### gRPC 服务示例 + +用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中。例如,其支持的接口有: + +```protobuff +syntax = "proto3"; + +package emqx.exhook.v1; + +service HookProvider { + + rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {}; + + rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {}; + + rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {}; + + rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {}; + + rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {}; + + rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {}; + + rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {}; + + rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {}; + + rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {}; + + rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {}; + + rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {}; + + rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {}; + + rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {}; + + rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {}; + + rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {}; + + rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {}; + + rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {}; + + rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {}; + + rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {}; + + rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {}; + + rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {}; +} +``` + +### 配置文件示例 + +``` +## 配置 gRPC 服务地址 (HTTP) +## +## s1 为服务器的名称 +exhook.server.s1.url = http://127.0.0.1:9001 + +## 配置 gRPC 服务地址 (HTTPS) +## +## s2 为服务器名称 +exhook.server.s2.url = https://127.0.0.1:9002 +exhook.server.s2.cacertfile = ca.pem +exhook.server.s2.certfile = cert.pem +exhook.server.s2.keyfile = key.pem +``` diff --git a/apps/emqx_exhook/include/emqx_exhook.hrl b/apps/emqx_exhook/include/emqx_exhook.hrl new file mode 100644 index 000000000..8a404ca39 --- /dev/null +++ b/apps/emqx_exhook/include/emqx_exhook.hrl @@ -0,0 +1,22 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-ifndef(EMQX_EXHOOK_HRL). +-define(EMQX_EXHOOK_HRL, true). + +-define(APP, emqx_exhook). + +-endif. diff --git a/apps/emqx_exhook/priv/emqx_exhook.schema b/apps/emqx_exhook/priv/emqx_exhook.schema new file mode 100644 index 000000000..2a926b968 --- /dev/null +++ b/apps/emqx_exhook/priv/emqx_exhook.schema @@ -0,0 +1,38 @@ +%%-*- mode: erlang -*- + +{mapping, "exhook.server.$name.url", "emqx_exhook.servers", [ + {datatype, string} +]}. + +{mapping, "exhook.server.$name.ssl.cacertfile", "emqx_exhook.servers", [ + {datatype, string} +]}. + +{mapping, "exhook.server.$name.ssl.certfile", "emqx_exhook.servers", [ + {datatype, string} +]}. + +{mapping, "exhook.server.$name.ssl.keyfile", "emqx_exhook.servers", [ + {datatype, string} +]}. + +{translation, "emqx_exhook.servers", fun(Conf) -> + Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end, + ServerOptions = fun(Prefix) -> + case http_uri:parse(cuttlefish:conf_get(Prefix ++ ".url", Conf)) of + {ok, {http, _, Host, Port, _, _}} -> + [{scheme, http}, {host, Host}, {port, Port}]; + {ok, {https, _, Host, Port, _, _}} -> + [{scheme, https}, {host, Host}, {port, Port}, + {ssl_options, + Filter([{ssl, true}, + {certfile, cuttlefish:conf_get(Prefix ++ ".ssl.certfile", Conf)}, + {keyfile, cuttlefish:conf_get(Prefix ++ ".ssl.keyfile", Conf)}, + {cacertfile, cuttlefish:conf_get(Prefix ++ ".ssl.cacertfile", Conf)} + ])}]; + _ -> error(invalid_server_options) + end + end, + [{list_to_atom(Name), ServerOptions("exhook.server." ++ Name)} + || {["exhook", "server", Name, "url"], _} <- cuttlefish_variable:filter_by_prefix("exhook.server", Conf)] +end}. diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto new file mode 100644 index 000000000..8dc9641b9 --- /dev/null +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -0,0 +1,395 @@ +//------------------------------------------------------------------------------ +// Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//------------------------------------------------------------------------------ + +syntax = "proto3"; + +package emqx.exhook.v1; + +service HookProvider { + + rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {}; + + rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {}; + + rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {}; + + rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {}; + + rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {}; + + rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {}; + + rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {}; + + rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {}; + + rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {}; + + rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {}; + + rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {}; + + rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {}; + + rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {}; + + rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {}; + + rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {}; + + rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {}; + + rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {}; + + rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {}; + + rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {}; + + rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {}; + + rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {}; +} + +//------------------------------------------------------------------------------ +// Request & Response +//------------------------------------------------------------------------------ + +message ProviderLoadedRequest { + + BrokerInfo broker = 1; +} + +message LoadedResponse { + + repeated HookSpec hooks = 1; +} + +message ProviderUnloadedRequest { } + +message ClientConnectRequest { + + ConnInfo conninfo = 1; + + // MQTT CONNECT packet's properties (MQTT v5.0) + // + // It should be empty on MQTT v3.1.1/v3.1 or others protocol + repeated Property props = 2; +} + +message ClientConnackRequest { + + ConnInfo conninfo = 1; + + string result_code = 2; + + repeated Property props = 3; +} + +message ClientConnectedRequest { + + ClientInfo clientinfo = 1; +} + +message ClientDisconnectedRequest { + + ClientInfo clientinfo = 1; + + string reason = 2; +} + +message ClientAuthenticateRequest { + + ClientInfo clientinfo = 1; + + bool result = 2; +} + +message ClientCheckAclRequest { + + ClientInfo clientinfo = 1; + + enum AclReqType { + + PUBLISH = 0; + + SUBSCRIBE = 1; + } + + AclReqType type = 2; + + string topic = 3; + + bool result = 4; +} + +message ClientSubscribeRequest { + + ClientInfo clientinfo = 1; + + repeated Property props = 2; + + repeated TopicFilter topic_filters = 3; +} + +message ClientUnsubscribeRequest { + + ClientInfo clientinfo = 1; + + repeated Property props = 2; + + repeated TopicFilter topic_filters = 3; +} + +message SessionCreatedRequest { + + ClientInfo clientinfo = 1; +} + +message SessionSubscribedRequest { + + ClientInfo clientinfo = 1; + + string topic = 2; + + SubOpts subopts = 3; +} + +message SessionUnsubscribedRequest { + + ClientInfo clientinfo = 1; + + string topic = 2; +} + +message SessionResumedRequest { + + ClientInfo clientinfo = 1; +} + +message SessionDiscardedRequest { + + ClientInfo clientinfo = 1; +} + +message SessionTakeoveredRequest { + + ClientInfo clientinfo = 1; +} + +message SessionTerminatedRequest { + + ClientInfo clientinfo = 1; + + string reason = 2; +} + +message MessagePublishRequest { + + Message message = 1; +} + +message MessageDeliveredRequest { + + ClientInfo clientinfo = 1; + + Message message = 2; +} + +message MessageDroppedRequest { + + Message message = 1; + + string reason = 2; +} + +message MessageAckedRequest { + + ClientInfo clientinfo = 1; + + Message message = 2; +} + +//------------------------------------------------------------------------------ +// Basic data types +//------------------------------------------------------------------------------ + +message EmptySuccess { } + +message ValuedResponse { + + // The responsed value type + // - ignore: Ignore the responsed value + // - contiune: Use the responsed value and execute the next hook + // - stop_and_return: Use the responsed value and stop the chain executing + enum ResponsedType { + + IGNORE = 0; + + CONTINUE = 1; + + STOP_AND_RETURN = 2; + } + + ResponsedType type = 1; + + oneof value { + + // Boolean result, used on the 'client.authenticate', 'client.check_acl' hooks + bool bool_result = 3; + + // Message result, used on the 'message.*' hooks + Message message = 4; + } +} + +message BrokerInfo { + + string version = 1; + + string sysdescr = 2; + + string uptime = 3; + + string datetime = 4; +} + +message HookSpec { + + // The registered hooks name + // + // Available value: + // "client.connect", "client.connack" + // "client.connected", "client.disconnected" + // "client.authenticate", "client.check_acl" + // "client.subscribe", "client.unsubscribe" + // + // "session.created", "session.subscribed" + // "session.unsubscribed", "session.resumed" + // "session.discarded", "session.takeovered" + // "session.terminated" + // + // "message.publish", "message.delivered" + // "message.acked", "message.dropped" + string name = 1; + + // The topic filters for message hooks + repeated string topics = 2; +} + +message ConnInfo { + + string node = 1; + + string clientid = 2; + + string username = 3; + + string peerhost = 4; + + uint32 sockport = 5; + + string proto_name = 6; + + string proto_ver = 7; + + uint32 keepalive = 8; +} + +message ClientInfo { + + string node = 1; + + string clientid = 2; + + string username = 3; + + string password = 4; + + string peerhost = 5; + + uint32 sockport = 6; + + string protocol = 7; + + string mountpoint = 8; + + bool is_superuser = 9; + + bool anonymous = 10; +} + +message Message { + + string node = 1; + + string id = 2; + + uint32 qos = 3; + + string from = 4; + + string topic = 5; + + bytes payload = 6; + + uint64 timestamp = 7; +} + +message Property { + + string name = 1; + + string value = 2; +} + +message TopicFilter { + + string name = 1; + + uint32 qos = 2; +} + +message SubOpts { + + // The QoS level + uint32 qos = 1; + + // The group name for shared subscription + string share = 2; + + // The Retain Handling option (MQTT v5.0) + // + // 0 = Send retained messages at the time of the subscribe + // 1 = Send retained messages at subscribe only if the subscription does + // not currently exist + // 2 = Do not send retained messages at the time of the subscribe + uint32 rh = 3; + + // The Retain as Published option (MQTT v5.0) + // + // If 1, Application Messages forwarded using this subscription keep the + // RETAIN flag they were published with. + // If 0, Application Messages forwarded using this subscription have the + // RETAIN flag set to 0. + // Retained messages sent when the subscription is established have the RETAIN flag set to 1. + uint32 rap = 4; + + // The No Local option (MQTT v5.0) + // + // If the value is 1, Application Messages MUST NOT be forwarded to a + // connection with a ClientID equal to the ClientID of the publishing + uint32 nl = 5; +} diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config new file mode 100644 index 000000000..0019ca245 --- /dev/null +++ b/apps/emqx_exhook/rebar.config @@ -0,0 +1,47 @@ +%%-*- mode: erlang -*- +{plugins, + [rebar3_proper, + {grpcbox_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {branch, "master"}}} +]}. + +{deps, + [{grpcbox, {git, "https://github.com/tsloughter/grpcbox", {branch, "master"}}} +]}. + +{grpc, + [{protos, ["priv/protos"]}, + {gpb_opts, [{module_name_prefix, "emqx_"}, + {module_name_suffix, "_pb"}]} +]}. + +{provider_hooks, + [{pre, [{compile, {grpc, gen}}]}]}. + +{edoc_opts, [{preprocess, true}]}. + +{erl_opts, [warn_unused_vars, + warn_shadow_vars, + warn_unused_import, + warn_obsolete_guard, + debug_info, + {parse_transform}]}. + +{xref_checks, [undefined_function_calls, undefined_functions, + locals_not_used, deprecated_function_calls, + warnings_as_errors, deprecated_functions]}. +{xref_ignores, [emqx_exhook_pb]}. + +{cover_enabled, true}. +{cover_opts, [verbose]}. +{cover_export_enabled, true}. +{cover_excl_mods, [emqx_exhook_pb, + emqx_exhook_v_1_hook_provider_bhvr, + emqx_exhook_v_1_connection_client]}. + +{profiles, + [{test, [ + {deps, [ {emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.3.1"}}} + , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} + ]} + ]} +]}. diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src new file mode 100644 index 000000000..e91359178 --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -0,0 +1,12 @@ +{application, emqx_exhook, + [{description, "EMQ X Extension for Hook"}, + {vsn, "git"}, + {modules, []}, + {registered, []}, + {mod, {emqx_exhook_app, []}}, + {applications, [kernel,stdlib,emqx_libs,grpcbox]}, + {env,[]}, + {licenses, ["Apache-2.0"]}, + {maintainers, ["EMQ X Team "]}, + {links, [{"Homepage", "https://emqx.io/"}]} + ]}. diff --git a/apps/emqx_exhook/src/emqx_exhook.erl b/apps/emqx_exhook/src/emqx_exhook.erl new file mode 100644 index 000000000..c9fe6a993 --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook.erl @@ -0,0 +1,134 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook). + +-include("emqx_exhook.hrl"). +-include_lib("emqx_libs/include/logger.hrl"). + +-logger_header("[ExHook]"). + +%% Mgmt APIs +-export([ enable/2 + , disable/1 + , disable_all/0 + , list/0 + ]). + +-export([ cast/2 + , call_fold/3 + ]). + +%%-------------------------------------------------------------------- +%% Mgmt APIs +%%-------------------------------------------------------------------- + +-spec list() -> [emqx_exhook_server:server()]. +list() -> + [server(Name) || Name <- running()]. + +-spec enable(atom()|string(), list()) -> ok | {error, term()}. +enable(Name, Opts) -> + case lists:member(Name, running()) of + true -> + {error, already_started}; + _ -> + case emqx_exhook_server:load(Name, Opts) of + {ok, ServiceState} -> + save(Name, ServiceState); + {error, Reason} -> + ?LOG(error, "Load server ~p failed: ~p", [Name, Reason]), + {error, Reason} + end + end. + +-spec disable(atom()|string()) -> ok | {error, term()}. +disable(Name) -> + case server(Name) of + undefined -> {error, not_running}; + Service -> + ok = emqx_exhook_server:unload(Service), + unsave(Name) + end. + +-spec disable_all() -> [term()]. +disable_all() -> + [begin disable(Name), Name end || Name <- running()]. + +%%---------------------------------------------------------- +%% Dispatch APIs +%%---------------------------------------------------------- + +-spec cast(atom(), map()) -> ok. +cast(Hookpoint, Req) -> + cast(Hookpoint, Req, running()). + +cast(_, _, []) -> + ok; +cast(Hookpoint, Req, [ServiceName|More]) -> + %% XXX: Need a real asynchronous running + _ = emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)), + cast(Hookpoint, Req, More). + +-spec call_fold(atom(), term(), function()) + -> {ok, term()} + | {stop, term()}. +call_fold(Hookpoint, Req, AccFun) -> + call_fold(Hookpoint, Req, AccFun, running()). + +call_fold(_, Req, _, []) -> + {ok, Req}; +call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) -> + case emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)) of + {ok, Resp} -> + case AccFun(Req, Resp) of + {stop, NReq} -> {stop, NReq}; + {ok, NReq} -> call_fold(Hookpoint, NReq, AccFun, More) + end; + _ -> + call_fold(Hookpoint, Req, AccFun, More) + end. + +%%---------------------------------------------------------- +%% Storage + +-compile({inline, [save/2]}). +save(Name, ServiceState) -> + Saved = persistent_term:get(?APP, []), + persistent_term:put(?APP, lists:reverse([Name | Saved])), + persistent_term:put({?APP, Name}, ServiceState). + +-compile({inline, [unsave/1]}). +unsave(Name) -> + case persistent_term:get(?APP, []) of + [] -> + persistent_term:erase(?APP); + Saved -> + persistent_term:put(?APP, lists:delete(Name, Saved)) + end, + persistent_term:erase({?APP, Name}), + ok. + +-compile({inline, [running/0]}). +running() -> + persistent_term:get(?APP, []). + +-compile({inline, [server/1]}). +server(Name) -> + case catch persistent_term:get({?APP, Name}) of + {'EXIT', {badarg,_}} -> undefined; + Service -> Service + end. diff --git a/apps/emqx_exhook/src/emqx_exhook_app.erl b/apps/emqx_exhook/src/emqx_exhook_app.erl new file mode 100644 index 000000000..1411b618d --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook_app.erl @@ -0,0 +1,117 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_app). + +-behaviour(application). + +-include("emqx_exhook.hrl"). + +-emqx_plugin(extension). + +-export([ start/2 + , stop/1 + , prep_stop/1 + ]). + +%% Internal export +-export([ load_server/2 + , unload_server/1 + , load_exhooks/0 + , unload_exhooks/0 + ]). + +%%-------------------------------------------------------------------- +%% Application callbacks +%%-------------------------------------------------------------------- + +start(_StartType, _StartArgs) -> + {ok, Sup} = emqx_exhook_sup:start_link(), + + %% Load all dirvers + load_all_servers(), + + %% Register all hooks + load_exhooks(), + + %% Register CLI + emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []), + {ok, Sup}. + +prep_stop(State) -> + emqx_ctl:unregister_command(exhook), + unload_exhooks(), + unload_all_servers(), + State. + +stop(_State) -> + ok. + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +load_all_servers() -> + lists:foreach(fun({Name, Options}) -> + load_server(Name, Options) + end, application:get_env(?APP, servers, [])). + +unload_all_servers() -> + emqx_exhook:disable_all(). + +load_server(Name, Options) -> + emqx_exhook:enable(Name, Options). + +unload_server(Name) -> + emqx_exhook:disable(Name). + +%%-------------------------------------------------------------------- +%% Exhooks + +load_exhooks() -> + [emqx:hook(Name, {M, F, A}) || {Name, {M, F, A}} <- search_exhooks()]. + +unload_exhooks() -> + [emqx:unhook(Name, {M, F}) || {Name, {M, F, _A}} <- search_exhooks()]. + +search_exhooks() -> + search_exhooks(ignore_lib_apps(application:loaded_applications())). +search_exhooks(Apps) -> + lists:flatten([ExHooks || App <- Apps, {_App, _Mod, ExHooks} <- find_attrs(App, exhooks)]). + +ignore_lib_apps(Apps) -> + LibApps = [kernel, stdlib, sasl, appmon, eldap, erts, + syntax_tools, ssl, crypto, mnesia, os_mon, + inets, goldrush, gproc, runtime_tools, + snmp, otp_mibs, public_key, asn1, ssh, hipe, + common_test, observer, webtool, xmerl, tools, + test_server, compiler, debugger, eunit, et, + wx], + [AppName || {AppName, _, _} <- Apps, not lists:member(AppName, LibApps)]. + +find_attrs(App, Def) -> + [{App, Mod, Attr} || {ok, Modules} <- [application:get_key(App, modules)], + Mod <- Modules, + {Name, Attrs} <- module_attributes(Mod), Name =:= Def, + Attr <- Attrs]. + +module_attributes(Module) -> + try Module:module_info(attributes) + catch + error:undef -> []; + error:Reason -> error(Reason) + end. + diff --git a/apps/emqx_exhook/src/emqx_exhook_cli.erl b/apps/emqx_exhook/src/emqx_exhook_cli.erl new file mode 100644 index 000000000..8bab9ced5 --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook_cli.erl @@ -0,0 +1,80 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_cli). + +-include("emqx_exhook.hrl"). + +-export([cli/1]). + +cli(["server", "list"]) -> + if_enabled(fun() -> + Services = emqx_exhook:list(), + [emqx_ctl:print("HookServer(~s)~n", [emqx_exhook_server:format(Service)]) || Service <- Services] + end); + +cli(["server", "enable", Name0]) -> + if_enabled(fun() -> + Name = list_to_atom(Name0), + case proplists:get_value(Name, application:get_env(?APP, servers, [])) of + undefined -> + emqx_ctl:print("not_found~n"); + Opts -> + print(emqx_exhook:enable(Name, Opts)) + end + end); + +cli(["server", "disable", Name]) -> + if_enabled(fun() -> + print(emqx_exhook:disable(list_to_atom(Name))) + end); + +cli(["server", "stats"]) -> + if_enabled(fun() -> + [emqx_ctl:print("~-35s:~w~n", [Name, N]) || {Name, N} <- stats()] + end); + +cli(_) -> + emqx_ctl:usage([{"exhook server list", "List all running exhook server"}, + {"exhook server enable ", "Enable a exhook server in the configuration"}, + {"exhook server disable ", "Disable a exhook server"}, + {"exhook server stats", "Print exhook server statistic"}]). + +print(ok) -> + emqx_ctl:print("ok~n"); +print({error, Reason}) -> + emqx_ctl:print("~p~n", [Reason]). + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +if_enabled(Fun) -> + case lists:keymember(?APP, 1, application:which_applications()) of + true -> Fun(); + _ -> hint() + end. + +hint() -> + emqx_ctl:print("Please './bin/emqx_ctl plugins load emqx_exhook' first.~n"). + +stats() -> + lists:usort(lists:foldr(fun({K, N}, Acc) -> + case atom_to_list(K) of + "exhook." ++ Key -> [{Key, N}|Acc]; + _ -> Acc + end + end, [], emqx_metrics:all())). diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl new file mode 100644 index 000000000..272f3f08f --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -0,0 +1,288 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_handler). + +-include("emqx_exhook.hrl"). +-include_lib("emqx_libs/include/emqx.hrl"). +-include_lib("emqx_libs/include/logger.hrl"). + +-logger_header("[ExHook]"). + +-export([ on_client_connect/2 + , on_client_connack/3 + , on_client_connected/2 + , on_client_disconnected/3 + , on_client_authenticate/2 + , on_client_check_acl/4 + , on_client_subscribe/3 + , on_client_unsubscribe/3 + ]). + +%% Session Lifecircle Hooks +-export([ on_session_created/2 + , on_session_subscribed/3 + , on_session_unsubscribed/3 + , on_session_resumed/2 + , on_session_discarded/2 + , on_session_takeovered/2 + , on_session_terminated/3 + ]). + +%% Utils +-export([ message/1 + , stringfy/1 + , merge_responsed_bool/2 + , merge_responsed_message/2 + , assign_to_message/2 + , clientinfo/1 + ]). + +-import(emqx_exhook, + [ cast/2 + , call_fold/3 + ]). + +-exhooks([ {'client.connect', {?MODULE, on_client_connect, []}} + , {'client.connack', {?MODULE, on_client_connack, []}} + , {'client.connected', {?MODULE, on_client_connected, []}} + , {'client.disconnected', {?MODULE, on_client_disconnected, []}} + , {'client.authenticate', {?MODULE, on_client_authenticate, []}} + , {'client.check_acl', {?MODULE, on_client_check_acl, []}} + , {'client.subscribe', {?MODULE, on_client_subscribe, []}} + , {'client.unsubscribe', {?MODULE, on_client_unsubscribe, []}} + , {'session.created', {?MODULE, on_session_created, []}} + , {'session.subscribed', {?MODULE, on_session_subscribed, []}} + , {'session.unsubscribed',{?MODULE, on_session_unsubscribed, []}} + , {'session.resumed', {?MODULE, on_session_resumed, []}} + , {'session.discarded', {?MODULE, on_session_discarded, []}} + , {'session.takeovered', {?MODULE, on_session_takeovered, []}} + , {'session.terminated', {?MODULE, on_session_terminated, []}} + ]). + +%%-------------------------------------------------------------------- +%% Clients +%%-------------------------------------------------------------------- + +on_client_connect(ConnInfo, Props) -> + Req = #{conninfo => conninfo(ConnInfo), + props => properties(Props) + }, + cast('client.connect', Req). + +on_client_connack(ConnInfo, Rc, Props) -> + Req = #{conninfo => conninfo(ConnInfo), + result_code => stringfy(Rc), + props => properties(Props)}, + cast('client.connack', Req). + +on_client_connected(ClientInfo, _ConnInfo) -> + Req = #{clientinfo => clientinfo(ClientInfo)}, + cast('client.connected', Req). + +on_client_disconnected(ClientInfo, Reason, _ConnInfo) -> + Req = #{clientinfo => clientinfo(ClientInfo), + reason => stringfy(Reason) + }, + cast('client.disconnected', Req). + +on_client_authenticate(ClientInfo, AuthResult) -> + Bool = maps:get(auth_result, AuthResult, undefined) == success, + Req = #{clientinfo => clientinfo(ClientInfo), + result => Bool + }, + + case call_fold('client.authenticate', Req, + fun merge_responsed_bool/2) of + {StopOrOk, #{result := Bool}} when is_boolean(Bool) -> + Result = case Bool of true -> success; _ -> not_authorized end, + {StopOrOk, AuthResult#{auth_result => Result, anonymous => false}}; + _ -> + {ok, AuthResult} + end. + +on_client_check_acl(ClientInfo, PubSub, Topic, Result) -> + Bool = Result == allow, + Type = case PubSub of + publish -> 'PUBLISH'; + subscribe -> 'SUBSCRIBE' + end, + Req = #{clientinfo => clientinfo(ClientInfo), + type => Type, + topic => Topic, + result => Bool + }, + case call_fold('client.check_acl', Req, + fun merge_responsed_bool/2) of + {StopOrOk, #{result := Bool}} when is_boolean(Bool) -> + NResult = case Bool of true -> allow; _ -> deny end, + {StopOrOk, NResult}; + _ -> {ok, Result} + end. + +on_client_subscribe(ClientInfo, Props, TopicFilters) -> + Req = #{clientinfo => clientinfo(ClientInfo), + props => properties(Props), + topic_filters => topicfilters(TopicFilters) + }, + cast('client.subscribe', Req). + +on_client_unsubscribe(ClientInfo, Props, TopicFilters) -> + Req = #{clientinfo => clientinfo(ClientInfo), + props => properties(Props), + topic_filters => topicfilters(TopicFilters) + }, + cast('client.unsubscribe', Req). + +%%-------------------------------------------------------------------- +%% Session +%%-------------------------------------------------------------------- + +on_session_created(ClientInfo, _SessInfo) -> + Req = #{clientinfo => clientinfo(ClientInfo)}, + cast('session.created', Req). + +on_session_subscribed(ClientInfo, Topic, SubOpts) -> + Req = #{clientinfo => clientinfo(ClientInfo), + topic => Topic, + subopts => maps:with([qos, share, rh, rap, nl], SubOpts) + }, + cast('session.subscribed', Req). + +on_session_unsubscribed(ClientInfo, Topic, _SubOpts) -> + Req = #{clientinfo => clientinfo(ClientInfo), + topic => Topic + }, + cast('session.unsubscribed', Req). + +on_session_resumed(ClientInfo, _SessInfo) -> + Req = #{clientinfo => clientinfo(ClientInfo)}, + cast('session.resumed', Req). + +on_session_discarded(ClientInfo, _SessInfo) -> + Req = #{clientinfo => clientinfo(ClientInfo)}, + cast('session.discarded', Req). + +on_session_takeovered(ClientInfo, _SessInfo) -> + Req = #{clientinfo => clientinfo(ClientInfo)}, + cast('session.takeovered', Req). + +on_session_terminated(ClientInfo, Reason, _SessInfo) -> + Req = #{clientinfo => clientinfo(ClientInfo), + reason => stringfy(Reason)}, + cast('session.terminated', Req). + +%%-------------------------------------------------------------------- +%% Types + +properties(undefined) -> []; +properties(M) when is_map(M) -> + maps:fold(fun(K, V, Acc) -> + [#{name => stringfy(K), + value => stringfy(V)} | Acc] + end, [], M). + +conninfo(_ConnInfo = + #{clientid := ClientId, username := Username, peername := {Peerhost, _}, + sockname := {_, SockPort}, proto_name := ProtoName, proto_ver := ProtoVer, + keepalive := Keepalive}) -> + #{node => stringfy(node()), + clientid => ClientId, + username => maybe(Username), + peerhost => ntoa(Peerhost), + sockport => SockPort, + proto_name => ProtoName, + proto_ver => stringfy(ProtoVer), + keepalive => Keepalive}. + +clientinfo(ClientInfo = + #{clientid := ClientId, username := Username, peerhost := PeerHost, + sockport := SockPort, protocol := Protocol, mountpoint := Mountpoiont}) -> + #{node => stringfy(node()), + clientid => ClientId, + username => maybe(Username), + password => maybe(maps:get(password, ClientInfo, undefined)), + peerhost => ntoa(PeerHost), + sockport => SockPort, + protocol => stringfy(Protocol), + mountpoint => maybe(Mountpoiont), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true)}. + +message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) -> + #{node => stringfy(node()), + id => hexstr(Id), + qos => Qos, + from => stringfy(From), + topic => Topic, + payload => Payload, + timestamp => Ts}. + +assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) -> + Message#message{qos = Qos, topic = Topic, payload = Payload}. + +topicfilters(Tfs) when is_list(Tfs) -> + [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. + +ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> + list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256})); +ntoa(IP) -> + list_to_binary(inet_parse:ntoa(IP)). + +maybe(undefined) -> <<>>; +maybe(B) -> B. + +%% @private +stringfy(Term) when is_binary(Term) -> + Term; +stringfy(Term) when is_integer(Term) -> + integer_to_binary(Term); +stringfy(Term) when is_atom(Term) -> + atom_to_binary(Term, utf8); +stringfy(Term) -> + unicode:characters_to_binary((io_lib:format("~0p", [Term]))). + +hexstr(B) -> + iolist_to_binary([io_lib:format("~2.16.0B", [X]) || X <- binary_to_list(B)]). + +%%-------------------------------------------------------------------- +%% Acc funcs + +%% see exhook.proto +merge_responsed_bool(Req, #{type := 'IGNORE'}) -> + {ok, Req}; +merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}}) + when is_boolean(NewBool) -> + NReq = Req#{result => NewBool}, + case Type of + 'CONTINUE' -> {ok, NReq}; + 'STOP_AND_RETURN' -> {stop, NReq} + end; +merge_responsed_bool(Req, Resp) -> + ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]), + {ok, Req}. + +merge_responsed_message(Req, #{type := 'IGNORE'}) -> + {ok, Req}; +merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) -> + NReq = Req#{message => NMessage}, + case Type of + 'CONTINUE' -> {ok, NReq}; + 'STOP_AND_RETURN' -> {stop, NReq} + end; +merge_responsed_message(Req, Resp) -> + ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]), + {ok, Req}. diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl new file mode 100644 index 000000000..21bc46889 --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -0,0 +1,274 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_server). + +-include_lib("emqx_libs/include/logger.hrl"). + +-logger_header("[ExHook Svr]"). + +-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). + +%% Load/Unload +-export([ load/2 + , unload/1 + ]). + +%% APIs +-export([call/3]). + +%% Infos +-export([ name/1 + , format/1 + ]). + +-record(server, { + %% Server name (equal to grpcbox client channel name) + name :: server_name(), + %% The server started options + options :: list(), + %% gRPC channel pid + channel :: pid(), + %% Registered hook names and options + hookspec :: #{hookpoint() => map()}, + %% Metric fun + incfun :: function() + }). + +-type server_name() :: atom(). +-type server() :: #server{}. + +-type hookpoint() :: 'client.connect' + | 'client.connack' + | 'client.connected' + | 'client.disconnected' + | 'client.authenticate' + | 'client.check_acl' + | 'client.subscribe' + | 'client.unsubscribe' + | 'session.created' + | 'session.subscribed' + | 'session.unsubscribed' + | 'session.resumed' + | 'session.discarded' + | 'session.takeovered' + | 'session.terminated' + | 'message.publish' + | 'message.delivered' + | 'message.acked' + | 'message.dropped'. + +-export_type([server/0]). + +%%-------------------------------------------------------------------- +%% Load/Unload APIs +%%-------------------------------------------------------------------- + +-spec load(atom(), list()) -> {ok, server()} | {error, term()} . +load(Name, Opts0) -> + {Endpoints, Options} = channel_opts(Opts0), + StartFun = case proplists:get_bool(inplace, Opts0) of + true -> start_grpc_client_channel_inplace; + _ -> start_grpc_client_channel + end, + case emqx_exhook_sup:StartFun(Name, Endpoints, Options) of + {ok, ChannPid} -> + case do_init(Name) of + {ok, HookSpecs} -> + %% Reigster metrics + Prefix = lists:flatten(io_lib:format("exhook.~s.", [Name])), + ensure_metrics(Prefix, HookSpecs), + {ok, #server{name = Name, + options = Opts0, + channel = ChannPid, + hookspec = HookSpecs, + incfun = incfun(Prefix) }}; + {error, _} = E -> E + end; + {error, _} = E -> E + end. + +%% @private +channel_opts(Opts) -> + Scheme = proplists:get_value(scheme, Opts), + Host = proplists:get_value(host, Opts), + Port = proplists:get_value(port, Opts), + Options = proplists:get_value(options, Opts, []), + SslOpts = case Scheme of + https -> proplists:get_value(ssl_options, Opts, []); + _ -> [] + end, + {[{Scheme, Host, Port, SslOpts}], maps:from_list(Options)}. + +-spec unload(server()) -> ok. +unload(#server{name = Name, channel = ChannPid, options = Options}) -> + _ = do_deinit(Name), + {StopFun, Args} = case proplists:get_bool(inplace, Options) of + true -> {stop_grpc_client_channel_inplace, [ChannPid]}; + _ -> {stop_grpc_client_channel, [Name]} + end, + apply(emqx_exhook_sup, StopFun, Args). + +do_deinit(Name) -> + _ = do_call(Name, 'on_provider_unloaded', #{}), + ok. + +do_init(ChannName) -> + Req = #{broker => maps:from_list(emqx_sys:info())}, + case do_call(ChannName, 'on_provider_loaded', Req) of + {ok, InitialResp} -> + try + {ok, resovle_hookspec(maps:get(hooks, InitialResp, []))} + catch _:Reason:Stk -> + ?LOG(error, "try to init ~p failed, reason: ~p, stacktrace: ~0p", + [ChannName, Reason, Stk]), + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +%% @private +resovle_hookspec(HookSpecs) when is_list(HookSpecs) -> + MessageHooks = message_hooks(), + AvailableHooks = available_hooks(), + lists:foldr(fun(HookSpec, Acc) -> + case maps:get(name, HookSpec, undefined) of + undefined -> Acc; + Name0 -> + Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end, + case lists:member(Name, AvailableHooks) of + true -> + case lists:member(Name, MessageHooks) of + true -> + Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}}; + _ -> + Acc#{Name => #{}} + end; + _ -> error({unknown_hookpoint, Name}) + end + end + end, #{}, HookSpecs). + +ensure_metrics(Prefix, HookSpecs) -> + Keys = [list_to_atom(Prefix ++ atom_to_list(Hookpoint)) + || Hookpoint <- maps:keys(HookSpecs)], + lists:foreach(fun emqx_metrics:ensure/1, Keys). + +incfun(Prefix) -> + fun(Name) -> + emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))) + end. + +format(#server{name = Name, hookspec = Hooks}) -> + io_lib:format("name=~p, hooks=~0p", [Name, Hooks]). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +name(#server{name = Name}) -> + Name. + +-spec call(hookpoint(), map(), server()) + -> ignore + | {ok, Resp :: term()} + | {error, term()}. +call(Hookpoint, Req, #server{name = ChannName, hookspec = Hooks, incfun = IncFun}) -> + GrpcFunc = hk2func(Hookpoint), + case maps:get(Hookpoint, Hooks, undefined) of + undefined -> ignore; + Opts -> + NeedCall = case lists:member(Hookpoint, message_hooks()) of + false -> true; + _ -> + #{message := #{topic := Topic}} = Req, + match_topic_filter(Topic, maps:get(topics, Opts, [])) + end, + case NeedCall of + false -> ignore; + _ -> + IncFun(Hookpoint), + do_call(ChannName, GrpcFunc, Req) + end + end. + +-compile({inline, [match_topic_filter/2]}). +match_topic_filter(_, []) -> + true; +match_topic_filter(TopicName, TopicFilter) -> + lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter). + +-spec do_call(atom(), atom(), map()) -> {ok, map()} | {error, term()}. +do_call(ChannName, Fun, Req) -> + Options = #{channel => ChannName}, + ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]), + case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of + {ok, Resp, _Metadata} -> + ?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]), + {ok, Resp}; + {error, {Code, Msg}, _Metadata} -> + ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", + [?PB_CLIENT_MOD, Fun, Req, Options, Code, Msg]), + {error, {Code, Msg}}; + {error, Reason} -> + ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p", + [?PB_CLIENT_MOD, Fun, Req, Options, Reason]), + {error, Reason}; + {'EXIT', Reason, Stk} -> + ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~p", + [?PB_CLIENT_MOD, Fun, Req, Options, Reason, Stk]), + {error, Reason} + end. + +%%-------------------------------------------------------------------- +%% Internal funcs +%%-------------------------------------------------------------------- + +-compile({inline, [hk2func/1]}). +hk2func('client.connect') -> 'on_client_connect'; +hk2func('client.connack') -> 'on_client_connack'; +hk2func('client.connected') -> 'on_client_connected'; +hk2func('client.disconnected') -> 'on_client_disconnected'; +hk2func('client.authenticate') -> 'on_client_authenticate'; +hk2func('client.check_acl') -> 'on_client_check_acl'; +hk2func('client.subscribe') -> 'on_client_subscribe'; +hk2func('client.unsubscribe') -> 'on_client_unsubscribe'; +hk2func('session.created') -> 'on_session_created'; +hk2func('session.subscribed') -> 'on_session_subscribed'; +hk2func('session.unsubscribed') -> 'on_session_unsubscribed'; +hk2func('session.resumed') -> 'on_session_resumed'; +hk2func('session.discarded') -> 'on_session_discarded'; +hk2func('session.takeovered') -> 'on_session_takeovered'; +hk2func('session.terminated') -> 'on_session_terminated'; +hk2func('message.publish') -> 'on_message_publish'; +hk2func('message.delivered') ->'on_message_delivered'; +hk2func('message.acked') -> 'on_message_acked'; +hk2func('message.dropped') ->'on_message_dropped'. + +-compile({inline, [message_hooks/0]}). +message_hooks() -> + ['message.publish', 'message.delivered', + 'message.acked', 'message.dropped']. + +-compile({inline, [available_hooks/0]}). +available_hooks() -> + ['client.connect', 'client.connack', 'client.connected', + 'client.disconnected', 'client.authenticate', 'client.check_acl', + 'client.subscribe', 'client.unsubscribe', + 'session.created', 'session.subscribed', 'session.unsubscribed', + 'session.resumed', 'session.discarded', 'session.takeovered', + 'session.terminated' | message_hooks()]. diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl new file mode 100644 index 000000000..3238873df --- /dev/null +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -0,0 +1,74 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_sup). + +-behaviour(supervisor). + +-export([ start_link/0 + , init/1 + ]). + +-export([ start_grpc_client_channel/3 + , stop_grpc_client_channel/1 + ]). + +-export([ start_grpc_client_channel_inplace/3 + , stop_grpc_client_channel_inplace/1 + ]). + +%%-------------------------------------------------------------------- +%% Supervisor APIs & Callbacks +%%-------------------------------------------------------------------- + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, {{one_for_one, 10, 100}, []}}. + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + +-spec start_grpc_client_channel( + atom() | string(), + [grpcbox_channel:endpoint()], + grpcbox_channel:options()) -> {ok, pid()} | {error, term()}. +start_grpc_client_channel(Name, Endpoints, Options0) -> + Options = Options0#{sync_start => true}, + Spec = #{id => Name, + start => {grpcbox_channel, start_link, [Name, Endpoints, Options]}, + type => worker}, + + supervisor:start_child(?MODULE, Spec). + +-spec stop_grpc_client_channel(atom()) -> ok. +stop_grpc_client_channel(Name) -> + ok = supervisor:terminate_child(?MODULE, Name), + ok = supervisor:delete_child(?MODULE, Name). + +-spec start_grpc_client_channel_inplace( + atom() | string(), + [grpcbox_channel:endpoint()], + grpcbox_channel:options()) -> {ok, pid()} | {error, term()}. +start_grpc_client_channel_inplace(Name, Endpoints, Options0) -> + Options = Options0#{sync_start => true}, + grpcbox_channel_sup:start_child(Name, Endpoints, Options). + +-spec stop_grpc_client_channel_inplace(pid()) -> ok. +stop_grpc_client_channel_inplace(Pid) -> + ok = supervisor:terminate_child(grpcbox_channel_sup, Pid). diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl new file mode 100644 index 000000000..abfc6f398 --- /dev/null +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -0,0 +1,53 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +%%-------------------------------------------------------------------- +%% Setups +%%-------------------------------------------------------------------- + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Cfg) -> + _ = emqx_exhook_demo_svr:start(), + emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1), + Cfg. + +end_per_suite(Cfg) -> + emqx_exhook_demo_svr:stop(), + emqx_ct_helpers:stop_apps([emqx_exhook]). + +set_special_cfgs(emqx) -> + application:set_env(emqx, allow_anonymous, false), + application:set_env(emqx, enable_acl_cache, false), + application:set_env(emqx, plugins_loaded_file, + emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); +set_special_cfgs(emqx_exhook) -> + ok. + +%%-------------------------------------------------------------------- +%% Test cases +%%-------------------------------------------------------------------- + +t_hooks(Cfg) -> + ok. diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl new file mode 100644 index 000000000..08a1d4f7c --- /dev/null +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -0,0 +1,299 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_exhook_demo_svr). + +-behavior(emqx_exhook_v_1_hook_provider_bhvr). + +%% +-export([ start/0 + , stop/0 + , take/0 + , in/1 + ]). + +%% gRPC server HookProvider callbacks +-export([ on_provider_loaded/2 + , on_provider_unloaded/2 + , on_client_connect/2 + , on_client_connack/2 + , on_client_connected/2 + , on_client_disconnected/2 + , on_client_authenticate/2 + , on_client_check_acl/2 + , on_client_subscribe/2 + , on_client_unsubscribe/2 + , on_session_created/2 + , on_session_subscribed/2 + , on_session_unsubscribed/2 + , on_session_resumed/2 + , on_session_discarded/2 + , on_session_takeovered/2 + , on_session_terminated/2 + , on_message_publish/2 + , on_message_delivered/2 + , on_message_dropped/2 + , on_message_acked/2 + ]). + +-define(PORT, 9000). + +-define(HTTP, #{grpc_opts => #{service_protos => [emqx_exhook_pb], + services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr}}, + listen_opts => #{port => ?PORT, + socket_options => [{reuseaddr, true}]}, + pool_opts => #{size => 8}, + transport_opts => #{ssl => false}}). + +%%-------------------------------------------------------------------- +%% Server APIs +%%-------------------------------------------------------------------- + +start() -> + Pid = spawn(fun mngr_main/0), + register(?MODULE, Pid), + {ok, Pid}. + +stop() -> + ?MODULE ! stop. + +take() -> + ?MODULE ! {take, self()}, + receive {value, V} -> V + after 5000 -> error(timeout) end. + +in({FunName, Req}) -> + ?MODULE ! {in, FunName, Req}. + +mngr_main() -> + application:ensure_all_started(grpcbox), + Svr = grpcbox:start_server(?HTTP), + mngr_loop([Svr, queue:new(), queue:new()]). + +mngr_loop([Svr, Q, Takes]) -> + receive + {in, FunName, Req} -> + {NQ1, NQ2} = reply(queue:in({FunName, Req}, Q), Takes), + mngr_loop([Svr, NQ1, NQ2]); + {take, From} -> + {NQ1, NQ2} = reply(Q, queue:in(From, Takes)), + mngr_loop([Svr, NQ1, NQ2]); + stop -> + supervisor:terminate_child(grpcbox_services_simple_sup, Svr), + exit(normal) + end. + +reply(Q1, Q2) -> + case queue:len(Q1) =:= 0 orelse + queue:len(Q2) =:= 0 of + true -> {Q1, Q2}; + _ -> + {{value, {Name, V}}, NQ1} = queue:out(Q1), + {{value, From}, NQ2} = queue:out(Q2), + From ! {value, {Name, V}}, + {NQ1, NQ2} + end. + +%%-------------------------------------------------------------------- +%% callbacks +%%-------------------------------------------------------------------- + +-spec on_provider_loaded(ctx:ctx(), emqx_exhook_pb:on_provider_loadedial_request()) + -> {ok, emqx_exhook_pb:on_provider_loaded_response(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_provider_loaded(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{hooks => [ + #{name => <<"client.connect">>}, + #{name => <<"client.connack">>}, + #{name => <<"client.connected">>}, + #{name => <<"client.disconnected">>}, + #{name => <<"client.authenticate">>}, + #{name => <<"client.check_acl">>}, + #{name => <<"client.subscribe">>}, + #{name => <<"client.unsubscribe">>}, + #{name => <<"session.created">>}, + #{name => <<"session.subscribed">>}, + #{name => <<"session.unsubscribed">>}, + #{name => <<"session.resumed">>}, + #{name => <<"session.discarded">>}, + #{name => <<"session.takeovered">>}, + #{name => <<"session.terminated">>}, + #{name => <<"message.publish">>}, + #{name => <<"message.delivered">>}, + #{name => <<"message.acked">>}, + #{name => <<"message.dropped">>}]}, Ctx}. + +-spec on_provider_unloaded(ctx:ctx(), emqx_exhook_pb:on_provider_unloadedial_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_provider_unloaded(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_client_connect(ctx:ctx(), emqx_exhook_pb:client_connect_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_client_connect(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_client_connack(ctx:ctx(), emqx_exhook_pb:client_connack_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_client_connack(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_client_connected(ctx:ctx(), emqx_exhook_pb:client_connected_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_client_connected(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_client_disconnected(ctx:ctx(), emqx_exhook_pb:client_disconnected_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_client_disconnected(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_client_authenticate(ctx:ctx(), emqx_exhook_pb:client_authenticate_request()) + -> {ok, emqx_exhook_pb:bool_result(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_client_authenticate(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{type => 'IGNORE'}, Ctx}. + +-spec on_client_check_acl(ctx:ctx(), emqx_exhook_pb:client_check_acl_request()) + -> {ok, emqx_exhook_pb:bool_result(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_client_check_acl(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{type => 'STOP_AND_RETURN', value => {bool_result, true}}, Ctx}. + +-spec on_client_subscribe(ctx:ctx(), emqx_exhook_pb:client_subscribe_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_client_subscribe(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_client_unsubscribe(ctx:ctx(), emqx_exhook_pb:client_unsubscribe_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_client_unsubscribe(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_session_created(ctx:ctx(), emqx_exhook_pb:session_created_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_session_created(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_session_subscribed(ctx:ctx(), emqx_exhook_pb:session_subscribed_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_session_subscribed(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_session_unsubscribed(ctx:ctx(), emqx_exhook_pb:session_unsubscribed_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_session_unsubscribed(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_session_resumed(ctx:ctx(), emqx_exhook_pb:session_resumed_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_session_resumed(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_session_discarded(ctx:ctx(), emqx_exhook_pb:session_discarded_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_session_discarded(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_session_takeovered(ctx:ctx(), emqx_exhook_pb:session_takeovered_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_session_takeovered(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_session_terminated(ctx:ctx(), emqx_exhook_pb:session_terminated_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_session_terminated(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_message_publish(ctx:ctx(), emqx_exhook_pb:message_publish_request()) + -> {ok, emqx_exhook_pb:valued_response(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_message_publish(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_message_delivered(ctx:ctx(), emqx_exhook_pb:message_delivered_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_message_delivered(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_message_dropped(ctx:ctx(), emqx_exhook_pb:message_dropped_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_message_dropped(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. + +-spec on_message_acked(ctx:ctx(), emqx_exhook_pb:message_acked_request()) + -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()} + | grpcbox_stream:grpc_error_response(). +on_message_acked(Ctx, Req) -> + ?MODULE:in({?FUNCTION_NAME, Req}), + %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), + {ok, #{}, Ctx}. diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl new file mode 100644 index 000000000..e828d354c --- /dev/null +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -0,0 +1,535 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_exhook_hooks). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-import(emqx_ct_proper_types, + [ conninfo/0 + , clientinfo/0 + , sessioninfo/0 + , message/0 + , connack_return_code/0 + , topictab/0 + , topic/0 + , subopts/0 + ]). + +-define(ALL(Vars, Types, Exprs), + ?SETUP(fun() -> + State = do_setup(), + fun() -> do_teardown(State) end + end, ?FORALL(Vars, Types, Exprs))). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_client_connect() -> + ?ALL({ConnInfo, ConnProps}, + {conninfo(), conn_properties()}, + begin + _OutConnProps = emqx_hooks:run_fold('client.connect', [ConnInfo], ConnProps), + {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{props => properties(ConnProps), + conninfo => + #{node => nodestr(), + clientid => maps:get(clientid, ConnInfo), + username => maybe(maps:get(username, ConnInfo, <<>>)), + peerhost => peerhost(ConnInfo), + sockport => sockport(ConnInfo), + proto_name => maps:get(proto_name, ConnInfo), + proto_ver => stringfy(maps:get(proto_ver, ConnInfo)), + keepalive => maps:get(keepalive, ConnInfo) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_connack() -> + ?ALL({ConnInfo, Rc, AckProps}, + {conninfo(), connack_return_code(), ack_properties()}, + begin + _OutAckProps = emqx_hooks:run_fold('client.connack', [ConnInfo, Rc], AckProps), + {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{props => properties(AckProps), + result_code => atom_to_binary(Rc, utf8), + conninfo => + #{node => nodestr(), + clientid => maps:get(clientid, ConnInfo), + username => maybe(maps:get(username, ConnInfo, <<>>)), + peerhost => peerhost(ConnInfo), + sockport => sockport(ConnInfo), + proto_name => maps:get(proto_name, ConnInfo), + proto_ver => stringfy(maps:get(proto_ver, ConnInfo)), + keepalive => maps:get(keepalive, ConnInfo) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_authenticate() -> + ?ALL({ClientInfo, AuthResult}, {clientinfo(), authresult()}, + begin + _OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult), + {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{result => authresult_to_bool(AuthResult), + clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_check_acl() -> + ?ALL({ClientInfo, PubSub, Topic, Result}, + {clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny])}, + begin + _OutResult = emqx_hooks:run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Result), + {'on_client_check_acl', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{result => aclresult_to_bool(Result), + type => pubsub_to_enum(PubSub), + topic => Topic, + clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + + +prop_client_connected() -> + ?ALL({ClientInfo, ConnInfo}, + {clientinfo(), conninfo()}, + begin + ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]), + {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_disconnected() -> + ?ALL({ClientInfo, Reason, ConnInfo}, + {clientinfo(), shutdown_reason(), conninfo()}, + begin + ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]), + {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{reason => stringfy(Reason), + clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_subscribe() -> + ?ALL({ClientInfo, SubProps, TopicTab}, + {clientinfo(), sub_properties(), topictab()}, + begin + _OutTopicTab = emqx_hooks:run_fold('client.subscribe', [ClientInfo, SubProps], TopicTab), + {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{props => properties(SubProps), + topic_filters => topicfilters(TopicTab), + clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_client_unsubscribe() -> + ?ALL({ClientInfo, UnSubProps, TopicTab}, + {clientinfo(), unsub_properties(), topictab()}, + begin + _OutTopicTab = emqx_hooks:run_fold('client.unsubscribe', [ClientInfo, UnSubProps], TopicTab), + {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{props => properties(UnSubProps), + topic_filters => topicfilters(TopicTab), + clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_created() -> + ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + begin + ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]), + {'on_session_created', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_subscribed() -> + ?ALL({ClientInfo, Topic, SubOpts}, + {clientinfo(), topic(), subopts()}, + begin + ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), + {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{topic => Topic, + subopts => subopts(SubOpts), + clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_unsubscribed() -> + ?ALL({ClientInfo, Topic, SubOpts}, + {clientinfo(), topic(), subopts()}, + begin + ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]), + {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{topic => Topic, + clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_resumed() -> + ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + begin + ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]), + {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_discared() -> + ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + begin + ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]), + {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_takeovered() -> + ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, + begin + ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]), + {'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + true + end). + +prop_session_terminated() -> + ?ALL({ClientInfo, Reason, SessInfo}, + {clientinfo(), shutdown_reason(), sessioninfo()}, + begin + ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]), + {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(), + Expected = + #{reason => stringfy(Reason), + clientinfo => + #{node => nodestr(), + clientid => maps:get(clientid, ClientInfo), + username => maybe(maps:get(username, ClientInfo, <<>>)), + password => maybe(maps:get(password, ClientInfo, <<>>)), + peerhost => ntoa(maps:get(peerhost, ClientInfo)), + sockport => maps:get(sockport, ClientInfo), + protocol => stringfy(maps:get(protocol, ClientInfo)), + mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), + is_superuser => maps:get(is_superuser, ClientInfo, false), + anonymous => maps:get(anonymous, ClientInfo, true) + } + }, + ?assertEqual(Expected, Resp), + + true + end). + +nodestr() -> + stringfy(node()). + +peerhost(#{peername := {Host, _}}) -> + ntoa(Host). + +sockport(#{sockname := {_, Port}}) -> + Port. + +%% copied from emqx_exhook + +ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> + list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256})); +ntoa(IP) -> + list_to_binary(inet_parse:ntoa(IP)). + +maybe(undefined) -> <<>>; +maybe(B) -> B. + +properties(undefined) -> []; +properties(M) when is_map(M) -> + maps:fold(fun(K, V, Acc) -> + [#{name => stringfy(K), + value => stringfy(V)} | Acc] + end, [], M). + +topicfilters(Tfs) when is_list(Tfs) -> + [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. + +%% @private +stringfy(Term) when is_binary(Term) -> + Term; +stringfy(Term) when is_integer(Term) -> + integer_to_binary(Term); +stringfy(Term) when is_atom(Term) -> + atom_to_binary(Term, utf8); +stringfy(Term) -> + unicode:characters_to_binary((io_lib:format("~0p", [Term]))). + +subopts(SubOpts) -> + #{qos => maps:get(qos, SubOpts, 0), + rh => maps:get(rh, SubOpts, 0), + rap => maps:get(rap, SubOpts, 0), + nl => maps:get(nl, SubOpts, 0), + share => maps:get(share, SubOpts, <<>>) + }. + +authresult_to_bool(AuthResult) -> + maps:get(auth_result, AuthResult, undefined) == success. + +aclresult_to_bool(Result) -> + Result == allow. + +pubsub_to_enum(publish) -> 'PUBLISH'; +pubsub_to_enum(subscribe) -> 'SUBSCRIBE'. + +%prop_message_publish() -> +% ?ALL({Msg, Env, Encode}, {message(), topic_filter_env()}, +% begin +% true +% end). +% +%prop_message_delivered() -> +% ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), topic_filter_env()}, +% begin +% true +% end). +% +%prop_message_acked() -> +% ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message()}, +% begin +% true +% end). + +%%-------------------------------------------------------------------- +%% Helper +%%-------------------------------------------------------------------- + +do_setup() -> + _ = emqx_exhook_demo_svr:start(), + emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1), + %% waiting first loaded event + {'on_provider_loaded', _} = emqx_exhook_demo_svr:take(), + ok. + +do_teardown(_) -> + emqx_ct_helpers:stop_apps([emqx_exhook]), + %% waiting last unloaded event + {'on_provider_unloaded', _} = emqx_exhook_demo_svr:take(), + _ = emqx_exhook_demo_svr:stop(), + ok. + +set_special_cfgs(emqx) -> + application:set_env(emqx, allow_anonymous, false), + application:set_env(emqx, enable_acl_cache, false), + application:set_env(emqx, plugins_loaded_file, + emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); +set_special_cfgs(emqx_exhook) -> + ok. + +%%-------------------------------------------------------------------- +%% Generators +%%-------------------------------------------------------------------- + +conn_properties() -> + #{}. + +ack_properties() -> + #{}. + +sub_properties() -> + #{}. + +unsub_properties() -> + #{}. + +shutdown_reason() -> + oneof([utf8(), {shutdown, atom()}]). + +authresult() -> + #{auth_result => connack_return_code()}. + +%topic_filter_env() -> +% oneof([{<<"#">>}, {undefined}, {topic()}]). diff --git a/etc/emqx_cloud.d/emqx_exhook.conf b/etc/emqx_cloud.d/emqx_exhook.conf new file mode 100644 index 000000000..f6f5213f7 --- /dev/null +++ b/etc/emqx_cloud.d/emqx_exhook.conf @@ -0,0 +1,15 @@ +##==================================================================== +## EMQ X Hooks +##==================================================================== + +##-------------------------------------------------------------------- +## Server Address + +## The gRPC server url +## +## exhook.server.$name.url = url() +exhook.server.default.url = http://127.0.0.1:9000 + +#exhook.server.default.ssl.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem +#exhook.server.default.ssl.certfile = {{ platform_etc_dir }}/certs/cert.pem +#exhook.server.default.ssl.keyfile = {{ platform_etc_dir }}/certs/key.pem diff --git a/rebar.config b/rebar.config index df71f9762..046a6e326 100644 --- a/rebar.config +++ b/rebar.config @@ -1,49 +1,208 @@ {minimum_otp_vsn, "21.3"}. - -{plugins, [rebar3_proper]}. - -{deps, - [{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}, - {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, - {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}}, - {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.4"}}}, - {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}, - {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} - ]}. - -{erl_opts, [warn_unused_vars, - warn_shadow_vars, - warn_unused_import, - warn_obsolete_guard, - debug_info, - compressed %% for edge +{plugins,[{relup_helper,{git,"https://github.com/emqx/relup_helper", + {branch,"master"}}}]}. +{edge_relx_overlay,[]}. +{edoc_opts,[{preprocess,true}]}. +{erl_opts,[warn_unused_vars,warn_shadow_vars,warn_unused_import, + warn_obsolete_guard,no_debug_info,compressed]}. +{overrides,[{add,[{erl_opts,[no_debug_info,compressed, + {parse_transform,mod_vsn}]}]}]}. +{xref_checks,[undefined_function_calls,undefined_functions,locals_not_used, + deprecated_function_calls,warnings_as_errors, + deprecated_functions]}. +{dialyzer, [{warnings, [unmatched_returns, error_handling, race_conditions]} ]}. +{cover_enabled,true}. +{cover_opts,[verbose]}. +{cover_export_enabled,true}. +{provider_hooks,[{pre,[{release,{relup_helper,gen_appups}}]}, + {post,[{release,{relup_helper,otp_vsn}}, + {release,{relup_helper,untar}}]}]}. +{post_hooks,[]}. +{erl_first_files, ["apps/emqx/src/emqx_logger.erl"]}. -{overrides, [{add, [{erl_opts, [compressed]}]}]}. +"${COVERALL_CONFIGS}". -{edoc_opts, [{preprocess, true}]}. - -{xref_checks, [undefined_function_calls, undefined_functions, - locals_not_used, deprecated_function_calls, - warnings_as_errors, deprecated_functions - ]}. - -{cover_enabled, true}. -{cover_opts, [verbose]}. -{cover_export_enabled, true}. - -{erl_first_files, ["src/emqx_logger.erl"]}. +{deps, ["${BASIC_DEPS}"]}. {profiles, - [{test, - [{plugins, [{coveralls, {git, "https://github.com/emqx/coveralls-erl", {branch, "github"}}}]}, - {deps, - [{bbmustache, "1.7.0"}, - {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}, - {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}} - ]}, - {erl_opts, [debug_info]} - ]} - ]}. + [{'emqx', + [{relx, + ["${BASIC_RELX_PARAMS}", + {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}", "${CLOUD_RELX_APPS}"]}, + {overlay, ["${BASIC_OVERLAYS}", "${CLOUD_OVERLAYS}"]}, + {overlay_vars,["vars/vars-cloud.config","vars/vars-bin.config"]}]}]}, + {'emqx-pkg', + [{relx, + ["${BASIC_RELX_PARAMS}", + {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}", "${CLOUD_RELX_APPS}"]}, + {overlay, ["${BASIC_OVERLAYS}", "${CLOUD_OVERLAYS}"]}, + {overlay_vars,["vars/vars-cloud.config","vars/vars-pkg.config"]}]}]}, + {'emqx-edge', + [{relx, + ["${BASIC_RELX_PARAMS}", + {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}"]}, + {overlay, ["${BASIC_OVERLAYS}", "${EDGE_OVERLAYS}"]}, + {overlay_vars,["vars/vars-edge.config","vars/vars-bin.config"]}]}]}, + {'emqx-edge-pkg', + [{relx, + ["${BASIC_RELX_PARAMS}", + {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}"]}, + {overlay, ["${BASIC_OVERLAYS}", "${EDGE_OVERLAYS}"]}, + {overlay_vars,["vars/vars-edge.config","vars/vars-pkg.config"]}]}]}, + {test, + [ + {plugins, [ + rebar3_proper, + {coveralls, + {git, "https://github.com/emqx/coveralls-erl", + {branch, "github"}}}]}, + {deps, + [ {bbmustache, "1.7.0"}, + {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}, + {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}}, + meck + ]}, + {erl_opts, [debug_info]} + ]} + ]}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Placeholders +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +{placeholders, [ + + {"BASIC_DEPS", elems, + [ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} + , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} + , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}} + , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.2"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.4"}}} + , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}} + , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} + , {emqx_passwd, {git, "https://github.com/emqx/emqx-passwd", {tag, "v1.1.1"}}} + , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.0"}}} + , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "v0.4.2"}}} + , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.2.0"}}} + , {erlport, {git, "https://github.com/emqx/erlport", {tag, "v1.2.2"}}} + , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.3"}}} + , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}} + , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.1"}}} + , {getopt, "1.0.1"} + ] + }, + + {"BASIC_RELX_PARAMS", elems, + [ {include_src,false} + , {extended_start_script,false} + , {generate_start_script,false} + , {sys_config,false} + , {vm_args,false} + ] + }, + + {"BASIC_RELX_APPS", elems, + [ kernel + , sasl + , crypto + , public_key + , asn1 + , syntax_tools + , ssl + , os_mon + , inets + , compiler + , runtime_tools + , cuttlefish + , emqx + , {mnesia, load} + , {ekka, load} + , {emqx_retainer, load} + , {emqx_management, load} + , {emqx_dashboard, load} + , {emqx_bridge_mqtt, load} + , {emqx_sn, load} + , {emqx_coap, load} + , {emqx_stomp, load} + , {emqx_auth_clientid, load} + , {emqx_auth_username, load} + , {emqx_auth_http, load} + , {emqx_auth_mysql, load} + , {emqx_auth_jwt, load} + , {emqx_auth_mnesia, load} + , {emqx_web_hook, load} + , {emqx_recon, load} + , {emqx_rule_engine, load} + , {emqx_sasl, load} + , {emqx_telemetry, load} + ] + }, + + {"CLOUD_RELX_APPS", elems, + [ {emqx_lwm2m, load} + , {emqx_auth_ldap, load} + , {emqx_auth_pgsql, load} + , {emqx_auth_redis, load} + , {emqx_auth_mongo, load} + , {emqx_lua_hook, load} + , {emqx_exhook, load} + , {emqx_exproto, load} + , {emqx_prometheus, load} + , {emqx_psk_file, load} + , {emqx_plugin_template, load} + , {observer, load} + , luerl + , xmerl + ] + }, + + {"BASIC_OVERLAYS", elems, + [ {mkdir,"etc/"} + , {mkdir,"etc/emqx.d/"} + , {mkdir,"log/"} + , {mkdir,"data/"} + , {mkdir,"data/mnesia"} + , {mkdir,"data/configs"} + , {mkdir,"data/scripts"} + , {template, "data/loaded_plugins.tmpl", "data/loaded_plugins"} + , {template, "data/loaded_modules.tmpl", "data/loaded_modules"} + , {template,"data/emqx_vars","releases/emqx_vars"} + , {copy,"{{lib_dirs}}/cuttlefish/cuttlefish","bin/"} + , {copy,"bin/*","bin/"} + , {template,"etc/*.conf","etc/"} + , {template,"etc/emqx.d/*.conf","etc/emqx.d/"} + , {copy,"{{lib_dirs}}/emqx/priv/emqx.schema","releases/{{rel_vsn}}/"} + , {copy, "etc/certs","etc/"} + , "${RELUP_OVERLAYS}" + ] + }, + + {"RELUP_OVERLAYS", elems, + [ {copy,"bin/emqx.cmd","bin/emqx.cmd-{{rel_vsn}}"} + , {copy,"bin/emqx_ctl.cmd","bin/emqx_ctl.cmd-{{rel_vsn}}"} + , {copy,"bin/emqx","bin/emqx-{{rel_vsn}}"} + , {copy,"bin/emqx_ctl","bin/emqx_ctl-{{rel_vsn}}"} + , {copy,"bin/install_upgrade.escript", "bin/install_upgrade.escript-{{rel_vsn}}"} + , {copy,"bin/nodetool","bin/nodetool-{{rel_vsn}}"} + , {copy,"{{lib_dirs}}/cuttlefish/cuttlefish","bin/cuttlefish-{{rel_vsn}}"} + ] + }, + + {"CLOUD_OVERLAYS", elems, + [ {template,"etc/emqx_cloud.d/*.conf","etc/emqx.d/"} + , {template,"etc/emqx_cloud.d/vm.args","etc/vm.args"} + ] + }, + + {"EDGE_OVERLAYS", elems, + [ {template,"etc/emqx_edge.d/*.conf","etc/emqx.d/"} + , {template,"etc/emqx_edge.d/vm.args.edge","etc/vm.args"} + ] + }, + + {"GIT_DESC", var, fun mod_project:get_vsn/1}, + + {"COVERALL_CONFIGS", elems, fun mod_project:coveralls_configs/1} + +]}.