diff --git a/apps/emqx_ft/src/emqx_ft.app.src b/apps/emqx_ft/src/emqx_ft.app.src index 1b1ee05ec..f95cf64e3 100644 --- a/apps/emqx_ft/src/emqx_ft.app.src +++ b/apps/emqx_ft/src/emqx_ft.app.src @@ -1,6 +1,6 @@ {application, emqx_ft, [ {description, "EMQX file transfer over MQTT"}, - {vsn, "0.1.14"}, + {vsn, "0.1.15"}, {registered, []}, {mod, {emqx_ft_app, []}}, {applications, [ diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 877d8bdac..9bcee3dfb 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -30,10 +30,7 @@ -export([ on_message_publish/1, - on_message_puback/4, - on_client_timeout/3, - on_process_down/4, - on_channel_unregistered/1 + on_message_puback/4 ]). -export([ @@ -88,8 +85,6 @@ checksum => checksum() }. --define(FT_EVENT(EVENT), {?MODULE, EVENT}). - -define(ACK_AND_PUBLISH(Result), {true, Result}). -define(ACK(Result), {false, Result}). -define(DELAY_ACK, delay). @@ -100,21 +95,11 @@ hook() -> ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST), - ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST), - ok = emqx_hooks:put('client.timeout', {?MODULE, on_client_timeout, []}, ?HP_LOWEST), - ok = emqx_hooks:put( - 'client.monitored_process_down', {?MODULE, on_process_down, []}, ?HP_LOWEST - ), - ok = emqx_hooks:put( - 'cm.channel.unregistered', {?MODULE, on_channel_unregistered, []}, ?HP_LOWEST - ). + ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST). unhook() -> ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), - ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}), - ok = emqx_hooks:del('client.timeout', {?MODULE, on_client_timeout}), - ok = emqx_hooks:del('client.monitored_process_down', {?MODULE, on_process_down}), - ok = emqx_hooks:del('cm.channel.unregistered', {?MODULE, on_channel_unregistered}). + ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}). %%-------------------------------------------------------------------- %% API @@ -152,40 +137,6 @@ on_message_puback(PacketId, #message{from = From, topic = Topic} = Msg, _PubRes, ignore end. -on_channel_unregistered(ChannelPid) -> - ok = emqx_ft_async_reply:deregister_all(ChannelPid). - -on_client_timeout(_TRef0, ?FT_EVENT({MRef, TopicReplyData}), Acc) -> - _ = erlang:demonitor(MRef, [flush]), - Result = {error, timeout}, - _ = publish_response(Result, TopicReplyData), - case emqx_ft_async_reply:take_by_mref(MRef) of - {ok, undefined, _TRef1, _TopicReplyData} -> - {stop, Acc}; - {ok, PacketId, _TRef1, _TopicReplyData} -> - {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]}; - not_found -> - {ok, Acc} - end; -on_client_timeout(_TRef, _Event, Acc) -> - {ok, Acc}. - -on_process_down(MRef, _Pid, DownReason, Acc) -> - case emqx_ft_async_reply:take_by_mref(MRef) of - {ok, PacketId, TRef, TopicReplyData} -> - _ = emqx_utils:cancel_timer(TRef), - Result = down_reason_to_result(DownReason), - _ = publish_response(Result, TopicReplyData), - case PacketId of - undefined -> - {stop, Acc}; - _ -> - {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]} - end; - not_found -> - {ok, Acc} - end. - %%-------------------------------------------------------------------- %% Handlers for transfer messages %%-------------------------------------------------------------------- @@ -253,7 +204,7 @@ on_init(#{packet_id := PacketId}, Msg, Transfer, Meta) -> filemeta => Meta }), %% Currently synchronous. - %% If we want to make it async, we need to use `emqx_ft_async_reply`, + %% If we want to make it async, we need to use `with_responder`, %% like in `on_fin`. ?ACK_AND_PUBLISH(store_filemeta(Transfer, Meta)). @@ -271,11 +222,13 @@ on_segment(#{packet_id := PacketId}, Msg, Transfer, Offset, Checksum) -> }), Segment = {Offset, Msg#message.payload}, %% Currently synchronous. - %% If we want to make it async, we need to use `emqx_ft_async_reply`, + %% If we want to make it async, we need to use `with_responder`, %% like in `on_fin`. ?ACK_AND_PUBLISH(store_segment(Transfer, Segment)). -on_fin(#{packet_id := PacketId} = TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum) -> +on_fin( + #{packet_id := PacketId, mode := Mode} = TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum +) -> ?tp(info, "file_transfer_fin", #{ mqtt_msg => Msg, packet_id => PacketId, @@ -283,39 +236,78 @@ on_fin(#{packet_id := PacketId} = TopicReplyData, Msg, Transfer, FinalSize, Fina final_size => FinalSize, checksum => FinalChecksum }), - %% TODO: handle checksum? Do we need it? - with_new_packet( - TopicReplyData, - PacketId, - fun() -> - case assemble(Transfer, FinalSize, FinalChecksum) of - ok -> - ?ACK_AND_PUBLISH(ok); - %% Assembling started, packet will be acked/replied by monitor or timeout - {async, Pid} -> - register_async_worker(Pid, TopicReplyData); - {error, _} = Error -> - ?ACK_AND_PUBLISH(Error) - end + FinPacketKey = {self(), PacketId}, + Callback = fun(Result) -> + on_complete("assemble", TopicReplyData, FinPacketKey, Transfer, Result) + end, + with_responder(FinPacketKey, Callback, emqx_ft_conf:assemble_timeout(), fun() -> + case assemble(Transfer, FinalSize, FinalChecksum) of + %% Assembling completed, ack through the responder right away + ok -> + emqx_ft_responder:ack(FinPacketKey, ok), + ?DELAY_ACK; + %% Assembling started, packet will be acked by the responder + {async, Pid} -> + ok = emqx_ft_responder:kickoff(FinPacketKey, Pid), + ack_if_async(Mode); + %% Assembling failed, ack through the responder + {error, _} = Error -> + emqx_ft_responder:ack(FinPacketKey, Error), + ?DELAY_ACK end - ). + end). -register_async_worker(Pid, #{mode := Mode, packet_id := PacketId} = TopicReplyData) -> - MRef = erlang:monitor(process, Pid), - TRef = erlang:start_timer( - emqx_ft_conf:assemble_timeout(), self(), ?FT_EVENT({MRef, TopicReplyData}) - ), - case Mode of - async -> - ok = emqx_ft_async_reply:register(MRef, TRef, TopicReplyData), - ok = emqx_ft_storage:kickoff(Pid), - ?ACK(ok); - sync -> - ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, TopicReplyData), - ok = emqx_ft_storage:kickoff(Pid), +with_responder(Key, Callback, Timeout, CriticalSection) -> + case emqx_ft_responder:start(Key, Callback, Timeout) of + %% We have new packet + {ok, _} -> + CriticalSection(); + %% Packet already received. + %% Since we are still handling the previous one, + %% we probably have retransmit here + {error, {already_started, _}} -> ?DELAY_ACK end. +ack_if_async(sync) -> + ?DELAY_ACK; +ack_if_async(async) -> + ?ACK(ok). + +on_complete(Op, #{mode := Mode} = TopicReplyData, {ChanPid, PacketId}, Transfer, ResponderResult) -> + ?tp(debug, "on_complete", #{ + operation => Op, + packet_id => PacketId, + transfer => Transfer + }), + Result = + case ResponderResult of + {RespMode, ok} when RespMode == ack orelse RespMode == down -> + ok; + {RespMode, {error, _} = Reason} when RespMode == ack orelse RespMode == down -> + ?tp(error, Op ++ "_failed", #{ + transfer => Transfer, + reason => Reason + }), + Reason; + timeout -> + ?tp(error, Op ++ "_timed_out", #{ + transfer => Transfer + }), + {error, timeout} + end, + NeedAck = + case {ResponderResult, Mode} of + {{down, _}, async} -> false; + {timeout, async} -> false; + _ -> true + end, + NeedAck andalso ack_packet(ChanPid, PacketId, Result), + _ = publish_response(Result, TopicReplyData). + +ack_packet(ChanPid, PacketId, Result) -> + erlang:send(ChanPid, {puback, PacketId, [], result_to_rc(Result)}). + topic_reply_data(Mode, From, PacketId, #message{topic = Topic, headers = Headers}) -> Props = maps:get(properties, Headers, #{}), #{ @@ -483,19 +475,3 @@ clientid_to_binary(A) when is_atom(A) -> atom_to_binary(A); clientid_to_binary(B) when is_binary(B) -> B. - -down_reason_to_result(normal) -> - ok; -down_reason_to_result(shutdown) -> - ok; -down_reason_to_result({shutdown, Result}) -> - Result; -down_reason_to_result(noproc) -> - {error, noproc}; -down_reason_to_result(Error) -> - {error, {internal_error, Error}}. - -with_new_packet(#{mode := async}, _PacketId, Fun) -> - Fun(); -with_new_packet(#{mode := sync}, PacketId, Fun) -> - emqx_ft_async_reply:with_new_packet(PacketId, Fun, undefined). diff --git a/apps/emqx_ft/src/emqx_ft_async_reply.erl b/apps/emqx_ft/src/emqx_ft_async_reply.erl deleted file mode 100644 index dc68f5653..000000000 --- a/apps/emqx_ft/src/emqx_ft_async_reply.erl +++ /dev/null @@ -1,122 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_ft_async_reply). - --include_lib("emqx/include/logger.hrl"). --include_lib("emqx/include/types.hrl"). --include_lib("stdlib/include/ms_transform.hrl"). - --export([ - create_tables/0, - info/0 -]). - --export([ - register/3, - register/4, - take_by_mref/1, - with_new_packet/3, - deregister_all/1 -]). - --type channel_pid() :: pid(). --type mon_ref() :: reference(). --type timer_ref() :: reference(). --type packet_id() :: emqx_types:packet_id(). - -%% packets waiting for async workers - --define(MON_TAB, emqx_ft_async_mons). --define(MON_KEY(MRef), ?MON_KEY(self(), MRef)). --define(MON_KEY(ChannelPid, MRef), {ChannelPid, MRef}). --define(MON_RECORD(KEY, PACKET_ID, TREF, DATA), {KEY, PACKET_ID, TREF, DATA}). - -%% async worker monitors by packet ids - --define(PACKET_TAB, emqx_ft_async_packets). --define(PACKET_KEY(PacketId), ?PACKET_KEY(self(), PacketId)). --define(PACKET_KEY(ChannelPid, PacketId), {ChannelPid, PacketId}). --define(PACKET_RECORD(KEY, MREF, DATA), {KEY, MREF, DATA}). - -%%-------------------------------------------------------------------- -%% API -%% ------------------------------------------------------------------- - --spec create_tables() -> ok. -create_tables() -> - EtsOptions = [ - named_table, - public, - ordered_set, - {read_concurrency, true}, - {write_concurrency, true} - ], - ok = emqx_utils_ets:new(?MON_TAB, EtsOptions), - ok = emqx_utils_ets:new(?PACKET_TAB, EtsOptions), - ok. - --spec register(packet_id(), mon_ref(), timer_ref(), term()) -> ok. -register(PacketId, MRef, TRef, Data) -> - _ = ets:insert(?PACKET_TAB, ?PACKET_RECORD(?PACKET_KEY(PacketId), MRef, Data)), - _ = ets:insert(?MON_TAB, ?MON_RECORD(?MON_KEY(MRef), PacketId, TRef, Data)), - ok. - --spec register(mon_ref(), timer_ref(), term()) -> ok. -register(MRef, TRef, Data) -> - _ = ets:insert(?MON_TAB, ?MON_RECORD(?MON_KEY(MRef), undefined, TRef, Data)), - ok. - --spec with_new_packet(packet_id(), fun(() -> any()), any()) -> any(). -with_new_packet(PacketId, Fun, Default) -> - case ets:member(?PACKET_TAB, ?PACKET_KEY(PacketId)) of - true -> Default; - false -> Fun() - end. - --spec take_by_mref(mon_ref()) -> {ok, packet_id() | undefined, timer_ref(), term()} | not_found. -take_by_mref(MRef) -> - case ets:take(?MON_TAB, ?MON_KEY(MRef)) of - [?MON_RECORD(_, PacketId, TRef, Data)] -> - PacketId =/= undefined andalso ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)), - {ok, PacketId, TRef, Data}; - [] -> - not_found - end. - --spec deregister_all(channel_pid()) -> ok. -deregister_all(ChannelPid) -> - ok = deregister_packets(ChannelPid), - ok = deregister_mons(ChannelPid), - ok. - --spec info() -> {non_neg_integer(), non_neg_integer()}. -info() -> - {ets:info(?MON_TAB, size), ets:info(?PACKET_TAB, size)}. - -%%-------------------------------------------------------------------- -%% Internal -%%------------------------------------------------------------------- - -deregister_packets(ChannelPid) when is_pid(ChannelPid) -> - MS = [{?PACKET_RECORD(?PACKET_KEY(ChannelPid, '_'), '_', '_'), [], [true]}], - _ = ets:select_delete(?PACKET_TAB, MS), - ok. - -deregister_mons(ChannelPid) -> - MS = [{?MON_RECORD(?MON_KEY(ChannelPid, '_'), '_', '_', '_'), [], [true]}], - _ = ets:select_delete(?MON_TAB, MS), - ok. diff --git a/apps/emqx_ft/src/emqx_ft_responder.erl b/apps/emqx_ft/src/emqx_ft_responder.erl new file mode 100644 index 000000000..a9712338d --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_responder.erl @@ -0,0 +1,116 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_responder). + +-behaviour(gen_server). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/types.hrl"). + +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% API +-export([start/3]). +-export([kickoff/2]). +-export([ack/2]). + +%% Supervisor API +-export([start_link/3]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +-define(REF(Key), {via, gproc, {n, l, {?MODULE, Key}}}). + +-type key() :: term(). +-type respfun() :: fun(({ack, _Result} | {down, _Result} | timeout) -> _SideEffect). + +%%-------------------------------------------------------------------- +%% API +%% ------------------------------------------------------------------- + +-spec start(key(), respfun(), timeout()) -> startlink_ret(). +start(Key, RespFun, Timeout) -> + emqx_ft_responder_sup:start_child(Key, RespFun, Timeout). + +-spec kickoff(key(), pid()) -> ok. +kickoff(Key, Pid) -> + gen_server:call(?REF(Key), {kickoff, Pid}). + +-spec ack(key(), _Result) -> _Return. +ack(Key, Result) -> + % TODO: it's possible to avoid term copy + gen_server:call(?REF(Key), {ack, Result}, infinity). + +-spec start_link(key(), timeout(), respfun()) -> startlink_ret(). +start_link(Key, RespFun, Timeout) -> + gen_server:start_link(?REF(Key), ?MODULE, {Key, RespFun, Timeout}, []). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%% ------------------------------------------------------------------- + +init({Key, RespFun, Timeout}) -> + _ = erlang:process_flag(trap_exit, true), + _TRef = erlang:send_after(Timeout, self(), timeout), + {ok, {Key, RespFun}}. + +handle_call({kickoff, Pid}, _From, St) -> + % TODO: more state? + _MRef = erlang:monitor(process, Pid), + _ = Pid ! kickoff, + {reply, ok, St}; +handle_call({ack, Result}, _From, {Key, RespFun}) -> + Ret = apply(RespFun, [{ack, Result}]), + ?tp(debug, ft_responder_ack, #{key => Key, result => Result, return => Ret}), + {stop, {shutdown, Ret}, Ret, undefined}; +handle_call(Msg, _From, State) -> + ?SLOG(warning, #{msg => "unknown_call", call_msg => Msg}), + {reply, {error, unknown_call}, State}. + +handle_cast(Msg, State) -> + ?SLOG(warning, #{msg => "unknown_cast", cast_msg => Msg}), + {noreply, State}. + +handle_info(timeout, {Key, RespFun}) -> + Ret = apply(RespFun, [timeout]), + ?tp(debug, ft_responder_timeout, #{key => Key, return => Ret}), + {stop, {shutdown, Ret}, undefined}; +handle_info({'DOWN', _MRef, process, _Pid, Reason}, {Key, RespFun}) -> + Ret = apply(RespFun, [{down, map_down_reason(Reason)}]), + ?tp(debug, ft_responder_procdown, #{key => Key, reason => Reason, return => Ret}), + {stop, {shutdown, Ret}, undefined}; +handle_info(Msg, State) -> + ?SLOG(warning, #{msg => "unknown_message", info_msg => Msg}), + {noreply, State}. + +terminate(_Reason, undefined) -> + ok; +terminate(Reason, {Key, RespFun}) -> + Ret = apply(RespFun, [timeout]), + ?tp(debug, ft_responder_shutdown, #{key => Key, reason => Reason, return => Ret}), + ok. + +map_down_reason(normal) -> + ok; +map_down_reason(shutdown) -> + ok; +map_down_reason({shutdown, Result}) -> + Result; +map_down_reason(noproc) -> + {error, noproc}; +map_down_reason(Error) -> + {error, {internal_error, Error}}. diff --git a/apps/emqx_ft/src/emqx_ft_responder_sup.erl b/apps/emqx_ft/src/emqx_ft_responder_sup.erl new file mode 100644 index 000000000..fb3932425 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_responder_sup.erl @@ -0,0 +1,48 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_responder_sup). + +-export([start_link/0]). +-export([start_child/3]). + +-behaviour(supervisor). +-export([init/1]). + +-define(SUPERVISOR, ?MODULE). + +%% + +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []). + +start_child(Key, RespFun, Timeout) -> + supervisor:start_child(?SUPERVISOR, [Key, RespFun, Timeout]). + +-spec init(_) -> {ok, {supervisor:sup_flags(), [supervisor:child_spec()]}}. +init(_) -> + Flags = #{ + strategy => simple_one_for_one, + intensity => 100, + period => 100 + }, + ChildSpec = #{ + id => responder, + start => {emqx_ft_responder, start_link, []}, + restart => temporary + }, + {ok, {Flags, [ChildSpec]}}. diff --git a/apps/emqx_ft/src/emqx_ft_sup.erl b/apps/emqx_ft/src/emqx_ft_sup.erl index d3b90aa3d..af7035755 100644 --- a/apps/emqx_ft/src/emqx_ft_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_sup.erl @@ -28,8 +28,6 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). init([]) -> - ok = emqx_ft_async_reply:create_tables(), - SupFlags = #{ strategy => one_for_one, intensity => 100, @@ -54,5 +52,14 @@ init([]) -> modules => [emqx_ft_storage_fs_reader_sup] }, - ChildSpecs = [AssemblerSup, FileReaderSup], + ResponderSup = #{ + id => emqx_ft_responder_sup, + start => {emqx_ft_responder_sup, start_link, []}, + restart => permanent, + shutdown => infinity, + type => supervisor, + modules => [emqx_ft_responder_sup] + }, + + ChildSpecs = [ResponderSup, AssemblerSup, FileReaderSup], {ok, {SupFlags, ChildSpecs}}. diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 9e1fd9f3b..4ca18898a 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -55,7 +55,8 @@ groups() -> t_assemble_timeout ]}, {async_mode, [], [ - {group, single_node} + {group, single_node}, + t_client_disconnect_while_assembling ]}, {sync_mode, [], [ {group, single_node} @@ -116,9 +117,9 @@ init_per_group(Group = cluster, Config) -> Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}), [{group, Group}, {cluster_nodes, Nodes} | Config]; init_per_group(_Group = async_mode, Config) -> - [{mode, sync} | Config]; -init_per_group(_Group = sync_mode, Config) -> [{mode, async} | Config]; +init_per_group(_Group = sync_mode, Config) -> + [{mode, sync} | Config]; init_per_group(Group, Config) -> [{group, Group} | Config]. @@ -529,6 +530,62 @@ t_assemble_timeout(Config) -> ?assert(2_000_000 < Time). +t_client_disconnect_while_assembling(Config) -> + ClientId = atom_to_binary(?FUNCTION_NAME), + {ok, Client} = emqtt:start_link([{proto_ver, v5}, {clientid, ClientId}]), + {ok, _} = emqtt:connect(Client), + + Filename = "topsecret.pdf", + FileId = <<"f1">>, + + Data = <<"data">>, + + Meta = #{size := Filesize} = meta(Filename, Data), + + ?assertRCName( + success, + emqtt:publish(Client, mk_init_topic(Config, FileId), encode_meta(Meta), 1) + ), + + ?assertRCName( + success, + emqtt:publish(Client, mk_segment_topic(Config, FileId, 0), Data, 1) + ), + + ResponseTopic = emqx_ft_test_helpers:response_topic(ClientId), + + {ok, OtherClient} = emqtt:start_link([{proto_ver, v5}, {clientid, <<"other">>}]), + {ok, _} = emqtt:connect(OtherClient), + {ok, _, _} = emqtt:subscribe(OtherClient, ResponseTopic, 1), + + FinTopic = mk_fin_topic(Config, FileId, Filesize), + + ?assertRCName( + success, + emqtt:publish(Client, FinTopic, <<>>, 1) + ), + + ok = emqtt:stop(Client), + + ResultReceive = fun Receive() -> + receive + {publish, #{payload := Payload, topic := ResponseTopic}} -> + case emqx_utils_json:decode(Payload) of + #{<<"topic">> := FinTopic, <<"reason_code">> := 0} -> + ok; + #{<<"topic">> := FinTopic, <<"reason_code">> := _} = FTResult -> + ct:fail("unexpected fin result: ~p", [FTResult]); + _ -> + Receive() + end + after 1000 -> + ct:fail("timeout waiting for fin result") + end + end, + ResultReceive(), + + ok = emqtt:stop(OtherClient). + %%-------------------------------------------------------------------- %% Cluster tests %%-------------------------------------------------------------------- diff --git a/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl b/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl deleted file mode 100644 index cad3c4b4f..000000000 --- a/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl +++ /dev/null @@ -1,247 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_ft_async_reply_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("common_test/include/ct.hrl"). --include_lib("stdlib/include/assert.hrl"). --include_lib("emqx/include/asserts.hrl"). - -all() -> emqx_common_test_helpers:all(?MODULE). - -init_per_suite(Config) -> - Apps = emqx_cth_suite:start( - [ - {emqx, #{override_env => [{boot_modules, [broker, listeners]}]}}, - {emqx_ft, "file_transfer { enable = true, assemble_timeout = 1s }"} - ], - #{work_dir => ?config(priv_dir, Config)} - ), - [{suite_apps, Apps} | Config]. - -end_per_suite(Config) -> - ok = emqx_cth_suite:stop(?config(suite_apps, Config)), - ok. - -init_per_testcase(_Case, Config) -> - ok = snabbkaffe:start_trace(), - Config. - -end_per_testcase(_Case, _Config) -> - ok = snabbkaffe:stop(), - ok. - -%%-------------------------------------------------------------------- -%% Tests -%%-------------------------------------------------------------------- - -t_register(_Config) -> - PacketId = 1, - MRef = make_ref(), - TRef = make_ref(), - ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata), - - ?assertEqual( - undefined, - emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) - ), - - ?assertEqual( - ok, - emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined) - ), - - ?assertEqual( - {ok, PacketId, TRef, somedata}, - emqx_ft_async_reply:take_by_mref(MRef) - ). - -t_process_independence(_Config) -> - PacketId = 1, - MRef = make_ref(), - TRef = make_ref(), - ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata), - - Self = self(), - - spawn_link(fun() -> - Self ! emqx_ft_async_reply:take_by_mref(MRef) - end), - - Res1 = - receive - Msg1 -> Msg1 - end, - - ?assertEqual( - not_found, - Res1 - ), - - spawn_link(fun() -> - Self ! emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) - end), - - Res2 = - receive - Msg2 -> Msg2 - end, - - ?assertEqual( - ok, - Res2 - ). - -t_take(_Config) -> - PacketId = 1, - MRef = make_ref(), - TRef = make_ref(), - ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata), - - ?assertEqual( - {ok, PacketId, TRef, somedata}, - emqx_ft_async_reply:take_by_mref(MRef) - ), - - ?assertEqual( - not_found, - emqx_ft_async_reply:take_by_mref(MRef) - ), - - ?assertEqual( - ok, - emqx_ft_async_reply:with_new_packet(2, fun() -> ok end, undefined) - ). - -t_cleanup(_Config) -> - PacketId = 1, - MRef0 = make_ref(), - TRef0 = make_ref(), - MRef1 = make_ref(), - TRef1 = make_ref(), - ok = emqx_ft_async_reply:register(PacketId, MRef0, TRef0, somedata0), - - Self = self(), - - Pid = spawn_link(fun() -> - ok = emqx_ft_async_reply:register(PacketId, MRef1, TRef1, somedata1), - receive - kickoff -> - ?assertEqual( - undefined, - emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) - ), - - ?assertEqual( - {ok, PacketId, TRef1, somedata1}, - emqx_ft_async_reply:take_by_mref(MRef1) - ), - - Self ! done - end - end), - - ?assertEqual( - undefined, - emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) - ), - - ok = emqx_ft_async_reply:deregister_all(Self), - - ?assertEqual( - ok, - emqx_ft_async_reply:with_new_packet(PacketId, fun() -> ok end, undefined) - ), - - Pid ! kickoff, - - receive - done -> ok - end. - -t_reply_by_tiemout(_Config) -> - process_flag(trap_exit, true), - ClientId = atom_to_binary(?FUNCTION_NAME), - C = emqx_ft_test_helpers:start_client(ClientId, node()), - - SleepForever = fun() -> - Ref = make_ref(), - receive - Ref -> ok - end - end, - - ok = meck:new(emqx_ft_storage, [passthrough]), - meck:expect(emqx_ft_storage, assemble, fun(_, _, _) -> {async, spawn_link(SleepForever)} end), - - FinTopic = <<"$file/fakeid/fin/999999">>, - - ?assertMatch( - {ok, #{reason_code_name := unspecified_error}}, - emqtt:publish(C, FinTopic, <<>>, 1) - ), - - meck:unload(emqx_ft_storage), - emqtt:stop(C). - -t_cleanup_by_cm(_Config) -> - process_flag(trap_exit, true), - ClientId = atom_to_binary(?FUNCTION_NAME), - C = emqx_ft_test_helpers:start_client(ClientId, node()), - - ok = meck:new(emqx_ft_storage, [passthrough]), - meck:expect(emqx_ft_storage, kickoff, fun(_) -> meck:exception(error, oops) end), - - FinTopic = <<"$file/fakeid/fin/999999">>, - - [ClientPid] = emqx_cm:lookup_channels(ClientId), - - ?assertWaitEvent( - begin - emqtt:publish(C, FinTopic, <<>>, 1), - exit(ClientPid, kill) - end, - #{?snk_kind := emqx_cm_clean_down, client_id := ClientId}, - 1000 - ), - - ?assertEqual( - {0, 0}, - emqx_ft_async_reply:info() - ), - - meck:unload(emqx_ft_storage). - -t_unrelated_events(_Config) -> - process_flag(trap_exit, true), - ClientId = atom_to_binary(?FUNCTION_NAME), - C = emqx_ft_test_helpers:start_client(ClientId, node()), - [ClientPid] = emqx_cm:lookup_channels(ClientId), - - erlang:monitor(process, ClientPid), - - ClientPid ! {'DOWN', make_ref(), process, self(), normal}, - ClientPid ! {timeout, make_ref(), unknown_timer_event}, - - ?assertNotReceive( - {'DOWN', _Ref, process, ClientPid, _Reason}, - 500 - ), - - emqtt:stop(C). diff --git a/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl b/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl new file mode 100644 index 000000000..056c75747 --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_responder_SUITE.erl @@ -0,0 +1,93 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_responder_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("emqx/include/asserts.hrl"). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx_ft, "file_transfer {enable = true}"} + ], + #{work_dir => ?config(priv_dir, Config)} + ), + [{suite_apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)), + ok. + +init_per_testcase(_Case, Config) -> + Config. + +end_per_testcase(_Case, _Config) -> + ok. + +t_start_ack(_Config) -> + Key = <<"test">>, + DefaultAction = fun({ack, Ref}) -> Ref end, + {ok, ResponderPid} = emqx_ft_responder:start(Key, DefaultAction, 1000), + erlang:monitor(process, ResponderPid), + ?assertMatch( + {error, {already_started, _Pid}}, + emqx_ft_responder:start(Key, DefaultAction, 1000) + ), + Ref = make_ref(), + ?assertEqual( + Ref, + emqx_ft_responder:ack(Key, Ref) + ), + ?assertExit( + {noproc, _}, + emqx_ft_responder:ack(Key, Ref) + ), + ?assertReceive( + {'DOWN', _, process, ResponderPid, {shutdown, _}}, + 1000 + ). + +t_timeout(_Config) -> + Key = <<"test">>, + Self = self(), + DefaultAction = fun(timeout) -> Self ! {timeout, Key} end, + {ok, _Pid} = emqx_ft_responder:start(Key, DefaultAction, 20), + receive + {timeout, Key} -> + ok + after 100 -> + ct:fail("emqx_ft_responder not called") + end, + ?assertExit( + {noproc, _}, + emqx_ft_responder:ack(Key, oops) + ). + +t_unknown_msgs(_Config) -> + {ok, Pid} = emqx_ft_responder:start(make_ref(), fun(_) -> ok end, 100), + Pid ! {unknown_msg, <<"test">>}, + ok = gen_server:cast(Pid, {unknown_msg, <<"test">>}), + ?assertEqual( + {error, unknown_call}, + gen_server:call(Pid, {unknown_call, <<"test">>}) + ). diff --git a/changes/ce/fix-12514.en.md b/changes/ce/fix-12514.en.md new file mode 100644 index 000000000..1d97fb84d --- /dev/null +++ b/changes/ce/fix-12514.en.md @@ -0,0 +1,3 @@ +Fixed file transfer command result reporting to the `$file-response/${clientid}` channel. + +Previously, if a channel issued an `assemble` command but disconnected before the assemble was completed, the status message was lost (not sent to the response topic).