feat(ft): add config & backend behaviour

This commit is contained in:
Ilya Averyanov 2023-02-02 20:23:12 +02:00
parent f9078e8401
commit ac5fcfe9f1
10 changed files with 342 additions and 99 deletions

View File

@ -63,7 +63,8 @@
emqx_psk_schema,
emqx_limiter_schema,
emqx_slow_subs_schema,
emqx_mgmt_api_key_schema
emqx_mgmt_api_key_schema,
emqx_ft_schema
]).
%% root config should not have a namespace

View File

@ -0,0 +1,14 @@
emqx_ft_schema {
local {
desc {
en: "Use local file system to store uploaded files and temporary data."
zh: "使用本地文件系统来存储上传的文件和临时数据。"
}
label: {
en: "Local Storage"
zh: "本地存储"
}
}
}

View File

@ -41,9 +41,13 @@
-export([on_assemble_timeout/1]).
-export_type([clientid/0]).
-export_type([transfer/0]).
-export_type([offset/0]).
-export_type([
clientid/0,
transfer/0,
offset/0,
filemeta/0,
segment/0
]).
%% Number of bytes
-type bytes() :: non_neg_integer().
@ -55,6 +59,26 @@
-type transfer() :: {clientid(), fileid()}.
-type offset() :: bytes().
-type filemeta() :: #{
%% Display name
name := string(),
%% Size in bytes, as advertised by the client.
%% Client is free to specify here whatever it wants, which means we can end
%% up with a file of different size after assembly. It's not clear from
%% specification what that means (e.g. what are clients' expectations), we
%% currently do not condider that an error (or, specifically, a signal that
%% the resulting file is corrupted during transmission).
size => _Bytes :: non_neg_integer(),
checksum => {sha256, <<_:256>>},
expire_at := emqx_datetime:epoch_second(),
%% TTL of individual segments
%% Somewhat confusing that we won't know it on the nodes where the filemeta
%% is missing.
segments_ttl => _Seconds :: pos_integer()
}.
-type segment() :: {offset(), _Content :: binary()}.
-type ft_data() :: #{
nodes := list(node())
}.
@ -135,23 +159,9 @@ on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) ->
end.
%%--------------------------------------------------------------------
%% Private funs
%% Handlers for transfer messages
%%--------------------------------------------------------------------
get_ft_data(ChanPid) ->
case ets:lookup(?FT_TAB, ChanPid) of
[#emqx_ft{ft_data = FTData}] -> {ok, FTData};
[] -> none
end.
delete_ft_data(ChanPid) ->
true = ets:delete(?FT_TAB, ChanPid),
ok.
put_ft_data(ChanPid, FTData) ->
true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}),
ok.
on_file_command(PacketId, Msg, FileCommand) ->
case string:split(FileCommand, <<"/">>, all) of
[FileId, <<"init">>] ->
@ -176,12 +186,16 @@ on_init(Msg, FileId) ->
mqtt_msg => Msg,
file_id => FileId
}),
% Payload = Msg#message.payload,
Payload = Msg#message.payload,
% %% Add validations here
% Meta = emqx_json:decode(Payload, [return_maps]),
% ok = emqx_ft_storage_fs:store_filemeta(storage(), transfer(Msg, FileId), Meta),
% ?RC_SUCCESS.
?RC_UNSPECIFIED_ERROR.
Meta = emqx_json:decode(Payload, [return_maps]),
case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of
{ok, Ctx} ->
ok = put_context(Ctx),
?RC_SUCCESS;
{error, _Reason} ->
?RC_UNSPECIFIED_ERROR
end.
on_abort(_Msg, _FileId) ->
%% TODO
@ -195,16 +209,17 @@ on_segment(Msg, FileId, Offset, Checksum) ->
offset => Offset,
checksum => Checksum
}),
% %% TODO: handle checksum
% Payload = Msg#message.payload,
% %% Add offset/checksum validations
% ok = emqx_ft_storage_fs:store_segment(
% storage(),
% transfer(Msg, FileId),
% {binary_to_integer(Offset), Payload}
% ),
% ?RC_SUCCESS.
?RC_UNSPECIFIED_ERROR.
%% TODO: handle checksum
Payload = Msg#message.payload,
Segment = {binary_to_integer(Offset), Payload},
%% Add offset/checksum validations
case emqx_ft_storage:store_segment(get_context(), transfer(Msg, FileId), Segment) of
{ok, Ctx} ->
ok = put_context(Ctx),
?RC_SUCCESS;
{error, _Reason} ->
?RC_UNSPECIFIED_ERROR
end.
on_fin(PacketId, Msg, FileId, Checksum) ->
?SLOG(info, #{
@ -227,7 +242,7 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
Callback = callback(FinPacketKey, FileId),
case assemble(transfer(Msg, FileId), Callback) of
%% Assembling started, packet will be acked by the callback or the responder
ok ->
{ok, _} ->
undefined;
%% Assembling failed, unregister the packet key
{error, _} ->
@ -250,16 +265,12 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
undefined
end.
assemble(_Transfer, _Callback) ->
% spawn(fun() -> Callback({error, not_implemented}) end),
ok.
% assemble(Transfer, Callback) ->
% emqx_ft_storage_fs:assemble(
% storage(),
% Transfer,
% Callback
% ).
assemble(Transfer, Callback) ->
emqx_ft_storage:assemble(
get_context(),
Transfer,
Callback
).
callback({ChanPid, PacketId} = Key, _FileId) ->
fun(Result) ->
@ -281,9 +292,34 @@ transfer(Msg, FileId) ->
{ClientId, FileId}.
%% TODO: configure
storage() ->
filename:join(emqx:data_dir(), "file_transfer").
emqx_config:get([file_transfer, storage]).
on_assemble_timeout({ChanPid, PacketId}) ->
?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}),
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}).
%%--------------------------------------------------------------------
%% Context management
%%--------------------------------------------------------------------
get_context() ->
get_ft_data(self()).
put_context(Context) ->
put_ft_data(self(), Context).
get_ft_data(ChanPid) ->
case ets:lookup(?FT_TAB, ChanPid) of
[#emqx_ft{ft_data = FTData}] -> {ok, FTData};
[] -> none
end.
delete_ft_data(ChanPid) ->
true = ets:delete(?FT_TAB, ChanPid),
ok.
put_ft_data(ChanPid, FTData) ->
true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}),
ok.

View File

@ -23,8 +23,10 @@
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_ft_sup:start_link(),
ok = emqx_ft:hook(),
ok = emqx_ft_conf:load(),
{ok, Sup}.
stop(_State) ->
ok = emqx_ft_conf:unload(),
ok = emqx_ft:unhook(),
ok.

View File

@ -0,0 +1,65 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc File Transfer configuration management module
-module(emqx_ft_conf).
-behaviour(emqx_config_handler).
%% Load/Unload
-export([
load/0,
unload/0
]).
%% callbacks for emqx_config_handler
-export([
pre_config_update/3,
post_config_update/5
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec load() -> ok.
load() ->
emqx_conf:add_handler([file_transfer], ?MODULE).
-spec unload() -> ok.
unload() ->
emqx_conf:remove_handler([file_transfer]).
%%--------------------------------------------------------------------
%% emqx_config_handler callbacks
%%--------------------------------------------------------------------
-spec pre_config_update(list(atom()), emqx_config:update_request(), emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
pre_config_update(_, _Req, Config) ->
{ok, Config}.
-spec post_config_update(
list(atom()),
emqx_config:update_request(),
emqx_config:config(),
emqx_config:config(),
emqx_config:app_envs()
) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}.
post_config_update(_, _Req, _NewConfig, _OldConfig, _AppEnvs) ->
ok.

View File

@ -79,6 +79,7 @@ handle_call({register, Key, DefaultAction, Timeout}, _From, State) ->
{reply, {error, already_registered}, State}
end;
handle_call({unregister, Key}, _From, State) ->
?SLOG(warning, #{msg => "unregister", key => Key}),
case ets:lookup(?TAB, Key) of
[] ->
{reply, {error, not_found}, State};

View File

@ -0,0 +1,49 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_schema).
-behaviour(hocon_schema).
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-export([namespace/0, roots/0, fields/1, tags/0]).
namespace() -> file_transfer.
tags() ->
[<<"File Transfer">>].
roots() -> [file_transfer].
fields(file_transfer) ->
[
{storage, #{
type => hoconsc:union([
hoconsc:ref(?MODULE, local_storage)
])
}}
];
fields(local_storage) ->
[
{type, #{
type => local,
default => local,
required => false,
desc => ?DESC("local")
}}
].

View File

@ -0,0 +1,75 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage).
-export(
[
store_filemeta/2,
store_segment/3,
assemble/3
]
).
-type ctx() :: term().
-type storage() :: emqx_config:config().
-export_type([assemble_callback/0]).
-type assemble_callback() :: fun((ok | {error, term()}) -> any()).
%%--------------------------------------------------------------------
%% behaviour
%%--------------------------------------------------------------------
-callback store_filemeta(storage(), emqx_ft:transfer(), emqx_ft:filemeta()) ->
{ok, ctx()} | {error, term()}.
-callback store_segment(storage(), ctx(), emqx_ft:transfer(), emqx_ft:segment()) ->
{ok, ctx()} | {error, term()}.
-callback assemble(storage(), ctx(), emqx_ft:transfer(), assemble_callback()) ->
{ok, pid()} | {error, term()}.
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
{ok, ctx()} | {error, term()}.
store_filemeta(Transfer, FileMeta) ->
Mod = mod(),
Mod:store_filemeta(storage(), Transfer, FileMeta).
-spec store_segment(ctx(), emqx_ft:transfer(), emqx_ft:segment()) ->
{ok, ctx()} | {error, term()}.
store_segment(Ctx, Transfer, Segment) ->
Mod = mod(),
Mod:store_segment(storage(), Ctx, Transfer, Segment).
-spec assemble(ctx(), emqx_ft:transfer(), assemble_callback()) ->
{ok, pid()} | {error, term()}.
assemble(Ctx, Transfer, Callback) ->
Mod = mod(),
Mod:assemble(storage(), Ctx, Transfer, Callback).
mod() ->
case storage() of
#{type := local} ->
% emqx_ft_storage_fs
emqx_ft_storage_dummy
end.
storage() ->
emqx_config:get([file_transfer, storage]).

View File

