diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 335bd5531..55f70f943 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -83,11 +83,11 @@ keepalive_interval :: maybe(integer()), connpkt :: term(), asleep_timer :: tuple(), - asleep_msg_queue :: list(), enable_stats :: boolean(), stats_timer :: maybe(reference()), idle_timeout :: integer(), - enable_qos3 = false :: boolean() + enable_qos3 = false :: boolean(), + has_pending_pingresp = false :: boolean() }). -define(INFO_KEYS, [socktype, peername, sockname, sockstate]). %, active_n]). @@ -104,6 +104,15 @@ -define(NO_PEERCERT, undefined). +-define(CONN_INFO(Sockname, Peername), + #{socktype => udp, + sockname => Sockname, + peername => Peername, + protocol => 'mqtt-sn', + peercert => ?NO_PEERCERT, + conn_mod => ?MODULE + }). + %%-------------------------------------------------------------------- %% Exported APIs %%-------------------------------------------------------------------- @@ -134,13 +143,7 @@ init([{_, SockPid, Sock}, Peername, Options]) -> EnableStats = proplists:get_value(enable_stats, Options, false), case inet:sockname(Sock) of {ok, Sockname} -> - Channel = emqx_channel:init(#{socktype => udp, - sockname => Sockname, - peername => Peername, - protocol => 'mqtt-sn', - peercert => ?NO_PEERCERT, - conn_mod => ?MODULE - }, ?DEFAULT_CHAN_OPTIONS), + Channel = emqx_channel:init(?CONN_INFO(Sockname, Peername), ?DEFAULT_CHAN_OPTIONS), State = #state{gwid = GwId, username = Username, password = Password, @@ -152,7 +155,6 @@ init([{_, SockPid, Sock}, Peername, Options]) -> channel = Channel, registry = Registry, asleep_timer = emqx_sn_asleep_timer:init(), - asleep_msg_queue = [], enable_stats = EnableStats, enable_qos3 = EnableQos3, idle_timeout = IdleTimeout @@ -175,9 +177,6 @@ idle(cast, {incoming, ?SN_CONNECT_MSG(Flags, _ProtoId, Duration, ClientId)}, Sta #mqtt_sn_flags{will = Will, clean_start = CleanStart} = Flags, do_connect(ClientId, CleanStart, Will, Duration, State); -idle(cast, {incoming, Packet = ?CONNECT_PACKET(_ConnPkt)}, State) -> - handle_incoming(Packet, State); - idle(cast, {incoming, ?SN_ADVERTISE_MSG(_GwId, _Radius)}, State) -> % ignore {keep_state, State, State#state.idle_timeout}; @@ -188,7 +187,7 @@ idle(cast, {incoming, ?SN_DISCONNECT_MSG(_Duration)}, State) -> idle(cast, {incoming, ?SN_PUBLISH_MSG(_Flag, _TopicId, _MsgId, _Data)}, State = #state{enable_qos3 = false}) -> ?LOG(debug, "The enable_qos3 is false, ignore the received publish with QoS=-1 in idle mode!", [], State), - {keep_state_and_data, State#state.idle_timeout}; + {keep_state, State#state.idle_timeout}; idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, topic_id_type = TopicIdType @@ -206,7 +205,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1, ok end, ?LOG(debug, "Client id=~p receives a publish with QoS=-1 in idle mode!", [ClientId], State), - {keep_state_and_data, State#state.idle_timeout}; + {keep_state, State#state.idle_timeout}; idle(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(_ClientId)}, State) -> handle_ping(PingReq, State); @@ -400,15 +399,23 @@ asleep(cast, {incoming, ?SN_PINGREQ_MSG(undefined)}, State) -> % ClientId in PINGREQ is mandatory {keep_state, State}; -asleep(cast, {incoming, PingReq = ?SN_PINGREQ_MSG(ClientIdPing)}, - State = #state{clientid = ClientId}) -> +asleep(cast, {incoming, ?SN_PINGREQ_MSG(ClientIdPing)}, + State = #state{clientid = ClientId, channel = Channel}) -> case ClientIdPing of ClientId -> - _ = handle_ping(PingReq, State), - self() ! do_awake_jobs, - % it is better to go awake state, since the jobs in awake may take long time - % and asleep timer get timeout, it will cause disaster - {next_state, awake, State}; + inc_ping_counter(), + case emqx_session:dequeue(emqx_channel:get_session(Channel)) of + {ok, Session0} -> + send_message(?SN_PINGRESP_MSG(), State), + {keep_state, State#state{ + channel = emqx_channel:set_session(Session0, Channel)}}; + {ok, Delivers, Session0} -> + Events = [emqx_message:to_packet(PckId, Msg) || {PckId, Msg} <- Delivers] + ++ [try_goto_asleep], + {next_state, awake, State#state{ + channel = emqx_channel:set_session(Session0, Channel), + has_pending_pingresp = true}, outgoing_events(Events)} + end; _Other -> {next_state, asleep, State} end; @@ -453,6 +460,20 @@ awake(cast, {outgoing, Packet}, State) -> ok = handle_outgoing(Packet, State), {keep_state, State}; +awake(cast, {incoming, ?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)}, State) -> + do_puback(TopicId, MsgId, ReturnCode, awake, State); + +awake(cast, try_goto_asleep, State=#state{channel = Channel, + has_pending_pingresp = PingPending}) -> + case emqx_mqueue:is_empty(emqx_session:info(mqueue, emqx_channel:get_session(Channel))) of + true when PingPending =:= true -> + send_message(?SN_PINGRESP_MSG(), State), + goto_asleep_state(State#state{has_pending_pingresp = false}); + true when PingPending =:= false -> + goto_asleep_state(State); + false -> keep_state_and_data + end; + awake(EventType, EventContent, State) -> handle_event(EventType, EventContent, awake, State). @@ -489,11 +510,12 @@ handle_event(info, {datagram, SockPid, Data}, StateName, shutdown(frame_error, State) end; -handle_event(info, Deliver = {deliver, _Topic, Msg}, asleep, - State = #state{asleep_msg_queue = AsleepMsgQ}) -> +handle_event(info, {deliver, _Topic, Msg}, asleep, + State = #state{channel = Channel}) -> % section 6.14, Support of sleeping clients ?LOG(debug, "enqueue downlink message in asleep state Msg=~p", [Msg], State), - {keep_state, State#state{asleep_msg_queue = [Deliver|AsleepMsgQ]}}; + Session = emqx_session:enqueue(Msg, emqx_channel:get_session(Channel)), + {keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}}; handle_event(info, Deliver = {deliver, _Topic, _Msg}, _StateName, State = #state{channel = Channel}) -> @@ -518,18 +540,6 @@ handle_event(info, {timeout, TRef, keepalive}, _StateName, State) -> handle_event(info, {timeout, TRef, TMsg}, _StateName, State) -> handle_timeout(TRef, TMsg, State); -handle_event(info, do_awake_jobs, StateName, State=#state{clientid = ClientId}) -> - ?LOG(debug, "Do awake jobs, statename : ~p", [StateName], State), - case process_awake_jobs(ClientId, State) of - {keep_state, NewState} -> - case StateName of - awake -> goto_asleep_state(NewState); - _Other -> {keep_state, NewState} - %% device send a CONNECT immediately before this do_awake_jobs is handled - end; - Stop -> Stop - end; - handle_event(info, asleep_timeout, asleep, State) -> ?LOG(debug, "asleep timer timeout, shutdown now", [], State), stop(asleep_timeout, State); @@ -593,33 +603,31 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> handle_info(Info, State = #state{channel = Channel}) -> handle_return(emqx_channel:handle_info(Info, Channel), State). -handle_ping(_PingReq, State) -> - inc_counter(recv_oct, 2), - inc_counter(recv_msg, 1), - ok = send_message(?SN_PINGRESP_MSG(), State), - {keep_state, State}. - handle_timeout(TRef, TMsg, State = #state{channel = Channel}) -> handle_return(emqx_channel:handle_timeout(TRef, TMsg, Channel), State). -handle_return({ok, NChannel}, State) -> - {keep_state, State#state{channel = NChannel}}; -handle_return({ok, Replies, NChannel}, State) -> - {keep_state, State#state{channel = NChannel}, next_events(Replies)}; +handle_return(Return, State) -> + handle_return(Return, State, []). -handle_return({shutdown, Reason, NChannel}, State) -> +handle_return({ok, NChannel}, State, AddEvents) -> + handle_return({ok, AddEvents, NChannel}, State, []); +handle_return({ok, Replies, NChannel}, State, AddEvents) -> + {keep_state, State#state{channel = NChannel}, outgoing_events(append(Replies, AddEvents))}; +handle_return({shutdown, Reason, NChannel}, State, _AddEvents) -> stop({shutdown, Reason}, State#state{channel = NChannel}); -handle_return({shutdown, Reason, OutPacket, NChannel}, State) -> +handle_return({shutdown, Reason, OutPacket, NChannel}, State, _AddEvents) -> NState = State#state{channel = NChannel}, ok = handle_outgoing(OutPacket, NState), stop({shutdown, Reason}, NState). -next_events(Packet) when is_record(Packet, mqtt_packet) -> +outgoing_events(Actions) -> + lists:map(fun outgoing_event/1, Actions). + +outgoing_event(Packet) when is_record(Packet, mqtt_packet); + is_record(Packet, mqtt_sn_message)-> next_event({outgoing, Packet}); -next_events(Action) when is_tuple(Action) -> - next_event(Action); -next_events(Actions) when is_list(Actions) -> - lists:map(fun next_event/1, Actions). +outgoing_event(Action) -> + next_event(Action). close_socket(State = #state{sockstate = closed}) -> State; close_socket(State = #state{socket = _Socket}) -> @@ -673,6 +681,13 @@ call(Pid, Req) -> %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- +handle_ping(_PingReq, State) -> + ok = send_message(?SN_PINGRESP_MSG(), State), + inc_ping_counter(), + {keep_state, State}. + +inc_ping_counter() -> + inc_counter(recv_msg, 1). mqtt2sn(?CONNACK_PACKET(0, _SessPresent), _State) -> ?SN_CONNACK_MSG(0); @@ -786,11 +801,17 @@ mqttsn_to_mqtt(?SN_PUBCOMP, MsgId) -> ?PUBCOMP_PACKET(MsgId). do_connect(ClientId, CleanStart, WillFlag, Duration, State) -> + %% 6.6 Client’s Publish Procedure + %% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message + %% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange + %% before it could start a new level 1 or 2 transaction. + OnlyOneInflight = #{'Receive-Maximum' => 1}, ConnPkt = #mqtt_packet_connect{clientid = ClientId, clean_start = CleanStart, username = State#state.username, password = State#state.password, - keepalive = Duration + keepalive = Duration, + properties = OnlyOneInflight }, put(clientid, ClientId), case WillFlag of @@ -939,11 +960,11 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) -> _ = emqx_broker:publish(emqx_packet:to_message(Publish, ClientId)), ok. -do_puback(TopicId, MsgId, ReturnCode, _StateName, +do_puback(TopicId, MsgId, ReturnCode, StateName, State=#state{clientid = ClientId, registry = Registry}) -> case ReturnCode of ?SN_RC_ACCEPTED -> - handle_incoming(?PUBACK_PACKET(MsgId), State); + handle_incoming(?PUBACK_PACKET(MsgId), StateName, State); ?SN_RC_INVALID_TOPIC_ID -> case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of undefined -> ok; @@ -990,15 +1011,6 @@ update_will_msg(undefined, Msg) -> update_will_msg(Will = #will_msg{}, Msg) -> Will#will_msg{payload = Msg}. -process_awake_jobs(_ClientId, State = #state{asleep_msg_queue = []}) -> - {keep_state, State}; -process_awake_jobs(_ClientId, State = #state{channel = Channel, - asleep_msg_queue = AsleepMsgQ}) -> - Delivers = lists:reverse(AsleepMsgQ), - NState = State#state{asleep_msg_queue = []}, - Result = emqx_channel:handle_deliver(Delivers, Channel), - handle_return(Result, NState). - enqueue_msgid(suback, MsgId, TopicId) -> put({suback, MsgId}, TopicId); enqueue_msgid(puback, MsgId, TopicId) -> @@ -1022,12 +1034,21 @@ get_topic_id(Type, MsgId) -> TopicId -> TopicId end. -handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) -> +handle_incoming(Packet, State) -> + handle_incoming(Packet, unknown, State). + +handle_incoming(?PUBACK_PACKET(_) = Packet, awake, State) -> + Result = channel_handle_in(Packet, State), + handle_return(Result, State, [try_goto_asleep]); +handle_incoming(Packet, _StName, State) -> + Result = channel_handle_in(Packet, State), + handle_return(Result, State). + +channel_handle_in(Packet = ?PACKET(Type), State = #state{channel = Channel}) -> _ = inc_incoming_stats(Type), ok = emqx_metrics:inc_recv(Packet), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)], State), - Result = emqx_channel:handle_in(Packet, Channel), - handle_return(Result, State). + emqx_channel:handle_in(Packet, Channel). handle_outgoing(Packets, State) when is_list(Packets) -> lists:foreach(fun(Packet) -> handle_outgoing(Packet, State) end, Packets); @@ -1081,3 +1102,8 @@ next_event(Content) -> inc_counter(Key, Inc) -> _ = emqx_pd:inc_counter(Key, Inc), ok. + +append(Replies, AddEvents) when is_list(Replies) -> + Replies ++ AddEvents; +append(Replies, AddEvents) -> + [Replies] ++ AddEvents. diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 35ad84193..be783dc66 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -402,8 +402,6 @@ t_publish_negqos_case09(_) -> What = receive_response(Socket), ?assertEqual(Eexp, What) end, - %% dbg:start(), dbg:tracer(), dbg:p(all, c), - %% dbg:tpl(emqx_sn_gateway, send_message, x), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -1049,14 +1047,15 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) -> % goto awake state, receive downlink messages, and go back to asleep send_pingreq_msg(Socket, ClientId), - ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), - - %% {unexpected_udp_data, _} = receive_response(Socket), + %% the broker should sent dl msgs to the awake client before sending the pingresp UdpData = receive_response(Socket), MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId1, Payload1}, UdpData), send_puback_msg(Socket, TopicId1, MsgId_udp), + %% check the pingresp is received at last + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + gen_udp:close(Socket). t_asleep_test04_to_awake_qos1_dl_msg(_) -> @@ -1106,8 +1105,6 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) -> % goto awake state, receive downlink messages, and go back to asleep send_pingreq_msg(Socket, <<"test">>), - ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% get REGISTER first, since this topic has never been registered %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -1119,6 +1116,8 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) -> MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData), send_puback_msg(Socket, TopicIdNew, MsgId_udp), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + gen_udp:close(Socket). t_asleep_test05_to_awake_qos1_dl_msg(_) -> @@ -1173,7 +1172,6 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) -> % goto awake state, receive downlink messages, and go back to asleep send_pingreq_msg(Socket, <<"test">>), - ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), UdpData_reg = receive_response(Socket), {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test5, UdpData_reg), @@ -1197,7 +1195,7 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) -> TopicIdNew, Payload4}, UdpData4), send_puback_msg(Socket, TopicIdNew, MsgId4) end, - timer:sleep(50), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), gen_udp:close(Socket). t_asleep_test06_to_awake_qos2_dl_msg(_) -> @@ -1249,14 +1247,12 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) -> % goto awake state, receive downlink messages, and go back to asleep send_pingreq_msg(Socket, <<"test">>), - ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), UdpData = wrap_receive_response(Socket), MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData), send_pubrec_msg(Socket, MsgId_udp), - timer:sleep(300), - + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), gen_udp:close(Socket). t_asleep_test07_to_connected(_) -> @@ -1391,8 +1387,6 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> % goto awake state, receive downlink messages, and go back to asleep send_pingreq_msg(Socket, <<"test">>), - ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), - UdpData_reg = receive_response(Socket), {TopicIdNew, MsgId_reg} = check_register_msg_on_udp(TopicName_test9, UdpData_reg), send_regack_msg(Socket, TopicIdNew, MsgId_reg), @@ -1424,7 +1418,7 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> TopicIdNew, Payload4}, UdpData4), send_puback_msg(Socket, TopicIdNew, MsgId4) end, - timer:sleep(100), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), %% send PINGREQ again to enter awake state send_pingreq_msg(Socket, <<"test">>), @@ -1787,7 +1781,7 @@ wrap_receive_response(Socket) -> Other end. receive_response(Socket) -> - receive_response(Socket, 5000). + receive_response(Socket, 2000). receive_response(Socket, Timeout) -> receive {udp, Socket, _, _, Bin} -> @@ -1832,8 +1826,8 @@ get_udp_broadcast_address() -> "255.255.255.255". check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, TopicType, TopicId, Payload}, UdpData) -> - ct:pal("UdpData: ~p, Payload: ~p", [UdpData, Payload]), <> = UdpData, + ct:pal("UdpData: ~p, Payload: ~p, PayloadIn: ~p", [UdpData, Payload, PayloadIn]), Size9 = byte_size(Payload) + 7, Eexp = <>, ?assertEqual(Eexp, HeaderUdp), % mqtt-sn header should be same diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 6e44e0a22..267101a9c 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -32,6 +32,8 @@ -export([ info/1 , info/2 , set_conn_state/2 + , get_session/1 + , set_session/2 , stats/1 , caps/1 ]). @@ -167,6 +169,12 @@ info(timers, #channel{timers = Timers}) -> Timers. set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. +get_session(#channel{session = Session}) -> + Session. + +set_session(Session, Channel) -> + Channel#channel{session = Session}. + %% TODO: Add more stats. -spec(stats(channel()) -> emqx_types:stats()). stats(#channel{session = Session})-> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index b920cc294..4dd26d3da 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -75,6 +75,7 @@ -export([ deliver/2 , enqueue/2 + , dequeue/1 , retry/1 , terminate/3 ]).