diff --git a/apps/emqx_exhook/docs/design.md b/apps/emqx_exhook/docs/design-cn.md similarity index 93% rename from apps/emqx_exhook/docs/design.md rename to apps/emqx_exhook/docs/design-cn.md index 671e240cc..6686e96e3 100644 --- a/apps/emqx_exhook/docs/design.md +++ b/apps/emqx_exhook/docs/design-cn.md @@ -19,7 +19,7 @@ 2. 将 `emqx-extension-hook` 重命名为 `emqx-exhook` -旧版本的设计参考:[emqx-extension-hook design in v4.2.0](https://github.com/emqx/emqx-exhook/blob/v4.2.0/docs/design.md) +旧版本的设计:[emqx-extension-hook design in v4.2.0](https://github.com/emqx/emqx-exhook/blob/v4.2.0/docs/design.md) ## 设计 @@ -39,13 +39,13 @@ `emqx-exhook` 通过 gRPC 的方式向用户部署的 gRPC 服务发送钩子的请求,并处理其返回的值。 -和 emqx 原生的钩子一致,emqx-exhook 也支持链式的方式计算和返回: +和 emqx 原生的钩子一致,emqx-exhook 也按照链式的方式执行: ### gRPC 服务示例 -用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中。例如,其支持的接口有: +用户需要实现的方法,和数据类型的定义在 `priv/protos/exhook.proto` 文件中: ```protobuff syntax = "proto3"; diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config index d2e437b8b..883aad9bd 100644 --- a/apps/emqx_exhook/rebar.config +++ b/apps/emqx_exhook/rebar.config @@ -1,11 +1,11 @@ %%-*- mode: erlang -*- {plugins, [rebar3_proper, - {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.10.0"}}} + {grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}} ]}. {deps, - [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.6.0"}}} + [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}} ]}. {grpc, @@ -15,7 +15,9 @@ ]}. {provider_hooks, - [{pre, [{compile, {grpc, gen}}]}]}. + [{pre, [{compile, {grpc, gen}}, + {clean, {grpc, clean}}]} +]}. {edoc_opts, [{preprocess, true}]}. diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index 3a35073ca..c565d5dee 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -42,6 +42,12 @@ , on_session_terminated/3 ]). +-export([ on_message_publish/1 + , on_message_dropped/3 + , on_message_delivered/2 + , on_message_acked/2 + ]). + %% Utils -export([ message/1 , stringfy/1 @@ -71,8 +77,15 @@ , {'session.discarded', {?MODULE, on_session_discarded, []}} , {'session.takeovered', {?MODULE, on_session_takeovered, []}} , {'session.terminated', {?MODULE, on_session_terminated, []}} + %]). + , {'message.publish', {?MODULE, on_message_publish, []}} + , {'message.delivered', {?MODULE, on_message_delivered, []}} + , {'message.acked', {?MODULE, on_message_acked, []}} + , {'message.dropped', {?MODULE, on_message_dropped, []}} ]). + + %%-------------------------------------------------------------------- %% Clients %%-------------------------------------------------------------------- @@ -185,6 +198,45 @@ on_session_terminated(ClientInfo, Reason, _SessInfo) -> reason => stringfy(Reason)}, cast('session.terminated', Req). +%%-------------------------------------------------------------------- +%% Message +%%-------------------------------------------------------------------- + +on_message_publish(#message{topic = <<"$SYS/", _/binary>>}) -> + ok; +on_message_publish(Message) -> + Req = #{message => message(Message)}, + case call_fold('message.publish', Req, + fun emqx_exhook_handler:merge_responsed_message/2) of + {StopOrOk, #{message := NMessage}} -> + {StopOrOk, assign_to_message(NMessage, Message)}; + _ -> {ok, Message} + end. + +on_message_dropped(#message{topic = <<"$SYS/", _/binary>>}, _By, _Reason) -> + ok; +on_message_dropped(Message, _By, Reason) -> + Req = #{message => message(Message), + reason => stringfy(Reason) + }, + cast('message.dropped', Req). + +on_message_delivered(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) -> + ok; +on_message_delivered(ClientInfo, Message) -> + Req = #{clientinfo => clientinfo(ClientInfo), + message => message(Message) + }, + cast('message.delivered', Req). + +on_message_acked(_ClientInfo, #message{topic = <<"$SYS/", _/binary>>}) -> + ok; +on_message_acked(ClientInfo, Message) -> + Req = #{clientinfo => clientinfo(ClientInfo), + message => message(Message) + }, + cast('message.acked', Req). + %%-------------------------------------------------------------------- %% Types diff --git a/apps/emqx_exproto/rebar.config b/apps/emqx_exproto/rebar.config index 88831ce15..61677f0e3 100644 --- a/apps/emqx_exproto/rebar.config +++ b/apps/emqx_exproto/rebar.config @@ -9,11 +9,11 @@ {parse_transform}]}. {plugins, [rebar3_proper, - {grpc_plugin, {git, "https://github.com/HJianBo/grpcbox_plugin", {tag, "v0.10.0"}}} + {grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}} ]}. {deps, - [{grpc, {git, "https://github.com/emqx/grpc", {tag, "0.6.0"}}} + [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.2"}}} ]}. {grpc, @@ -21,10 +21,12 @@ {protos, ["priv/protos"]}, {gpb_opts, [{module_name_prefix, "emqx_"}, {module_name_suffix, "_pb"}]} - ]}. +]}. {provider_hooks, - [{pre, [{compile, {grpc, gen}}]}]}. + [{pre, [{compile, {grpc, gen}}, + {clean, {grpc, clean}}]} +]}. {xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls,