Merge pull request #10955 from HJianBo/clean-mqttsn-topic-registry

Fix(mqttsn): clean predefined topics once gateway unload
This commit is contained in:
JianBo He 2023-06-12 09:37:47 +08:00 committed by GitHub
commit d14d87b443
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 426 additions and 408 deletions

View File

@ -389,7 +389,7 @@ open_session(
end, end,
case takeover_session(GwName, ClientId) of case takeover_session(GwName, ClientId) of
{ok, ConnMod, ChanPid, Session} -> {ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session), ok = SessionMod:resume(ClientInfo, Session),
case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of
{ok, Pendings} -> {ok, Pendings} ->
register_channel( register_channel(

View File

@ -14,6 +14,8 @@
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-define(SN_MAX_PREDEF_TOPIC_ID, 1024).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT-SN Types %% MQTT-SN Types
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -56,8 +56,7 @@ on_gateway_load(
}, },
Ctx Ctx
) -> ) ->
%% We Also need to start `emqx_mqttsn_broadcast` & %% We Also need to start `emqx_mqttsn_broadcast`
%% `emqx_mqttsn_registry` process
case maps:get(broadcast, Config, false) of case maps:get(broadcast, Config, false) of
false -> false ->
ok; ok;
@ -70,12 +69,9 @@ on_gateway_load(
end, end,
PredefTopics = maps:get(predefined, Config, []), PredefTopics = maps:get(predefined, Config, []),
{ok, RegistrySvr} = emqx_mqttsn_registry:start_link(GwName, PredefTopics), ok = emqx_mqttsn_registry:persist_predefined_topics(PredefTopics),
NConfig = maps:without( NConfig = maps:without([broadcast, predefined], Config),
[broadcast, predefined],
Config#{registry => emqx_mqttsn_registry:lookup_name(RegistrySvr)}
),
Listeners = emqx_gateway_utils:normalize_config(NConfig), Listeners = emqx_gateway_utils:normalize_config(NConfig),
@ -125,5 +121,7 @@ on_gateway_unload(
}, },
_GwState _GwState
) -> ) ->
PredefTopics = maps:get(predefined, Config, []),
ok = emqx_mqttsn_registry:clear_predefined_topics(PredefTopics),
Listeners = normalize_config(Config), Listeners = normalize_config(Config),
stop_listeners(GwName, Listeners). stop_listeners(GwName, Listeners).

View File

