diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 41020e76f..c886b86bd 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -90,6 +90,10 @@ -define(FT_EVENT(EVENT), {?MODULE, EVENT}). +-define(ACK_AND_PUBLISH(Result), {true, Result}). +-define(ACK(Result), {false, Result}). +-define(DELAY_ACK, delay). + %%-------------------------------------------------------------------- %% API for app %%-------------------------------------------------------------------- @@ -116,46 +120,34 @@ unhook() -> %% API %%-------------------------------------------------------------------- -decode_filemeta(Payload) when is_binary(Payload) -> - case emqx_utils_json:safe_decode(Payload, [return_maps]) of - {ok, Map} -> - decode_filemeta(Map); - {error, Error} -> - {error, {invalid_filemeta_json, Error}} - end; -decode_filemeta(Map) when is_map(Map) -> - Schema = emqx_ft_schema:schema(filemeta), - try - Meta = hocon_tconf:check_plain(Schema, Map, #{atom_key => true, required => false}), - {ok, Meta} - catch - throw:{_Schema, Errors} -> - {error, {invalid_filemeta, Errors}} - end. +decode_filemeta(Payload) -> + emqx_ft_schema:decode(filemeta, Payload). encode_filemeta(Meta = #{}) -> - Schema = emqx_ft_schema:schema(filemeta), - hocon_tconf:make_serializable(Schema, emqx_utils_maps:binary_key_map(Meta), #{}). + emqx_ft_schema:encode(filemeta, Meta). + +encode_response(Response) -> + emqx_ft_schema:encode(command_response, Response). %%-------------------------------------------------------------------- %% Hooks %%-------------------------------------------------------------------- -on_message_publish( - Msg = #message{ - id = _Id, - topic = <<"$file/", _/binary>> - } -) -> +on_message_publish(Msg = #message{topic = <<"$file-async/", _/binary>>}) -> + Headers = Msg#message.headers, + {stop, Msg#message{headers = Headers#{allow_publish => false}}}; +on_message_publish(Msg = #message{topic = <<"$file/", _/binary>>}) -> Headers = Msg#message.headers, {stop, Msg#message{headers = Headers#{allow_publish => false}}}; on_message_publish(Msg) -> {ok, Msg}. -on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) -> +on_message_puback(PacketId, #message{from = From, topic = Topic} = Msg, _PubRes, _RC) -> case Topic of - <<"$file/", FileCommand/binary>> -> - {stop, on_file_command(PacketId, Msg, FileCommand)}; + <<"$file/", _/binary>> -> + {stop, on_file_command(sync, From, PacketId, Msg, Topic)}; + <<"$file-async/", _/binary>> -> + {stop, on_file_command(async, From, PacketId, Msg, Topic)}; _ -> ignore end. @@ -163,18 +155,33 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) -> on_channel_unregistered(ChannelPid) -> ok = emqx_ft_async_reply:deregister_all(ChannelPid). -on_client_timeout(_TRef, ?FT_EVENT({MRef, PacketId}), Acc) -> +on_client_timeout(_TRef0, ?FT_EVENT({MRef, TopicReplyData}), Acc) -> _ = erlang:demonitor(MRef, [flush]), - _ = emqx_ft_async_reply:take_by_mref(MRef), - {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, ?RC_UNSPECIFIED_ERROR)) | Acc]}; + Result = {error, timeout}, + _ = publish_response(Result, TopicReplyData), + case emqx_ft_async_reply:take_by_mref(MRef) of + {ok, undefined, _TRef1, _TopicReplyData} -> + {stop, Acc}; + {ok, PacketId, _TRef1, _TopicReplyData} -> + {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]}; + not_found -> + {ok, Acc} + end; on_client_timeout(_TRef, _Event, Acc) -> {ok, Acc}. -on_process_down(MRef, _Pid, Reason, Acc) -> +on_process_down(MRef, _Pid, DownReason, Acc) -> case emqx_ft_async_reply:take_by_mref(MRef) of - {ok, PacketId, TRef} -> + {ok, PacketId, TRef, TopicReplyData} -> _ = emqx_utils:cancel_timer(TRef), - {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, reason_to_rc(Reason))) | Acc]}; + Result = down_reason_to_result(DownReason), + _ = publish_response(Result, TopicReplyData), + case PacketId of + undefined -> + {stop, Acc}; + _ -> + {stop, [?REPLY_OUTGOING(?PUBACK_PACKET(PacketId, result_to_rc(Result))) | Acc]} + end; not_found -> {ok, Acc} end. @@ -185,24 +192,27 @@ on_process_down(MRef, _Pid, Reason, Acc) -> %% TODO Move to emqx_ft_mqtt? -on_file_command(PacketId, Msg, FileCommand) -> - case emqx_topic:tokens(FileCommand) of - [FileIdIn | Rest] -> - validate([{fileid, FileIdIn}], fun([FileId]) -> - on_file_command(PacketId, FileId, Msg, Rest) - end); - [] -> - ?RC_UNSPECIFIED_ERROR - end. +on_file_command(Mode, From, PacketId, Msg, Topic) -> + TopicReplyData = topic_reply_data(Mode, From, PacketId, Msg), + Result = + case emqx_topic:tokens(Topic) of + [_FTPrefix, FileIdIn | Rest] -> + validate([{fileid, FileIdIn}], fun([FileId]) -> + do_on_file_command(TopicReplyData, FileId, Msg, Rest) + end); + [] -> + ?ACK_AND_PUBLISH({error, {invalid_topic, Topic}}) + end, + maybe_publish_response(Result, TopicReplyData). -on_file_command(PacketId, FileId, Msg, FileCommand) -> +do_on_file_command(TopicReplyData, FileId, Msg, FileCommand) -> Transfer = transfer(Msg, FileId), case FileCommand of [<<"init">>] -> validate( [{filemeta, Msg#message.payload}], fun([Meta]) -> - on_init(PacketId, Msg, Transfer, Meta) + on_init(TopicReplyData, Msg, Transfer, Meta) end ); [<<"fin">>, FinalSizeBin | MaybeChecksum] when length(MaybeChecksum) =< 1 -> @@ -210,14 +220,14 @@ on_file_command(PacketId, FileId, Msg, FileCommand) -> validate( [{size, FinalSizeBin}, {{maybe, checksum}, ChecksumBin}], fun([FinalSize, FinalChecksum]) -> - on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) + on_fin(TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum) end ); [<<"abort">>] -> - on_abort(Msg, Transfer); + on_abort(TopicReplyData, Msg, Transfer); [OffsetBin] -> validate([{offset, OffsetBin}], fun([Offset]) -> - on_segment(PacketId, Msg, Transfer, Offset, undefined) + on_segment(TopicReplyData, Msg, Transfer, Offset, undefined) end); [OffsetBin, ChecksumBin] -> validate( @@ -226,16 +236,16 @@ on_file_command(PacketId, FileId, Msg, FileCommand) -> validate( [{integrity, Msg#message.payload, Checksum}], fun(_) -> - on_segment(PacketId, Msg, Transfer, Offset, Checksum) + on_segment(TopicReplyData, Msg, Transfer, Offset, Checksum) end ) end ); _ -> - ?RC_UNSPECIFIED_ERROR + ?ACK_AND_PUBLISH({error, {invalid_file_command, FileCommand}}) end. -on_init(PacketId, Msg, Transfer, Meta) -> +on_init(#{packet_id := PacketId}, Msg, Transfer, Meta) -> ?tp(info, "file_transfer_init", #{ mqtt_msg => Msg, packet_id => PacketId, @@ -245,16 +255,13 @@ on_init(PacketId, Msg, Transfer, Meta) -> %% Currently synchronous. %% If we want to make it async, we need to use `emqx_ft_async_reply`, %% like in `on_fin`. - case store_filemeta(Transfer, Meta) of - ok -> ?RC_SUCCESS; - {error, _} -> ?RC_UNSPECIFIED_ERROR - end. + ?ACK_AND_PUBLISH(store_filemeta(Transfer, Meta)). -on_abort(_Msg, _FileId) -> +on_abort(_TopicReplyData, _Msg, _FileId) -> %% TODO - ?RC_SUCCESS. + ?ACK_AND_PUBLISH(ok). -on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> +on_segment(#{packet_id := PacketId}, Msg, Transfer, Offset, Checksum) -> ?tp(info, "file_transfer_segment", #{ mqtt_msg => Msg, packet_id => PacketId, @@ -266,12 +273,9 @@ on_segment(PacketId, Msg, Transfer, Offset, Checksum) -> %% Currently synchronous. %% If we want to make it async, we need to use `emqx_ft_async_reply`, %% like in `on_fin`. - case store_segment(Transfer, Segment) of - ok -> ?RC_SUCCESS; - {error, _} -> ?RC_UNSPECIFIED_ERROR - end. + ?ACK_AND_PUBLISH(store_segment(Transfer, Segment)). -on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) -> +on_fin(#{packet_id := PacketId} = TopicReplyData, Msg, Transfer, FinalSize, FinalChecksum) -> ?tp(info, "file_transfer_fin", #{ mqtt_msg => Msg, packet_id => PacketId, @@ -280,30 +284,94 @@ on_fin(PacketId, Msg, Transfer, FinalSize, FinalChecksum) -> checksum => FinalChecksum }), %% TODO: handle checksum? Do we need it? - emqx_ft_async_reply:with_new_packet( + with_new_packet( + TopicReplyData, PacketId, fun() -> case assemble(Transfer, FinalSize, FinalChecksum) of ok -> - ?RC_SUCCESS; - %% Assembling started, packet will be acked by monitor or timeout + ?ACK_AND_PUBLISH(ok); + %% Assembling started, packet will be acked/replied by monitor or timeout {async, Pid} -> - ok = register_async_reply(Pid, PacketId), - ok = emqx_ft_storage:kickoff(Pid), - undefined; - {error, _} -> - ?RC_UNSPECIFIED_ERROR + register_async_worker(Pid, TopicReplyData); + {error, _} = Error -> + ?ACK_AND_PUBLISH(Error) end - end, - undefined + end ). -register_async_reply(Pid, PacketId) -> +register_async_worker(Pid, #{mode := Mode, packet_id := PacketId} = TopicReplyData) -> MRef = erlang:monitor(process, Pid), TRef = erlang:start_timer( - emqx_ft_conf:assemble_timeout(), self(), ?FT_EVENT({MRef, PacketId}) + emqx_ft_conf:assemble_timeout(), self(), ?FT_EVENT({MRef, TopicReplyData}) ), - ok = emqx_ft_async_reply:register(PacketId, MRef, TRef). + case Mode of + async -> + ok = emqx_ft_async_reply:register(MRef, TRef, TopicReplyData), + ok = emqx_ft_storage:kickoff(Pid), + ?ACK(ok); + sync -> + ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, TopicReplyData), + ok = emqx_ft_storage:kickoff(Pid), + ?DELAY_ACK + end. + +topic_reply_data(Mode, From, PacketId, #message{topic = Topic, headers = Headers}) -> + Props = maps:get(properties, Headers, #{}), + #{ + mode => Mode, + clientid => From, + command_topic => Topic, + correlation_data => maps:get('Correlation-Data', Props, undefined), + response_topic => maps:get('Response-Topic', Props, undefined), + packet_id => PacketId + }. + +maybe_publish_response(?DELAY_ACK, _TopicReplyData) -> + undefined; +maybe_publish_response(?ACK(Result), _TopicReplyData) -> + result_to_rc(Result); +maybe_publish_response(?ACK_AND_PUBLISH(Result), TopicReplyData) -> + publish_response(Result, TopicReplyData). + +publish_response(Result, #{ + clientid := ClientId, + command_topic := CommandTopic, + correlation_data := CorrelationData, + response_topic := ResponseTopic, + packet_id := PacketId +}) -> + ResultCode = result_to_rc(Result), + Response = encode_response(#{ + topic => CommandTopic, + packet_id => PacketId, + reason_code => ResultCode, + reason_description => emqx_ft_error:format(Result) + }), + Payload = emqx_utils_json:encode(Response), + Topic = emqx_maybe:define(ResponseTopic, response_topic(ClientId)), + Msg = emqx_message:make( + emqx_guid:gen(), + undefined, + ?QOS_1, + Topic, + Payload, + #{}, + #{properties => response_properties(CorrelationData)} + ), + _ = emqx_broker:publish(Msg), + ResultCode. + +response_properties(undefined) -> #{}; +response_properties(CorrelationData) -> #{'Correlation-Data' => CorrelationData}. + +response_topic(ClientId) -> + <<"$file-response/", (clientid_to_binary(ClientId))/binary>>. + +result_to_rc(ok) -> + ?RC_SUCCESS; +result_to_rc({error, _}) -> + ?RC_UNSPECIFIED_ERROR. store_filemeta(Transfer, Segment) -> try @@ -347,9 +415,9 @@ validate(Validations, Fun) -> case do_validate(Validations, []) of {ok, Parsed} -> Fun(Parsed); - {error, Reason} -> + {error, Reason} = Error -> ?tp(info, "client_violated_protocol", #{reason => Reason}), - ?RC_UNSPECIFIED_ERROR + ?ACK_AND_PUBLISH(Error) end. do_validate([], Parsed) -> @@ -416,19 +484,18 @@ clientid_to_binary(A) when is_atom(A) -> clientid_to_binary(B) when is_binary(B) -> B. -reason_to_rc(Reason) -> - case map_down_reason(Reason) of - ok -> ?RC_SUCCESS; - {error, _} -> ?RC_UNSPECIFIED_ERROR - end. - -map_down_reason(normal) -> +down_reason_to_result(normal) -> ok; -map_down_reason(shutdown) -> +down_reason_to_result(shutdown) -> ok; -map_down_reason({shutdown, Result}) -> +down_reason_to_result({shutdown, Result}) -> Result; -map_down_reason(noproc) -> +down_reason_to_result(noproc) -> {error, noproc}; -map_down_reason(Error) -> +down_reason_to_result(Error) -> {error, {internal_error, Error}}. + +with_new_packet(#{mode := async}, _PacketId, Fun) -> + Fun(); +with_new_packet(#{mode := sync}, PacketId, Fun) -> + emqx_ft_async_reply:with_new_packet(PacketId, Fun, undefined). diff --git a/apps/emqx_ft/src/emqx_ft_assembler.erl b/apps/emqx_ft/src/emqx_ft_assembler.erl index 0d9e86a49..b625d0ffe 100644 --- a/apps/emqx_ft/src/emqx_ft_assembler.erl +++ b/apps/emqx_ft/src/emqx_ft_assembler.erl @@ -156,12 +156,16 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #{export := % Currently, race is possible between getting segment info from the remote node and % this node garbage collecting the segment itself. % TODO: pipelining - % TODO: better error handling - {ok, Content} = pread(Node, Segment, St), - case emqx_ft_storage_exporter:write(Export, Content) of - {ok, NExport} -> - {next_state, {assemble, Rest}, St#{export := NExport}, ?internal([])}; - {error, _} = Error -> + case pread(Node, Segment, St) of + {ok, Content} -> + case emqx_ft_storage_exporter:write(Export, Content) of + {ok, NExport} -> + {next_state, {assemble, Rest}, St#{export := NExport}, ?internal([])}; + {error, _} = Error -> + {stop, {shutdown, Error}, maps:remove(export, St)} + end; + {error, ReadError} -> + Error = {error, {read_segment, ReadError}}, {stop, {shutdown, Error}, maps:remove(export, St)} end; handle_event(internal, _, {assemble, []}, St = #{}) -> diff --git a/apps/emqx_ft/src/emqx_ft_async_reply.erl b/apps/emqx_ft/src/emqx_ft_async_reply.erl index 501f91629..e099196f9 100644 --- a/apps/emqx_ft/src/emqx_ft_async_reply.erl +++ b/apps/emqx_ft/src/emqx_ft_async_reply.erl @@ -27,6 +27,7 @@ -export([ register/3, + register/4, take_by_mref/1, with_new_packet/3, deregister_all/1 @@ -42,12 +43,14 @@ -define(MON_TAB, emqx_ft_async_mons). -define(MON_KEY(MRef), ?MON_KEY(self(), MRef)). -define(MON_KEY(ChannelPid, MRef), {ChannelPid, MRef}). +-define(MON_RECORD(KEY, PACKET_ID, TREF, DATA), {KEY, PACKET_ID, TREF, DATA}). %% async worker monitors by packet ids -define(PACKET_TAB, emqx_ft_async_packets). -define(PACKET_KEY(PacketId), ?PACKET_KEY(self(), PacketId)). -define(PACKET_KEY(ChannelPid, PacketId), {ChannelPid, PacketId}). +-define(PACKET_RECORD(KEY, MREF, DATA), {KEY, MREF, DATA}). %%-------------------------------------------------------------------- %% API @@ -66,10 +69,15 @@ create_tables() -> ok = emqx_utils_ets:new(?PACKET_TAB, EtsOptions), ok. --spec register(packet_id(), mon_ref(), timer_ref()) -> ok. -register(PacketId, MRef, TRef) -> - _ = ets:insert(?PACKET_TAB, {?PACKET_KEY(PacketId), MRef}), - _ = ets:insert(?MON_TAB, {?MON_KEY(MRef), PacketId, TRef}), +-spec register(packet_id(), mon_ref(), timer_ref(), term()) -> ok. +register(PacketId, MRef, TRef, Data) -> + _ = ets:insert(?PACKET_TAB, ?PACKET_RECORD(?PACKET_KEY(PacketId), MRef, Data)), + _ = ets:insert(?MON_TAB, ?MON_RECORD(?MON_KEY(MRef), PacketId, TRef, Data)), + ok. + +-spec register(mon_ref(), timer_ref(), term()) -> ok. +register(MRef, TRef, Data) -> + _ = ets:insert(?MON_TAB, ?MON_RECORD(?MON_KEY(MRef), undefined, TRef, Data)), ok. -spec with_new_packet(packet_id(), fun(() -> any()), any()) -> any(). @@ -79,12 +87,12 @@ with_new_packet(PacketId, Fun, Default) -> false -> Fun() end. --spec take_by_mref(mon_ref()) -> {ok, packet_id(), timer_ref()} | not_found. +-spec take_by_mref(mon_ref()) -> {ok, packet_id() | undefined, timer_ref(), term()} | not_found. take_by_mref(MRef) -> case ets:take(?MON_TAB, ?MON_KEY(MRef)) of - [{_, PacketId, TRef}] -> - _ = ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)), - {ok, PacketId, TRef}; + [?MON_RECORD(_, PacketId, TRef, Data)] -> + PacketId =/= undefined andalso ets:delete(?PACKET_TAB, ?PACKET_KEY(PacketId)), + {ok, PacketId, TRef, Data}; [] -> not_found end. @@ -104,11 +112,11 @@ info() -> %%------------------------------------------------------------------- deregister_packets(ChannelPid) when is_pid(ChannelPid) -> - MS = [{{?PACKET_KEY(ChannelPid, '_'), '_'}, [], [true]}], + MS = [{?PACKET_RECORD(?PACKET_KEY(ChannelPid, '_'), '_', '_'), [], [true]}], _ = ets:select_delete(?PACKET_TAB, MS), ok. deregister_mons(ChannelPid) -> - MS = [{{?MON_KEY(ChannelPid, '_'), '_', '_'}, [], [true]}], + MS = [{?MON_RECORD(?MON_KEY(ChannelPid, '_'), '_', '_', '_'), [], [true]}], _ = ets:select_delete(?MON_TAB, MS), ok. diff --git a/apps/emqx_ft/src/emqx_ft_error.erl b/apps/emqx_ft/src/emqx_ft_error.erl new file mode 100644 index 000000000..06d575ede --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_error.erl @@ -0,0 +1,39 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc File Transfer error description module + +-module(emqx_ft_error). + +-export([format/1]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +format(ok) -> <<"success">>; +format({error, Reason}) -> format_error_reson(Reason). + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +format_error_reson(Reason) when is_atom(Reason) -> + atom_to_binary(Reason, utf8); +format_error_reson({ErrorKind, _}) when is_atom(ErrorKind) -> + atom_to_binary(ErrorKind, utf8); +format_error_reson(_Reason) -> + <<"internal_error">>. diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl index dd21e9524..c89ba7d6a 100644 --- a/apps/emqx_ft/src/emqx_ft_schema.erl +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -26,7 +26,7 @@ -export([schema/1]). %% Utilities --export([backend/1]). +-export([backend/1, encode/2, decode/2]). %% Test-only helpers -export([translate/1]). @@ -76,7 +76,7 @@ fields(file_transfer) -> #{ desc => ?DESC("init_timeout"), required => false, - importance => ?IMPORTANCE_LOW, + importance => ?IMPORTANCE_HIDDEN, default => "10s" } )}, @@ -86,7 +86,7 @@ fields(file_transfer) -> #{ desc => ?DESC("store_segment_timeout"), required => false, - importance => ?IMPORTANCE_LOW, + importance => ?IMPORTANCE_HIDDEN, default => "5m" } )}, @@ -282,6 +282,16 @@ schema(filemeta) -> {segments_ttl, hoconsc:mk(pos_integer())}, {user_data, hoconsc:mk(json_value())} ] + }; +schema(command_response) -> + #{ + roots => [ + {vsn, hoconsc:mk(string(), #{default => <<"0.1">>})}, + {topic, hoconsc:mk(string())}, + {packet_id, hoconsc:mk(pos_integer())}, + {reason_code, hoconsc:mk(non_neg_integer())}, + {reason_description, hoconsc:mk(binary())} + ] }. validator(filename) -> @@ -345,6 +355,27 @@ backend(Config) -> emit_enabled(Type, BConf = #{enable := Enabled}) -> Enabled andalso throw({Type, BConf}). +decode(SchemaName, Payload) when is_binary(Payload) -> + case emqx_utils_json:safe_decode(Payload, [return_maps]) of + {ok, Map} -> + decode(SchemaName, Map); + {error, Error} -> + {error, {invalid_filemeta_json, Error}} + end; +decode(SchemaName, Map) when is_map(Map) -> + Schema = schema(SchemaName), + try + Meta = hocon_tconf:check_plain(Schema, Map, #{atom_key => true, required => false}), + {ok, Meta} + catch + throw:{_Schema, Errors} -> + {error, {invalid_filemeta, Errors}} + end. + +encode(SchemaName, Map = #{}) -> + Schema = schema(SchemaName), + hocon_tconf:make_serializable(Schema, emqx_utils_maps:binary_key_map(Map), #{}). + %% Test-only helpers -spec translate(emqx_config:raw_config()) -> diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl index bc1b5fb4d..886dc27f6 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter.erl @@ -195,7 +195,7 @@ verify_checksum(Ctx, {Algo, Digest} = Checksum) -> Digest -> {ok, Checksum}; Mismatch -> - {error, {checksum, Algo, binary:encode_hex(Mismatch)}} + {error, {checksum_mismatch, Algo, binary:encode_hex(Mismatch)}} end; verify_checksum(Ctx, undefined) -> Digest = crypto:hash_final(Ctx), diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index 5d0395989..0102756ca 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -145,7 +145,7 @@ store_filemeta(Storage, Transfer, Meta) -> % We won't see conflicts in case of concurrent `store_filemeta` % requests. It's rather odd scenario so it's fine not to worry % about it too much now. - {error, conflict}; + {error, filemeta_conflict}; {error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent -> write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta)); {error, _} = Error -> diff --git a/apps/emqx_ft/test/emqx_ft_SUITE.erl b/apps/emqx_ft/test/emqx_ft_SUITE.erl index 7da9ccf69..6b6437971 100644 --- a/apps/emqx_ft/test/emqx_ft_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_SUITE.erl @@ -31,7 +31,8 @@ all() -> [ - {group, single_node}, + {group, async_mode}, + {group, sync_mode}, {group, cluster} ]. @@ -50,7 +51,14 @@ groups() -> t_nasty_filenames, t_no_meta, t_no_segment, - t_simple_transfer + t_simple_transfer, + t_assemble_timeout + ]}, + {async_mode, [], [ + {group, single_node} + ]}, + {sync_mode, [], [ + {group, single_node} ]}, {cluster, [], [ t_switch_node, @@ -72,9 +80,10 @@ init_per_suite(Config) -> emqx_ft_test_helpers:local_storage(Config), #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}} ), + FTConfig = emqx_ft_test_helpers:config(Storage, #{<<"assemble_timeout">> => <<"2s">>}), Apps = emqx_cth_suite:start( [ - {emqx_ft, #{config => emqx_ft_test_helpers:config(Storage)}} + {emqx_ft, #{config => FTConfig}} ], #{work_dir => emqx_cth_suite:work_dir(Config)} ), @@ -85,7 +94,10 @@ end_per_suite(Config) -> ok. init_per_testcase(Case, Config) -> - ClientId = atom_to_binary(Case), + ClientId = iolist_to_binary([ + atom_to_binary(Case), <<"-">>, emqx_ft_test_helpers:unique_binary_string() + ]), + ok = set_client_specific_ft_dirs(ClientId, Config), case ?config(group, Config) of cluster -> [{clientid, ClientId} | Config]; @@ -103,6 +115,10 @@ init_per_group(Group = cluster, Config) -> Cluster = mk_cluster_specs(Config), Nodes = emqx_cth_cluster:start(Cluster, #{work_dir => WorkDir}), [{group, Group}, {cluster_nodes, Nodes} | Config]; +init_per_group(_Group = async_mode, Config) -> + [{mode, sync} | Config]; +init_per_group(_Group = sync_mode, Config) -> + [{mode, async} | Config]; init_per_group(Group, Config) -> [{group, Group} | Config]. @@ -127,7 +143,7 @@ mk_cluster_specs(_Config) -> ]. %%-------------------------------------------------------------------- -%% Tests +%% Single node tests %%-------------------------------------------------------------------- t_invalid_topic_format(Config) -> @@ -171,32 +187,32 @@ t_invalid_fileid(Config) -> C = ?config(client, Config), ?assertRCName( unspecified_error, - emqtt:publish(C, <<"$file//init">>, <<>>, 1) + emqtt:publish(C, mk_init_topic(Config, <<>>), <<>>, 1) ). t_invalid_filename(Config) -> C = ?config(client, Config), ?assertRCName( unspecified_error, - emqtt:publish(C, mk_init_topic(<<"f1">>), encode_meta(meta(".", <<>>)), 1) + emqtt:publish(C, mk_init_topic(Config, <<"f1">>), encode_meta(meta(".", <<>>)), 1) ), ?assertRCName( unspecified_error, - emqtt:publish(C, mk_init_topic(<<"f2">>), encode_meta(meta("..", <<>>)), 1) + emqtt:publish(C, mk_init_topic(Config, <<"f2">>), encode_meta(meta("..", <<>>)), 1) ), ?assertRCName( unspecified_error, - emqtt:publish(C, mk_init_topic(<<"f2">>), encode_meta(meta("../nice", <<>>)), 1) + emqtt:publish(C, mk_init_topic(Config, <<"f2">>), encode_meta(meta("../nice", <<>>)), 1) ), ?assertRCName( unspecified_error, - emqtt:publish(C, mk_init_topic(<<"f3">>), encode_meta(meta("/etc/passwd", <<>>)), 1) + emqtt:publish(C, mk_init_topic(Config, <<"f3">>), encode_meta(meta("/etc/passwd", <<>>)), 1) ), ?assertRCName( unspecified_error, emqtt:publish( C, - mk_init_topic(<<"f4">>), + mk_init_topic(Config, <<"f4">>), encode_meta(meta(lists:duplicate(1000, $A), <<>>)), 1 ) @@ -204,6 +220,7 @@ t_invalid_filename(Config) -> t_simple_transfer(Config) -> C = ?config(client, Config), + ClientId = ?config(clientid, Config), Filename = "topsecret.pdf", FileId = <<"f1">>, @@ -214,22 +231,24 @@ t_simple_transfer(Config) -> ?assertRCName( success, - emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1) + emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(Meta), 1) ), lists:foreach( fun({Chunk, Offset}) -> ?assertRCName( success, - emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1) + emqtt:publish(C, mk_segment_topic(Config, FileId, Offset), Chunk, 1) ) end, with_offsets(Data) ), - ?assertRCName( - success, - emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1) + ?assertEqual( + ok, + emqx_ft_test_helpers:fin_result( + mode(Config), ClientId, C, mk_fin_topic(Config, FileId, Filesize) + ) ), [Export] = list_files(?config(clientid, Config)), @@ -238,7 +257,7 @@ t_simple_transfer(Config) -> read_export(Export) ). -t_nasty_clientids_fileids(_Config) -> +t_nasty_clientids_fileids(Config) -> Transfers = [ {<<".">>, <<".">>}, {<<"🌚"/utf8>>, <<"🌝"/utf8>>}, @@ -249,15 +268,16 @@ t_nasty_clientids_fileids(_Config) -> ok = lists:foreach( fun({ClientId, FileId}) -> - ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "justfile", ClientId), + Data = ClientId, + ok = emqx_ft_test_helpers:upload_file(mode(Config), ClientId, FileId, "justfile", Data), [Export] = list_files(ClientId), ?assertMatch(#{meta := #{name := "justfile"}}, Export), - ?assertEqual({ok, ClientId}, read_export(Export)) + ?assertEqual({ok, Data}, read_export(Export)) end, Transfers ). -t_nasty_filenames(_Config) -> +t_nasty_filenames(Config) -> Filenames = [ {<<"nasty1">>, "146%"}, {<<"nasty2">>, "🌚"}, @@ -267,7 +287,7 @@ t_nasty_filenames(_Config) -> ok = lists:foreach( fun({ClientId, Filename}) -> FileId = unicode:characters_to_binary(Filename), - ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Filename, FileId), + ok = emqx_ft_test_helpers:upload_file(mode(Config), ClientId, FileId, Filename, FileId), [Export] = list_files(ClientId), ?assertMatch(#{meta := #{name := Filename}}, Export), ?assertEqual({ok, FileId}, read_export(Export)) @@ -285,34 +305,36 @@ t_meta_conflict(Config) -> ?assertRCName( success, - emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1) + emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(Meta), 1) ), ConflictMeta = Meta#{name => "conflict.pdf"}, ?assertRCName( unspecified_error, - emqtt:publish(C, mk_init_topic(FileId), encode_meta(ConflictMeta), 1) + emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(ConflictMeta), 1) ). t_no_meta(Config) -> C = ?config(client, Config), + ClientId = ?config(clientid, Config), FileId = <<"f1">>, Data = <<"first">>, ?assertRCName( success, - emqtt:publish(C, mk_segment_topic(FileId, 0), Data, 1) + emqtt:publish(C, mk_segment_topic(Config, FileId, 0), Data, 1) ), - ?assertRCName( - unspecified_error, - emqtt:publish(C, mk_fin_topic(FileId, 42), <<>>, 1) + ?assertEqual( + {error, unspecified_error}, + emqx_ft_test_helpers:fin_result(mode(Config), ClientId, C, mk_fin_topic(Config, FileId, 42)) ). t_no_segment(Config) -> C = ?config(client, Config), + ClientId = ?config(clientid, Config), Filename = "topsecret.pdf", FileId = <<"f1">>, @@ -323,23 +345,25 @@ t_no_segment(Config) -> ?assertRCName( success, - emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1) + emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(Meta), 1) ), lists:foreach( fun({Chunk, Offset}) -> ?assertRCName( success, - emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1) + emqtt:publish(C, mk_segment_topic(Config, FileId, Offset), Chunk, 1) ) end, %% Skip the first segment tl(with_offsets(Data)) ), - ?assertRCName( - unspecified_error, - emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1) + ?assertEqual( + {error, unspecified_error}, + emqx_ft_test_helpers:fin_result( + mode(Config), ClientId, C, mk_fin_topic(Config, FileId, Filesize) + ) ). t_invalid_meta(Config) -> @@ -352,17 +376,18 @@ t_invalid_meta(Config) -> MetaPayload = emqx_utils_json:encode(Meta), ?assertRCName( unspecified_error, - emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1) + emqtt:publish(C, mk_init_topic(Config, FileId), MetaPayload, 1) ), %% Invalid JSON ?assertRCName( unspecified_error, - emqtt:publish(C, mk_init_topic(FileId), <<"{oops;">>, 1) + emqtt:publish(C, mk_init_topic(Config, FileId), <<"{oops;">>, 1) ). t_invalid_checksum(Config) -> C = ?config(client, Config), + ClientId = ?config(clientid, Config), Filename = "topsecret.pdf", FileId = <<"f1">>, @@ -374,35 +399,39 @@ t_invalid_checksum(Config) -> ?assertRCName( success, - emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1) + emqtt:publish(C, mk_init_topic(Config, FileId), MetaPayload, 1) ), lists:foreach( fun({Chunk, Offset}) -> ?assertRCName( success, - emqtt:publish(C, mk_segment_topic(FileId, Offset), Chunk, 1) + emqtt:publish(C, mk_segment_topic(Config, FileId, Offset), Chunk, 1) ) end, with_offsets(Data) ), % Send `fin` w/o checksum, should fail since filemeta checksum is invalid - FinTopic = mk_fin_topic(FileId, Filesize), - ?assertRCName( - unspecified_error, - emqtt:publish(C, FinTopic, <<>>, 1) + FinTopic = mk_fin_topic(Config, FileId, Filesize), + + ?assertEqual( + {error, unspecified_error}, + emqx_ft_test_helpers:fin_result(mode(Config), ClientId, C, FinTopic) ), % Send `fin` with the correct checksum Checksum = binary:encode_hex(sha256(Data)), - ?assertRCName( - success, - emqtt:publish(C, <>, <<>>, 1) + ?assertEqual( + ok, + emqx_ft_test_helpers:fin_result( + mode(Config), ClientId, C, <> + ) ). t_corrupted_segment_retry(Config) -> C = ?config(client, Config), + ClientId = ?config(clientid, Config), Filename = "corruption.pdf", FileId = <<"4242-4242">>, @@ -421,35 +450,89 @@ t_corrupted_segment_retry(Config) -> Meta = #{size := Filesize} = meta(Filename, Data), - ?assertRCName(success, emqtt:publish(C, mk_init_topic(FileId), encode_meta(Meta), 1)), + ?assertRCName(success, emqtt:publish(C, mk_init_topic(Config, FileId), encode_meta(Meta), 1)), ?assertRCName( success, - emqtt:publish(C, mk_segment_topic(FileId, Offset1, Checksum1), Seg1, 1) + emqtt:publish(C, mk_segment_topic(Config, FileId, Offset1, Checksum1), Seg1, 1) ), % segment is corrupted ?assertRCName( unspecified_error, - emqtt:publish(C, mk_segment_topic(FileId, Offset2, Checksum2), <>, 1) + emqtt:publish( + C, mk_segment_topic(Config, FileId, Offset2, Checksum2), <>, 1 + ) ), % retry ?assertRCName( success, - emqtt:publish(C, mk_segment_topic(FileId, Offset2, Checksum2), Seg2, 1) + emqtt:publish(C, mk_segment_topic(Config, FileId, Offset2, Checksum2), Seg2, 1) ), ?assertRCName( success, - emqtt:publish(C, mk_segment_topic(FileId, Offset3, Checksum3), Seg3, 1) + emqtt:publish(C, mk_segment_topic(Config, FileId, Offset3, Checksum3), Seg3, 1) ), - ?assertRCName( - success, - emqtt:publish(C, mk_fin_topic(FileId, Filesize), <<>>, 1) + ?assertEqual( + ok, + emqx_ft_test_helpers:fin_result( + mode(Config), ClientId, C, mk_fin_topic(Config, FileId, Filesize) + ) ). +t_assemble_crash(Config) -> + C = ?config(client, Config), + + meck:new(emqx_ft_storage_fs), + meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) -> meck:exception(error, oops) end), + + ?assertRCName( + unspecified_error, + emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1) + ), + + meck:unload(emqx_ft_storage_fs). + +t_assemble_timeout(Config) -> + C = ?config(client, Config), + ClientId = ?config(clientid, Config), + + SleepForever = fun() -> + Ref = make_ref(), + receive + Ref -> ok + end + end, + + ok = meck:new(emqx_ft_storage, [passthrough]), + ok = meck:expect(emqx_ft_storage, assemble, fun(_, _, _) -> + {async, spawn_link(SleepForever)} + end), + + {Time, Res} = timer:tc( + fun() -> + emqx_ft_test_helpers:fin_result( + mode(Config), ClientId, C, <<"$file/someid/fin/9999999">> + ) + end + ), + + ok = meck:unload(emqx_ft_storage), + + ?assertEqual( + {error, unspecified_error}, + Res + ), + + ?assert(2_000_000 < Time). + +%%-------------------------------------------------------------------- +%% Cluster tests +%%-------------------------------------------------------------------- + t_switch_node(Config) -> [Node | _] = ?config(cluster_nodes, Config), AdditionalNodePort = emqx_ft_test_helpers:tcp_port(Node), @@ -471,11 +554,11 @@ t_switch_node(Config) -> ?assertRCName( success, - emqtt:publish(C1, mk_init_topic(FileId), encode_meta(Meta), 1) + emqtt:publish(C1, mk_init_topic(Config, FileId), encode_meta(Meta), 1) ), ?assertRCName( success, - emqtt:publish(C1, mk_segment_topic(FileId, Offset0), Data0, 1) + emqtt:publish(C1, mk_segment_topic(Config, FileId, Offset0), Data0, 1) ), %% Then, switch the client to the main node @@ -487,16 +570,16 @@ t_switch_node(Config) -> ?assertRCName( success, - emqtt:publish(C2, mk_segment_topic(FileId, Offset1), Data1, 1) + emqtt:publish(C2, mk_segment_topic(Config, FileId, Offset1), Data1, 1) ), ?assertRCName( success, - emqtt:publish(C2, mk_segment_topic(FileId, Offset2), Data2, 1) + emqtt:publish(C2, mk_segment_topic(Config, FileId, Offset2), Data2, 1) ), ?assertRCName( success, - emqtt:publish(C2, mk_fin_topic(FileId, Filesize), <<>>, 1) + emqtt:publish(C2, mk_fin_topic(Config, FileId, Filesize), <<>>, 1) ), ok = emqtt:stop(C2), @@ -509,17 +592,6 @@ t_switch_node(Config) -> read_export(Export) ). -t_assemble_crash(Config) -> - C = ?config(client, Config), - - meck:new(emqx_ft_storage_fs), - meck:expect(emqx_ft_storage_fs, assemble, fun(_, _, _, _) -> meck:exception(error, oops) end), - - ?assertRCName( - unspecified_error, - emqtt:publish(C, <<"$file/someid/fin">>, <<>>, 1) - ). - t_unreliable_migrating_client(Config) -> NodeSelf = node(), [Node1, Node2] = ?config(cluster_nodes, Config), @@ -543,10 +615,10 @@ t_unreliable_migrating_client(Config) -> {fun connect_mqtt_client/2, [NodeSelf]}, % Send filemeta and 3 initial segments % (assuming client chose 100 bytes as a desired segment size) - {fun send_filemeta/2, [Meta]}, - {fun send_segment/3, [0, 100]}, - {fun send_segment/3, [100, 100]}, - {fun send_segment/3, [200, 100]}, + {fun send_filemeta/3, [Config, Meta]}, + {fun send_segment/4, [Config, 0, 100]}, + {fun send_segment/4, [Config, 100, 100]}, + {fun send_segment/4, [Config, 200, 100]}, % Disconnect the client cleanly {fun stop_mqtt_client/1, []}, % Connect to the broker on `Node1` @@ -555,27 +627,27 @@ t_unreliable_migrating_client(Config) -> % Client forgot the state for some reason and started the transfer again. % (assuming this is usual for a client on a device that was rebooted) {fun connect_mqtt_client/2, [Node2]}, - {fun send_filemeta/2, [Meta]}, + {fun send_filemeta/3, [Config, Meta]}, % This time it chose 200 bytes as a segment size - {fun send_segment/3, [0, 200]}, - {fun send_segment/3, [200, 200]}, + {fun send_segment/4, [Config, 0, 200]}, + {fun send_segment/4, [Config, 200, 200]}, % But now it downscaled back to 100 bytes segments - {fun send_segment/3, [400, 100]}, + {fun send_segment/4, [Config, 400, 100]}, % Client lost connectivity and reconnected % (also had last few segments unacked and decided to resend them) {fun connect_mqtt_client/2, [Node2]}, - {fun send_segment/3, [200, 200]}, - {fun send_segment/3, [400, 200]}, + {fun send_segment/4, [Config, 200, 200]}, + {fun send_segment/4, [Config, 400, 200]}, % Client lost connectivity and reconnected, this time to another node % (also had last segment unacked and decided to resend it) {fun connect_mqtt_client/2, [Node1]}, - {fun send_segment/3, [400, 200]}, - {fun send_segment/3, [600, eof]}, - {fun send_finish/1, []}, + {fun send_segment/4, [Config, 400, 200]}, + {fun send_segment/4, [Config, 600, eof]}, + {fun send_finish/2, [Config]}, % Client lost connectivity and reconnected, this time to the current node % (client had `fin` unacked and decided to resend it) {fun connect_mqtt_client/2, [NodeSelf]}, - {fun send_finish/1, []} + {fun send_finish/2, [Config]} ], _Context = run_commands(Commands, Context), @@ -621,8 +693,8 @@ t_concurrent_fins(Config) -> Context1 = run_commands( [ {fun connect_mqtt_client/2, [Node1]}, - {fun send_filemeta/2, [Meta]}, - {fun send_segment/3, [0, 100]}, + {fun send_filemeta/3, [Config, Meta]}, + {fun send_segment/4, [Config, 0, 100]}, {fun stop_mqtt_client/1, []} ], Context0 @@ -634,7 +706,7 @@ t_concurrent_fins(Config) -> run_commands( [ {fun connect_mqtt_client/2, [Node]}, - {fun send_finish/1, []} + {fun send_finish/2, [Config]} ], Context1 ) @@ -708,14 +780,16 @@ disown_mqtt_client(Context = #{client := Client}) -> disown_mqtt_client(Context = #{}) -> Context. -send_filemeta(Meta, Context = #{client := Client, fileid := FileId}) -> +send_filemeta(Config, Meta, Context = #{client := Client, fileid := FileId}) -> ?assertRCName( success, - emqtt:publish(Client, mk_init_topic(FileId), encode_meta(Meta), 1) + emqtt:publish(Client, mk_init_topic(Config, FileId), encode_meta(Meta), 1) ), Context. -send_segment(Offset, Size, Context = #{client := Client, fileid := FileId, payload := Payload}) -> +send_segment( + Config, Offset, Size, Context = #{client := Client, fileid := FileId, payload := Payload} +) -> Data = case Size of eof -> @@ -725,14 +799,14 @@ send_segment(Offset, Size, Context = #{client := Client, fileid := FileId, paylo end, ?assertRCName( success, - emqtt:publish(Client, mk_segment_topic(FileId, Offset), Data, 1) + emqtt:publish(Client, mk_segment_topic(Config, FileId, Offset), Data, 1) ), Context. -send_finish(Context = #{client := Client, fileid := FileId, filesize := Filesize}) -> +send_finish(Config, Context = #{client := Client, fileid := FileId, filesize := Filesize}) -> ?assertRCName( success, - emqtt:publish(Client, mk_fin_topic(FileId, Filesize), <<>>, 1) + emqtt:publish(Client, mk_fin_topic(Config, FileId, Filesize), <<>>, 1) ), Context. @@ -749,23 +823,30 @@ fs_exported_file_attributes(FSExports) -> lists:sort(FSExports) ). -mk_init_topic(FileId) -> - <<"$file/", FileId/binary, "/init">>. +mk_init_topic(Config, FileId) -> + RequestTopicPrefix = request_topic_prefix(Config, FileId), + <>. -mk_segment_topic(FileId, Offset) when is_integer(Offset) -> - mk_segment_topic(FileId, integer_to_binary(Offset)); -mk_segment_topic(FileId, Offset) when is_binary(Offset) -> - <<"$file/", FileId/binary, "/", Offset/binary>>. +mk_segment_topic(Config, FileId, Offset) when is_integer(Offset) -> + mk_segment_topic(Config, FileId, integer_to_binary(Offset)); +mk_segment_topic(Config, FileId, Offset) when is_binary(Offset) -> + RequestTopicPrefix = request_topic_prefix(Config, FileId), + <>. -mk_segment_topic(FileId, Offset, Checksum) when is_integer(Offset) -> - mk_segment_topic(FileId, integer_to_binary(Offset), Checksum); -mk_segment_topic(FileId, Offset, Checksum) when is_binary(Offset) -> - <<"$file/", FileId/binary, "/", Offset/binary, "/", Checksum/binary>>. +mk_segment_topic(Config, FileId, Offset, Checksum) when is_integer(Offset) -> + mk_segment_topic(Config, FileId, integer_to_binary(Offset), Checksum); +mk_segment_topic(Config, FileId, Offset, Checksum) when is_binary(Offset) -> + RequestTopicPrefix = request_topic_prefix(Config, FileId), + <>. -mk_fin_topic(FileId, Size) when is_integer(Size) -> - mk_fin_topic(FileId, integer_to_binary(Size)); -mk_fin_topic(FileId, Size) when is_binary(Size) -> - <<"$file/", FileId/binary, "/fin/", Size/binary>>. +mk_fin_topic(Config, FileId, Size) when is_integer(Size) -> + mk_fin_topic(Config, FileId, integer_to_binary(Size)); +mk_fin_topic(Config, FileId, Size) when is_binary(Size) -> + RequestTopicPrefix = request_topic_prefix(Config, FileId), + <>. + +request_topic_prefix(Config, FileId) -> + emqx_ft_test_helpers:request_topic_prefix(mode(Config), FileId). with_offsets(Items) -> {List, _} = lists:mapfoldl( @@ -799,3 +880,17 @@ list_files(ClientId) -> read_export(#{path := AbsFilepath}) -> % TODO: only works for the local filesystem exporter right now file:read_file(AbsFilepath). + +set_client_specific_ft_dirs(ClientId, Config) -> + FTRoot = emqx_ft_test_helpers:ft_root(Config), + ok = emqx_config:put( + [file_transfer, storage, local, segments, root], + filename:join([FTRoot, ClientId, segments]) + ), + ok = emqx_config:put( + [file_transfer, storage, local, exporter, local, root], + filename:join([FTRoot, ClientId, exports]) + ). + +mode(Config) -> + proplists:get_value(mode, Config, sync). diff --git a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl index ae8a5c01c..092927d70 100644 --- a/apps/emqx_ft/test/emqx_ft_api_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_api_SUITE.erl @@ -85,7 +85,7 @@ t_list_files(Config) -> FileId = <<"f1">>, Node = lists:last(test_nodes(Config)), - ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node), + ok = emqx_ft_test_helpers:upload_file(sync, ClientId, FileId, "f1", <<"data">>, Node), {ok, 200, #{<<"files">> := Files}} = request_json(get, uri(["file_transfer", "files"]), Config), @@ -114,7 +114,7 @@ t_download_transfer(Config) -> Nodes = [Node | _] = test_nodes(Config), NodeUpload = lists:last(Nodes), - ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, NodeUpload), + ok = emqx_ft_test_helpers:upload_file(sync, ClientId, FileId, "f1", <<"data">>, NodeUpload), ?assertMatch( {ok, 400, #{<<"code">> := <<"BAD_REQUEST">>}}, @@ -185,7 +185,7 @@ t_list_files_paging(Config) -> ], ok = lists:foreach( fun({FileId, Name, Node}) -> - ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, Name, <<"data">>, Node) + ok = emqx_ft_test_helpers:upload_file(sync, ClientId, FileId, Name, <<"data">>, Node) end, Uploads ), diff --git a/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl b/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl index 78a9b371c..daa83de74 100644 --- a/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_async_reply_SUITE.erl @@ -55,7 +55,7 @@ t_register(_Config) -> PacketId = 1, MRef = make_ref(), TRef = make_ref(), - ok = emqx_ft_async_reply:register(PacketId, MRef, TRef), + ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata), ?assertEqual( undefined, @@ -68,7 +68,7 @@ t_register(_Config) -> ), ?assertEqual( - {ok, PacketId, TRef}, + {ok, PacketId, TRef, somedata}, emqx_ft_async_reply:take_by_mref(MRef) ). @@ -76,7 +76,7 @@ t_process_independence(_Config) -> PacketId = 1, MRef = make_ref(), TRef = make_ref(), - ok = emqx_ft_async_reply:register(PacketId, MRef, TRef), + ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata), Self = self(), @@ -112,10 +112,10 @@ t_take(_Config) -> PacketId = 1, MRef = make_ref(), TRef = make_ref(), - ok = emqx_ft_async_reply:register(PacketId, MRef, TRef), + ok = emqx_ft_async_reply:register(PacketId, MRef, TRef, somedata), ?assertEqual( - {ok, PacketId, TRef}, + {ok, PacketId, TRef, somedata}, emqx_ft_async_reply:take_by_mref(MRef) ), @@ -135,12 +135,12 @@ t_cleanup(_Config) -> TRef0 = make_ref(), MRef1 = make_ref(), TRef1 = make_ref(), - ok = emqx_ft_async_reply:register(PacketId, MRef0, TRef0), + ok = emqx_ft_async_reply:register(PacketId, MRef0, TRef0, somedata0), Self = self(), Pid = spawn_link(fun() -> - ok = emqx_ft_async_reply:register(PacketId, MRef1, TRef1), + ok = emqx_ft_async_reply:register(PacketId, MRef1, TRef1, somedata1), receive kickoff -> ?assertEqual( @@ -149,7 +149,7 @@ t_cleanup(_Config) -> ), ?assertEqual( - {ok, PacketId, TRef1}, + {ok, PacketId, TRef1, somedata1}, emqx_ft_async_reply:take_by_mref(MRef1) ), diff --git a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl index 0acdea213..8ce282f6d 100644 --- a/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_conf_SUITE.erl @@ -39,10 +39,10 @@ init_per_testcase(Case, Config) -> ], #{work_dir => emqx_cth_suite:work_dir(Case, Config)} ), - [{suite_apps, Apps} | Config]. + [{apps, Apps} | Config]. end_per_testcase(_Case, Config) -> - ok = emqx_cth_suite:stop(?config(suite_apps, Config)), + ok = emqx_cth_suite:stop(?config(apps, Config)), ok. %%-------------------------------------------------------------------- diff --git a/apps/emqx_ft/test/emqx_ft_request_SUITE.erl b/apps/emqx_ft/test/emqx_ft_request_SUITE.erl new file mode 100644 index 000000000..b21917093 --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_request_SUITE.erl @@ -0,0 +1,113 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_request_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + Apps = emqx_cth_suite:start( + [ + {emqx_ft, "file_transfer { enable = true, assemble_timeout = 1s}"} + ], + #{work_dir => ?config(priv_dir, Config)} + ), + [{suite_apps, Apps} | Config]. + +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)), + ok. + +init_per_testcase(_Case, Config) -> + Config. + +end_per_testcase(_Case, _Config) -> + ok. + +%%------------------------------------------------------------------- +%% Tests +%%------------------------------------------------------------------- + +t_upload_via_requests(_Config) -> + C = emqx_ft_test_helpers:start_client(<<"client">>), + + FileId = <<"f1">>, + Data = <<"hello world">>, + Size = byte_size(Data), + Meta = #{ + name => "test.txt", + expire_at => erlang:system_time(_Unit = second) + 3600, + size => Size + }, + MetaPayload = emqx_utils_json:encode(emqx_ft:encode_filemeta(Meta)), + MetaTopic = <<"$file/", FileId/binary, "/init">>, + + ?assertMatch( + {ok, #{<<"reason_code">> := 0, <<"topic">> := MetaTopic}}, + request(C, MetaTopic, MetaPayload) + ), + + SegmentTopic = <<"$file/", FileId/binary, "/0">>, + + ?assertMatch( + {ok, #{<<"reason_code">> := 0, <<"topic">> := SegmentTopic}}, + request(C, SegmentTopic, Data) + ), + + FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>, + + ?assertMatch( + {ok, #{<<"reason_code">> := 0, <<"topic">> := FinTopic}}, + request(C, FinTopic, <<>>) + ). + +%%-------------------------------------------------------------------- +%% Helper functions +%%-------------------------------------------------------------------- + +request(C, Topic, Request) -> + CorrelaionData = emqx_ft_test_helpers:unique_binary_string(), + ResponseTopic = emqx_ft_test_helpers:unique_binary_string(), + + Properties = #{ + 'Correlation-Data' => CorrelaionData, + 'Response-Topic' => ResponseTopic + }, + Opts = [{qos, 1}], + + {ok, _, _} = emqtt:subscribe(C, ResponseTopic, 1), + {ok, _} = emqtt:publish(C, Topic, Properties, Request, Opts), + + try + receive + {publish, #{ + topic := ResponseTopic, + payload := Payload, + properties := #{'Correlation-Data' := CorrelaionData} + }} -> + {ok, emqx_utils_json:decode(Payload)} + after 1000 -> + {error, timeout} + end + after + emqtt:unsubscribe(C, ResponseTopic) + end. diff --git a/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl index 9e6050f36..90da824ef 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_exporter_s3_SUITE.erl @@ -38,25 +38,23 @@ init_per_suite(Config) -> end_per_suite(_Config) -> ok. -set_special_configs(Config) -> - fun - (emqx_ft) -> - Storage = emqx_ft_test_helpers:local_storage(Config, #{ - exporter => s3, bucket_name => ?config(bucket_name, Config) - }), - emqx_ft_test_helpers:load_config(#{<<"enable">> => true, <<"storage">> => Storage}); - (_) -> - ok - end. - -init_per_testcase(Case, Config0) -> +init_per_testcase(Case, Config) -> ClientId = atom_to_binary(Case), BucketName = create_bucket(), - Config1 = [{bucket_name, BucketName}, {clientid, ClientId} | Config0], - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_ft], set_special_configs(Config1)), - Config1. -end_per_testcase(_Case, _Config) -> - ok = emqx_common_test_helpers:stop_apps([emqx_ft, emqx_conf]), + Storage = emqx_ft_test_helpers:local_storage(Config, #{ + exporter => s3, bucket_name => BucketName + }), + WorkDir = filename:join(?config(priv_dir, Config), atom_to_list(Case)), + Apps = emqx_cth_suite:start( + [ + emqx_conf, + {emqx_ft, #{config => emqx_ft_test_helpers:config(Storage)}} + ], + #{work_dir => WorkDir} + ), + [{apps, Apps}, {bucket_name, BucketName}, {clientid, ClientId} | Config]. +end_per_testcase(_Case, Config) -> + ok = emqx_cth_suite:stop(?config(apps, Config)), ok. %%-------------------------------------------------------------------- diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl index 52d372e63..6ca158833 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_SUITE.erl @@ -81,8 +81,8 @@ end_per_group(_Group, _Config) -> t_multinode_exports(Config) -> [Node1, Node2 | _] = ?config(cluster, Config), - ok = emqx_ft_test_helpers:upload_file(<<"c/1">>, <<"f:1">>, "fn1", <<"data">>, Node1), - ok = emqx_ft_test_helpers:upload_file(<<"c/2">>, <<"f:2">>, "fn2", <<"data">>, Node2), + ok = emqx_ft_test_helpers:upload_file(sync, <<"c/1">>, <<"f:1">>, "fn1", <<"data">>, Node1), + ok = emqx_ft_test_helpers:upload_file(sync, <<"c/2">>, <<"f:2">>, "fn2", <<"data">>, Node2), ?assertMatch( [ #{transfer := {<<"c/1">>, <<"f:1">>}, name := "fn1"}, diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl index 217205f6f..f0b658e0d 100644 --- a/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl @@ -25,11 +25,18 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> - ok = emqx_common_test_helpers:start_apps([emqx_ft], emqx_ft_test_helpers:env_handler(Config)), - Config. + WorkDir = ?config(priv_dir, Config), + Storage = emqx_ft_test_helpers:local_storage(Config), + Apps = emqx_cth_suite:start( + [ + {emqx_ft, #{config => emqx_ft_test_helpers:config(Storage)}} + ], + #{work_dir => WorkDir} + ), + [{suite_apps, Apps} | Config]. -end_per_suite(_Config) -> - ok = emqx_common_test_helpers:stop_apps([emqx_ft]), +end_per_suite(Config) -> + ok = emqx_cth_suite:stop(?config(suite_apps, Config)), ok. init_per_testcase(_Case, Config) -> diff --git a/apps/emqx_ft/test/emqx_ft_test_helpers.erl b/apps/emqx_ft/test/emqx_ft_test_helpers.erl index 9e69118c8..efcaa3048 100644 --- a/apps/emqx_ft/test/emqx_ft_test_helpers.erl +++ b/apps/emqx_ft/test/emqx_ft_test_helpers.erl @@ -24,16 +24,15 @@ -define(S3_HOST, <<"minio">>). -define(S3_PORT, 9000). -env_handler(Config) -> - fun - (emqx_ft) -> - load_config(#{<<"enable">> => true, <<"storage">> => local_storage(Config)}); - (_) -> - ok - end. - config(Storage) -> - #{<<"file_transfer">> => #{<<"enable">> => true, <<"storage">> => Storage}}. + config(Storage, #{}). + +config(Storage, FTOptions0) -> + FTOptions1 = maps:merge( + #{<<"enable">> => true, <<"storage">> => Storage}, + FTOptions0 + ), + #{<<"file_transfer">> => FTOptions1}. local_storage(Config) -> local_storage(Config, #{exporter => local}). @@ -73,7 +72,13 @@ tcp_port(Node) -> Port. root(Config, Node, Tail) -> - iolist_to_binary(filename:join([?config(priv_dir, Config), "file_transfer", Node | Tail])). + iolist_to_binary(filename:join([ft_root(Config), Node | Tail])). + +ft_root(Config) -> + filename:join([?config(priv_dir, Config), "file_transfer"]). + +cleanup_ft_root(Config) -> + file:del_dir_r(emqx_ft_test_helpers:ft_root(Config)). start_client(ClientId) -> start_client(ClientId, node()). @@ -85,11 +90,15 @@ start_client(ClientId, Node) -> Client. upload_file(ClientId, FileId, Name, Data) -> - upload_file(ClientId, FileId, Name, Data, node()). + upload_file(sync, ClientId, FileId, Name, Data). -upload_file(ClientId, FileId, Name, Data, Node) -> +upload_file(Mode, ClientId, FileId, Name, Data) -> + upload_file(Mode, ClientId, FileId, Name, Data, node()). + +upload_file(Mode, ClientId, FileId, Name, Data, Node) -> C1 = start_client(ClientId, Node), + ReqTopicPrefix = request_topic_prefix(Mode, FileId), Size = byte_size(Data), Meta = #{ name => Name, @@ -98,25 +107,53 @@ upload_file(ClientId, FileId, Name, Data, Node) -> }, MetaPayload = emqx_utils_json:encode(emqx_ft:encode_filemeta(Meta)), - ct:pal("MetaPayload = ~ts", [MetaPayload]), - - MetaTopic = <<"$file/", FileId/binary, "/init">>, + MetaTopic = <>, {ok, #{reason_code_name := success}} = emqtt:publish(C1, MetaTopic, MetaPayload, 1), {ok, #{reason_code_name := success}} = emqtt:publish( - C1, <<"$file/", FileId/binary, "/0">>, Data, 1 + C1, <>, Data, 1 ), - FinTopic = <<"$file/", FileId/binary, "/fin/", (integer_to_binary(Size))/binary>>, - FinResult = - case emqtt:publish(C1, FinTopic, <<>>, 1) of - {ok, #{reason_code_name := success}} -> - ok; - {ok, #{reason_code_name := Error}} -> - {error, Error} - end, + FinTopic = <>, + FinResult = fin_result(Mode, ClientId, C1, FinTopic), ok = emqtt:stop(C1), FinResult. +fin_result(Mode, ClientId, C, FinTopic) -> + {ok, _, _} = emqtt:subscribe(C, response_topic(ClientId), 1), + case emqtt:publish(C, FinTopic, <<>>, 1) of + {ok, #{reason_code_name := success}} -> + maybe_wait_for_assemble(Mode, ClientId, FinTopic); + {ok, #{reason_code_name := Error}} -> + {error, Error} + end. + +maybe_wait_for_assemble(sync, _ClientId, _FinTopic) -> + ok; +maybe_wait_for_assemble(async, ClientId, FinTopic) -> + ResponseTopic = response_topic(ClientId), + receive + {publish, #{payload := Payload, topic := ResponseTopic}} -> + case emqx_utils_json:decode(Payload) of + #{<<"topic">> := FinTopic, <<"reason_code">> := 0} -> + ok; + #{<<"topic">> := FinTopic, <<"reason_code">> := Code} -> + {error, emqx_reason_codes:name(Code)}; + _ -> + maybe_wait_for_assemble(async, ClientId, FinTopic) + end + end. + +response_topic(ClientId) -> + <<"$file-response/", (to_bin(ClientId))/binary>>. + +request_topic_prefix(sync, FileId) -> + <<"$file/", (to_bin(FileId))/binary>>; +request_topic_prefix(async, FileId) -> + <<"$file-async/", (to_bin(FileId))/binary>>. + +to_bin(Val) -> + iolist_to_binary(Val). + aws_config() -> emqx_s3_test_helpers:aws_config(tcp, binary_to_list(?S3_HOST), ?S3_PORT). @@ -129,3 +166,6 @@ pem_privkey() -> "ju0VBj6tOX1y6C0U+85VOM0UU5xqvw==\n" "-----END EC PRIVATE KEY-----\n" >>. + +unique_binary_string() -> + emqx_guid:to_hexstr(emqx_guid:gen()). diff --git a/changes/ee/feat-11541.en.md b/changes/ee/feat-11541.en.md new file mode 100644 index 000000000..dee06609d --- /dev/null +++ b/changes/ee/feat-11541.en.md @@ -0,0 +1,3 @@ +Introduced additional way of file transfer interactions. Now client may send file transfer commands to `$file-async/...` topic instead of `$file/...` and receive command execution results as messages to `$file-response/{clientId}` topic. +This simplifies file transfer feature usage in certain cases, for example, when a client uses MQTTv3 or when the broker is behind an MQTT bridge. +See the [EIP-0021](https://github.com/emqx/eip) for more details.