From 1a8b59045b8439360f0a4a4300b8bdf3819e7f3c Mon Sep 17 00:00:00 2001 From: firest Date: Fri, 8 Dec 2023 20:55:20 +0800 Subject: [PATCH] fix(gbt): Add unimplemented command call --- .../src/emqx_gbt32960_channel.erl | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl index 5cb65f104..65a696a16 100644 --- a/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl +++ b/apps/emqx_gateway_gbt32960/src/emqx_gbt32960_channel.erl @@ -51,6 +51,8 @@ inflight :: emqx_inflight:inflight(), %% Message Queue mqueue :: queue:queue(), + %% Subscriptions + subscriptions :: map(), retx_interval, retx_max_times, max_mqueue_len @@ -115,7 +117,7 @@ stats(#channel{inflight = Inflight, mqueue = Queue}) -> %% XXX: A fake stats for managed by emqx_management [ {subscriptions_cnt, 1}, - {subscriptions_max, 0}, + {subscriptions_max, 1}, {inflight_cnt, emqx_inflight:size(Inflight)}, {inflight_max, emqx_inflight:max_size(Inflight)}, {mqueue_len, queue:len(Queue)}, @@ -181,6 +183,7 @@ init( clientinfo = ClientInfo, inflight = emqx_inflight:new(1), mqueue = queue:new(), + subscriptions = #{}, timers = #{}, conn_state = idle, retx_interval = RetxInterv, @@ -370,9 +373,15 @@ handle_call(kick, _From, Channel) -> disconnect_and_shutdown(kicked, ok, Channel1); handle_call(discard, _From, Channel) -> disconnect_and_shutdown(discarded, ok, Channel); +handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) -> + reply({error, not_support}, Channel); +handle_call({unsubscribe, _Topic}, _From, Channel) -> + reply({error, not_found}, Channel); +handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subscriptions}) -> + reply({ok, maps:to_list(Subscriptions)}, Channel); handle_call(Req, _From, Channel) -> log(error, #{msg => "unexpected_call", call => Req}, Channel), - reply(ignored, Channel). + reply({error, unexpected_call}, Channel). %%-------------------------------------------------------------------- %% Handle cast @@ -606,8 +615,8 @@ process_connect( ) of {ok, #{session := Session}} -> - NChannel = Channel#channel{session = Session}, - subscribe_downlink(?DEFAULT_DOWNLINK_TOPIC, Channel), + NChannel0 = Channel#channel{session = Session}, + NChannel = subscribe_downlink(?DEFAULT_DOWNLINK_TOPIC, NChannel0), _ = upstreaming(Frame, NChannel), %% XXX: connection_accepted is not defined by stomp protocol _ = run_hooks(Ctx, 'client.connack', [ConnInfo, connection_accepted, #{}]), @@ -854,11 +863,13 @@ subscribe_downlink( #{ clientid := ClientId, mountpoint := Mountpoint - } - } + }, + subscriptions = Subscriptions + } = Channel ) -> {ParsedTopic, SubOpts0} = emqx_topic:parse(Topic), SubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts0), MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic), _ = emqx_broker:subscribe(MountedTopic, ClientId, SubOpts), - run_hooks(Ctx, 'session.subscribed', [ClientInfo, MountedTopic, SubOpts]). + run_hooks(Ctx, 'session.subscribed', [ClientInfo, MountedTopic, SubOpts]), + Channel#channel{subscriptions = Subscriptions#{MountedTopic => SubOpts}}.