diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index c73d31559..014c74ac3 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -39,6 +39,8 @@ %% For Debug -export([transfer/2, storage/0]). +-export([on_assemble_timeout/1]). + -export_type([clientid/0]). -export_type([transfer/0]). -export_type([offset/0]). @@ -62,6 +64,8 @@ ft_data :: ft_data() }). +-define(ASSEMBLE_TIMEOUT, 5000). + %%-------------------------------------------------------------------- %% API for app %%-------------------------------------------------------------------- @@ -210,23 +214,66 @@ on_fin(PacketId, Msg, FileId, Checksum) -> checksum => Checksum, packet_id => PacketId }), - % %% TODO: handle checksum? Do we need it? - % {ok, _} = emqx_ft_storage_fs:assemble( - % storage(), - % transfer(Msg, FileId), - % callback(FileId, Msg) - % ), - Callback = callback(FileId, PacketId), - spawn(fun() -> Callback({error, not_implemented}) end), - undefined. + %% TODO: handle checksum? Do we need it? + FinPacketKey = {self(), PacketId}, + _ = + case + emqx_ft_responder:register( + FinPacketKey, fun ?MODULE:on_assemble_timeout/1, ?ASSEMBLE_TIMEOUT + ) + of + %% We have new fin packet + ok -> + Callback = callback(FinPacketKey, FileId), + case assemble(transfer(Msg, FileId), Callback) of + %% Assembling started, packet will be acked by the callback or the responder + ok -> + undefined; + %% Assembling failed, unregister the packet key + {error, _} -> + case emqx_ft_responder:unregister(FinPacketKey) of + %% We successfully unregistered the packet key, + %% so we can send the error code at once + ok -> + ?RC_UNSPECIFIED_ERROR; + %% Someone else already unregistered the key, + %% that is, either responder or someone else acked the packet, + %% we do not have to ack + {error, not_found} -> + undefined + end + end; + %% Fin packet already received. + %% Since we are still handling the previous one, + %% we probably have retransmit here + {error, already_registered} -> + undefined + end. -callback(_FileId, PacketId) -> - ChanPid = self(), - fun - (ok) -> - erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS}); - ({error, _}) -> - erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}) +assemble(_Transfer, _Callback) -> + % spawn(fun() -> Callback({error, not_implemented}) end), + ok. + +% assemble(Transfer, Callback) -> +% emqx_ft_storage_fs:assemble( +% storage(), +% Transfer, +% Callback +% ). + +callback({ChanPid, PacketId} = Key, _FileId) -> + fun(Result) -> + case emqx_ft_responder:unregister(Key) of + ok -> + case Result of + {ok, _} -> + erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS}); + {error, _} -> + erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}) + end; + {error, not_registered} -> + ok + end end. transfer(Msg, FileId) -> @@ -236,3 +283,7 @@ transfer(Msg, FileId) -> %% TODO: configure storage() -> filename:join(emqx:data_dir(), "file_transfer"). + +on_assemble_timeout({ChanPid, PacketId}) -> + ?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}), + erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}). diff --git a/apps/emqx_ft/src/emqx_ft_responder.erl b/apps/emqx_ft/src/emqx_ft_responder.erl new file mode 100644 index 000000000..dcb45d5d3 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_responder.erl @@ -0,0 +1,114 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_responder). + +-behaviour(gen_server). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/types.hrl"). + +-export([start_link/0]). + +-export([ + register/3, + unregister/1 +]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(TAB, ?MODULE). + +-type key() :: term(). + +%%-------------------------------------------------------------------- +%% API +%% ------------------------------------------------------------------- + +-spec start_link() -> startlink_ret(). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +-spec register(Key, DefaultAction, Timeout) -> ok | {error, already_registered} when + Key :: key(), + DefaultAction :: fun((Key) -> any()), + Timeout :: timeout(). +register(Key, DefaultAction, Timeout) -> + case ets:lookup(?TAB, Key) of + [] -> + gen_server:call(?SERVER, {register, Key, DefaultAction, Timeout}); + [{Key, _Action, _Ref}] -> + {error, already_registered} + end. + +-spec unregister(Key) -> ok | {error, not_found} when + Key :: key(). +unregister(Key) -> + gen_server:call(?SERVER, {unregister, Key}). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%% ------------------------------------------------------------------- + +init([]) -> + _ = ets:new(?TAB, [named_table, protected, set, {read_concurrency, true}]), + {ok, #{}}. + +handle_call({register, Key, DefaultAction, Timeout}, _From, State) -> + ?SLOG(warning, #{msg => "register", key => Key, timeout => Timeout}), + case ets:lookup(?TAB, Key) of + [] -> + TRef = erlang:start_timer(Timeout, self(), {timeout, Key}), + true = ets:insert(?TAB, {Key, DefaultAction, TRef}), + {reply, ok, State}; + [{_, _Action, _Ref}] -> + {reply, {error, already_registered}, State} + end; +handle_call({unregister, Key}, _From, State) -> + case ets:lookup(?TAB, Key) of + [] -> + {reply, {error, not_found}, State}; + [{_, _Action, TRef}] -> + _ = erlang:cancel_timer(TRef), + true = ets:delete(?TAB, Key), + {reply, ok, State} + end. + +handle_cast(Msg, State) -> + ?SLOG(warning, #{msg => "unknown cast", cast_msg => Msg}), + {noreply, State}. + +handle_info({timeout, TRef, {timeout, Key}}, State) -> + case ets:lookup(?TAB, Key) of + [] -> + {noreply, State}; + [{_, Action, TRef}] -> + _ = erlang:cancel_timer(TRef), + true = ets:delete(?TAB, Key), + %% TODO: safe apply + _ = Action(Key), + {noreply, State} + end; +handle_info(Msg, State) -> + ?SLOG(warning, #{msg => "unknown message", info_msg => Msg}), + {noreply, State}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl index fb7a6104f..ef2d8033f 100644 --- a/apps/emqx_ft/src/emqx_ft_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_sup.erl @@ -53,7 +53,16 @@ init([]) -> modules => [emqx_ft_assembler_sup] }, - ChildSpecs = [AssemblerSup], + Responder = #{ + id => emqx_ft_responder, + start => {emqx_ft_responder, start_link, []}, + restart => permanent, + shutdown => infinity, + type => worker, + modules => [emqx_ft_responder] + }, + + ChildSpecs = [Responder, AssemblerSup], {ok, {SupFlags, ChildSpecs}}. %% internal functions