From d77d5e33bc2143a946131825a27321b9ea3b7bc2 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 18 May 2023 14:48:37 +0800 Subject: [PATCH] fix(mqttsn): Instantly refresh client info after subscribed/unsubscribed --- apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index ae1da5dac..914f837e1 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -1791,14 +1791,14 @@ message_to_packet( handle_call({subscribe, Topic, SubOpts}, _From, Channel) -> case do_subscribe({?SN_INVALID_TOPIC_ID, Topic, SubOpts}, Channel) of {ok, {_, NTopicName, NSubOpts}, NChannel} -> - reply({ok, {NTopicName, NSubOpts}}, NChannel); + reply_and_update({ok, {NTopicName, NSubOpts}}, NChannel); {error, ?SN_RC2_EXCEED_LIMITATION} -> reply({error, exceed_limitation}, Channel) end; handle_call({unsubscribe, Topic}, _From, Channel) -> TopicFilters = [emqx_topic:parse(Topic)], {ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel), - reply(ok, NChannel); + reply_and_update(ok, NChannel); handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> reply({ok, maps:to_list(emqx_session:info(subscriptions, Session))}, Channel); handle_call(kick, _From, Channel) -> @@ -2192,6 +2192,9 @@ terminate(_Reason, _Channel) -> reply(Reply, Channel) -> {reply, Reply, Channel}. +reply_and_update(Reply, Channel) -> + {reply, Reply, [{event, updated}], Channel}. + shutdown(Reason, Channel) -> {shutdown, Reason, Channel}.