chore(gw): support to get subscriptions from coap

This commit is contained in:
JianBo He 2021-12-16 18:05:10 +08:00
parent 3443aeff18
commit 6b8bdfd113
6 changed files with 20 additions and 12 deletions

View File

@ -232,8 +232,9 @@ handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) ->
handle_call({unsubscribe, _Topic}, _From, Channel) -> handle_call({unsubscribe, _Topic}, _From, Channel) ->
{reply, {error, noimpl}, Channel}; {reply, {error, noimpl}, Channel};
handle_call(subscriptions, _From, Channel) -> handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
{reply, {error, noimpl}, Channel}; Subs = emqx_coap_session:info(subscriptions, Session),
{reply, {ok, maps:to_list(Subs)}, Channel};
handle_call(kick, _From, Channel) -> handle_call(kick, _From, Channel) ->
NChannel = ensure_disconnected(kicked, Channel), NChannel = ensure_disconnected(kicked, Channel),

View File

@ -23,7 +23,6 @@
-define(MAX_SEQ_ID, 16777215). -define(MAX_SEQ_ID, 16777215).
-type topic() :: binary().
-type token() :: binary(). -type token() :: binary().
-type seq_id() :: 0 .. ?MAX_SEQ_ID. -type seq_id() :: 0 .. ?MAX_SEQ_ID.
@ -31,7 +30,7 @@
, seq_id := seq_id() , seq_id := seq_id()
}. }.
-type manager() :: #{topic => res()}. -type manager() :: #{emqx_types:topic() => res()}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
@ -40,7 +39,7 @@
new_manager() -> new_manager() ->
#{}. #{}.
-spec insert(topic(), token(), manager()) -> {seq_id(), manager()}. -spec insert(emqx_types:topic(), token(), manager()) -> {seq_id(), manager()}.
insert(Topic, Token, Manager) -> insert(Topic, Token, Manager) ->
Res = case maps:get(Topic, Manager, undefined) of Res = case maps:get(Topic, Manager, undefined) of
undefined -> undefined ->
@ -50,11 +49,11 @@ insert(Topic, Token, Manager) ->
end, end,
{maps:get(seq_id, Res), Manager#{Topic => Res}}. {maps:get(seq_id, Res), Manager#{Topic => Res}}.
-spec remove(topic(), manager()) -> manager(). -spec remove(emqx_types:topic(), manager()) -> manager().
remove(Topic, Manager) -> remove(Topic, Manager) ->
maps:remove(Topic, Manager). maps:remove(Topic, Manager).
-spec res_changed(topic(), manager()) -> undefined | {token(), seq_id(), manager()}. -spec res_changed(emqx_types:topic(), manager()) -> undefined | {token(), seq_id(), manager()}.
res_changed(Topic, Manager) -> res_changed(Topic, Manager) ->
case maps:get(Topic, Manager, undefined) of case maps:get(Topic, Manager, undefined) of
undefined -> undefined ->
@ -73,6 +72,7 @@ foreach(F, Manager) ->
Manager), Manager),
ok. ok.
-spec subscriptions(manager()) -> [emqx_types:topic()].
subscriptions(Manager) -> subscriptions(Manager) ->
maps:keys(Manager). maps:keys(Manager).

View File

@ -91,7 +91,8 @@ info(Session) ->
info(Keys, Session) when is_list(Keys) -> info(Keys, Session) when is_list(Keys) ->
[{Key, info(Key, Session)} || Key <- Keys]; [{Key, info(Key, Session)} || Key <- Keys];
info(subscriptions, #session{observe_manager = OM}) -> info(subscriptions, #session{observe_manager = OM}) ->
emqx_coap_observe_res:subscriptions(OM); Topics = emqx_coap_observe_res:subscriptions(OM),
lists:foldl(fun(T, Acc) -> Acc#{T => ?DEFAULT_SUBOPTS} end, #{}, Topics);
info(subscriptions_cnt, #session{observe_manager = OM}) -> info(subscriptions_cnt, #session{observe_manager = OM}) ->
erlang:length(emqx_coap_observe_res:subscriptions(OM)); erlang:length(emqx_coap_observe_res:subscriptions(OM));
info(subscriptions_max, _) -> info(subscriptions_max, _) ->

View File

@ -351,6 +351,9 @@ handle_call({unsubscribe, Topic}, _From, Channel) ->
{ok, NChannel} = do_unsubscribe([Topic], Channel), {ok, NChannel} = do_unsubscribe([Topic], Channel),
{reply, ok, NChannel}; {reply, ok, NChannel};
handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) ->
{reply, {ok, maps:to_list(Subs)}, Channel};
handle_call({publish, Topic, Qos, Payload}, _From, handle_call({publish, Topic, Qos, Payload}, _From,
Channel = #channel{ Channel = #channel{
ctx = Ctx, ctx = Ctx,
@ -369,7 +372,10 @@ handle_call({publish, Topic, Qos, Payload}, _From,
end; end;
handle_call(kick, _From, Channel) -> handle_call(kick, _From, Channel) ->
{shutdown, kicked, ok, Channel}; {shutdown, kicked, ok, ensure_disconnected(kicked, Channel)};
handle_call(discard, _From, Channel) ->
{shutdown, discarded, ok, Channel};
handle_call(Req, _From, Channel) -> handle_call(Req, _From, Channel) ->
?SLOG(warning, #{ msg => "unexpected_call" ?SLOG(warning, #{ msg => "unexpected_call"

View File

@ -1214,7 +1214,7 @@ handle_call({unsubscribe, Topic}, _From, Channel) ->
reply(ok, NChannel); reply(ok, NChannel);
handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
reply(maps:to_list(emqx_session:info(subscriptions, Session)), Channel); reply({ok, maps:to_list(emqx_session:info(subscriptions, Session))}, Channel);
handle_call(kick, _From, Channel) -> handle_call(kick, _From, Channel) ->
NChannel = ensure_disconnected(kicked, Channel), NChannel = ensure_disconnected(kicked, Channel),

View File

@ -688,11 +688,11 @@ handle_call({unsubscribe, Topic}, _From,
%% Reply :: [{emqx_types:topic(), emqx_types:subopts()}] %% Reply :: [{emqx_types:topic(), emqx_types:subopts()}]
handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) -> handle_call(subscriptions, _From, Channel = #channel{subscriptions = Subs}) ->
Reply = lists:map( NSubs = lists:map(
fun({_SubId, Topic, _Ack, SubOpts}) -> fun({_SubId, Topic, _Ack, SubOpts}) ->
{Topic, SubOpts} {Topic, SubOpts}
end, Subs), end, Subs),
reply(Reply, Channel); reply({ok, NSubs}, Channel);
handle_call(kick, _From, Channel) -> handle_call(kick, _From, Channel) ->
NChannel = ensure_disconnected(kicked, Channel), NChannel = ensure_disconnected(kicked, Channel),