Compare commits

...

18 Commits

Author SHA1 Message Date
Ilya Averyanov e4fd878998 feat(ft): add API 2023-02-06 01:49:06 +02:00
Ilya Averyanov af4360f5c5
Merge pull request #9900 from keynslug/file-transfer
test(ft): add some basic assembler tests
2023-02-03 19:41:35 +02:00
Andrew Mayorov 9a56c34c8d
feat(ft): add fs storage bpapi and use it in assembler 2023-02-03 18:48:50 +03:00
Andrew Mayorov c11e251902
feat(ft-fs): allow to list all transfers in storage
This is rather simplistic and thus, temporary solution.
2023-02-03 17:04:27 +03:00
Andrew Mayorov 19cd66198b
feat(ft-fs): make `list` / `read` more generic
And usable in wider contexts as a consequence, for example querying and
fetching resulting files from remote nodes.
2023-02-03 14:37:02 +03:00
Andrew Mayorov 3bb08fe945
fix(ft-fs): put fragments into separate directories
In order to avoid potential filename collisions.
2023-02-03 13:59:05 +03:00
Andrew Mayorov 6b5ef45084
refactor(ft): bring back userdata to filemeta schema 2023-02-03 12:39:55 +03:00
Ilya Averyanov 73405772cd feat(ft): removed replicated data 2023-02-03 00:45:44 +02:00
Ilya Averyanov ac5fcfe9f1 feat(ft): add config & backend behaviour 2023-02-03 00:31:55 +02:00
Andrew Mayorov f9078e8401 fix(ft-fs): add missing `read_segment/5` + fix atomic write 2023-02-03 00:31:55 +02:00
Andrew Mayorov 5dae423f1e fix(ft-asm): follow proper `segment` fragment type 2023-02-03 00:31:55 +02:00
Andrew Mayorov d2bb574921 fix(ft-asm): ensure module follows statem behaviour 2023-02-03 00:31:55 +02:00
Andrew Mayorov 37d8930341 test(ft): add some basic assembler tests 2023-02-03 00:31:55 +02:00
Ilya Averyanov 3967c9c5b2 feat(ft): improve robustness of asynchronous acks
* add auto ack after timeout
* add fin file transfer packet registration to avoid
duplication and multiple acks
2023-02-03 00:31:55 +02:00
Ilya Averyanov 8f544041e4 tie file transfer frontend and backend together 2023-02-03 00:31:55 +02:00
Andrew Mayorov ac6e9292dd feat(ft): introduce simple filesystem storage backend + assembler 2023-02-03 00:31:55 +02:00
Ilya Averyanov 2258285edf add file transfer app and bootstrap replicated ft data structure 2023-02-03 00:31:51 +02:00
Ilya Averyanov 63987bf6d4 add basic hooks 2023-02-01 20:10:20 +02:00
31 changed files with 2826 additions and 26 deletions

View File

