diff --git a/.gitignore b/.gitignore
index b23eeafe3..f4900cf4e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -41,4 +41,4 @@ erlang.mk
*.coverdata
etc/emqx.conf.rendered
Mnesia.*/
-*.DS_Store
+*.DS_Store
\ No newline at end of file
diff --git a/apps/emqx_exhook/.gitignore b/apps/emqx_exhook/.gitignore
new file mode 100644
index 000000000..520db35b8
--- /dev/null
+++ b/apps/emqx_exhook/.gitignore
@@ -0,0 +1,3 @@
+src/emqx_exhook_pb.erl
+src/emqx_exhook_v_1_hook_provider_bhvr.erl
+src/emqx_exhook_v_1_hook_provider_client.erl
diff --git a/apps/emqx_exhook/README.md b/apps/emqx_exhook/README.md
new file mode 100644
index 000000000..216c39275
--- /dev/null
+++ b/apps/emqx_exhook/README.md
@@ -0,0 +1,39 @@
+# emqx_exhook
+
+The `emqx_exhook` extremly enhance the extensibility for EMQ X. It allow using an others programming language to mount the hooks intead of erlang.
+
+## Feature
+
+- [x] Based on gRPC, it brings a very wide range of applicability
+- [x] Allows you to use the return value to extend emqx behavior.
+
+## Architecture
+
+```
+EMQ X Third-party Runtime
++========================+ +========+==========+
+| ExHook | | | |
+| +----------------+ | gRPC | gRPC | User's |
+| | gPRC Client | ------------------> | Server | Codes |
+| +----------------+ | (HTTP/2) | | |
+| | | | |
++========================+ +========+==========+
+```
+
+## Usage
+
+### gRPC service
+
+See: `priv/protos/exhook.proto`
+
+### CLI
+
+## Example
+
+## Recommended gRPC Framework
+
+See: https://github.com/grpc-ecosystem/awesome-grpc
+
+## Thanks
+
+- [grpcbox](https://github.com/tsloughter/grpcbox)
diff --git a/apps/emqx_exhook/docs/design.md b/apps/emqx_exhook/docs/design.md
new file mode 100644
index 000000000..671e240cc
--- /dev/null
+++ b/apps/emqx_exhook/docs/design.md
@@ -0,0 +1,116 @@
+# 设计
+
+## 动机
+
+在 EMQ X Broker v4.1-v4.2 中,我们发布了 2 个插件来扩展 emqx 的编程能力:
+
+1. `emqx-extension-hook` 提供了使用 Java, Python 向 Broker 挂载钩子的功能
+2. `emqx-exproto` 提供了使用 Java,Python 编写用户自定义协议接入插件的功能
+
+但在后续的支持中发现许多难以处理的问题:
+
+1. 有大量的编程语言需要支持,需要编写和维护如 Go, JavaScript, Lua.. 等语言的驱动。
+2. `erlport` 使用的操作系统的管道进行通信,这让用户代码只能部署在和 emqx 同一个操作系统上。部署方式受到了极大的限制。
+3. 用户程序的启动参数直接打包到 Broker 中,导致用户开发无法实时的进行调试,单步跟踪等。
+4. `erlport` 会占用 `stdin` `stdout`。
+
+因此,我们计划重构这部分的实现,其中主要的内容是:
+1. 使用 `gRPC` 替换 `erlport`。
+2. 将 `emqx-extension-hook` 重命名为 `emqx-exhook`
+
+
+旧版本的设计参考:[emqx-extension-hook design in v4.2.0](https://github.com/emqx/emqx-exhook/blob/v4.2.0/docs/design.md)
+
+## 设计
+
+架构如下:
+
+```
+ EMQ X
++========================+ +========+==========+
+| ExHook | | | |
+| +----------------+ | gRPC | gRPC | User's |
+| | gRPC Client | ------------------> | Server | Codes |
+| +----------------+ | (HTTP/2) | | |
+| | | | |
++========================+ +========+==========+
+```
+
+`emqx-exhook` 通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。
+
+
+和 emqx 原生的钩子一致,emqx-exhook 也支持链式的方式计算和返回:
+
+
+
+### gRPC 服务示例
+
+用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中。例如,其支持的接口有:
+
+```protobuff
+syntax = "proto3";
+
+package emqx.exhook.v1;
+
+service HookProvider {
+
+ rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
+
+ rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
+
+ rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
+
+ rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
+
+ rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
+
+ rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
+
+ rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
+
+ rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};
+
+ rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
+
+ rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
+
+ rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
+
+ rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
+
+ rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
+
+ rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
+}
+```
+
+### 配置文件示例
+
+```
+## 配置 gRPC 服务地址 (HTTP)
+##
+## s1 为服务器的名称
+exhook.server.s1.url = http://127.0.0.1:9001
+
+## 配置 gRPC 服务地址 (HTTPS)
+##
+## s2 为服务器名称
+exhook.server.s2.url = https://127.0.0.1:9002
+exhook.server.s2.cacertfile = ca.pem
+exhook.server.s2.certfile = cert.pem
+exhook.server.s2.keyfile = key.pem
+```
diff --git a/apps/emqx_exhook/include/emqx_exhook.hrl b/apps/emqx_exhook/include/emqx_exhook.hrl
new file mode 100644
index 000000000..8a404ca39
--- /dev/null
+++ b/apps/emqx_exhook/include/emqx_exhook.hrl
@@ -0,0 +1,22 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-ifndef(EMQX_EXHOOK_HRL).
+-define(EMQX_EXHOOK_HRL, true).
+
+-define(APP, emqx_exhook).
+
+-endif.
diff --git a/apps/emqx_exhook/priv/emqx_exhook.schema b/apps/emqx_exhook/priv/emqx_exhook.schema
new file mode 100644
index 000000000..2a926b968
--- /dev/null
+++ b/apps/emqx_exhook/priv/emqx_exhook.schema
@@ -0,0 +1,38 @@
+%%-*- mode: erlang -*-
+
+{mapping, "exhook.server.$name.url", "emqx_exhook.servers", [
+ {datatype, string}
+]}.
+
+{mapping, "exhook.server.$name.ssl.cacertfile", "emqx_exhook.servers", [
+ {datatype, string}
+]}.
+
+{mapping, "exhook.server.$name.ssl.certfile", "emqx_exhook.servers", [
+ {datatype, string}
+]}.
+
+{mapping, "exhook.server.$name.ssl.keyfile", "emqx_exhook.servers", [
+ {datatype, string}
+]}.
+
+{translation, "emqx_exhook.servers", fun(Conf) ->
+ Filter = fun(Opts) -> [{K, V} || {K, V} <- Opts, V =/= undefined] end,
+ ServerOptions = fun(Prefix) ->
+ case http_uri:parse(cuttlefish:conf_get(Prefix ++ ".url", Conf)) of
+ {ok, {http, _, Host, Port, _, _}} ->
+ [{scheme, http}, {host, Host}, {port, Port}];
+ {ok, {https, _, Host, Port, _, _}} ->
+ [{scheme, https}, {host, Host}, {port, Port},
+ {ssl_options,
+ Filter([{ssl, true},
+ {certfile, cuttlefish:conf_get(Prefix ++ ".ssl.certfile", Conf)},
+ {keyfile, cuttlefish:conf_get(Prefix ++ ".ssl.keyfile", Conf)},
+ {cacertfile, cuttlefish:conf_get(Prefix ++ ".ssl.cacertfile", Conf)}
+ ])}];
+ _ -> error(invalid_server_options)
+ end
+ end,
+ [{list_to_atom(Name), ServerOptions("exhook.server." ++ Name)}
+ || {["exhook", "server", Name, "url"], _} <- cuttlefish_variable:filter_by_prefix("exhook.server", Conf)]
+end}.
diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto
new file mode 100644
index 000000000..8dc9641b9
--- /dev/null
+++ b/apps/emqx_exhook/priv/protos/exhook.proto
@@ -0,0 +1,395 @@
+//------------------------------------------------------------------------------
+// Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//------------------------------------------------------------------------------
+
+syntax = "proto3";
+
+package emqx.exhook.v1;
+
+service HookProvider {
+
+ rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {};
+
+ rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {};
+
+ rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {};
+
+ rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {};
+
+ rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {};
+
+ rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {};
+
+ rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {};
+
+ rpc OnClientCheckAcl(ClientCheckAclRequest) returns (ValuedResponse) {};
+
+ rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {};
+
+ rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {};
+
+ rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {};
+
+ rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {};
+
+ rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {};
+
+ rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {};
+
+ rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {};
+}
+
+//------------------------------------------------------------------------------
+// Request & Response
+//------------------------------------------------------------------------------
+
+message ProviderLoadedRequest {
+
+ BrokerInfo broker = 1;
+}
+
+message LoadedResponse {
+
+ repeated HookSpec hooks = 1;
+}
+
+message ProviderUnloadedRequest { }
+
+message ClientConnectRequest {
+
+ ConnInfo conninfo = 1;
+
+ // MQTT CONNECT packet's properties (MQTT v5.0)
+ //
+ // It should be empty on MQTT v3.1.1/v3.1 or others protocol
+ repeated Property props = 2;
+}
+
+message ClientConnackRequest {
+
+ ConnInfo conninfo = 1;
+
+ string result_code = 2;
+
+ repeated Property props = 3;
+}
+
+message ClientConnectedRequest {
+
+ ClientInfo clientinfo = 1;
+}
+
+message ClientDisconnectedRequest {
+
+ ClientInfo clientinfo = 1;
+
+ string reason = 2;
+}
+
+message ClientAuthenticateRequest {
+
+ ClientInfo clientinfo = 1;
+
+ bool result = 2;
+}
+
+message ClientCheckAclRequest {
+
+ ClientInfo clientinfo = 1;
+
+ enum AclReqType {
+
+ PUBLISH = 0;
+
+ SUBSCRIBE = 1;
+ }
+
+ AclReqType type = 2;
+
+ string topic = 3;
+
+ bool result = 4;
+}
+
+message ClientSubscribeRequest {
+
+ ClientInfo clientinfo = 1;
+
+ repeated Property props = 2;
+
+ repeated TopicFilter topic_filters = 3;
+}
+
+message ClientUnsubscribeRequest {
+
+ ClientInfo clientinfo = 1;
+
+ repeated Property props = 2;
+
+ repeated TopicFilter topic_filters = 3;
+}
+
+message SessionCreatedRequest {
+
+ ClientInfo clientinfo = 1;
+}
+
+message SessionSubscribedRequest {
+
+ ClientInfo clientinfo = 1;
+
+ string topic = 2;
+
+ SubOpts subopts = 3;
+}
+
+message SessionUnsubscribedRequest {
+
+ ClientInfo clientinfo = 1;
+
+ string topic = 2;
+}
+
+message SessionResumedRequest {
+
+ ClientInfo clientinfo = 1;
+}
+
+message SessionDiscardedRequest {
+
+ ClientInfo clientinfo = 1;
+}
+
+message SessionTakeoveredRequest {
+
+ ClientInfo clientinfo = 1;
+}
+
+message SessionTerminatedRequest {
+
+ ClientInfo clientinfo = 1;
+
+ string reason = 2;
+}
+
+message MessagePublishRequest {
+
+ Message message = 1;
+}
+
+message MessageDeliveredRequest {
+
+ ClientInfo clientinfo = 1;
+
+ Message message = 2;
+}
+
+message MessageDroppedRequest {
+
+ Message message = 1;
+
+ string reason = 2;
+}
+
+message MessageAckedRequest {
+
+ ClientInfo clientinfo = 1;
+
+ Message message = 2;
+}
+
+//------------------------------------------------------------------------------
+// Basic data types
+//------------------------------------------------------------------------------
+
+message EmptySuccess { }
+
+message ValuedResponse {
+
+ // The responsed value type
+ // - ignore: Ignore the responsed value
+ // - contiune: Use the responsed value and execute the next hook
+ // - stop_and_return: Use the responsed value and stop the chain executing
+ enum ResponsedType {
+
+ IGNORE = 0;
+
+ CONTINUE = 1;
+
+ STOP_AND_RETURN = 2;
+ }
+
+ ResponsedType type = 1;
+
+ oneof value {
+
+ // Boolean result, used on the 'client.authenticate', 'client.check_acl' hooks
+ bool bool_result = 3;
+
+ // Message result, used on the 'message.*' hooks
+ Message message = 4;
+ }
+}
+
+message BrokerInfo {
+
+ string version = 1;
+
+ string sysdescr = 2;
+
+ string uptime = 3;
+
+ string datetime = 4;
+}
+
+message HookSpec {
+
+ // The registered hooks name
+ //
+ // Available value:
+ // "client.connect", "client.connack"
+ // "client.connected", "client.disconnected"
+ // "client.authenticate", "client.check_acl"
+ // "client.subscribe", "client.unsubscribe"
+ //
+ // "session.created", "session.subscribed"
+ // "session.unsubscribed", "session.resumed"
+ // "session.discarded", "session.takeovered"
+ // "session.terminated"
+ //
+ // "message.publish", "message.delivered"
+ // "message.acked", "message.dropped"
+ string name = 1;
+
+ // The topic filters for message hooks
+ repeated string topics = 2;
+}
+
+message ConnInfo {
+
+ string node = 1;
+
+ string clientid = 2;
+
+ string username = 3;
+
+ string peerhost = 4;
+
+ uint32 sockport = 5;
+
+ string proto_name = 6;
+
+ string proto_ver = 7;
+
+ uint32 keepalive = 8;
+}
+
+message ClientInfo {
+
+ string node = 1;
+
+ string clientid = 2;
+
+ string username = 3;
+
+ string password = 4;
+
+ string peerhost = 5;
+
+ uint32 sockport = 6;
+
+ string protocol = 7;
+
+ string mountpoint = 8;
+
+ bool is_superuser = 9;
+
+ bool anonymous = 10;
+}
+
+message Message {
+
+ string node = 1;
+
+ string id = 2;
+
+ uint32 qos = 3;
+
+ string from = 4;
+
+ string topic = 5;
+
+ bytes payload = 6;
+
+ uint64 timestamp = 7;
+}
+
+message Property {
+
+ string name = 1;
+
+ string value = 2;
+}
+
+message TopicFilter {
+
+ string name = 1;
+
+ uint32 qos = 2;
+}
+
+message SubOpts {
+
+ // The QoS level
+ uint32 qos = 1;
+
+ // The group name for shared subscription
+ string share = 2;
+
+ // The Retain Handling option (MQTT v5.0)
+ //
+ // 0 = Send retained messages at the time of the subscribe
+ // 1 = Send retained messages at subscribe only if the subscription does
+ // not currently exist
+ // 2 = Do not send retained messages at the time of the subscribe
+ uint32 rh = 3;
+
+ // The Retain as Published option (MQTT v5.0)
+ //
+ // If 1, Application Messages forwarded using this subscription keep the
+ // RETAIN flag they were published with.
+ // If 0, Application Messages forwarded using this subscription have the
+ // RETAIN flag set to 0.
+ // Retained messages sent when the subscription is established have the RETAIN flag set to 1.
+ uint32 rap = 4;
+
+ // The No Local option (MQTT v5.0)
+ //
+ // If the value is 1, Application Messages MUST NOT be forwarded to a
+ // connection with a ClientID equal to the ClientID of the publishing
+ uint32 nl = 5;
+}
diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config
new file mode 100644
index 000000000..0019ca245
--- /dev/null
+++ b/apps/emqx_exhook/rebar.config
@@ -0,0 +1,47 @@
+%%-*- mode: erlang -*-
+{plugins,
+ [rebar3_proper,
+ {grpcbox_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {branch, "master"}}}
+]}.
+
+{deps,
+ [{grpcbox, {git, "https://github.com/tsloughter/grpcbox", {branch, "master"}}}
+]}.
+
+{grpc,
+ [{protos, ["priv/protos"]},
+ {gpb_opts, [{module_name_prefix, "emqx_"},
+ {module_name_suffix, "_pb"}]}
+]}.
+
+{provider_hooks,
+ [{pre, [{compile, {grpc, gen}}]}]}.
+
+{edoc_opts, [{preprocess, true}]}.
+
+{erl_opts, [warn_unused_vars,
+ warn_shadow_vars,
+ warn_unused_import,
+ warn_obsolete_guard,
+ debug_info,
+ {parse_transform}]}.
+
+{xref_checks, [undefined_function_calls, undefined_functions,
+ locals_not_used, deprecated_function_calls,
+ warnings_as_errors, deprecated_functions]}.
+{xref_ignores, [emqx_exhook_pb]}.
+
+{cover_enabled, true}.
+{cover_opts, [verbose]}.
+{cover_export_enabled, true}.
+{cover_excl_mods, [emqx_exhook_pb,
+ emqx_exhook_v_1_hook_provider_bhvr,
+ emqx_exhook_v_1_connection_client]}.
+
+{profiles,
+ [{test, [
+ {deps, [ {emqx_ct_helper, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "v1.3.1"}}}
+ , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
+ ]}
+ ]}
+]}.
diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src
new file mode 100644
index 000000000..e91359178
--- /dev/null
+++ b/apps/emqx_exhook/src/emqx_exhook.app.src
@@ -0,0 +1,12 @@
+{application, emqx_exhook,
+ [{description, "EMQ X Extension for Hook"},
+ {vsn, "git"},
+ {modules, []},
+ {registered, []},
+ {mod, {emqx_exhook_app, []}},
+ {applications, [kernel,stdlib,emqx_libs,grpcbox]},
+ {env,[]},
+ {licenses, ["Apache-2.0"]},
+ {maintainers, ["EMQ X Team "]},
+ {links, [{"Homepage", "https://emqx.io/"}]}
+ ]}.
diff --git a/apps/emqx_exhook/src/emqx_exhook.erl b/apps/emqx_exhook/src/emqx_exhook.erl
new file mode 100644
index 000000000..c9fe6a993
--- /dev/null
+++ b/apps/emqx_exhook/src/emqx_exhook.erl
@@ -0,0 +1,134 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook).
+
+-include("emqx_exhook.hrl").
+-include_lib("emqx_libs/include/logger.hrl").
+
+-logger_header("[ExHook]").
+
+%% Mgmt APIs
+-export([ enable/2
+ , disable/1
+ , disable_all/0
+ , list/0
+ ]).
+
+-export([ cast/2
+ , call_fold/3
+ ]).
+
+%%--------------------------------------------------------------------
+%% Mgmt APIs
+%%--------------------------------------------------------------------
+
+-spec list() -> [emqx_exhook_server:server()].
+list() ->
+ [server(Name) || Name <- running()].
+
+-spec enable(atom()|string(), list()) -> ok | {error, term()}.
+enable(Name, Opts) ->
+ case lists:member(Name, running()) of
+ true ->
+ {error, already_started};
+ _ ->
+ case emqx_exhook_server:load(Name, Opts) of
+ {ok, ServiceState} ->
+ save(Name, ServiceState);
+ {error, Reason} ->
+ ?LOG(error, "Load server ~p failed: ~p", [Name, Reason]),
+ {error, Reason}
+ end
+ end.
+
+-spec disable(atom()|string()) -> ok | {error, term()}.
+disable(Name) ->
+ case server(Name) of
+ undefined -> {error, not_running};
+ Service ->
+ ok = emqx_exhook_server:unload(Service),
+ unsave(Name)
+ end.
+
+-spec disable_all() -> [term()].
+disable_all() ->
+ [begin disable(Name), Name end || Name <- running()].
+
+%%----------------------------------------------------------
+%% Dispatch APIs
+%%----------------------------------------------------------
+
+-spec cast(atom(), map()) -> ok.
+cast(Hookpoint, Req) ->
+ cast(Hookpoint, Req, running()).
+
+cast(_, _, []) ->
+ ok;
+cast(Hookpoint, Req, [ServiceName|More]) ->
+ %% XXX: Need a real asynchronous running
+ _ = emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)),
+ cast(Hookpoint, Req, More).
+
+-spec call_fold(atom(), term(), function())
+ -> {ok, term()}
+ | {stop, term()}.
+call_fold(Hookpoint, Req, AccFun) ->
+ call_fold(Hookpoint, Req, AccFun, running()).
+
+call_fold(_, Req, _, []) ->
+ {ok, Req};
+call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) ->
+ case emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)) of
+ {ok, Resp} ->
+ case AccFun(Req, Resp) of
+ {stop, NReq} -> {stop, NReq};
+ {ok, NReq} -> call_fold(Hookpoint, NReq, AccFun, More)
+ end;
+ _ ->
+ call_fold(Hookpoint, Req, AccFun, More)
+ end.
+
+%%----------------------------------------------------------
+%% Storage
+
+-compile({inline, [save/2]}).
+save(Name, ServiceState) ->
+ Saved = persistent_term:get(?APP, []),
+ persistent_term:put(?APP, lists:reverse([Name | Saved])),
+ persistent_term:put({?APP, Name}, ServiceState).
+
+-compile({inline, [unsave/1]}).
+unsave(Name) ->
+ case persistent_term:get(?APP, []) of
+ [] ->
+ persistent_term:erase(?APP);
+ Saved ->
+ persistent_term:put(?APP, lists:delete(Name, Saved))
+ end,
+ persistent_term:erase({?APP, Name}),
+ ok.
+
+-compile({inline, [running/0]}).
+running() ->
+ persistent_term:get(?APP, []).
+
+-compile({inline, [server/1]}).
+server(Name) ->
+ case catch persistent_term:get({?APP, Name}) of
+ {'EXIT', {badarg,_}} -> undefined;
+ Service -> Service
+ end.
diff --git a/apps/emqx_exhook/src/emqx_exhook_app.erl b/apps/emqx_exhook/src/emqx_exhook_app.erl
new file mode 100644
index 000000000..1411b618d
--- /dev/null
+++ b/apps/emqx_exhook/src/emqx_exhook_app.erl
@@ -0,0 +1,117 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_app).
+
+-behaviour(application).
+
+-include("emqx_exhook.hrl").
+
+-emqx_plugin(extension).
+
+-export([ start/2
+ , stop/1
+ , prep_stop/1
+ ]).
+
+%% Internal export
+-export([ load_server/2
+ , unload_server/1
+ , load_exhooks/0
+ , unload_exhooks/0
+ ]).
+
+%%--------------------------------------------------------------------
+%% Application callbacks
+%%--------------------------------------------------------------------
+
+start(_StartType, _StartArgs) ->
+ {ok, Sup} = emqx_exhook_sup:start_link(),
+
+ %% Load all dirvers
+ load_all_servers(),
+
+ %% Register all hooks
+ load_exhooks(),
+
+ %% Register CLI
+ emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []),
+ {ok, Sup}.
+
+prep_stop(State) ->
+ emqx_ctl:unregister_command(exhook),
+ unload_exhooks(),
+ unload_all_servers(),
+ State.
+
+stop(_State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
+load_all_servers() ->
+ lists:foreach(fun({Name, Options}) ->
+ load_server(Name, Options)
+ end, application:get_env(?APP, servers, [])).
+
+unload_all_servers() ->
+ emqx_exhook:disable_all().
+
+load_server(Name, Options) ->
+ emqx_exhook:enable(Name, Options).
+
+unload_server(Name) ->
+ emqx_exhook:disable(Name).
+
+%%--------------------------------------------------------------------
+%% Exhooks
+
+load_exhooks() ->
+ [emqx:hook(Name, {M, F, A}) || {Name, {M, F, A}} <- search_exhooks()].
+
+unload_exhooks() ->
+ [emqx:unhook(Name, {M, F}) || {Name, {M, F, _A}} <- search_exhooks()].
+
+search_exhooks() ->
+ search_exhooks(ignore_lib_apps(application:loaded_applications())).
+search_exhooks(Apps) ->
+ lists:flatten([ExHooks || App <- Apps, {_App, _Mod, ExHooks} <- find_attrs(App, exhooks)]).
+
+ignore_lib_apps(Apps) ->
+ LibApps = [kernel, stdlib, sasl, appmon, eldap, erts,
+ syntax_tools, ssl, crypto, mnesia, os_mon,
+ inets, goldrush, gproc, runtime_tools,
+ snmp, otp_mibs, public_key, asn1, ssh, hipe,
+ common_test, observer, webtool, xmerl, tools,
+ test_server, compiler, debugger, eunit, et,
+ wx],
+ [AppName || {AppName, _, _} <- Apps, not lists:member(AppName, LibApps)].
+
+find_attrs(App, Def) ->
+ [{App, Mod, Attr} || {ok, Modules} <- [application:get_key(App, modules)],
+ Mod <- Modules,
+ {Name, Attrs} <- module_attributes(Mod), Name =:= Def,
+ Attr <- Attrs].
+
+module_attributes(Module) ->
+ try Module:module_info(attributes)
+ catch
+ error:undef -> [];
+ error:Reason -> error(Reason)
+ end.
+
diff --git a/apps/emqx_exhook/src/emqx_exhook_cli.erl b/apps/emqx_exhook/src/emqx_exhook_cli.erl
new file mode 100644
index 000000000..8bab9ced5
--- /dev/null
+++ b/apps/emqx_exhook/src/emqx_exhook_cli.erl
@@ -0,0 +1,80 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_cli).
+
+-include("emqx_exhook.hrl").
+
+-export([cli/1]).
+
+cli(["server", "list"]) ->
+ if_enabled(fun() ->
+ Services = emqx_exhook:list(),
+ [emqx_ctl:print("HookServer(~s)~n", [emqx_exhook_server:format(Service)]) || Service <- Services]
+ end);
+
+cli(["server", "enable", Name0]) ->
+ if_enabled(fun() ->
+ Name = list_to_atom(Name0),
+ case proplists:get_value(Name, application:get_env(?APP, servers, [])) of
+ undefined ->
+ emqx_ctl:print("not_found~n");
+ Opts ->
+ print(emqx_exhook:enable(Name, Opts))
+ end
+ end);
+
+cli(["server", "disable", Name]) ->
+ if_enabled(fun() ->
+ print(emqx_exhook:disable(list_to_atom(Name)))
+ end);
+
+cli(["server", "stats"]) ->
+ if_enabled(fun() ->
+ [emqx_ctl:print("~-35s:~w~n", [Name, N]) || {Name, N} <- stats()]
+ end);
+
+cli(_) ->
+ emqx_ctl:usage([{"exhook server list", "List all running exhook server"},
+ {"exhook server enable ", "Enable a exhook server in the configuration"},
+ {"exhook server disable ", "Disable a exhook server"},
+ {"exhook server stats", "Print exhook server statistic"}]).
+
+print(ok) ->
+ emqx_ctl:print("ok~n");
+print({error, Reason}) ->
+ emqx_ctl:print("~p~n", [Reason]).
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
+if_enabled(Fun) ->
+ case lists:keymember(?APP, 1, application:which_applications()) of
+ true -> Fun();
+ _ -> hint()
+ end.
+
+hint() ->
+ emqx_ctl:print("Please './bin/emqx_ctl plugins load emqx_exhook' first.~n").
+
+stats() ->
+ lists:usort(lists:foldr(fun({K, N}, Acc) ->
+ case atom_to_list(K) of
+ "exhook." ++ Key -> [{Key, N}|Acc];
+ _ -> Acc
+ end
+ end, [], emqx_metrics:all())).
diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl
new file mode 100644
index 000000000..272f3f08f
--- /dev/null
+++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl
@@ -0,0 +1,288 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_handler).
+
+-include("emqx_exhook.hrl").
+-include_lib("emqx_libs/include/emqx.hrl").
+-include_lib("emqx_libs/include/logger.hrl").
+
+-logger_header("[ExHook]").
+
+-export([ on_client_connect/2
+ , on_client_connack/3
+ , on_client_connected/2
+ , on_client_disconnected/3
+ , on_client_authenticate/2
+ , on_client_check_acl/4
+ , on_client_subscribe/3
+ , on_client_unsubscribe/3
+ ]).
+
+%% Session Lifecircle Hooks
+-export([ on_session_created/2
+ , on_session_subscribed/3
+ , on_session_unsubscribed/3
+ , on_session_resumed/2
+ , on_session_discarded/2
+ , on_session_takeovered/2
+ , on_session_terminated/3
+ ]).
+
+%% Utils
+-export([ message/1
+ , stringfy/1
+ , merge_responsed_bool/2
+ , merge_responsed_message/2
+ , assign_to_message/2
+ , clientinfo/1
+ ]).
+
+-import(emqx_exhook,
+ [ cast/2
+ , call_fold/3
+ ]).
+
+-exhooks([ {'client.connect', {?MODULE, on_client_connect, []}}
+ , {'client.connack', {?MODULE, on_client_connack, []}}
+ , {'client.connected', {?MODULE, on_client_connected, []}}
+ , {'client.disconnected', {?MODULE, on_client_disconnected, []}}
+ , {'client.authenticate', {?MODULE, on_client_authenticate, []}}
+ , {'client.check_acl', {?MODULE, on_client_check_acl, []}}
+ , {'client.subscribe', {?MODULE, on_client_subscribe, []}}
+ , {'client.unsubscribe', {?MODULE, on_client_unsubscribe, []}}
+ , {'session.created', {?MODULE, on_session_created, []}}
+ , {'session.subscribed', {?MODULE, on_session_subscribed, []}}
+ , {'session.unsubscribed',{?MODULE, on_session_unsubscribed, []}}
+ , {'session.resumed', {?MODULE, on_session_resumed, []}}
+ , {'session.discarded', {?MODULE, on_session_discarded, []}}
+ , {'session.takeovered', {?MODULE, on_session_takeovered, []}}
+ , {'session.terminated', {?MODULE, on_session_terminated, []}}
+ ]).
+
+%%--------------------------------------------------------------------
+%% Clients
+%%--------------------------------------------------------------------
+
+on_client_connect(ConnInfo, Props) ->
+ Req = #{conninfo => conninfo(ConnInfo),
+ props => properties(Props)
+ },
+ cast('client.connect', Req).
+
+on_client_connack(ConnInfo, Rc, Props) ->
+ Req = #{conninfo => conninfo(ConnInfo),
+ result_code => stringfy(Rc),
+ props => properties(Props)},
+ cast('client.connack', Req).
+
+on_client_connected(ClientInfo, _ConnInfo) ->
+ Req = #{clientinfo => clientinfo(ClientInfo)},
+ cast('client.connected', Req).
+
+on_client_disconnected(ClientInfo, Reason, _ConnInfo) ->
+ Req = #{clientinfo => clientinfo(ClientInfo),
+ reason => stringfy(Reason)
+ },
+ cast('client.disconnected', Req).
+
+on_client_authenticate(ClientInfo, AuthResult) ->
+ Bool = maps:get(auth_result, AuthResult, undefined) == success,
+ Req = #{clientinfo => clientinfo(ClientInfo),
+ result => Bool
+ },
+
+ case call_fold('client.authenticate', Req,
+ fun merge_responsed_bool/2) of
+ {StopOrOk, #{result := Bool}} when is_boolean(Bool) ->
+ Result = case Bool of true -> success; _ -> not_authorized end,
+ {StopOrOk, AuthResult#{auth_result => Result, anonymous => false}};
+ _ ->
+ {ok, AuthResult}
+ end.
+
+on_client_check_acl(ClientInfo, PubSub, Topic, Result) ->
+ Bool = Result == allow,
+ Type = case PubSub of
+ publish -> 'PUBLISH';
+ subscribe -> 'SUBSCRIBE'
+ end,
+ Req = #{clientinfo => clientinfo(ClientInfo),
+ type => Type,
+ topic => Topic,
+ result => Bool
+ },
+ case call_fold('client.check_acl', Req,
+ fun merge_responsed_bool/2) of
+ {StopOrOk, #{result := Bool}} when is_boolean(Bool) ->
+ NResult = case Bool of true -> allow; _ -> deny end,
+ {StopOrOk, NResult};
+ _ -> {ok, Result}
+ end.
+
+on_client_subscribe(ClientInfo, Props, TopicFilters) ->
+ Req = #{clientinfo => clientinfo(ClientInfo),
+ props => properties(Props),
+ topic_filters => topicfilters(TopicFilters)
+ },
+ cast('client.subscribe', Req).
+
+on_client_unsubscribe(ClientInfo, Props, TopicFilters) ->
+ Req = #{clientinfo => clientinfo(ClientInfo),
+ props => properties(Props),
+ topic_filters => topicfilters(TopicFilters)
+ },
+ cast('client.unsubscribe', Req).
+
+%%--------------------------------------------------------------------
+%% Session
+%%--------------------------------------------------------------------
+
+on_session_created(ClientInfo, _SessInfo) ->
+ Req = #{clientinfo => clientinfo(ClientInfo)},
+ cast('session.created', Req).
+
+on_session_subscribed(ClientInfo, Topic, SubOpts) ->
+ Req = #{clientinfo => clientinfo(ClientInfo),
+ topic => Topic,
+ subopts => maps:with([qos, share, rh, rap, nl], SubOpts)
+ },
+ cast('session.subscribed', Req).
+
+on_session_unsubscribed(ClientInfo, Topic, _SubOpts) ->
+ Req = #{clientinfo => clientinfo(ClientInfo),
+ topic => Topic
+ },
+ cast('session.unsubscribed', Req).
+
+on_session_resumed(ClientInfo, _SessInfo) ->
+ Req = #{clientinfo => clientinfo(ClientInfo)},
+ cast('session.resumed', Req).
+
+on_session_discarded(ClientInfo, _SessInfo) ->
+ Req = #{clientinfo => clientinfo(ClientInfo)},
+ cast('session.discarded', Req).
+
+on_session_takeovered(ClientInfo, _SessInfo) ->
+ Req = #{clientinfo => clientinfo(ClientInfo)},
+ cast('session.takeovered', Req).
+
+on_session_terminated(ClientInfo, Reason, _SessInfo) ->
+ Req = #{clientinfo => clientinfo(ClientInfo),
+ reason => stringfy(Reason)},
+ cast('session.terminated', Req).
+
+%%--------------------------------------------------------------------
+%% Types
+
+properties(undefined) -> [];
+properties(M) when is_map(M) ->
+ maps:fold(fun(K, V, Acc) ->
+ [#{name => stringfy(K),
+ value => stringfy(V)} | Acc]
+ end, [], M).
+
+conninfo(_ConnInfo =
+ #{clientid := ClientId, username := Username, peername := {Peerhost, _},
+ sockname := {_, SockPort}, proto_name := ProtoName, proto_ver := ProtoVer,
+ keepalive := Keepalive}) ->
+ #{node => stringfy(node()),
+ clientid => ClientId,
+ username => maybe(Username),
+ peerhost => ntoa(Peerhost),
+ sockport => SockPort,
+ proto_name => ProtoName,
+ proto_ver => stringfy(ProtoVer),
+ keepalive => Keepalive}.
+
+clientinfo(ClientInfo =
+ #{clientid := ClientId, username := Username, peerhost := PeerHost,
+ sockport := SockPort, protocol := Protocol, mountpoint := Mountpoiont}) ->
+ #{node => stringfy(node()),
+ clientid => ClientId,
+ username => maybe(Username),
+ password => maybe(maps:get(password, ClientInfo, undefined)),
+ peerhost => ntoa(PeerHost),
+ sockport => SockPort,
+ protocol => stringfy(Protocol),
+ mountpoint => maybe(Mountpoiont),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)}.
+
+message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) ->
+ #{node => stringfy(node()),
+ id => hexstr(Id),
+ qos => Qos,
+ from => stringfy(From),
+ topic => Topic,
+ payload => Payload,
+ timestamp => Ts}.
+
+assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) ->
+ Message#message{qos = Qos, topic = Topic, payload = Payload}.
+
+topicfilters(Tfs) when is_list(Tfs) ->
+ [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
+
+ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
+ list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
+ntoa(IP) ->
+ list_to_binary(inet_parse:ntoa(IP)).
+
+maybe(undefined) -> <<>>;
+maybe(B) -> B.
+
+%% @private
+stringfy(Term) when is_binary(Term) ->
+ Term;
+stringfy(Term) when is_integer(Term) ->
+ integer_to_binary(Term);
+stringfy(Term) when is_atom(Term) ->
+ atom_to_binary(Term, utf8);
+stringfy(Term) ->
+ unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
+
+hexstr(B) ->
+ iolist_to_binary([io_lib:format("~2.16.0B", [X]) || X <- binary_to_list(B)]).
+
+%%--------------------------------------------------------------------
+%% Acc funcs
+
+%% see exhook.proto
+merge_responsed_bool(Req, #{type := 'IGNORE'}) ->
+ {ok, Req};
+merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}})
+ when is_boolean(NewBool) ->
+ NReq = Req#{result => NewBool},
+ case Type of
+ 'CONTINUE' -> {ok, NReq};
+ 'STOP_AND_RETURN' -> {stop, NReq}
+ end;
+merge_responsed_bool(Req, Resp) ->
+ ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
+ {ok, Req}.
+
+merge_responsed_message(Req, #{type := 'IGNORE'}) ->
+ {ok, Req};
+merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) ->
+ NReq = Req#{message => NMessage},
+ case Type of
+ 'CONTINUE' -> {ok, NReq};
+ 'STOP_AND_RETURN' -> {stop, NReq}
+ end;
+merge_responsed_message(Req, Resp) ->
+ ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
+ {ok, Req}.
diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl
new file mode 100644
index 000000000..21bc46889
--- /dev/null
+++ b/apps/emqx_exhook/src/emqx_exhook_server.erl
@@ -0,0 +1,274 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_server).
+
+-include_lib("emqx_libs/include/logger.hrl").
+
+-logger_header("[ExHook Svr]").
+
+-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
+
+%% Load/Unload
+-export([ load/2
+ , unload/1
+ ]).
+
+%% APIs
+-export([call/3]).
+
+%% Infos
+-export([ name/1
+ , format/1
+ ]).
+
+-record(server, {
+ %% Server name (equal to grpcbox client channel name)
+ name :: server_name(),
+ %% The server started options
+ options :: list(),
+ %% gRPC channel pid
+ channel :: pid(),
+ %% Registered hook names and options
+ hookspec :: #{hookpoint() => map()},
+ %% Metric fun
+ incfun :: function()
+ }).
+
+-type server_name() :: atom().
+-type server() :: #server{}.
+
+-type hookpoint() :: 'client.connect'
+ | 'client.connack'
+ | 'client.connected'
+ | 'client.disconnected'
+ | 'client.authenticate'
+ | 'client.check_acl'
+ | 'client.subscribe'
+ | 'client.unsubscribe'
+ | 'session.created'
+ | 'session.subscribed'
+ | 'session.unsubscribed'
+ | 'session.resumed'
+ | 'session.discarded'
+ | 'session.takeovered'
+ | 'session.terminated'
+ | 'message.publish'
+ | 'message.delivered'
+ | 'message.acked'
+ | 'message.dropped'.
+
+-export_type([server/0]).
+
+%%--------------------------------------------------------------------
+%% Load/Unload APIs
+%%--------------------------------------------------------------------
+
+-spec load(atom(), list()) -> {ok, server()} | {error, term()} .
+load(Name, Opts0) ->
+ {Endpoints, Options} = channel_opts(Opts0),
+ StartFun = case proplists:get_bool(inplace, Opts0) of
+ true -> start_grpc_client_channel_inplace;
+ _ -> start_grpc_client_channel
+ end,
+ case emqx_exhook_sup:StartFun(Name, Endpoints, Options) of
+ {ok, ChannPid} ->
+ case do_init(Name) of
+ {ok, HookSpecs} ->
+ %% Reigster metrics
+ Prefix = lists:flatten(io_lib:format("exhook.~s.", [Name])),
+ ensure_metrics(Prefix, HookSpecs),
+ {ok, #server{name = Name,
+ options = Opts0,
+ channel = ChannPid,
+ hookspec = HookSpecs,
+ incfun = incfun(Prefix) }};
+ {error, _} = E -> E
+ end;
+ {error, _} = E -> E
+ end.
+
+%% @private
+channel_opts(Opts) ->
+ Scheme = proplists:get_value(scheme, Opts),
+ Host = proplists:get_value(host, Opts),
+ Port = proplists:get_value(port, Opts),
+ Options = proplists:get_value(options, Opts, []),
+ SslOpts = case Scheme of
+ https -> proplists:get_value(ssl_options, Opts, []);
+ _ -> []
+ end,
+ {[{Scheme, Host, Port, SslOpts}], maps:from_list(Options)}.
+
+-spec unload(server()) -> ok.
+unload(#server{name = Name, channel = ChannPid, options = Options}) ->
+ _ = do_deinit(Name),
+ {StopFun, Args} = case proplists:get_bool(inplace, Options) of
+ true -> {stop_grpc_client_channel_inplace, [ChannPid]};
+ _ -> {stop_grpc_client_channel, [Name]}
+ end,
+ apply(emqx_exhook_sup, StopFun, Args).
+
+do_deinit(Name) ->
+ _ = do_call(Name, 'on_provider_unloaded', #{}),
+ ok.
+
+do_init(ChannName) ->
+ Req = #{broker => maps:from_list(emqx_sys:info())},
+ case do_call(ChannName, 'on_provider_loaded', Req) of
+ {ok, InitialResp} ->
+ try
+ {ok, resovle_hookspec(maps:get(hooks, InitialResp, []))}
+ catch _:Reason:Stk ->
+ ?LOG(error, "try to init ~p failed, reason: ~p, stacktrace: ~0p",
+ [ChannName, Reason, Stk]),
+ {error, Reason}
+ end;
+ {error, Reason} ->
+ {error, Reason}
+ end.
+
+%% @private
+resovle_hookspec(HookSpecs) when is_list(HookSpecs) ->
+ MessageHooks = message_hooks(),
+ AvailableHooks = available_hooks(),
+ lists:foldr(fun(HookSpec, Acc) ->
+ case maps:get(name, HookSpec, undefined) of
+ undefined -> Acc;
+ Name0 ->
+ Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end,
+ case lists:member(Name, AvailableHooks) of
+ true ->
+ case lists:member(Name, MessageHooks) of
+ true ->
+ Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}};
+ _ ->
+ Acc#{Name => #{}}
+ end;
+ _ -> error({unknown_hookpoint, Name})
+ end
+ end
+ end, #{}, HookSpecs).
+
+ensure_metrics(Prefix, HookSpecs) ->
+ Keys = [list_to_atom(Prefix ++ atom_to_list(Hookpoint))
+ || Hookpoint <- maps:keys(HookSpecs)],
+ lists:foreach(fun emqx_metrics:ensure/1, Keys).
+
+incfun(Prefix) ->
+ fun(Name) ->
+ emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name)))
+ end.
+
+format(#server{name = Name, hookspec = Hooks}) ->
+ io_lib:format("name=~p, hooks=~0p", [Name, Hooks]).
+
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+
+name(#server{name = Name}) ->
+ Name.
+
+-spec call(hookpoint(), map(), server())
+ -> ignore
+ | {ok, Resp :: term()}
+ | {error, term()}.
+call(Hookpoint, Req, #server{name = ChannName, hookspec = Hooks, incfun = IncFun}) ->
+ GrpcFunc = hk2func(Hookpoint),
+ case maps:get(Hookpoint, Hooks, undefined) of
+ undefined -> ignore;
+ Opts ->
+ NeedCall = case lists:member(Hookpoint, message_hooks()) of
+ false -> true;
+ _ ->
+ #{message := #{topic := Topic}} = Req,
+ match_topic_filter(Topic, maps:get(topics, Opts, []))
+ end,
+ case NeedCall of
+ false -> ignore;
+ _ ->
+ IncFun(Hookpoint),
+ do_call(ChannName, GrpcFunc, Req)
+ end
+ end.
+
+-compile({inline, [match_topic_filter/2]}).
+match_topic_filter(_, []) ->
+ true;
+match_topic_filter(TopicName, TopicFilter) ->
+ lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter).
+
+-spec do_call(atom(), atom(), map()) -> {ok, map()} | {error, term()}.
+do_call(ChannName, Fun, Req) ->
+ Options = #{channel => ChannName},
+ ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]),
+ case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
+ {ok, Resp, _Metadata} ->
+ ?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]),
+ {ok, Resp};
+ {error, {Code, Msg}, _Metadata} ->
+ ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
+ [?PB_CLIENT_MOD, Fun, Req, Options, Code, Msg]),
+ {error, {Code, Msg}};
+ {error, Reason} ->
+ ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p",
+ [?PB_CLIENT_MOD, Fun, Req, Options, Reason]),
+ {error, Reason};
+ {'EXIT', Reason, Stk} ->
+ ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~p",
+ [?PB_CLIENT_MOD, Fun, Req, Options, Reason, Stk]),
+ {error, Reason}
+ end.
+
+%%--------------------------------------------------------------------
+%% Internal funcs
+%%--------------------------------------------------------------------
+
+-compile({inline, [hk2func/1]}).
+hk2func('client.connect') -> 'on_client_connect';
+hk2func('client.connack') -> 'on_client_connack';
+hk2func('client.connected') -> 'on_client_connected';
+hk2func('client.disconnected') -> 'on_client_disconnected';
+hk2func('client.authenticate') -> 'on_client_authenticate';
+hk2func('client.check_acl') -> 'on_client_check_acl';
+hk2func('client.subscribe') -> 'on_client_subscribe';
+hk2func('client.unsubscribe') -> 'on_client_unsubscribe';
+hk2func('session.created') -> 'on_session_created';
+hk2func('session.subscribed') -> 'on_session_subscribed';
+hk2func('session.unsubscribed') -> 'on_session_unsubscribed';
+hk2func('session.resumed') -> 'on_session_resumed';
+hk2func('session.discarded') -> 'on_session_discarded';
+hk2func('session.takeovered') -> 'on_session_takeovered';
+hk2func('session.terminated') -> 'on_session_terminated';
+hk2func('message.publish') -> 'on_message_publish';
+hk2func('message.delivered') ->'on_message_delivered';
+hk2func('message.acked') -> 'on_message_acked';
+hk2func('message.dropped') ->'on_message_dropped'.
+
+-compile({inline, [message_hooks/0]}).
+message_hooks() ->
+ ['message.publish', 'message.delivered',
+ 'message.acked', 'message.dropped'].
+
+-compile({inline, [available_hooks/0]}).
+available_hooks() ->
+ ['client.connect', 'client.connack', 'client.connected',
+ 'client.disconnected', 'client.authenticate', 'client.check_acl',
+ 'client.subscribe', 'client.unsubscribe',
+ 'session.created', 'session.subscribed', 'session.unsubscribed',
+ 'session.resumed', 'session.discarded', 'session.takeovered',
+ 'session.terminated' | message_hooks()].
diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl
new file mode 100644
index 000000000..3238873df
--- /dev/null
+++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl
@@ -0,0 +1,74 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_sup).
+
+-behaviour(supervisor).
+
+-export([ start_link/0
+ , init/1
+ ]).
+
+-export([ start_grpc_client_channel/3
+ , stop_grpc_client_channel/1
+ ]).
+
+-export([ start_grpc_client_channel_inplace/3
+ , stop_grpc_client_channel_inplace/1
+ ]).
+
+%%--------------------------------------------------------------------
+%% Supervisor APIs & Callbacks
+%%--------------------------------------------------------------------
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init([]) ->
+ {ok, {{one_for_one, 10, 100}, []}}.
+
+%%--------------------------------------------------------------------
+%% APIs
+%%--------------------------------------------------------------------
+
+-spec start_grpc_client_channel(
+ atom() | string(),
+ [grpcbox_channel:endpoint()],
+ grpcbox_channel:options()) -> {ok, pid()} | {error, term()}.
+start_grpc_client_channel(Name, Endpoints, Options0) ->
+ Options = Options0#{sync_start => true},
+ Spec = #{id => Name,
+ start => {grpcbox_channel, start_link, [Name, Endpoints, Options]},
+ type => worker},
+
+ supervisor:start_child(?MODULE, Spec).
+
+-spec stop_grpc_client_channel(atom()) -> ok.
+stop_grpc_client_channel(Name) ->
+ ok = supervisor:terminate_child(?MODULE, Name),
+ ok = supervisor:delete_child(?MODULE, Name).
+
+-spec start_grpc_client_channel_inplace(
+ atom() | string(),
+ [grpcbox_channel:endpoint()],
+ grpcbox_channel:options()) -> {ok, pid()} | {error, term()}.
+start_grpc_client_channel_inplace(Name, Endpoints, Options0) ->
+ Options = Options0#{sync_start => true},
+ grpcbox_channel_sup:start_child(Name, Endpoints, Options).
+
+-spec stop_grpc_client_channel_inplace(pid()) -> ok.
+stop_grpc_client_channel_inplace(Pid) ->
+ ok = supervisor:terminate_child(grpcbox_channel_sup, Pid).
diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl
new file mode 100644
index 000000000..abfc6f398
--- /dev/null
+++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl
@@ -0,0 +1,53 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+%%--------------------------------------------------------------------
+%% Setups
+%%--------------------------------------------------------------------
+
+all() -> emqx_ct:all(?MODULE).
+
+init_per_suite(Cfg) ->
+ _ = emqx_exhook_demo_svr:start(),
+ emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1),
+ Cfg.
+
+end_per_suite(Cfg) ->
+ emqx_exhook_demo_svr:stop(),
+ emqx_ct_helpers:stop_apps([emqx_exhook]).
+
+set_special_cfgs(emqx) ->
+ application:set_env(emqx, allow_anonymous, false),
+ application:set_env(emqx, enable_acl_cache, false),
+ application:set_env(emqx, plugins_loaded_file,
+ emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
+set_special_cfgs(emqx_exhook) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% Test cases
+%%--------------------------------------------------------------------
+
+t_hooks(Cfg) ->
+ ok.
diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl
new file mode 100644
index 000000000..08a1d4f7c
--- /dev/null
+++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl
@@ -0,0 +1,299 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_exhook_demo_svr).
+
+-behavior(emqx_exhook_v_1_hook_provider_bhvr).
+
+%%
+-export([ start/0
+ , stop/0
+ , take/0
+ , in/1
+ ]).
+
+%% gRPC server HookProvider callbacks
+-export([ on_provider_loaded/2
+ , on_provider_unloaded/2
+ , on_client_connect/2
+ , on_client_connack/2
+ , on_client_connected/2
+ , on_client_disconnected/2
+ , on_client_authenticate/2
+ , on_client_check_acl/2
+ , on_client_subscribe/2
+ , on_client_unsubscribe/2
+ , on_session_created/2
+ , on_session_subscribed/2
+ , on_session_unsubscribed/2
+ , on_session_resumed/2
+ , on_session_discarded/2
+ , on_session_takeovered/2
+ , on_session_terminated/2
+ , on_message_publish/2
+ , on_message_delivered/2
+ , on_message_dropped/2
+ , on_message_acked/2
+ ]).
+
+-define(PORT, 9000).
+
+-define(HTTP, #{grpc_opts => #{service_protos => [emqx_exhook_pb],
+ services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr}},
+ listen_opts => #{port => ?PORT,
+ socket_options => [{reuseaddr, true}]},
+ pool_opts => #{size => 8},
+ transport_opts => #{ssl => false}}).
+
+%%--------------------------------------------------------------------
+%% Server APIs
+%%--------------------------------------------------------------------
+
+start() ->
+ Pid = spawn(fun mngr_main/0),
+ register(?MODULE, Pid),
+ {ok, Pid}.
+
+stop() ->
+ ?MODULE ! stop.
+
+take() ->
+ ?MODULE ! {take, self()},
+ receive {value, V} -> V
+ after 5000 -> error(timeout) end.
+
+in({FunName, Req}) ->
+ ?MODULE ! {in, FunName, Req}.
+
+mngr_main() ->
+ application:ensure_all_started(grpcbox),
+ Svr = grpcbox:start_server(?HTTP),
+ mngr_loop([Svr, queue:new(), queue:new()]).
+
+mngr_loop([Svr, Q, Takes]) ->
+ receive
+ {in, FunName, Req} ->
+ {NQ1, NQ2} = reply(queue:in({FunName, Req}, Q), Takes),
+ mngr_loop([Svr, NQ1, NQ2]);
+ {take, From} ->
+ {NQ1, NQ2} = reply(Q, queue:in(From, Takes)),
+ mngr_loop([Svr, NQ1, NQ2]);
+ stop ->
+ supervisor:terminate_child(grpcbox_services_simple_sup, Svr),
+ exit(normal)
+ end.
+
+reply(Q1, Q2) ->
+ case queue:len(Q1) =:= 0 orelse
+ queue:len(Q2) =:= 0 of
+ true -> {Q1, Q2};
+ _ ->
+ {{value, {Name, V}}, NQ1} = queue:out(Q1),
+ {{value, From}, NQ2} = queue:out(Q2),
+ From ! {value, {Name, V}},
+ {NQ1, NQ2}
+ end.
+
+%%--------------------------------------------------------------------
+%% callbacks
+%%--------------------------------------------------------------------
+
+-spec on_provider_loaded(ctx:ctx(), emqx_exhook_pb:on_provider_loadedial_request())
+ -> {ok, emqx_exhook_pb:on_provider_loaded_response(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_provider_loaded(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{hooks => [
+ #{name => <<"client.connect">>},
+ #{name => <<"client.connack">>},
+ #{name => <<"client.connected">>},
+ #{name => <<"client.disconnected">>},
+ #{name => <<"client.authenticate">>},
+ #{name => <<"client.check_acl">>},
+ #{name => <<"client.subscribe">>},
+ #{name => <<"client.unsubscribe">>},
+ #{name => <<"session.created">>},
+ #{name => <<"session.subscribed">>},
+ #{name => <<"session.unsubscribed">>},
+ #{name => <<"session.resumed">>},
+ #{name => <<"session.discarded">>},
+ #{name => <<"session.takeovered">>},
+ #{name => <<"session.terminated">>},
+ #{name => <<"message.publish">>},
+ #{name => <<"message.delivered">>},
+ #{name => <<"message.acked">>},
+ #{name => <<"message.dropped">>}]}, Ctx}.
+
+-spec on_provider_unloaded(ctx:ctx(), emqx_exhook_pb:on_provider_unloadedial_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_provider_unloaded(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_client_connect(ctx:ctx(), emqx_exhook_pb:client_connect_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_client_connect(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_client_connack(ctx:ctx(), emqx_exhook_pb:client_connack_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_client_connack(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_client_connected(ctx:ctx(), emqx_exhook_pb:client_connected_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_client_connected(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_client_disconnected(ctx:ctx(), emqx_exhook_pb:client_disconnected_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_client_disconnected(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_client_authenticate(ctx:ctx(), emqx_exhook_pb:client_authenticate_request())
+ -> {ok, emqx_exhook_pb:bool_result(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_client_authenticate(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{type => 'IGNORE'}, Ctx}.
+
+-spec on_client_check_acl(ctx:ctx(), emqx_exhook_pb:client_check_acl_request())
+ -> {ok, emqx_exhook_pb:bool_result(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_client_check_acl(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{type => 'STOP_AND_RETURN', value => {bool_result, true}}, Ctx}.
+
+-spec on_client_subscribe(ctx:ctx(), emqx_exhook_pb:client_subscribe_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_client_subscribe(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_client_unsubscribe(ctx:ctx(), emqx_exhook_pb:client_unsubscribe_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_client_unsubscribe(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_session_created(ctx:ctx(), emqx_exhook_pb:session_created_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_session_created(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_session_subscribed(ctx:ctx(), emqx_exhook_pb:session_subscribed_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_session_subscribed(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_session_unsubscribed(ctx:ctx(), emqx_exhook_pb:session_unsubscribed_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_session_unsubscribed(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_session_resumed(ctx:ctx(), emqx_exhook_pb:session_resumed_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_session_resumed(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_session_discarded(ctx:ctx(), emqx_exhook_pb:session_discarded_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_session_discarded(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_session_takeovered(ctx:ctx(), emqx_exhook_pb:session_takeovered_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_session_takeovered(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_session_terminated(ctx:ctx(), emqx_exhook_pb:session_terminated_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_session_terminated(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_message_publish(ctx:ctx(), emqx_exhook_pb:message_publish_request())
+ -> {ok, emqx_exhook_pb:valued_response(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_message_publish(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_message_delivered(ctx:ctx(), emqx_exhook_pb:message_delivered_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_message_delivered(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_message_dropped(ctx:ctx(), emqx_exhook_pb:message_dropped_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_message_dropped(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
+
+-spec on_message_acked(ctx:ctx(), emqx_exhook_pb:message_acked_request())
+ -> {ok, emqx_exhook_pb:empty_success(), ctx:ctx()}
+ | grpcbox_stream:grpc_error_response().
+on_message_acked(Ctx, Req) ->
+ ?MODULE:in({?FUNCTION_NAME, Req}),
+ %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]),
+ {ok, #{}, Ctx}.
diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl
new file mode 100644
index 000000000..e828d354c
--- /dev/null
+++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl
@@ -0,0 +1,535 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(prop_exhook_hooks).
+
+-include_lib("proper/include/proper.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-import(emqx_ct_proper_types,
+ [ conninfo/0
+ , clientinfo/0
+ , sessioninfo/0
+ , message/0
+ , connack_return_code/0
+ , topictab/0
+ , topic/0
+ , subopts/0
+ ]).
+
+-define(ALL(Vars, Types, Exprs),
+ ?SETUP(fun() ->
+ State = do_setup(),
+ fun() -> do_teardown(State) end
+ end, ?FORALL(Vars, Types, Exprs))).
+
+%%--------------------------------------------------------------------
+%% Properties
+%%--------------------------------------------------------------------
+
+prop_client_connect() ->
+ ?ALL({ConnInfo, ConnProps},
+ {conninfo(), conn_properties()},
+ begin
+ _OutConnProps = emqx_hooks:run_fold('client.connect', [ConnInfo], ConnProps),
+ {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{props => properties(ConnProps),
+ conninfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ConnInfo),
+ username => maybe(maps:get(username, ConnInfo, <<>>)),
+ peerhost => peerhost(ConnInfo),
+ sockport => sockport(ConnInfo),
+ proto_name => maps:get(proto_name, ConnInfo),
+ proto_ver => stringfy(maps:get(proto_ver, ConnInfo)),
+ keepalive => maps:get(keepalive, ConnInfo)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_client_connack() ->
+ ?ALL({ConnInfo, Rc, AckProps},
+ {conninfo(), connack_return_code(), ack_properties()},
+ begin
+ _OutAckProps = emqx_hooks:run_fold('client.connack', [ConnInfo, Rc], AckProps),
+ {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{props => properties(AckProps),
+ result_code => atom_to_binary(Rc, utf8),
+ conninfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ConnInfo),
+ username => maybe(maps:get(username, ConnInfo, <<>>)),
+ peerhost => peerhost(ConnInfo),
+ sockport => sockport(ConnInfo),
+ proto_name => maps:get(proto_name, ConnInfo),
+ proto_ver => stringfy(maps:get(proto_ver, ConnInfo)),
+ keepalive => maps:get(keepalive, ConnInfo)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_client_authenticate() ->
+ ?ALL({ClientInfo, AuthResult}, {clientinfo(), authresult()},
+ begin
+ _OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult),
+ {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{result => authresult_to_bool(AuthResult),
+ clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_client_check_acl() ->
+ ?ALL({ClientInfo, PubSub, Topic, Result},
+ {clientinfo(), oneof([publish, subscribe]), topic(), oneof([allow, deny])},
+ begin
+ _OutResult = emqx_hooks:run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Result),
+ {'on_client_check_acl', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{result => aclresult_to_bool(Result),
+ type => pubsub_to_enum(PubSub),
+ topic => Topic,
+ clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+
+prop_client_connected() ->
+ ?ALL({ClientInfo, ConnInfo},
+ {clientinfo(), conninfo()},
+ begin
+ ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]),
+ {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_client_disconnected() ->
+ ?ALL({ClientInfo, Reason, ConnInfo},
+ {clientinfo(), shutdown_reason(), conninfo()},
+ begin
+ ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]),
+ {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{reason => stringfy(Reason),
+ clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_client_subscribe() ->
+ ?ALL({ClientInfo, SubProps, TopicTab},
+ {clientinfo(), sub_properties(), topictab()},
+ begin
+ _OutTopicTab = emqx_hooks:run_fold('client.subscribe', [ClientInfo, SubProps], TopicTab),
+ {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{props => properties(SubProps),
+ topic_filters => topicfilters(TopicTab),
+ clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_client_unsubscribe() ->
+ ?ALL({ClientInfo, UnSubProps, TopicTab},
+ {clientinfo(), unsub_properties(), topictab()},
+ begin
+ _OutTopicTab = emqx_hooks:run_fold('client.unsubscribe', [ClientInfo, UnSubProps], TopicTab),
+ {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{props => properties(UnSubProps),
+ topic_filters => topicfilters(TopicTab),
+ clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_session_created() ->
+ ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
+ begin
+ ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]),
+ {'on_session_created', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_session_subscribed() ->
+ ?ALL({ClientInfo, Topic, SubOpts},
+ {clientinfo(), topic(), subopts()},
+ begin
+ ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]),
+ {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{topic => Topic,
+ subopts => subopts(SubOpts),
+ clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_session_unsubscribed() ->
+ ?ALL({ClientInfo, Topic, SubOpts},
+ {clientinfo(), topic(), subopts()},
+ begin
+ ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]),
+ {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{topic => Topic,
+ clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_session_resumed() ->
+ ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
+ begin
+ ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]),
+ {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_session_discared() ->
+ ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
+ begin
+ ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]),
+ {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_session_takeovered() ->
+ ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()},
+ begin
+ ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]),
+ {'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+ true
+ end).
+
+prop_session_terminated() ->
+ ?ALL({ClientInfo, Reason, SessInfo},
+ {clientinfo(), shutdown_reason(), sessioninfo()},
+ begin
+ ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]),
+ {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(),
+ Expected =
+ #{reason => stringfy(Reason),
+ clientinfo =>
+ #{node => nodestr(),
+ clientid => maps:get(clientid, ClientInfo),
+ username => maybe(maps:get(username, ClientInfo, <<>>)),
+ password => maybe(maps:get(password, ClientInfo, <<>>)),
+ peerhost => ntoa(maps:get(peerhost, ClientInfo)),
+ sockport => maps:get(sockport, ClientInfo),
+ protocol => stringfy(maps:get(protocol, ClientInfo)),
+ mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)),
+ is_superuser => maps:get(is_superuser, ClientInfo, false),
+ anonymous => maps:get(anonymous, ClientInfo, true)
+ }
+ },
+ ?assertEqual(Expected, Resp),
+
+ true
+ end).
+
+nodestr() ->
+ stringfy(node()).
+
+peerhost(#{peername := {Host, _}}) ->
+ ntoa(Host).
+
+sockport(#{sockname := {_, Port}}) ->
+ Port.
+
+%% copied from emqx_exhook
+
+ntoa({0,0,0,0,0,16#ffff,AB,CD}) ->
+ list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}));
+ntoa(IP) ->
+ list_to_binary(inet_parse:ntoa(IP)).
+
+maybe(undefined) -> <<>>;
+maybe(B) -> B.
+
+properties(undefined) -> [];
+properties(M) when is_map(M) ->
+ maps:fold(fun(K, V, Acc) ->
+ [#{name => stringfy(K),
+ value => stringfy(V)} | Acc]
+ end, [], M).
+
+topicfilters(Tfs) when is_list(Tfs) ->
+ [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
+
+%% @private
+stringfy(Term) when is_binary(Term) ->
+ Term;
+stringfy(Term) when is_integer(Term) ->
+ integer_to_binary(Term);
+stringfy(Term) when is_atom(Term) ->
+ atom_to_binary(Term, utf8);
+stringfy(Term) ->
+ unicode:characters_to_binary((io_lib:format("~0p", [Term]))).
+
+subopts(SubOpts) ->
+ #{qos => maps:get(qos, SubOpts, 0),
+ rh => maps:get(rh, SubOpts, 0),
+ rap => maps:get(rap, SubOpts, 0),
+ nl => maps:get(nl, SubOpts, 0),
+ share => maps:get(share, SubOpts, <<>>)
+ }.
+
+authresult_to_bool(AuthResult) ->
+ maps:get(auth_result, AuthResult, undefined) == success.
+
+aclresult_to_bool(Result) ->
+ Result == allow.
+
+pubsub_to_enum(publish) -> 'PUBLISH';
+pubsub_to_enum(subscribe) -> 'SUBSCRIBE'.
+
+%prop_message_publish() ->
+% ?ALL({Msg, Env, Encode}, {message(), topic_filter_env()},
+% begin
+% true
+% end).
+%
+%prop_message_delivered() ->
+% ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message(), topic_filter_env()},
+% begin
+% true
+% end).
+%
+%prop_message_acked() ->
+% ?ALL({ClientInfo, Msg, Env, Encode}, {clientinfo(), message()},
+% begin
+% true
+% end).
+
+%%--------------------------------------------------------------------
+%% Helper
+%%--------------------------------------------------------------------
+
+do_setup() ->
+ _ = emqx_exhook_demo_svr:start(),
+ emqx_ct_helpers:start_apps([emqx_exhook], fun set_special_cfgs/1),
+ %% waiting first loaded event
+ {'on_provider_loaded', _} = emqx_exhook_demo_svr:take(),
+ ok.
+
+do_teardown(_) ->
+ emqx_ct_helpers:stop_apps([emqx_exhook]),
+ %% waiting last unloaded event
+ {'on_provider_unloaded', _} = emqx_exhook_demo_svr:take(),
+ _ = emqx_exhook_demo_svr:stop(),
+ ok.
+
+set_special_cfgs(emqx) ->
+ application:set_env(emqx, allow_anonymous, false),
+ application:set_env(emqx, enable_acl_cache, false),
+ application:set_env(emqx, plugins_loaded_file,
+ emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
+set_special_cfgs(emqx_exhook) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% Generators
+%%--------------------------------------------------------------------
+
+conn_properties() ->
+ #{}.
+
+ack_properties() ->
+ #{}.
+
+sub_properties() ->
+ #{}.
+
+unsub_properties() ->
+ #{}.
+
+shutdown_reason() ->
+ oneof([utf8(), {shutdown, atom()}]).
+
+authresult() ->
+ #{auth_result => connack_return_code()}.
+
+%topic_filter_env() ->
+% oneof([{<<"#">>}, {undefined}, {topic()}]).
diff --git a/etc/emqx_cloud.d/emqx_exhook.conf b/etc/emqx_cloud.d/emqx_exhook.conf
new file mode 100644
index 000000000..f6f5213f7
--- /dev/null
+++ b/etc/emqx_cloud.d/emqx_exhook.conf
@@ -0,0 +1,15 @@
+##====================================================================
+## EMQ X Hooks
+##====================================================================
+
+##--------------------------------------------------------------------
+## Server Address
+
+## The gRPC server url
+##
+## exhook.server.$name.url = url()
+exhook.server.default.url = http://127.0.0.1:9000
+
+#exhook.server.default.ssl.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
+#exhook.server.default.ssl.certfile = {{ platform_etc_dir }}/certs/cert.pem
+#exhook.server.default.ssl.keyfile = {{ platform_etc_dir }}/certs/key.pem
diff --git a/rebar.config b/rebar.config
index df71f9762..046a6e326 100644
--- a/rebar.config
+++ b/rebar.config
@@ -1,49 +1,208 @@
{minimum_otp_vsn, "21.3"}.
-
-{plugins, [rebar3_proper]}.
-
-{deps,
- [{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
- {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
- {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
- {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.4"}}},
- {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.4"}}},
- {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}},
- {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
- ]}.
-
-{erl_opts, [warn_unused_vars,
- warn_shadow_vars,
- warn_unused_import,
- warn_obsolete_guard,
- debug_info,
- compressed %% for edge
+{plugins,[{relup_helper,{git,"https://github.com/emqx/relup_helper",
+ {branch,"master"}}}]}.
+{edge_relx_overlay,[]}.
+{edoc_opts,[{preprocess,true}]}.
+{erl_opts,[warn_unused_vars,warn_shadow_vars,warn_unused_import,
+ warn_obsolete_guard,no_debug_info,compressed]}.
+{overrides,[{add,[{erl_opts,[no_debug_info,compressed,
+ {parse_transform,mod_vsn}]}]}]}.
+{xref_checks,[undefined_function_calls,undefined_functions,locals_not_used,
+ deprecated_function_calls,warnings_as_errors,
+ deprecated_functions]}.
+{dialyzer, [{warnings, [unmatched_returns, error_handling, race_conditions]}
]}.
+{cover_enabled,true}.
+{cover_opts,[verbose]}.
+{cover_export_enabled,true}.
+{provider_hooks,[{pre,[{release,{relup_helper,gen_appups}}]},
+ {post,[{release,{relup_helper,otp_vsn}},
+ {release,{relup_helper,untar}}]}]}.
+{post_hooks,[]}.
+{erl_first_files, ["apps/emqx/src/emqx_logger.erl"]}.
-{overrides, [{add, [{erl_opts, [compressed]}]}]}.
+"${COVERALL_CONFIGS}".
-{edoc_opts, [{preprocess, true}]}.
-
-{xref_checks, [undefined_function_calls, undefined_functions,
- locals_not_used, deprecated_function_calls,
- warnings_as_errors, deprecated_functions
- ]}.
-
-{cover_enabled, true}.
-{cover_opts, [verbose]}.
-{cover_export_enabled, true}.
-
-{erl_first_files, ["src/emqx_logger.erl"]}.
+{deps, ["${BASIC_DEPS}"]}.
{profiles,
- [{test,
- [{plugins, [{coveralls, {git, "https://github.com/emqx/coveralls-erl", {branch, "github"}}}]},
- {deps,
- [{bbmustache, "1.7.0"},
- {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}},
- {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}}
- ]},
- {erl_opts, [debug_info]}
- ]}
- ]}.
+ [{'emqx',
+ [{relx,
+ ["${BASIC_RELX_PARAMS}",
+ {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}", "${CLOUD_RELX_APPS}"]},
+ {overlay, ["${BASIC_OVERLAYS}", "${CLOUD_OVERLAYS}"]},
+ {overlay_vars,["vars/vars-cloud.config","vars/vars-bin.config"]}]}]},
+ {'emqx-pkg',
+ [{relx,
+ ["${BASIC_RELX_PARAMS}",
+ {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}", "${CLOUD_RELX_APPS}"]},
+ {overlay, ["${BASIC_OVERLAYS}", "${CLOUD_OVERLAYS}"]},
+ {overlay_vars,["vars/vars-cloud.config","vars/vars-pkg.config"]}]}]},
+ {'emqx-edge',
+ [{relx,
+ ["${BASIC_RELX_PARAMS}",
+ {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}"]},
+ {overlay, ["${BASIC_OVERLAYS}", "${EDGE_OVERLAYS}"]},
+ {overlay_vars,["vars/vars-edge.config","vars/vars-bin.config"]}]}]},
+ {'emqx-edge-pkg',
+ [{relx,
+ ["${BASIC_RELX_PARAMS}",
+ {release, {emqx, "${GIT_DESC}"}, ["${BASIC_RELX_APPS}"]},
+ {overlay, ["${BASIC_OVERLAYS}", "${EDGE_OVERLAYS}"]},
+ {overlay_vars,["vars/vars-edge.config","vars/vars-pkg.config"]}]}]},
+ {test,
+ [
+ {plugins, [
+ rebar3_proper,
+ {coveralls,
+ {git, "https://github.com/emqx/coveralls-erl",
+ {branch, "github"}}}]},
+ {deps,
+ [ {bbmustache, "1.7.0"},
+ {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}},
+ {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}},
+ meck
+ ]},
+ {erl_opts, [debug_info]}
+ ]}
+ ]}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Placeholders
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+{placeholders, [
+
+ {"BASIC_DEPS", elems,
+ [ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
+ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
+ , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}
+ , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.2"}}}
+ , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.4"}}}
+ , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.0"}}}
+ , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}
+ , {emqx_passwd, {git, "https://github.com/emqx/emqx-passwd", {tag, "v1.1.1"}}}
+ , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.0"}}}
+ , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "v0.4.2"}}}
+ , {replayq, {git, "https://github.com/emqx/replayq", {tag, "v0.2.0"}}}
+ , {erlport, {git, "https://github.com/emqx/erlport", {tag, "v1.2.2"}}}
+ , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {branch, "2.0.3"}}}
+ , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}
+ , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.1"}}}
+ , {getopt, "1.0.1"}
+ ]
+ },
+
+ {"BASIC_RELX_PARAMS", elems,
+ [ {include_src,false}
+ , {extended_start_script,false}
+ , {generate_start_script,false}
+ , {sys_config,false}
+ , {vm_args,false}
+ ]
+ },
+
+ {"BASIC_RELX_APPS", elems,
+ [ kernel
+ , sasl
+ , crypto
+ , public_key
+ , asn1
+ , syntax_tools
+ , ssl
+ , os_mon
+ , inets
+ , compiler
+ , runtime_tools
+ , cuttlefish
+ , emqx
+ , {mnesia, load}
+ , {ekka, load}
+ , {emqx_retainer, load}
+ , {emqx_management, load}
+ , {emqx_dashboard, load}
+ , {emqx_bridge_mqtt, load}
+ , {emqx_sn, load}
+ , {emqx_coap, load}
+ , {emqx_stomp, load}
+ , {emqx_auth_clientid, load}
+ , {emqx_auth_username, load}
+ , {emqx_auth_http, load}
+ , {emqx_auth_mysql, load}
+ , {emqx_auth_jwt, load}
+ , {emqx_auth_mnesia, load}
+ , {emqx_web_hook, load}
+ , {emqx_recon, load}
+ , {emqx_rule_engine, load}
+ , {emqx_sasl, load}
+ , {emqx_telemetry, load}
+ ]
+ },
+
+ {"CLOUD_RELX_APPS", elems,
+ [ {emqx_lwm2m, load}
+ , {emqx_auth_ldap, load}
+ , {emqx_auth_pgsql, load}
+ , {emqx_auth_redis, load}
+ , {emqx_auth_mongo, load}
+ , {emqx_lua_hook, load}
+ , {emqx_exhook, load}
+ , {emqx_exproto, load}
+ , {emqx_prometheus, load}
+ , {emqx_psk_file, load}
+ , {emqx_plugin_template, load}
+ , {observer, load}
+ , luerl
+ , xmerl
+ ]
+ },
+
+ {"BASIC_OVERLAYS", elems,
+ [ {mkdir,"etc/"}
+ , {mkdir,"etc/emqx.d/"}
+ , {mkdir,"log/"}
+ , {mkdir,"data/"}
+ , {mkdir,"data/mnesia"}
+ , {mkdir,"data/configs"}
+ , {mkdir,"data/scripts"}
+ , {template, "data/loaded_plugins.tmpl", "data/loaded_plugins"}
+ , {template, "data/loaded_modules.tmpl", "data/loaded_modules"}
+ , {template,"data/emqx_vars","releases/emqx_vars"}
+ , {copy,"{{lib_dirs}}/cuttlefish/cuttlefish","bin/"}
+ , {copy,"bin/*","bin/"}
+ , {template,"etc/*.conf","etc/"}
+ , {template,"etc/emqx.d/*.conf","etc/emqx.d/"}
+ , {copy,"{{lib_dirs}}/emqx/priv/emqx.schema","releases/{{rel_vsn}}/"}
+ , {copy, "etc/certs","etc/"}
+ , "${RELUP_OVERLAYS}"
+ ]
+ },
+
+ {"RELUP_OVERLAYS", elems,
+ [ {copy,"bin/emqx.cmd","bin/emqx.cmd-{{rel_vsn}}"}
+ , {copy,"bin/emqx_ctl.cmd","bin/emqx_ctl.cmd-{{rel_vsn}}"}
+ , {copy,"bin/emqx","bin/emqx-{{rel_vsn}}"}
+ , {copy,"bin/emqx_ctl","bin/emqx_ctl-{{rel_vsn}}"}
+ , {copy,"bin/install_upgrade.escript", "bin/install_upgrade.escript-{{rel_vsn}}"}
+ , {copy,"bin/nodetool","bin/nodetool-{{rel_vsn}}"}
+ , {copy,"{{lib_dirs}}/cuttlefish/cuttlefish","bin/cuttlefish-{{rel_vsn}}"}
+ ]
+ },
+
+ {"CLOUD_OVERLAYS", elems,
+ [ {template,"etc/emqx_cloud.d/*.conf","etc/emqx.d/"}
+ , {template,"etc/emqx_cloud.d/vm.args","etc/vm.args"}
+ ]
+ },
+
+ {"EDGE_OVERLAYS", elems,
+ [ {template,"etc/emqx_edge.d/*.conf","etc/emqx.d/"}
+ , {template,"etc/emqx_edge.d/vm.args.edge","etc/vm.args"}
+ ]
+ },
+
+ {"GIT_DESC", var, fun mod_project:get_vsn/1},
+
+ {"COVERALL_CONFIGS", elems, fun mod_project:coveralls_configs/1}
+
+]}.