@ -51,8 +51,6 @@
-record(channel, { -record(channel, {
%% Context %% Context
ctx :: emqx_gateway_ctx:context(), ctx :: emqx_gateway_ctx:context(),
%% Registry
registry :: emqx_mqttsn_registry:registry(),
%% Gateway Id %% Gateway Id
gateway_id :: integer(), gateway_id :: integer(),
%% Enable negative_qos %% Enable negative_qos
@ -62,7 +60,7 @@
%% MQTT-SN Client Info %% MQTT-SN Client Info
clientinfo :: emqx_types:clientinfo(), clientinfo :: emqx_types:clientinfo(),
%% Session %% Session
session :: emqx_session:session() | undefined, session :: emqx_mqttsn_session:session() | undefined,
%% Keepalive %% Keepalive
keepalive :: emqx_keepalive:keepalive() | undefined, keepalive :: emqx_keepalive:keepalive() | undefined,
%% Will Msg %% Will Msg
@ -147,7 +145,6 @@ init(
) -> ) ->
Peercert = maps:get(peercert, ConnInfo, undefined), Peercert = maps:get(peercert, ConnInfo, undefined),
Mountpoint = maps:get(mountpoint, Option, undefined), Mountpoint = maps:get(mountpoint, Option, undefined),
Registry = maps:get(registry, Option),
GwId = maps:get(gateway_id, Option), GwId = maps:get(gateway_id, Option),
EnableNegQoS = maps:get(enable_qos3, Option, true), EnableNegQoS = maps:get(enable_qos3, Option, true),
ListenerId = ListenerId =
@ -180,7 +177,6 @@ init(
), ),
#channel{ #channel{
ctx = Ctx, ctx = Ctx,
registry = Registry,
gateway_id = GwId, gateway_id = GwId,
enable_negative_qos = EnableNegQoS, enable_negative_qos = EnableNegQoS,
conninfo = ConnInfo, conninfo = ConnInfo,
@ -217,7 +213,7 @@ info(conn_state, #channel{conn_state = ConnState}) ->
info(clientinfo, #channel{clientinfo = ClientInfo}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo; ClientInfo;
info(session, #channel{session = Session}) -> info(session, #channel{session = Session}) ->
emqx_utils:maybe_apply(fun emqx_session:info/1, Session); emqx_utils:maybe_apply(fun emqx_mqttsn_session:info/1, Session);
info(will_msg, #channel{will_msg = WillMsg}) -> info(will_msg, #channel{will_msg = WillMsg}) ->
WillMsg; WillMsg;
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
@ -229,7 +225,7 @@ info(ctx, #channel{ctx = Ctx}) ->
stats(#channel{session = undefined}) -> stats(#channel{session = undefined}) ->
[]; [];
stats(#channel{session = Session}) -> stats(#channel{session = Session}) ->
emqx_session:stats(Session). emqx_mqttsn_session:stats(Session).
set_conn_state(ConnState, Channel) -> set_conn_state(ConnState, Channel) ->
Channel#channel{conn_state = ConnState}. Channel#channel{conn_state = ConnState}.
@ -388,19 +384,15 @@ process_connect(
clientinfo = ClientInfo clientinfo = ClientInfo
} }
) -> ) ->
SessFun = fun(ClientInfoT, _) -> SessFun = fun(ClientInfoT, _) -> emqx_mqttsn_session:init(ClientInfoT) end,
Conf = emqx_cm:get_session_confs(
ClientInfoT, #{receive_maximum => 1, expiry_interval => 0}
),
emqx_session:init(Conf)
end,
case case
emqx_gateway_ctx:open_session( emqx_gateway_ctx:open_session(
Ctx, Ctx,
CleanStart, CleanStart,
ClientInfo, ClientInfo,
ConnInfo, ConnInfo,
SessFun SessFun,
_SessMod = emqx_mqttsn_session
) )
of of
{ok, #{ {ok, #{
@ -470,7 +462,7 @@ handle_in(
MsgId, MsgId,
Data Data
), ),
Channel = #channel{conn_state = idle, registry = Registry} Channel = #channel{conn_state = idle}
) -> ) ->
case check_negative_qos_enable(Publish, Channel) of case check_negative_qos_enable(Publish, Channel) of
ok -> ok ->
@ -479,11 +471,8 @@ handle_in(
?SN_SHORT_TOPIC -> ?SN_SHORT_TOPIC ->
TopicId; TopicId;
?SN_PREDEFINED_TOPIC -> ?SN_PREDEFINED_TOPIC ->
emqx_mqttsn_registry:lookup_topic( Registry = emqx_mqttsn_registry:init(),
Registry, emqx_mqttsn_registry:lookup_topic(TopicId, Registry);
?NEG_QOS_CLIENT_ID,
TopicId
);
_ -> _ ->
undefined undefined
end, end,
@ -631,20 +620,19 @@ handle_in(
end; end;
handle_in( handle_in(
?SN_REGISTER_MSG(_TopicId, MsgId, TopicName), ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName),
Channel = #channel{ Channel = #channel{session = Session}
registry = Registry,
clientinfo = #{clientid := ClientId}
}
) -> ) ->
case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of Registry = emqx_mqttsn_session:registry(Session),
TopicId when is_integer(TopicId) -> case emqx_mqttsn_registry:reg(TopicName, Registry) of
{ok, TopicId, NRegistry} ->
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "registered_topic_name", msg => "registered_topic_name",
topic_name => TopicName, topic_name => TopicName,
topic_id => TopicId topic_id => TopicId
}), }),
AckPacket = ?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), AckPacket = ?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED),
{ok, {outgoing, AckPacket}, Channel}; NSession = emqx_mqttsn_session:set_registry(NRegistry, Session),
{ok, {outgoing, AckPacket}, Channel#channel{session = NSession}};
{error, too_large} -> {error, too_large} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "register_topic_failed", msg => "register_topic_failed",
@ -756,14 +744,14 @@ handle_in(
?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode), ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode),
Channel = #channel{ Channel = #channel{
ctx = Ctx, ctx = Ctx,
registry = Registry,
session = Session, session = Session,
clientinfo = ClientInfo = #{clientid := ClientId} clientinfo = ClientInfo
} }
) -> ) ->
Registry = emqx_mqttsn_session:registry(Session),
case ReturnCode of case ReturnCode of
?SN_RC_ACCEPTED -> ?SN_RC_ACCEPTED ->
case emqx_session:puback(ClientInfo, MsgId, Session) of case emqx_mqttsn_session:puback(ClientInfo, MsgId, Session) of
{ok, Msg, NSession} -> {ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Channel), ok = after_message_acked(ClientInfo, Msg, Channel),
{Replies, NChannel} = goto_asleep_if_buffered_msgs_sent( {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent(
@ -795,7 +783,7 @@ handle_in(
{ok, Channel} {ok, Channel}
end; end;
?SN_RC_INVALID_TOPIC_ID -> ?SN_RC_INVALID_TOPIC_ID ->
case emqx_mqttsn_registry:lookup_topic(Registry, ClientId, TopicId) of case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of
undefined -> undefined ->
{ok, Channel}; {ok, Channel};
TopicName -> TopicName ->
@ -820,7 +808,7 @@ handle_in(
clientinfo = ClientInfo clientinfo = ClientInfo
} }
) -> ) ->
case emqx_session:pubrec(ClientInfo, MsgId, Session) of case emqx_mqttsn_session:pubrec(ClientInfo, MsgId, Session) of
{ok, Msg, NSession} -> {ok, Msg, NSession} ->
ok = after_message_acked(ClientInfo, Msg, Channel), ok = after_message_acked(ClientInfo, Msg, Channel),
NChannel = Channel#channel{session = NSession}, NChannel = Channel#channel{session = NSession},
@ -846,7 +834,7 @@ handle_in(
?SN_PUBREC_MSG(?SN_PUBREL, MsgId), ?SN_PUBREC_MSG(?SN_PUBREL, MsgId),
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo} Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}
) -> ) ->
case emqx_session:pubrel(ClientInfo, MsgId, Session) of case emqx_mqttsn_session:pubrel(ClientInfo, MsgId, Session) of
{ok, NSession} -> {ok, NSession} ->
NChannel = Channel#channel{session = NSession}, NChannel = Channel#channel{session = NSession},
handle_out(pubcomp, MsgId, NChannel); handle_out(pubcomp, MsgId, NChannel);
@ -863,7 +851,7 @@ handle_in(
?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId), ?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId),
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo} Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}
) -> ) ->
case emqx_session:pubcomp(ClientInfo, MsgId, Session) of case emqx_mqttsn_session:pubcomp(ClientInfo, MsgId, Session) of
{ok, NSession} -> {ok, NSession} ->
{Replies, NChannel} = goto_asleep_if_buffered_msgs_sent( {Replies, NChannel} = goto_asleep_if_buffered_msgs_sent(
Channel#channel{session = NSession} Channel#channel{session = NSession}
@ -1100,12 +1088,10 @@ convert_topic_id_to_name({{name, TopicName}, Flags, Data}, Channel) ->
{ok, {TopicName, Flags, Data}, Channel}; {ok, {TopicName, Flags, Data}, Channel};
convert_topic_id_to_name( convert_topic_id_to_name(
{{id, TopicId}, Flags, Data}, {{id, TopicId}, Flags, Data},
Channel = #channel{ Channel = #channel{session = Session}
registry = Registry,
clientinfo = #{clientid := ClientId}
}
) -> ) ->
case emqx_mqttsn_registry:lookup_topic(Registry, ClientId, TopicId) of Registry = emqx_mqttsn_session:registry(Session),
case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of
undefined -> undefined ->
{error, ?SN_RC_INVALID_TOPIC_ID}; {error, ?SN_RC_INVALID_TOPIC_ID};
TopicName -> TopicName ->
@ -1174,7 +1160,7 @@ do_publish(
Msg = #message{qos = ?QOS_2}, Msg = #message{qos = ?QOS_2},
Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo} Channel = #channel{ctx = Ctx, session = Session, clientinfo = ClientInfo}
) -> ) ->
case emqx_session:publish(ClientInfo, MsgId, Msg, Session) of case emqx_mqttsn_session:publish(ClientInfo, MsgId, Msg, Session) of
{ok, _PubRes, NSession} -> {ok, _PubRes, NSession} ->
NChannel1 = ensure_timer( NChannel1 = ensure_timer(
await_timer, await_timer,
@ -1207,15 +1193,13 @@ preproc_subs_type(
TopicName, TopicName,
QoS QoS
), ),
Channel = #channel{ Channel = #channel{session = Session}
registry = Registry,
clientinfo = #{clientid := ClientId}
}
) -> ) ->
Registry = emqx_mqttsn_session:registry(Session),
%% If the gateway is able accept the subscription, %% If the gateway is able accept the subscription,
%% it assigns a topic id to the received topic name %% it assigns a topic id to the received topic name
%% and returns it within a SUBACK message %% and returns it within a SUBACK message
case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of case emqx_mqttsn_registry:reg(TopicName, Registry) of
{error, too_large} -> {error, too_large} ->
{error, ?SN_RC2_EXCEED_LIMITATION}; {error, ?SN_RC2_EXCEED_LIMITATION};
{error, wildcard_topic} -> {error, wildcard_topic} ->
@ -1226,8 +1210,9 @@ preproc_subs_type(
%% value when it has the first PUBLISH message with a matching %% value when it has the first PUBLISH message with a matching
%% topic name to be sent to the client, see also Section 6.10. %% topic name to be sent to the client, see also Section 6.10.
{ok, {?SN_INVALID_TOPIC_ID, TopicName, QoS}, Channel}; {ok, {?SN_INVALID_TOPIC_ID, TopicName, QoS}, Channel};
TopicId when is_integer(TopicId) -> {ok, TopicId, NRegistry} ->
{ok, {TopicId, TopicName, QoS}, Channel} NSession = emqx_mqttsn_session:set_registry(NRegistry, Session),
{ok, {TopicId, TopicName, QoS}, Channel#channel{session = NSession}}
end; end;
preproc_subs_type( preproc_subs_type(
?SN_SUBSCRIBE_MSG_TYPE( ?SN_SUBSCRIBE_MSG_TYPE(
@ -1235,18 +1220,10 @@ preproc_subs_type(
TopicId, TopicId,
QoS QoS
), ),
Channel = #channel{ Channel = #channel{session = Session}
registry = Registry,
clientinfo = #{clientid := ClientId}
}
) -> ) ->
case Registry = emqx_mqttsn_session:registry(Session),
emqx_mqttsn_registry:lookup_topic( case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of
Registry,
ClientId,
TopicId
)
of
undefined -> undefined ->
{error, ?SN_RC_INVALID_TOPIC_ID}; {error, ?SN_RC_INVALID_TOPIC_ID};
TopicName -> TopicName ->
@ -1323,7 +1300,7 @@ do_subscribe(
) -> ) ->
NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName), NTopicName = emqx_mountpoint:mount(Mountpoint, TopicName),
NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts), NSubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts),
case emqx_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of case emqx_mqttsn_session:subscribe(ClientInfo, NTopicName, NSubOpts, Session) of
{ok, NSession} -> {ok, NSession} ->
{ok, {TopicId, NTopicName, NSubOpts}, Channel#channel{session = NSession}}; {ok, {TopicId, NTopicName, NSubOpts}, Channel#channel{session = NSession}};
{error, ?RC_QUOTA_EXCEEDED} -> {error, ?RC_QUOTA_EXCEEDED} ->
@ -1351,18 +1328,10 @@ preproc_unsub_type(
?SN_PREDEFINED_TOPIC, ?SN_PREDEFINED_TOPIC,
TopicId TopicId
), ),
Channel = #channel{ Channel = #channel{session = Session}
registry = Registry,
clientinfo = #{clientid := ClientId}
}
) -> ) ->
case Registry = emqx_mqttsn_session:registry(Session),
emqx_mqttsn_registry:lookup_topic( case emqx_mqttsn_registry:lookup_topic(TopicId, Registry) of
Registry,
ClientId,
TopicId
)
of
undefined -> undefined ->
{error, not_found}; {error, not_found};
TopicName -> TopicName ->
@ -1422,7 +1391,7 @@ do_unsubscribe(
SubOpts SubOpts
), ),
case case
emqx_session:unsubscribe( emqx_mqttsn_session:unsubscribe(
ClientInfo, ClientInfo,
NTopicName, NTopicName,
NSubOpts, NSubOpts,
@ -1467,9 +1436,9 @@ awake(
clientid => ClientId, clientid => ClientId,
previous_state => ConnState previous_state => ConnState
}), }),
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), {ok, Publishes, Session1} = emqx_mqttsn_session:replay(ClientInfo, Session),
{NPublishes, NSession} = {NPublishes, NSession} =
case emqx_session:deliver(ClientInfo, [], Session1) of case emqx_mqttsn_session:deliver(ClientInfo, [], Session1) of
{ok, Session2} -> {ok, Session2} ->
{Publishes, Session2}; {Publishes, Session2};
{ok, More, Session2} -> {ok, More, Session2} ->
@ -1497,8 +1466,8 @@ goto_asleep_if_buffered_msgs_sent(
} }
) -> ) ->
case case
emqx_mqueue:is_empty(emqx_session:info(mqueue, Session)) andalso emqx_mqueue:is_empty(emqx_mqttsn_session:info(mqueue, Session)) andalso
emqx_inflight:is_empty(emqx_session:info(inflight, Session)) emqx_inflight:is_empty(emqx_mqttsn_session:info(inflight, Session))
of of
true -> true ->
?SLOG(info, #{ ?SLOG(info, #{
@ -1591,7 +1560,7 @@ handle_out(
register_inflight = undefined register_inflight = undefined
} }
) -> ) ->
{MsgId, NSession} = emqx_session:obtain_next_pkt_id(Session), {MsgId, NSession} = emqx_mqttsn_session:obtain_next_pkt_id(Session),
Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)}, Outgoing = {outgoing, ?SN_REGISTER_MSG(TopicId, MsgId, TopicName)},
NChannel = Channel#channel{ NChannel = Channel#channel{
session = NSession, session = NSession,
@ -1667,7 +1636,7 @@ maybe_resume_session(
resuming = true resuming = true
} }
) -> ) ->
Subs = emqx_session:info(subscriptions, Session), Subs = emqx_mqttsn_session:info(subscriptions, Session),
case subs_resume() andalso map_size(Subs) =/= 0 of case subs_resume() andalso map_size(Subs) =/= 0 of
true -> true ->
TopicNames = lists:filter(fun(T) -> not emqx_topic:wildcard(T) end, maps:keys(Subs)), TopicNames = lists:filter(fun(T) -> not emqx_topic:wildcard(T) end, maps:keys(Subs)),
@ -1692,9 +1661,9 @@ resume_or_replay_messages(
false -> false ->
{[], Channel} {[], Channel}
end, end,
{ok, Publishes, Session1} = emqx_session:replay(ClientInfo, Session), {ok, Publishes, Session1} = emqx_mqttsn_session:replay(ClientInfo, Session),
{NPublishes, NSession} = {NPublishes, NSession} =
case emqx_session:deliver(ClientInfo, NPendings, Session1) of case emqx_mqttsn_session:deliver(ClientInfo, NPendings, Session1) of
{ok, Session2} -> {ok, Session2} ->
{Publishes, Session2}; {Publishes, Session2};
{ok, More, Session2} -> {ok, More, Session2} ->
@ -1765,10 +1734,7 @@ outgoing_deliver_and_register({Packets, Channel}) ->
message_to_packet( message_to_packet(
MsgId, MsgId,
Message, Message,
#channel{ #channel{session = Session}
registry = Registry,
clientinfo = #{clientid := ClientId}
}
) -> ) ->
QoS = emqx_message:qos(Message), QoS = emqx_message:qos(Message),
Topic = emqx_message:topic(Message), Topic = emqx_message:topic(Message),
@ -1778,7 +1744,8 @@ message_to_packet(
?QOS_0 -> 0; ?QOS_0 -> 0;
_ -> MsgId _ -> MsgId
end, end,
case emqx_mqttsn_registry:lookup_topic_id(Registry, ClientId, Topic) of Registry = emqx_mqttsn_session:registry(Session),
case emqx_mqttsn_registry:lookup_topic_id(Topic, Registry) of
{predef, PredefTopicId} -> {predef, PredefTopicId} ->
Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_PREDEFINED_TOPIC}, Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_PREDEFINED_TOPIC},
?SN_PUBLISH_MSG(Flags, PredefTopicId, NMsgId, Payload); ?SN_PUBLISH_MSG(Flags, PredefTopicId, NMsgId, Payload);
@ -1813,7 +1780,7 @@ handle_call({unsubscribe, Topic}, _From, Channel) ->
{ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel), {ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel),
reply_and_update(ok, NChannel); reply_and_update(ok, NChannel);
handle_call(subscriptions, _From, Channel = #channel{session = Session}) -> handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
reply({ok, maps:to_list(emqx_session:info(subscriptions, Session))}, Channel); reply({ok, maps:to_list(emqx_mqttsn_session:info(subscriptions, Session))}, Channel);
handle_call(kick, _From, Channel) -> handle_call(kick, _From, Channel) ->
NChannel = ensure_disconnected(kicked, Channel), NChannel = ensure_disconnected(kicked, Channel),
shutdown_and_reply(kicked, ok, NChannel); shutdown_and_reply(kicked, ok, NChannel);
@ -1834,7 +1801,7 @@ handle_call(
pendings = Pendings pendings = Pendings
} }
) -> ) ->
ok = emqx_session:takeover(Session), ok = emqx_mqttsn_session:takeover(Session),
%% TODO: Should not drain deliver here (side effect) %% TODO: Should not drain deliver here (side effect)
Delivers = emqx_utils:drain_deliver(), Delivers = emqx_utils:drain_deliver(),
AllPendings = lists:append(Delivers, Pendings), AllPendings = lists:append(Delivers, Pendings),
@ -1911,8 +1878,9 @@ handle_info(clean_authz_cache, Channel) ->
{ok, Channel}; {ok, Channel};
handle_info({subscribe, _}, Channel) -> handle_info({subscribe, _}, Channel) ->
{ok, Channel}; {ok, Channel};
handle_info({register, TopicName}, Channel) -> handle_info({register, TopicName}, Channel = #channel{session = Session}) ->
case ensure_registered_topic_name(TopicName, Channel) of Registry = emqx_mqttsn_session:registry(Session),
case emqx_mqttsn_registry:reg(TopicName, Registry) of
{error, Reason} -> {error, Reason} ->
?SLOG(error, #{ ?SLOG(error, #{
msg => "register_topic_failed", msg => "register_topic_failed",
@ -1920,8 +1888,9 @@ handle_info({register, TopicName}, Channel) ->
reason => Reason reason => Reason
}), }),
{ok, Channel}; {ok, Channel};
{ok, TopicId} -> {ok, TopicId, NRegistry} ->
handle_out(register, {TopicId, TopicName}, Channel) NSession = emqx_mqttsn_session:set_registry(NRegistry, Session),
handle_out(register, {TopicId, TopicName}, Channel#channel{session = NSession})
end; end;
handle_info(Info, Channel) -> handle_info(Info, Channel) ->
?SLOG(error, #{ ?SLOG(error, #{
@ -1940,21 +1909,6 @@ maybe_shutdown(Reason, Channel = #channel{conninfo = ConnInfo}) ->
shutdown(Reason, Channel) shutdown(Reason, Channel)
end. end.
ensure_registered_topic_name(
TopicName,
Channel = #channel{registry = Registry}
) ->
ClientId = clientid(Channel),
case emqx_mqttsn_registry:lookup_topic_id(Registry, ClientId, TopicName) of
undefined ->
case emqx_mqttsn_registry:register_topic(Registry, ClientId, TopicName) of
{error, Reason} -> {error, Reason};
TopicId -> {ok, TopicId}
end;
TopicId ->
{ok, TopicId}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Ensure disconnected %% Ensure disconnected
@ -2003,7 +1957,7 @@ handle_deliver(
ConnState =:= disconnected; ConnState =:= disconnected;
ConnState =:= asleep ConnState =:= asleep
-> ->
NSession = emqx_session:enqueue( NSession = emqx_mqttsn_session:enqueue(
ClientInfo, ClientInfo,
ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx), ignore_local(maybe_nack(Delivers), ClientId, Session, Ctx),
Session Session
@ -2039,7 +1993,7 @@ handle_deliver(
} }
) -> ) ->
case case
emqx_session:deliver( emqx_mqttsn_session:deliver(
ClientInfo, ClientInfo,
ignore_local(Delivers, ClientId, Session, Ctx), ignore_local(Delivers, ClientId, Session, Ctx),
Session Session
@ -2057,7 +2011,7 @@ handle_deliver(
end. end.
ignore_local(Delivers, Subscriber, Session, Ctx) -> ignore_local(Delivers, Subscriber, Session, Ctx) ->
Subs = emqx_session:info(subscriptions, Session), Subs = emqx_mqttsn_session:info(subscriptions, Session),
lists:filter( lists:filter(
fun({deliver, Topic, #message{from = Publisher}}) -> fun({deliver, Topic, #message{from = Publisher}}) ->
case maps:find(Topic, Subs) of case maps:find(Topic, Subs) of
@ -2132,7 +2086,7 @@ handle_timeout(
retry_delivery, retry_delivery,
Channel = #channel{session = Session, clientinfo = ClientInfo} Channel = #channel{session = Session, clientinfo = ClientInfo}
) -> ) ->
case emqx_session:retry(ClientInfo, Session) of case emqx_mqttsn_session:retry(ClientInfo, Session) of
{ok, NSession} -> {ok, NSession} ->
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; {ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
{ok, Publishes, Timeout, NSession} -> {ok, Publishes, Timeout, NSession} ->
@ -2157,7 +2111,7 @@ handle_timeout(
expire_awaiting_rel, expire_awaiting_rel,
Channel = #channel{session = Session, clientinfo = ClientInfo} Channel = #channel{session = Session, clientinfo = ClientInfo}
) -> ) ->
case emqx_session:expire(ClientInfo, awaiting_rel, Session) of case emqx_mqttsn_session:expire(ClientInfo, awaiting_rel, Session) of
{ok, NSession} -> {ok, NSession} ->
{ok, clean_timer(await_timer, Channel#channel{session = NSession})}; {ok, clean_timer(await_timer, Channel#channel{session = NSession})};
{ok, Timeout, NSession} -> {ok, Timeout, NSession} ->
@ -2301,17 +2255,14 @@ clean_timer(Name, Channel = #channel{timers = Timers}) ->
interval(alive_timer, #channel{keepalive = KeepAlive}) -> interval(alive_timer, #channel{keepalive = KeepAlive}) ->
emqx_keepalive:info(interval, KeepAlive); emqx_keepalive:info(interval, KeepAlive);
interval(retry_timer, #channel{session = Session}) -> interval(retry_timer, #channel{session = Session}) ->
emqx_session:info(retry_interval, Session); emqx_mqttsn_session:info(retry_interval, Session);
interval(await_timer, #channel{session = Session}) -> interval(await_timer, #channel{session = Session}) ->
emqx_session:info(await_rel_timeout, Session). emqx_mqttsn_session:info(await_rel_timeout, Session).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper functions %% Helper functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
clientid(#channel{clientinfo = #{clientid := ClientId}}) ->
ClientId.
run_hooks(Ctx, Name, Args) -> run_hooks(Ctx, Name, Args) ->
emqx_gateway_ctx:metrics_inc(Ctx, Name), emqx_gateway_ctx:metrics_inc(Ctx, Name),
emqx_hooks:run(Name, Args). emqx_hooks:run(Name, Args).

View File

@ -17,64 +17,92 @@
%% @doc The MQTT-SN Topic Registry %% @doc The MQTT-SN Topic Registry
-module(emqx_mqttsn_registry). -module(emqx_mqttsn_registry).
-behaviour(gen_server).
-include("emqx_mqttsn.hrl"). -include("emqx_mqttsn.hrl").
-include_lib("emqx/include/logger.hrl").
-export([start_link/2]).
-export([ -export([
register_topic/3, persist_predefined_topics/1,
unregister_topic/2 clear_predefined_topics/1
]). ]).
-export([ -export([
lookup_topic/3, init/0,
lookup_topic_id/3 reg/2,
unreg/2,
lookup_topic/2,
lookup_topic_id/2
]). ]).
%% gen_server callbacks -define(PKEY(Id), {mqttsn, predef_topics, Id}).
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
%% Internal exports (RPC) -type registry() :: #{
-export([ %% The last topic id aallocated
do_register/4 last_topic_id := pos_integer(),
]). %% The mapping from topic id to topic name
id_to_name := map(),
%% The mapping from topic name to topic id
name_to_id := map()
}.
-export([lookup_name/1]). -type predef_topic() :: #{
id := 1..1024,
-define(SN_SHARD, emqx_mqttsn_shard). topic := iolist()
}.
-record(state, {tabname, max_predef_topic_id = 0}).
-record(emqx_mqttsn_registry, {key, value}).
-type registry() :: {Tab :: atom(), RegistryPid :: pid()}.
%%----------------------------------------------------------------------------- %%-----------------------------------------------------------------------------
%% APIs
-spec start_link(atom(), list()) -> -spec persist_predefined_topics([predef_topic()]) -> ok.
ignore persist_predefined_topics(PredefTopics) when is_list(PredefTopics) ->
| {ok, pid()} try
| {error, Reason :: term()}. F = fun(#{id := TopicId, topic := TopicName0}) when TopicId =< 1024 ->
start_link(InstaId, PredefTopics) -> TopicName = iolist_to_binary(TopicName0),
gen_server:start_link(?MODULE, [InstaId, PredefTopics], []). persistent_term:put(?PKEY(TopicId), TopicName),
persistent_term:put(?PKEY(TopicName), TopicId)
end,
lists:foreach(F, PredefTopics)
catch
_:_ ->
clear_predefined_topics(PredefTopics),
error(badarg)
end.
-spec register_topic(registry(), emqx_types:clientid(), emqx_types:topic()) -> -spec clear_predefined_topics([predef_topic()]) -> ok.
integer() clear_predefined_topics(PredefTopics) ->
lists:foreach(
fun(#{id := TopicId, topic := TopicName0}) ->
TopicName = iolist_to_binary(TopicName0),
persistent_term:erase(?PKEY(TopicId)),
persistent_term:erase(?PKEY(TopicName))
end,
PredefTopics
),
ok.
-spec init() -> registry().
init() ->
#{
last_topic_id => ?SN_MAX_PREDEF_TOPIC_ID,
id_to_name => #{},
name_to_id => #{}
}.
-spec reg(emqx_types:topic(), registry()) ->
{ok, integer(), registry()}
| {error, term()}. | {error, term()}.
register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) -> reg(
TopicName,
Registry
) when is_binary(TopicName) ->
case emqx_topic:wildcard(TopicName) of case emqx_topic:wildcard(TopicName) of
false -> false ->
gen_server:call(Pid, {register, ClientId, TopicName}); case lookup_topic_id(TopicName, Registry) of
{predef, TopicId} when is_integer(TopicId) ->
{ok, TopicId, Registry};
TopicId when is_integer(TopicId) ->
{ok, TopicId, Registry};
undefined ->
do_reg(TopicName, Registry)
end;
%% TopicId: in case of accepted the value that will be used as topic %% TopicId: in case of accepted the value that will be used as topic
%% id by the gateway when sending PUBLISH messages to the client (not %% id by the gateway when sending PUBLISH messages to the client (not
%% relevant in case of subscriptions to a short topic name or to a topic %% relevant in case of subscriptions to a short topic name or to a topic
@ -83,182 +111,64 @@ register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) ->
{error, wildcard_topic} {error, wildcard_topic}
end. end.
-spec lookup_topic(registry(), emqx_types:clientid(), pos_integer()) -> do_reg(
TopicName,
Registry = #{
last_topic_id := TopicId0,
id_to_name := IdMap,
name_to_id := NameMap
}
) ->
case next_topic_id(TopicId0) of
{error, too_large} ->
{error, too_large};
NextTopicId ->
NRegistry = Registry#{
last_topic_id := NextTopicId,
id_to_name := maps:put(NextTopicId, TopicName, IdMap),
name_to_id := maps:put(TopicName, NextTopicId, NameMap)
},
{ok, NextTopicId, NRegistry}
end.
next_topic_id(Id) when is_integer(Id) andalso (Id < 16#FFFF) ->
Id + 1;
next_topic_id(Id) when is_integer(Id) ->
{error, too_large}.
-spec lookup_topic(pos_integer(), registry()) ->
undefined undefined
| binary(). | binary().
lookup_topic({Tab, _}, ClientId, TopicId) when is_integer(TopicId) -> lookup_topic(TopicId, _Registry = #{id_to_name := IdMap}) when is_integer(TopicId) ->
case lookup_element(Tab, {predef, TopicId}, 3) of case persistent_term:get(?PKEY(TopicId), undefined) of
undefined -> undefined ->
lookup_element(Tab, {ClientId, TopicId}, 3); maps:get(TopicId, IdMap, undefined);
Topic -> Topic ->
Topic Topic
end. end.
-spec lookup_topic_id(registry(), emqx_types:clientid(), emqx_types:topic()) -> -spec lookup_topic_id(emqx_types:topic(), registry()) ->
undefined undefined
| pos_integer() | pos_integer()
| {predef, integer()}. | {predef, integer()}.
lookup_topic_id({Tab, _}, ClientId, TopicName) when is_binary(TopicName) -> lookup_topic_id(TopicName, _Registry = #{name_to_id := NameMap}) when is_binary(TopicName) ->
case lookup_element(Tab, {predef, TopicName}, 3) of case persistent_term:get(?PKEY(TopicName), undefined) of
undefined -> undefined ->
lookup_element(Tab, {ClientId, TopicName}, 3); maps:get(TopicName, NameMap, undefined);
TopicId -> TopicId ->
{predef, TopicId} {predef, TopicId}
end. end.
%% @private -spec unreg(emqx_types:topic(), registry()) -> registry().
lookup_element(Tab, Key, Pos) -> unreg(TopicName, Registry = #{name_to_id := NameMap, id_to_name := IdMap}) when
try is_binary(TopicName)
ets:lookup_element(Tab, Key, Pos) ->
catch case maps:find(TopicName, NameMap) of
error:badarg -> undefined {ok, TopicId} ->
end. Registry#{
name_to_id := maps:remove(TopicName, NameMap),
-spec unregister_topic(registry(), emqx_types:clientid()) -> ok. id_to_name := maps:remove(TopicId, IdMap)
unregister_topic({_, Pid}, ClientId) -> };
gen_server:call(Pid, {unregister, ClientId}). error ->
lookup_name(Pid) ->
gen_server:call(Pid, name).
%%-----------------------------------------------------------------------------
name(InstaId) ->
list_to_atom(lists:concat([emqx_mqttsn_, InstaId, '_registry'])).
init([InstaId, PredefTopics]) ->
%% {predef, TopicId} -> TopicName
%% {predef, TopicName} -> TopicId
%% {ClientId, TopicId} -> TopicName
%% {ClientId, TopicName} -> TopicId
Tab = name(InstaId),
ok = mria:create_table(Tab, [
{storage, ram_copies},
{record_name, emqx_mqttsn_registry},
{attributes, record_info(fields, emqx_mqttsn_registry)},
{storage_properties, [{ets, [{read_concurrency, true}]}]},
{rlog_shard, ?SN_SHARD}
]),
ok = mria:wait_for_tables([Tab]),
MaxPredefId = lists:foldl(
fun(#{id := TopicId, topic := TopicName0}, AccId) ->
TopicName = iolist_to_binary(TopicName0),
mria:dirty_write(Tab, #emqx_mqttsn_registry{
key = {predef, TopicId},
value = TopicName
}),
mria:dirty_write(Tab, #emqx_mqttsn_registry{
key = {predef, TopicName},
value = TopicId
}),
case TopicId > AccId of
true -> TopicId;
false -> AccId
end
end,
0,
PredefTopics
),
{ok, #state{tabname = Tab, max_predef_topic_id = MaxPredefId}}.
handle_call(
{register, ClientId, TopicName},
_From,
State = #state{tabname = Tab, max_predef_topic_id = PredefId}
) ->
case lookup_topic_id({Tab, self()}, ClientId, TopicName) of
{predef, PredefTopicId} when is_integer(PredefTopicId) ->
{reply, PredefTopicId, State};
TopicId when is_integer(TopicId) ->
{reply, TopicId, State};
undefined ->
case next_topic_id(Tab, PredefId, ClientId) of
TopicId when TopicId >= 16#FFFF ->
{reply, {error, too_large}, State};
TopicId ->
case
mria:transaction(?SN_SHARD, fun ?MODULE:do_register/4, [
Tab, ClientId, TopicId, TopicName
])
of
{atomic, ok} ->
{reply, TopicId, State};
{aborted, Error} ->
{reply, {error, Error}, State}
end
end
end;
handle_call({unregister, ClientId}, _From, State = #state{tabname = Tab}) ->
Registry = mnesia:dirty_match_object(
Tab,
{emqx_mqttsn_registry, {ClientId, '_'}, '_'}
),
lists:foreach(
fun(R) ->
mria:dirty_delete_object(Tab, R)
end,
Registry Registry
),
{reply, ok, State};
handle_call(name, _From, State = #state{tabname = Tab}) ->
{reply, {Tab, self()}, State};
handle_call(Req, _From, State) ->
?SLOG(error, #{
msg => "unexpected_call",
call => Req
}),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?SLOG(error, #{
msg => "unexpected_cast",
cast => Msg
}),
{noreply, State}.
handle_info(Info, State) ->
?SLOG(error, #{
msg => "unexpected_info",
info => Info
}),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
do_register(Tab, ClientId, TopicId, TopicName) ->
mnesia:write(
Tab,
#emqx_mqttsn_registry{
key = {ClientId, next_topic_id},
value = TopicId + 1
},
write
),
mnesia:write(
Tab,
#emqx_mqttsn_registry{
key = {ClientId, TopicName},
value = TopicId
},
write
),
mnesia:write(
Tab,
#emqx_mqttsn_registry{
key = {ClientId, TopicId},
value = TopicName
},
write
).
%%-----------------------------------------------------------------------------
next_topic_id(Tab, PredefId, ClientId) ->
case mnesia:dirty_read(Tab, {ClientId, next_topic_id}) of
[#emqx_mqttsn_registry{value = Id}] -> Id;
[] -> PredefId + 1
end. end.

View File

@ -16,6 +16,7 @@
-module(emqx_mqttsn_schema). -module(emqx_mqttsn_schema).
-include("emqx_mqttsn.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
@ -72,7 +73,7 @@ fields(mqttsn) ->
fields(mqttsn_predefined) -> fields(mqttsn_predefined) ->
[ [
{id, {id,
sc(integer(), #{ sc(range(1, ?SN_MAX_PREDEF_TOPIC_ID), #{
required => true, required => true,
desc => ?DESC(mqttsn_predefined_id) desc => ?DESC(mqttsn_predefined_id)
})}, })},

View File

@ -0,0 +1,144 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqttsn_session).
-export([registry/1, set_registry/2]).
-export([
init/1,
info/1,
info/2,
stats/1,
resume/2
]).
-export([
publish/4,
subscribe/4,
unsubscribe/4,
puback/3,
pubrec/3,
pubrel/3,
pubcomp/3
]).
-export([
replay/2,
deliver/3,
obtain_next_pkt_id/1,
takeover/1,
enqueue/3,
retry/2,
expire/3
]).
-type session() :: #{
registry := emqx_mqttsn_registry:registry(),
session := emqx_session:session()
}.
-export_type([session/0]).
init(ClientInfo) ->
Conf = emqx_cm:get_session_confs(
ClientInfo, #{receive_maximum => 1, expiry_interval => 0}
),
#{
registry => emqx_mqttsn_registry:init(),
session => emqx_session:init(Conf)
}.
registry(#{registry := Registry}) ->
Registry.
set_registry(Registry, Session) ->
Session#{registry := Registry}.
info(#{session := Session}) ->
emqx_session:info(Session).
info(Key, #{session := Session}) ->
emqx_session:info(Key, Session).
stats(#{session := Session}) ->
emqx_session:stats(Session).
puback(ClientInfo, MsgId, Session) ->
with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session).
pubrec(ClientInfo, MsgId, Session) ->
with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session).
pubrel(ClientInfo, MsgId, Session) ->
with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session).
pubcomp(ClientInfo, MsgId, Session) ->
with_sess(?FUNCTION_NAME, [ClientInfo, MsgId], Session).
publish(ClientInfo, MsgId, Msg, Session) ->
with_sess(?FUNCTION_NAME, [ClientInfo, MsgId, Msg], Session).
subscribe(ClientInfo, Topic, SubOpts, Session) ->
with_sess(?FUNCTION_NAME, [ClientInfo, Topic, SubOpts], Session).
unsubscribe(ClientInfo, Topic, SubOpts, Session) ->
with_sess(?FUNCTION_NAME, [ClientInfo, Topic, SubOpts], Session).
replay(ClientInfo, Session) ->
with_sess(?FUNCTION_NAME, [ClientInfo], Session).
deliver(ClientInfo, Delivers, Session1) ->
with_sess(?FUNCTION_NAME, [ClientInfo, Delivers], Session1).
obtain_next_pkt_id(Session = #{session := Sess}) ->
{Id, Sess1} = emqx_session:obtain_next_pkt_id(Sess),
{Id, Session#{session := Sess1}}.
takeover(_Session = #{session := Sess}) ->
emqx_session:takeover(Sess).
enqueue(ClientInfo, Delivers, Session = #{session := Sess}) ->
Sess1 = emqx_session:enqueue(ClientInfo, Delivers, Sess),
Session#{session := Sess1}.
retry(ClientInfo, Session) ->
with_sess(?FUNCTION_NAME, [ClientInfo], Session).
expire(ClientInfo, awaiting_rel, Session) ->
with_sess(?FUNCTION_NAME, [ClientInfo, awaiting_rel], Session).
resume(ClientInfo, #{session := Sess}) ->
emqx_session:resume(ClientInfo, Sess).
%%--------------------------------------------------------------------
%% internal funcs
with_sess(Fun, Args, Session = #{session := Sess}) ->
case apply(emqx_session, Fun, Args ++ [Sess]) of
%% for subscribe
{error, Reason} ->
{error, Reason};
%% for pubrel
{ok, Sess1} ->
{ok, Session#{session := Sess1}};
%% for publish and puback
{ok, Result, Sess1} ->
{ok, Result, Session#{session := Sess1}};
%% for puback
{ok, Msgs, Replies, Sess1} ->
{ok, Msgs, Replies, Session#{session := Sess1}}
end.

View File

@ -47,12 +47,15 @@
-define(LOG(Format, Args), ct:log("TEST: " ++ Format, Args)). -define(LOG(Format, Args), ct:log("TEST: " ++ Format, Args)).
-define(MAX_PRED_TOPIC_ID, 2). -define(MAX_PRED_TOPIC_ID, ?SN_MAX_PREDEF_TOPIC_ID).
-define(PREDEF_TOPIC_ID1, 1). -define(PREDEF_TOPIC_ID1, 1).
-define(PREDEF_TOPIC_ID2, 2). -define(PREDEF_TOPIC_ID2, 2).
-define(PREDEF_TOPIC_NAME1, <<"/predefined/topic/name/hello">>). -define(PREDEF_TOPIC_NAME1, <<"/predefined/topic/name/hello">>).
-define(PREDEF_TOPIC_NAME2, <<"/predefined/topic/name/nice">>). -define(PREDEF_TOPIC_NAME2, <<"/predefined/topic/name/nice">>).
-define(ENABLE_QOS3, true). -define(DEFAULT_PREDEFINED_TOPICS, [
#{<<"id">> => ?PREDEF_TOPIC_ID1, <<"topic">> => ?PREDEF_TOPIC_NAME1},
#{<<"id">> => ?PREDEF_TOPIC_ID2, <<"topic">> => ?PREDEF_TOPIC_NAME2}
]).
% FLAG NOT USED % FLAG NOT USED
-define(FNU, 0). -define(FNU, 0).
@ -143,6 +146,13 @@ restart_mqttsn_with_mountpoint(Mp) ->
Conf#{<<"mountpoint">> => Mp} Conf#{<<"mountpoint">> => Mp}
). ).
restart_mqttsn_with_predefined_topics(Topics) ->
Conf = emqx:get_raw_config([gateway, mqttsn]),
emqx_gateway_conf:update_gateway(
mqttsn,
Conf#{<<"predefined">> => Topics}
).
default_config() -> default_config() ->
?CONF_DEFAULT. ?CONF_DEFAULT.
@ -487,6 +497,35 @@ t_subscribe_case08(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket). gen_udp:close(Socket).
t_subscribe_predefined_topic(_) ->
Dup = 0,
QoS = 0,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
Socket = ensure_connected_client(?CLIENTID),
send_subscribe_msg_predefined_topic(Socket, 0, ?PREDEF_TOPIC_ID1, 1),
?assertEqual(
<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
?PREDEF_TOPIC_ID1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
send_disconnect_msg(Socket, undefined),
gen_udp:close(Socket),
restart_mqttsn_with_predefined_topics([]),
Socket1 = ensure_connected_client(?CLIENTID),
send_subscribe_msg_predefined_topic(Socket1, 0, ?PREDEF_TOPIC_ID1, 1),
?assertEqual(
<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, 0:16,
MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
receive_response(Socket1)
),
send_disconnect_msg(Socket1, undefined),
restart_mqttsn_with_predefined_topics(?DEFAULT_PREDEFINED_TOPICS),
gen_udp:close(Socket1).
t_publish_negqos_enabled(_) -> t_publish_negqos_enabled(_) ->
Dup = 0, Dup = 0,
QoS = 0, QoS = 0,
@ -513,14 +552,11 @@ t_publish_negqos_enabled(_) ->
Payload1 = <<20, 21, 22, 23>>, Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_normal_topic(Socket, NegQoS, MsgId1, TopicId1, Payload1), send_publish_msg_normal_topic(Socket, NegQoS, MsgId1, TopicId1, Payload1),
timer:sleep(100), timer:sleep(100),
case ?ENABLE_QOS3 of
true ->
Eexp = Eexp =
<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
What = receive_response(Socket), What = receive_response(Socket),
?assertEqual(Eexp, What) ?assertEqual(Eexp, What),
end,
send_disconnect_msg(Socket, undefined), send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
@ -2777,3 +2813,9 @@ flush(Msgs) ->
M -> flush([M | Msgs]) M -> flush([M | Msgs])
after 0 -> lists:reverse(Msgs) after 0 -> lists:reverse(Msgs)
end. end.
ensure_connected_client(ClientId) ->
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
Socket.

View File

@ -19,10 +19,11 @@
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include("emqx_mqttsn.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-define(REGISTRY, emqx_mqttsn_registry). -define(REGISTRY, emqx_mqttsn_registry).
-define(MAX_PREDEF_ID, 2). -define(MAX_PREDEF_ID, ?SN_MAX_PREDEF_TOPIC_ID).
-define(PREDEF_TOPICS, [ -define(PREDEF_TOPICS, [
#{id => 1, topic => <<"/predefined/topic/name/hello">>}, #{id => 1, topic => <<"/predefined/topic/name/hello">>},
#{id => 2, topic => <<"/predefined/topic/name/nice">>} #{id => 2, topic => <<"/predefined/topic/name/nice">>}
@ -36,96 +37,64 @@ all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) -> init_per_suite(Config) ->
application:ensure_all_started(ekka),
mria:start(),
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
application:stop(ekka),
ok. ok.
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
{ok, Pid} = ?REGISTRY:start_link('mqttsn', ?PREDEF_TOPICS), emqx_mqttsn_registry:persist_predefined_topics(?PREDEF_TOPICS),
{Tab, Pid} = ?REGISTRY:lookup_name(Pid), Config.
[{reg, {Tab, Pid}} | Config].
end_per_testcase(_TestCase, Config) -> end_per_testcase(_TestCase, Config) ->
{Tab, _Pid} = proplists:get_value(reg, Config), emqx_mqttsn_registry:clear_predefined_topics(?PREDEF_TOPICS),
mria:clear_table(Tab),
Config. Config.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Test cases %% Test cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
t_register(Config) -> t_register(_) ->
Reg = proplists:get_value(reg, Config), Reg = ?REGISTRY:init(),
?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), {ok, ?MAX_PREDEF_ID + 1, Reg1} = ?REGISTRY:reg(<<"Topic1">>, Reg),
?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)), {ok, ?MAX_PREDEF_ID + 2, Reg2} = ?REGISTRY:reg(<<"Topic2">>, Reg1),
?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)), ?assertMatch({ok, ?MAX_PREDEF_ID + 1, Reg2}, ?REGISTRY:reg(<<"Topic1">>, Reg2)),
?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)), ?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 1, Reg2)),
?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)), ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 2, Reg2)),
?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)), ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(<<"Topic1">>, Reg2)),
emqx_mqttsn_registry:unregister_topic(Reg, <<"ClientId">>), ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(<<"Topic2">>, Reg2)),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)).
t_register_case2(Config) -> Reg3 = emqx_mqttsn_registry:unreg(<<"Topic1">>, Reg2),
Reg = proplists:get_value(reg, Config), ?assertEqual(undefined, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 1, Reg3)),
?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), ?assertEqual(undefined, ?REGISTRY:lookup_topic_id(<<"Topic1">>, Reg3)),
?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic2">>)), ?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(?MAX_PREDEF_ID + 2, Reg3)),
?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"Topic1">>)), ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(<<"Topic2">>, Reg3)),
?assertEqual(<<"Topic1">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)),
?assertEqual(<<"Topic2">>, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)),
?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)),
?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic3">>)),
?REGISTRY:unregister_topic(Reg, <<"ClientId">>),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic1">>)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, <<"Topic2">>)).
t_reach_maximum(Config) -> ?assertMatch({ok, ?MAX_PREDEF_ID + 3, _Reg4}, ?REGISTRY:reg(<<"Topic3">>, Reg3)).
Reg = proplists:get_value(reg, Config),
register_a_lot(?MAX_PREDEF_ID + 1, 16#ffff, Reg),
?assertEqual({error, too_large}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicABC">>)),
Topic1 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID + 1])),
Topic2 = iolist_to_binary(io_lib:format("Topic~p", [?MAX_PREDEF_ID + 2])),
?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)),
?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)),
?REGISTRY:unregister_topic(Reg, <<"ClientId">>),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic(Reg, <<"ClientId">>, ?MAX_PREDEF_ID + 2)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic1)),
?assertEqual(undefined, ?REGISTRY:lookup_topic_id(Reg, <<"ClientId">>, Topic2)).
t_register_case4(Config) -> t_reach_maximum(_) ->
Reg = proplists:get_value(reg, Config), Reg0 = ?REGISTRY:init(),
?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicA">>)), Reg = register_a_lot(?MAX_PREDEF_ID + 1, 16#ffff, Reg0),
?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicB">>)), ?assertEqual({error, too_large}, ?REGISTRY:reg(<<"TopicABC">>, Reg)),
?assertEqual(?MAX_PREDEF_ID + 3, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicC">>)), ?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:lookup_topic_id(<<"Topic1025">>, Reg)),
?REGISTRY:unregister_topic(Reg, <<"ClientId">>), ?assertEqual(?MAX_PREDEF_ID + 2, ?REGISTRY:lookup_topic_id(<<"Topic1026">>, Reg)).
?assertEqual(?MAX_PREDEF_ID + 1, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"TopicD">>)).
t_deny_wildcard_topic(Config) -> t_deny_wildcard_topic(_) ->
Reg = proplists:get_value(reg, Config), Reg = ?REGISTRY:init(),
?assertEqual( ?assertEqual({error, wildcard_topic}, ?REGISTRY:reg(<<"/TopicA/#">>, Reg)),
{error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/TopicA/#">>) ?assertEqual({error, wildcard_topic}, ?REGISTRY:reg(<<"/+/TopicB">>, Reg)).
),
?assertEqual(
{error, wildcard_topic}, ?REGISTRY:register_topic(Reg, <<"ClientId">>, <<"/+/TopicB">>)
).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Helper funcs %% Helper funcs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
register_a_lot(Max, Max, _Reg) -> register_a_lot(N, Max, Reg) when N =< Max ->
ok;
register_a_lot(N, Max, Reg) when N < Max ->
Topic = iolist_to_binary(["Topic", integer_to_list(N)]), Topic = iolist_to_binary(["Topic", integer_to_list(N)]),
?assertEqual(N, ?REGISTRY:register_topic(Reg, <<"ClientId">>, Topic)), {ok, ReturnedId, Reg1} = ?REGISTRY:reg(Topic, Reg),
register_a_lot(N + 1, Max, Reg). ?assertEqual(N, ReturnedId),
case N == Max of
true ->
Reg1;
_ ->
register_a_lot(N + 1, Max, Reg1)
end.

View File

@ -0,0 +1 @@
Fix the issue in MQTT-SN gateway where deleting Predefined Topics configuration does not work.