fix(exhook): guarantee sequential dispatch of client's events from the same node

fix: https://github.com/emqx/emqx/issues/7569
This commit is contained in:
JianBo He 2022-06-24 14:13:27 +08:00
parent 08976c6946
commit 9bb3533d11
3 changed files with 16 additions and 4 deletions

View File

@ -1,6 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[ {"4.3.5", [ [
{"4.3.5", [
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []} {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
]}, ]},
{"4.3.4", [ {"4.3.4", [
@ -16,7 +17,8 @@
]}, ]},
{<<".*">>, []} {<<".*">>, []}
], ],
[ {"4.3.5", [ [
{"4.3.5", [
{load_module, emqx_exhook_server, brutal_purge, soft_purge, []} {load_module, emqx_exhook_server, brutal_purge, soft_purge, []}
]}, ]},
{"4.3.4", [ {"4.3.4", [

View File

@ -273,7 +273,7 @@ match_topic_filter(TopicName, TopicFilter) ->
-spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}. -spec do_call(string(), atom(), map(), map()) -> {ok, map()} | {error, term()}.
do_call(ChannName, Fun, Req, ReqOpts) -> do_call(ChannName, Fun, Req, ReqOpts) ->
NReq = Req#{meta => emqx_exhook:request_meta()}, NReq = Req#{meta => emqx_exhook:request_meta()},
Options = ReqOpts#{channel => ChannName}, Options = ReqOpts#{channel => ChannName, key_dispatch => key_dispatch(NReq)},
?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, NReq, Options]), ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, NReq, Options]),
case catch apply(?PB_CLIENT_MOD, Fun, [NReq, Options]) of case catch apply(?PB_CLIENT_MOD, Fun, [NReq, Options]) of
{ok, Resp, _Metadata} -> {ok, Resp, _Metadata} ->
@ -331,3 +331,13 @@ available_hooks() ->
'session.created', 'session.subscribed', 'session.unsubscribed', 'session.created', 'session.subscribed', 'session.unsubscribed',
'session.resumed', 'session.discarded', 'session.takeovered', 'session.resumed', 'session.discarded', 'session.takeovered',
'session.terminated' | message_hooks()]. 'session.terminated' | message_hooks()].
%% @doc Get dispatch_key for each request
key_dispatch(_Req = #{clientinfo := #{clientid := ClientId}}) ->
ClientId;
key_dispatch(_Req = #{conninfo := #{clientid := ClientId}}) ->
ClientId;
key_dispatch(_Req = #{message := #{from := From}}) ->
From;
key_dispatch(_Req) ->
self().

View File

@ -62,7 +62,7 @@
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}} , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}} , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}}
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}} , {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.5"}}} , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}}
]}. ]}.
{xref_ignores, {xref_ignores,