@ -637,6 +637,13 @@ process_connect(
%%--------------------------------------------------------------------
process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel) ->
?SLOG(
warning,
#{
packet => Packet,
packet_id => PacketId
}
),
case
pipeline(
[
@ -733,9 +740,13 @@ do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) ->
{ok, NChannel};
do_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) ->
PubRes = emqx_broker:publish(Msg),
RC = puback_reason_code(PubRes),
NChannel = ensure_quota(PubRes, Channel),
handle_out(puback, {PacketId, RC}, NChannel);
RC = puback_reason_code(PacketId, Msg, PubRes),
case RC of
undefined ->
{ok, Channel};
_Value ->
do_finish_publish(PacketId, PubRes, RC, Channel)
end;
do_publish(
PacketId,
Msg = #message{qos = ?QOS_2},
@ -743,7 +754,7 @@ do_publish(
) ->
case emqx_session:publish(ClientInfo, PacketId, Msg, Session) of
{ok, PubRes, NSession} ->
RC = puback_reason_code(PubRes),
RC = pubrec_reason_code(PubRes),
NChannel0 = set_session(NSession, Channel),
NChannel1 = ensure_timer(await_timer, NChannel0),
NChannel2 = ensure_quota(PubRes, NChannel1),
@ -756,6 +767,10 @@ do_publish(
handle_out(disconnect, RC, Channel)
end.
do_finish_publish(PacketId, PubRes, RC, Channel) ->
NChannel = ensure_quota(PubRes, Channel),
handle_out(puback, {PacketId, RC}, NChannel).
ensure_quota(_, Channel = #channel{quota = undefined}) ->
Channel;
ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
@ -775,9 +790,14 @@ ensure_quota(PubRes, Channel = #channel{quota = Limiter}) ->
ensure_timer(quota_timer, Intv, Channel#channel{quota = NLimiter})
end.
-compile({inline, [puback_reason_code/1]}).
puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
puback_reason_code([_ | _]) -> ?RC_SUCCESS.
-compile({inline, [pubrec_reason_code/1]}).
pubrec_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS;
pubrec_reason_code([_ | _]) -> ?RC_SUCCESS.
puback_reason_code(PacketId, Msg, [] = PubRes) ->
emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_NO_MATCHING_SUBSCRIBERS);
puback_reason_code(PacketId, Msg, [_ | _] = PubRes) ->
emqx_hooks:run_fold('message.puback', [PacketId, Msg, PubRes], ?RC_SUCCESS).
-compile({inline, [after_message_acked/3]}).
after_message_acked(ClientInfo, Msg, PubAckProps) ->
@ -1272,6 +1292,8 @@ handle_info(die_if_test = Info, Channel) ->
die_if_test_compiled(),
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{ok, Channel};
handle_info({puback, PacketId, PubRes, RC}, Channel) ->
do_finish_publish(PacketId, PubRes, RC, Channel);
handle_info(Info, Channel) ->
?SLOG(error, #{msg => "unexpected_info", info => Info}),
{ok, Channel}.

View File

@ -87,6 +87,7 @@
mark_channel_connected/1,
mark_channel_disconnected/1,
get_connected_client_count/0,
takeover_finish/2,
do_kick_session/3,
do_get_chan_stats/2,
@ -178,11 +179,13 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
ok.
%% @private
do_unregister_channel(Chan) ->
do_unregister_channel({_ClientId, ChanPid} = Chan) ->
ok = emqx_cm_registry:unregister_channel(Chan),
true = ets:delete(?CHAN_CONN_TAB, Chan),
true = ets:delete(?CHAN_INFO_TAB, Chan),
ets:delete_object(?CHAN_TAB, Chan).
ets:delete_object(?CHAN_TAB, Chan),
ok = emqx_hooks:run('channel.unregistered', [ChanPid]),
true.
-spec connection_closed(emqx_types:clientid()) -> true.
connection_closed(ClientId) ->
@ -210,7 +213,7 @@ do_get_chan_info(ClientId, ChanPid) ->
-spec get_chan_info(emqx_types:clientid(), chan_pid()) ->
maybe(emqx_types:infos()).
get_chan_info(ClientId, ChanPid) ->
wrap_rpc(emqx_cm_proto_v1:get_chan_info(ClientId, ChanPid)).
wrap_rpc(emqx_cm_proto_v2:get_chan_info(ClientId, ChanPid)).
%% @doc Update infos of the channel.
-spec set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean().
@ -240,7 +243,7 @@ do_get_chan_stats(ClientId, ChanPid) ->
-spec get_chan_stats(emqx_types:clientid(), chan_pid()) ->
maybe(emqx_types:stats()).
get_chan_stats(ClientId, ChanPid) ->
wrap_rpc(emqx_cm_proto_v1:get_chan_stats(ClientId, ChanPid)).
wrap_rpc(emqx_cm_proto_v2:get_chan_stats(ClientId, ChanPid)).
%% @doc Set channel's stats.
-spec set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean().
@ -302,18 +305,11 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
}};
{living, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
case
request_stepdown(
{takeover, 'end'},
ConnMod,
ChanPid
)
of
case wrap_rpc(emqx_cm_proto_v2:takeover_finish(ConnMod, ChanPid)) of
{ok, Pendings} ->
Session1 = emqx_persistent_session:persist(
ClientInfo, ConnInfo, Session
),
register_channel(ClientId, Self, ConnInfo),
{ok, #{
session => Session1,
present => true,
@ -397,6 +393,13 @@ takeover_session(ClientId) ->
takeover_session(ClientId, ChanPid)
end.
takeover_finish(ConnMod, ChanPid) ->
request_stepdown(
{takeover, 'end'},
ConnMod,
ChanPid
).
takeover_session(ClientId, Pid) ->
try
do_takeover_session(ClientId, Pid)
@ -426,7 +429,7 @@ do_takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
end
end;
do_takeover_session(ClientId, ChanPid) ->
wrap_rpc(emqx_cm_proto_v1:takeover_session(ClientId, ChanPid)).
wrap_rpc(emqx_cm_proto_v2:takeover_session(ClientId, ChanPid)).
%% @doc Discard all the sessions identified by the ClientId.
-spec discard_session(emqx_types:clientid()) -> ok.
@ -528,7 +531,7 @@ do_kick_session(Action, ClientId, ChanPid) ->
%% @private This function is shared for session 'kick' and 'discard' (as the first arg Action).
kick_session(Action, ClientId, ChanPid) ->
try
wrap_rpc(emqx_cm_proto_v1:kick_session(Action, ClientId, ChanPid))
wrap_rpc(emqx_cm_proto_v2:kick_session(Action, ClientId, ChanPid))
catch
Error:Reason ->
%% This should mostly be RPC failures.
@ -713,7 +716,7 @@ do_get_chann_conn_mod(ClientId, ChanPid) ->
end.
get_chann_conn_mod(ClientId, ChanPid) ->
wrap_rpc(emqx_cm_proto_v1:get_chann_conn_mod(ClientId, ChanPid)).
wrap_rpc(emqx_cm_proto_v2:get_chann_conn_mod(ClientId, ChanPid)).
mark_channel_connected(ChanPid) ->
?tp(emqx_cm_connected_client_count_inc, #{}),

View File

@ -101,6 +101,8 @@
-export_type([oom_policy/0]).
-export_type([takeover_data/0]).
-type proto_ver() ::
?MQTT_PROTO_V3
| ?MQTT_PROTO_V4
@ -242,3 +244,5 @@
max_heap_size => non_neg_integer(),
enable => boolean()
}.
-type takeover_data() :: map().

View File

@ -0,0 +1,88 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_cm_proto_v2).
-behaviour(emqx_bpapi).
-export([
introduced_in/0,
lookup_client/2,
kickout_client/2,
get_chan_stats/2,
get_chan_info/2,
get_chann_conn_mod/2,
takeover_session/2,
takeover_finish/2,
kick_session/3
]).
-include("bpapi.hrl").
-include("src/emqx_cm.hrl").
introduced_in() ->
"5.0.0".
-spec kickout_client(node(), emqx_types:clientid()) -> ok | {badrpc, _}.
kickout_client(Node, ClientId) ->
rpc:call(Node, emqx_cm, kick_session, [ClientId]).
-spec lookup_client(node(), {clientid, emqx_types:clientid()} | {username, emqx_types:username()}) ->
[emqx_cm:channel_info()] | {badrpc, _}.
lookup_client(Node, Key) ->
rpc:call(Node, emqx_cm, lookup_client, [Key]).
-spec get_chan_stats(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:stats() | {badrpc, _}.
get_chan_stats(ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, do_get_chan_stats, [ClientId, ChanPid], ?T_GET_INFO * 2).
-spec get_chan_info(emqx_types:clientid(), emqx_cm:chan_pid()) -> emqx_types:infos() | {badrpc, _}.
get_chan_info(ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, do_get_chan_info, [ClientId, ChanPid], ?T_GET_INFO * 2).
-spec get_chann_conn_mod(emqx_types:clientid(), emqx_cm:chan_pid()) ->
module() | undefined | {badrpc, _}.
get_chann_conn_mod(ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, do_get_chann_conn_mod, [ClientId, ChanPid], ?T_GET_INFO * 2).
-spec takeover_session(emqx_types:clientid(), emqx_cm:chan_pid()) ->
none
| {expired | persistent, emqx_session:session()}
| {living, _ConnMod :: atom(), emqx_cm:chan_pid(), emqx_session:session()}
| {badrpc, _}.
takeover_session(ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, takeover_session, [ClientId, ChanPid], ?T_TAKEOVER * 2).
-spec takeover_finish(module(), emqx_cm:chan_pid()) ->
{ok, emqx_type:takeover_data()}
| {ok, list(emqx_type:deliver()), emqx_type:takeover_data()}
| {error, term()}
| {badrpc, _}.
takeover_finish(ConnMod, ChanPid) ->
erpc:call(
node(ChanPid),
emqx_cm,
takeover_session_finish,
[ConnMod, ChanPid],
?T_TAKEOVER * 2
).
-spec kick_session(kick | discard, emqx_types:clientid(), emqx_cm:chan_pid()) -> ok | {badrpc, _}.
kick_session(Action, ClientId, ChanPid) ->
rpc:call(node(ChanPid), emqx_cm, do_kick_session, [Action, ClientId, ChanPid], ?T_KICK * 2).

View File

@ -1133,7 +1133,7 @@ t_ws_cookie_init(_) ->
?assertMatch(#{ws_cookie := WsCookie}, emqx_channel:info(clientinfo, Channel)).
%%--------------------------------------------------------------------
%% Test cases for other mechnisms
%% Test cases for other mechanisms
%%--------------------------------------------------------------------
t_flapping_detect(_) ->

View File

@ -0,0 +1,70 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_channel_delayed_puback_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
Config.
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([]).
init_per_testcase(Case, Config) ->
?MODULE:Case({init, Config}).
end_per_testcase(Case, Config) ->
?MODULE:Case({'end', Config}).
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
t_delayed_puback({init, Config}) ->
emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST),
Config;
t_delayed_puback({'end', _Config}) ->
emqx_hooks:del('message.puback', {?MODULE, on_message_puback});
t_delayed_puback(_Config) ->
{ok, ConnPid} = emqtt:start_link([{clientid, <<"clientid">>}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(ConnPid),
{ok, #{reason_code := ?RC_UNSPECIFIED_ERROR}} = emqtt:publish(
ConnPid, <<"topic">>, <<"hello">>, 1
),
emqtt:disconnect(ConnPid).
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
on_message_puback(PacketId, _Msg, PubRes, _RC) ->
erlang:send(self(), {puback, PacketId, PubRes, ?RC_UNSPECIFIED_ERROR}),
{stop, undefined}.

View File

@ -1,6 +1,6 @@
{application, emqx_conf, [
{description, "EMQX configuration management"},
{vsn, "0.1.10"},
{vsn, "0.1.11"},
{registered, []},
{mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib]},

View File

@ -63,7 +63,8 @@
emqx_psk_schema,
emqx_limiter_schema,
emqx_slow_subs_schema,
emqx_mgmt_api_key_schema
emqx_mgmt_api_key_schema,
emqx_ft_schema
]).
%% root config should not have a namespace

9
apps/emqx_ft/README.md Normal file
View File

@ -0,0 +1,9 @@
emqx_ft
=====
EMQX file transfer over MQTT
Build
-----
$ rebar3 compile

View File

@ -0,0 +1,25 @@
emqx_ft_api {
file_list {
desc {
en: "List all uploaded files."
zh: "列出所有上传的文件。"
}
label: {
en: "List all uploaded files"
zh: "列出所有上传的文件"
}
}
file_get {
desc {
en: "Get a file by its id."
zh: "根据文件 id 获取文件。"
}
label: {
en: "Get a file by its id"
zh: "根据文件 id 获取文件"
}
}
}

View File

@ -0,0 +1,14 @@
emqx_ft_schema {
local {
desc {
en: "Use local file system to store uploaded files and temporary data."
zh: "使用本地文件系统来存储上传的文件和临时数据。"
}
label: {
en: "Local Storage"
zh: "本地存储"
}
}
}

View File

@ -0,0 +1,15 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

11
apps/emqx_ft/rebar.config Normal file
View File

@ -0,0 +1,11 @@
%% -*- mode: erlang -*-
{erl_opts, [debug_info]}.
{deps, [{emqx, {path, "../emqx"}}]}.
{shell, [
% {config, "config/sys.config"},
{apps, [emqx_ft]}
]}.
{project_plugins, [erlfmt]}.

View File

@ -0,0 +1,13 @@
{application, emqx_ft, [
{description, "EMQX file transfer over MQTT"},
{vsn, "0.1.0"},
{registered, []},
{mod, {emqx_ft_app, []}},
{applications, [
kernel,
stdlib,
gproc
]},
{env, []},
{modules, []}
]}.

View File

@ -0,0 +1,238 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft).
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-export([
hook/0,
unhook/0
]).
-export([
on_message_publish/1,
on_message_puback/4
]).
-export([on_assemble_timeout/1]).
-export_type([
clientid/0,
transfer/0,
offset/0,
filemeta/0,
segment/0
]).
%% Number of bytes
-type bytes() :: non_neg_integer().
%% MQTT Client ID
-type clientid() :: emqx_types:clientid().
-type fileid() :: binary().
-type transfer() :: {clientid(), fileid()}.
-type offset() :: bytes().
-type filemeta() :: #{
%% Display name
name := string(),
%% Size in bytes, as advertised by the client.
%% Client is free to specify here whatever it wants, which means we can end
%% up with a file of different size after assembly. It's not clear from
%% specification what that means (e.g. what are clients' expectations), we
%% currently do not condider that an error (or, specifically, a signal that
%% the resulting file is corrupted during transmission).
size => _Bytes :: non_neg_integer(),
checksum => {sha256, <<_:256>>},
expire_at := emqx_datetime:epoch_second(),
%% TTL of individual segments
%% Somewhat confusing that we won't know it on the nodes where the filemeta
%% is missing.
segments_ttl => _Seconds :: pos_integer(),
user_data => emqx_ft_schema:json_value()
}.
-type segment() :: {offset(), _Content :: binary()}.
-define(ASSEMBLE_TIMEOUT, 5000).
%%--------------------------------------------------------------------
%% API for app
%%--------------------------------------------------------------------
hook() ->
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST),
ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST).
unhook() ->
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}).
%%--------------------------------------------------------------------
%% Hooks
%%--------------------------------------------------------------------
on_message_publish(
Msg = #message{
id = _Id,
topic = <<"$file/", _/binary>>
}
) ->
Headers = Msg#message.headers,
{stop, Msg#message{headers = Headers#{allow_publish => false}}};
on_message_publish(Msg) ->
{ok, Msg}.
on_message_puback(PacketId, #message{topic = Topic} = Msg, _PubRes, _RC) ->
case Topic of
<<"$file/", FileCommand/binary>> ->
{stop, on_file_command(PacketId, Msg, FileCommand)};
_ ->
ignore
end.
%%--------------------------------------------------------------------
%% Handlers for transfer messages
%%--------------------------------------------------------------------
on_file_command(PacketId, Msg, FileCommand) ->
case string:split(FileCommand, <<"/">>, all) of
[FileId, <<"init">>] ->
on_init(Msg, FileId);
[FileId, <<"fin">>] ->
on_fin(PacketId, Msg, FileId, undefined);
[FileId, <<"fin">>, Checksum] ->
on_fin(PacketId, Msg, FileId, Checksum);
[FileId, <<"abort">>] ->
on_abort(Msg, FileId);
[FileId, Offset] ->
on_segment(Msg, FileId, Offset, undefined);
[FileId, Offset, Checksum] ->
on_segment(Msg, FileId, Offset, Checksum);
_ ->
?RC_UNSPECIFIED_ERROR
end.
on_init(Msg, FileId) ->
?SLOG(info, #{
msg => "on_init",
mqtt_msg => Msg,
file_id => FileId
}),
Payload = Msg#message.payload,
% %% Add validations here
Meta = emqx_json:decode(Payload, [return_maps]),
case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of
ok ->
?RC_SUCCESS;
{error, _Reason} ->
?RC_UNSPECIFIED_ERROR
end.
on_abort(_Msg, _FileId) ->
%% TODO
?RC_SUCCESS.
on_segment(Msg, FileId, Offset, Checksum) ->
?SLOG(info, #{
msg => "on_segment",
mqtt_msg => Msg,
file_id => FileId,
offset => Offset,
checksum => Checksum
}),
%% TODO: handle checksum
Payload = Msg#message.payload,
Segment = {binary_to_integer(Offset), Payload},
%% Add offset/checksum validations
case emqx_ft_storage:store_segment(transfer(Msg, FileId), Segment) of
ok ->
?RC_SUCCESS;
{error, _Reason} ->
?RC_UNSPECIFIED_ERROR
end.
on_fin(PacketId, Msg, FileId, Checksum) ->
?SLOG(info, #{
msg => "on_fin",
mqtt_msg => Msg,
file_id => FileId,
checksum => Checksum,
packet_id => PacketId
}),
%% TODO: handle checksum? Do we need it?
FinPacketKey = {self(), PacketId},
_ =
case
emqx_ft_responder:register(
FinPacketKey, fun ?MODULE:on_assemble_timeout/1, ?ASSEMBLE_TIMEOUT
)
of
%% We have new fin packet
ok ->
Callback = callback(FinPacketKey, FileId),
case emqx_ft_storage:assemble(transfer(Msg, FileId), Callback) of
%% Assembling started, packet will be acked by the callback or the responder
{ok, _} ->
undefined;
%% Assembling failed, unregister the packet key
{error, _} ->
case emqx_ft_responder:unregister(FinPacketKey) of
%% We successfully unregistered the packet key,
%% so we can send the error code at once
ok ->
?RC_UNSPECIFIED_ERROR;
%% Someone else already unregistered the key,
%% that is, either responder or someone else acked the packet,
%% we do not have to ack
{error, not_found} ->
undefined
end
end;
%% Fin packet already received.
%% Since we are still handling the previous one,
%% we probably have retransmit here
{error, already_registered} ->
undefined
end.
callback({ChanPid, PacketId} = Key, _FileId) ->
fun(Result) ->
case emqx_ft_responder:unregister(Key) of
ok ->
case Result of
ok ->
erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
{error, _} ->
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
end;
{error, not_found} ->
ok
end
end.
transfer(Msg, FileId) ->
ClientId = Msg#message.from,
{ClientId, FileId}.
on_assemble_timeout({ChanPid, PacketId}) ->
?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}),
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}).

