feat(quic): handle ctrl stream normal shutdown
This commit is contained in:
parent
2a6cdd9da6
commit
1692a16778
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
%% MQTT Over QUIC Shutdown Error code.
|
%% MQTT Over QUIC Shutdown Error code.
|
||||||
-define(MQTT_QUIC_CONN_NOERROR, 0).
|
-define(MQTT_QUIC_CONN_NOERROR, 0).
|
||||||
|
-define(MQTT_QUIC_CONN_ERROR_CTRL_STREAM_DOWN, 1).
|
||||||
-define(MQTT_QUIC_CONN_ERROR_OVERLOADED, 2).
|
-define(MQTT_QUIC_CONN_ERROR_OVERLOADED, 2).
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
|
@ -921,7 +921,8 @@ handle_info({sock_error, Reason}, State) ->
|
||||||
false -> ok
|
false -> ok
|
||||||
end,
|
end,
|
||||||
handle_info({sock_closed, Reason}, close_socket(State));
|
handle_info({sock_closed, Reason}, close_socket(State));
|
||||||
handle_info({quic, Event, Handle, Prop}, State) ->
|
%% handle QUIC control stream events
|
||||||
|
handle_info({quic, Event, Handle, Prop}, State) when is_atom(Event) ->
|
||||||
emqx_quic_stream:Event(Handle, Prop, State);
|
emqx_quic_stream:Event(Handle, Prop, State);
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
with_channel(handle_info, [Info], State).
|
with_channel(handle_info, [Info], State).
|
||||||
|
|
|
@ -179,7 +179,13 @@ new_stream(
|
||||||
SOpts1,
|
SOpts1,
|
||||||
Props
|
Props
|
||||||
),
|
),
|
||||||
quicer:handoff_stream(Stream, NewStreamOwner, {PS, Serialize, Channel}),
|
case quicer:handoff_stream(Stream, NewStreamOwner, {PS, Serialize, Channel}) of
|
||||||
|
ok ->
|
||||||
|
ok;
|
||||||
|
E ->
|
||||||
|
%% Only log, keep connecion alive.
|
||||||
|
?SLOG(error, #{message => "new stream handoff failed", stream => Stream, error => E})
|
||||||
|
end,
|
||||||
%% @TODO maybe keep them in `inactive_streams'
|
%% @TODO maybe keep them in `inactive_streams'
|
||||||
{ok, S#{streams := [{NewStreamOwner, Stream} | Streams]}}.
|
{ok, S#{streams := [{NewStreamOwner, Stream} | Streams]}}.
|
||||||
|
|
||||||
|
@ -200,7 +206,7 @@ transport_shutdown(_C, DownInfo, S) when is_map(DownInfo) ->
|
||||||
%% @doc callback for handling for peer addr changed.
|
%% @doc callback for handling for peer addr changed.
|
||||||
-spec peer_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state) -> cb_ret().
|
-spec peer_address_changed(quicer:connection_handle(), quicer:quicer_addr(), cb_state) -> cb_ret().
|
||||||
peer_address_changed(_C, _NewAddr, S) ->
|
peer_address_changed(_C, _NewAddr, S) ->
|
||||||
%% @TODO update session info?
|
%% @TODO update conn info in emqx_quic_stream
|
||||||
{ok, S}.
|
{ok, S}.
|
||||||
|
|
||||||
%% @doc callback for handling local addr change, currently unused
|
%% @doc callback for handling local addr change, currently unused
|
||||||
|
@ -224,7 +230,7 @@ streams_available(_C, {BidirCnt, UnidirCnt}, S) ->
|
||||||
%% @doc callback for handling request when remote wants for more streams
|
%% @doc callback for handling request when remote wants for more streams
|
||||||
%% should cope with rate limiting
|
%% should cope with rate limiting
|
||||||
%% @TODO this is not going to get triggered in current version
|
%% @TODO this is not going to get triggered in current version
|
||||||
%% for https://github.com/microsoft/msquic/issues/3120
|
%% ref: https://github.com/microsoft/msquic/issues/3120
|
||||||
-spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret().
|
-spec peer_needs_streams(quicer:connection_handle(), undefined, cb_state()) -> cb_ret().
|
||||||
peer_needs_streams(_C, undefined, S) ->
|
peer_needs_streams(_C, undefined, S) ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
|
@ -240,6 +246,10 @@ handle_call(
|
||||||
#{streams := Streams} = S
|
#{streams := Streams} = S
|
||||||
) ->
|
) ->
|
||||||
[
|
[
|
||||||
|
%% Try to activate streams individually if failed, stream will shutdown on its own.
|
||||||
|
%% we dont care about the return val here.
|
||||||
|
%% note, this is only used after control stream pass the validation. The data streams
|
||||||
|
%% that are called here are assured to be inactived (data processing hasn't been started).
|
||||||
catch emqx_quic_data_stream:activate_data(OwnerPid, ActivateData)
|
catch emqx_quic_data_stream:activate_data(OwnerPid, ActivateData)
|
||||||
|| {OwnerPid, _Stream} <- Streams
|
|| {OwnerPid, _Stream} <- Streams
|
||||||
],
|
],
|
||||||
|
@ -255,10 +265,15 @@ handle_call(_Req, _From, S) ->
|
||||||
handle_info({'EXIT', Pid, Reason}, #{ctrl_pid := Pid, conn := Conn} = S) ->
|
handle_info({'EXIT', Pid, Reason}, #{ctrl_pid := Pid, conn := Conn} = S) ->
|
||||||
case Reason of
|
case Reason of
|
||||||
normal ->
|
normal ->
|
||||||
quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0);
|
quicer:async_shutdown_connection(
|
||||||
|
Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, ?MQTT_QUIC_CONN_NOERROR
|
||||||
|
);
|
||||||
_ ->
|
_ ->
|
||||||
%% @TODO have some reasons mappings here.
|
quicer:async_shutdown_connection(
|
||||||
quicer:async_shutdown_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 1)
|
Conn,
|
||||||
|
?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE,
|
||||||
|
?MQTT_QUIC_CONN_ERROR_CTRL_STREAM_DOWN
|
||||||
|
)
|
||||||
end,
|
end,
|
||||||
{ok, S};
|
{ok, S};
|
||||||
handle_info({'EXIT', Pid, Reason}, #{streams := Streams} = S) ->
|
handle_info({'EXIT', Pid, Reason}, #{streams := Streams} = S) ->
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% MQTT/QUIC Stream
|
%% MQTT/QUIC control Stream
|
||||||
-module(emqx_quic_stream).
|
-module(emqx_quic_stream).
|
||||||
|
|
||||||
-ifndef(BUILD_WITHOUT_QUIC).
|
-ifndef(BUILD_WITHOUT_QUIC).
|
||||||
|
@ -38,6 +38,7 @@
|
||||||
peercert/1
|
peercert/1
|
||||||
]).
|
]).
|
||||||
-include_lib("quicer/include/quicer.hrl").
|
-include_lib("quicer/include/quicer.hrl").
|
||||||
|
-include_lib("emqx/include/emqx_quic.hrl").
|
||||||
|
|
||||||
-type cb_ret() :: quicer_stream:cb_ret().
|
-type cb_ret() :: quicer_stream:cb_ret().
|
||||||
-type cb_data() :: quicer_stream:cb_state().
|
-type cb_data() :: quicer_stream:cb_state().
|
||||||
|
@ -223,10 +224,17 @@ stream_closed(
|
||||||
is_atom(Status) andalso
|
is_atom(Status) andalso
|
||||||
is_integer(Code)
|
is_integer(Code)
|
||||||
->
|
->
|
||||||
%% @TODO for now we fake a sock_closed for
|
%% For now we fake a sock_closed for
|
||||||
%% emqx_connection:process_msg to append
|
%% emqx_connection:process_msg to append
|
||||||
%% a msg to be processed
|
%% a msg to be processed
|
||||||
{ok, {sock_closed, Status}, S}.
|
Reason =
|
||||||
|
case Code of
|
||||||
|
?MQTT_QUIC_CONN_NOERROR ->
|
||||||
|
normal;
|
||||||
|
_ ->
|
||||||
|
Status
|
||||||
|
end,
|
||||||
|
{ok, {sock_closed, Reason}, S}.
|
||||||
|
|
||||||
%%%
|
%%%
|
||||||
%%% Internals
|
%%% Internals
|
||||||
|
|
|
@ -118,6 +118,8 @@ groups() ->
|
||||||
t_multi_streams_shutdown_ctrl_stream,
|
t_multi_streams_shutdown_ctrl_stream,
|
||||||
t_multi_streams_shutdown_ctrl_stream_then_reconnect,
|
t_multi_streams_shutdown_ctrl_stream_then_reconnect,
|
||||||
t_multi_streams_remote_shutdown,
|
t_multi_streams_remote_shutdown,
|
||||||
|
t_multi_streams_emqx_ctrl_kill,
|
||||||
|
t_multi_streams_emqx_ctrl_exit_normal,
|
||||||
t_multi_streams_remote_shutdown_with_reconnect
|
t_multi_streams_remote_shutdown_with_reconnect
|
||||||
]},
|
]},
|
||||||
|
|
||||||
|
@ -1327,7 +1329,13 @@ t_multi_streams_shutdown_ctrl_stream(Config) ->
|
||||||
),
|
),
|
||||||
|
|
||||||
{quic, _Conn, Ctrlstream} = proplists:get_value(socket, emqtt:info(C)),
|
{quic, _Conn, Ctrlstream} = proplists:get_value(socket, emqtt:info(C)),
|
||||||
quicer:shutdown_stream(Ctrlstream, ?config(stream_shutdown_flag, Config), 500, 1000),
|
Flag = ?config(stream_shutdown_flag, Config),
|
||||||
|
AppErrorCode =
|
||||||
|
case Flag of
|
||||||
|
?QUIC_STREAM_SHUTDOWN_FLAG_GRACEFUL -> 0;
|
||||||
|
_ -> 500
|
||||||
|
end,
|
||||||
|
quicer:shutdown_stream(Ctrlstream, Flag, AppErrorCode, 1000),
|
||||||
timer:sleep(500),
|
timer:sleep(500),
|
||||||
%% Client should be closed
|
%% Client should be closed
|
||||||
?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)).
|
?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)).
|
||||||
|
@ -1384,6 +1392,114 @@ t_multi_streams_shutdown_ctrl_stream_then_reconnect(Config) ->
|
||||||
%% Client should be closed
|
%% Client should be closed
|
||||||
?assert(is_list(emqtt:info(C))).
|
?assert(is_list(emqtt:info(C))).
|
||||||
|
|
||||||
|
t_multi_streams_emqx_ctrl_kill(Config) ->
|
||||||
|
erlang:process_flag(trap_exit, true),
|
||||||
|
PubQos = ?config(pub_qos, Config),
|
||||||
|
SubQos = ?config(sub_qos, Config),
|
||||||
|
RecQos = calc_qos(PubQos, SubQos),
|
||||||
|
PktId1 = calc_pkt_id(RecQos, 1),
|
||||||
|
|
||||||
|
Topic = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
Topic2 = <<Topic/binary, "two">>,
|
||||||
|
{ok, C} = emqtt:start_link([
|
||||||
|
{proto_ver, v5},
|
||||||
|
{reconnect, false},
|
||||||
|
%% speedup test
|
||||||
|
{connect_timeout, 5}
|
||||||
|
| Config
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:quic_connect(C),
|
||||||
|
{ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
|
||||||
|
{Topic, [{qos, SubQos}]}
|
||||||
|
]),
|
||||||
|
{ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
|
||||||
|
{Topic2, [{qos, SubQos}]}
|
||||||
|
]),
|
||||||
|
|
||||||
|
?assert(SVia2 =/= SVia),
|
||||||
|
|
||||||
|
case
|
||||||
|
emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}])
|
||||||
|
of
|
||||||
|
ok when PubQos == 0 -> ok;
|
||||||
|
{ok, #{reason_code := 0, via := _PVia}} -> ok
|
||||||
|
end,
|
||||||
|
|
||||||
|
PubRecvs = recv_pub(1),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
{publish, #{
|
||||||
|
client_pid := C,
|
||||||
|
packet_id := PktId1,
|
||||||
|
payload := <<1, 2, 3, 4, 5>>,
|
||||||
|
qos := RecQos
|
||||||
|
}}
|
||||||
|
],
|
||||||
|
PubRecvs
|
||||||
|
),
|
||||||
|
|
||||||
|
ClientId = proplists:get_value(clientid, emqtt:info(C)),
|
||||||
|
[{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId),
|
||||||
|
exit(TransPid, kill),
|
||||||
|
|
||||||
|
timer:sleep(200),
|
||||||
|
%% Client should be closed
|
||||||
|
?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)).
|
||||||
|
|
||||||
|
t_multi_streams_emqx_ctrl_exit_normal(Config) ->
|
||||||
|
erlang:process_flag(trap_exit, true),
|
||||||
|
PubQos = ?config(pub_qos, Config),
|
||||||
|
SubQos = ?config(sub_qos, Config),
|
||||||
|
RecQos = calc_qos(PubQos, SubQos),
|
||||||
|
PktId1 = calc_pkt_id(RecQos, 1),
|
||||||
|
|
||||||
|
Topic = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
Topic2 = <<Topic/binary, "two">>,
|
||||||
|
{ok, C} = emqtt:start_link([
|
||||||
|
{proto_ver, v5},
|
||||||
|
{reconnect, false},
|
||||||
|
%% speedup test
|
||||||
|
{connect_timeout, 5}
|
||||||
|
| Config
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:quic_connect(C),
|
||||||
|
{ok, #{via := SVia}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
|
||||||
|
{Topic, [{qos, SubQos}]}
|
||||||
|
]),
|
||||||
|
{ok, #{via := SVia2}, [SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
|
||||||
|
{Topic2, [{qos, SubQos}]}
|
||||||
|
]),
|
||||||
|
|
||||||
|
?assert(SVia2 =/= SVia),
|
||||||
|
|
||||||
|
case
|
||||||
|
emqtt:publish_via(C, {new_data_stream, []}, Topic, #{}, <<1, 2, 3, 4, 5>>, [{qos, PubQos}])
|
||||||
|
of
|
||||||
|
ok when PubQos == 0 -> ok;
|
||||||
|
{ok, #{reason_code := 0, via := _PVia}} -> ok
|
||||||
|
end,
|
||||||
|
|
||||||
|
PubRecvs = recv_pub(1),
|
||||||
|
?assertMatch(
|
||||||
|
[
|
||||||
|
{publish, #{
|
||||||
|
client_pid := C,
|
||||||
|
packet_id := PktId1,
|
||||||
|
payload := <<1, 2, 3, 4, 5>>,
|
||||||
|
qos := RecQos
|
||||||
|
}}
|
||||||
|
],
|
||||||
|
PubRecvs
|
||||||
|
),
|
||||||
|
|
||||||
|
ClientId = proplists:get_value(clientid, emqtt:info(C)),
|
||||||
|
[{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId),
|
||||||
|
|
||||||
|
emqx_connection:stop(TransPid),
|
||||||
|
timer:sleep(200),
|
||||||
|
%% Client exit normal.
|
||||||
|
?assertMatch({'EXIT', {normal, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)).
|
||||||
|
|
||||||
t_multi_streams_remote_shutdown(Config) ->
|
t_multi_streams_remote_shutdown(Config) ->
|
||||||
erlang:process_flag(trap_exit, true),
|
erlang:process_flag(trap_exit, true),
|
||||||
PubQos = ?config(pub_qos, Config),
|
PubQos = ?config(pub_qos, Config),
|
||||||
|
|
Loading…
Reference in New Issue