From a3d8981635b0179b421fe968ff530b93c30a5c1a Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 15 Mar 2022 11:28:40 +0800 Subject: [PATCH] refactor(mqttsn): assign subs_resume to mqtt-sn client process state --- apps/emqx_sn/src/emqx_sn.appup.src | 8 ++++++ apps/emqx_sn/src/emqx_sn_app.erl | 2 ++ apps/emqx_sn/src/emqx_sn_gateway.erl | 20 ++++++++------- apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl | 26 +++++++++++--------- 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/apps/emqx_sn/src/emqx_sn.appup.src b/apps/emqx_sn/src/emqx_sn.appup.src index 7a8679db1..269605afa 100644 --- a/apps/emqx_sn/src/emqx_sn.appup.src +++ b/apps/emqx_sn/src/emqx_sn.appup.src @@ -3,20 +3,24 @@ [ {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, {"4.3.4",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.4"]}} ]}, {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.3"]}} ]}, {"4.3.2",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} @@ -24,20 +28,24 @@ [ {"4.3.5",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.5"]}} ]}, {"4.3.4",[ {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.4"]}} ]}, {"4.3.3",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.3"]}} ]}, {"4.3.2",[ {load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_sn_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_sn_app,brutal_purge,soft_purge,[]}, {update,emqx_sn_gateway,{advanced,["4.3.2"]}} ]}, {<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]} diff --git a/apps/emqx_sn/src/emqx_sn_app.erl b/apps/emqx_sn/src/emqx_sn_app.erl index 9575523f8..e7c86d5b7 100644 --- a/apps/emqx_sn/src/emqx_sn_app.erl +++ b/apps/emqx_sn/src/emqx_sn_app.erl @@ -122,12 +122,14 @@ listeners_confs() -> EnableQos3 = application:get_env(emqx_sn, enable_qos3, false), EnableStats = application:get_env(emqx_sn, enable_stats, false), IdleTimeout = application:get_env(emqx_sn, idle_timeout, 30000), + SubsResume = application:get_env(emqx_sn, subs_resume, false), [{udp, ListenOn, [{gateway_id, GwId}, {username, Username}, {password, Password}, {enable_qos3, EnableQos3}, {enable_stats, EnableStats}, {idle_timeout, IdleTimeout}, + {subs_resume, SubsResume}, {max_connections, 1024000}, {max_conn_rate, 1000}, {udp_options, []}]}]. diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index 3e8c18dc9..80a1339c0 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -98,6 +98,7 @@ %% Store all qos0 messages for waiting REGACK %% Note: QoS1/QoS2 messages will kept inflight queue pending_topic_ids = #{} :: pending_msgs(), + subs_resume = false, waiting_sync_topics = [], previous_outgoings_and_state = undefined }). @@ -158,6 +159,7 @@ init([{_, SockPid, Sock}, Peername, Options]) -> Password = proplists:get_value(password, Options, undefined), EnableQos3 = proplists:get_value(enable_qos3, Options, false), IdleTimeout = proplists:get_value(idle_timeout, Options, 30000), + SubsResume = proplists:get_value(subs_resume, Options, false), EnableStats = proplists:get_value(enable_stats, Options, false), case inet:sockname(Sock) of {ok, Sockname} -> @@ -174,7 +176,8 @@ init([{_, SockPid, Sock}, Peername, Options]) -> asleep_timer = emqx_sn_asleep_timer:init(), enable_stats = EnableStats, enable_qos3 = EnableQos3, - idle_timeout = IdleTimeout + idle_timeout = IdleTimeout, + subs_resume = SubsResume }, emqx_logger:set_metadata_peername(esockd:format(Peername)), {ok, idle, State, [IdleTimeout]}; @@ -689,8 +692,10 @@ terminate(Reason, _StateName, #state{channel = Channel}) -> code_change({down, _Vsn}, StateName, State, [ToVsn]) -> case re:run(ToVsn, "4\\.3\\.[2-5]") of {match, _} -> - NState0 = lists:droplast(lists:droplast(tuple_to_list(State))), - NState = list_to_tuple(lists:reverse(NState0)), + NState0 = lists:droplast( + lists:droplast( + lists:droplast(tuple_to_list(State)))), + NState = list_to_tuple(NState0), {ok, StateName, NState}; _ -> {ok, StateName, State} @@ -700,7 +705,7 @@ code_change(_Vsn, StateName, State, [FromVsn]) -> case re:run(FromVsn, "4\\.3\\.[2-5]") of {match, _} -> NState = list_to_tuple( - tuple_to_list(State) ++ [[], undefined] + tuple_to_list(State) ++ [false, [], undefined] ), {ok, StateName, NState}; _ -> @@ -1178,9 +1183,9 @@ handle_incoming( clean_start = false} } = Packet, _, - State) -> + State = #state{subs_resume = SubsResume}) -> Result = channel_handle_in(Packet, State), - case {subs_resume(), Result} of + case {SubsResume, Result} of {true, {ok, Replies, NChannel}} -> case maps:get( subscriptions, @@ -1327,9 +1332,6 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) -> maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) -> send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State). -subs_resume() -> - application:get_env(emqx_sn, subs_resume, false). - %% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}] split_connack_replies([A = {event, connected}, B = {connack, _ConnAck} | Outgoings]) -> diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 9fd731f28..cdecc06bb 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -79,6 +79,12 @@ set_special_confs(emqx_sn) -> set_special_confs(_App) -> ok. +restart_emqx_sn(#{subs_resume := Bool}) -> + application:set_env(emqx_sn, subs_resume, Bool), + _ = application:stop(emqx_sn), + _ = application:ensure_all_started(emqx_sn), + ok. + %%-------------------------------------------------------------------- %% Test cases %%-------------------------------------------------------------------- @@ -1624,7 +1630,7 @@ t_broadcast_test1(_) -> gen_udp:close(Socket). t_register_subs_resume_on(_) -> - application:set_env(emqx_sn, subs_resume, true), + restart_emqx_sn(#{subs_resume => true}), MsgId = 1, {ok, Socket} = gen_udp:open(0, [binary]), send_connect_msg(Socket, <<"test">>, 0), @@ -1710,8 +1716,6 @@ t_register_subs_resume_on(_) -> %% no more messages ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - application:set_env(emqx_sn, subs_resume, false), - gen_udp:close(NSocket), {ok, NSocket1} = gen_udp:open(0, [binary]), send_connect_msg(NSocket1, <<"test">>), @@ -1719,7 +1723,8 @@ t_register_subs_resume_on(_) -> receive_response(NSocket1)), send_disconnect_msg(NSocket1, undefined), ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), - gen_udp:close(NSocket1). + gen_udp:close(NSocket1), + restart_emqx_sn(#{subs_resume => false}). t_register_subs_resume_off(_) -> MsgId = 1, @@ -1829,7 +1834,7 @@ t_register_subs_resume_off(_) -> gen_udp:close(NSocket1). t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> - application:set_env(emqx_sn, subs_resume, true), + restart_emqx_sn(#{subs_resume => true}), MsgId = 1, {ok, Socket} = gen_udp:open(0, [binary]), send_connect_msg(Socket, <<"test">>, 0), @@ -1883,11 +1888,11 @@ t_register_skip_failure_topic_name_and_reach_max_retry_times(_) -> %% shutdown due to reached max retry times timer:sleep(5000), %% RETYRY_TIMEOUT ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)), - application:set_env(emqx_sn, subs_resume, false), - gen_udp:close(NSocket). + gen_udp:close(NSocket), + restart_emqx_sn(#{subs_resume => false}). t_register_enqueue_delivering_messages(_) -> - application:set_env(emqx_sn, subs_resume, true), + restart_emqx_sn(#{subs_resume => true}), MsgId = 1, {ok, Socket} = gen_udp:open(0, [binary]), send_connect_msg(Socket, <<"test">>, 0), @@ -1932,8 +1937,6 @@ t_register_enqueue_delivering_messages(_) -> %% no more messages ?assertEqual(udp_receive_timeout, receive_response(NSocket)), - application:set_env(emqx_sn, subs_resume, false), - gen_udp:close(NSocket), {ok, NSocket1} = gen_udp:open(0, [binary]), send_connect_msg(NSocket1, <<"test">>), @@ -1941,7 +1944,8 @@ t_register_enqueue_delivering_messages(_) -> receive_response(NSocket1)), send_disconnect_msg(NSocket1, undefined), ?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)), - gen_udp:close(NSocket1). + gen_udp:close(NSocket1), + restart_emqx_sn(#{subs_resume => false}). %%-------------------------------------------------------------------- %% Helper funcs