@ -0,0 +1,35 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_dummy).
-behaviour(emqx_ft_storage).
-export([
store_filemeta/3,
store_segment/4,
assemble/4
]).
store_filemeta(_Storage, _Transfer, _Meta) ->
{ok, #{}}.
store_segment(_Storage, Ctx, _Transfer, _Segment) ->
{ok, Ctx}.
assemble(_Storage, _Ctx, _Transfer, Callback) ->
Pid = spawn(fun() -> Callback({error, not_implemented}) end),
{ok, Pid}.

View File

@ -19,10 +19,10 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
% -compile(export_all).
-behaviour(emqx_ft_storage).
-export([store_filemeta/3]).
-export([store_segment/3]).
-export([store_segment/4]).
-export([list/2]).
-export([read_segment/5]).
-export([assemble/3]).
@ -32,45 +32,10 @@
-export([write/2]).
-export([discard/1]).
% -behaviour(gen_server).
% -export([init/1]).
% -export([handle_call/3]).
% -export([handle_cast/2]).
-type json_value() ::
null
| boolean()
| binary()
| number()
| [json_value()]
| #{binary() => json_value()}.
-reflect_type([json_value/0]).
-type transfer() :: emqx_ft:transfer().
-type offset() :: emqx_ft:offset().
%% TODO: move to `emqx_ft` interface module
% -type sha256_hex() :: <<_:512>>.
-type filemeta() :: #{
%% Display name
name := string(),
%% Size in bytes, as advertised by the client.
%% Client is free to specify here whatever it wants, which means we can end
%% up with a file of different size after assembly. It's not clear from
%% specification what that means (e.g. what are clients' expectations), we
%% currently do not condider that an error (or, specifically, a signal that
%% the resulting file is corrupted during transmission).
size => _Bytes :: non_neg_integer(),
checksum => {sha256, <<_:256>>},
expire_at := emqx_datetime:epoch_second(),
%% TTL of individual segments
%% Somewhat confusing that we won't know it on the nodes where the filemeta
%% is missing.
segments_ttl => _Seconds :: pos_integer(),
user_data => json_value()
}.
-type filemeta() :: emqx_ft:filemeta().
-type segment() :: {offset(), _Content :: binary()}.
@ -113,13 +78,14 @@
%% Atomic operation.
-spec store_filemeta(storage(), transfer(), filemeta()) ->
% Quota? Some lower level errors?
ok | {error, conflict} | {error, _TODO}.
{ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}.
store_filemeta(Storage, Transfer, Meta) ->
Filepath = mk_filepath(Storage, Transfer, ?MANIFEST),
case read_file(Filepath, fun decode_filemeta/1) of
{ok, Meta} ->
_ = touch_file(Filepath),
ok;
%% No context is needed for this implementation.
{ok, #{}};
{ok, _Conflict} ->
% TODO
% We won't see conflicts in case of concurrent `store_filemeta`
@ -132,13 +98,18 @@ store_filemeta(Storage, Transfer, Meta) ->
%% Store a segment in the backing filesystem.
%% Atomic operation.
-spec store_segment(storage(), transfer(), segment()) ->
-spec store_segment(storage(), emqx_ft_storage:ctx(), transfer(), segment()) ->
% Where is the checksum gets verified? Upper level probably.
% Quota? Some lower level errors?
ok | {error, _TODO}.
store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
store_segment(Storage, Ctx, Transfer, Segment = {_Offset, Content}) ->
Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)),
write_file_atomic(Filepath, Content).
case write_file_atomic(Filepath, Content) of
ok ->
{ok, Ctx};
{error, _} = Error ->
Error
end.
-spec list(storage(), transfer()) ->
% Some lower level errors? {error, notfound}?
@ -178,13 +149,10 @@ read_segment(_Storage, _Transfer, Segment, Offset, Size) ->
end.
-spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) ->
% {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}.
{ok, _Assembler :: pid()} | {error, _TODO}.
assemble(Storage, Transfer, Callback) ->
assemble(Storage, _Ctx, Transfer, Callback) ->
emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).
%%
-type handle() :: {file:name(), io:device(), crypto:hash_state()}.
-spec open_file(storage(), transfer(), filemeta()) ->
@ -268,8 +236,7 @@ schema() ->
{size, hoconsc:mk(non_neg_integer())},
{expire_at, hoconsc:mk(non_neg_integer())},
{checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})},
{segments_ttl, hoconsc:mk(pos_integer())},
{user_data, hoconsc:mk(json_value())}
{segments_ttl, hoconsc:mk(pos_integer())}
]
}.
@ -354,10 +321,8 @@ mk_filedir(Storage, {ClientId, FileId}) ->
mk_filepath(Storage, Transfer, Filename) ->
filename:join(mk_filedir(Storage, Transfer), Filename).
get_storage_root(Storage) ->
Storage.
%%
get_storage_root(_Storage) ->
filename:join(emqx:data_dir(), "file_transfer").
-include_lib("kernel/include/file.hrl").