From 9bb3533d112f1ed05474146e03844916a135986c Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 24 Jun 2022 14:13:27 +0800 Subject: [PATCH] fix(exhook): guarantee sequential dispatch of client's events from the same node fix: https://github.com/emqx/emqx/issues/7569 --- apps/emqx_exhook/src/emqx_exhook.appup.src | 6 ++++-- apps/emqx_exhook/src/emqx_exhook_server.erl | 12 +++++++++++- rebar.config | 2 +- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src index d3b73d98a..f286255f0 100644 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -1,6 +1,7 @@ %% -*- mode: erlang -*- {VSN, - [ {"4.3.5", [ + [ + {"4.3.5", [ {load_module, emqx_exhook_server, brutal_purge, soft_purge, []} ]}, {"4.3.4", [ @@ -16,7 +17,8 @@ ]}, {<<".*">>, []} ], - [ {"4.3.5", [ + [ + {"4.3.5", [ {load_module, emqx_exhook_server, brutal_purge, soft_purge, []} ]}, {"4.3.4", [ diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index e767cd84b..93568783a 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -273,7 +273,7 @@ match_topic_filter(TopicName, TopicFilter) -> -spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}. do_call(ChannName, Fun, Req, ReqOpts) -> NReq = Req#{meta => emqx_exhook:request_meta()}, - Options = ReqOpts#{channel => ChannName}, + Options = ReqOpts#{channel => ChannName, key_dispatch => key_dispatch(NReq)}, ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, NReq, Options]), case catch apply(?PB_CLIENT_MOD, Fun, [NReq, Options]) of {ok, Resp, _Metadata} -> @@ -331,3 +331,13 @@ available_hooks() -> 'session.created', 'session.subscribed', 'session.unsubscribed', 'session.resumed', 'session.discarded', 'session.takeovered', 'session.terminated' | message_hooks()]. + +%% @doc Get dispatch_key for each request +key_dispatch(_Req = #{clientinfo := #{clientid := ClientId}}) -> + ClientId; +key_dispatch(_Req = #{conninfo := #{clientid := ClientId}}) -> + ClientId; +key_dispatch(_Req = #{message := #{from := From}}) -> + From; +key_dispatch(_Req) -> + self(). diff --git a/rebar.config b/rebar.config index 1cd36d47c..7c5e01ff9 100644 --- a/rebar.config +++ b/rebar.config @@ -62,7 +62,7 @@ , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}} , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}} - , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.5"}}} + , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}} ]}. {xref_ignores,