refactor(mqttsn): assign subs_resume to mqtt-sn client process state
This commit is contained in:
parent
af65310ce7
commit
a3d8981635
|
@ -3,20 +3,24 @@
|
||||||
[
|
[
|
||||||
{"4.3.5",[
|
{"4.3.5",[
|
||||||
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
||||||
{update,emqx_sn_gateway,{advanced,["4.3.5"]}}
|
{update,emqx_sn_gateway,{advanced,["4.3.5"]}}
|
||||||
]},
|
]},
|
||||||
{"4.3.4",[
|
{"4.3.4",[
|
||||||
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
||||||
{update,emqx_sn_gateway,{advanced,["4.3.4"]}}
|
{update,emqx_sn_gateway,{advanced,["4.3.4"]}}
|
||||||
]},
|
]},
|
||||||
{"4.3.3",[
|
{"4.3.3",[
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
||||||
{update,emqx_sn_gateway,{advanced,["4.3.3"]}}
|
{update,emqx_sn_gateway,{advanced,["4.3.3"]}}
|
||||||
]},
|
]},
|
||||||
{"4.3.2",[
|
{"4.3.2",[
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
||||||
{update,emqx_sn_gateway,{advanced,["4.3.2"]}}
|
{update,emqx_sn_gateway,{advanced,["4.3.2"]}}
|
||||||
]},
|
]},
|
||||||
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
||||||
|
@ -24,20 +28,24 @@
|
||||||
[
|
[
|
||||||
{"4.3.5",[
|
{"4.3.5",[
|
||||||
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
||||||
{update,emqx_sn_gateway,{advanced,["4.3.5"]}}
|
{update,emqx_sn_gateway,{advanced,["4.3.5"]}}
|
||||||
]},
|
]},
|
||||||
{"4.3.4",[
|
{"4.3.4",[
|
||||||
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
||||||
{update,emqx_sn_gateway,{advanced,["4.3.4"]}}
|
{update,emqx_sn_gateway,{advanced,["4.3.4"]}}
|
||||||
]},
|
]},
|
||||||
{"4.3.3",[
|
{"4.3.3",[
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
||||||
{update,emqx_sn_gateway,{advanced,["4.3.3"]}}
|
{update,emqx_sn_gateway,{advanced,["4.3.3"]}}
|
||||||
]},
|
]},
|
||||||
{"4.3.2",[
|
{"4.3.2",[
|
||||||
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
|
||||||
|
{load_module,emqx_sn_app,brutal_purge,soft_purge,[]},
|
||||||
{update,emqx_sn_gateway,{advanced,["4.3.2"]}}
|
{update,emqx_sn_gateway,{advanced,["4.3.2"]}}
|
||||||
]},
|
]},
|
||||||
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
|
||||||
|
|
|
@ -122,12 +122,14 @@ listeners_confs() ->
|
||||||
EnableQos3 = application:get_env(emqx_sn, enable_qos3, false),
|
EnableQos3 = application:get_env(emqx_sn, enable_qos3, false),
|
||||||
EnableStats = application:get_env(emqx_sn, enable_stats, false),
|
EnableStats = application:get_env(emqx_sn, enable_stats, false),
|
||||||
IdleTimeout = application:get_env(emqx_sn, idle_timeout, 30000),
|
IdleTimeout = application:get_env(emqx_sn, idle_timeout, 30000),
|
||||||
|
SubsResume = application:get_env(emqx_sn, subs_resume, false),
|
||||||
[{udp, ListenOn, [{gateway_id, GwId},
|
[{udp, ListenOn, [{gateway_id, GwId},
|
||||||
{username, Username},
|
{username, Username},
|
||||||
{password, Password},
|
{password, Password},
|
||||||
{enable_qos3, EnableQos3},
|
{enable_qos3, EnableQos3},
|
||||||
{enable_stats, EnableStats},
|
{enable_stats, EnableStats},
|
||||||
{idle_timeout, IdleTimeout},
|
{idle_timeout, IdleTimeout},
|
||||||
|
{subs_resume, SubsResume},
|
||||||
{max_connections, 1024000},
|
{max_connections, 1024000},
|
||||||
{max_conn_rate, 1000},
|
{max_conn_rate, 1000},
|
||||||
{udp_options, []}]}].
|
{udp_options, []}]}].
|
||||||
|
|
|
@ -98,6 +98,7 @@
|
||||||
%% Store all qos0 messages for waiting REGACK
|
%% Store all qos0 messages for waiting REGACK
|
||||||
%% Note: QoS1/QoS2 messages will kept inflight queue
|
%% Note: QoS1/QoS2 messages will kept inflight queue
|
||||||
pending_topic_ids = #{} :: pending_msgs(),
|
pending_topic_ids = #{} :: pending_msgs(),
|
||||||
|
subs_resume = false,
|
||||||
waiting_sync_topics = [],
|
waiting_sync_topics = [],
|
||||||
previous_outgoings_and_state = undefined
|
previous_outgoings_and_state = undefined
|
||||||
}).
|
}).
|
||||||
|
@ -158,6 +159,7 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
|
||||||
Password = proplists:get_value(password, Options, undefined),
|
Password = proplists:get_value(password, Options, undefined),
|
||||||
EnableQos3 = proplists:get_value(enable_qos3, Options, false),
|
EnableQos3 = proplists:get_value(enable_qos3, Options, false),
|
||||||
IdleTimeout = proplists:get_value(idle_timeout, Options, 30000),
|
IdleTimeout = proplists:get_value(idle_timeout, Options, 30000),
|
||||||
|
SubsResume = proplists:get_value(subs_resume, Options, false),
|
||||||
EnableStats = proplists:get_value(enable_stats, Options, false),
|
EnableStats = proplists:get_value(enable_stats, Options, false),
|
||||||
case inet:sockname(Sock) of
|
case inet:sockname(Sock) of
|
||||||
{ok, Sockname} ->
|
{ok, Sockname} ->
|
||||||
|
@ -174,7 +176,8 @@ init([{_, SockPid, Sock}, Peername, Options]) ->
|
||||||
asleep_timer = emqx_sn_asleep_timer:init(),
|
asleep_timer = emqx_sn_asleep_timer:init(),
|
||||||
enable_stats = EnableStats,
|
enable_stats = EnableStats,
|
||||||
enable_qos3 = EnableQos3,
|
enable_qos3 = EnableQos3,
|
||||||
idle_timeout = IdleTimeout
|
idle_timeout = IdleTimeout,
|
||||||
|
subs_resume = SubsResume
|
||||||
},
|
},
|
||||||
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
emqx_logger:set_metadata_peername(esockd:format(Peername)),
|
||||||
{ok, idle, State, [IdleTimeout]};
|
{ok, idle, State, [IdleTimeout]};
|
||||||
|
@ -689,8 +692,10 @@ terminate(Reason, _StateName, #state{channel = Channel}) ->
|
||||||
code_change({down, _Vsn}, StateName, State, [ToVsn]) ->
|
code_change({down, _Vsn}, StateName, State, [ToVsn]) ->
|
||||||
case re:run(ToVsn, "4\\.3\\.[2-5]") of
|
case re:run(ToVsn, "4\\.3\\.[2-5]") of
|
||||||
{match, _} ->
|
{match, _} ->
|
||||||
NState0 = lists:droplast(lists:droplast(tuple_to_list(State))),
|
NState0 = lists:droplast(
|
||||||
NState = list_to_tuple(lists:reverse(NState0)),
|
lists:droplast(
|
||||||
|
lists:droplast(tuple_to_list(State)))),
|
||||||
|
NState = list_to_tuple(NState0),
|
||||||
{ok, StateName, NState};
|
{ok, StateName, NState};
|
||||||
_ ->
|
_ ->
|
||||||
{ok, StateName, State}
|
{ok, StateName, State}
|
||||||
|
@ -700,7 +705,7 @@ code_change(_Vsn, StateName, State, [FromVsn]) ->
|
||||||
case re:run(FromVsn, "4\\.3\\.[2-5]") of
|
case re:run(FromVsn, "4\\.3\\.[2-5]") of
|
||||||
{match, _} ->
|
{match, _} ->
|
||||||
NState = list_to_tuple(
|
NState = list_to_tuple(
|
||||||
tuple_to_list(State) ++ [[], undefined]
|
tuple_to_list(State) ++ [false, [], undefined]
|
||||||
),
|
),
|
||||||
{ok, StateName, NState};
|
{ok, StateName, NState};
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -1178,9 +1183,9 @@ handle_incoming(
|
||||||
clean_start = false}
|
clean_start = false}
|
||||||
} = Packet,
|
} = Packet,
|
||||||
_,
|
_,
|
||||||
State) ->
|
State = #state{subs_resume = SubsResume}) ->
|
||||||
Result = channel_handle_in(Packet, State),
|
Result = channel_handle_in(Packet, State),
|
||||||
case {subs_resume(), Result} of
|
case {SubsResume, Result} of
|
||||||
{true, {ok, Replies, NChannel}} ->
|
{true, {ok, Replies, NChannel}} ->
|
||||||
case maps:get(
|
case maps:get(
|
||||||
subscriptions,
|
subscriptions,
|
||||||
|
@ -1327,9 +1332,6 @@ maybe_send_puback(?QOS_0, _TopicId, _MsgId, _ReasonCode, State) ->
|
||||||
maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) ->
|
maybe_send_puback(_QoS, TopicId, MsgId, ReasonCode, State) ->
|
||||||
send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State).
|
send_message(?SN_PUBACK_MSG(TopicId, MsgId, ReasonCode), State).
|
||||||
|
|
||||||
subs_resume() ->
|
|
||||||
application:get_env(emqx_sn, subs_resume, false).
|
|
||||||
|
|
||||||
%% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}]
|
%% Replies = [{event, connected}, {connack, ConnAck}, {outgoing, Pkts}]
|
||||||
split_connack_replies([A = {event, connected},
|
split_connack_replies([A = {event, connected},
|
||||||
B = {connack, _ConnAck} | Outgoings]) ->
|
B = {connack, _ConnAck} | Outgoings]) ->
|
||||||
|
|
|
@ -79,6 +79,12 @@ set_special_confs(emqx_sn) ->
|
||||||
set_special_confs(_App) ->
|
set_special_confs(_App) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
restart_emqx_sn(#{subs_resume := Bool}) ->
|
||||||
|
application:set_env(emqx_sn, subs_resume, Bool),
|
||||||
|
_ = application:stop(emqx_sn),
|
||||||
|
_ = application:ensure_all_started(emqx_sn),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Test cases
|
%% Test cases
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -1624,7 +1630,7 @@ t_broadcast_test1(_) ->
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
t_register_subs_resume_on(_) ->
|
t_register_subs_resume_on(_) ->
|
||||||
application:set_env(emqx_sn, subs_resume, true),
|
restart_emqx_sn(#{subs_resume => true}),
|
||||||
MsgId = 1,
|
MsgId = 1,
|
||||||
{ok, Socket} = gen_udp:open(0, [binary]),
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
send_connect_msg(Socket, <<"test">>, 0),
|
send_connect_msg(Socket, <<"test">>, 0),
|
||||||
|
@ -1710,8 +1716,6 @@ t_register_subs_resume_on(_) ->
|
||||||
%% no more messages
|
%% no more messages
|
||||||
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
|
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
|
||||||
|
|
||||||
application:set_env(emqx_sn, subs_resume, false),
|
|
||||||
|
|
||||||
gen_udp:close(NSocket),
|
gen_udp:close(NSocket),
|
||||||
{ok, NSocket1} = gen_udp:open(0, [binary]),
|
{ok, NSocket1} = gen_udp:open(0, [binary]),
|
||||||
send_connect_msg(NSocket1, <<"test">>),
|
send_connect_msg(NSocket1, <<"test">>),
|
||||||
|
@ -1719,7 +1723,8 @@ t_register_subs_resume_on(_) ->
|
||||||
receive_response(NSocket1)),
|
receive_response(NSocket1)),
|
||||||
send_disconnect_msg(NSocket1, undefined),
|
send_disconnect_msg(NSocket1, undefined),
|
||||||
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
|
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
|
||||||
gen_udp:close(NSocket1).
|
gen_udp:close(NSocket1),
|
||||||
|
restart_emqx_sn(#{subs_resume => false}).
|
||||||
|
|
||||||
t_register_subs_resume_off(_) ->
|
t_register_subs_resume_off(_) ->
|
||||||
MsgId = 1,
|
MsgId = 1,
|
||||||
|
@ -1829,7 +1834,7 @@ t_register_subs_resume_off(_) ->
|
||||||
gen_udp:close(NSocket1).
|
gen_udp:close(NSocket1).
|
||||||
|
|
||||||
t_register_skip_failure_topic_name_and_reach_max_retry_times(_) ->
|
t_register_skip_failure_topic_name_and_reach_max_retry_times(_) ->
|
||||||
application:set_env(emqx_sn, subs_resume, true),
|
restart_emqx_sn(#{subs_resume => true}),
|
||||||
MsgId = 1,
|
MsgId = 1,
|
||||||
{ok, Socket} = gen_udp:open(0, [binary]),
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
send_connect_msg(Socket, <<"test">>, 0),
|
send_connect_msg(Socket, <<"test">>, 0),
|
||||||
|
@ -1883,11 +1888,11 @@ t_register_skip_failure_topic_name_and_reach_max_retry_times(_) ->
|
||||||
%% shutdown due to reached max retry times
|
%% shutdown due to reached max retry times
|
||||||
timer:sleep(5000), %% RETYRY_TIMEOUT
|
timer:sleep(5000), %% RETYRY_TIMEOUT
|
||||||
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
|
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
|
||||||
application:set_env(emqx_sn, subs_resume, false),
|
gen_udp:close(NSocket),
|
||||||
gen_udp:close(NSocket).
|
restart_emqx_sn(#{subs_resume => false}).
|
||||||
|
|
||||||
t_register_enqueue_delivering_messages(_) ->
|
t_register_enqueue_delivering_messages(_) ->
|
||||||
application:set_env(emqx_sn, subs_resume, true),
|
restart_emqx_sn(#{subs_resume => true}),
|
||||||
MsgId = 1,
|
MsgId = 1,
|
||||||
{ok, Socket} = gen_udp:open(0, [binary]),
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
send_connect_msg(Socket, <<"test">>, 0),
|
send_connect_msg(Socket, <<"test">>, 0),
|
||||||
|
@ -1932,8 +1937,6 @@ t_register_enqueue_delivering_messages(_) ->
|
||||||
%% no more messages
|
%% no more messages
|
||||||
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
|
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
|
||||||
|
|
||||||
application:set_env(emqx_sn, subs_resume, false),
|
|
||||||
|
|
||||||
gen_udp:close(NSocket),
|
gen_udp:close(NSocket),
|
||||||
{ok, NSocket1} = gen_udp:open(0, [binary]),
|
{ok, NSocket1} = gen_udp:open(0, [binary]),
|
||||||
send_connect_msg(NSocket1, <<"test">>),
|
send_connect_msg(NSocket1, <<"test">>),
|
||||||
|
@ -1941,7 +1944,8 @@ t_register_enqueue_delivering_messages(_) ->
|
||||||
receive_response(NSocket1)),
|
receive_response(NSocket1)),
|
||||||
send_disconnect_msg(NSocket1, undefined),
|
send_disconnect_msg(NSocket1, undefined),
|
||||||
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
|
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket1)),
|
||||||
gen_udp:close(NSocket1).
|
gen_udp:close(NSocket1),
|
||||||
|
restart_emqx_sn(#{subs_resume => false}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper funcs
|
%% Helper funcs
|
||||||
|
|
Loading…
Reference in New Issue