From 7ade24b3444e87f50b812d4c174cb19d1f85fc11 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Wed, 2 Mar 2022 14:57:35 +0800 Subject: [PATCH] feat(mqttsn): support to register unknown topic-name to the client --- .../src/mqttsn/emqx_sn_channel.erl | 103 ++++++++++++++---- .../emqx_gateway/src/mqttsn/emqx_sn_frame.erl | 2 +- .../test/emqx_sn_protocol_SUITE.erl | 5 - 3 files changed, 80 insertions(+), 30 deletions(-) diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index f8bbfd985..f01225431 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -338,9 +338,16 @@ process_connect(Channel = #channel{ SessFun ) of {ok, #{session := Session, - present := _Present}} -> + present := false}} -> handle_out(connack, ?SN_RC_ACCEPTED, Channel#channel{session = Session}); + {ok, #{session := Session, present := true, pendings := Pendings}} -> + Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())), + NChannel = Channel#channel{session = Session, + resuming = true, + pendings = Pendings1 + }, + handle_out(connack, ?SN_RC_ACCEPTED, NChannel); {error, Reason} -> ?SLOG(error, #{ msg => "failed_to_open_session" , reason => Reason @@ -1101,10 +1108,11 @@ awake(Channel = #channel{session = Session, clientinfo = ClientInfo}) -> {ok, More, Session2} -> {lists:append(Publishes, More), Session2} end, - {Packets, NChannel} = do_deliver(NPublishes, - Channel#channel{session = NSession}), - Outgoing = [{outgoing, Packets} || length(Packets) > 0], - {ok, Outgoing, NChannel}. + {Replies, NChannel} = outgoing_deliver_and_register( + do_deliver(NPublishes, + Channel#channel{session = NSession}) + ), + {ok, Replies, NChannel}. asleep(Duration, Channel = #channel{conn_state = asleep}) -> %% 6.14: The client can also modify its sleep duration @@ -1146,8 +1154,10 @@ handle_out(connack, ReasonCode, shutdown(Reason, AckPacket, Channel); handle_out(publish, Publishes, Channel) -> - {Packets, NChannel} = do_deliver(Publishes, Channel), - {ok, {outgoing, Packets}, NChannel}; + {Replies, NChannel} = outgoing_deliver_and_register( + do_deliver(Publishes, Channel) + ), + {ok, Replies, NChannel}; handle_out(puback, {TopicId, MsgId, Rc}, Channel) -> {ok, {outgoing, ?SN_PUBACK_MSG(TopicId, MsgId, Rc)}, Channel}; @@ -1207,17 +1217,18 @@ handle_out(disconnect, RC, Channel) -> %%-------------------------------------------------------------------- return_connack(AckPacket, Channel) -> - Replies = [{event, connected}, {outgoing, AckPacket}], + Replies1 = [{event, connected}, {outgoing, AckPacket}], case maybe_resume_session(Channel) of - ignore -> {ok, Replies, Channel}; + ignore -> {ok, Replies1, Channel}; {ok, Publishes, NSession} -> NChannel = Channel#channel{session = NSession, resuming = false, pendings = [] }, - {Packets, NChannel1} = do_deliver(Publishes, NChannel), - Outgoing = [{outgoing, Packets} || length(Packets) > 0], - {ok, Replies ++ Outgoing, NChannel1} + {Replies2, NChannel1} = outgoing_deliver_and_register( + do_deliver(Publishes, NChannel) + ), + {ok, Replies1 ++ Replies2, NChannel1} end. %%-------------------------------------------------------------------- @@ -1240,7 +1251,6 @@ maybe_resume_session(#channel{session = Session, %% Deliver publish: broker -> client %%-------------------------------------------------------------------- -%% return list(emqx_types:packet()) do_deliver({pubrel, MsgId}, Channel) -> {[?SN_PUBREC_MSG(?SN_PUBREL, MsgId)], Channel}; @@ -1271,6 +1281,18 @@ do_deliver(Publishes, Channel) when is_list(Publishes) -> end, {[], Channel}, Publishes), {lists:reverse(Packets), NChannel}. +outgoing_deliver_and_register({Packets, Channel}) -> + {NPackets, NRegisters} = + lists:foldl(fun(P, {Acc0, Acc1}) -> + case P of + {register, _} -> + {Acc0, [P|Acc1]}; + _ -> + {[P|Acc0], Acc1} + end + end, {[], []}, Packets), + {[{outgoing, lists:reverse(NPackets)}] ++ lists:reverse(NRegisters), Channel}. + message_to_packet(MsgId, Message, #channel{registry = Registry, clientinfo = #{clientid := ClientId}}) -> @@ -1281,17 +1303,19 @@ message_to_packet(MsgId, Message, ?QOS_0 -> 0; _ -> MsgId end, - {TopicIdType, NTopicId} = - case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of - {predef, PredefTopicId} -> - {?SN_PREDEFINED_TOPIC, PredefTopicId}; - TopicId when is_integer(TopicId) -> - {?SN_NORMAL_TOPIC, TopicId}; - undefined -> - {?SN_SHORT_TOPIC, Topic} - end, - Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType}, - ?SN_PUBLISH_MSG(Flags, NTopicId, NMsgId, Payload). + case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of + {predef, PredefTopicId} -> + Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_PREDEFINED_TOPIC}, + ?SN_PUBLISH_MSG(Flags, PredefTopicId, NMsgId, Payload); + TopicId when is_integer(TopicId) -> + Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_NORMAL_TOPIC}, + ?SN_PUBLISH_MSG(Flags, TopicId, NMsgId, Payload); + undefined when byte_size(Topic) =:= 2 -> + Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = ?SN_SHORT_TOPIC}, + ?SN_PUBLISH_MSG(Flags, Topic, NMsgId, Payload); + undefined -> + {register, Topic} + end. %%-------------------------------------------------------------------- %% Handle call @@ -1423,6 +1447,34 @@ handle_info(clean_authz_cache, Channel) -> handle_info({subscribe, _}, Channel) -> {ok, Channel}; +handle_info({register, TopicName}, + Channel = #channel{ + registry = Registry, + session = Session}) -> + ClientId = clientid(Channel), + case emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) of + undefined -> + case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of + {error, Reason} -> + ?SLOG(error, #{ msg => "register_topic_failed" + , topic_name => TopicName + , reason => Reason + }), + {ok, Channel}; + TopicId -> + {MsgId, NSession} = emqx_session:obtain_next_pkt_id(Session), + handle_out( + register, + {TopicId, MsgId, TopicName}, + Channel#channel{session = NSession}) + end; + Registered -> + ?SLOG(debug, #{ msg => "ignore_register_request" + , registered_as => Registered + }), + {ok, Channel} + end; + handle_info(Info, Channel) -> ?SLOG(error, #{ msg => "unexpected_info" , info => Info @@ -1678,6 +1730,9 @@ interval(await_timer, #channel{session = Session}) -> %% Helper functions %%-------------------------------------------------------------------- +clientid(#channel{clientinfo = #{clientid := ClientId}}) -> + ClientId. + run_hooks(Ctx, Name, Args) -> emqx_gateway_ctx:metrics_inc(Ctx, Name), emqx_hooks:run(Name, Args). diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl index b3d6004df..e6d136494 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_frame.erl @@ -349,7 +349,7 @@ format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) -> [QoS, MsgId, TopicId, ReturnCode]); format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) -> #mqtt_sn_flags{topic_id_type = TopicIdType} = Flags, - io_lib:format("SN_UNSUBSCRIBE(TopicIdType=~s, MsgId=~w, TopicId=~w)", + io_lib:format("SN_UNSUBSCRIBE(TopicIdType=~w, MsgId=~w, TopicId=~w)", [TopicIdType, Msgid, Topic]); format(?SN_UNSUBACK_MSG(MsgId)) -> io_lib:format("SN_UNSUBACK(MsgId=~w)", [MsgId]); diff --git a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl index 9181cb881..6a046b854 100644 --- a/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_sn_protocol_SUITE.erl @@ -1048,11 +1048,6 @@ t_delivery_takeover_and_re_register(_) -> _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)), _ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)), - emqx_logger:set_log_level(debug), - dbg:tracer(),dbg:p(all,call), - dbg:tp(emqx_gateway_cm,x), - %dbg:tpl(emqx_gateway_cm, request_stepdown,x), - {ok, NSocket} = gen_udp:open(0, [binary]), send_connect_msg(NSocket, <<"test">>, 0), ?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,