diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl
index 2704e1e55..ac419cb36 100644
--- a/apps/emqx_ft/src/emqx_ft.erl
+++ b/apps/emqx_ft/src/emqx_ft.erl
@@ -35,7 +35,7 @@
     decode_filemeta/1
 ]).
 
--export([on_assemble_timeout/1]).
+-export([on_assemble/2]).
 
 -export_type([
     clientid/0,
@@ -227,45 +227,25 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
     }),
     %% TODO: handle checksum? Do we need it?
     FinPacketKey = {self(), PacketId},
-    _ =
-        case
-            emqx_ft_responder:register(
-                FinPacketKey, fun ?MODULE:on_assemble_timeout/1, ?ASSEMBLE_TIMEOUT
-            )
-        of
-            %% We have new fin packet
-            ok ->
-                Callback = callback(FinPacketKey, FileId),
-                case assemble(transfer(Msg, FileId), Callback) of
-                    %% Assembling started, packet will be acked by the callback or the responder
-                    {ok, _} ->
-                        undefined;
-                    %% Assembling failed, unregister the packet key
-                    {error, Reason} ->
-                        ?SLOG(warning, #{
-                            msg => "assemble_not_started",
-                            mqtt_msg => Msg,
-                            file_id => FileId,
-                            reason => Reason
-                        }),
-                        case emqx_ft_responder:unregister(FinPacketKey) of
-                            %% We successfully unregistered the packet key,
-                            %% so we can send the error code at once
-                            ok ->
-                                ?RC_UNSPECIFIED_ERROR;
-                            %% Someone else already unregistered the key,
-                            %% that is, either responder or someone else acked the packet,
-                            %% we do not have to ack
-                            {error, not_found} ->
-                                undefined
-                        end
-                end;
-            %% Fin packet already received.
-            %% Since we are still handling the previous one,
-            %% we probably have retransmit here
-            {error, already_registered} ->
-                undefined
-        end.
+    case emqx_ft_responder:start(FinPacketKey, fun ?MODULE:on_assemble/2, ?ASSEMBLE_TIMEOUT) of
+        %% New fin packet: a responder was started for this packet key
+        {ok, _} ->
+            Callback = fun(Result) -> emqx_ft_responder:ack(FinPacketKey, Result) end,
+            case assemble(transfer(Msg, FileId), Callback) of
+                %% Assembling started, packet will be acked via the callback or on responder timeout
+                {ok, _} ->
+                    ok;
+                %% Assembling failed to start, ack the error through the responder
+                {error, _} = Error ->
+                    emqx_ft_responder:ack(FinPacketKey, Error)
+            end;
+        %% A responder for this fin packet is already running.
+        %% Since we are still handling the previous one,
+        %% this is most likely a retransmit
+        {error, {already_started, _}} ->
+            ok
+    end,
+    undefined.
 
 assemble(Transfer, Callback) ->
     try
@@ -278,28 +258,20 @@ assemble(Transfer, Callback) ->
             {error, {internal_error, E}}
     end.
 
-callback({ChanPid, PacketId} = Key, _FileId) ->
-    fun(Result) ->
-        case emqx_ft_responder:unregister(Key) of
-            ok ->
-                case Result of
-                    ok ->
-                        erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
-                    {error, _} ->
-                        erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
-                end;
-            {error, not_found} ->
-                ok
-        end
-    end.
-
 transfer(Msg, FileId) ->
     ClientId = Msg#message.from,
     {ClientId, FileId}.
 
-on_assemble_timeout({ChanPid, PacketId}) ->
-    ?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}),
-    erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}).
+on_assemble({ChanPid, PacketId}, Result) ->
+    ?SLOG(debug, #{msg => "on_assemble", packet_id => PacketId, result => Result}),
+    case Result of
+        {ack, ok} ->
+            erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
+        {ack, {error, _}} ->
+            erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR});
+        timeout ->
+            erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
+    end.
 
