feat(coap): support subscribe/unsubscribe operations

This commit is contained in:
JianBo He 2021-12-17 10:21:55 +08:00
parent 6b8bdfd113
commit bfbf377a45
5 changed files with 48 additions and 9 deletions

View File

@ -226,11 +226,50 @@ handle_call({send_request, Msg}, From, Channel) ->
Result = call_session(handle_out, {{send_request, From}, Msg}, Channel),
erlang:setelement(1, Result, noreply);
handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) ->
{reply, {error, nosupport}, Channel};
handle_call({subscribe, Topic, SubOpts}, _From,
Channel = #channel{
ctx = Ctx,
clientinfo = ClientInfo
= #{clientid := ClientId,
mountpoint := Mountpoint},
session = Session}) ->
Token = maps:get(token,
maps:get(sub_props, SubOpts, #{}),
undefined),
NSubOpts = maps:merge(
emqx_gateway_utils:default_subopts(),
SubOpts),
MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic),
_ = emqx_broker:subscribe(MountedTopic, ClientId, NSubOpts),
handle_call({unsubscribe, _Topic}, _From, Channel) ->
{reply, {error, noimpl}, Channel};
_ = run_hooks(Ctx, 'session.subscribed',
[ClientInfo, MountedTopic, NSubOpts]),
%% modifty session state
SubReq = {Topic, Token},
TempMsg = #coap_message{},
Result = emqx_coap_session:process_subscribe(
SubReq, TempMsg, #{}, Session),
NSession = maps:get(session, Result),
{reply, ok, Channel#channel{session = NSession}};
handle_call({unsubscribe, Topic}, _From,
Channel = #channel{
ctx = Ctx,
clientinfo = ClientInfo
= #{mountpoint := Mountpoint},
session = Session}) ->
MountedTopic = emqx_mountpoint:mount(Mountpoint, Topic),
ok = emqx_broker:unsubscribe(MountedTopic),
_ = run_hooks(Ctx, 'session.unsubscribe',
[ClientInfo, MountedTopic, #{}]),
%% modifty session state
UnSubReq = Topic,
TempMsg = #coap_message{},
Result = emqx_coap_session:process_subscribe(
UnSubReq, TempMsg, #{}, Session),
NSession = maps:get(session, Result),
{reply, ok, Channel#channel{session = NSession}};
handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
Subs = emqx_coap_session:info(subscriptions, Session),

View File

@ -53,8 +53,8 @@ out(Msg, Result) ->
proto_out(Proto) ->
proto_out(Proto, #{}).
proto_out(Proto, Resut) ->
Resut#{proto => Proto}.
proto_out(Proto, Result) ->
Result#{proto => Proto}.
reply(Method, Req) when not is_record(Method, coap_message) ->
reply(Method, <<>>, Req);

View File

@ -146,7 +146,7 @@ subscribe(#coap_message{token = Token} = Msg, Topic, Ctx, CInfo) ->
SubOpts = get_sub_opts(Msg),
MountTopic = mount(CInfo, Topic),
emqx_broker:subscribe(MountTopic, ClientId, SubOpts),
run_hooks(Ctx, 'session.subscribed', [CInfo, Topic, SubOpts]),
run_hooks(Ctx, 'session.subscribed', [CInfo, MountTopic, SubOpts]),
?SUB(MountTopic, Token, Msg);
_ ->
reply({error, unauthorized}, Msg)

View File

@ -16,8 +16,6 @@
-module(emqx_gateway).
-behaviour(emqx_config_handler).
-include("include/emqx_gateway.hrl").
%% Gateway APIs

View File

@ -17,6 +17,8 @@
%% @doc The gateway configuration management module
-module(emqx_gateway_conf).
-behaviour(emqx_config_handler).
%% Load/Unload
-export([ load/0
, unload/0