From b0d4a22aa850db1a6f81637db7f011a0f86d3225 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Fri, 18 Aug 2023 21:10:20 +0300 Subject: [PATCH 1/6] chore(ft): refactor async reply mechanism --- apps/emqx/src/emqx_channel.erl | 15 +- apps/emqx/src/emqx_cm.erl | 2 +- apps/emqx_ft/src/emqx_ft.erl | 181 +++++++++--------- apps/emqx_ft/src/emqx_ft_app.erl | 1 + apps/emqx_ft/src/emqx_ft_async_reply.erl | 106 ++++++++++ apps/emqx_ft/src/emqx_ft_responder.erl | 116 ----------- apps/emqx_ft/src/emqx_ft_responder_sup.erl | 48 ----- apps/emqx_ft/src/emqx_ft_storage.erl | 8 + apps/emqx_ft/src/emqx_ft_sup.erl | 11 +- apps/emqx_ft/test/emqx_ft_SUITE.erl | 2 +- apps/emqx_ft/test/emqx_ft_responder_SUITE.erl | 84 -------- 11 files changed, 221 insertions(+), 353 deletions(-) create mode 100644 apps/emqx_ft/src/emqx_ft_async_reply.erl delete mode 100644 apps/emqx_ft/src/emqx_ft_responder.erl delete mode 100644 apps/emqx_ft/src/emqx_ft_responder_sup.erl delete mode 100644 apps/emqx_ft/test/emqx_ft_responder_SUITE.erl diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 87680358f..8d0a58767 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1252,6 +1252,11 @@ handle_info({disconnect, ReasonCode, ReasonName, Props}, Channel) -> handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel); handle_info({puback, PacketId, PubRes, RC}, Channel) -> do_finish_publish(PacketId, PubRes, RC, Channel); +handle_info({'DOWN', Ref, process, Pid, Reason}, Channel) -> + case emqx_hooks:run_fold('client.monitored_process_down', [Ref, Pid, Reason], []) of + [] -> {ok, Channel}; + Msgs -> {ok, Msgs, Channel} + end; handle_info(Info, Channel) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {ok, Channel}. @@ -1358,9 +1363,13 @@ handle_timeout( {_, Quota2} -> {ok, clean_timer(quota_timer, Channel#channel{quota = Quota2})} end; -handle_timeout(_TRef, Msg, Channel) -> - ?SLOG(error, #{msg => "unexpected_timeout", timeout_msg => Msg}), - {ok, Channel}. +handle_timeout(TRef, Msg, Channel) -> + case emqx_hooks:run_fold('client.timeout', [TRef, Msg], []) of + [] -> + {ok, Channel}; + Msgs -> + {ok, Msgs, Channel} + end. %%-------------------------------------------------------------------- %% Ensure timers diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index c680560fb..4e4afa678 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -189,7 +189,7 @@ do_unregister_channel({_ClientId, ChanPid} = Chan) -> true = ets:delete(?CHAN_CONN_TAB, Chan), true = ets:delete(?CHAN_INFO_TAB, Chan), ets:delete_object(?CHAN_TAB, Chan), - ok = emqx_hooks:run('channel.unregistered', [ChanPid]), + ok = emqx_hooks:run('cm.channel.unregistered', [ChanPid]), true. -spec connection_closed(emqx_types:clientid()) -> true. diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 41046907b..521d5b10a 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -28,7 +28,10 @@ -export([ on_message_publish/1, - on_message_puback/4 + on_message_puback/4, + on_client_timeout/3, + on_process_down/4, + on_channel_unregistered/1 ]). -export([ @@ -36,8 +39,6 @@ encode_filemeta/1 ]). --export([on_complete/4]). - -export_type([ clientid/0, transfer/0, @@ -85,17 +86,29 @@ checksum => checksum() }. +-define(FT_EVENT(EVENT), {?MODULE, EVENT}). + %%-------------------------------------------------------------------- %% API for app %%-------------------------------------------------------------------- hook() -> ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST), - ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST). + ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST), + ok = emqx_hooks:put('client.timeout', {?MODULE, on_client_timeout, []}, ?HP_LOWEST), + ok = emqx_hooks:put( + 'client.monitored_process_down', {?MODULE, on_process_down, []}, ?HP_LOWEST + ), + ok = emqx_hooks:put( + 'cm.channel.unregistered', {?MODULE, on_channel_unregistered, []}, ?HP_LOWEST + ). unhook() -> ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), - ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}). + ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}), + ok = emqx_hooks:del('client.timeout', {?MODULE, on_client_timeout}), + ok = emqx_hooks:del('client.monitored_process_down', {?MODULE, on_process_down}), + ok = emqx_hooks:del('cm.channel.unregistered', {?MODULE, on_channel_unregistered}). %%-------------------------------------------------------------------- %% API @@ -145,6 +158,25 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) -> ignore end. +on_channel_unregistered(ChannelPid) -> + ok = emqx_ft_async_reply:deregister_all(ChannelPid). + +on_client_timeout(_TRef, ?FT_EVENT({MRef, PacketId}), Acc) -> + _ = erlang:demonitor(MRef, [flush]), + _ = emqx_ft_async_reply:take_by_mref(MRef), + {ok, [{outgoing, ?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)} | Acc]}; +on_client_timeout(_TRef, _Event, Acc) -> + {ok, Acc}. + +on_process_down(MRef, _Pid, Reason, Acc) -> + case emqx_ft_async_reply:take_by_mref(MRef) of + {ok, PacketId, TRef} -> + _ = emqx_utils:cancel_timer(TRef), + {ok, [{outgoing, ?PUBACK_PACKET(PacketId, reason_to_rc(Reason))} | Acc]}; + not_found -> + {ok, Acc} + end. + %%-------------------------------------------------------------------- %% Handlers for transfer messages %%-------------------------------------------------------------------- @@ -208,24 +240,13 @@ on_init(PacketId, Msg, Transfer, Meta) -> transfer => Transfer, filemeta => Meta }), - PacketKey = {self(), PacketId}, - Callback = fun(Result) -> - ?MODULE:on_complete("store_filemeta", PacketKey, Transfer, Result) - end, - with_responder(PacketKey, Callback, emqx_ft_conf:init_timeout(), fun() -> - case store_filemeta(Transfer, Meta) of - % Stored, ack through the responder right away - ok -> - emqx_ft_responder:ack(PacketKey, ok); - % Storage operation started, packet will be acked by the responder - % {async, Pid} -> - % ok = emqx_ft_responder:kickoff(PacketKey, Pid), - % ok; - %% Storage operation failed, ack through the responder - {error, _} = Error -> - emqx_ft_responder:ack(PacketKey, Error) - end - end). + %% Currently synchronous. + %% If we want to make it async, we need to use `emqx_ft_async_reply`, + %% like in `on_fin`. + case store_filemeta(Transfer, Meta) of + ok -> ?RC_SUCCESS; + {error, _} -> ?RC_UNSPECIFIED_ERROR + end. on_abort(_Msg, _FileId) -> %% TODO @@ -240,21 +261,13 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> checksum => Checksum }), Segment = {Offset, Msg#message.payload}, - PacketKey = {self(), PacketId}, - Callback = fun(Result) -> - ?MODULE:on_complete("store_segment", PacketKey, Transfer, Result) - end, - with_responder(PacketKey, Callback, emqx_ft_conf:store_segment_timeout(), fun() -> - case store_segment(Transfer, Segment) of - ok -> - emqx_ft_responder:ack(PacketKey, ok); - % {async, Pid} -> - % ok = emqx_ft_responder:kickoff(PacketKey, Pid), - % ok; - {error, _} = Error -> - emqx_ft_responder:ack(PacketKey, Error) - end - end). + %% Currently synchronous. + %% If we want to make it async, we need to use `emqx_ft_async_reply`, + %% like in `on_fin`. + case store_segment(Transfer, Segment) of + ok -> ?RC_SUCCESS; + {error, _} -> ?RC_UNSPECIFIED_ERROR + end. on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) -> ?tp(info, "file_transfer_fin", #{ @@ -265,37 +278,30 @@ on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) -> checksum => FinalChecksum }), %% TODO: handle checksum? Do we need it? - FinPacketKey = {self(), PacketId}, - Callback = fun(Result) -> - ?MODULE:on_complete("assemble", FinPacketKey, Transfer, Result) - end, - with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() -> - case assemble(Transfer, FinalSize, FinalChecksum) of - %% Assembling completed, ack through the responder right away - ok -> - emqx_ft_responder:ack(FinPacketKey, ok); - %% Assembling started, packet will be acked by the responder - {async, Pid} -> - ok = emqx_ft_responder:kickoff(FinPacketKey, Pid), - ok; - %% Assembling failed, ack through the responder - {error, _} = Error -> - emqx_ft_responder:ack(FinPacketKey, Error) - end - end). + emqx_ft_async_reply:with_new_packet( + PacketId, + fun() -> + case assemble(Transfer, FinalSize, FinalChecksum) of + ok -> + ?RC_SUCCESS; + %% Assembling started, packet will be acked by monitor or timeout + {async, Pid} -> + ok = register_async_reply(Pid, PacketId), + ok = emqx_ft_storage:kickoff(Pid), + undefined; + {error, _} -> + ?RC_UNSPECIFIED_ERROR + end + end, + undefined + ). -with_responder(Key, Callback, Timeout, CriticalSection) -> - case emqx_ft_responder:start(Key, Callback, Timeout) of - %% We have new packet - {ok, _} -> - CriticalSection(); - %% Packet already received. - %% Since we are still handling the previous one, - %% we probably have retransmit here - {error, {already_started, _}} -> - ok - end, - undefined. +register_async_reply(Pid, PacketId) -> + MRef = erlang:monitor(process, Pid), + TRef = erlang:start_timer( + emqx_ft_conf:assemble_timeout(), self(), ?FT_EVENT({MRef, PacketId}) + ), + ok = emqx_ft_async_reply:register(PacketId, MRef, TRef). store_filemeta(Transfer, Segment) -> try @@ -335,28 +341,6 @@ transfer(Msg, FileId) -> ClientId = Msg#message.from, {clientid_to_binary(ClientId), FileId}. -on_complete(Op, {ChanPid, PacketId}, Transfer, Result) -> - ?tp(debug, "on_complete", #{ - operation => Op, - packet_id => PacketId, - transfer => Transfer - }), - case Result of - {Mode, ok} when Mode == ack orelse Mode == down -> - erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS}); - {Mode, {error, _} = Reason} when Mode == ack orelse Mode == down -> - ?tp(error, Op ++ "_failed", #{ - transfer => Transfer, - reason => Reason - }), - erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}); - timeout -> - ?tp(error, Op ++ "_timed_out", #{ - transfer => Transfer - }), - erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}) - end. - validate(Validations, Fun) -> case do_validate(Validations, []) of {ok, Parsed} -> @@ -429,3 +413,20 @@ clientid_to_binary(A) when is_atom(A) -> atom_to_binary(A); clientid_to_binary(B) when is_binary(B) -> B. + +reason_to_rc(Reason) -> + case map_down_reason(Reason) of + ok -> ?RC_SUCCESS; + {error, _} -> ?RC_UNSPECIFIED_ERROR + end. + +map_down_reason(normal) -> + ok; +map_down_reason(shutdown) -> + ok; +map_down_reason({shutdown, Result}) -> + Result; +map_down_reason(noproc) -> + {error, noproc}; +map_down_reason(Error) -> + {error, {internal_error, Error}}. diff --git a/apps/emqx_ft/src/emqx_ft_app.erl b/apps/emqx_ft/src/emqx_ft_app.erl index 43a4cc816..9ef215bf9 100644 --- a/apps/emqx_ft/src/emqx_ft_app.erl +++ b/apps/emqx_ft/src/emqx_ft_app.erl @@ -22,6 +22,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_ft_sup:start_link(), + ok = emqx_ft_async_reply:create_table(), ok = emqx_ft_conf:load(), {ok, Sup}. diff --git a/apps/emqx_ft/src/emqx_ft_async_reply.erl b/apps/emqx_ft/src/emqx_ft_async_reply.erl new file mode 100644 index 000000000..4eee2c544 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_async_reply.erl @@ -0,0 +1,106 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_async_reply). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/types.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +-export([ + create_tables/0 +]). + +-export([ + register/3, + take_by_mref/1, + with_new_packet/3, + deregister_all/1 +]). + +-type channel_pid() :: pid(). +-type mon_ref() :: reference(). +-type timer_ref() :: reference(). +-type packet_id() :: emqx_types:packet_id(). + +%% packets waiting for async workers + +-define(WORKER_TAB, emqx_ft_async_mons). +-define(WORKER_KEY(MRef), ?WORKER_KEY(self(), MRef)). +-define(WORKER_KEY(ChannelPid, MRef), {ChannelPid, MRef}). + +%% async worker monitors by packet ids + +-define(PACKET_TAB, emqx_ft_async_packets). +-define(PACKET_KEY(PacketId), ?PACKET_KEY(self(), PacketId)). +-define(PACKET_KEY(ChannelPid, PacketId), {ChannelPid, PacketId}). + +%%-------------------------------------------------------------------- +%% API +%% ------------------------------------------------------------------- + +-spec create_tables() -> ok. +create_tables() -> + _ = ets:new(?WORKER_TAB, [named_table, public, ordered_set]), + _ = ets:new(?PACKET_TAB, [named_table, public, ordered_set]), + ok. + +-spec register(packet_id(), mon_ref(), timer_ref()) -> ok. +register(PacketId, MRef, TRef) -> + _ = ets:insert(?PACKET_TAB, {?PACKET_KEY(PacketId), MRef}), + _ = ets:insert(?WORKER_TAB, {?WORKER_KEY(MRef), PacketId, TRef}), + ok. + +-spec with_new_packet(packet_id(), fun(() -> any()), any()) -> any(). +with_new_packet(PacketId, Fun, Default) -> + case ets:member(?PACKET_TAB, ?PACKET_KEY(PacketId)) of + true -> Default; + false -> Fun() + end. + +-spec take_by_mref(mon_ref()) -> {ok, packet_id(), timer_ref()} | not_found. +take_by_mref(MRef) -> + case ets:take(?WORKER_TAB, ?WORKER_KEY(MRef)) of + [{_, PacketId, TRef}] -> + _ = ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)), + {ok, PacketId, TRef}; + [] -> + not_found + end. + +-spec deregister_all(channel_pid()) -> ok. +deregister_all(ChannelPid) -> + ok = deregister_packets(ChannelPid), + ok = deregister_mons(ChannelPid), + ok. + +-spec info() -> {non_neg_integer(), non_neg_integer()}. +info() -> + {ets:info(?MON_TAB, size), ets:info(?PACKET_TAB, size)}. + +%%-------------------------------------------------------------------- +%% Internal +%%------------------------------------------------------------------- + +deregister_packets(ChannelPid) when is_pid(ChannelPid) -> + MS = [{{?PACKET_KEY(ChannelPid, '_'), '_'}, [], [true]}], + _ = ets:select_delete(?PACKET_TAB, MS), + ok. + +deregister_mons(ChannelPid) -> + MS = [{{?MON_KEY(ChannelPid, '_'), '_', '_'}, [], [true]}], + _ = ets:select_delete(?MON_TAB, MS), + ok. diff --git a/apps/emqx_ft/src/emqx_ft_responder.erl b/apps/emqx_ft/src/emqx_ft_responder.erl deleted file mode 100644 index c2c62e1c2..000000000 --- a/apps/emqx_ft/src/emqx_ft_responder.erl +++ /dev/null @@ -1,116 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_ft_responder). - --behaviour(gen_server). - --include_lib("emqx/include/logger.hrl"). --include_lib("emqx/include/types.hrl"). - --include_lib("snabbkaffe/include/snabbkaffe.hrl"). - -%% API --export([start/3]). --export([kickoff/2]). --export([ack/2]). - -%% Supervisor API --export([start_link/3]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). - --define(REF(Key), {via, gproc, {n, l, {?MODULE, Key}}}). - --type key() :: term(). --type respfun() :: fun(({ack, _Result} | {down, _Result} | timeout) -> _SideEffect). - -%%-------------------------------------------------------------------- -%% API -%% ------------------------------------------------------------------- - --spec start(key(), respfun(), timeout()) -> startlink_ret(). -start(Key, RespFun, Timeout) -> - emqx_ft_responder_sup:start_child(Key, RespFun, Timeout). - --spec kickoff(key(), pid()) -> ok. -kickoff(Key, Pid) -> - gen_server:call(?REF(Key), {kickoff, Pid}). - --spec ack(key(), _Result) -> _Return. -ack(Key, Result) -> - % TODO: it's possible to avoid term copy - gen_server:call(?REF(Key), {ack, Result}, infinity). - --spec start_link(key(), timeout(), respfun()) -> startlink_ret(). -start_link(Key, RespFun, Timeout) -> - gen_server:start_link(?REF(Key), ?MODULE, {Key, RespFun, Timeout}, []). - -%%-------------------------------------------------------------------- -%% gen_server callbacks -%% ------------------------------------------------------------------- - -init({Key, RespFun, Timeout}) -> - _ = erlang:process_flag(trap_exit, true), - _TRef = erlang:send_after(Timeout, self(), timeout), - {ok, {Key, RespFun}}. - -handle_call({kickoff, Pid}, _From, St) -> - % TODO: more state? - _MRef = erlang:monitor(process, Pid), - _ = Pid ! kickoff, - {reply, ok, St}; -handle_call({ack, Result}, _From, {Key, RespFun}) -> - Ret = apply(RespFun, [{ack, Result}]), - ?tp(debug, ft_responder_ack, #{key => Key, result => Result, return => Ret}), - {stop, {shutdown, Ret}, Ret, undefined}; -handle_call(Msg, _From, State) -> - ?SLOG(warning, #{msg => "unknown_call", call_msg => Msg}), - {reply, {error, unknown_call}, State}. - -handle_cast(Msg, State) -> - ?SLOG(warning, #{msg => "unknown_cast", cast_msg => Msg}), - {noreply, State}. - -handle_info(timeout, {Key, RespFun}) -> - Ret = apply(RespFun, [timeout]), - ?tp(debug, ft_responder_timeout, #{key => Key, return => Ret}), - {stop, {shutdown, Ret}, undefined}; -handle_info({'DOWN', _MRef, process, _Pid, Reason}, {Key, RespFun}) -> - Ret = apply(RespFun, [{down, map_down_reason(Reason)}]), - ?tp(debug, ft_responder_procdown, #{key => Key, reason => Reason, return => Ret}), - {stop, {shutdown, Ret}, undefined}; -handle_info(Msg, State) -> - ?SLOG(warning, #{msg => "unknown_message", info_msg => Msg}), - {noreply, State}. - -terminate(_Reason, undefined) -> - ok; -terminate(Reason, {Key, RespFun}) -> - Ret = apply(RespFun, [timeout]), - ?tp(debug, ft_responder_shutdown, #{key => Key, reason => Reason, return => Ret}), - ok. - -map_down_reason(normal) -> - ok; -map_down_reason(shutdown) -> - ok; -map_down_reason({shutdown, Result}) -> - Result; -map_down_reason(noproc) -> - {error, noproc}; -map_down_reason(Error) -> - {error, {internal_error, Error}}. diff --git a/apps/emqx_ft/src/emqx_ft_responder_sup.erl b/apps/emqx_ft/src/emqx_ft_responder_sup.erl deleted file mode 100644 index fb3932425..000000000 --- a/apps/emqx_ft/src/emqx_ft_responder_sup.erl +++ /dev/null @@ -1,48 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_ft_responder_sup). - --export([start_link/0]). --export([start_child/3]). - --behaviour(supervisor). --export([init/1]). - --define(SUPERVISOR, ?MODULE). - -%% - --spec start_link() -> {ok, pid()}. -start_link() -> - supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []). - -start_child(Key, RespFun, Timeout) -> - supervisor:start_child(?SUPERVISOR, [Key, RespFun, Timeout]). - --spec init(_) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. -init(_) -> - Flags = #{ - strategy => simple_one_for_one, - intensity => 100, - period => 100 - }, - ChildSpec = #{ - id => responder, - start => {emqx_ft_responder, start_link, []}, - restart => temporary - }, - {ok, {Flags, [ChildSpec]}}. diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl index 04fac3b38..506cf9789 100644 --- a/apps/emqx_ft/src/emqx_ft_storage.erl +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -23,6 +23,7 @@ store_filemeta/2, store_segment/2, assemble/3, + kickoff/1, files/0, files/1, @@ -121,6 +122,13 @@ store_segment(Transfer, Segment) -> assemble(Transfer, Size, FinOpts) -> dispatch(assemble, [Transfer, Size, FinOpts]). +-spec kickoff(pid()) -> ok. +kickoff(Pid) -> + _ = erlang:send(Pid, kickoff), + ok. + +%% + -spec files() -> {ok, page(file_info(), _)} | {error, term()}. files() -> diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl index 0308668ab..512d534c3 100644 --- a/apps/emqx_ft/src/emqx_ft_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_sup.erl @@ -52,14 +52,5 @@ init([]) -> modules => [emqx_ft_storage_fs_reader_sup] }, - Responder = #{ - id => emqx_ft_responder_sup, - start => {emqx_ft_responder_sup, start_link, []}, - restart => permanent, - shutdown => infinity, - type => worker, - modules => [emqx_ft_responder_sup] - }, - - ChildSpecs = [Responder, AssemblerSup, FileReaderSup], + ChildSpecs = [AssemblerSup, FileReaderSup], {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 290cda333..405ab86ba 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -37,7 +37,7 @@ all() -> groups() -> [ - {single_node, [parallel], [ + {single_node, [], [ t_assemble_crash, t_corrupted_segment_retry, t_invalid_checksum, diff --git a/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl b/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl deleted file mode 100644 index 751861206..000000000 --- a/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl +++ /dev/null @@ -1,84 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_ft_responder_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("stdlib/include/assert.hrl"). - -all() -> emqx_common_test_helpers:all(?MODULE). - -init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)), - Config. - -end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([emqx_ft]), - ok. - -init_per_testcase(_Case, Config) -> - Config. - -end_per_testcase(_Case, _Config) -> - ok. - -t_start_ack(_Config) -> - Key = <<"test">>, - DefaultAction = fun({ack, Ref}) -> Ref end, - ?assertMatch( - {ok, _Pid}, - emqx_ft_responder:start(Key, DefaultAction, 1000) - ), - ?assertMatch( - {error, {already_started, _Pid}}, - emqx_ft_responder:start(Key, DefaultAction, 1000) - ), - Ref = make_ref(), - ?assertEqual( - Ref, - emqx_ft_responder:ack(Key, Ref) - ), - ?assertExit( - {noproc, _}, - emqx_ft_responder:ack(Key, Ref) - ). - -t_timeout(_Config) -> - Key = <<"test">>, - Self = self(), - DefaultAction = fun(timeout) -> Self ! {timeout, Key} end, - {ok, _Pid} = emqx_ft_responder:start(Key, DefaultAction, 20), - receive - {timeout, Key} -> - ok - after 100 -> - ct:fail("emqx_ft_responder not called") - end, - ?assertExit( - {noproc, _}, - emqx_ft_responder:ack(Key, oops) - ). - -t_unknown_msgs(_Config) -> - {ok, Pid} = emqx_ft_responder:start(make_ref(), fun(_) -> ok end, 100), - Pid ! {unknown_msg, <<"test">>}, - ok = gen_server:cast(Pid, {unknown_msg, <<"test">>}), - ?assertEqual( - {error, unknown_call}, - gen_server:call(Pid, {unknown_call, <<"test">>}) - ). From b8cacd28336521db8f50a87898391bbd91c48b8e Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Tue, 22 Aug 2023 22:34:06 +0300 Subject: [PATCH 2/6] chore(ft): add tests for async reply registry --- apps/emqx/test/emqx_connection_SUITE.erl | 2 +- apps/emqx_ft/src/emqx_ft_app.erl | 2 +- apps/emqx_ft/src/emqx_ft_async_reply.erl | 15 +- .../test/emqx_ft_async_reply_SUITE.erl | 247 ++++++++++++++++++ 4 files changed, 257 insertions(+), 9 deletions(-) create mode 100644 apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index ea451eea5..2a96594e1 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -49,7 +49,7 @@ init_per_suite(Config) -> %% Meck Hooks ok = meck:new(emqx_hooks, [passthrough, no_history, no_link]), ok = meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end), - ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> {ok, Acc} end), + ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end), ok = meck:expect(emqx_channel, ensure_disconnected, fun(_, Channel) -> Channel end), diff --git a/apps/emqx_ft/src/emqx_ft_app.erl b/apps/emqx_ft/src/emqx_ft_app.erl index 9ef215bf9..114b4bff3 100644 --- a/apps/emqx_ft/src/emqx_ft_app.erl +++ b/apps/emqx_ft/src/emqx_ft_app.erl @@ -22,7 +22,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_ft_sup:start_link(), - ok = emqx_ft_async_reply:create_table(), + ok = emqx_ft_async_reply:create_tables(), ok = emqx_ft_conf:load(), {ok, Sup}. diff --git a/apps/emqx_ft/src/emqx_ft_async_reply.erl b/apps/emqx_ft/src/emqx_ft_async_reply.erl index 4eee2c544..f33558434 100644 --- a/apps/emqx_ft/src/emqx_ft_async_reply.erl +++ b/apps/emqx_ft/src/emqx_ft_async_reply.erl @@ -21,7 +21,8 @@ -include_lib("stdlib/include/ms_transform.hrl"). -export([ - create_tables/0 + create_tables/0, + info/0 ]). -export([ @@ -38,9 +39,9 @@ %% packets waiting for async workers --define(WORKER_TAB, emqx_ft_async_mons). --define(WORKER_KEY(MRef), ?WORKER_KEY(self(), MRef)). --define(WORKER_KEY(ChannelPid, MRef), {ChannelPid, MRef}). +-define(MON_TAB, emqx_ft_async_mons). +-define(MON_KEY(MRef), ?MON_KEY(self(), MRef)). +-define(MON_KEY(ChannelPid, MRef), {ChannelPid, MRef}). %% async worker monitors by packet ids @@ -54,14 +55,14 @@ -spec create_tables() -> ok. create_tables() -> - _ = ets:new(?WORKER_TAB, [named_table, public, ordered_set]), + _ = ets:new(?MON_TAB, [named_table, public, ordered_set]), _ = ets:new(?PACKET_TAB, [named_table, public, ordered_set]), ok. -spec register(packet_id(), mon_ref(), timer_ref()) -> ok. register(PacketId, MRef, TRef) -> _ = ets:insert(?PACKET_TAB, {?PACKET_KEY(PacketId), MRef}), - _ = ets:insert(?WORKER_TAB, {?WORKER_KEY(MRef), PacketId, TRef}), + _ = ets:insert(?MON_TAB, {?MON_KEY(MRef), PacketId, TRef}), ok. -spec with_new_packet(packet_id(), fun(() -> any()), any()) -> any(). @@ -73,7 +74,7 @@ with_new_packet(PacketId, Fun, Default) -> -spec take_by_mref(mon_ref()) -> {ok, packet_id(), timer_ref()} | not_found. take_by_mref(MRef) -> - case ets:take(?WORKER_TAB, ?WORKER_KEY(MRef)) of + case ets:take(?MON_TAB, ?MON_KEY(MRef)) of [{_, PacketId, TRef}] -> _ = ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)), {ok, PacketId, TRef}; diff --git a/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl b/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl new file mode 100644 index 000000000..78a9b371c --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl @@ -0,0 +1,247 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_async_reply_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("emqx/include/asserts.hrl"). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}}, + {emqx_ft, "file_transfer { enable = true, assemble_timeout = 1s }"} + ], + #{work_dir => ?config(priv_dir, Config)} + ), + [{suite_apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)), + ok. + +init_per_testcase(_Case, Config) -> + ok = snabbkaffe:start_trace(), + Config. + +end_per_testcase(_Case, _Config) -> + ok = snabbkaffe:stop(), + ok. + +%%-------------------------------------------------------------------- +%% Tests +%%-------------------------------------------------------------------- + +t_register(_Config) -> + PacketId = 1, + MRef = make_ref(), + TRef = make_ref(), + ok = emqx_ft_async_reply:register(PacketId, MRef, TRef), + + ?assertEqual( + undefined, + emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) + ), + + ?assertEqual( + ok, + emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined) + ), + + ?assertEqual( + {ok, PacketId, TRef}, + emqx_ft_async_reply:take_by_mref(MRef) + ). + +t_process_independence(_Config) -> + PacketId = 1, + MRef = make_ref(), + TRef = make_ref(), + ok = emqx_ft_async_reply:register(PacketId, MRef, TRef), + + Self = self(), + + spawn_link(fun() -> + Self ! emqx_ft_async_reply:take_by_mref(MRef) + end), + + Res1 = + receive + Msg1 -> Msg1 + end, + + ?assertEqual( + not_found, + Res1 + ), + + spawn_link(fun() -> + Self ! emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) + end), + + Res2 = + receive + Msg2 -> Msg2 + end, + + ?assertEqual( + ok, + Res2 + ). + +t_take(_Config) -> + PacketId = 1, + MRef = make_ref(), + TRef = make_ref(), + ok = emqx_ft_async_reply:register(PacketId, MRef, TRef), + + ?assertEqual( + {ok, PacketId, TRef}, + emqx_ft_async_reply:take_by_mref(MRef) + ), + + ?assertEqual( + not_found, + emqx_ft_async_reply:take_by_mref(MRef) + ), + + ?assertEqual( + ok, + emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined) + ). + +t_cleanup(_Config) -> + PacketId = 1, + MRef0 = make_ref(), + TRef0 = make_ref(), + MRef1 = make_ref(), + TRef1 = make_ref(), + ok = emqx_ft_async_reply:register(PacketId, MRef0, TRef0), + + Self = self(), + + Pid = spawn_link(fun() -> + ok = emqx_ft_async_reply:register(PacketId, MRef1, TRef1), + receive + kickoff -> + ?assertEqual( + undefined, + emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) + ), + + ?assertEqual( + {ok, PacketId, TRef1}, + emqx_ft_async_reply:take_by_mref(MRef1) + ), + + Self ! done + end + end), + + ?assertEqual( + undefined, + emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) + ), + + ok = emqx_ft_async_reply:deregister_all(Self), + + ?assertEqual( + ok, + emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) + ), + + Pid ! kickoff, + + receive + done -> ok + end. + +t_reply_by_tiemout(_Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + C = emqx_ft_test_helpers:start_client(ClientId, node()), + + SleepForever = fun() -> + Ref = make_ref(), + receive + Ref -> ok + end + end, + + ok = meck:new(emqx_ft_storage, [passthrough]), + meck:expect(emqx_ft_storage, assemble, fun(_, _, _) -> {async, spawn_link(SleepForever)} end), + + FinTopic = <<"$file/fakeid/fin/999999">>, + + ?assertMatch( + {ok, #{reason_code_name := unspecified_error}}, + emqtt:publish(C, FinTopic, <<>>, 1) + ), + + meck:unload(emqx_ft_storage), + emqtt:stop(C). + +t_cleanup_by_cm(_Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + C = emqx_ft_test_helpers:start_client(ClientId, node()), + + ok = meck:new(emqx_ft_storage, [passthrough]), + meck:expect(emqx_ft_storage, kickoff, fun(_) -> meck:exception(error, oops) end), + + FinTopic = <<"$file/fakeid/fin/999999">>, + + [ClientPid] = emqx_cm:lookup_channels(ClientId), + + ?assertWaitEvent( + begin + emqtt:publish(C, FinTopic, <<>>, 1), + exit(ClientPid, kill) + end, + #{?snk_kind := emqx_cm_clean_down, client_id := ClientId}, + 1000 + ), + + ?assertEqual( + {0, 0}, + emqx_ft_async_reply:info() + ), + + meck:unload(emqx_ft_storage). + +t_unrelated_events(_Config) -> + process_flag(trap_exit, true), + ClientId = atom_to_binary(?FUNCTION_NAME), + C = emqx_ft_test_helpers:start_client(ClientId, node()), + [ClientPid] = emqx_cm:lookup_channels(ClientId), + + erlang:monitor(process, ClientPid), + + ClientPid ! {'DOWN', make_ref(), process, self(), normal}, + ClientPid ! {timeout, make_ref(), unknown_timer_event}, + + ?assertNotReceive( + {'DOWN', _Ref, process, ClientPid, _Reason}, + 500 + ), + + emqtt:stop(C). From 39a48179ea29d672e4f3e5864e850b3019bb14cb Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 23 Aug 2023 12:09:40 +0300 Subject: [PATCH 3/6] chore(emqx_channel): use macros for reply construction --- apps/emqx/include/emqx_channel.hrl | 5 +++++ apps/emqx/src/emqx_channel.erl | 13 +++++++------ apps/emqx_ft/src/emqx_ft.erl | 6 ++++-- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/apps/emqx/include/emqx_channel.hrl b/apps/emqx/include/emqx_channel.hrl index be2448a20..53abcafd6 100644 --- a/apps/emqx/include/emqx_channel.hrl +++ b/apps/emqx/include/emqx_channel.hrl @@ -41,4 +41,9 @@ will_msg ]). +-define(REPLY_OUTGOING(Packets), {outgoing, Packets}). +-define(REPLY_CONNACK(Packet), {connack, Packet}). +-define(REPLY_EVENT(StateOrEvent), {event, StateOrEvent}). +-define(REPLY_CLOSE(Reason), {close, Reason}). + -define(EXPIRE_INTERVAL_INFINITE, 4294967295000). diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 8d0a58767..53cac2400 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -122,6 +122,7 @@ -type reply() :: {outgoing, emqx_types:packet()} | {outgoing, [emqx_types:packet()]} + | {connack, emqx_types:packet()} | {event, conn_state() | updated} | {close, Reason :: atom()}. @@ -1023,7 +1024,7 @@ handle_out(publish, [], Channel) -> {ok, Channel}; handle_out(publish, Publishes, Channel) -> {Packets, NChannel} = do_deliver(Publishes, Channel), - {ok, {outgoing, Packets}, NChannel}; + {ok, ?REPLY_OUTGOING(Packets), NChannel}; handle_out(puback, {PacketId, ReasonCode}, Channel) -> {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel}; handle_out(pubrec, {PacketId, ReasonCode}, Channel) -> @@ -1048,7 +1049,7 @@ handle_out(disconnect, {ReasonCode, ReasonName}, Channel) -> handle_out(disconnect, {ReasonCode, ReasonName, #{}}, Channel); handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) -> Packet = ?DISCONNECT_PACKET(ReasonCode, Props), - {ok, [{outgoing, Packet}, {close, ReasonName}], Channel}; + {ok, [?REPLY_OUTGOING(Packet), {close, ReasonName}], Channel}; handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) -> {ok, {close, ReasonName}, Channel}; handle_out(auth, {ReasonCode, Properties}, Channel) -> @@ -1062,7 +1063,7 @@ handle_out(Type, Data, Channel) -> %%-------------------------------------------------------------------- return_connack(AckPacket, Channel) -> - Replies = [{event, connected}, {connack, AckPacket}], + Replies = [?REPLY_EVENT(connected), ?REPLY_CONNACK(AckPacket)], case maybe_resume_session(Channel) of ignore -> {ok, Replies, Channel}; @@ -1073,7 +1074,7 @@ return_connack(AckPacket, Channel) -> session = NSession }, {Packets, NChannel2} = do_deliver(Publishes, NChannel1), - Outgoing = [{outgoing, Packets} || length(Packets) > 0], + Outgoing = [?REPLY_OUTGOING(Packets) || length(Packets) > 0], {ok, Replies ++ Outgoing, NChannel2} end. @@ -1121,7 +1122,7 @@ do_deliver(Publishes, Channel) when is_list(Publishes) -> %%-------------------------------------------------------------------- return_sub_unsub_ack(Packet, Channel) -> - {ok, [{outgoing, Packet}, {event, updated}], Channel}. + {ok, [?REPLY_OUTGOING(Packet), ?REPLY_EVENT(updated)], Channel}. %%-------------------------------------------------------------------- %% Handle call @@ -1235,7 +1236,7 @@ handle_info( -> Channel1 = ensure_disconnected(Reason, maybe_publish_will_msg(Channel)), case maybe_shutdown(Reason, Channel1) of - {ok, Channel2} -> {ok, {event, disconnected}, Channel2}; + {ok, Channel2} -> {ok, ?REPLY_EVENT(disconnected), Channel2}; Shutdown -> Shutdown end; handle_info({sock_closed, Reason}, Channel = #channel{conn_state = disconnected}) -> diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 521d5b10a..6a98c51f0 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -18,7 +18,9 @@ -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx_mqtt.hrl"). +-include_lib("emqx/include/emqx_channel.hrl"). -include_lib("emqx/include/emqx_hooks.hrl"). + -include_lib("snabbkaffe/include/trace.hrl"). -export([ @@ -164,7 +166,7 @@ on_channel_unregistered(ChannelPid) -> on_client_timeout(_TRef, ?FT_EVENT({MRef, PacketId}), Acc) -> _ = erlang:demonitor(MRef, [flush]), _ = emqx_ft_async_reply:take_by_mref(MRef), - {ok, [{outgoing, ?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)} | Acc]}; + {ok, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)) | Acc]}; on_client_timeout(_TRef, _Event, Acc) -> {ok, Acc}. @@ -172,7 +174,7 @@ on_process_down(MRef, _Pid, Reason, Acc) -> case emqx_ft_async_reply:take_by_mref(MRef) of {ok, PacketId, TRef} -> _ = emqx_utils:cancel_timer(TRef), - {ok, [{outgoing, ?PUBACK_PACKET(PacketId, reason_to_rc(Reason))} | Acc]}; + {ok, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, reason_to_rc(Reason))) | Acc]}; not_found -> {ok, Acc} end. From 4488e9e591592a27dd95797ba43497d0cf2d5b12 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Wed, 23 Aug 2023 12:13:13 +0300 Subject: [PATCH 4/6] chore(ft): stop hook chain when doing actual handling --- apps/emqx_ft/src/emqx_ft.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 6a98c51f0..41020e76f 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -166,7 +166,7 @@ on_channel_unregistered(ChannelPid) -> on_client_timeout(_TRef, ?FT_EVENT({MRef, PacketId}), Acc) -> _ = erlang:demonitor(MRef, [flush]), _ = emqx_ft_async_reply:take_by_mref(MRef), - {ok, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)) | Acc]}; + {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)) | Acc]}; on_client_timeout(_TRef, _Event, Acc) -> {ok, Acc}. @@ -174,7 +174,7 @@ on_process_down(MRef, _Pid, Reason, Acc) -> case emqx_ft_async_reply:take_by_mref(MRef) of {ok, PacketId, TRef} -> _ = emqx_utils:cancel_timer(TRef), - {ok, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, reason_to_rc(Reason))) | Acc]}; + {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, reason_to_rc(Reason))) | Acc]}; not_found -> {ok, Acc} end. From 279895b8fd9482da249db6fa95a40a3a329982f6 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Mon, 28 Aug 2023 22:45:05 +0300 Subject: [PATCH 5/6] chore(ft): add read/write concurrency to emqx_ft_async_reply tabs --- apps/emqx_ft/src/emqx_ft_async_reply.erl | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_async_reply.erl b/apps/emqx_ft/src/emqx_ft_async_reply.erl index f33558434..7ac4c527f 100644 --- a/apps/emqx_ft/src/emqx_ft_async_reply.erl +++ b/apps/emqx_ft/src/emqx_ft_async_reply.erl @@ -55,8 +55,15 @@ -spec create_tables() -> ok. create_tables() -> - _ = ets:new(?MON_TAB, [named_table, public, ordered_set]), - _ = ets:new(?PACKET_TAB, [named_table, public, ordered_set]), + EtsOptions = [ + named_table, + public, + ordered_set, + {read_concurrency, true}, + {write_concurrency, true} + ], + _ = ets:new(?MON_TAB, EtsOptions), + _ = ets:new(?PACKET_TAB, EtsOptions), ok. -spec register(packet_id(), mon_ref(), timer_ref()) -> ok. From 54ac4a85277a4255c562785b6c7c58ad054bd435 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Tue, 29 Aug 2023 23:21:36 +0300 Subject: [PATCH 6/6] chore(ft): tidy up the code according to the review --- apps/emqx/src/emqx_channel.erl | 2 +- apps/emqx_ft/src/emqx_ft_app.erl | 1 - apps/emqx_ft/src/emqx_ft_async_reply.erl | 4 ++-- apps/emqx_ft/src/emqx_ft_sup.erl | 2 ++ 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 53cac2400..d5941c8e9 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -1049,7 +1049,7 @@ handle_out(disconnect, {ReasonCode, ReasonName}, Channel) -> handle_out(disconnect, {ReasonCode, ReasonName, #{}}, Channel); handle_out(disconnect, {ReasonCode, ReasonName, Props}, Channel = ?IS_MQTT_V5) -> Packet = ?DISCONNECT_PACKET(ReasonCode, Props), - {ok, [?REPLY_OUTGOING(Packet), {close, ReasonName}], Channel}; + {ok, [?REPLY_OUTGOING(Packet), ?REPLY_CLOSE(ReasonName)], Channel}; handle_out(disconnect, {_ReasonCode, ReasonName, _Props}, Channel) -> {ok, {close, ReasonName}, Channel}; handle_out(auth, {ReasonCode, Properties}, Channel) -> diff --git a/apps/emqx_ft/src/emqx_ft_app.erl b/apps/emqx_ft/src/emqx_ft_app.erl index 114b4bff3..43a4cc816 100644 --- a/apps/emqx_ft/src/emqx_ft_app.erl +++ b/apps/emqx_ft/src/emqx_ft_app.erl @@ -22,7 +22,6 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_ft_sup:start_link(), - ok = emqx_ft_async_reply:create_tables(), ok = emqx_ft_conf:load(), {ok, Sup}. diff --git a/apps/emqx_ft/src/emqx_ft_async_reply.erl b/apps/emqx_ft/src/emqx_ft_async_reply.erl index 7ac4c527f..501f91629 100644 --- a/apps/emqx_ft/src/emqx_ft_async_reply.erl +++ b/apps/emqx_ft/src/emqx_ft_async_reply.erl @@ -62,8 +62,8 @@ create_tables() -> {read_concurrency, true}, {write_concurrency, true} ], - _ = ets:new(?MON_TAB, EtsOptions), - _ = ets:new(?PACKET_TAB, EtsOptions), + ok = emqx_utils_ets:new(?MON_TAB, EtsOptions), + ok = emqx_utils_ets:new(?PACKET_TAB, EtsOptions), ok. -spec register(packet_id(), mon_ref(), timer_ref()) -> ok. diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl index 512d534c3..6d3936cf6 100644 --- a/apps/emqx_ft/src/emqx_ft_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_sup.erl @@ -28,6 +28,8 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). init([]) -> + ok = emqx_ft_async_reply:create_tables(), + SupFlags = #{ strategy => one_for_one, intensity => 100,