validate(Validations, Fun) -> case do_validate(Validations, []) of diff --git a/apps/emqx_ft/src/emqx_ft_responder.erl b/apps/emqx_ft/src/emqx_ft_responder.erl index 8417053f6..7b9220774 100644 --- a/apps/emqx_ft/src/emqx_ft_responder.erl +++ b/apps/emqx_ft/src/emqx_ft_responder.erl @@ -23,73 +23,50 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --export([start_link/0]). +%% API +-export([start/3]). +-export([ack/2]). --export([ - register/3, - unregister/1 -]). +%% Supervisor API +-export([start_link/3]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). --define(SERVER, ?MODULE). --define(TAB, ?MODULE). +-define(REF(Key), {via, gproc, {n, l, {?MODULE, Key}}}). -type key() :: term(). +-type respfun() :: fun(({ack, _Result} | timeout) -> _SideEffect). %%-------------------------------------------------------------------- %% API %% ------------------------------------------------------------------- --spec start_link() -> startlink_ret(). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec start(key(), timeout(), respfun()) -> startlink_ret(). +start(Key, RespFun, Timeout) -> + emqx_ft_responder_sup:start_child(Key, RespFun, Timeout). --spec register(Key, DefaultAction, Timeout) -> ok | {error, already_registered} when - Key :: key(), - DefaultAction :: fun((Key) -> any()), - Timeout :: timeout(). -register(Key, DefaultAction, Timeout) -> - case ets:lookup(?TAB, Key) of - [] -> - gen_server:call(?SERVER, {register, Key, DefaultAction, Timeout}); - [{Key, _Action, _Ref}] -> - {error, already_registered} - end. +-spec ack(key(), _Result) -> _Return. +ack(Key, Result) -> + % TODO: it's possible to avoid term copy + gen_server:call(?REF(Key), {ack, Result}, infinity). --spec unregister(Key) -> ok | {error, not_found} when - Key :: key(). -unregister(Key) -> - gen_server:call(?SERVER, {unregister, Key}). 
+-spec start_link(key(), timeout(), respfun()) -> startlink_ret(). +start_link(Key, RespFun, Timeout) -> + gen_server:start_link(?REF(Key), ?MODULE, {Key, RespFun, Timeout}, []). %%-------------------------------------------------------------------- %% gen_server callbacks %% ------------------------------------------------------------------- -init([]) -> - _ = ets:new(?TAB, [named_table, protected, set, {read_concurrency, true}]), - {ok, #{}}. +init({Key, RespFun, Timeout}) -> + _ = erlang:process_flag(trap_exit, true), + _TRef = erlang:send_after(Timeout, self(), timeout), + {ok, {Key, RespFun}}. -handle_call({register, Key, DefaultAction, Timeout}, _From, State) -> - ?SLOG(warning, #{msg => "register", key => Key, timeout => Timeout}), - case ets:lookup(?TAB, Key) of - [] -> - TRef = erlang:start_timer(Timeout, self(), {timeout, Key}), - true = ets:insert(?TAB, {Key, DefaultAction, TRef}), - {reply, ok, State}; - [{_, _Action, _Ref}] -> - {reply, {error, already_registered}, State} - end; -handle_call({unregister, Key}, _From, State) -> - ?SLOG(warning, #{msg => "unregister", key => Key}), - case ets:lookup(?TAB, Key) of - [] -> - {reply, {error, not_found}, State}; - [{_, _Action, TRef}] -> - _ = erlang:cancel_timer(TRef), - true = ets:delete(?TAB, Key), - {reply, ok, State} - end; +handle_call({ack, Result}, _From, {Key, RespFun}) -> + Ret = apply(RespFun, [Key, {ack, Result}]), + ?tp(ft_responder_ack, #{key => Key, result => Result, return => Ret}), + {stop, {shutdown, Ret}, Ret, undefined}; handle_call(Msg, _From, State) -> ?SLOG(warning, #{msg => "unknown_call", call_msg => Msg}), {reply, {error, unknown_call}, State}. @@ -98,40 +75,17 @@ handle_cast(Msg, State) -> ?SLOG(warning, #{msg => "unknown_cast", cast_msg => Msg}), {noreply, State}. 
-handle_info({timeout, TRef, {timeout, Key}}, State) ->
-    case ets:lookup(?TAB, Key) of
-        [] ->
-            {noreply, State};
-        [{_, Action, TRef}] ->
-            _ = erlang:cancel_timer(TRef),
-            true = ets:delete(?TAB, Key),
-            ok = safe_apply(Action, [Key]),
-            ?tp(ft_timeout_action_applied, #{key => Key}),
-            {noreply, State}
-    end;
+handle_info(timeout, {Key, RespFun}) ->
+    Ret = apply(RespFun, [Key, timeout]),
+    ?tp(ft_responder_timeout, #{key => Key, return => Ret}),
+    {stop, {shutdown, Ret}, undefined};
 handle_info(Msg, State) ->
     ?SLOG(warning, #{msg => "unknown_message", info_msg => Msg}),
     {noreply, State}.
 
-code_change(_OldVsn, State, _Extra) ->
-    {ok, State}.
-
-terminate(_Reason, _State) ->
+terminate(_Reason, undefined) ->
+    ok;
+terminate(Reason, {Key, RespFun}) ->
+    Ret = apply(RespFun, [Key, timeout]),
+    ?tp(ft_responder_shutdown, #{key => Key, reason => Reason, return => Ret}),
     ok.
-
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
-
-safe_apply(Fun, Args) ->
-    try apply(Fun, Args) of
-        _ -> ok
-    catch
-        Class:Reason:Stacktrace ->
-            ?SLOG(error, #{
-                msg => "safe_apply_failed",
-                class => Class,
-                reason => Reason,
-                stacktrace => Stacktrace
-            })
-    end.
diff --git a/apps/emqx_ft/src/emqx_ft_responder_sup.erl b/apps/emqx_ft/src/emqx_ft_responder_sup.erl
new file mode 100644
index 000000000..23d4f55fa
--- /dev/null
+++ b/apps/emqx_ft/src/emqx_ft_responder_sup.erl
@@ -0,0 +1,52 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_ft_responder_sup).
+
+-export([start_link/0]).
+-export([start_child/3]).
+
+-behaviour(supervisor).
+-export([init/1]).
+
+-define(SUPERVISOR, ?MODULE).
+
+%% API
+
+-spec start_link() -> {ok, pid()}.
+start_link() ->
+    supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []).
+
+%% For a simple_one_for_one supervisor the given list is appended to the
+%% child spec's start arguments, i.e. the child is started with
+%% emqx_ft_responder:start_link(Key, RespFun, Timeout).
+-spec start_child(term(), fun(), timeout()) -> supervisor:startchild_ret().
+start_child(Key, RespFun, Timeout) ->
+    supervisor:start_child(?SUPERVISOR, [Key, RespFun, Timeout]).
+
+-spec init(_) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}.
+init(_) ->
+    Flags = #{
+        strategy => simple_one_for_one,
+        intensity => 100,
+        period => 100
+    },
+    ChildSpec = #{
+        id => responder,
+        start => {emqx_ft_responder, start_link, []},
+        restart => temporary
+    },
+    {ok, {Flags, [ChildSpec]}}.
diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl
index dfdde3a8a..8d388814c 100644
--- a/apps/emqx_ft/src/emqx_ft_sup.erl
+++ b/apps/emqx_ft/src/emqx_ft_sup.erl
@@ -53,12 +53,13 @@ init([]) ->
     },
 
     Responder = #{
