diff --git a/apps/emqx_authn/mix.exs b/apps/emqx_authn/mix.exs index 75a7735b7..745a61b86 100644 --- a/apps/emqx_authn/mix.exs +++ b/apps/emqx_authn/mix.exs @@ -1,4 +1,4 @@ -defmodule EMQXAuthentication.MixProject do +defmodule EMQXAuthn.MixProject do use Mix.Project def project do @@ -18,7 +18,8 @@ defmodule EMQXAuthentication.MixProject do def application do [ - mod: {:emqx_authentication_app, []}, + registered: [:emqx_authn_sup, :emqx_authn_registry], + mod: {:emqx_authn_app, []}, extra_applications: [:logger] ] end @@ -26,7 +27,11 @@ defmodule EMQXAuthentication.MixProject do defp deps do [ {:emqx, in_umbrella: true, runtime: false}, + {:emqx_resource, in_umbrella: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.2.1"}, + {:esasl, github: "emqx/esasl", tag: "0.1.0"}, + {:epgsql, github: "epgsql/epgsql", tag: "4.4.0"}, + {:mysql, github: "emqx/mysql-otp", tag: "1.7.1"}, {:jose, "~> 1.11"} ] end diff --git a/apps/emqx_coap/.formatter.exs b/apps/emqx_coap/.formatter.exs deleted file mode 100644 index d2cda26ed..000000000 --- a/apps/emqx_coap/.formatter.exs +++ /dev/null @@ -1,4 +0,0 @@ -# Used by "mix format" -[ - inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] -] diff --git a/apps/emqx_coap/mix.exs b/apps/emqx_coap/mix.exs deleted file mode 100644 index d5c6cbf20..000000000 --- a/apps/emqx_coap/mix.exs +++ /dev/null @@ -1,33 +0,0 @@ -defmodule EMQXCoap.MixProject do - use Mix.Project - - def project do - [ - app: :emqx_coap, - version: "4.3.0", - build_path: "../../_build", - config_path: "../../config/config.exs", - deps_path: "../../deps", - lockfile: "../../mix.lock", - elixir: "~> 1.12", - start_permanent: Mix.env() == :prod, - deps: deps(), - description: "EMQ X CoAP Gateway" - ] - end - - def application do - [ - mod: {:emqx_coap_app, []}, - extra_applications: [:logger] - ] - end - - defp deps do - [ - {:emqx, in_umbrella: true, runtime: false}, - {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.2.1"}, - {:gen_coap, github: "emqx/gen_coap", tag: "v0.3.2"} - ] - end -end diff --git a/apps/emqx_dashboard/mix.exs b/apps/emqx_dashboard/mix.exs index 5216ad8c0..fe954dcdd 100644 --- a/apps/emqx_dashboard/mix.exs +++ b/apps/emqx_dashboard/mix.exs @@ -27,7 +27,7 @@ defmodule EMQXDashboard.MixProject do defp deps do [ {:emqx, in_umbrella: true, runtime: false}, - {:minirest, github: "emqx/minirest", tag: "0.3.5"} + {:minirest, github: "emqx/minirest", tag: "1.1.7"} ] end end diff --git a/apps/emqx_exhook/.gitignore b/apps/emqx_exhook/.gitignore deleted file mode 100644 index da1f0db23..000000000 --- a/apps/emqx_exhook/.gitignore +++ /dev/null @@ -1,29 +0,0 @@ -.rebar3 -_* -.eunit -*.o -*.beam -*.plt -*.swp -*.swo -.erlang.cookie -ebin -log -erl_crash.dump -.rebar -logs -_build -.idea -*.iml -rebar3.crashdump -*~ -rebar.lock -data/ -*.conf.rendered -*.pyc -.DS_Store -*.class -Mnesia.nonode@nohost/ -src/emqx_exhook_pb.erl -src/emqx_exhook_v_1_hook_provider_client.erl -src/emqx_exhook_v_1_hook_provider_bhvr.erl diff --git a/apps/emqx_exhook/README.md b/apps/emqx_exhook/README.md deleted file mode 100644 index 216c39275..000000000 --- a/apps/emqx_exhook/README.md +++ /dev/null @@ -1,39 +0,0 @@ -# emqx_exhook - -The `emqx_exhook` extremly enhance the extensibility for EMQ X. It allow using an others programming language to mount the hooks intead of erlang. - -## Feature - -- [x] Based on gRPC, it brings a very wide range of applicability -- [x] Allows you to use the return value to extend emqx behavior. - -## Architecture - -``` -EMQ X Third-party Runtime -+========================+ +========+==========+ -| ExHook | | | | -| +----------------+ | gRPC | gRPC | User's | -| | gPRC Client | ------------------> | Server | Codes | -| +----------------+ | (HTTP/2) | | | -| | | | | -+========================+ +========+==========+ -``` - -## Usage - -### gRPC service - -See: `priv/protos/exhook.proto` - -### CLI - -## Example - -## Recommended gRPC Framework - -See: https://github.com/grpc-ecosystem/awesome-grpc - -## Thanks - -- [grpcbox](https://github.com/tsloughter/grpcbox) diff --git a/apps/emqx_exhook/docs/design-cn.md b/apps/emqx_exhook/docs/design-cn.md deleted file mode 100644 index 423a53bf5..000000000 --- a/apps/emqx_exhook/docs/design-cn.md +++ /dev/null @@ -1,112 +0,0 @@ -# 设计 - -## 动机 - -在 EMQ X Broker v4.1-v4.2 中,我们发布了 2 个插件来扩展 emqx 的编程能力: - -1. `emqx-extension-hook` 提供了使用 Java, Python 向 Broker 挂载钩子的功能 -2. `emqx-exproto` 提供了使用 Java,Python 编写用户自定义协议接入插件的功能 - -但在后续的支持中发现许多难以处理的问题: - -1. 有大量的编程语言需要支持,需要编写和维护如 Go, JavaScript, Lua.. 等语言的驱动。 -2. `erlport` 使用的操作系统的管道进行通信,这让用户代码只能部署在和 emqx 同一个操作系统上。部署方式受到了极大的限制。 -3. 用户程序的启动参数直接打包到 Broker 中,导致用户开发无法实时的进行调试,单步跟踪等。 -4. `erlport` 会占用 `stdin` `stdout`。 - -因此,我们计划重构这部分的实现,其中主要的内容是: -1. 使用 `gRPC` 替换 `erlport`。 -2. 将 `emqx-extension-hook` 重命名为 `emqx-exhook` - - -旧版本的设计:[emqx-extension-hook design in v4.2.0](https://github.com/emqx/emqx-exhook/blob/v4.2.0/docs/design.md) - -## 设计 - -架构如下: - -``` - EMQ X -+========================+ +========+==========+ -| ExHook | | | | -| +----------------+ | gRPC | gRPC | User's | -| | gRPC Client | ------------------> | Server | Codes | -| +----------------+ | (HTTP/2) | | | -| | | | | -+========================+ +========+==========+ -``` - -`emqx-exhook` 通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。 - - -和 emqx 原生的钩子一致,emqx-exhook 也按照链式的方式执行: - - - -### gRPC 服务示例 - -用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中: - -```protobuff -syntax = "proto3"; - -package emqx.exhook.v1; - -service HookProvider { - - rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {}; - - rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {}; - - rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {}; - - rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {}; - - rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {}; - - rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {}; - - rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {}; - - rpc OnClientAuthorize(ClientAuthorizeRequest) returns (ValuedResponse) {}; - - rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {}; - - rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {}; - - rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {}; - - rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {}; - - rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {}; - - rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {}; - - rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {}; - - rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {}; - - rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {}; - - rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {}; - - rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {}; - - rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {}; - - rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {}; -} -``` - -### 配置文件示例 - -``` -exhook: { - ## 配置 gRPC 服务地址 (HTTP) - ## - ## default 为服务器的名称 - server.default: { - url: "http://127.0.0.1:9000" - } -} -``` diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf deleted file mode 100644 index 648eb554f..000000000 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ /dev/null @@ -1,16 +0,0 @@ -##==================================================================== -## EMQ X Hooks -##==================================================================== - -exhook: { - servers: [ - # { name: "default" - # url: "http://127.0.0.1:9000" - # #ssl: { - # # cacertfile: "{{ platform_etc_dir }}/certs/cacert.pem" - # # certfile: "{{ platform_etc_dir }}/certs/cert.pem" - # # keyfile: "{{ platform_etc_dir }}/certs/key.pem" - # #} - # } - ] -} diff --git a/apps/emqx_exhook/include/emqx_exhook.hrl b/apps/emqx_exhook/include/emqx_exhook.hrl deleted file mode 100644 index 64131735e..000000000 --- a/apps/emqx_exhook/include/emqx_exhook.hrl +++ /dev/null @@ -1,44 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --ifndef(EMQX_EXHOOK_HRL). --define(EMQX_EXHOOK_HRL, true). - --define(APP, emqx_exhook). - --define(ENABLED_HOOKS, - [ {'client.connect', {emqx_exhook_handler, on_client_connect, []}} - , {'client.connack', {emqx_exhook_handler, on_client_connack, []}} - , {'client.connected', {emqx_exhook_handler, on_client_connected, []}} - , {'client.disconnected', {emqx_exhook_handler, on_client_disconnected, []}} - , {'client.authenticate', {emqx_exhook_handler, on_client_authenticate, []}} - , {'client.authorize', {emqx_exhook_handler, on_client_authorize, []}} - , {'client.subscribe', {emqx_exhook_handler, on_client_subscribe, []}} - , {'client.unsubscribe', {emqx_exhook_handler, on_client_unsubscribe, []}} - , {'session.created', {emqx_exhook_handler, on_session_created, []}} - , {'session.subscribed', {emqx_exhook_handler, on_session_subscribed, []}} - , {'session.unsubscribed',{emqx_exhook_handler, on_session_unsubscribed, []}} - , {'session.resumed', {emqx_exhook_handler, on_session_resumed, []}} - , {'session.discarded', {emqx_exhook_handler, on_session_discarded, []}} - , {'session.takeovered', {emqx_exhook_handler, on_session_takeovered, []}} - , {'session.terminated', {emqx_exhook_handler, on_session_terminated, []}} - , {'message.publish', {emqx_exhook_handler, on_message_publish, []}} - , {'message.delivered', {emqx_exhook_handler, on_message_delivered, []}} - , {'message.acked', {emqx_exhook_handler, on_message_acked, []}} - , {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}} - ]). - --endif. diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto deleted file mode 100644 index 5e931054c..000000000 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ /dev/null @@ -1,407 +0,0 @@ -//------------------------------------------------------------------------------ -// Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -//------------------------------------------------------------------------------ - -syntax = "proto3"; - -option csharp_namespace = "Emqx.Exhook.V1"; -option go_package = "emqx.io/grpc/exhook"; -option java_multiple_files = true; -option java_package = "io.emqx.exhook"; -option java_outer_classname = "EmqxExHookProto"; - -package emqx.exhook.v1; - -service HookProvider { - - rpc OnProviderLoaded(ProviderLoadedRequest) returns (LoadedResponse) {}; - - rpc OnProviderUnloaded(ProviderUnloadedRequest) returns (EmptySuccess) {}; - - rpc OnClientConnect(ClientConnectRequest) returns (EmptySuccess) {}; - - rpc OnClientConnack(ClientConnackRequest) returns (EmptySuccess) {}; - - rpc OnClientConnected(ClientConnectedRequest) returns (EmptySuccess) {}; - - rpc OnClientDisconnected(ClientDisconnectedRequest) returns (EmptySuccess) {}; - - rpc OnClientAuthenticate(ClientAuthenticateRequest) returns (ValuedResponse) {}; - - rpc OnClientAuthorize(ClientAuthorizeRequest) returns (ValuedResponse) {}; - - rpc OnClientSubscribe(ClientSubscribeRequest) returns (EmptySuccess) {}; - - rpc OnClientUnsubscribe(ClientUnsubscribeRequest) returns (EmptySuccess) {}; - - rpc OnSessionCreated(SessionCreatedRequest) returns (EmptySuccess) {}; - - rpc OnSessionSubscribed(SessionSubscribedRequest) returns (EmptySuccess) {}; - - rpc OnSessionUnsubscribed(SessionUnsubscribedRequest) returns (EmptySuccess) {}; - - rpc OnSessionResumed(SessionResumedRequest) returns (EmptySuccess) {}; - - rpc OnSessionDiscarded(SessionDiscardedRequest) returns (EmptySuccess) {}; - - rpc OnSessionTakeovered(SessionTakeoveredRequest) returns (EmptySuccess) {}; - - rpc OnSessionTerminated(SessionTerminatedRequest) returns (EmptySuccess) {}; - - rpc OnMessagePublish(MessagePublishRequest) returns (ValuedResponse) {}; - - rpc OnMessageDelivered(MessageDeliveredRequest) returns (EmptySuccess) {}; - - rpc OnMessageDropped(MessageDroppedRequest) returns (EmptySuccess) {}; - - rpc OnMessageAcked(MessageAckedRequest) returns (EmptySuccess) {}; -} - -//------------------------------------------------------------------------------ -// Request & Response -//------------------------------------------------------------------------------ - -message ProviderLoadedRequest { - - BrokerInfo broker = 1; -} - -message LoadedResponse { - - repeated HookSpec hooks = 1; -} - -message ProviderUnloadedRequest { } - -message ClientConnectRequest { - - ConnInfo conninfo = 1; - - // MQTT CONNECT packet's properties (MQTT v5.0) - // - // It should be empty on MQTT v3.1.1/v3.1 or others protocol - repeated Property props = 2; -} - -message ClientConnackRequest { - - ConnInfo conninfo = 1; - - string result_code = 2; - - repeated Property props = 3; -} - -message ClientConnectedRequest { - - ClientInfo clientinfo = 1; -} - -message ClientDisconnectedRequest { - - ClientInfo clientinfo = 1; - - string reason = 2; -} - -message ClientAuthenticateRequest { - - ClientInfo clientinfo = 1; - - bool result = 2; -} - -message ClientAuthorizeRequest { - - ClientInfo clientinfo = 1; - - enum AuthorizeReqType { - - PUBLISH = 0; - - SUBSCRIBE = 1; - } - - AuthorizeReqType type = 2; - - string topic = 3; - - bool result = 4; -} - -message ClientSubscribeRequest { - - ClientInfo clientinfo = 1; - - repeated Property props = 2; - - repeated TopicFilter topic_filters = 3; -} - -message ClientUnsubscribeRequest { - - ClientInfo clientinfo = 1; - - repeated Property props = 2; - - repeated TopicFilter topic_filters = 3; -} - -message SessionCreatedRequest { - - ClientInfo clientinfo = 1; -} - -message SessionSubscribedRequest { - - ClientInfo clientinfo = 1; - - string topic = 2; - - SubOpts subopts = 3; -} - -message SessionUnsubscribedRequest { - - ClientInfo clientinfo = 1; - - string topic = 2; -} - -message SessionResumedRequest { - - ClientInfo clientinfo = 1; -} - -message SessionDiscardedRequest { - - ClientInfo clientinfo = 1; -} - -message SessionTakeoveredRequest { - - ClientInfo clientinfo = 1; -} - -message SessionTerminatedRequest { - - ClientInfo clientinfo = 1; - - string reason = 2; -} - -message MessagePublishRequest { - - Message message = 1; -} - -message MessageDeliveredRequest { - - ClientInfo clientinfo = 1; - - Message message = 2; -} - -message MessageDroppedRequest { - - Message message = 1; - - string reason = 2; -} - -message MessageAckedRequest { - - ClientInfo clientinfo = 1; - - Message message = 2; -} - -//------------------------------------------------------------------------------ -// Basic data types -//------------------------------------------------------------------------------ - -message EmptySuccess { } - -message ValuedResponse { - - // The responsed value type - // - contiune: Use the responsed value and execute the next hook - // - ignore: Ignore the responsed value - // - stop_and_return: Use the responsed value and stop the chain executing - enum ResponsedType { - - CONTINUE = 0; - - IGNORE = 1; - - STOP_AND_RETURN = 2; - } - - ResponsedType type = 1; - - oneof value { - - // Boolean result, used on the 'client.authenticate', 'client.authorize' hooks - bool bool_result = 3; - - // Message result, used on the 'message.*' hooks - Message message = 4; - } -} - -message BrokerInfo { - - string version = 1; - - string sysdescr = 2; - - int64 uptime = 3; - - string datetime = 4; -} - -message HookSpec { - - // The registered hooks name - // - // Available value: - // "client.connect", "client.connack" - // "client.connected", "client.disconnected" - // "client.authenticate", "client.authorize" - // "client.subscribe", "client.unsubscribe" - // - // "session.created", "session.subscribed" - // "session.unsubscribed", "session.resumed" - // "session.discarded", "session.takeovered" - // "session.terminated" - // - // "message.publish", "message.delivered" - // "message.acked", "message.dropped" - string name = 1; - - // The topic filters for message hooks - repeated string topics = 2; -} - -message ConnInfo { - - string node = 1; - - string clientid = 2; - - string username = 3; - - string peerhost = 4; - - uint32 sockport = 5; - - string proto_name = 6; - - string proto_ver = 7; - - uint32 keepalive = 8; -} - -message ClientInfo { - - string node = 1; - - string clientid = 2; - - string username = 3; - - string password = 4; - - string peerhost = 5; - - uint32 sockport = 6; - - string protocol = 7; - - string mountpoint = 8; - - bool is_superuser = 9; - - bool anonymous = 10; - - // common name of client TLS cert - string cn = 11; - - // subject of client TLS cert - string dn = 12; -} - -message Message { - - string node = 1; - - string id = 2; - - uint32 qos = 3; - - string from = 4; - - string topic = 5; - - bytes payload = 6; - - uint64 timestamp = 7; -} - -message Property { - - string name = 1; - - string value = 2; -} - -message TopicFilter { - - string name = 1; - - uint32 qos = 2; -} - -message SubOpts { - - // The QoS level - uint32 qos = 1; - - // The group name for shared subscription - string share = 2; - - // The Retain Handling option (MQTT v5.0) - // - // 0 = Send retained messages at the time of the subscribe - // 1 = Send retained messages at subscribe only if the subscription does - // not currently exist - // 2 = Do not send retained messages at the time of the subscribe - uint32 rh = 3; - - // The Retain as Published option (MQTT v5.0) - // - // If 1, Application Messages forwarded using this subscription keep the - // RETAIN flag they were published with. - // If 0, Application Messages forwarded using this subscription have the - // RETAIN flag set to 0. - // Retained messages sent when the subscription is established have the RETAIN flag set to 1. - uint32 rap = 4; - - // The No Local option (MQTT v5.0) - // - // If the value is 1, Application Messages MUST NOT be forwarded to a - // connection with a ClientID equal to the ClientID of the publishing - uint32 nl = 5; -} diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config deleted file mode 100644 index 89dcb20a7..000000000 --- a/apps/emqx_exhook/rebar.config +++ /dev/null @@ -1,41 +0,0 @@ -%%-*- mode: erlang -*- -{plugins, - [rebar3_proper, - {grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}} -]}. - -{deps, - [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}} -]}. - -{grpc, - [{protos, ["priv/protos"]}, - {gpb_opts, [{module_name_prefix, "emqx_"}, - {module_name_suffix, "_pb"}]} -]}. - -{provider_hooks, - [{pre, [{compile, {grpc, gen}}, - {clean, {grpc, clean}}]} -]}. - -{edoc_opts, [{preprocess, true}]}. - -{erl_opts, [warn_unused_vars, - warn_shadow_vars, - warn_unused_import, - warn_obsolete_guard, - debug_info, - {parse_transform}]}. - -{xref_checks, [undefined_function_calls, undefined_functions, - locals_not_used, deprecated_function_calls, - warnings_as_errors, deprecated_functions]}. -{xref_ignores, [emqx_exhook_pb]}. - -{cover_enabled, true}. -{cover_opts, [verbose]}. -{cover_export_enabled, true}. -{cover_excl_mods, [emqx_exhook_pb, - emqx_exhook_v_1_hook_provider_bhvr, - emqx_exhook_v_1_hook_provider_client]}. diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src deleted file mode 100644 index c306a5ea4..000000000 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ /dev/null @@ -1,12 +0,0 @@ -{application, emqx_exhook, - [{description, "EMQ X Extension for Hook"}, - {vsn, "5.0.0"}, - {modules, []}, - {registered, []}, - {mod, {emqx_exhook_app, []}}, - {applications, [kernel,stdlib,grpc,emqx]}, - {env,[]}, - {licenses, ["Apache-2.0"]}, - {maintainers, ["EMQ X Team "]}, - {links, [{"Homepage", "https://emqx.io/"}]} - ]}. diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src deleted file mode 100644 index 9e142d9e2..000000000 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ /dev/null @@ -1,9 +0,0 @@ -%% -*-: erlang -*- -{VSN, - [ - {<<".*">>, []} - ], - [ - {<<".*">>, []} - ] -}. diff --git a/apps/emqx_exhook/src/emqx_exhook.erl b/apps/emqx_exhook/src/emqx_exhook.erl deleted file mode 100644 index b370c6e27..000000000 --- a/apps/emqx_exhook/src/emqx_exhook.erl +++ /dev/null @@ -1,131 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook). - --include("emqx_exhook.hrl"). --include_lib("emqx/include/logger.hrl"). - - -%% Mgmt APIs --export([ enable/2 - , disable/1 - , disable_all/0 - , list/0 - ]). - --export([ cast/2 - , call_fold/3 - ]). - -%%-------------------------------------------------------------------- -%% Mgmt APIs -%%-------------------------------------------------------------------- - -%% XXX: Only return the running servers --spec list() -> [emqx_exhook_server:server()]. -list() -> - [server(Name) || Name <- running()]. - --spec enable(binary(), map()) -> ok | {error, term()}. -enable(Name, Options) -> - case lists:member(Name, running()) of - true -> - {error, already_started}; - _ -> - case emqx_exhook_server:load(Name, Options) of - {ok, ServiceState} -> - save(Name, ServiceState); - {error, Reason} -> - ?LOG(error, "Load server ~p failed: ~p", [Name, Reason]), - {error, Reason} - end - end. - --spec disable(binary()) -> ok | {error, term()}. -disable(Name) -> - case server(Name) of - undefined -> {error, not_running}; - Service -> - ok = emqx_exhook_server:unload(Service), - unsave(Name) - end. - --spec disable_all() -> ok. -disable_all() -> - lists:foreach(fun disable/1, running()). - -%%---------------------------------------------------------- -%% Dispatch APIs -%%---------------------------------------------------------- - --spec cast(atom(), map()) -> ok. -cast(Hookpoint, Req) -> - cast(Hookpoint, Req, running()). - -cast(_, _, []) -> - ok; -cast(Hookpoint, Req, [ServiceName|More]) -> - %% XXX: Need a real asynchronous running - _ = emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)), - cast(Hookpoint, Req, More). - --spec call_fold(atom(), term(), function()) - -> {ok, term()} - | {stop, term()}. -call_fold(Hookpoint, Req, AccFun) -> - call_fold(Hookpoint, Req, AccFun, running()). - -call_fold(_, Req, _, []) -> - {ok, Req}; -call_fold(Hookpoint, Req, AccFun, [ServiceName|More]) -> - case emqx_exhook_server:call(Hookpoint, Req, server(ServiceName)) of - {ok, Resp} -> - case AccFun(Req, Resp) of - {stop, NReq} -> {stop, NReq}; - {ok, NReq} -> call_fold(Hookpoint, NReq, AccFun, More); - _ -> call_fold(Hookpoint, Req, AccFun, More) - end; - _ -> - call_fold(Hookpoint, Req, AccFun, More) - end. - -%%---------------------------------------------------------- -%% Storage - -save(Name, ServiceState) -> - Saved = persistent_term:get(?APP, []), - persistent_term:put(?APP, lists:reverse([Name | Saved])), - persistent_term:put({?APP, Name}, ServiceState). - -unsave(Name) -> - case persistent_term:get(?APP, []) of - [] -> - persistent_term:erase(?APP); - Saved -> - persistent_term:put(?APP, lists:delete(Name, Saved)) - end, - persistent_term:erase({?APP, Name}), - ok. - -running() -> - persistent_term:get(?APP, []). - -server(Name) -> - case catch persistent_term:get({?APP, Name}) of - {'EXIT', {badarg,_}} -> undefined; - Service -> Service - end. diff --git a/apps/emqx_exhook/src/emqx_exhook_app.erl b/apps/emqx_exhook/src/emqx_exhook_app.erl deleted file mode 100644 index c97b26677..000000000 --- a/apps/emqx_exhook/src/emqx_exhook_app.erl +++ /dev/null @@ -1,97 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook_app). - --behaviour(application). - --include("emqx_exhook.hrl"). - --define(CNTER, emqx_exhook_counter). - --export([ start/2 - , stop/1 - , prep_stop/1 - ]). - -%% Internal export --export([ load_server/2 - , unload_server/1 - , unload_exhooks/0 - , init_hooks_cnter/0 - ]). - -%%-------------------------------------------------------------------- -%% Application callbacks -%%-------------------------------------------------------------------- - -start(_StartType, _StartArgs) -> - {ok, Sup} = emqx_exhook_sup:start_link(), - - %% Init counter - init_hooks_cnter(), - - %% Load all dirvers - load_all_servers(), - - %% Register CLI - emqx_ctl:register_command(exhook, {emqx_exhook_cli, cli}, []), - {ok, Sup}. - -prep_stop(State) -> - emqx_ctl:unregister_command(exhook), - _ = unload_exhooks(), - ok = unload_all_servers(), - State. - -stop(_State) -> - ok. - -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - -load_all_servers() -> - try - lists:foreach(fun(#{name := Name} = Options) -> - load_server(Name, maps:remove(name, Options)) - end, emqx_config:get([exhook, servers])) - catch - _Class : _Reason -> - ok - end, ok. - -unload_all_servers() -> - emqx_exhook:disable_all(). - -load_server(Name, Options) -> - emqx_exhook:enable(Name, Options). - -unload_server(Name) -> - emqx_exhook:disable(Name). - -unload_exhooks() -> - [emqx:unhook(Name, {M, F}) || - {Name, {M, F, _A}} <- ?ENABLED_HOOKS]. - -init_hooks_cnter() -> - try - _ = ets:new(?CNTER, [named_table, public]), ok - catch - error:badarg:_ -> - ok - end. - diff --git a/apps/emqx_exhook/src/emqx_exhook_cli.erl b/apps/emqx_exhook/src/emqx_exhook_cli.erl deleted file mode 100644 index 0290d00ea..000000000 --- a/apps/emqx_exhook/src/emqx_exhook_cli.erl +++ /dev/null @@ -1,89 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook_cli). - --include("emqx_exhook.hrl"). - --export([cli/1]). - -cli(["server", "list"]) -> - if_enabled(fun() -> - Services = emqx_exhook:list(), - [emqx_ctl:print("HookServer(~s)~n", - [emqx_exhook_server:format(Service)]) || Service <- Services] - end); - -cli(["server", "enable", Name0]) -> - if_enabled(fun() -> - Name = iolist_to_binary(Name0), - case find_server_options(Name) of - undefined -> - emqx_ctl:print("not_found~n"); - Opts -> - print(emqx_exhook:enable(Name, Opts)) - end - end); - -cli(["server", "disable", Name]) -> - if_enabled(fun() -> - print(emqx_exhook:disable(iolist_to_binary(Name))) - end); - -cli(["server", "stats"]) -> - if_enabled(fun() -> - [emqx_ctl:print("~-35s:~w~n", [Name, N]) || {Name, N} <- stats()] - end); - -cli(_) -> - emqx_ctl:usage([{"exhook server list", "List all running exhook server"}, - {"exhook server enable ", "Enable a exhook server in the configuration"}, - {"exhook server disable ", "Disable a exhook server"}, - {"exhook server stats", "Print exhook server statistic"}]). - -print(ok) -> - emqx_ctl:print("ok~n"); -print({error, Reason}) -> - emqx_ctl:print("~p~n", [Reason]). - -find_server_options(Name) -> - Ls = emqx_config:get([exhook, servers]), - case [ E || E = #{name := N} <- Ls, N =:= Name] of - [] -> undefined; - [Options] -> - maps:remove(name, Options) - end. - -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - -if_enabled(Fun) -> - case lists:keymember(?APP, 1, application:which_applications()) of - true -> Fun(); - _ -> hint() - end. - -hint() -> - emqx_ctl:print("Please './bin/emqx_ctl plugins load emqx_exhook' first.~n"). - -stats() -> - lists:usort(lists:foldr(fun({K, N}, Acc) -> - case atom_to_list(K) of - "exhook." ++ Key -> [{Key, N} | Acc]; - _ -> Acc - end - end, [], emqx_metrics:all())). diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl deleted file mode 100644 index 1e81646e0..000000000 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ /dev/null @@ -1,320 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook_handler). - --include("emqx_exhook.hrl"). --include_lib("emqx/include/emqx.hrl"). --include_lib("emqx/include/logger.hrl"). - - --export([ on_client_connect/2 - , on_client_connack/3 - , on_client_connected/2 - , on_client_disconnected/3 - , on_client_authenticate/2 - , on_client_authorize/4 - , on_client_subscribe/3 - , on_client_unsubscribe/3 - ]). - -%% Session Lifecircle Hooks --export([ on_session_created/2 - , on_session_subscribed/3 - , on_session_unsubscribed/3 - , on_session_resumed/2 - , on_session_discarded/2 - , on_session_takeovered/2 - , on_session_terminated/3 - ]). - --export([ on_message_publish/1 - , on_message_dropped/3 - , on_message_delivered/2 - , on_message_acked/2 - ]). - -%% Utils --export([ message/1 - , stringfy/1 - , merge_responsed_bool/2 - , merge_responsed_message/2 - , assign_to_message/2 - , clientinfo/1 - ]). - --import(emqx_exhook, - [ cast/2 - , call_fold/3 - ]). - -%%-------------------------------------------------------------------- -%% Clients -%%-------------------------------------------------------------------- - -on_client_connect(ConnInfo, Props) -> - Req = #{conninfo => conninfo(ConnInfo), - props => properties(Props) - }, - cast('client.connect', Req). - -on_client_connack(ConnInfo, Rc, Props) -> - Req = #{conninfo => conninfo(ConnInfo), - result_code => stringfy(Rc), - props => properties(Props)}, - cast('client.connack', Req). - -on_client_connected(ClientInfo, _ConnInfo) -> - Req = #{clientinfo => clientinfo(ClientInfo)}, - cast('client.connected', Req). - -on_client_disconnected(ClientInfo, Reason, _ConnInfo) -> - Req = #{clientinfo => clientinfo(ClientInfo), - reason => stringfy(Reason) - }, - cast('client.disconnected', Req). - -on_client_authenticate(ClientInfo, AuthResult) -> - %% XXX: Bool is missing more information about the atom of the result - %% So, the `Req` has missed detailed info too. - %% - %% The return value of `call_fold` just a bool, that has missed - %% detailed info too. - %% - Bool = AuthResult == ok, - Req = #{clientinfo => clientinfo(ClientInfo), - result => Bool - }, - - case call_fold('client.authenticate', Req, - fun merge_responsed_bool/2) of - {StopOrOk, #{result := Result0}} when is_boolean(Result0) -> - Result = case Result0 of true -> ok; _ -> {error, not_authorized} end, - {StopOrOk, Result}; - _ -> - {ok, AuthResult} - end. - -on_client_authorize(ClientInfo, PubSub, Topic, Result) -> - Bool = Result == allow, - Type = case PubSub of - publish -> 'PUBLISH'; - subscribe -> 'SUBSCRIBE' - end, - Req = #{clientinfo => clientinfo(ClientInfo), - type => Type, - topic => Topic, - result => Bool - }, - case call_fold('client.authorize', Req, - fun merge_responsed_bool/2) of - {StopOrOk, #{result := Result0}} when is_boolean(Result0) -> - NResult = case Result0 of true -> allow; _ -> deny end, - {StopOrOk, NResult}; - _ -> {ok, Result} - end. - -on_client_subscribe(ClientInfo, Props, TopicFilters) -> - Req = #{clientinfo => clientinfo(ClientInfo), - props => properties(Props), - topic_filters => topicfilters(TopicFilters) - }, - cast('client.subscribe', Req). - -on_client_unsubscribe(ClientInfo, Props, TopicFilters) -> - Req = #{clientinfo => clientinfo(ClientInfo), - props => properties(Props), - topic_filters => topicfilters(TopicFilters) - }, - cast('client.unsubscribe', Req). - -%%-------------------------------------------------------------------- -%% Session -%%-------------------------------------------------------------------- - -on_session_created(ClientInfo, _SessInfo) -> - Req = #{clientinfo => clientinfo(ClientInfo)}, - cast('session.created', Req). - -on_session_subscribed(ClientInfo, Topic, SubOpts) -> - Req = #{clientinfo => clientinfo(ClientInfo), - topic => Topic, - subopts => maps:with([qos, share, rh, rap, nl], SubOpts) - }, - cast('session.subscribed', Req). - -on_session_unsubscribed(ClientInfo, Topic, _SubOpts) -> - Req = #{clientinfo => clientinfo(ClientInfo), - topic => Topic - }, - cast('session.unsubscribed', Req). - -on_session_resumed(ClientInfo, _SessInfo) -> - Req = #{clientinfo => clientinfo(ClientInfo)}, - cast('session.resumed', Req). - -on_session_discarded(ClientInfo, _SessInfo) -> - Req = #{clientinfo => clientinfo(ClientInfo)}, - cast('session.discarded', Req). - -on_session_takeovered(ClientInfo, _SessInfo) -> - Req = #{clientinfo => clientinfo(ClientInfo)}, - cast('session.takeovered', Req). - -on_session_terminated(ClientInfo, Reason, _SessInfo) -> - Req = #{clientinfo => clientinfo(ClientInfo), - reason => stringfy(Reason)}, - cast('session.terminated', Req). - -%%-------------------------------------------------------------------- -%% Message -%%-------------------------------------------------------------------- - -on_message_publish(#message{topic = <<"$SYS/", _/binary>>}) -> - ok; -on_message_publish(Message) -> - Req = #{message => message(Message)}, - case call_fold('message.publish', Req, - fun emqx_exhook_handler:merge_responsed_message/2) of - {StopOrOk, #{message := NMessage}} -> - {StopOrOk, assign_to_message(NMessage, Message)}; - _ -> {ok, Message} - end. - -on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) -> - ok; -on_message_dropped(Message, _By, Reason) -> - Req = #{message => message(Message), - reason => stringfy(Reason) - }, - cast('message.dropped', Req). - -on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) -> - ok; -on_message_delivered(ClientInfo, Message) -> - Req = #{clientinfo => clientinfo(ClientInfo), - message => message(Message) - }, - cast('message.delivered', Req). - -on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) -> - ok; -on_message_acked(ClientInfo, Message) -> - Req = #{clientinfo => clientinfo(ClientInfo), - message => message(Message) - }, - cast('message.acked', Req). - -%%-------------------------------------------------------------------- -%% Types - -properties(undefined) -> []; -properties(M) when is_map(M) -> - maps:fold(fun(K, V, Acc) -> - [#{name => stringfy(K), - value => stringfy(V)} | Acc] - end, [], M). - -conninfo(_ConnInfo = - #{clientid := ClientId, username := Username, peername := {Peerhost, _}, - sockname := {_, SockPort}, proto_name := ProtoName, proto_ver := ProtoVer, - keepalive := Keepalive}) -> - #{node => stringfy(node()), - clientid => ClientId, - username => maybe(Username), - peerhost => ntoa(Peerhost), - sockport => SockPort, - proto_name => ProtoName, - proto_ver => stringfy(ProtoVer), - keepalive => Keepalive}. - -clientinfo(ClientInfo = - #{clientid := ClientId, username := Username, peerhost := PeerHost, - sockport := SockPort, protocol := Protocol, mountpoint := Mountpoiont}) -> - #{node => stringfy(node()), - clientid => ClientId, - username => maybe(Username), - password => maybe(maps:get(password, ClientInfo, undefined)), - peerhost => ntoa(PeerHost), - sockport => SockPort, - protocol => stringfy(Protocol), - mountpoint => maybe(Mountpoiont), - is_superuser => maps:get(is_superuser, ClientInfo, false), - anonymous => maps:get(anonymous, ClientInfo, true), - cn => maybe(maps:get(cn, ClientInfo, undefined)), - dn => maybe(maps:get(dn, ClientInfo, undefined))}. - -message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) -> - #{node => stringfy(node()), - id => emqx_guid:to_hexstr(Id), - qos => Qos, - from => stringfy(From), - topic => Topic, - payload => Payload, - timestamp => Ts}. - -assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) -> - Message#message{qos = Qos, topic = Topic, payload = Payload}. - -topicfilters(Tfs) when is_list(Tfs) -> - [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. - -ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> - list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256})); -ntoa(IP) -> - list_to_binary(inet_parse:ntoa(IP)). - -maybe(undefined) -> <<>>; -maybe(B) -> B. - -%% @private -stringfy(Term) when is_binary(Term) -> - Term; -stringfy(Term) when is_integer(Term) -> - integer_to_binary(Term); -stringfy(Term) when is_atom(Term) -> - atom_to_binary(Term, utf8); -stringfy(Term) -> - unicode:characters_to_binary((io_lib:format("~0p", [Term]))). - -%%-------------------------------------------------------------------- -%% Acc funcs - -%% see exhook.proto -merge_responsed_bool(_Req, #{type := 'IGNORE'}) -> - ignore; -merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}}) - when is_boolean(NewBool) -> - NReq = Req#{result => NewBool}, - case Type of - 'CONTINUE' -> {ok, NReq}; - 'STOP_AND_RETURN' -> {stop, NReq} - end; -merge_responsed_bool(_Req, Resp) -> - ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]), - ignore. - -merge_responsed_message(_Req, #{type := 'IGNORE'}) -> - ignore; -merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) -> - NReq = Req#{message => NMessage}, - case Type of - 'CONTINUE' -> {ok, NReq}; - 'STOP_AND_RETURN' -> {stop, NReq} - end; -merge_responsed_message(_Req, Resp) -> - ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]), - ignore. diff --git a/apps/emqx_exhook/src/emqx_exhook_schema.erl b/apps/emqx_exhook/src/emqx_exhook_schema.erl deleted file mode 100644 index 68ffb5735..000000000 --- a/apps/emqx_exhook/src/emqx_exhook_schema.erl +++ /dev/null @@ -1,60 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook_schema). - --dialyzer(no_return). --dialyzer(no_match). --dialyzer(no_contracts). --dialyzer(no_unused). --dialyzer(no_fail_call). - --include_lib("typerefl/include/types.hrl"). - --behaviour(hocon_schema). - --export([structs/0, fields/1]). --export([t/1, t/3, t/4, ref/1]). - -structs() -> [servers]. - -fields(servers) -> - [ {name, string()} - , {url, string()} - , {ssl, t(ref(ssl_conf_group))} - ]; - -fields(ssl_conf_group) -> - [ {cacertfile, string()} - , {certfile, string()} - , {keyfile, string()} - ]. - -%% types - -t(Type) -> #{type => Type}. - -t(Type, Mapping, Default) -> - hoconsc:t(Type, #{mapping => Mapping, default => Default}). - -t(Type, Mapping, Default, OverrideEnv) -> - hoconsc:t(Type, #{ mapping => Mapping - , default => Default - , override_env => OverrideEnv - }). - -ref(Field) -> - hoconsc:ref(?MODULE, Field). diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl deleted file mode 100644 index 897a2858d..000000000 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ /dev/null @@ -1,338 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook_server). - --include("emqx_exhook.hrl"). --include_lib("emqx/include/logger.hrl"). - - --define(CNTER, emqx_exhook_counter). --define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). - -%% Load/Unload --export([ load/2 - , unload/1 - ]). - -%% APIs --export([call/3]). - -%% Infos --export([ name/1 - , format/1 - ]). - --record(server, { - %% Server name (equal to grpc client channel name) - name :: server_name(), - %% The server started options - options :: options(), - %% gRPC channel pid - channel :: pid(), - %% Registered hook names and options - hookspec :: #{hookpoint() => map()}, - %% Metrcis name prefix - prefix :: list() - }). - --type server_name() :: string(). --type server() :: #server{}. - --type hookpoint() :: 'client.connect' - | 'client.connack' - | 'client.connected' - | 'client.disconnected' - | 'client.authenticate' - | 'client.authorize' - | 'client.subscribe' - | 'client.unsubscribe' - | 'session.created' - | 'session.subscribed' - | 'session.unsubscribed' - | 'session.resumed' - | 'session.discarded' - | 'session.takeovered' - | 'session.terminated' - | 'message.publish' - | 'message.delivered' - | 'message.acked' - | 'message.dropped'. - --export_type([server/0]). - --type options() :: #{ url := uri_string:uri_string() - , ssl => map() - }. - --dialyzer({nowarn_function, [inc_metrics/2]}). - -%%-------------------------------------------------------------------- -%% Load/Unload APIs -%%-------------------------------------------------------------------- - --spec load(binary(), options()) -> {ok, server()} | {error, term()} . -load(Name0, Opts0) -> - Name = to_list(Name0), - {SvrAddr, ClientOpts} = channel_opts(Opts0), - case emqx_exhook_sup:start_grpc_client_channel( - Name, - SvrAddr, - ClientOpts) of - {ok, _ChannPoolPid} -> - case do_init(Name) of - {ok, HookSpecs} -> - %% Reigster metrics - Prefix = lists:flatten( - io_lib:format("exhook.~s.", [Name])), - ensure_metrics(Prefix, HookSpecs), - %% Ensure hooks - ensure_hooks(HookSpecs), - {ok, #server{name = Name, - options = Opts0, - channel = _ChannPoolPid, - hookspec = HookSpecs, - prefix = Prefix }}; - {error, _} = E -> - emqx_exhook_sup:stop_grpc_client_channel(Name), E - end; - {error, _} = E -> E - end. - -%% @private -to_list(Name) when is_atom(Name) -> - atom_to_list(Name); -to_list(Name) when is_binary(Name) -> - binary_to_list(Name); -to_list(Name) when is_list(Name) -> - Name. - -%% @private -channel_opts(Opts = #{url := URL}) -> - case uri_string:parse(URL) of - #{scheme := <<"http">>, host := Host, port := Port} -> - {format_http_uri("http", Host, Port), #{}}; - #{scheme := <<"https">>, host := Host, port := Port} -> - SslOpts = - case maps:get(ssl, Opts, undefined) of - undefined -> []; - MapOpts -> - filter( - [{cacertfile, maps:get(cacertfile, MapOpts, undefined)}, - {certfile, maps:get(certfile, MapOpts, undefined)}, - {keyfile, maps:get(keyfile, MapOpts, undefined)} - ]) - end, - {format_http_uri("https", Host, Port), - #{gun_opts => #{transport => ssl, transport_opts => SslOpts}}}; - _ -> - error(bad_server_url) - end. - -format_http_uri(Scheme, Host, Port) -> - lists:flatten(io_lib:format("~s://~s:~w", [Scheme, Host, Port])). - -filter(Ls) -> - [ E || E <- Ls, E /= undefined]. - --spec unload(server()) -> ok. -unload(#server{name = Name, hookspec = HookSpecs}) -> - _ = do_deinit(Name), - _ = may_unload_hooks(HookSpecs), - _ = emqx_exhook_sup:stop_grpc_client_channel(Name), - ok. - -do_deinit(Name) -> - _ = do_call(Name, 'on_provider_unloaded', #{}), - ok. - -do_init(ChannName) -> - %% BrokerInfo defined at: exhook.protos - BrokerInfo = maps:with([version, sysdescr, uptime, datetime], - maps:from_list(emqx_sys:info())), - Req = #{broker => BrokerInfo}, - case do_call(ChannName, 'on_provider_loaded', Req) of - {ok, InitialResp} -> - try - {ok, resovle_hookspec(maps:get(hooks, InitialResp, []))} - catch _:Reason:Stk -> - ?LOG(error, "try to init ~p failed, reason: ~p, stacktrace: ~0p", - [ChannName, Reason, Stk]), - {error, Reason} - end; - {error, Reason} -> - {error, Reason} - end. - -%% @private -resovle_hookspec(HookSpecs) when is_list(HookSpecs) -> - MessageHooks = message_hooks(), - AvailableHooks = available_hooks(), - lists:foldr(fun(HookSpec, Acc) -> - case maps:get(name, HookSpec, undefined) of - undefined -> Acc; - Name0 -> - Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end, - case lists:member(Name, AvailableHooks) of - true -> - case lists:member(Name, MessageHooks) of - true -> - Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}}; - _ -> - Acc#{Name => #{}} - end; - _ -> error({unknown_hookpoint, Name}) - end - end - end, #{}, HookSpecs). - -ensure_metrics(Prefix, HookSpecs) -> - Keys = [list_to_atom(Prefix ++ atom_to_list(Hookpoint)) - || Hookpoint <- maps:keys(HookSpecs)], - lists:foreach(fun emqx_metrics:ensure/1, Keys). - -ensure_hooks(HookSpecs) -> - lists:foreach(fun(Hookpoint) -> - case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of - false -> - ?LOG(error, "Unknown name ~s to hook, skip it!", [Hookpoint]); - {Hookpoint, {M, F, A}} -> - emqx_hooks:put(Hookpoint, {M, F, A}), - ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0}) - end - end, maps:keys(HookSpecs)). - -may_unload_hooks(HookSpecs) -> - lists:foreach(fun(Hookpoint) -> - case ets:update_counter(?CNTER, Hookpoint, {2, -1}, {Hookpoint, 0}) of - Cnt when Cnt =< 0 -> - case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of - {Hookpoint, {M, F, _A}} -> - emqx_hooks:del(Hookpoint, {M, F}); - _ -> ok - end, - ets:delete(?CNTER, Hookpoint); - _ -> ok - end - end, maps:keys(HookSpecs)). - -format(#server{name = Name, hookspec = Hooks}) -> - io_lib:format("name=~p, hooks=~0p", [Name, Hooks]). - -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- - -name(#server{name = Name}) -> - Name. - --spec call(hookpoint(), map(), server()) - -> ignore - | {ok, Resp :: term()} - | {error, term()}. -call(Hookpoint, Req, #server{name = ChannName, hookspec = Hooks, prefix = Prefix}) -> - GrpcFunc = hk2func(Hookpoint), - case maps:get(Hookpoint, Hooks, undefined) of - undefined -> ignore; - Opts -> - NeedCall = case lists:member(Hookpoint, message_hooks()) of - false -> true; - _ -> - #{message := #{topic := Topic}} = Req, - match_topic_filter(Topic, maps:get(topics, Opts, [])) - end, - case NeedCall of - false -> ignore; - _ -> - inc_metrics(Prefix, Hookpoint), - do_call(ChannName, GrpcFunc, Req) - end - end. - -%% @private -inc_metrics(IncFun, Name) when is_function(IncFun) -> - %% BACKW: e4.2.0-e4.2.2 - {env, [Prefix|_]} = erlang:fun_info(IncFun, env), - inc_metrics(Prefix, Name); -inc_metrics(Prefix, Name) when is_list(Prefix) -> - emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))). - --compile({inline, [match_topic_filter/2]}). -match_topic_filter(_, []) -> - true; -match_topic_filter(TopicName, TopicFilter) -> - lists:any(fun(F) -> emqx_topic:match(TopicName, F) end, TopicFilter). - --spec do_call(string(), atom(), map()) -> {ok, map()} | {error, term()}. -do_call(ChannName, Fun, Req) -> - Options = #{channel => ChannName}, - ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]), - case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of - {ok, Resp, _Metadata} -> - ?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]), - {ok, Resp}; - {error, {Code, Msg}, _Metadata} -> - ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", - [?PB_CLIENT_MOD, Fun, Req, Options, Code, Msg]), - {error, {Code, Msg}}; - {error, Reason} -> - ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) error: ~0p", - [?PB_CLIENT_MOD, Fun, Req, Options, Reason]), - {error, Reason}; - {'EXIT', {Reason, Stk}} -> - ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) throw an exception: ~0p, stacktrace: ~0p", - [?PB_CLIENT_MOD, Fun, Req, Options, Reason, Stk]), - {error, Reason} - end. - -%%-------------------------------------------------------------------- -%% Internal funcs -%%-------------------------------------------------------------------- - --compile({inline, [hk2func/1]}). -hk2func('client.connect') -> 'on_client_connect'; -hk2func('client.connack') -> 'on_client_connack'; -hk2func('client.connected') -> 'on_client_connected'; -hk2func('client.disconnected') -> 'on_client_disconnected'; -hk2func('client.authenticate') -> 'on_client_authenticate'; -hk2func('client.authorize') -> 'on_client_authorize'; -hk2func('client.subscribe') -> 'on_client_subscribe'; -hk2func('client.unsubscribe') -> 'on_client_unsubscribe'; -hk2func('session.created') -> 'on_session_created'; -hk2func('session.subscribed') -> 'on_session_subscribed'; -hk2func('session.unsubscribed') -> 'on_session_unsubscribed'; -hk2func('session.resumed') -> 'on_session_resumed'; -hk2func('session.discarded') -> 'on_session_discarded'; -hk2func('session.takeovered') -> 'on_session_takeovered'; -hk2func('session.terminated') -> 'on_session_terminated'; -hk2func('message.publish') -> 'on_message_publish'; -hk2func('message.delivered') ->'on_message_delivered'; -hk2func('message.acked') -> 'on_message_acked'; -hk2func('message.dropped') ->'on_message_dropped'. - --compile({inline, [message_hooks/0]}). -message_hooks() -> - ['message.publish', 'message.delivered', - 'message.acked', 'message.dropped']. - --compile({inline, [available_hooks/0]}). -available_hooks() -> - ['client.connect', 'client.connack', 'client.connected', - 'client.disconnected', 'client.authenticate', 'client.authorize', - 'client.subscribe', 'client.unsubscribe', - 'session.created', 'session.subscribed', 'session.unsubscribed', - 'session.resumed', 'session.discarded', 'session.takeovered', - 'session.terminated' | message_hooks()]. diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl deleted file mode 100644 index c3ca811bd..000000000 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ /dev/null @@ -1,59 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook_sup). - --behaviour(supervisor). - --export([ start_link/0 - , init/1 - ]). - --export([ start_grpc_client_channel/3 - , stop_grpc_client_channel/1 - ]). - -%%-------------------------------------------------------------------- -%% Supervisor APIs & Callbacks -%%-------------------------------------------------------------------- - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init([]) -> - {ok, {{one_for_one, 10, 100}, []}}. - -%%-------------------------------------------------------------------- -%% APIs -%%-------------------------------------------------------------------- - --spec start_grpc_client_channel( - string(), - uri_string:uri_string(), - grpc_client:options()) -> {ok, pid()} | {error, term()}. -start_grpc_client_channel(Name, SvrAddr, Options) -> - grpc_client_sup:create_channel_pool(Name, SvrAddr, Options). - --spec stop_grpc_client_channel(string()) -> ok. -stop_grpc_client_channel(Name) -> - %% Avoid crash due to hot-upgrade had unloaded - %% grpc application - try - grpc_client_sup:stop_channel_pool(Name) - catch - _:_:_ -> - ok - end. diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl deleted file mode 100644 index bf5d6ac1f..000000000 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ /dev/null @@ -1,109 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). - --define(CONF_DEFAULT, <<" -exhook: { - servers: [ - { name: \"default\" - url: \"http://127.0.0.1:9000\" - } - ] -} -">>). - -%%-------------------------------------------------------------------- -%% Setups -%%-------------------------------------------------------------------- - -all() -> emqx_ct:all(?MODULE). - -init_per_suite(Cfg) -> - _ = emqx_exhook_demo_svr:start(), - ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT), - emqx_ct_helpers:start_apps([emqx_exhook]), - Cfg. - -end_per_suite(_Cfg) -> - emqx_ct_helpers:stop_apps([emqx_exhook]), - emqx_exhook_demo_svr:stop(). - -%%-------------------------------------------------------------------- -%% Test cases -%%-------------------------------------------------------------------- - -t_noserver_nohook(_) -> - emqx_exhook:disable(<<"default">>), - ?assertEqual([], loaded_exhook_hookpoints()), - [#{name := Name} = Opts] = emqx_config:get([exhook, servers]), - ok = emqx_exhook:enable(Name, Opts), - ?assertNotEqual([], loaded_exhook_hookpoints()). - -t_cli_list(_) -> - meck_print(), - ?assertEqual( [[emqx_exhook_server:format(Svr) || Svr <- emqx_exhook:list()]] - , emqx_exhook_cli:cli(["server", "list"]) - ), - unmeck_print(). - -t_cli_enable_disable(_) -> - meck_print(), - ?assertEqual([already_started], emqx_exhook_cli:cli(["server", "enable", "default"])), - ?assertEqual(ok, emqx_exhook_cli:cli(["server", "disable", "default"])), - ?assertEqual([], emqx_exhook_cli:cli(["server", "list"])), - - ?assertEqual([not_running], emqx_exhook_cli:cli(["server", "disable", "default"])), - ?assertEqual(ok, emqx_exhook_cli:cli(["server", "enable", "default"])), - unmeck_print(). - -t_cli_stats(_) -> - meck_print(), - _ = emqx_exhook_cli:cli(["server", "stats"]), - _ = emqx_exhook_cli:cli(x), - unmeck_print(). - -%%-------------------------------------------------------------------- -%% Utils -%%-------------------------------------------------------------------- - -meck_print() -> - meck:new(emqx_ctl, [passthrough, no_history, no_link]), - meck:expect(emqx_ctl, print, fun(_) -> ok end), - meck:expect(emqx_ctl, print, fun(_, Args) -> Args end). - -unmeck_print() -> - meck:unload(emqx_ctl). - -loaded_exhook_hookpoints() -> - lists:filtermap(fun(E) -> - Name = element(2, E), - Callbacks = element(3, E), - case lists:any(fun is_exhook_callback/1, Callbacks) of - true -> {true, Name}; - _ -> false - end - end, ets:tab2list(emqx_hooks)). - -is_exhook_callback(Cb) -> - Action = element(2, Cb), - emqx_exhook_handler == element(1, Action). diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl deleted file mode 100644 index 656788b5e..000000000 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ /dev/null @@ -1,339 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_exhook_demo_svr). - --behavior(emqx_exhook_v_1_hook_provider_bhvr). - -%% --export([ start/0 - , stop/0 - , take/0 - , in/1 - ]). - -%% gRPC server HookProvider callbacks --export([ on_provider_loaded/2 - , on_provider_unloaded/2 - , on_client_connect/2 - , on_client_connack/2 - , on_client_connected/2 - , on_client_disconnected/2 - , on_client_authenticate/2 - , on_client_authorize/2 - , on_client_subscribe/2 - , on_client_unsubscribe/2 - , on_session_created/2 - , on_session_subscribed/2 - , on_session_unsubscribed/2 - , on_session_resumed/2 - , on_session_discarded/2 - , on_session_takeovered/2 - , on_session_terminated/2 - , on_message_publish/2 - , on_message_delivered/2 - , on_message_dropped/2 - , on_message_acked/2 - ]). - --define(PORT, 9000). --define(NAME, ?MODULE). - -%%-------------------------------------------------------------------- -%% Server APIs -%%-------------------------------------------------------------------- - -start() -> - Pid = spawn(fun mngr_main/0), - register(?MODULE, Pid), - {ok, Pid}. - -stop() -> - grpc:stop_server(?NAME), - ?MODULE ! stop. - -take() -> - ?MODULE ! {take, self()}, - receive {value, V} -> V - after 5000 -> error(timeout) end. - -in({FunName, Req}) -> - ?MODULE ! {in, FunName, Req}. - -mngr_main() -> - application:ensure_all_started(grpc), - Services = #{protos => [emqx_exhook_pb], - services => #{'emqx.exhook.v1.HookProvider' => emqx_exhook_demo_svr} - }, - Options = [], - Svr = grpc:start_server(?NAME, ?PORT, Services, Options), - mngr_loop([Svr, queue:new(), queue:new()]). - -mngr_loop([Svr, Q, Takes]) -> - receive - {in, FunName, Req} -> - {NQ1, NQ2} = reply(queue:in({FunName, Req}, Q), Takes), - mngr_loop([Svr, NQ1, NQ2]); - {take, From} -> - {NQ1, NQ2} = reply(Q, queue:in(From, Takes)), - mngr_loop([Svr, NQ1, NQ2]); - stop -> - exit(normal) - end. - -reply(Q1, Q2) -> - case queue:len(Q1) =:= 0 orelse - queue:len(Q2) =:= 0 of - true -> {Q1, Q2}; - _ -> - {{value, {Name, V}}, NQ1} = queue:out(Q1), - {{value, From}, NQ2} = queue:out(Q2), - From ! {value, {Name, V}}, - {NQ1, NQ2} - end. - -%%-------------------------------------------------------------------- -%% callbacks -%%-------------------------------------------------------------------- - --spec on_provider_loaded(emqx_exhook_pb:provider_loaded_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:loaded_response(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. - -on_provider_loaded(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{hooks => [ - #{name => <<"client.connect">>}, - #{name => <<"client.connack">>}, - #{name => <<"client.connected">>}, - #{name => <<"client.disconnected">>}, - #{name => <<"client.authenticate">>}, - #{name => <<"client.authorize">>}, - #{name => <<"client.subscribe">>}, - #{name => <<"client.unsubscribe">>}, - #{name => <<"session.created">>}, - #{name => <<"session.subscribed">>}, - #{name => <<"session.unsubscribed">>}, - #{name => <<"session.resumed">>}, - #{name => <<"session.discarded">>}, - #{name => <<"session.takeovered">>}, - #{name => <<"session.terminated">>}, - #{name => <<"message.publish">>}, - #{name => <<"message.delivered">>}, - #{name => <<"message.acked">>}, - #{name => <<"message.dropped">>}]}, Md}. --spec on_provider_unloaded(emqx_exhook_pb:provider_unloaded_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_provider_unloaded(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_client_connect(emqx_exhook_pb:client_connect_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_client_connect(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_client_connack(emqx_exhook_pb:client_connack_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_client_connack(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_client_connected(emqx_exhook_pb:client_connected_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_client_connected(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_client_disconnected(emqx_exhook_pb:client_disconnected_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_client_disconnected(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_client_authenticate(emqx_exhook_pb:client_authenticate_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_client_authenticate(#{clientinfo := #{username := Username}} = Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - %% some cases for testing - case Username of - <<"baduser">> -> - {ok, #{type => 'STOP_AND_RETURN', - value => {bool_result, false}}, Md}; - <<"gooduser">> -> - {ok, #{type => 'STOP_AND_RETURN', - value => {bool_result, true}}, Md}; - <<"normaluser">> -> - {ok, #{type => 'CONTINUE', - value => {bool_result, true}}, Md}; - _ -> - {ok, #{type => 'IGNORE'}, Md} - end. - --spec on_client_authorize(emqx_exhook_pb:client_authorize_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_client_authorize(#{clientinfo := #{username := Username}} = Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - %% some cases for testing - case Username of - <<"baduser">> -> - {ok, #{type => 'STOP_AND_RETURN', - value => {bool_result, false}}, Md}; - <<"gooduser">> -> - {ok, #{type => 'STOP_AND_RETURN', - value => {bool_result, true}}, Md}; - <<"normaluser">> -> - {ok, #{type => 'CONTINUE', - value => {bool_result, true}}, Md}; - _ -> - {ok, #{type => 'IGNORE'}, Md} - end. - --spec on_client_subscribe(emqx_exhook_pb:client_subscribe_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_client_subscribe(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_client_unsubscribe(emqx_exhook_pb:client_unsubscribe_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_client_unsubscribe(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_session_created(emqx_exhook_pb:session_created_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_session_created(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_session_subscribed(emqx_exhook_pb:session_subscribed_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_session_subscribed(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_session_unsubscribed(emqx_exhook_pb:session_unsubscribed_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_session_unsubscribed(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_session_resumed(emqx_exhook_pb:session_resumed_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_session_resumed(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_session_discarded(emqx_exhook_pb:session_discarded_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_session_discarded(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_session_takeovered(emqx_exhook_pb:session_takeovered_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_session_takeovered(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_session_terminated(emqx_exhook_pb:session_terminated_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_session_terminated(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_message_publish(emqx_exhook_pb:message_publish_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:valued_response(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - %% some cases for testing - case From of - <<"baduser">> -> - NMsg = Msg#{qos => 0, - topic => <<"">>, - payload => <<"">> - }, - {ok, #{type => 'STOP_AND_RETURN', - value => {message, NMsg}}, Md}; - <<"gooduser">> -> - NMsg = Msg#{topic => From, - payload => From}, - {ok, #{type => 'STOP_AND_RETURN', - value => {message, NMsg}}, Md}; - _ -> - {ok, #{type => 'IGNORE'}, Md} - end. - --spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_message_delivered(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_message_dropped(emqx_exhook_pb:message_dropped_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_message_dropped(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. - --spec on_message_acked(emqx_exhook_pb:message_acked_request(), grpc:metadata()) - -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} - | {error, grpc_cowboy_h:error_response()}. -on_message_acked(Req, Md) -> - ?MODULE:in({?FUNCTION_NAME, Req}), - %io:format("fun: ~p, req: ~0p~n", [?FUNCTION_NAME, Req]), - {ok, #{}, Md}. diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl deleted file mode 100644 index a57e0b49c..000000000 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ /dev/null @@ -1,524 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(prop_exhook_hooks). - --include_lib("proper/include/proper.hrl"). --include_lib("eunit/include/eunit.hrl"). - --import(emqx_ct_proper_types, - [ conninfo/0 - , clientinfo/0 - , sessioninfo/0 - , message/0 - , connack_return_code/0 - , topictab/0 - , topic/0 - , subopts/0 - ]). - --define(CONF_DEFAULT, <<" -exhook: { - servers: [ - { name: \"default\" - url: \"http://127.0.0.1:9000\" - } - ] -} -">>). - --define(ALL(Vars, Types, Exprs), - ?SETUP(fun() -> - State = do_setup(), - fun() -> do_teardown(State) end - end, ?FORALL(Vars, Types, Exprs))). - - -%%-------------------------------------------------------------------- -%% Properties -%%-------------------------------------------------------------------- - -prop_client_connect() -> - ?ALL({ConnInfo, ConnProps}, - {conninfo(), conn_properties()}, - begin - ok = emqx_hooks:run('client.connect', [ConnInfo, ConnProps]), - {'on_client_connect', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{props => properties(ConnProps), - conninfo => from_conninfo(ConnInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_client_connack() -> - ?ALL({ConnInfo, Rc, AckProps}, - {conninfo(), connack_return_code(), ack_properties()}, - begin - ok = emqx_hooks:run('client.connack', [ConnInfo, Rc, AckProps]), - {'on_client_connack', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{props => properties(AckProps), - result_code => atom_to_binary(Rc, utf8), - conninfo => from_conninfo(ConnInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_client_authenticate() -> - ?ALL({ClientInfo0, AuthResult}, - {clientinfo(), authresult()}, - begin - ClientInfo = inject_magic_into(username, ClientInfo0), - OutAuthResult = emqx_hooks:run_fold('client.authenticate', [ClientInfo], AuthResult), - ExpectedAuthResult = case maps:get(username, ClientInfo) of - <<"baduser">> -> {error, not_authorized}; - <<"gooduser">> -> ok; - <<"normaluser">> -> ok; - _ -> case AuthResult of - ok -> ok; - _ -> {error, not_authorized} - end - end, - ?assertEqual(ExpectedAuthResult, OutAuthResult), - - {'on_client_authenticate', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{result => authresult_to_bool(AuthResult), - clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_client_authorize() -> - ?ALL({ClientInfo0, PubSub, Topic, Result}, - {clientinfo(), oneof([publish, subscribe]), - topic(), oneof([allow, deny])}, - begin - ClientInfo = inject_magic_into(username, ClientInfo0), - OutResult = emqx_hooks:run_fold( - 'client.authorize', - [ClientInfo, PubSub, Topic], - Result), - ExpectedOutResult = case maps:get(username, ClientInfo) of - <<"baduser">> -> deny; - <<"gooduser">> -> allow; - <<"normaluser">> -> allow; - _ -> Result - end, - ?assertEqual(ExpectedOutResult, OutResult), - - {'on_client_authorize', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{result => aclresult_to_bool(Result), - type => pubsub_to_enum(PubSub), - topic => Topic, - clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_client_connected() -> - ?ALL({ClientInfo, ConnInfo}, - {clientinfo(), conninfo()}, - begin - ok = emqx_hooks:run('client.connected', [ClientInfo, ConnInfo]), - {'on_client_connected', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_client_disconnected() -> - ?ALL({ClientInfo, Reason, ConnInfo}, - {clientinfo(), shutdown_reason(), conninfo()}, - begin - ok = emqx_hooks:run('client.disconnected', [ClientInfo, Reason, ConnInfo]), - {'on_client_disconnected', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{reason => stringfy(Reason), - clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_client_subscribe() -> - ?ALL({ClientInfo, SubProps, TopicTab}, - {clientinfo(), sub_properties(), topictab()}, - begin - ok = emqx_hooks:run('client.subscribe', [ClientInfo, SubProps, TopicTab]), - {'on_client_subscribe', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{props => properties(SubProps), - topic_filters => topicfilters(TopicTab), - clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_client_unsubscribe() -> - ?ALL({ClientInfo, UnSubProps, TopicTab}, - {clientinfo(), unsub_properties(), topictab()}, - begin - ok = emqx_hooks:run('client.unsubscribe', [ClientInfo, UnSubProps, TopicTab]), - {'on_client_unsubscribe', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{props => properties(UnSubProps), - topic_filters => topicfilters(TopicTab), - clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_session_created() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, - begin - ok = emqx_hooks:run('session.created', [ClientInfo, SessInfo]), - {'on_session_created', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_session_subscribed() -> - ?ALL({ClientInfo, Topic, SubOpts}, - {clientinfo(), topic(), subopts()}, - begin - ok = emqx_hooks:run('session.subscribed', [ClientInfo, Topic, SubOpts]), - {'on_session_subscribed', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{topic => Topic, - subopts => subopts(SubOpts), - clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_session_unsubscribed() -> - ?ALL({ClientInfo, Topic, SubOpts}, - {clientinfo(), topic(), subopts()}, - begin - ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, Topic, SubOpts]), - {'on_session_unsubscribed', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{topic => Topic, - clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_session_resumed() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, - begin - ok = emqx_hooks:run('session.resumed', [ClientInfo, SessInfo]), - {'on_session_resumed', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_session_discared() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, - begin - ok = emqx_hooks:run('session.discarded', [ClientInfo, SessInfo]), - {'on_session_discarded', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_session_takeovered() -> - ?ALL({ClientInfo, SessInfo}, {clientinfo(), sessioninfo()}, - begin - ok = emqx_hooks:run('session.takeovered', [ClientInfo, SessInfo]), - {'on_session_takeovered', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_session_terminated() -> - ?ALL({ClientInfo, Reason, SessInfo}, - {clientinfo(), shutdown_reason(), sessioninfo()}, - begin - ok = emqx_hooks:run('session.terminated', [ClientInfo, Reason, SessInfo]), - {'on_session_terminated', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{reason => stringfy(Reason), - clientinfo => from_clientinfo(ClientInfo) - }, - ?assertEqual(Expected, Resp), - true - end). - -prop_message_publish() -> - ?ALL(Msg0, message(), - begin - Msg = emqx_message:from_map( - inject_magic_into(from, emqx_message:to_map(Msg0))), - OutMsg= emqx_hooks:run_fold('message.publish', [], Msg), - case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of - true -> - ?assertEqual(Msg, OutMsg), - skip; - _ -> - ExpectedOutMsg = case emqx_message:from(Msg) of - <<"baduser">> -> - MsgMap = emqx_message:to_map(Msg), - emqx_message:from_map( - MsgMap#{qos => 0, - topic => <<"">>, - payload => <<"">> - }); - <<"gooduser">> = From -> - MsgMap = emqx_message:to_map(Msg), - emqx_message:from_map( - MsgMap#{topic => From, - payload => From - }); - _ -> Msg - end, - ?assertEqual(ExpectedOutMsg, OutMsg), - - {'on_message_publish', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{message => from_message(Msg) - }, - ?assertEqual(Expected, Resp) - end, - true - end). - -prop_message_dropped() -> - ?ALL({Msg, By, Reason}, {message(), hardcoded, shutdown_reason()}, - begin - ok = emqx_hooks:run('message.dropped', [Msg, By, Reason]), - case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of - true -> skip; - _ -> - {'on_message_dropped', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{reason => stringfy(Reason), - message => from_message(Msg) - }, - ?assertEqual(Expected, Resp) - end, - true - end). - -prop_message_delivered() -> - ?ALL({ClientInfo, Msg}, {clientinfo(), message()}, - begin - ok = emqx_hooks:run('message.delivered', [ClientInfo, Msg]), - case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of - true -> skip; - _ -> - {'on_message_delivered', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{clientinfo => from_clientinfo(ClientInfo), - message => from_message(Msg) - }, - ?assertEqual(Expected, Resp) - end, - true - end). - -prop_message_acked() -> - ?ALL({ClientInfo, Msg}, {clientinfo(), message()}, - begin - ok = emqx_hooks:run('message.acked', [ClientInfo, Msg]), - case emqx_topic:match(emqx_message:topic(Msg), <<"$SYS/#">>) of - true -> skip; - _ -> - {'on_message_acked', Resp} = emqx_exhook_demo_svr:take(), - Expected = - #{clientinfo => from_clientinfo(ClientInfo), - message => from_message(Msg) - }, - ?assertEqual(Expected, Resp) - end, - true - end). - -nodestr() -> - stringfy(node()). - -peerhost(#{peername := {Host, _}}) -> - ntoa(Host). - -sockport(#{sockname := {_, Port}}) -> - Port. - -%% copied from emqx_exhook - -ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> - list_to_binary(inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256})); -ntoa(IP) -> - list_to_binary(inet_parse:ntoa(IP)). - -maybe(undefined) -> <<>>; -maybe(B) -> B. - -properties(undefined) -> []; -properties(M) when is_map(M) -> - maps:fold(fun(K, V, Acc) -> - [#{name => stringfy(K), - value => stringfy(V)} | Acc] - end, [], M). - -topicfilters(Tfs) when is_list(Tfs) -> - [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. - -%% @private -stringfy(Term) when is_binary(Term) -> - Term; -stringfy(Term) when is_integer(Term) -> - integer_to_binary(Term); -stringfy(Term) when is_atom(Term) -> - atom_to_binary(Term, utf8); -stringfy(Term) -> - unicode:characters_to_binary((io_lib:format("~0p", [Term]))). - -subopts(SubOpts) -> - #{qos => maps:get(qos, SubOpts, 0), - rh => maps:get(rh, SubOpts, 0), - rap => maps:get(rap, SubOpts, 0), - nl => maps:get(nl, SubOpts, 0), - share => maps:get(share, SubOpts, <<>>) - }. - -authresult_to_bool(AuthResult) -> - AuthResult == ok. - -aclresult_to_bool(Result) -> - Result == allow. - -pubsub_to_enum(publish) -> 'PUBLISH'; -pubsub_to_enum(subscribe) -> 'SUBSCRIBE'. - -from_conninfo(ConnInfo) -> - #{node => nodestr(), - clientid => maps:get(clientid, ConnInfo), - username => maybe(maps:get(username, ConnInfo, <<>>)), - peerhost => peerhost(ConnInfo), - sockport => sockport(ConnInfo), - proto_name => maps:get(proto_name, ConnInfo), - proto_ver => stringfy(maps:get(proto_ver, ConnInfo)), - keepalive => maps:get(keepalive, ConnInfo) - }. - -from_clientinfo(ClientInfo) -> - #{node => nodestr(), - clientid => maps:get(clientid, ClientInfo), - username => maybe(maps:get(username, ClientInfo, <<>>)), - password => maybe(maps:get(password, ClientInfo, <<>>)), - peerhost => ntoa(maps:get(peerhost, ClientInfo)), - sockport => maps:get(sockport, ClientInfo), - protocol => stringfy(maps:get(protocol, ClientInfo)), - mountpoint => maybe(maps:get(mountpoint, ClientInfo, <<>>)), - is_superuser => maps:get(is_superuser, ClientInfo, false), - anonymous => maps:get(anonymous, ClientInfo, true), - cn => maybe(maps:get(cn, ClientInfo, <<>>)), - dn => maybe(maps:get(dn, ClientInfo, <<>>)) - }. - -from_message(Msg) -> - #{node => nodestr(), - id => emqx_guid:to_hexstr(emqx_message:id(Msg)), - qos => emqx_message:qos(Msg), - from => stringfy(emqx_message:from(Msg)), - topic => emqx_message:topic(Msg), - payload => emqx_message:payload(Msg), - timestamp => emqx_message:timestamp(Msg) - }. - -%%-------------------------------------------------------------------- -%% Helper -%%-------------------------------------------------------------------- - -do_setup() -> - logger:set_primary_config(#{level => warning}), - _ = emqx_exhook_demo_svr:start(), - ok = emqx_config:init_load(emqx_exhook_schema, ?CONF_DEFAULT), - emqx_ct_helpers:start_apps([emqx_exhook]), - %% waiting first loaded event - {'on_provider_loaded', _} = emqx_exhook_demo_svr:take(), - ok. - -do_teardown(_) -> - emqx_ct_helpers:stop_apps([emqx_exhook]), - %% waiting last unloaded event - {'on_provider_unloaded', _} = emqx_exhook_demo_svr:take(), - _ = emqx_exhook_demo_svr:stop(), - logger:set_primary_config(#{level => notice}), - timer:sleep(2000), - ok. - -%%-------------------------------------------------------------------- -%% Generators -%%-------------------------------------------------------------------- - -conn_properties() -> - #{}. - -ack_properties() -> - #{}. - -sub_properties() -> - #{}. - -unsub_properties() -> - #{}. - -shutdown_reason() -> - oneof([utf8(), {shutdown, emqx_ct_proper_types:limited_atom()}]). - -authresult() -> - ?LET(RC, connack_return_code(), - case RC of - success -> ok; - _ -> {error, RC} - end). - -inject_magic_into(Key, Object) -> - case castspell() of - muggles -> Object; - Spell -> - Object#{Key => Spell} - end. - -castspell() -> - L = [<<"baduser">>, <<"gooduser">>, <<"normaluser">>, muggles], - lists:nth(rand:uniform(length(L)), L). diff --git a/apps/emqx_exproto/.formatter.exs b/apps/emqx_exproto/.formatter.exs deleted file mode 100644 index d2cda26ed..000000000 --- a/apps/emqx_exproto/.formatter.exs +++ /dev/null @@ -1,4 +0,0 @@ -# Used by "mix format" -[ - inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] -] diff --git a/apps/emqx_gateway/src/coap/include/emqx_coap.hrl b/apps/emqx_gateway/include/emqx_coap.hrl similarity index 100% rename from apps/emqx_gateway/src/coap/include/emqx_coap.hrl rename to apps/emqx_gateway/include/emqx_coap.hrl diff --git a/apps/emqx_gateway/src/exproto/include/emqx_exproto.hrl b/apps/emqx_gateway/include/emqx_exproto.hrl similarity index 100% rename from apps/emqx_gateway/src/exproto/include/emqx_exproto.hrl rename to apps/emqx_gateway/include/emqx_exproto.hrl diff --git a/apps/emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl b/apps/emqx_gateway/include/emqx_lwm2m.hrl similarity index 100% rename from apps/emqx_gateway/src/lwm2m/include/emqx_lwm2m.hrl rename to apps/emqx_gateway/include/emqx_lwm2m.hrl diff --git a/apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl b/apps/emqx_gateway/include/emqx_sn.hrl similarity index 100% rename from apps/emqx_gateway/src/mqttsn/include/emqx_sn.hrl rename to apps/emqx_gateway/include/emqx_sn.hrl diff --git a/apps/emqx_gateway/src/stomp/include/emqx_stomp.hrl b/apps/emqx_gateway/include/emqx_stomp.hrl similarity index 100% rename from apps/emqx_gateway/src/stomp/include/emqx_stomp.hrl rename to apps/emqx_gateway/include/emqx_stomp.hrl diff --git a/apps/emqx_gateway/mix.exs b/apps/emqx_gateway/mix.exs index 846164f38..b6e042b42 100644 --- a/apps/emqx_gateway/mix.exs +++ b/apps/emqx_gateway/mix.exs @@ -1,4 +1,4 @@ -defmodule EMQXStomp.MixProject do +defmodule EMQXGateway.MixProject do use Mix.Project def project do diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index c208d00f8..16afcf303 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -19,7 +19,7 @@ -behavior(emqx_gateway_channel). -include_lib("emqx/include/logger.hrl"). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). +-include("emqx_coap.hrl"). %% API -export([]). diff --git a/apps/emqx_gateway/src/coap/emqx_coap_frame.erl b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl index 9a53f3e01..a5327f239 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_frame.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl @@ -30,8 +30,8 @@ %% API -export([]). --include("include/emqx_coap.hrl"). --include("apps/emqx/include/types.hrl"). +-include_lib("emqx/include/types.hrl"). +-include("emqx_coap.hrl"). -define(VERSION, 1). diff --git a/apps/emqx_gateway/src/coap/emqx_coap_message.erl b/apps/emqx_gateway/src/coap/emqx_coap_message.erl index 52a03c418..64c019aec 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_message.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_message.erl @@ -27,7 +27,7 @@ -export([request/2, request/3, request/4, ack/1, response/1, response/2, response/3]). -export([set/3, set_payload/2, get_content/1, set_content/2, set_content/3, get_option/2]). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). +-include("emqx_coap.hrl"). request(Type, Method) -> request(Type, Method, <<>>, []). diff --git a/apps/emqx_gateway/src/coap/emqx_coap_resource.erl b/apps/emqx_gateway/src/coap/emqx_coap_resource.erl index 93fe82aba..8383c23b0 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_resource.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_resource.erl @@ -16,7 +16,7 @@ -module(emqx_coap_resource). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). +-include("emqx_coap.hrl"). -type context() :: any(). -type topic() :: binary(). diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl index 8b9eed14c..7543d787f 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_session.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl @@ -18,7 +18,7 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). +-include("emqx_coap.hrl"). %% API -export([new/0, transfer_result/3]). diff --git a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl index 8830d7447..0e13eed02 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl @@ -26,7 +26,7 @@ -export_type([manager/0, event_result/1]). -include_lib("emqx/include/logger.hrl"). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). +-include("emqx_coap.hrl"). -type direction() :: in | out. -type state_machine_id() :: {direction(), non_neg_integer()}. diff --git a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl index b4c8ae333..6c14caaf1 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl @@ -1,7 +1,7 @@ -module(emqx_coap_transport). -include_lib("emqx/include/logger.hrl"). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). +-include("emqx_coap.hrl"). -define(ACK_TIMEOUT, 2000). -define(ACK_RANDOM_FACTOR, 1000). diff --git a/apps/emqx_gateway/src/coap/resources/emqx_coap_mqtt_resource.erl b/apps/emqx_gateway/src/coap/resources/emqx_coap_mqtt_resource.erl index 1fd3d7b8e..0dc8ded84 100644 --- a/apps/emqx_gateway/src/coap/resources/emqx_coap_mqtt_resource.erl +++ b/apps/emqx_gateway/src/coap/resources/emqx_coap_mqtt_resource.erl @@ -20,7 +20,7 @@ -behaviour(emqx_coap_resource). -include_lib("emqx/include/emqx_mqtt.hrl"). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). +-include("emqx_coap.hrl"). -export([ init/1 diff --git a/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_resource.erl b/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_resource.erl index c750f66dd..63879cfbd 100644 --- a/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_resource.erl +++ b/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_resource.erl @@ -20,7 +20,7 @@ -behaviour(emqx_coap_resource). -include_lib("emqx/include/logger.hrl"). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). +-include("emqx_coap.hrl"). -export([ init/1 diff --git a/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_topics.erl b/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_topics.erl index 328d1df04..65fa54451 100644 --- a/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_topics.erl +++ b/apps/emqx_gateway/src/coap/resources/emqx_coap_pubsub_topics.erl @@ -19,7 +19,7 @@ -behaviour(gen_server). -include_lib("emqx/include/logger.hrl"). --include_lib("emqx_gateway/src/coap/include/emqx_coap.hrl"). +-include("emqx_coap.hrl"). -export([ start_link/0 diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index b1a1ae027..f6b33e0b2 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -15,7 +15,7 @@ %%-------------------------------------------------------------------- -module(emqx_exproto_channel). --include("src/exproto/include/emqx_exproto.hrl"). +-include("emqx_exproto.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/types.hrl"). diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl index 346f87452..83c148968 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl @@ -19,7 +19,7 @@ % -behavior(emqx_exproto_v_1_connection_adapter_bhvr). --include("src/exproto/include/emqx_exproto.hrl"). +-include("emqx_exproto.hrl"). -include_lib("emqx/include/logger.hrl"). diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd_handler.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd_handler.erl index 318328e3c..b3251a275 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd_handler.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_cmd_handler.erl @@ -16,7 +16,7 @@ -module(emqx_lwm2m_cmd_handler). --include("src/lwm2m/include/emqx_lwm2m.hrl"). +-include("emqx_lwm2m.hrl"). -include_lib("lwm2m_coap/include/coap.hrl"). diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_coap_resource.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_coap_resource.erl index 588dd523e..0d1ea0568 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_coap_resource.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_coap_resource.erl @@ -41,7 +41,7 @@ -export([parse_object_list/1]). --include("src/lwm2m/include/emqx_lwm2m.hrl"). +-include("emqx_lwm2m.hrl"). -define(PREFIX, <<"rd">>). diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_json.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_json.erl index 295c68085..641cf7d97 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_json.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_json.erl @@ -22,7 +22,7 @@ , opaque_to_json/2 ]). --include("src/lwm2m/include/emqx_lwm2m.hrl"). +-include("emqx_lwm2m.hrl"). -define(LOG(Level, Format, Args), logger:Level("LWM2M-JSON: " ++ Format, Args)). diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_message.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_message.erl index 6d155f9bd..6b8bc8d50 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_message.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_message.erl @@ -23,7 +23,7 @@ , translate_json/1 ]). --include("src/lwm2m/include/emqx_lwm2m.hrl"). +-include("emqx_lwm2m.hrl"). -define(LOG(Level, Format, Args), logger:Level("LWM2M-JSON: " ++ Format, Args)). diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl index f14d6cb73..290ed86eb 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_protocol.erl @@ -16,7 +16,7 @@ -module(emqx_lwm2m_protocol). --include("src/lwm2m/include/emqx_lwm2m.hrl"). +-include("emqx_lwm2m.hrl"). -include_lib("emqx/include/emqx.hrl"). @@ -300,7 +300,7 @@ auto_observe_object_list(Expected, Registered) -> send_auto_observe(CoapPid, RegInfo, EndpointName) -> %% - auto observe the objects - Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}), + Envs = proplists:get_value(config, lwm2m_coap_responder:options(), #{}), case maps:get(auto_observe, Envs, false) of false -> ?LOG(info, "Auto Observe Disabled", []); @@ -557,4 +557,3 @@ stats(_State) -> ], ProcStats = emqx_misc:proc_stats(), lists:append([SockStats, ConnStats, ChanStats, ProcStats]). - diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_timer.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_timer.erl index b86000292..75ab2d42a 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_timer.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_timer.erl @@ -16,7 +16,7 @@ -module(emqx_lwm2m_timer). --include("src/lwm2m/include/emqx_lwm2m.hrl"). +-include("emqx_lwm2m.hrl"). -export([ cancel_timer/1 , start_timer/2 diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_tlv.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_tlv.erl index dd1ecddda..76fac1342 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_tlv.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_tlv.erl @@ -25,7 +25,7 @@ -export([binary_to_hex_string/1]). -endif. --include("src/lwm2m/include/emqx_lwm2m.hrl"). +-include("emqx_lwm2m.hrl"). -define(LOG(Level, Format, Args), logger:Level("LWM2M-TLV: " ++ Format, Args)). @@ -162,4 +162,3 @@ encode_value(Value) -> binary_to_hex_string(Data) -> lists:flatten([io_lib:format("~2.16.0B ",[X]) || <> <= Data ]). -endif. - diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object.erl index 96a80735f..dd9911407 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object.erl @@ -16,7 +16,7 @@ -module(emqx_lwm2m_xml_object). --include("src/lwm2m/include/emqx_lwm2m.hrl"). +-include("emqx_lwm2m.hrl"). -include_lib("xmerl/include/xmerl.hrl"). -export([ get_obj_def/2 diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl index ea5f878d3..6412d68c8 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_xml_object_db.erl @@ -16,7 +16,7 @@ -module(emqx_lwm2m_xml_object_db). --include("src/lwm2m/include/emqx_lwm2m.hrl"). +-include("emqx_lwm2m.hrl"). -include_lib("xmerl/include/xmerl.hrl"). % This module is for future use. Disabled now. diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl index f1fb4eeb1..f858c2245 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_broadcast.erl @@ -18,7 +18,7 @@ -behaviour(gen_server). --include("src/mqttsn/include/emqx_sn.hrl"). +-include("emqx_sn.hrl"). -include_lib("emqx/include/logger.hrl"). -export([ start_link/2 @@ -97,4 +97,3 @@ send_advertise(#state{gwid = GwId, sock = Sock, port = Port, boradcast_addrs() -> lists:usort([Addr || {ok, IfList} <- [inet:getiflist()], If <- IfList, {ok, [{broadaddr, Addr}]} <- [inet:ifget(If, [broadaddr])]]). - diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 5bba599c8..32ec918a9 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -18,7 +18,7 @@ -behavior(emqx_gateway_channel). --include("src/mqttsn/include/emqx_sn.hrl"). +-include("emqx_sn.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -970,7 +970,7 @@ handle_out(connack, ?SN_RC_ACCEPTED, Channel = #channel{ctx = Ctx, conninfo = ConnInfo}) -> _ = run_hooks(Ctx, 'client.connack', [ConnInfo, returncode_name(?SN_RC_ACCEPTED)], - #{} + #{} ), return_connack(?SN_CONNACK_MSG(?SN_RC_ACCEPTED), ensure_keepalive(Channel)); @@ -1227,7 +1227,7 @@ handle_deliver(Delivers, Channel = #channel{ ctx = Ctx, conn_state = ConnState, session = Session, - clientinfo = #{clientid := ClientId}}) + clientinfo = #{clientid := ClientId}}) when ConnState =:= disconnected; ConnState =:= asleep -> NSession = emqx_session:enqueue( diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl index 32d1a21a2..1b2025d1b 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl @@ -20,7 +20,7 @@ -behavior(emqx_gateway_frame). --include("src/mqttsn/include/emqx_sn.hrl"). +-include("emqx_sn.hrl"). -export([ initial_parse_state/1 , serialize_opts/0 diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 1249831cc..991acaa7b 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -21,7 +21,7 @@ -behaviour(gen_server). --include("src/mqttsn/include/emqx_sn.hrl"). +-include("emqx_sn.hrl"). -define(LOG(Level, Format, Args), emqx_logger:Level("MQTT-SN(registry): " ++ Format, Args)). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index fa7b2a357..c7426a40d 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -18,7 +18,7 @@ -behavior(emqx_gateway_channel). --include("src/stomp/include/emqx_stomp.hrl"). +-include("emqx_stomp.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl index 2b511b57a..77f426dbb 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_frame.erl @@ -70,7 +70,7 @@ -behavior(emqx_gateway_frame). --include("src/stomp/include/emqx_stomp.hrl"). +-include("emqx_stomp.hrl"). -export([ initial_parse_state/1 , parse/2 diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_heartbeat.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_heartbeat.erl index 99a1508e1..72217552b 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_heartbeat.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_heartbeat.erl @@ -17,7 +17,7 @@ %% @doc Stomp heartbeat. -module(emqx_stomp_heartbeat). --include("src/stomp/include/emqx_stomp.hrl"). +-include("emqx_stomp.hrl"). -export([ init/1 , check/3 diff --git a/apps/emqx_exproto/mix.exs b/apps/emqx_machine/mix.exs similarity index 62% rename from apps/emqx_exproto/mix.exs rename to apps/emqx_machine/mix.exs index 610e989ab..14913be6f 100644 --- a/apps/emqx_exproto/mix.exs +++ b/apps/emqx_machine/mix.exs @@ -1,10 +1,10 @@ -defmodule EMQXExproto.MixProject do +defmodule EMQXMachine.MixProject do use Mix.Project def project do [ - app: :emqx_exproto, - version: "4.4.0", + app: :emqx_machine, + version: "0.1.0", build_path: "../../_build", config_path: "../../config/config.exs", deps_path: "../../deps", @@ -12,20 +12,19 @@ defmodule EMQXExproto.MixProject do elixir: "~> 1.12", start_permanent: Mix.env() == :prod, deps: deps(), - description: "EMQ X Extension for Protocol" + description: "The EMQ X Machine" ] end def application do [ - mod: {:emqx_exproto_app, []}, + registered: [], + mod: {:emqx_machine_app, []}, extra_applications: [:logger] ] end defp deps do - [ - {:grpc, github: "emqx/grpc-erl", tag: "0.6.2"} - ] + [] end end diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index fcf8d3239..7fb51801a 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -138,7 +138,7 @@ start_one_app(App) -> %% 1. due to static static config change %% 2. after join a cluster reboot_apps() -> - [gproc, esockd, ranch, cowboy, ekka, emqx | ?EMQX_DEP_APPS]. + [gproc, esockd, ranch, cowboy, ekka, emqx]. %% | ?EMQX_DEP_APPS]. sorted_reboot_apps() -> Apps = [{App, app_deps(App)} || App <- reboot_apps()], diff --git a/apps/emqx_management/mix.exs b/apps/emqx_management/mix.exs index 6420d4b24..a26d482ce 100644 --- a/apps/emqx_management/mix.exs +++ b/apps/emqx_management/mix.exs @@ -29,7 +29,7 @@ defmodule EMQXManagement.MixProject do {:emqx_rule_engine, in_umbrella: true}, {:ekka, github: "emqx/ekka", tag: "0.10.2"}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.2.1"}, - {:minirest, github: "emqx/minirest", tag: "0.3.5"} + {:minirest, github: "emqx/minirest", tag: "1.1.7"} ] end end diff --git a/apps/emqx_psk_file/.formatter.exs b/apps/emqx_psk_file/.formatter.exs deleted file mode 100644 index d2cda26ed..000000000 --- a/apps/emqx_psk_file/.formatter.exs +++ /dev/null @@ -1,4 +0,0 @@ -# Used by "mix format" -[ - inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] -] diff --git a/apps/emqx_psk_file/mix.exs b/apps/emqx_psk_file/mix.exs deleted file mode 100644 index b7e6e4e7a..000000000 --- a/apps/emqx_psk_file/mix.exs +++ /dev/null @@ -1,32 +0,0 @@ -defmodule EMQXPskFile.MixProject do - use Mix.Project - - def project do - [ - app: :emqx_psk_file, - version: "4.3.0", - build_path: "../../_build", - config_path: "../../config/config.exs", - deps_path: "../../deps", - lockfile: "../../mix.lock", - elixir: "~> 1.12", - start_permanent: Mix.env() == :prod, - deps: deps(), - description: "EMQX PSK Plugin from File" - ] - end - - def application do - [ - registered: [:emqx_psk_file_sup], - mod: {:emqx_psk_file_app, []}, - extra_applications: [:logger] - ] - end - - defp deps do - [ - {:emqx, in_umbrella: true, runtime: false} - ] - end -end diff --git a/apps/emqx_release_helper/lib/emqx_release_helper/applications.ex b/apps/emqx_release_helper/lib/emqx_release_helper/applications.ex index c93fb2c65..ee39271ed 100644 --- a/apps/emqx_release_helper/lib/emqx_release_helper/applications.ex +++ b/apps/emqx_release_helper/lib/emqx_release_helper/applications.ex @@ -7,7 +7,6 @@ defmodule EmqxReleaseHelper.Applications do overlay %{release_type: release_type} do copy "etc/certs", "etc/certs" - template "etc/acl.conf", "etc/acl.conf" template "etc/emqx.conf", "etc/emqx.conf" template "etc/ssl_dist.conf", "etc/ssl_dist.conf" template "etc/emqx_#{release_type}/vm.args", "etc/vm.args" @@ -30,22 +29,17 @@ defmodule EmqxReleaseHelper.Applications do start_type :permanent end - application :emqx_authz do - start_type :permanent - overlay :application - end - application :emqx_data_bridge do start_type :permanent overlay :application end - application :emqx_sn do + application :emqx_authn do start_type :permanent overlay :application end - application :emqx_authentication do + application :emqx_authz do start_type :permanent overlay :application end @@ -65,11 +59,6 @@ defmodule EmqxReleaseHelper.Applications do overlay :application end - application :emqx_stomp do - start_type :permanent - overlay :application - end - application :emqx_bridge_mqtt do start_type :permanent overlay :application @@ -80,22 +69,12 @@ defmodule EmqxReleaseHelper.Applications do overlay :application end - application :emqx_telemetry do - start_type :permanent - overlay :application - end - - application :emqx_coap do - start_type :permanent - overlay :application - end - application :emqx_rule_engine do start_type :permanent overlay :application end - application :emqx_web_hook do + application :emqx_rule_actions do start_type :permanent overlay :application end @@ -104,39 +83,11 @@ defmodule EmqxReleaseHelper.Applications do start_type :load end - application :emqx_exhook, %{release_type: :cloud} do - start_type :permanent - overlay :application - end - - application :emqx_exproto, %{release_type: :cloud} do - start_type :permanent - overlay :application - end - application :emqx_prometheus, %{release_type: :cloud} do start_type :permanent overlay :application end - application :emqx_lwm2m, %{release_type: :cloud} do - start_type :permanent - overlay :application - - overlay do - copy "lwm2m_xml", "etc/lwm2m_xml" - end - end - - application :emqx_psk_file, %{release_type: :cloud} do - start_type :permanent - overlay :application - - overlay do - copy "etc/psk.txt", "etc/psk.txt" - end - end - application :bcrypt, %{enable_bcrypt: true, release_type: :cloud} do start_type :permanent end diff --git a/apps/emqx_release_helper/lib/emqx_release_helper/overlay.ex b/apps/emqx_release_helper/lib/emqx_release_helper/overlay.ex index da8034580..19a240805 100644 --- a/apps/emqx_release_helper/lib/emqx_release_helper/overlay.ex +++ b/apps/emqx_release_helper/lib/emqx_release_helper/overlay.ex @@ -26,10 +26,6 @@ defmodule EmqxReleaseHelper.Overlay do copy "bin/install_upgrade.escript", "bin/install_upgrade.escript-#{release_version}" - template "data/loaded_plugins.tmpl", "data/loaded_plugins" - - template "data/loaded_modules.tmpl", "data/loaded_modules" - template "data/emqx_vars", "releases/emqx_vars" template "data/BUILT_ON", "releases/#{release_version}/BUILT_ON" # template "bin/emqx.cmd", "bin/emqx.cmd" diff --git a/apps/emqx_telemetry/mix.exs b/apps/emqx_rule_actions/mix.exs similarity index 66% rename from apps/emqx_telemetry/mix.exs rename to apps/emqx_rule_actions/mix.exs index 4316d4b92..40b658f50 100644 --- a/apps/emqx_telemetry/mix.exs +++ b/apps/emqx_rule_actions/mix.exs @@ -1,9 +1,9 @@ -defmodule EMQXTelemetry.MixProject do +defmodule EmqxRuleActions.MixProject do use Mix.Project def project do [ - app: :emqx_telemetry, + app: :emqx_rule_actions, version: "5.0.0", build_path: "../../_build", config_path: "../../config/config.exs", @@ -12,19 +12,20 @@ defmodule EMQXTelemetry.MixProject do elixir: "~> 1.12", start_permanent: Mix.env() == :prod, deps: deps(), - description: "EMQ X Telemetry" + description: "Rule Actions" ] end def application do [ - registered: [:emqx_telemetry_sup], - mod: {:emqx_telemetry_app, []}, extra_applications: [:logger] ] end defp deps do - [] + [ + {:emqx, in_umbrella: true, runtime: false}, + {:emqx_rule_engine, in_umbrella: true} + ] end end diff --git a/apps/emqx_telemetry/.formatter.exs b/apps/emqx_telemetry/.formatter.exs deleted file mode 100644 index d2cda26ed..000000000 --- a/apps/emqx_telemetry/.formatter.exs +++ /dev/null @@ -1,4 +0,0 @@ -# Used by "mix format" -[ - inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] -] diff --git a/apps/emqx_web_hook/.formatter.exs b/apps/emqx_web_hook/.formatter.exs deleted file mode 100644 index d2cda26ed..000000000 --- a/apps/emqx_web_hook/.formatter.exs +++ /dev/null @@ -1,4 +0,0 @@ -# Used by "mix format" -[ - inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] -] diff --git a/apps/emqx_web_hook/mix.exs b/apps/emqx_web_hook/mix.exs deleted file mode 100644 index 497570c9b..000000000 --- a/apps/emqx_web_hook/mix.exs +++ /dev/null @@ -1,34 +0,0 @@ -defmodule EMQXWebHook.MixProject do - use Mix.Project - - def project do - [ - app: :emqx_web_hook, - version: "4.3.1", - build_path: "../../_build", - config_path: "../../config/config.exs", - deps_path: "../../deps", - lockfile: "../../mix.lock", - elixir: "~> 1.12", - start_permanent: Mix.env() == :prod, - deps: deps(), - description: "EMQ X WebHook Plugin" - ] - end - - def application do - [ - registered: [:emqx_web_hook_sup], - mod: {:emqx_web_hook_app, []}, - extra_applications: [:logger] - ] - end - - defp deps do - [ - {:emqx_rule_engine, in_umbrella: true}, - {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.2.1"}, - {:ehttpc, github: "emqx/ehttpc", tag: "0.1.6"} - ] - end -end diff --git a/deploy/packages/deb/debian/init.script b/deploy/packages/deb/debian/init.script index 0fcafd1d2..ef4e16b7a 100755 --- a/deploy/packages/deb/debian/init.script +++ b/deploy/packages/deb/debian/init.script @@ -58,7 +58,7 @@ do_stop() # waiting stop done sleep 5 sleep 5 - + # Return # 0 if daemon has been stopped # 1 if daemon was already stopped @@ -146,5 +146,3 @@ case "$1" in exit 3 ;; esac - - diff --git a/mix.exs b/mix.exs index ffffdbcdb..ecd45fbf5 100644 --- a/mix.exs +++ b/mix.exs @@ -15,6 +15,7 @@ defmodule EMQXUmbrella.MixProject do defp deps do [ {:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true}, + {:jsx, "~> 3.1", override: true}, {:gun, github: "emqx/gun", tag: "1.3.4", override: true}, {:hocon, github: "emqx/hocon", override: true}, {:cuttlefish, @@ -25,13 +26,14 @@ defmodule EMQXUmbrella.MixProject do {:getopt, github: "emqx/getopt", tag: "v1.0.2", override: true}, {:cowboy, github: "emqx/cowboy", tag: "2.8.2", override: true}, {:cowlib, "~> 2.8", override: true}, + {:ranch, "~> 2.0", override: true}, {:poolboy, github: "emqx/poolboy", tag: "1.5.2", override: true}, {:esockd, github: "emqx/esockd", tag: "5.8.0", override: true}, {:gproc, "~> 0.9", override: true}, {:eetcd, "~> 0.3", override: true}, {:grpc, github: "emqx/grpc-erl", tag: "0.6.2", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, - {:typerefl, github: "k32/typerefl", tag: "0.6.2", manager: :rebar3, override: true}, + {:typerefl, github: "k32/typerefl", tag: "0.8.3", manager: :rebar3, override: true}, {:gen_rpc, github: "emqx/gen_rpc", tag: "2.5.1", override: true}, {:gen_coap, github: "emqx/gen_coap", tag: "v0.3.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "0.14.0", override: true}, diff --git a/mix.lock b/mix.lock index 0a09d5466..4ffb0395f 100644 --- a/mix.lock +++ b/mix.lock @@ -2,6 +2,7 @@ "bbmustache": {:hex, :bbmustache, "1.12.1", "857fbdf86bda46d07201b0e7a969820cb763a7c174c485fd0780d7e033efe9f0", [:rebar3], [], "hexpm", "f4320778c31a821a2a664db8894618abb79c1af7bbf7c03c703c8868d9bb09fe"}, "bcrypt": {:git, "https://github.com/emqx/erlang-bcrypt.git", "dc2ba66acf2332c111362d01137746eefecc5e90", [tag: "0.6.0"]}, "cowboy": {:git, "https://github.com/emqx/cowboy.git", "b89d4689a04149b1a4a3641280aa5c5643f921b2", [tag: "2.8.2"]}, + "cowboy_swagger": {:git, "https://github.com/inaka/cowboy_swagger", "f779ffe1365f474f0b569c9cd87ded2ac147a67e", [tag: "2.3.0"]}, "cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"}, "cuttlefish": {:git, "https://github.com/emqx/cuttlefish.git", "1180224fb60d87ef41307c949453248d4ebef761", []}, "ecpool": {:git, "https://github.com/emqx/ecpool.git", "0516d2cebd14654ef8c583c347e4a0b01363b86d", [tag: "0.5.1"]}, @@ -10,6 +11,8 @@ "ekka": {:git, "https://github.com/emqx/ekka.git", "cdde7fa2db89764f5c6be69d8bc6f79f4cfa5c82", [tag: "0.10.2"]}, "emqtt": {:git, "https://github.com/emqx/emqtt.git", "9e867b1fcaadbfcce45ea75d3721f982907ae417", [tag: "v1.2.3"]}, "emqx_http_lib": {:git, "https://github.com/emqx/emqx_http_lib.git", "b8c34801ba5835d96f88d5dd9d8dbdd2c70ffa58", [tag: "0.2.1"]}, + "epgsql": {:git, "https://github.com/epgsql/epgsql.git", "895c8f9d53f08d09ec6a0301c56d3d6f270929f2", [tag: "4.4.0"]}, + "esasl": {:git, "https://github.com/emqx/esasl.git", "1d4ab8d3ff7fd52018d3dddfec499933f9bb62b6", [tag: "0.1.0"]}, "esockd": {:git, "https://github.com/emqx/esockd.git", "9b959fc11a1c398a589892f335235be6c5b4a454", [tag: "5.8.0"]}, "estatsd": {:git, "https://github.com/emqx/estatsd.git", "5184d846b7ecb83509bd4d32695c60428c0198cd", [tag: "0.1.0"]}, "gen_coap": {:git, "https://github.com/emqx/gen_coap.git", "9bf5e7f795badf68e2fb4eb226f576308f5b1bb4", [tag: "v0.3.2"]}, @@ -19,21 +22,22 @@ "gproc": {:hex, :gproc, "0.9.0", "853ccb7805e9ada25d227a157ba966f7b34508f386a3e7e21992b1b484230699", [:rebar3], [], "hexpm", "587e8af698ccd3504cf4ba8d90f893ede2b0f58cabb8a916e2bf9321de3cf10b"}, "grpc": {:git, "https://github.com/emqx/grpc-erl.git", "f8ba39eb075fb2a7f370563045d5e5d0914f2703", [tag: "0.6.2"]}, "gun": {:git, "https://github.com/emqx/gun.git", "e1b5e14139e2a936ad6561bf960f70f1e80b81e2", [tag: "1.3.4"]}, - "hocon": {:git, "https://github.com/emqx/hocon.git", "6eee3287699af968a0fc9966b9c1f114dbeb101d", []}, + "hocon": {:git, "https://github.com/emqx/hocon.git", "b86b108e84b4576fb41e3b1c0175da0974fc52a9", []}, "jiffy": {:git, "https://github.com/emqx/jiffy.git", "baa1f4e750ae3c5c9e54f9c2e52280b7fc24a8d9", [tag: "1.0.5"]}, "jose": {:hex, :jose, "1.11.2", "f4c018ccf4fdce22c71e44d471f15f723cb3efab5d909ab2ba202b5bf35557b3", [:mix, :rebar3], [], "hexpm", "98143fbc48d55f3a18daba82d34fe48959d44538e9697c08f34200fa5f0947d2"}, "jsx": {:hex, :jsx, "3.1.0", "d12516baa0bb23a59bb35dccaf02a1bd08243fcbb9efe24f2d9d056ccff71268", [:rebar3], [], "hexpm", "0c5cc8fdc11b53cc25cf65ac6705ad39e54ecc56d1c22e4adb8f5a53fb9427f3"}, "luerl": {:git, "https://github.com/rvirding/luerl.git", "fc668ef337b04ca2aee83c648cf6853dab8babe6", []}, "lwm2m_coap": {:git, "https://github.com/emqx/lwm2m-coap.git", "495f3c62fae153040c89f9a0ba5344789ff5acc8", [tag: "v2.0.0"]}, - "minirest": {:git, "https://github.com/emqx/minirest.git", "d7a4a8080cae4a9299f221ce16cfb90bd7d80697", [tag: "0.3.5"]}, + "minirest": {:git, "https://github.com/emqx/minirest.git", "27ad0a8eaf10dca7748b0f63aa66754486a39dd5", [tag: "1.1.7"]}, "mysql": {:git, "https://github.com/emqx/mysql-otp.git", "bdabac44cc8836a9e23897b7e1b77c7df7e04f70", [tag: "1.7.1"]}, "pbkdf2": {:git, "https://github.com/emqx/erlang-pbkdf2.git", "45d9981209ea07a83a58cf85aaf8236457da4342", [tag: "2.0.4"]}, "poolboy": {:git, "https://github.com/emqx/poolboy.git", "29be47db8c2be38b18c908e43a80ebb7b9b6116b", [tag: "1.5.2"]}, "prometheus": {:git, "https://github.com/emqx/prometheus.erl.git", "a41488df09472448057d264ef520cf2f71d925f8", [tag: "v3.1.1"]}, - "ranch": {:git, "https://github.com/ninenines/ranch", "3190aef88aea04d6dce8545fe9b4574288903f44", [tag: "1.7.1"]}, + "ranch": {:hex, :ranch, "2.0.0", "fbf3d79661c071543256f9051caf19d65daa6df1cf6824d8f37a49b19a66f703", [:rebar3], [], "hexpm", "c20a4840c7d6623c19812d3a7c828b2f1bd153ef0f124cb69c54fe51d8a42ae0"}, "recon": {:hex, :recon, "2.5.2", "cba53fa8db83ad968c9a652e09c3ed7ddcc4da434f27c3eaa9ca47ffb2b1ff03", [:mix, :rebar3], [], "hexpm", "2c7523c8dee91dff41f6b3d63cba2bd49eb6d2fe5bf1eec0df7f87eb5e230e1c"}, "replayq": {:git, "https://github.com/emqx/replayq", "9e5ba14d65ff1885ad85b6d33a859c01c322f273", [tag: "0.3.1"]}, "snabbkaffe": {:git, "https://github.com/kafka4beam/snabbkaffe.git", "ea1fbffddf8a3b5939bff61cc72ba45c3dceb058", [tag: "0.14.0"]}, "ssl_verify_fun": {:git, "https://github.com/deadtrickster/ssl_verify_fun.erl.git", "c5718226b0b9f3d1a38ef6ca3c3b4c75f53dda92", [tag: "1.1.4"]}, - "typerefl": {:git, "https://github.com/k32/typerefl.git", "a1fdd359741a7f58498a6682eb608ff77939b22e", [tag: "0.6.2"]}, + "trails": {:hex, :trails, "2.3.0", "b09703f056705f4943e14fff077b98c711a6f48fad40f4ff0b350794074ad69c", [:rebar3], [{:cowboy, "2.8.0", [hex: :cowboy, repo: "hexpm", optional: false]}, {:ranch, "2.0.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "40804001eb80417aa9d02400f39b7216956c3f251539a8a6096a69b3fac0ea07"}, + "typerefl": {:git, "https://github.com/k32/typerefl.git", "275d0083530fc672eb22b3f4f6c0fd81fe3e86f2", [tag: "0.8.3"]}, }