feat(gw-mqttsn): support subscribe/unsubscribe operation

This commit is contained in:
JianBo He 2021-09-02 11:32:33 +08:00
parent dc05cdc586
commit 956308f0ca
3 changed files with 93 additions and 57 deletions

View File

@ -20,7 +20,6 @@
-include_lib("emqx/include/types.hrl"). -include_lib("emqx/include/types.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%% API %% API
-export([ start_link/3 -export([ start_link/3
, stop/1 , stop/1
@ -48,7 +47,6 @@
%% Internal callback %% Internal callback
-export([wakeup_from_hib/2, recvloop/2]). -export([wakeup_from_hib/2, recvloop/2]).
-record(state, { -record(state, {
%% TCP/SSL/UDP/DTLS Wrapped Socket %% TCP/SSL/UDP/DTLS Wrapped Socket
socket :: {esockd_transport, esockd:socket()} | {udp, _, _}, socket :: {esockd_transport, esockd:socket()} | {udp, _, _},

View File

@ -95,9 +95,9 @@
}). }).
-define(DEFAULT_OVERRIDE, -define(DEFAULT_OVERRIDE,
#{ clientid => <<"">> %% Generate clientid by default #{ clientid => <<"${ConnInfo.clientid}">>
, username => <<"${Packet.headers.login}">> %, username => <<"${ConnInfo.clientid}">>
, password => <<"${Packet.headers.passcode}">> %, password => <<"${Packet.headers.passcode}">>
}). }).
-define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]).
@ -189,9 +189,10 @@ stats(#channel{session = Session})->
set_conn_state(ConnState, Channel) -> set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}. Channel#channel{conn_state = ConnState}.
enrich_conninfo(?SN_CONNECT_MSG(_Flags, _ProtoId, Duration, _ClientId), enrich_conninfo(?SN_CONNECT_MSG(_Flags, _ProtoId, Duration, ClientId),
Channel = #channel{conninfo = ConnInfo}) -> Channel = #channel{conninfo = ConnInfo}) ->
NConnInfo = ConnInfo#{ proto_name => <<"MQTT-SN">> NConnInfo = ConnInfo#{ clientid => ClientId
, proto_name => <<"MQTT-SN">>
, proto_ver => <<"1.2">> , proto_ver => <<"1.2">>
, clean_start => true , clean_start => true
, keepalive => Duration , keepalive => Duration
@ -592,9 +593,11 @@ handle_in(SubPkt = ?SN_SUBSCRIBE_MSG(_, MsgId, _), Channel) ->
case emqx_misc:pipeline( case emqx_misc:pipeline(
[ fun preproc_subs_type/2 [ fun preproc_subs_type/2
, fun check_subscribe_authz/2 , fun check_subscribe_authz/2
, fun run_client_subs_hook/2
, fun do_subscribe/2 , fun do_subscribe/2
], SubPkt, Channel) of ], SubPkt, Channel) of
{ok, {TopicId, GrantedQoS}, NChannel} -> {ok, {TopicId, _TopicName, SubOpts}, NChannel} ->
GrantedQoS = maps:get(qos, SubOpts),
SubAck = ?SN_SUBACK_MSG(#mqtt_sn_flags{qos = GrantedQoS}, SubAck = ?SN_SUBACK_MSG(#mqtt_sn_flags{qos = GrantedQoS},
TopicId, MsgId, ?SN_RC_ACCEPTED), TopicId, MsgId, ?SN_RC_ACCEPTED),
{ok, outgoing_and_update(SubAck), NChannel}; {ok, outgoing_and_update(SubAck), NChannel};
@ -610,6 +613,7 @@ handle_in(UnsubPkt = ?SN_UNSUBSCRIBE_MSG(_, MsgId, TopicIdOrName),
Channel) -> Channel) ->
case emqx_misc:pipeline( case emqx_misc:pipeline(
[ fun preproc_unsub_type/2 [ fun preproc_unsub_type/2
, fun run_client_unsub_hook/2
, fun do_unsubscribe/2 , fun do_unsubscribe/2
], UnsubPkt, Channel) of ], UnsubPkt, Channel) of
{ok, _TopicName, NChannel} -> {ok, _TopicName, NChannel} ->
@ -841,13 +845,10 @@ check_subscribe_authz({_TopicId, TopicName, _QoS},
{error, ?SN_RC_NOT_AUTHORIZE} {error, ?SN_RC_NOT_AUTHORIZE}
end. end.
do_subscribe({TopicId, TopicName, QoS}, run_client_subs_hook({TopicId, TopicName, QoS},
Channel = #channel{ Channel = #channel{
ctx = Ctx, ctx = Ctx,
session = Session, clientinfo = ClientInfo}) ->
clientinfo = ClientInfo
= #{mountpoint := Mountpoint}}) ->
{TopicName1, SubOpts0} = emqx_topic:parse(TopicName), {TopicName1, SubOpts0} = emqx_topic:parse(TopicName),
TopicFilters = [{TopicName1, SubOpts0#{qos => QoS}}], TopicFilters = [{TopicName1, SubOpts0#{qos => QoS}}],
case run_hooks(Ctx, 'client.subscribe', case run_hooks(Ctx, 'client.subscribe',
@ -855,19 +856,26 @@ do_subscribe({TopicId, TopicName, QoS},
[] -> [] ->
?LOG(warning, "Skip to subscribe ~s, " ?LOG(warning, "Skip to subscribe ~s, "
"due to 'client.subscribe' denied!", [TopicName]), "due to 'client.subscribe' denied!", [TopicName]),
{ok, Channel}; {error, ?SN_EXCEED_LIMITATION};
[{NTopicName, NSubOpts}|_] -> [{NTopicName, NSubOpts}|_] ->
NTopicName1 = emqx_mountpoint:mount(Mountpoint, NTopicName), {ok, {TopicId, NTopicName, NSubOpts}, Channel}
NSubOpts1 = maps:merge(?DEFAULT_SUBOPTS, NSubOpts), end.
case emqx_session:subscribe(ClientInfo, NTopicName1, NSubOpts1, Session) of
{ok, NSession} -> do_subscribe({TopicId, TopicName, SubOpts},
{ok, {TopicId, QoS}, Channel = #channel{
Channel#channel{session = NSession}}; session = Session,
{error, ?RC_QUOTA_EXCEEDED} -> clientinfo = ClientInfo
?LOG(warning, "Cannot subscribe ~s due to ~s.", = #{mountpoint := Mountpoint}}) ->
[TopicName, emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)]), NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
{error, ?SN_EXCEED_LIMITATION} NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts),
end case emqx_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of
{ok, NSession} ->
{ok, {TopicId, NTopicName, NSubOpts},
Channel#channel{session = NSession}};
{error, ?RC_QUOTA_EXCEEDED} ->
?LOG(warning, "Cannot subscribe ~s due to ~s.",
[TopicName, emqx_reason_codes:text(?RC_QUOTA_EXCEEDED)]),
{error, ?SN_EXCEED_LIMITATION}
end. end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -899,33 +907,42 @@ preproc_unsub_type(?SN_UNSUBSCRIBE_MSG_TYPE(?SN_SHORT_TOPIC,
end, end,
{ok, TopicName, Channel}. {ok, TopicName, Channel}.
do_unsubscribe(TopicName, run_client_unsub_hook(TopicName,
Channel = #channel{ Channel = #channel{
ctx = Ctx, ctx = Ctx,
session = Session, clientinfo = ClientInfo
clientinfo = ClientInfo }) ->
= #{mountpoint := Mountpoint}}) ->
TopicFilters = [emqx_topic:parse(TopicName)], TopicFilters = [emqx_topic:parse(TopicName)],
case run_hooks(Ctx, 'client.unsubscribe', case run_hooks(Ctx, 'client.unsubscribe',
[ClientInfo, #{}], TopicFilters) of [ClientInfo, #{}], TopicFilters) of
[] -> [] ->
%% Skip to unsubscribe {ok, [], Channel};
{ok, Channel}; NTopicFilters ->
[{NTopicName, NSubOpts}|_] -> {ok, NTopicFilters, Channel}
NTopicName1 = emqx_mountpoint:mount(Mountpoint, NTopicName),
NSubOpts1 = maps:merge(
emqx_gateway_utils:default_subopts(),
NSubOpts
),
case emqx_session:unsubscribe(ClientInfo, NTopicName1,
NSubOpts1, Session) of
{ok, NSession} ->
{ok, Channel#channel{session = NSession}};
{error, ?RC_NO_SUBSCRIPTION_EXISTED} ->
{ok, Channel}
end
end. end.
do_unsubscribe(TopicFilters,
Channel = #channel{
session = Session,
clientinfo = ClientInfo
= #{mountpoint := Mountpoint}}) ->
NChannel =
lists:foldl(fun({TopicName, SubOpts}, ChannAcc) ->
NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
NSubOpts = maps:merge(
emqx_gateway_utils:default_subopts(),
SubOpts
),
case emqx_session:unsubscribe(ClientInfo, NTopicName,
NSubOpts, Session) of
{ok, NSession} ->
ChannAcc#channel{session = NSession};
{error, ?RC_NO_SUBSCRIPTION_EXISTED} ->
ChannAcc
end
end, Channel, TopicFilters),
{ok, TopicFilters, NChannel}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Awake & Asleep %% Awake & Asleep
@ -1101,14 +1118,36 @@ message_to_packet(MsgId, Message,
| {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()}
| {shutdown, Reason :: term(), Reply :: term(), | {shutdown, Reason :: term(), Reply :: term(),
emqx_types:packet(), channel()}. emqx_types:packet(), channel()}.
handle_call({subscribe, _Topic, _Subopts}, Channel) -> handle_call({subscribe, Topic, SubOpts}, Channel) ->
reply({error, not_supported_now}, Channel); %% XXX: Only support short_topic_name
SubProps = maps:get(sub_props, SubOpts, #{}),
case maps:get(subtype, SubProps, short_topic_name) of
short_topic_name ->
case byte_size(Topic) of
2 ->
case do_subscribe({?SN_INVALID_TOPIC_ID,
Topic, SubOpts}, Channel) of
{ok, _, NChannel} ->
reply(ok, NChannel);
{error, ?SN_EXCEED_LIMITATION} ->
reply({error, exceed_limitation}, Channel)
end;
_ ->
reply({error, bad_topic_name}, Channel)
end;
predefined_topic_id ->
reply({error, only_support_short_name_topic}, Channel);
_ ->
reply({error, only_support_short_name_topic}, Channel)
end;
handle_call({unsubscribe, _Topic}, Channel) -> handle_call({unsubscribe, Topic}, Channel) ->
reply({error, not_supported_now}, Channel); TopicFilters = [emqx_topic:parse(Topic)],
{ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel),
reply(ok, NChannel);
handle_call(subscriptions, Channel) -> handle_call(subscriptions, Channel = #channel{session = Session}) ->
reply({error, not_supported_now}, Channel); reply(maps:to_list(emqx_session:info(subscriptions, Session)), Channel);
handle_call(kick, Channel) -> handle_call(kick, Channel) ->
NChannel = ensure_disconnected(kicked, Channel), NChannel = ensure_disconnected(kicked, Channel),

View File

@ -22,7 +22,6 @@
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-import(proplists, [get_value/2, get_value/3]). -import(proplists, [get_value/2, get_value/3]).
%% API %% API
@ -548,9 +547,9 @@ check_subscribed_status({SubId, {ParsedTopic, _SubOpts}},
}) -> }) ->
MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic), MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic),
case lists:keyfind(SubId, 1, Subs) of case lists:keyfind(SubId, 1, Subs) of
{SubId, MountedTopic, _Ack, _SubOpts} -> {SubId, MountedTopic, _Ack, _} ->
ok; ok;
{SubId, _OtherTopic, _Ack, _SubOpts} -> {SubId, _OtherTopic, _Ack, _} ->
{error, "Conflict subscribe id"}; {error, "Conflict subscribe id"};
false -> false ->
ok ok