View File

@ -0,0 +1,163 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_api).
-behaviour(minirest_api).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
%% Swagger specs from hocon schema
-export([
api_spec/0,
paths/0,
schema/1,
namespace/0
]).
-export([
fields/1,
roots/0
]).
%% API callbacks
-export([
'/file_transfer/files'/2,
'/file_transfer/file'/2
]).
-import(hoconsc, [mk/2, ref/1, ref/2]).
namespace() -> "file_transfer".
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() ->
[
"/file_transfer/files",
"/file_transfer/file"
].
schema("/file_transfer/files") ->
#{
'operationId' => '/file_transfer/files',
get => #{
tags => [<<"file_transfer">>],
summary => <<"List all uploaded files">>,
description => ?DESC("file_list"),
responses => #{
200 => <<"Operation success">>,
503 => emqx_dashboard_swagger:error_codes(
['SERVICE_UNAVAILABLE'], <<"Service unavailable">>
)
}
}
};
schema("/file_transfer/file") ->
#{
'operationId' => '/file_transfer/file',
get => #{
tags => [<<"file_transfer">>],
summary => <<"Download a particular file">>,
description => ?DESC("file_get"),
parameters => [
ref(file_node),
ref(file_clientid),
ref(file_id)
],
responses => #{
200 => <<"Operation success">>,
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Not found">>),
503 => emqx_dashboard_swagger:error_codes(
['SERVICE_UNAVAILABLE'], <<"Service unavailable">>
)
}
}
}.
'/file_transfer/files'(get, #{}) ->
case emqx_ft_storage:ready_transfers() of
{ok, Transfers} ->
FormattedTransfers = lists:map(
fun({Id, Info}) ->
#{id => Id, info => format_file_info(Info)}
end,
Transfers
),
{200, #{<<"files">> => FormattedTransfers}};
{error, _} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
end.
'/file_transfer/file'(get, #{query_string := Query}) ->
case emqx_ft_storage:get_ready_transfer(Query) of
{ok, FileData} ->
{200, #{<<"content-type">> => <<"application/data">>}, FileData};
{error, enoent} ->
{404, error_msg('NOT_FOUND', <<"Not found">>)};
{error, _} ->
{503, error_msg('SERVICE_UNAVAILABLE', <<"Service unavailable">>)}
end.
error_msg(Code, Msg) ->
#{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
-spec fields(hocon_schema:name()) -> hocon_schema:fields().
fields(file_node) ->
Desc = <<"File Node">>,
Meta = #{
in => query, desc => Desc, example => <<"emqx@127.0.0.1">>, required => false
},
[{node, hoconsc:mk(binary(), Meta)}];
fields(file_clientid) ->
Desc = <<"File ClientId">>,
Meta = #{
in => query, desc => Desc, example => <<"client1">>, required => false
},
[{clientid, hoconsc:mk(binary(), Meta)}];
fields(file_id) ->
Desc = <<"File">>,
Meta = #{
in => query, desc => Desc, example => <<"file1">>, required => false
},
[{fileid, hoconsc:mk(binary(), Meta)}].
roots() ->
[
file_node,
file_clientid,
file_id
].
%%--------------------------------------------------------------------
%% Helpers
%%--------------------------------------------------------------------
format_file_info(#{path := Path, size := Size, timestamp := Timestamp}) ->
#{
path => Path,
size => Size,
timestamp => format_datetime(Timestamp)
}.
format_datetime({{Year, Month, Day}, {Hour, Minute, Second}}) ->
iolist_to_binary(
io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w", [
Year, Month, Day, Hour, Minute, Second
])
).

View File

@ -0,0 +1,32 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_app).
-behaviour(application).
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
{ok, Sup} = emqx_ft_sup:start_link(),
ok = emqx_ft:hook(),
ok = emqx_ft_conf:load(),
{ok, Sup}.
stop(_State) ->
ok = emqx_ft_conf:unload(),
ok = emqx_ft:unhook(),
ok.

View File

