diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index bc9154933..712b2bb99 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -63,7 +63,8 @@ emqx_psk_schema, emqx_limiter_schema, emqx_slow_subs_schema, - emqx_mgmt_api_key_schema + emqx_mgmt_api_key_schema, + emqx_ft_schema ]). %% root config should not have a namespace diff --git a/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf new file mode 100644 index 000000000..85fb81cfe --- /dev/null +++ b/apps/emqx_ft/i18n/emqx_ft_schema_i18n.conf @@ -0,0 +1,14 @@ +emqx_ft_schema { + + local { + desc { + en: "Use local file system to store uploaded files and temporary data." + zh: "使用本地文件系统来存储上传的文件和临时数据。" + } + label: { + en: "Local Storage" + zh: "本地存储" + } + } + +} diff --git a/apps/emqx_ft/src/emqx_ft.erl b/apps/emqx_ft/src/emqx_ft.erl index 014c74ac3..a286a6186 100644 --- a/apps/emqx_ft/src/emqx_ft.erl +++ b/apps/emqx_ft/src/emqx_ft.erl @@ -41,9 +41,13 @@ -export([on_assemble_timeout/1]). --export_type([clientid/0]). --export_type([transfer/0]). --export_type([offset/0]). +-export_type([ + clientid/0, + transfer/0, + offset/0, + filemeta/0, + segment/0 +]). %% Number of bytes -type bytes() :: non_neg_integer(). @@ -55,6 +59,26 @@ -type transfer() :: {clientid(), fileid()}. -type offset() :: bytes(). +-type filemeta() :: #{ + %% Display name + name := string(), + %% Size in bytes, as advertised by the client. + %% Client is free to specify here whatever it wants, which means we can end + %% up with a file of different size after assembly. It's not clear from + %% specification what that means (e.g. what are clients' expectations), we + %% currently do not condider that an error (or, specifically, a signal that + %% the resulting file is corrupted during transmission). + size => _Bytes :: non_neg_integer(), + checksum => {sha256, <<_:256>>}, + expire_at := emqx_datetime:epoch_second(), + %% TTL of individual segments + %% Somewhat confusing that we won't know it on the nodes where the filemeta + %% is missing. + segments_ttl => _Seconds :: pos_integer() +}. + +-type segment() :: {offset(), _Content :: binary()}. + -type ft_data() :: #{ nodes := list(node()) }. @@ -135,23 +159,9 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) -> end. %%-------------------------------------------------------------------- -%% Private funs +%% Handlers for transfer messages %%-------------------------------------------------------------------- -get_ft_data(ChanPid) -> - case ets:lookup(?FT_TAB, ChanPid) of - [#emqx_ft{ft_data = FTData}] -> {ok, FTData}; - [] -> none - end. - -delete_ft_data(ChanPid) -> - true = ets:delete(?FT_TAB, ChanPid), - ok. - -put_ft_data(ChanPid, FTData) -> - true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}), - ok. - on_file_command(PacketId, Msg, FileCommand) -> case string:split(FileCommand, <<"/">>, all) of [FileId, <<"init">>] -> @@ -176,12 +186,16 @@ on_init(Msg, FileId) -> mqtt_msg => Msg, file_id => FileId }), - % Payload = Msg#message.payload, + Payload = Msg#message.payload, % %% Add validations here - % Meta = emqx_json:decode(Payload, [return_maps]), - % ok = emqx_ft_storage_fs:store_filemeta(storage(), transfer(Msg, FileId), Meta), - % ?RC_SUCCESS. - ?RC_UNSPECIFIED_ERROR. + Meta = emqx_json:decode(Payload, [return_maps]), + case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of + {ok, Ctx} -> + ok = put_context(Ctx), + ?RC_SUCCESS; + {error, _Reason} -> + ?RC_UNSPECIFIED_ERROR + end. on_abort(_Msg, _FileId) -> %% TODO @@ -195,16 +209,17 @@ on_segment(Msg, FileId, Offset, Checksum) -> offset => Offset, checksum => Checksum }), - % %% TODO: handle checksum - % Payload = Msg#message.payload, - % %% Add offset/checksum validations - % ok = emqx_ft_storage_fs:store_segment( - % storage(), - % transfer(Msg, FileId), - % {binary_to_integer(Offset), Payload} - % ), - % ?RC_SUCCESS. - ?RC_UNSPECIFIED_ERROR. + %% TODO: handle checksum + Payload = Msg#message.payload, + Segment = {binary_to_integer(Offset), Payload}, + %% Add offset/checksum validations + case emqx_ft_storage:store_segment(get_context(), transfer(Msg, FileId), Segment) of + {ok, Ctx} -> + ok = put_context(Ctx), + ?RC_SUCCESS; + {error, _Reason} -> + ?RC_UNSPECIFIED_ERROR + end. on_fin(PacketId, Msg, FileId, Checksum) -> ?SLOG(info, #{ @@ -227,7 +242,7 @@ on_fin(PacketId, Msg, FileId, Checksum) -> Callback = callback(FinPacketKey, FileId), case assemble(transfer(Msg, FileId), Callback) of %% Assembling started, packet will be acked by the callback or the responder - ok -> + {ok, _} -> undefined; %% Assembling failed, unregister the packet key {error, _} -> @@ -250,16 +265,12 @@ on_fin(PacketId, Msg, FileId, Checksum) -> undefined end. -assemble(_Transfer, _Callback) -> - % spawn(fun() -> Callback({error, not_implemented}) end), - ok. - -% assemble(Transfer, Callback) -> -% emqx_ft_storage_fs:assemble( -% storage(), -% Transfer, -% Callback -% ). +assemble(Transfer, Callback) -> + emqx_ft_storage:assemble( + get_context(), + Transfer, + Callback + ). callback({ChanPid, PacketId} = Key, _FileId) -> fun(Result) -> @@ -281,9 +292,34 @@ transfer(Msg, FileId) -> {ClientId, FileId}. %% TODO: configure + storage() -> - filename:join(emqx:data_dir(), "file_transfer"). + emqx_config:get([file_transfer, storage]). on_assemble_timeout({ChanPid, PacketId}) -> ?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}), erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}). + +%%-------------------------------------------------------------------- +%% Context management +%%-------------------------------------------------------------------- + +get_context() -> + get_ft_data(self()). + +put_context(Context) -> + put_ft_data(self(), Context). + +get_ft_data(ChanPid) -> + case ets:lookup(?FT_TAB, ChanPid) of + [#emqx_ft{ft_data = FTData}] -> {ok, FTData}; + [] -> none + end. + +delete_ft_data(ChanPid) -> + true = ets:delete(?FT_TAB, ChanPid), + ok. + +put_ft_data(ChanPid, FTData) -> + true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}), + ok. diff --git a/apps/emqx_ft/src/emqx_ft_app.erl b/apps/emqx_ft/src/emqx_ft_app.erl index 4778da1a1..9b1513b46 100644 --- a/apps/emqx_ft/src/emqx_ft_app.erl +++ b/apps/emqx_ft/src/emqx_ft_app.erl @@ -23,8 +23,10 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_ft_sup:start_link(), ok = emqx_ft:hook(), + ok = emqx_ft_conf:load(), {ok, Sup}. stop(_State) -> + ok = emqx_ft_conf:unload(), ok = emqx_ft:unhook(), ok. diff --git a/apps/emqx_ft/src/emqx_ft_conf.erl b/apps/emqx_ft/src/emqx_ft_conf.erl new file mode 100644 index 000000000..b88fd2532 --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_conf.erl @@ -0,0 +1,65 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc File Transfer configuration management module + +-module(emqx_ft_conf). + +-behaviour(emqx_config_handler). + +%% Load/Unload +-export([ + load/0, + unload/0 +]). + +%% callbacks for emqx_config_handler +-export([ + pre_config_update/3, + post_config_update/5 +]). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec load() -> ok. +load() -> + emqx_conf:add_handler([file_transfer], ?MODULE). + +-spec unload() -> ok. +unload() -> + emqx_conf:remove_handler([file_transfer]). + +%%-------------------------------------------------------------------- +%% emqx_config_handler callbacks +%%-------------------------------------------------------------------- + +-spec pre_config_update(list(atom()), emqx_config:update_request(), emqx_config:raw_config()) -> + {ok, emqx_config:update_request()} | {error, term()}. +pre_config_update(_, _Req, Config) -> + {ok, Config}. + +-spec post_config_update( + list(atom()), + emqx_config:update_request(), + emqx_config:config(), + emqx_config:config(), + emqx_config:app_envs() +) -> + ok | {ok, Result :: any()} | {error, Reason :: term()}. +post_config_update(_, _Req, _NewConfig, _OldConfig, _AppEnvs) -> + ok. diff --git a/apps/emqx_ft/src/emqx_ft_responder.erl b/apps/emqx_ft/src/emqx_ft_responder.erl index dcb45d5d3..f58569a27 100644 --- a/apps/emqx_ft/src/emqx_ft_responder.erl +++ b/apps/emqx_ft/src/emqx_ft_responder.erl @@ -79,6 +79,7 @@ handle_call({register, Key, DefaultAction, Timeout}, _From, State) -> {reply, {error, already_registered}, State} end; handle_call({unregister, Key}, _From, State) -> + ?SLOG(warning, #{msg => "unregister", key => Key}), case ets:lookup(?TAB, Key) of [] -> {reply, {error, not_found}, State}; diff --git a/apps/emqx_ft/src/emqx_ft_schema.erl b/apps/emqx_ft/src/emqx_ft_schema.erl new file mode 100644 index 000000000..f40d2f40e --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_schema.erl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_schema). + +-behaviour(hocon_schema). + +-include_lib("hocon/include/hoconsc.hrl"). +-include_lib("typerefl/include/types.hrl"). + +-export([namespace/0, roots/0, fields/1, tags/0]). + +namespace() -> file_transfer. + +tags() -> + [<<"File Transfer">>]. + +roots() -> [file_transfer]. + +fields(file_transfer) -> + [ + {storage, #{ + type => hoconsc:union([ + hoconsc:ref(?MODULE, local_storage) + ]) + }} + ]; +fields(local_storage) -> + [ + {type, #{ + type => local, + default => local, + required => false, + desc => ?DESC("local") + }} + ]. diff --git a/apps/emqx_ft/src/emqx_ft_storage.erl b/apps/emqx_ft/src/emqx_ft_storage.erl new file mode 100644 index 000000000..5e945965d --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage.erl @@ -0,0 +1,75 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage). + +-export( + [ + store_filemeta/2, + store_segment/3, + assemble/3 + ] +). + +-type ctx() :: term(). +-type storage() :: emqx_config:config(). + +-export_type([assemble_callback/0]). + +-type assemble_callback() :: fun((ok | {error, term()}) -> any()). + +%%-------------------------------------------------------------------- +%% behaviour +%%-------------------------------------------------------------------- + +-callback store_filemeta(storage(), emqx_ft:transfer(), emqx_ft:filemeta()) -> + {ok, ctx()} | {error, term()}. +-callback store_segment(storage(), ctx(), emqx_ft:transfer(), emqx_ft:segment()) -> + {ok, ctx()} | {error, term()}. +-callback assemble(storage(), ctx(), emqx_ft:transfer(), assemble_callback()) -> + {ok, pid()} | {error, term()}. + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) -> + {ok, ctx()} | {error, term()}. +store_filemeta(Transfer, FileMeta) -> + Mod = mod(), + Mod:store_filemeta(storage(), Transfer, FileMeta). + +-spec store_segment(ctx(), emqx_ft:transfer(), emqx_ft:segment()) -> + {ok, ctx()} | {error, term()}. +store_segment(Ctx, Transfer, Segment) -> + Mod = mod(), + Mod:store_segment(storage(), Ctx, Transfer, Segment). + +-spec assemble(ctx(), emqx_ft:transfer(), assemble_callback()) -> + {ok, pid()} | {error, term()}. +assemble(Ctx, Transfer, Callback) -> + Mod = mod(), + Mod:assemble(storage(), Ctx, Transfer, Callback). + +mod() -> + case storage() of + #{type := local} -> + % emqx_ft_storage_fs + emqx_ft_storage_dummy + end. + +storage() -> + emqx_config:get([file_transfer, storage]). diff --git a/apps/emqx_ft/src/emqx_ft_storage_dummy.erl b/apps/emqx_ft/src/emqx_ft_storage_dummy.erl new file mode 100644 index 000000000..1ab7f558e --- /dev/null +++ b/apps/emqx_ft/src/emqx_ft_storage_dummy.erl @@ -0,0 +1,35 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_dummy). + +-behaviour(emqx_ft_storage). + +-export([ + store_filemeta/3, + store_segment/4, + assemble/4 +]). + +store_filemeta(_Storage, _Transfer, _Meta) -> + {ok, #{}}. + +store_segment(_Storage, Ctx, _Transfer, _Segment) -> + {ok, Ctx}. + +assemble(_Storage, _Ctx, _Transfer, Callback) -> + Pid = spawn(fun() -> Callback({error, not_implemented}) end), + {ok, Pid}. diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs.erl b/apps/emqx_ft/src/emqx_ft_storage_fs.erl index cce7cc19e..bbd61eba9 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs.erl @@ -19,10 +19,10 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -% -compile(export_all). +-behaviour(emqx_ft_storage). -export([store_filemeta/3]). --export([store_segment/3]). +-export([store_segment/4]). -export([list/2]). -export([read_segment/5]). -export([assemble/3]). @@ -32,45 +32,10 @@ -export([write/2]). -export([discard/1]). -% -behaviour(gen_server). -% -export([init/1]). -% -export([handle_call/3]). -% -export([handle_cast/2]). - --type json_value() :: - null - | boolean() - | binary() - | number() - | [json_value()] - | #{binary() => json_value()}. - --reflect_type([json_value/0]). - -type transfer() :: emqx_ft:transfer(). -type offset() :: emqx_ft:offset(). -%% TODO: move to `emqx_ft` interface module -% -type sha256_hex() :: <<_:512>>. - --type filemeta() :: #{ - %% Display name - name := string(), - %% Size in bytes, as advertised by the client. - %% Client is free to specify here whatever it wants, which means we can end - %% up with a file of different size after assembly. It's not clear from - %% specification what that means (e.g. what are clients' expectations), we - %% currently do not condider that an error (or, specifically, a signal that - %% the resulting file is corrupted during transmission). - size => _Bytes :: non_neg_integer(), - checksum => {sha256, <<_:256>>}, - expire_at := emqx_datetime:epoch_second(), - %% TTL of individual segments - %% Somewhat confusing that we won't know it on the nodes where the filemeta - %% is missing. - segments_ttl => _Seconds :: pos_integer(), - user_data => json_value() -}. +-type filemeta() :: emqx_ft:filemeta(). -type segment() :: {offset(), _Content :: binary()}. @@ -113,13 +78,14 @@ %% Atomic operation. -spec store_filemeta(storage(), transfer(), filemeta()) -> % Quota? Some lower level errors? - ok | {error, conflict} | {error, _TODO}. + {ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}. store_filemeta(Storage, Transfer, Meta) -> Filepath = mk_filepath(Storage, Transfer, ?MANIFEST), case read_file(Filepath, fun decode_filemeta/1) of {ok, Meta} -> _ = touch_file(Filepath), - ok; + %% No context is needed for this implementation. + {ok, #{}}; {ok, _Conflict} -> % TODO % We won't see conflicts in case of concurrent `store_filemeta` @@ -132,13 +98,18 @@ store_filemeta(Storage, Transfer, Meta) -> %% Store a segment in the backing filesystem. %% Atomic operation. --spec store_segment(storage(), transfer(), segment()) -> +-spec store_segment(storage(), emqx_ft_storage:ctx(), transfer(), segment()) -> % Where is the checksum gets verified? Upper level probably. % Quota? Some lower level errors? ok | {error, _TODO}. -store_segment(Storage, Transfer, Segment = {_Offset, Content}) -> +store_segment(Storage, Ctx, Transfer, Segment = {_Offset, Content}) -> Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)), - write_file_atomic(Filepath, Content). + case write_file_atomic(Filepath, Content) of + ok -> + {ok, Ctx}; + {error, _} = Error -> + Error + end. -spec list(storage(), transfer()) -> % Some lower level errors? {error, notfound}? @@ -178,13 +149,10 @@ read_segment(_Storage, _Transfer, Segment, Offset, Size) -> end. -spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) -> - % {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}. {ok, _Assembler :: pid()} | {error, _TODO}. -assemble(Storage, Transfer, Callback) -> +assemble(Storage, _Ctx, Transfer, Callback) -> emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback). -%% - -type handle() :: {file:name(), io:device(), crypto:hash_state()}. -spec open_file(storage(), transfer(), filemeta()) -> @@ -268,8 +236,7 @@ schema() -> {size, hoconsc:mk(non_neg_integer())}, {expire_at, hoconsc:mk(non_neg_integer())}, {checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})}, - {segments_ttl, hoconsc:mk(pos_integer())}, - {user_data, hoconsc:mk(json_value())} + {segments_ttl, hoconsc:mk(pos_integer())} ] }. @@ -354,10 +321,8 @@ mk_filedir(Storage, {ClientId, FileId}) -> mk_filepath(Storage, Transfer, Filename) -> filename:join(mk_filedir(Storage, Transfer), Filename). -get_storage_root(Storage) -> - Storage. - -%% +get_storage_root(_Storage) -> + filename:join(emqx:data_dir(), "file_transfer"). -include_lib("kernel/include/file.hrl").