diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index efffabee3..bfd505750 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -232,8 +232,9 @@ handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) -> handle_call({unsubscribe, _Topic}, _From, Channel) -> {reply, {error, noimpl}, Channel}; -handle_call(subscriptions, _From, Channel) -> - {reply, {error, noimpl}, Channel}; +handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> + Subs = emqx_coap_session:info(subscriptions, Session), + {reply, {ok, maps:to_list(Subs)}, Channel}; handle_call(kick, _From, Channel) -> NChannel = ensure_disconnected(kicked, Channel), diff --git a/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl b/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl index 20473322e..dbec52d2b 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_observe_res.erl @@ -23,7 +23,6 @@ -define(MAX_SEQ_ID, 16777215). --type topic() :: binary(). -type token() :: binary(). -type seq_id() :: 0 .. ?MAX_SEQ_ID. @@ -31,7 +30,7 @@ , seq_id := seq_id() }. --type manager() :: #{topic => res()}. +-type manager() :: #{emqx_types:topic() => res()}. %%-------------------------------------------------------------------- %% API @@ -40,7 +39,7 @@ new_manager() -> #{}. --spec insert(topic(), token(), manager()) -> {seq_id(), manager()}. +-spec insert(emqx_types:topic(), token(), manager()) -> {seq_id(), manager()}. insert(Topic, Token, Manager) -> Res = case maps:get(Topic, Manager, undefined) of undefined -> @@ -50,11 +49,11 @@ insert(Topic, Token, Manager) -> end, {maps:get(seq_id, Res), Manager#{Topic => Res}}. --spec remove(topic(), manager()) -> manager(). +-spec remove(emqx_types:topic(), manager()) -> manager(). remove(Topic, Manager) -> maps:remove(Topic, Manager). --spec res_changed(topic(), manager()) -> undefined | {token(), seq_id(), manager()}. +-spec res_changed(emqx_types:topic(), manager()) -> undefined | {token(), seq_id(), manager()}. res_changed(Topic, Manager) -> case maps:get(Topic, Manager, undefined) of undefined -> @@ -73,6 +72,7 @@ foreach(F, Manager) -> Manager), ok. +-spec subscriptions(manager()) -> [emqx_types:topic()]. subscriptions(Manager) -> maps:keys(Manager). diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl index f94b4e9b3..9c3a8c451 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_session.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl @@ -91,7 +91,8 @@ info(Session) -> info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; info(subscriptions, #session{observe_manager = OM}) -> - emqx_coap_observe_res:subscriptions(OM); + Topics = emqx_coap_observe_res:subscriptions(OM), + lists:foldl(fun(T, Acc) -> Acc#{T => ?DEFAULT_SUBOPTS} end, #{}, Topics); info(subscriptions_cnt, #session{observe_manager = OM}) -> erlang:length(emqx_coap_observe_res:subscriptions(OM)); info(subscriptions_max, _) -> diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index 58b497cfc..bb72d24e6 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -351,6 +351,9 @@ handle_call({unsubscribe, Topic}, _From, Channel) -> {ok, NChannel} = do_unsubscribe([Topic], Channel), {reply, ok, NChannel}; +handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) -> + {reply, {ok, maps:to_list(Subs)}, Channel}; + handle_call({publish, Topic, Qos, Payload}, _From, Channel = #channel{ ctx = Ctx, @@ -369,7 +372,10 @@ handle_call({publish, Topic, Qos, Payload}, _From, end; handle_call(kick, _From, Channel) -> - {shutdown, kicked, ok, Channel}; + {shutdown, kicked, ok, ensure_disconnected(kicked, Channel)}; + +handle_call(discard, _From, Channel) -> + {shutdown, discarded, ok, Channel}; handle_call(Req, _From, Channel) -> ?SLOG(warning, #{ msg => "unexpected_call" diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index f9cfa3cbe..0b1bdde73 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -1214,7 +1214,7 @@ handle_call({unsubscribe, Topic}, _From, Channel) -> reply(ok, NChannel); handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> - reply(maps:to_list(emqx_session:info(subscriptions, Session)), Channel); + reply({ok, maps:to_list(emqx_session:info(subscriptions, Session))}, Channel); handle_call(kick, _From, Channel) -> NChannel = ensure_disconnected(kicked, Channel), diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 48cef70f8..17a7644ec 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -688,11 +688,11 @@ handle_call({unsubscribe, Topic}, _From, %% Reply :: [{emqx_types:topic(), emqx_types:subopts()}] handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) -> - Reply = lists:map( + NSubs = lists:map( fun({_SubId, Topic, _Ack, SubOpts}) -> {Topic, SubOpts} end, Subs), - reply(Reply, Channel); + reply({ok, NSubs}, Channel); handle_call(kick, _From, Channel) -> NChannel = ensure_disconnected(kicked, Channel),