From bad0c35bb96de6fea7840e70f0d36c0370499b63 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 8 Jun 2023 15:25:44 +0800 Subject: [PATCH] fix(stomp): ensure the subscripton_cnt timely updated --- .../src/emqx_stomp_channel.erl | 16 +++++++++++++--- .../emqx_gateway_stomp/test/emqx_stomp_SUITE.erl | 16 ++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 316432dea..7a16792a0 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -499,7 +499,7 @@ handle_in( [{MountedTopic, SubOpts} | _] -> NSubs = [{SubId, MountedTopic, Ack, SubOpts} | Subs], NChannel1 = NChannel#channel{subscriptions = NSubs}, - handle_out(receipt, receipt_id(Headers), NChannel1) + handle_out_and_update(receipt, receipt_id(Headers), NChannel1) end; {error, ErrMsg, NChannel} -> ?SLOG(error, #{ @@ -541,7 +541,7 @@ handle_in( false -> {ok, Channel} end, - handle_out(receipt, receipt_id(Headers), NChannel); + handle_out_and_update(receipt, receipt_id(Headers), NChannel); %% XXX: How to ack a frame ??? handle_in(Frame = ?PACKET(?CMD_ACK, Headers), Channel) -> case header(<<"transaction">>, Headers) of @@ -769,6 +769,12 @@ handle_out(receipt, ReceiptId, Channel) -> Frame = receipt_frame(ReceiptId), {ok, {outgoing, Frame}, Channel}. +handle_out_and_update(receipt, undefined, Channel) -> + {ok, [{event, updated}], Channel}; +handle_out_and_update(receipt, ReceiptId, Channel) -> + Frame = receipt_frame(ReceiptId), + {ok, [{outgoing, Frame}, {event, updated}], Channel}. + %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- @@ -812,7 +818,7 @@ handle_call( ), NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts} | Subs], NChannel1 = NChannel#channel{subscriptions = NSubs}, - reply({ok, {MountedTopic, NSubOpts}}, NChannel1); + reply({ok, {MountedTopic, NSubOpts}}, [{event, updated}], NChannel1); {error, ErrMsg, NChannel} -> ?SLOG(error, #{ msg => "failed_to_subscribe_topic", @@ -841,6 +847,7 @@ handle_call( ), reply( ok, + [{event, updated}], Channel#channel{ subscriptions = lists:keydelete(MountedTopic, 2, Subs) } @@ -1107,6 +1114,9 @@ terminate(Reason, #channel{ reply(Reply, Channel) -> {reply, Reply, Channel}. +reply(Reply, Msgs, Channel) -> + {reply, Reply, Msgs, Channel}. + shutdown(Reason, Channel) -> {shutdown, Reason, Channel}. diff --git a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl index 4323cf32f..b4a8fe139 100644 --- a/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway_stomp/test/emqx_stomp_SUITE.erl @@ -256,6 +256,10 @@ t_subscribe(_) -> ] ), + %% assert subscription stats + [ClientInfo1] = clients(), + ?assertMatch(#{subscriptions_cnt := 1}, ClientInfo1), + %% Unsubscribe gen_tcp:send( Sock, @@ -278,6 +282,10 @@ t_subscribe(_) -> }, _, _} = parse(Data2), + %% assert subscription stats + [ClientInfo2] = clients(), + ?assertMatch(#{subscriptions_cnt := 0}, ClientInfo2), + gen_tcp:send( Sock, serialize( @@ -802,10 +810,14 @@ t_rest_clienit_info(_) -> {200, Subs1} = request(get, ClientPath ++ "/subscriptions"), ?assertEqual(2, length(Subs1)), + {200, StompClient2} = request(get, ClientPath), + ?assertMatch(#{subscriptions_cnt := 2}, StompClient2), {204, _} = request(delete, ClientPath ++ "/subscriptions/t%2Fa"), {200, Subs2} = request(get, ClientPath ++ "/subscriptions"), ?assertEqual(1, length(Subs2)), + {200, StompClient3} = request(get, ClientPath), + ?assertMatch(#{subscriptions_cnt := 1}, StompClient3), %% kickout {204, _} = request(delete, ClientPath), @@ -855,3 +867,7 @@ get_field(command, #stomp_frame{command = Command}) -> Command; get_field(body, #stomp_frame{body = Body}) -> Body. + +clients() -> + {200, Clients} = request(get, "/gateways/stomp/clients"), + maps:get(data, Clients).