-        id => emqx_ft_responder,
-        start => {emqx_ft_responder, start_link, []},
+        id => emqx_ft_responder_sup,
+        start => {emqx_ft_responder_sup, start_link, []},
         restart => permanent,
         shutdown => infinity,
-        type => worker,
-        modules => [emqx_ft_responder]
+        %% the child is now a supervisor (emqx_ft_responder_sup), not a worker
+        type => supervisor,
+        modules => [emqx_ft_responder_sup]
     },
 
     ChildSpecs = [Responder, AssemblerSup, FileReaderSup],
diff --git a/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl b/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl
index 9098edcf6..447d41f11 100644
--- a/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl
+++ b/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl
@@ -19,7 +19,6 @@
 -compile(export_all).
 -compile(nowarn_export_all).
 
--include_lib("common_test/include/ct.hrl").
 -include_lib("stdlib/include/assert.hrl").
 -include_lib("emqx/include/asserts.hrl").
 
@@ -39,58 +38,58 @@ init_per_testcase(_Case, Config) ->
 end_per_testcase(_Case, _Config) ->
     ok.
 
-t_register_unregister(_Config) ->
+t_start_ack(_Config) ->
     Key = <<"test">>,
-    DefaultAction = fun(_) -> ok end,
-    ?assertEqual(
-        ok,
-        emqx_ft_responder:register(Key, DefaultAction, 1000)
+    DefaultAction = fun(_Key, {ack, Ref}) -> Ref end,
+    ?assertMatch(
+        {ok, _Pid},
+        emqx_ft_responder:start(Key, DefaultAction, 1000)
     ),
-    ?assertEqual(
-        {error, already_registered},
-        emqx_ft_responder:register(Key, DefaultAction, 1000)
+    ?assertMatch(
+        {error, {already_started, _Pid}},
+        emqx_ft_responder:start(Key, DefaultAction, 1000)
     ),
+    Ref = make_ref(),
     ?assertEqual(
-        ok,
-        emqx_ft_responder:unregister(Key)
+        Ref,
+        emqx_ft_responder:ack(Key, Ref)
     ),
-    ?assertEqual(
-        {error, not_found},
-        emqx_ft_responder:unregister(Key)
+    ?assertExit(
+        {noproc, _},
+        emqx_ft_responder:ack(Key, Ref)
     ).
 
 t_timeout(_Config) ->
     Key = <<"test">>,
     Self = self(),
-    DefaultAction = fun(K) -> Self ! {timeout, K} end,
-    ok = emqx_ft_responder:register(Key, DefaultAction, 20),
+    DefaultAction = fun(K, timeout) -> Self ! {timeout, K} end,
+    {ok, _Pid} = emqx_ft_responder:start(Key, DefaultAction, 20),
     receive
         {timeout, Key} ->
             ok
     after 100 ->
         ct:fail("emqx_ft_responder not called")
     end,
-    ?assertEqual(
-        {error, not_found},
-        emqx_ft_responder:unregister(Key)
+    ?assertExit(
+        {noproc, _},
+        emqx_ft_responder:ack(Key, oops)
     ).
 
-t_action_exception(_Config) ->
-    Key = <<"test">>,
-    DefaultAction = fun(K) -> error({oops, K}) end,
-
-    ?assertWaitEvent(
-        emqx_ft_responder:register(Key, DefaultAction, 10),
-        #{?snk_kind := ft_timeout_action_applied, key := <<"test">>},
-        1000
-    ),
-    ?assertEqual(
-        {error, not_found},
-        emqx_ft_responder:unregister(Key)
-    ).
+% t_action_exception(_Config) ->
+%     Key = <<"test">>,
+%     DefaultAction = fun(K) -> error({oops, K}) end,
+%     ?assertWaitEvent(
+%         emqx_ft_responder:start(Key, DefaultAction, 10),
+%         #{?snk_kind := ft_timeout_action_applied, key := <<"test">>},
+%         1000
+%     ),
+%     ?assertEqual(
+%         {error, not_found},
+%         emqx_ft_responder:ack(Key, oops)
+%     ).
 
 t_unknown_msgs(_Config) ->
-    Pid = whereis(emqx_ft_responder),
+    {ok, Pid} = emqx_ft_responder:start(make_ref(), fun(_, _) -> ok end, 100),
     Pid ! {unknown_msg, <<"test">>},
     ok = gen_server:cast(Pid, {unknown_msg, <<"test">>}),
     ?assertEqual(