feat(ft): improve robustness of asynchronous acks

* add auto ack after timeout
* add fin file transfer packet registration to avoid
duplication and multiple acks
This commit is contained in:
Ilya Averyanov 2023-02-01 23:31:52 +02:00
parent 8f544041e4
commit 3967c9c5b2
3 changed files with 191 additions and 17 deletions

View File

@ -39,6 +39,8 @@
%% For Debug
-export([transfer/2, storage/0]).
-export([on_assemble_timeout/1]).
-export_type([clientid/0]).
-export_type([transfer/0]).
-export_type([offset/0]).
@ -62,6 +64,8 @@
ft_data :: ft_data()
}).
-define(ASSEMBLE_TIMEOUT, 5000).
%%--------------------------------------------------------------------
%% API for app
%%--------------------------------------------------------------------
@ -210,23 +214,66 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
checksum => Checksum,
packet_id => PacketId
}),
% %% TODO: handle checksum? Do we need it?
% {ok, _} = emqx_ft_storage_fs:assemble(
% storage(),
% transfer(Msg, FileId),
% callback(FileId, Msg)
% ),
Callback = callback(FileId, PacketId),
spawn(fun() -> Callback({error, not_implemented}) end),
undefined.
%% TODO: handle checksum? Do we need it?
FinPacketKey = {self(), PacketId},
_ =
case
emqx_ft_responder:register(
FinPacketKey, fun ?MODULE:on_assemble_timeout/1, ?ASSEMBLE_TIMEOUT
)
of
%% We have new fin packet
ok ->
Callback = callback(FinPacketKey, FileId),
case assemble(transfer(Msg, FileId), Callback) of
%% Assembling started, packet will be acked by the callback or the responder
ok ->
undefined;
%% Assembling failed, unregister the packet key
{error, _} ->
case emqx_ft_responder:unregister(FinPacketKey) of
%% We successfully unregistered the packet key,
%% so we can send the error code at once
ok ->
?RC_UNSPECIFIED_ERROR;
%% Someone else already unregistered the key,
%% that is, either responder or someone else acked the packet,
%% we do not have to ack
{error, not_found} ->
undefined
end
end;
%% Fin packet already received.
%% Since we are still handling the previous one,
%% we probably have retransmit here
{error, already_registered} ->
undefined
end.
callback(_FileId, PacketId) ->
ChanPid = self(),
fun
(ok) ->
assemble(_Transfer, _Callback) ->
% spawn(fun() -> Callback({error, not_implemented}) end),
ok.
% assemble(Transfer, Callback) ->
% emqx_ft_storage_fs:assemble(
% storage(),
% Transfer,
% Callback
% ).
callback({ChanPid, PacketId} = Key, _FileId) ->
fun(Result) ->
case emqx_ft_responder:unregister(Key) of
ok ->
case Result of
{ok, _} ->
erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
({error, _}) ->
{error, _} ->
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
end;
{error, not_registered} ->
ok
end
end.
transfer(Msg, FileId) ->
@ -236,3 +283,7 @@ transfer(Msg, FileId) ->
%% TODO: configure
storage() ->
filename:join(emqx:data_dir(), "file_transfer").
on_assemble_timeout({ChanPid, PacketId}) ->
?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}),
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}).

View File

@ -0,0 +1,114 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_responder).
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/types.hrl").
-export([start_link/0]).
-export([
register/3,
unregister/1
]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(TAB, ?MODULE).
-type key() :: term().
%%--------------------------------------------------------------------
%% API
%% -------------------------------------------------------------------
-spec start_link() -> startlink_ret().
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec register(Key, DefaultAction, Timeout) -> ok | {error, already_registered} when
Key :: key(),
DefaultAction :: fun((Key) -> any()),
Timeout :: timeout().
register(Key, DefaultAction, Timeout) ->
case ets:lookup(?TAB, Key) of
[] ->
gen_server:call(?SERVER, {register, Key, DefaultAction, Timeout});
[{Key, _Action, _Ref}] ->
{error, already_registered}
end.
-spec unregister(Key) -> ok | {error, not_found} when
Key :: key().
unregister(Key) ->
gen_server:call(?SERVER, {unregister, Key}).
%%--------------------------------------------------------------------
%% gen_server callbacks
%% -------------------------------------------------------------------
init([]) ->
_ = ets:new(?TAB, [named_table, protected, set, {read_concurrency, true}]),
{ok, #{}}.
handle_call({register, Key, DefaultAction, Timeout}, _From, State) ->
?SLOG(warning, #{msg => "register", key => Key, timeout => Timeout}),
case ets:lookup(?TAB, Key) of
[] ->
TRef = erlang:start_timer(Timeout, self(), {timeout, Key}),
true = ets:insert(?TAB, {Key, DefaultAction, TRef}),
{reply, ok, State};
[{_, _Action, _Ref}] ->
{reply, {error, already_registered}, State}
end;
handle_call({unregister, Key}, _From, State) ->
case ets:lookup(?TAB, Key) of
[] ->
{reply, {error, not_found}, State};
[{_, _Action, TRef}] ->
_ = erlang:cancel_timer(TRef),
true = ets:delete(?TAB, Key),
{reply, ok, State}
end.
handle_cast(Msg, State) ->
?SLOG(warning, #{msg => "unknown cast", cast_msg => Msg}),
{noreply, State}.
handle_info({timeout, TRef, {timeout, Key}}, State) ->
case ets:lookup(?TAB, Key) of
[] ->
{noreply, State};
[{_, Action, TRef}] ->
_ = erlang:cancel_timer(TRef),
true = ets:delete(?TAB, Key),
%% TODO: safe apply
_ = Action(Key),
{noreply, State}
end;
handle_info(Msg, State) ->
?SLOG(warning, #{msg => "unknown message", info_msg => Msg}),
{noreply, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, _State) ->
ok.

View File

@ -53,7 +53,16 @@ init([]) ->
modules => [emqx_ft_assembler_sup]
},
ChildSpecs = [AssemblerSup],
Responder = #{
id => emqx_ft_responder,
start => {emqx_ft_responder, start_link, []},
restart => permanent,
shutdown => infinity,
type => worker,
modules => [emqx_ft_responder]
},
ChildSpecs = [Responder, AssemblerSup],
{ok, {SupFlags, ChildSpecs}}.
%% internal functions