@ -0,0 +1,166 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_assembler).
-export([start_link/3]).
-behaviour(gen_statem).
-export([callback_mode/0]).
-export([init/1]).
% -export([list_local_fragments/3]).
% -export([list_remote_fragments/3]).
% -export([start_assembling/3]).
-export([handle_event/4]).
% -export([handle_continue/2]).
% -export([handle_call/3]).
% -export([handle_cast/2]).
-record(st, {
storage :: _Storage,
transfer :: emqx_ft:transfer(),
assembly :: _TODO,
file :: io:device(),
hash,
callback :: fun((ok | {error, term()}) -> any())
}).
-define(RPC_LIST_TIMEOUT, 1000).
-define(RPC_READSEG_TIMEOUT, 5000).
%%
start_link(Storage, Transfer, Callback) ->
gen_statem:start_link(?MODULE, {Storage, Transfer, Callback}, []).
%%
-define(internal(C), {next_event, internal, C}).
callback_mode() ->
handle_event_function.
init({Storage, Transfer, Callback}) ->
St = #st{
storage = Storage,
transfer = Transfer,
assembly = emqx_ft_assembly:new(),
hash = crypto:hash_init(sha256),
callback = Callback
},
{ok, list_local_fragments, St, ?internal([])}.
handle_event(internal, _, list_local_fragments, St = #st{assembly = Asm}) ->
% TODO: what we do with non-transients errors here (e.g. `eacces`)?
{ok, Fragments} = emqx_ft_storage_fs:list(St#st.storage, St#st.transfer, fragment),
NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
NSt = St#st{assembly = NAsm},
case emqx_ft_assembly:status(NAsm) of
complete ->
{next_state, start_assembling, NSt, ?internal([])};
{incomplete, _} ->
Nodes = ekka:nodelist() -- [node()],
{next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])}
% TODO: recovery?
% {error, _} = Reason ->
% {stop, Reason}
end;
handle_event(internal, _, {list_remote_fragments, Nodes}, St) ->
% TODO
% Async would better because we would not need to wait for some lagging nodes if
% the coverage is already complete.
% TODO: portable "storage" ref
Results = emqx_ft_storage_fs_proto_v1:multilist(Nodes, St#st.transfer, fragment),
NodeResults = lists:zip(Nodes, Results),
NAsm = emqx_ft_assembly:update(
lists:foldl(
fun
({Node, {ok, {ok, Fragments}}}, Asm) ->
emqx_ft_assembly:append(Asm, Node, Fragments);
({_Node, _Result}, Asm) ->
% TODO: log?
Asm
end,
St#st.assembly,
NodeResults
)
),
NSt = St#st{assembly = NAsm},
case emqx_ft_assembly:status(NAsm) of
complete ->
{next_state, start_assembling, NSt, ?internal([])};
% TODO: retries / recovery?
{incomplete, _} = Status ->
{stop, {error, Status}}
end;
handle_event(internal, _, start_assembling, St = #st{assembly = Asm}) ->
Filemeta = emqx_ft_assembly:filemeta(Asm),
Coverage = emqx_ft_assembly:coverage(Asm),
% TODO: errors
{ok, Handle} = emqx_ft_storage_fs:open_file(St#st.storage, St#st.transfer, Filemeta),
{next_state, {assemble, Coverage}, St#st{file = Handle}, ?internal([])};
handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
% TODO
% Currently, race is possible between getting segment info from the remote node and
% this node garbage collecting the segment itself.
% TODO: pipelining
case pread(Node, Segment, St) of
{ok, Content} ->
{ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content),
{next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])}
% {error, _} ->
% ...
end;
handle_event(internal, _, {assemble, []}, St = #st{}) ->
{next_state, complete, St, ?internal([])};
handle_event(internal, _, complete, St = #st{assembly = Asm, file = Handle, callback = Callback}) ->
Filemeta = emqx_ft_assembly:filemeta(Asm),
Result = emqx_ft_storage_fs:complete(St#st.storage, St#st.transfer, Filemeta, Handle),
%% TODO: safe apply
_ = Callback(Result),
{stop, shutdown}.
% handle_continue(list_local, St = #st{storage = Storage, transfer = Transfer, assembly = Asm}) ->
% % TODO: what we do with non-transients errors here (e.g. `eacces`)?
% {ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer),
% NAsm = emqx_ft_assembly:update(emqx_ft_assembly:append(Asm, node(), Fragments)),
% NSt = St#st{assembly = NAsm},
% case emqx_ft_assembly:status(NAsm) of
% complete ->
% {noreply, NSt, {continue}};
% {more, _} ->
% error(noimpl);
% {error, _} ->
% error(noimpl)
% end,
% {noreply, St}.
% handle_call(_Call, _From, St) ->
% {reply, {error, badcall}, St}.
% handle_cast(_Cast, St) ->
% {noreply, St}.
pread(Node, Segment, St) when Node =:= node() ->
emqx_ft_storage_fs:pread(St#st.storage, St#st.transfer, Segment, 0, segsize(Segment));
pread(Node, Segment, St) ->
emqx_ft_storage_fs_proto_v1:pread(Node, St#st.transfer, Segment, 0, segsize(Segment)).
%%
segsize(#{fragment := {segment, Info}}) ->
maps:get(size, Info).

View File

@ -0,0 +1,42 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_assembler_sup).
-export([start_link/0]).
-export([start_child/3]).
-behaviour(supervisor).
-export([init/1]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
start_child(Storage, Transfer, Callback) ->
Childspec = #{
id => {Storage, Transfer},
start => {emqx_ft_assembler, start_link, [Storage, Transfer, Callback]},
restart => transient
},
supervisor:start_child(?MODULE, Childspec).
init(_) ->
SupFlags = #{
strategy => one_for_one,
intensity => 10,
period => 1000
},
{ok, {SupFlags, []}}.

View File

@ -0,0 +1,364 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_assembly).
-export([new/0]).
-export([append/3]).
-export([update/1]).
-export([status/1]).
-export([filemeta/1]).
-export([coverage/1]).
-export([properties/1]).
-record(asm, {
status :: _TODO,
coverage :: _TODO,
properties :: _TODO,
meta :: _TODO,
% orddict:orddict(K, V)
segs :: _TODO,
size
}).
new() ->
#asm{
status = {incomplete, {missing, filemeta}},
meta = orddict:new(),
segs = orddict:new(),
size = 0
}.
append(Asm, Node, Fragments) when is_list(Fragments) ->
lists:foldl(fun(F, AsmIn) -> append(AsmIn, Node, F) end, Asm, Fragments);
append(Asm, Node, Fragment = #{fragment := {filemeta, _}}) ->
append_filemeta(Asm, Node, Fragment);
append(Asm, Node, Segment = #{fragment := {segment, _}}) ->
append_segmentinfo(Asm, Node, Segment).
update(Asm) ->
case status(meta, Asm) of
{complete, _Meta} ->
case status(coverage, Asm) of
{complete, Coverage, Props} ->
Asm#asm{
status = complete,
coverage = Coverage,
properties = Props
};
Status ->
Asm#asm{status = Status}
end;
Status ->
Asm#asm{status = Status}
end.
status(#asm{status = Status}) ->
Status.
filemeta(Asm) ->
case status(meta, Asm) of
{complete, Meta} -> Meta;
_Other -> undefined
end.
coverage(#asm{coverage = Coverage}) ->
Coverage.
properties(#asm{properties = Properties}) ->
Properties.
status(meta, #asm{meta = Meta}) ->
status(meta, orddict:to_list(Meta));
status(meta, [{Meta, {_Node, _Frag}}]) ->
{complete, Meta};
status(meta, []) ->
{incomplete, {missing, filemeta}};
status(meta, [_M1, _M2 | _] = Metas) ->
{error, {inconsistent, [Frag#{node => Node} || {_, {Node, Frag}} <- Metas]}};
status(coverage, #asm{segs = Segments, size = Size}) ->
case coverage(orddict:to_list(Segments), 0, Size) of
Coverage when is_list(Coverage) ->
{complete, Coverage, #{
dominant => dominant(Coverage)
}};
Missing = {missing, _} ->
{incomplete, Missing}
end.
append_filemeta(Asm, Node, Fragment = #{fragment := {filemeta, Meta}}) ->
Asm#asm{
meta = orddict:store(Meta, {Node, Fragment}, Asm#asm.meta)
}.
append_segmentinfo(Asm, Node, Fragment = #{fragment := {segment, Info}}) ->
Offset = maps:get(offset, Info),
Size = maps:get(size, Info),
End = Offset + Size,
Asm#asm{
% TODO
% In theory it's possible to have two segments with same offset + size on
% different nodes but with differing content. We'd need a checksum to
% be able to disambiguate them though.
segs = orddict:store({Offset, locality(Node), -End, Node}, Fragment, Asm#asm.segs),
size = max(End, Asm#asm.size)
}.
coverage([{{Offset, _, _, _}, _Segment} | Rest], Cursor, Sz) when Offset < Cursor ->
coverage(Rest, Cursor, Sz);
coverage([{{Cursor, _Locality, MEnd, Node}, Segment} | Rest], Cursor, Sz) ->
% NOTE
% We consider only whole fragments here, so for example from the point of view of
% this algo `[{Offset1 = 0, Size1 = 15}, {Offset2 = 10, Size2 = 10}]` has no
% coverage.
case coverage(Rest, -MEnd, Sz) of
Coverage when is_list(Coverage) ->
[{Node, Segment} | Coverage];
Missing = {missing, _} ->
case coverage(Rest, Cursor, Sz) of
CoverageAlt when is_list(CoverageAlt) ->
CoverageAlt;
{missing, _} ->
Missing
end
end;
coverage([{{Offset, _MEnd, _, _}, _Segment} | _], Cursor, _Sz) when Offset > Cursor ->
{missing, {segment, Cursor, Offset}};
coverage([], Cursor, Sz) when Cursor < Sz ->
{missing, {segment, Cursor, Sz}};
coverage([], Cursor, Cursor) ->
[].
dominant(Coverage) ->
% TODO: needs improvement, better defined _dominance_, maybe some score
Freqs = frequencies(fun({Node, Segment}) -> {Node, segsize(Segment)} end, Coverage),
maxfreq(Freqs, node()).
frequencies(Fun, List) ->
lists:foldl(
fun(E, Acc) ->
{K, N} = Fun(E),
maps:update_with(K, fun(M) -> M + N end, N, Acc)
end,
#{},
List
).
maxfreq(Freqs, Init) ->
{_, Max} = maps:fold(
fun
(F, N, {M, _MF}) when N > M -> {N, F};
(_F, _N, {M, MF}) -> {M, MF}
end,
{0, Init},
Freqs
),
Max.
locality(Node) when Node =:= node() ->
% NOTE
% This should prioritize locally available segments over those on remote nodes.
0;
locality(_RemoteNode) ->
1.
segsize(#{fragment := {segment, Info}}) ->
maps:get(size, Info).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
incomplete_new_test() ->
?assertEqual(
{incomplete, {missing, filemeta}},
status(update(new()))
).
incomplete_test() ->
?assertEqual(
{incomplete, {missing, filemeta}},
status(
update(
append(new(), node(), [
segment(p1, 0, 42),
segment(p1, 42, 100)
])
)
)
).
consistent_test() ->
Asm1 = append(new(), n1, [filemeta(m1, "blarg")]),
Asm2 = append(Asm1, n2, [segment(s2, 0, 42)]),
Asm3 = append(Asm2, n3, [filemeta(m3, "blarg")]),
?assertMatch({complete, _}, status(meta, Asm3)).
inconsistent_test() ->
Asm1 = append(new(), node(), [segment(s1, 0, 42)]),
Asm2 = append(Asm1, n1, [filemeta(m1, "blarg")]),
Asm3 = append(Asm2, n2, [segment(s2, 0, 42), filemeta(m1, "blorg")]),
Asm4 = append(Asm3, n3, [filemeta(m3, "blarg")]),
?assertMatch(
{error,
{inconsistent, [
% blarg < blorg
#{node := n3, path := m3, fragment := {filemeta, #{name := "blarg"}}},
#{node := n2, path := m1, fragment := {filemeta, #{name := "blorg"}}}
]}},
status(meta, Asm4)
).
simple_coverage_test() ->
Node = node(),
Segs = [
{node42, segment(n1, 20, 30)},
{Node, segment(n2, 0, 10)},
{Node, segment(n3, 50, 50)},
{Node, segment(n4, 10, 10)}
],
Asm = append_many(new(), Segs),
?assertMatch(
{complete,
[
{Node, #{path := n2}},
{Node, #{path := n4}},
{node42, #{path := n1}},
{Node, #{path := n3}}
],
#{dominant := Node}},
status(coverage, Asm)
).
redundant_coverage_test() ->
Node = node(),
Segs = [
{Node, segment(n1, 0, 20)},
{node1, segment(n2, 0, 10)},
{Node, segment(n3, 20, 40)},
{node2, segment(n4, 10, 10)},
{node2, segment(n5, 50, 20)},
{node3, segment(n6, 20, 20)},
{Node, segment(n7, 50, 10)},
{node1, segment(n8, 40, 10)}
],
Asm = append_many(new(), Segs),
?assertMatch(
{complete,
[
{Node, #{path := n1}},
{node3, #{path := n6}},
{node1, #{path := n8}},
{node2, #{path := n5}}
],
#{dominant := _}},
status(coverage, Asm)
).
redundant_coverage_prefer_local_test() ->
Node = node(),
Segs = [
{node1, segment(n1, 0, 20)},
{Node, segment(n2, 0, 10)},
{Node, segment(n3, 10, 10)},
{node2, segment(n4, 20, 20)},
{Node, segment(n5, 30, 10)},
{Node, segment(n6, 20, 10)}
],
Asm = append_many(new(), Segs),
?assertMatch(
{complete,
[
{Node, #{path := n2}},
{Node, #{path := n3}},
{Node, #{path := n6}},
{Node, #{path := n5}}
],
#{dominant := Node}},
status(coverage, Asm)
).
missing_coverage_test() ->
Node = node(),
Segs = [
{Node, segment(n1, 0, 10)},
{node1, segment(n3, 10, 20)},
{Node, segment(n2, 0, 20)},
{node2, segment(n4, 50, 50)},
{Node, segment(n5, 40, 60)}
],
Asm = append_many(new(), Segs),
?assertEqual(
% {incomplete, {missing, {segment, 30, 40}}}, ???
{incomplete, {missing, {segment, 20, 40}}},
status(coverage, Asm)
).
missing_end_coverage_test() ->
Node = node(),
Segs = [
{Node, segment(n1, 0, 15)},
{node1, segment(n3, 10, 10)}
],
Asm = append_many(new(), Segs),
?assertEqual(
{incomplete, {missing, {segment, 15, 20}}},
status(coverage, Asm)
).
missing_coverage_with_redudancy_test() ->
Segs = [
{node(), segment(n1, 0, 10)},
{node(), segment(n2, 0, 20)},
{node42, segment(n3, 10, 20)},
{node43, segment(n4, 10, 50)},
{node(), segment(n5, 40, 60)}
],
Asm = append_many(new(), Segs),
?assertEqual(
% {incomplete, {missing, {segment, 50, 60}}}, ???
{incomplete, {missing, {segment, 20, 40}}},
status(coverage, Asm)
).
append_many(Asm, List) ->
lists:foldl(
fun({Node, Frag}, Acc) -> append(Acc, Node, Frag) end,
Asm,
List
).
filemeta(Path, Name) ->
#{
path => Path,
fragment =>
{filemeta, #{
name => Name
}}
}.
segment(Path, Offset, Size) ->
#{
path => Path,
fragment =>
{segment, #{
offset => Offset,
size => Size
}}
}.
-endif.

View File

@ -0,0 +1,65 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc File Transfer configuration management module
-module(emqx_ft_conf).
-behaviour(emqx_config_handler).
%% Load/Unload
-export([
load/0,
unload/0
]).
%% callbacks for emqx_config_handler
-export([
pre_config_update/3,
post_config_update/5
]).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec load() -> ok.
load() ->
emqx_conf:add_handler([file_transfer], ?MODULE).
-spec unload() -> ok.
unload() ->
emqx_conf:remove_handler([file_transfer]).
%%--------------------------------------------------------------------
%% emqx_config_handler callbacks
%%--------------------------------------------------------------------
-spec pre_config_update(list(atom()), emqx_config:update_request(), emqx_config:raw_config()) ->
{ok, emqx_config:update_request()} | {error, term()}.
pre_config_update(_, _Req, Config) ->
{ok, Config}.
-spec post_config_update(
list(atom()),
emqx_config:update_request(),
emqx_config:config(),
emqx_config:config(),
emqx_config:app_envs()
) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}.
post_config_update(_, _Req, _NewConfig, _OldConfig, _AppEnvs) ->
ok.

View File

@ -0,0 +1,115 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_responder).
-behaviour(gen_server).
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/types.hrl").
-export([start_link/0]).
-export([
register/3,
unregister/1
]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(TAB, ?MODULE).
-type key() :: term().
%%--------------------------------------------------------------------
%% API
%% -------------------------------------------------------------------
-spec start_link() -> startlink_ret().
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec register(Key, DefaultAction, Timeout) -> ok | {error, already_registered} when
Key :: key(),
DefaultAction :: fun((Key) -> any()),
Timeout :: timeout().
register(Key, DefaultAction, Timeout) ->
case ets:lookup(?TAB, Key) of
[] ->
gen_server:call(?SERVER, {register, Key, DefaultAction, Timeout});
[{Key, _Action, _Ref}] ->
{error, already_registered}
end.
-spec unregister(Key) -> ok | {error, not_found} when
Key :: key().
unregister(Key) ->
gen_server:call(?SERVER, {unregister, Key}).
%%--------------------------------------------------------------------
%% gen_server callbacks
%% -------------------------------------------------------------------
init([]) ->
_ = ets:new(?TAB, [named_table, protected, set, {read_concurrency, true}]),
{ok, #{}}.
handle_call({register, Key, DefaultAction, Timeout}, _From, State) ->
?SLOG(warning, #{msg => "register", key => Key, timeout => Timeout}),
case ets:lookup(?TAB, Key) of
[] ->
TRef = erlang:start_timer(Timeout, self(), {timeout, Key}),
true = ets:insert(?TAB, {Key, DefaultAction, TRef}),
{reply, ok, State};
[{_, _Action, _Ref}] ->
{reply, {error, already_registered}, State}
end;
handle_call({unregister, Key}, _From, State) ->
?SLOG(warning, #{msg => "unregister", key => Key}),
case ets:lookup(?TAB, Key) of
[] ->
{reply, {error, not_found}, State};
[{_, _Action, TRef}] ->
_ = erlang:cancel_timer(TRef),
true = ets:delete(?TAB, Key),
{reply, ok, State}
end.
handle_cast(Msg, State) ->
?SLOG(warning, #{msg => "unknown cast", cast_msg => Msg}),
{noreply, State}.
handle_info({timeout, TRef, {timeout, Key}}, State) ->
case ets:lookup(?TAB, Key) of
[] ->
{noreply, State};
[{_, Action, TRef}] ->
_ = erlang:cancel_timer(TRef),
true = ets:delete(?TAB, Key),
%% TODO: safe apply
_ = Action(Key),
{noreply, State}
end;
handle_info(Msg, State) ->
?SLOG(warning, #{msg => "unknown message", info_msg => Msg}),
{noreply, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, _State) ->
ok.

View File

@ -0,0 +1,89 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_schema).
-behaviour(hocon_schema).
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("typerefl/include/types.hrl").
-export([namespace/0, roots/0, fields/1, tags/0]).
-export([schema/1]).
-type json_value() ::
null
| boolean()
| binary()
| number()
| [json_value()]
| #{binary() => json_value()}.
-reflect_type([json_value/0]).
%%
namespace() -> file_transfer.
tags() ->
[<<"File Transfer">>].
roots() -> [file_transfer].
fields(file_transfer) ->
[
{storage, #{
type => hoconsc:union([
hoconsc:ref(?MODULE, local_storage)
])
}}
];
fields(local_storage) ->
[
{type, #{
type => local,
default => local,
required => false,
desc => ?DESC("local")
}}
].
schema(filemeta) ->
#{
roots => [
{name, hoconsc:mk(string(), #{required => true})},
{size, hoconsc:mk(non_neg_integer())},
{expire_at, hoconsc:mk(non_neg_integer())},
{checksum, hoconsc:mk({atom(), binary()}, #{converter => converter(checksum)})},
{segments_ttl, hoconsc:mk(pos_integer())},
{user_data, hoconsc:mk(json_value())}
]
}.
converter(checksum) ->
fun
(undefined, #{}) ->
undefined;
({sha256, Bin}, #{make_serializable := true}) ->
_ = is_binary(Bin) orelse throw({expected_type, string}),
_ = byte_size(Bin) =:= 32 orelse throw({expected_length, 32}),
binary:encode_hex(Bin);
(Hex, #{}) ->
_ = is_binary(Hex) orelse throw({expected_type, string}),
_ = byte_size(Hex) =:= 64 orelse throw({expected_length, 64}),
{sha256, binary:decode_hex(Hex)}
end.

View File

@ -0,0 +1,130 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage).
-export(
[
store_filemeta/2,
store_segment/2,
assemble/2,
parse_id/1,
ready_transfers/0,
get_ready_transfer/1,
with_storage_type/3
]
).
-type storage() :: emqx_config:config().
-export_type([assemble_callback/0]).
-type assemble_callback() :: fun((ok | {error, term()}) -> any()).
-type ready_transfer_id() :: term().
-type ready_transfer_info() :: map().
-type ready_transfer_data() :: binary().
%%--------------------------------------------------------------------
%% Behaviour
%%--------------------------------------------------------------------
-callback store_filemeta(storage(), emqx_ft:transfer(), emqx_ft:filemeta()) ->
ok | {error, term()}.
-callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) ->
ok | {error, term()}.
-callback assemble(storage(), emqx_ft:transfer(), assemble_callback()) ->
{ok, pid()} | {error, term()}.
-callback ready_transfers(storage()) ->
{ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
-callback get_ready_transfer(storage(), ready_transfer_id()) ->
{ok, ready_transfer_data()} | {error, term()}.
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
ok | {error, term()}.
store_filemeta(Transfer, FileMeta) ->
Mod = mod(),
Mod:store_filemeta(storage(), Transfer, FileMeta).
-spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
ok | {error, term()}.
store_segment(Transfer, Segment) ->
Mod = mod(),
Mod:store_segment(storage(), Transfer, Segment).
-spec assemble(emqx_ft:transfer(), assemble_callback()) ->
{ok, pid()} | {error, term()}.
assemble(Transfer, Callback) ->
Mod = mod(),
Mod:assemble(storage(), Transfer, Callback).
-spec ready_transfers() -> {ok, [{ready_transfer_id(), ready_transfer_info()}]} | {error, term()}.
ready_transfers() ->
Mod = mod(),
Mod:ready_transfers(storage()).
-spec get_ready_transfer(ready_transfer_id()) -> {ok, ready_transfer_data()} | {error, term()}.
get_ready_transfer(ReadyTransferId) ->
Mod = mod(),
Mod:get_ready_transfer(storage(), ReadyTransferId).
-spec parse_id(map()) -> {ok, ready_transfer_id()} | {error, term()}.
parse_id(#{
<<"type">> := local, <<"node">> := NodeBin, <<"clientid">> := ClientId, <<"id">> := Id
}) ->
case emqx_misc:safe_to_existing_atom(NodeBin) of
{ok, Node} ->
{ok, {local, Node, ClientId, Id}};
{error, _} ->
{error, {invalid_node, NodeBin}}
end;
parse_id(#{}) ->
{error, invalid_file_id}.
-spec with_storage_type(atom(), atom(), list(term())) -> any().
with_storage_type(Type, Fun, Args) ->
Storage = storage(),
case Storage of
#{type := Type} ->
Mod = mod(Storage),
apply(Mod, Fun, [Storage | Args]);
_ ->
{error, {invalid_storage_type, Type}}
end.
%%--------------------------------------------------------------------
%% Local FS API
%%--------------------------------------------------------------------
storage() ->
emqx_config:get([file_transfer, storage]).
mod() ->
mod(storage()).
mod(Storage) ->
case Storage of
#{type := local} ->
emqx_ft_storage_fs
% emqx_ft_storage_dummy
end.

View File

@ -0,0 +1,43 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_dummy).
-behaviour(emqx_ft_storage).
-export([
store_filemeta/3,
store_segment/3,
assemble/3,
ready_transfers/1,
get_ready_transfer/2
]).
store_filemeta(_Storage, _Transfer, _Meta) ->
ok.
store_segment(_Storage, _Transfer, _Segment) ->
ok.
assemble(_Storage, _Transfer, Callback) ->
Pid = spawn(fun() -> Callback({error, not_implemented}) end),
{ok, Pid}.
ready_transfers(_Storage) ->
{ok, []}.
get_ready_transfer(_Storage, _Id) ->
{error, not_implemented}.

View File

@ -0,0 +1,520 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_fs).
-behaviour(emqx_ft_storage).
-include_lib("emqx/include/logger.hrl").
-export([store_filemeta/3]).
-export([store_segment/3]).
-export([list/3]).
-export([pread/5]).
-export([assemble/3]).
-export([transfers/1]).
-export([pread_local/4]).
-export([list_local/2]).
-export([ready_transfers_local/0, ready_transfers_local/1]).
-export([get_ready_transfer_local/1, get_ready_transfer_local/2]).
-export([ready_transfers/1]).
-export([get_ready_transfer/2]).
-export([open_file/3]).
-export([complete/4]).
-export([write/2]).
-export([discard/1]).
-export_type([filefrag/1]).
-export_type([filefrag/0]).
-export_type([transferinfo/0]).
-type transfer() :: emqx_ft:transfer().
-type offset() :: emqx_ft:offset().
-type filemeta() :: emqx_ft:filemeta().
-type segment() :: emqx_ft:segment().
-type segmentinfo() :: #{
offset := offset(),
size := _Bytes :: non_neg_integer()
}.
-type transferinfo() :: #{
status := complete | incomplete,
result => [filefrag({result, #{}})]
}.
% TODO naming
-type filefrag(T) :: #{
path := file:name(),
timestamp := emqx_datetime:epoch_second(),
size := _Bytes :: non_neg_integer(),
fragment := T
}.
-type filefrag() :: filefrag(
{filemeta, filemeta()}
| {segment, segmentinfo()}
| {result, #{}}
).
-define(FRAGDIR, frags).
-define(TEMPDIR, tmp).
-define(RESULTDIR, result).
-define(MANIFEST, "MANIFEST.json").
-define(SEGMENT, "SEG").
%% TODO
-type storage() :: emqx_config:config().
%% Store manifest in the backing filesystem.
%% Atomic operation.
-spec store_filemeta(storage(), transfer(), filemeta()) ->
% Quota? Some lower level errors?
{ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}.
store_filemeta(Storage, Transfer, Meta) ->
% TODO safeguard against bad clientids / fileids.
Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST),
case read_file(Filepath, fun decode_filemeta/1) of
{ok, Meta} ->
_ = touch_file(Filepath),
ok;
{ok, _Conflict} ->
% TODO
% We won't see conflicts in case of concurrent `store_filemeta`
% requests. It's rather odd scenario so it's fine not to worry
% about it too much now.
{error, conflict};
{error, Reason} when Reason =:= notfound; Reason =:= corrupted; Reason =:= enoent ->
write_file_atomic(Storage, Transfer, Filepath, encode_filemeta(Meta))
end.
%% Store a segment in the backing filesystem.
%% Atomic operation.
-spec store_segment(storage(), transfer(), segment()) ->
% Where is the checksum gets verified? Upper level probably.
% Quota? Some lower level errors?
ok | {error, _TODO}.
store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], mk_segment_filename(Segment)),
write_file_atomic(Storage, Transfer, Filepath, Content).
-spec list(storage(), transfer(), _What :: fragment | result) ->
% Some lower level errors? {error, notfound}?
% Result will contain zero or only one filemeta.
{ok, [filefrag({filemeta, filemeta()} | {segment, segmentinfo()})]} | {error, _TODO}.
list(Storage, Transfer, What) ->
Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(What)),
case file:list_dir(Dirname) of
{ok, Filenames} ->
% TODO
% In case of `What = result` there might be more than one file (though
% extremely bad luck is needed for that, e.g. concurrent assemblers with
% different filemetas from different nodes). This might be unexpected for a
% client given the current protocol, yet might be helpful in the future.
{ok, filtermap_files(get_filefrag_fun_for(What), Dirname, Filenames)};
{error, enoent} ->
{ok, []};
{error, _} = Error ->
Error
end.
get_subdirs_for(fragment) ->
[?FRAGDIR];
get_subdirs_for(result) ->
[?RESULTDIR].
get_filefrag_fun_for(fragment) ->
fun mk_filefrag/2;
get_filefrag_fun_for(result) ->
fun mk_result_filefrag/2.
-spec pread(storage(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, _Content :: iodata()} | {error, _TODO}.
pread(_Storage, _Transfer, Frag, Offset, Size) ->
Filepath = maps:get(path, Frag),
case file:open(Filepath, [read, raw, binary]) of
{ok, IoDevice} ->
% NOTE
% Reading empty file is always `eof`.
Read = file:pread(IoDevice, Offset, Size),
ok = file:close(IoDevice),
case Read of
{ok, Content} ->
{ok, Content};
eof ->
{error, eof};
{error, Reason} ->
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
-spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) ->
% {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}.
{ok, _Assembler :: pid()} | {error, _TODO}.
assemble(Storage, Transfer, Callback) ->
emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).
-spec list_local(transfer(), fragment | result) ->
{ok, [filefrag()]} | {error, term()}.
list_local(Transfer, What) ->
emqx_ft_storage:with_storage_type(local, list, [Transfer, What]).
-spec pread_local(transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, [filefrag()]} | {error, term()}.
pread_local(Transfer, Frag, Offset, Size) ->
emqx_ft_storage:with_storage_type(local, pread, [Transfer, Frag, Offset, Size]).
get_ready_transfer(_Storage, ReadyTransferId) ->
case parse_ready_transfer_id(ReadyTransferId) of
{ok, {Node, Transfer}} ->
try
emqx_ft_storage_fs_proto_v1:get_ready_transfer(Node, Transfer)
catch
error:Error ->
{error, Error};
C:Error ->
{error, {C, Error}}
end;
{error, _} = Error ->
Error
end.
get_ready_transfer_local(Transfer) ->
emqx_ft_storage:with_storage_type(local, get_ready_transfer_local, [Transfer]).
get_ready_transfer_local(Storage, Transfer) ->
Dirname = mk_filedir(Storage, Transfer, get_subdirs_for(result)),
case file:list_dir(Dirname) of
{ok, [Filename | _]} ->
file:read_file(filename:join([Dirname, Filename]));
{error, _} = Error ->
Error
end.
ready_transfers(_Storage) ->
Nodes = mria_mnesia:running_nodes(),
Results = emqx_ft_storage_fs_proto_v1:ready_transfers(Nodes),
{GoodResults, BadResults} = lists:partition(
fun
({ok, _}) -> true;
(_) -> false
end,
Results
),
?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}),
{ok, [File || {ok, Files} <- GoodResults, File <- Files]}.
ready_transfers_local() ->
emqx_ft_storage:with_storage_type(local, ready_transfers_local, []).
ready_transfers_local(Storage) ->
{ok, Transfers} = transfers(Storage),
lists:filtermap(
fun
({Transfer, #{status := complete, result := [Result | _]}}) ->
{true, {ready_transfer_id(Transfer), maps:without([fragment], Result)}};
(_) ->
false
end,
maps:to_list(Transfers)
).
ready_transfer_id({ClientId, FileId}) ->
#{
<<"node">> => atom_to_binary(node()),
<<"clientid">> => ClientId,
<<"fileid">> => FileId
}.
parse_ready_transfer_id(#{
<<"node">> := NodeBin, <<"clientid">> := ClientId, <<"fileid">> := FileId
}) ->
case emqx_misc:safe_to_existing_atom(NodeBin) of
{ok, Node} ->
{ok, {Node, {ClientId, FileId}}};
{error, _} ->
{error, {invalid_node, NodeBin}}
end;
parse_ready_transfer_id(#{}) ->
{error, invalid_file_id}.
-spec transfers(storage()) ->
{ok, #{transfer() => transferinfo()}}.
transfers(Storage) ->
% TODO `Continuation`
% There might be millions of transfers on the node, we need a protocol and
% storage schema to iterate through them effectively.
ClientIds = try_list_dir(get_storage_root(Storage)),
{ok,
lists:foldl(
fun(ClientId, Acc) -> transfers(Storage, ClientId, Acc) end,
#{},
ClientIds
)}.
transfers(Storage, ClientId, AccIn) ->
Dirname = mk_client_filedir(Storage, ClientId),
case file:list_dir(Dirname) of
{ok, FileIds} ->
lists:foldl(
fun(FileId, Acc) ->
Transfer = {filename_to_binary(ClientId), filename_to_binary(FileId)},
read_transferinfo(Storage, Transfer, Acc)
end,
AccIn,
FileIds
);
{error, _Reason} ->
% TODO worth logging
AccIn
end.
read_transferinfo(Storage, Transfer, Acc) ->
case list(Storage, Transfer, result) of
{ok, Result = [_ | _]} ->
Info = #{status => complete, result => Result},
Acc#{Transfer => Info};
{ok, []} ->
Info = #{status => incomplete},
Acc#{Transfer => Info};
{error, _Reason} ->
% TODO worth logging
Acc
end.
%%
-type handle() :: {file:name(), io:device(), crypto:hash_state()}.
-spec open_file(storage(), transfer(), filemeta()) ->
{ok, handle()} | {error, _TODO}.
open_file(Storage, Transfer, Filemeta) ->
Filename = maps:get(name, Filemeta),
TempFilepath = mk_temp_filepath(Storage, Transfer, Filename),
_ = filelib:ensure_dir(TempFilepath),
case file:open(TempFilepath, [write, raw, binary]) of
{ok, Handle} ->
_ = file:truncate(Handle),
{ok, {TempFilepath, Handle, init_checksum(Filemeta)}};
{error, _} = Error ->
Error
end.
-spec write(handle(), iodata()) ->
ok | {error, _TODO}.
write({Filepath, IoDevice, Ctx}, IoData) ->
case file:write(IoDevice, IoData) of
ok ->
{ok, {Filepath, IoDevice, update_checksum(Ctx, IoData)}};
{error, _} = Error ->
Error
end.
-spec complete(storage(), transfer(), filemeta(), handle()) ->
ok | {error, {checksum, _Algo, _Computed}} | {error, _TODO}.
complete(Storage, Transfer, Filemeta, Handle = {Filepath, IoDevice, Ctx}) ->
TargetFilepath = mk_filepath(Storage, Transfer, [?RESULTDIR], maps:get(name, Filemeta)),
case verify_checksum(Ctx, Filemeta) of
ok ->
ok = file:close(IoDevice),
mv_temp_file(Filepath, TargetFilepath);
{error, _} = Error ->
_ = discard(Handle),
Error
end.
-spec discard(handle()) ->
ok.
discard({Filepath, IoDevice, _Ctx}) ->
ok = file:close(IoDevice),
file:delete(Filepath).
init_checksum(#{checksum := {Algo, _}}) ->
crypto:hash_init(Algo);
init_checksum(#{}) ->
undefined.
update_checksum(Ctx, IoData) when Ctx /= undefined ->
crypto:hash_update(Ctx, IoData);
update_checksum(undefined, _IoData) ->
undefined.
verify_checksum(Ctx, #{checksum := {Algo, Digest}}) when Ctx /= undefined ->
case crypto:hash_final(Ctx) of
Digest ->
ok;
Mismatch ->
{error, {checksum, Algo, binary:encode_hex(Mismatch)}}
end;
verify_checksum(undefined, _) ->
ok.
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
encode_filemeta(Meta) ->
% TODO: Looks like this should be hocon's responsibility.
Schema = emqx_ft_schema:schema(filemeta),
Term = hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}),
emqx_json:encode(?PRELUDE(_Vsn = 1, Term)).
decode_filemeta(Binary) ->
Schema = emqx_ft_schema:schema(filemeta),
?PRELUDE(_Vsn = 1, Term) = emqx_json:decode(Binary, [return_maps]),
hocon_tconf:check_plain(Schema, Term, #{atom_key => true, required => false}).
mk_segment_filename({Offset, Content}) ->
lists:concat([?SEGMENT, ".", Offset, ".", byte_size(Content)]).
break_segment_filename(Filename) ->
Regex = "^" ?SEGMENT "[.]([0-9]+)[.]([0-9]+)$",
Result = re:run(Filename, Regex, [{capture, all_but_first, list}]),
case Result of
{match, [Offset, Size]} ->
{ok, #{offset => list_to_integer(Offset), size => list_to_integer(Size)}};
nomatch ->
{error, invalid}
end.
mk_filedir(Storage, {ClientId, FileId}, SubDirs) ->
filename:join([get_storage_root(Storage), ClientId, FileId | SubDirs]).
mk_client_filedir(Storage, ClientId) ->
filename:join([get_storage_root(Storage), ClientId]).
mk_filepath(Storage, Transfer, SubDirs, Filename) ->
filename:join(mk_filedir(Storage, Transfer, SubDirs), Filename).
try_list_dir(Dirname) ->
case file:list_dir(Dirname) of
{ok, List} -> List;
{error, _} -> []
end.
get_storage_root(Storage) ->
maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")).
-include_lib("kernel/include/file.hrl").
read_file(Filepath) ->
file:read_file(Filepath).
read_file(Filepath, DecodeFun) ->
case read_file(Filepath) of
{ok, Content} ->
safe_decode(Content, DecodeFun);
{error, _} = Error ->
Error
end.
safe_decode(Content, DecodeFun) ->
try
{ok, DecodeFun(Content)}
catch
_C:_R:_Stacktrace ->
% TODO: Log?
{error, corrupted}
end.
write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) ->
TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)),
Result = emqx_misc:pipeline(
[
fun filelib:ensure_dir/1,
fun write_contents/2,
fun(_) -> mv_temp_file(TempFilepath, Filepath) end
],
TempFilepath,
Content
),
case Result of
{ok, _, _} ->
_ = file:delete(TempFilepath),
ok;
{error, Reason, _} ->
{error, Reason}
end.
mk_temp_filepath(Storage, Transfer, Filename) ->
Unique = erlang:unique_integer([positive]),
filename:join(mk_filedir(Storage, Transfer, [?TEMPDIR]), mk_filename([Unique, ".", Filename])).
mk_filename(Comps) ->
lists:append(lists:map(fun mk_filename_component/1, Comps)).
mk_filename_component(I) when is_integer(I) -> integer_to_list(I);
mk_filename_component(A) when is_atom(A) -> atom_to_list(A);
mk_filename_component(B) when is_binary(B) -> unicode:characters_to_list(B);
mk_filename_component(S) when is_list(S) -> S.
write_contents(Filepath, Content) ->
file:write_file(Filepath, Content).
mv_temp_file(TempFilepath, Filepath) ->
_ = filelib:ensure_dir(Filepath),
file:rename(TempFilepath, Filepath).
touch_file(Filepath) ->
Now = erlang:localtime(),
file:change_time(Filepath, _Mtime = Now, _Atime = Now).
filtermap_files(Fun, Dirname, Filenames) ->
lists:filtermap(fun(Filename) -> Fun(Dirname, Filename) end, Filenames).
mk_filefrag(Dirname, Filename = ?MANIFEST) ->
mk_filefrag(Dirname, Filename, filemeta, fun read_filemeta/2);
mk_filefrag(Dirname, Filename = ?SEGMENT ++ _) ->
mk_filefrag(Dirname, Filename, segment, fun read_segmentinfo/2);
mk_filefrag(_Dirname, _Filename) ->
% TODO this is unexpected, worth logging?
false.
mk_result_filefrag(Dirname, Filename) ->
% NOTE
% Any file in the `?RESULTDIR` subdir is currently considered the result of
% the file transfer.
mk_filefrag(Dirname, Filename, result, fun(_, _) -> {ok, #{}} end).
mk_filefrag(Dirname, Filename, Tag, Fun) ->
Filepath = filename:join(Dirname, Filename),
% TODO error handling?
{ok, Fileinfo} = file:read_file_info(Filepath),
case Fun(Filename, Filepath) of
{ok, Frag} ->
{true, #{
path => Filepath,
timestamp => Fileinfo#file_info.mtime,
size => Fileinfo#file_info.size,
fragment => {Tag, Frag}
}};
{error, _Reason} ->
% TODO loss of information
false
end.
read_filemeta(_Filename, Filepath) ->
read_file(Filepath, fun decode_filemeta/1).
read_segmentinfo(Filename, _Filepath) ->
break_segment_filename(Filename).
filename_to_binary(S) when is_list(S) -> unicode:characters_to_binary(S);
filename_to_binary(B) when is_binary(B) -> B.

View File

@ -0,0 +1,56 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_sup).
-behaviour(supervisor).
-export([start_link/0]).
-export([init/1]).
-define(SERVER, ?MODULE).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
SupFlags = #{
strategy => one_for_all,
intensity => 100,
period => 10
},
AssemblerSup = #{
id => emqx_ft_assembler_sup,
start => {emqx_ft_assembler_sup, start_link, []},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [emqx_ft_assembler_sup]
},
Responder = #{
id => emqx_ft_responder,
start => {emqx_ft_responder, start_link, []},
restart => permanent,
shutdown => infinity,
type => worker,
modules => [emqx_ft_responder]
},
ChildSpecs = [Responder, AssemblerSup],
{ok, {SupFlags, ChildSpecs}}.

View File

@ -0,0 +1,63 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_storage_fs_proto_v1).
-behaviour(emqx_bpapi).
-export([introduced_in/0]).
-export([list/3]).
-export([multilist/3]).
-export([pread/5]).
-export([ready_transfers/1]).
-export([get_ready_transfer/2]).
-type offset() :: emqx_ft:offset().
-type transfer() :: emqx_ft:transfer().
-type filefrag() :: emqx_ft_storage_fs:filefrag().
-include_lib("emqx/include/bpapi.hrl").
introduced_in() ->
"5.0.17".
-spec list(node(), transfer(), fragment | result) ->
{ok, [filefrag()]} | {error, term()}.
list(Node, Transfer, What) ->
erpc:call(Node, emqx_ft_storage_fs, list_local, [Transfer, What]).
-spec multilist([node()], transfer(), fragment | result) ->
emqx_rpc:erpc_multicall({ok, [filefrag()]} | {error, term()}).
multilist(Nodes, Transfer, What) ->
erpc:multicall(Nodes, emqx_ft_storage_fs, list_local, [Transfer, What]).
-spec pread(node(), transfer(), filefrag(), offset(), _Size :: non_neg_integer()) ->
{ok, [filefrag()]} | {error, term()}.
pread(Node, Transfer, Frag, Offset, Size) ->
erpc:call(Node, emqx_ft_storage_fs, pread_local, [Transfer, Frag, Offset, Size]).
-spec ready_transfers([node()]) ->
{ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]}
| {error, term()}.
ready_transfers(Nodes) ->
erpc:multicall(Nodes, emqx_ft_storage_fs, ready_transfers_local, []).
-spec get_ready_transfer(node(), emqx_ft_storage:ready_transfer_id()) ->
{ok, emqx_ft_storage:ready_transfer_data()}
| {error, term()}.
get_ready_transfer(Node, ReadyTransferId) ->
erpc:call(Node, emqx_ft_storage_fs, get_ready_transfer_local, [ReadyTransferId]).

View File

@ -0,0 +1,209 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ft_assembler_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").
-include_lib("kernel/include/file.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
all() ->
[
t_assemble_empty_transfer,
t_assemble_complete_local_transfer,
% NOTE
% It depends on the side effects of all previous testcases.
t_list_transfers
].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_testcase(TC, Config) ->
ok = snabbkaffe:start_trace(),
{ok, Pid} = emqx_ft_assembler_sup:start_link(),
[
{storage_root, "file_transfer_root"},
{file_id, atom_to_binary(TC)},
{assembler_sup, Pid}
| Config
].
end_per_testcase(_TC, Config) ->
ok = inspect_storage_root(Config),
ok = gen:stop(?config(assembler_sup, Config)),
ok = snabbkaffe:stop(),
ok.
%%
-define(CLIENTID1, <<"thatsme">>).
-define(CLIENTID2, <<"thatsnotme">>).
t_assemble_empty_transfer(Config) ->
Storage = storage(Config),
Transfer = {?CLIENTID1, ?config(file_id, Config)},
Filename = "important.pdf",
Meta = #{
name => Filename,
size => 0,
expire_at => 42
},
ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
?assertMatch(
{ok, [
#{
path := _,
timestamp := {{_, _, _}, {_, _, _}},
fragment := {filemeta, Meta}
}
]},
emqx_ft_storage_fs:list(Storage, Transfer, fragment)
),
{ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1),
{ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}),
?assertMatch(#{result := ok}, Event),
?assertEqual(
{ok, <<>>},
% TODO
file:read_file(mk_assembly_filename(Config, Transfer, Filename))
),
{ok, [Result = #{size := Size = 0}]} = emqx_ft_storage_fs:list(Storage, Transfer, result),
?assertEqual(
{error, eof},
emqx_ft_storage_fs:pread(Storage, Transfer, Result, 0, Size)
),
ok.
t_assemble_complete_local_transfer(Config) ->
Storage = storage(Config),
Transfer = {?CLIENTID2, ?config(file_id, Config)},
Filename = "topsecret.pdf",
TransferSize = 10000 + rand:uniform(50000),
SegmentSize = 4096,
Gen = emqx_ft_content_gen:new({Transfer, TransferSize}, SegmentSize),
Hash = emqx_ft_content_gen:hash(Gen, crypto:hash_init(sha256)),
Meta = #{
name => Filename,
checksum => {sha256, Hash},
expire_at => 42
},
ok = emqx_ft_storage_fs:store_filemeta(Storage, Transfer, Meta),
_ = emqx_ft_content_gen:consume(
Gen,
fun({Content, SegmentNum, _SegmentCount}) ->
Offset = (SegmentNum - 1) * SegmentSize,
?assertEqual(
ok,
emqx_ft_storage_fs:store_segment(Storage, Transfer, {Offset, Content})
)
end
),
{ok, Fragments} = emqx_ft_storage_fs:list(Storage, Transfer, fragment),
?assertEqual((TransferSize div SegmentSize) + 1 + 1, length(Fragments)),
?assertEqual(
[Meta],
[FM || #{fragment := {filemeta, FM}} <- Fragments],
Fragments
),
{ok, _AsmPid} = emqx_ft_storage_fs:assemble(Storage, Transfer, fun on_assembly_finished/1),
{ok, Event} = ?block_until(#{?snk_kind := test_assembly_finished}),
?assertMatch(#{result := ok}, Event),
AssemblyFilename = mk_assembly_filename(Config, Transfer, Filename),
?assertMatch(
{ok, [
#{
path := AssemblyFilename,
size := TransferSize,
fragment := {result, #{}}
}
]},
emqx_ft_storage_fs:list(Storage, Transfer, result)
),
?assertMatch(
{ok, #file_info{type = regular, size = TransferSize}},
file:read_file_info(AssemblyFilename)
),
ok = emqx_ft_content_gen:check_file_consistency(
{Transfer, TransferSize},
100,
AssemblyFilename
).
mk_assembly_filename(Config, {ClientID, FileID}, Filename) ->
filename:join([?config(storage_root, Config), ClientID, FileID, result, Filename]).
on_assembly_finished(Result) ->
?tp(test_assembly_finished, #{result => Result}).
%%
t_list_transfers(Config) ->
Storage = storage(Config),
?assertMatch(
{ok, #{
{?CLIENTID1, <<"t_assemble_empty_transfer">>} := #{
status := complete,
result := [#{path := _, size := 0, fragment := {result, _}}]
},
{?CLIENTID2, <<"t_assemble_complete_local_transfer">>} := #{
status := complete,
result := [#{path := _, size := Size, fragment := {result, _}}]
}
}} when Size > 0,
emqx_ft_storage_fs:transfers(Storage)
).
%%
-include_lib("kernel/include/file.hrl").
inspect_storage_root(Config) ->
inspect_dir(?config(storage_root, Config)).
inspect_dir(Dir) ->
FileInfos = filelib:fold_files(
Dir,
".*",
true,
fun(Filename, Acc) -> orddict:store(Filename, inspect_file(Filename), Acc) end,
orddict:new()
),
ct:pal("inspect '~s': ~p", [Dir, FileInfos]).
inspect_file(Filename) ->
{ok, Info} = file:read_file_info(Filename),
{Info#file_info.type, Info#file_info.size, Info#file_info.mtime}.
mk_fileid() ->
integer_to_binary(erlang:system_time(millisecond)).
storage(Config) ->
#{
root => ?config(storage_root, Config)
}.

View File

@ -0,0 +1,229 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Inspired by
%% https://github.com/kafka4beam/kflow/blob/master/src/testbed/payload_gen.erl
-module(emqx_ft_content_gen).
-include_lib("eunit/include/eunit.hrl").
-dialyzer(no_improper_lists).
-export([new/2]).
-export([generate/3]).
-export([next/1]).
-export([consume/1]).
-export([consume/2]).
-export([fold/3]).
-export([hash/2]).
-export([check_file_consistency/3]).
-export_type([cont/1]).
-export_type([stream/1]).
-export_type([binary_payload/0]).
-define(hash_size, 16).
-type payload() :: {Seed :: term(), Size :: integer()}.
-type binary_payload() :: {
binary(), _ChunkNum :: non_neg_integer(), _ChunkCnt :: non_neg_integer()
}.
-type cont(Data) ::
fun(() -> stream(Data))
| stream(Data).
-type stream(Data) ::
maybe_improper_list(Data, cont(Data))
| eos.
-record(chunk_state, {
seed :: term(),
payload_size :: non_neg_integer(),
offset :: non_neg_integer(),
chunk_size :: non_neg_integer()
}).
-type chunk_state() :: #chunk_state{}.
%% -----------------------------------------------------------------------------
%% Generic streams
%% -----------------------------------------------------------------------------
%% @doc Consume one element from the stream.
-spec next(cont(A)) -> stream(A).
next(Fun) when is_function(Fun, 0) ->
Fun();
next(L) ->
L.
%% @doc Consume all elements of the stream and feed them into a
%% callback (e.g. brod:produce)
-spec consume(cont(A), fun((A) -> Ret)) -> [Ret].
consume([Data | Cont], Callback) ->
[Callback(Data) | consume(next(Cont), Callback)];
consume(Cont, Callback) when is_function(Cont, 0) ->
consume(next(Cont), Callback);
consume(eos, _Callback) ->
[].
%% @equiv consume(Stream, fun(A) -> A end)
-spec consume(cont(A)) -> [A].
consume(Stream) ->
consume(Stream, fun(A) -> A end).
-spec fold(fun((A, Acc) -> Acc), Acc, cont(A)) -> Acc.
fold(Fun, Acc, [Data | Cont]) ->
fold(Fun, Fun(Data, Acc), next(Cont));
fold(Fun, Acc, Cont) when is_function(Cont, 0) ->
fold(Fun, Acc, next(Cont));
fold(_Fun, Acc, eos) ->
Acc.
%% -----------------------------------------------------------------------------
%% Binary streams
%% -----------------------------------------------------------------------------
%% @doc Stream of binary chunks.
%% Limitation: `ChunkSize' should be dividable by `?hash_size'
-spec new(payload(), integer()) -> cont(binary_payload()).
new({Seed, Size}, ChunkSize) when ChunkSize rem ?hash_size =:= 0 ->
fun() ->
generate_next_chunk(#chunk_state{
seed = Seed,
payload_size = Size,
chunk_size = ChunkSize,
offset = 0
})
end.
%% @doc Generate chunks of data and feed them into
%% `Callback'
-spec generate(payload(), integer(), fun((binary_payload()) -> A)) -> [A].
generate(Payload, ChunkSize, Callback) ->
consume(new(Payload, ChunkSize), Callback).
-spec hash(cont(binary_payload()), crypto:hash_state()) -> binary().
hash(Stream, HashCtxIn) ->
crypto:hash_final(
fold(
fun({Chunk, _, _}, HashCtx) ->
crypto:hash_update(HashCtx, Chunk)
end,
HashCtxIn,
Stream
)
).
-spec check_consistency(
payload(),
integer(),
fun((integer()) -> {ok, binary()} | undefined)
) -> ok.
check_consistency({Seed, Size}, SampleSize, Callback) ->
SeedHash = seed_hash(Seed),
Random = [rand:uniform(Size) - 1 || _ <- lists:seq(1, SampleSize)],
%% Always check first and last bytes, and one that should not exist:
Samples = [0, Size - 1, Size | Random],
lists:foreach(
fun
(N) when N < Size ->
Expected = do_get_byte(N, SeedHash),
?assertEqual(
{N, {ok, Expected}},
{N, Callback(N)}
);
(N) ->
?assertMatch(undefined, Callback(N))
end,
Samples
).
-spec check_file_consistency(
payload(),
integer(),
file:filename()
) -> ok.
check_file_consistency(Payload, SampleSize, FileName) ->
{ok, FD} = file:open(FileName, [read, raw]),
try
Fun = fun(N) ->
case file:pread(FD, [{N, 1}]) of
{ok, [[X]]} -> {ok, X};
{ok, [eof]} -> undefined
end
end,
check_consistency(Payload, SampleSize, Fun)
after
file:close(FD)
end.
%% =============================================================================
%% Internal functions
%% =============================================================================
%% @doc Continue generating chunks
-spec generate_next_chunk(chunk_state()) -> stream(binary()).
generate_next_chunk(#chunk_state{offset = Offset, payload_size = Size}) when Offset >= Size ->
eos;
generate_next_chunk(State0 = #chunk_state{offset = Offset, chunk_size = ChunkSize}) ->
State = State0#chunk_state{offset = Offset + ChunkSize},
Payload = generate_chunk(
State#chunk_state.seed,
Offset,
ChunkSize,
State#chunk_state.payload_size
),
[Payload | fun() -> generate_next_chunk(State) end].
generate_chunk(Seed, Offset, ChunkSize, Size) ->
SeedHash = seed_hash(Seed),
To = min(Offset + ChunkSize, Size) - 1,
Payload = iolist_to_binary([
generator_fun(I, SeedHash)
|| I <- lists:seq(Offset div 16, To div 16)
]),
ChunkNum = Offset div ChunkSize + 1,
ChunkCnt = ceil(Size / ChunkSize),
Chunk =
case Offset + ChunkSize of
NextOffset when NextOffset > Size ->
binary:part(Payload, 0, Size rem ChunkSize);
_ ->
Payload
end,
{Chunk, ChunkNum, ChunkCnt}.
%% @doc First argument is a chunk number, the second one is a seed.
%% This implementation is hardly efficient, but it was chosen for
%% clarity reasons
-spec generator_fun(integer(), binary()) -> binary().
generator_fun(N, Seed) ->
crypto:hash(md5, <<N:32, Seed/binary>>).
%% @doc Hash any term
-spec seed_hash(term()) -> binary().
seed_hash(Seed) ->
crypto:hash(md5, term_to_binary(Seed)).
%% @private Get byte at offset `N'
-spec do_get_byte(integer(), binary()) -> byte().
do_get_byte(N, Seed) ->
Chunk = generator_fun(N div ?hash_size, Seed),
binary:at(Chunk, N rem ?hash_size).

View File

@ -398,7 +398,8 @@ relx_apps(ReleaseType, Edition) ->
emqx_prometheus,
emqx_psk,
emqx_slow_subs,
emqx_plugins
emqx_plugins,
emqx_ft
] ++
[quicer || is_quicer_supported()] ++
[bcrypt || provide_bcrypt_release(ReleaseType)] ++