feat(ft): removed replicated data

This commit is contained in:
Ilya Averyanov 2023-02-02 23:52:42 +02:00
parent 72e3eee6c9
commit b4a42a447c
9 changed files with 56 additions and 162 deletions

View File

@ -172,7 +172,6 @@ register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid)
true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}),
ok = emqx_cm_registry:register_channel(Chan),
mark_channel_connected(ChanPid),
ok = emqx_hooks:run('channel.registered', [ConnMod, ChanPid]),
cast({registered, Chan}).
%% @doc Unregister a channel.
@ -282,7 +281,7 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
{ok, #{session => Session1, present => false}}
end,
emqx_cm_locker:trans(ClientId, CleanStart);
open_session(false, ClientInfo = #{clientid := ClientId}, #{conn_mod := NewConnMod} = ConnInfo) ->
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
ResumeStart = fun(_) ->
CreateSess =
@ -309,11 +308,10 @@ open_session(false, ClientInfo = #{clientid := ClientId}, #{conn_mod := NewConnM
{living, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
case wrap_rpc(emqx_cm_proto_v2:takeover_finish(ConnMod, ChanPid)) of
{ok, Pendings, TakoverData} ->
{ok, Pendings} ->
Session1 = emqx_persistent_session:persist(
ClientInfo, ConnInfo, Session
),
ok = emqx_hooks:run('channel.takenover', [NewConnMod, Self, TakoverData]),
{ok, #{
session => clean_session(Session1),
present => true,
@ -399,18 +397,11 @@ takeover_session(ClientId) ->
end.
takeover_finish(ConnMod, ChanPid) ->
TakoverData = emqx_hooks:run_fold('channel.takeover', [ConnMod, ChanPid], #{}),
case
%% node-local call
request_stepdown(
{takeover, 'end'},
ConnMod,
ChanPid
)
of
{ok, Pendings} -> {ok, Pendings, TakoverData};
{error, _} = Error -> Error
end.
request_stepdown(
{takeover, 'end'},
ConnMod,
ChanPid
).
takeover_session(ClientId, Pid) ->
try

View File

@ -2,13 +2,13 @@ emqx_ft_schema {
local {
desc {
en: "Use local file system to store uploaded files and temporary data."
zh: "使用本地文件系统来存储上传的文件和临时数据。"
en: "Use local file system to store uploaded files and temporary data."
zh: "使用本地文件系统来存储上传的文件和临时数据。"
}
label: {
en: "Local Storage"
zh: "本地存储"
}
en: "Local Storage"
zh: "本地存储"
}
}
}

View File

@ -13,5 +13,3 @@
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-define(FT_TAB, emqx_ft).

View File

@ -16,29 +16,21 @@
-module(emqx_ft).
-include("emqx_ft.hrl").
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("emqx/include/emqx_hooks.hrl").
-export([
create_tab/0,
hook/0,
unhook/0
]).
-export([
on_channel_unregistered/1,
on_channel_takeover/3,
on_channel_takenover/3,
on_message_publish/1,
on_message_puback/4
]).
%% For Debug
-export([transfer/2, storage/0]).
-export([on_assemble_timeout/1]).
-export_type([
@ -79,43 +71,17 @@
-type segment() :: {offset(), _Content :: binary()}.
-type ft_data() :: #{
nodes := list(node())
}.
-record(emqx_ft, {
chan_pid :: pid(),
ft_data :: ft_data()
}).
-define(ASSEMBLE_TIMEOUT, 5000).
%%--------------------------------------------------------------------
%% API for app
%%--------------------------------------------------------------------
create_tab() ->
_Tab = ets:new(?FT_TAB, [
set,
public,
named_table,
{keypos, #emqx_ft.chan_pid}
]),
ok.
hook() ->
ok = emqx_hooks:put('channel.unregistered', {?MODULE, on_channel_unregistered, []}, ?HP_LOWEST),
ok = emqx_hooks:put('channel.takeover', {?MODULE, on_channel_takeover, []}, ?HP_LOWEST),
ok = emqx_hooks:put('channel.takenover', {?MODULE, on_channel_takenover, []}, ?HP_LOWEST),
ok = emqx_hooks:put('message.publish', {?MODULE, on_message_publish, []}, ?HP_LOWEST),
ok = emqx_hooks:put('message.puback', {?MODULE, on_message_puback, []}, ?HP_LOWEST).
unhook() ->
ok = emqx_hooks:del('channel.unregistered', {?MODULE, on_channel_unregistered}),
ok = emqx_hooks:del('channel.takeover', {?MODULE, on_channel_takeover}),
ok = emqx_hooks:del('channel.takenover', {?MODULE, on_channel_takenover}),
ok = emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
ok = emqx_hooks:del('message.puback', {?MODULE, on_message_puback}).
@ -123,22 +89,6 @@ unhook() ->
%% Hooks
%%--------------------------------------------------------------------
on_channel_unregistered(ChanPid) ->
ok = delete_ft_data(ChanPid).
on_channel_takeover(_ConnMod, ChanPid, TakeoverData) ->
case get_ft_data(ChanPid) of
{ok, FTData} ->
{ok, TakeoverData#{ft_data => FTData}};
none ->
ok
end.
on_channel_takenover(_ConnMod, ChanPid, #{ft_data := FTData}) ->
ok = put_ft_data(ChanPid, FTData);
on_channel_takenover(_ConnMod, _ChanPid, _) ->
ok.
on_message_publish(
Msg = #message{
id = _Id,
@ -190,8 +140,7 @@ on_init(Msg, FileId) ->
% %% Add validations here
Meta = emqx_json:decode(Payload, [return_maps]),
case emqx_ft_storage:store_filemeta(transfer(Msg, FileId), Meta) of
{ok, Ctx} ->
ok = put_context(Ctx),
ok ->
?RC_SUCCESS;
{error, _Reason} ->
?RC_UNSPECIFIED_ERROR
@ -213,9 +162,8 @@ on_segment(Msg, FileId, Offset, Checksum) ->
Payload = Msg#message.payload,
Segment = {binary_to_integer(Offset), Payload},
%% Add offset/checksum validations
case emqx_ft_storage:store_segment(get_context(), transfer(Msg, FileId), Segment) of
{ok, Ctx} ->
ok = put_context(Ctx),
case emqx_ft_storage:store_segment(transfer(Msg, FileId), Segment) of
ok ->
?RC_SUCCESS;
{error, _Reason} ->
?RC_UNSPECIFIED_ERROR
@ -267,7 +215,6 @@ on_fin(PacketId, Msg, FileId, Checksum) ->
assemble(Transfer, Callback) ->
emqx_ft_storage:assemble(
get_context(),
Transfer,
Callback
).
@ -277,12 +224,12 @@ callback({ChanPid, PacketId} = Key, _FileId) ->
case emqx_ft_responder:unregister(Key) of
ok ->
case Result of
{ok, _} ->
ok ->
erlang:send(ChanPid, {puback, PacketId, [], ?RC_SUCCESS});
{error, _} ->
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR})
end;
{error, not_registered} ->
{error, not_found} ->
ok
end
end.
@ -291,35 +238,6 @@ transfer(Msg, FileId) ->
ClientId = Msg#message.from,
{ClientId, FileId}.
%% TODO: configure
storage() ->
emqx_config:get([file_transfer, storage]).
on_assemble_timeout({ChanPid, PacketId}) ->
?SLOG(warning, #{msg => "on_assemble_timeout", packet_id => PacketId}),
erlang:send(ChanPid, {puback, PacketId, [], ?RC_UNSPECIFIED_ERROR}).
%%--------------------------------------------------------------------
%% Context management
%%--------------------------------------------------------------------
get_context() ->
get_ft_data(self()).
put_context(Context) ->
put_ft_data(self(), Context).
get_ft_data(ChanPid) ->
case ets:lookup(?FT_TAB, ChanPid) of
[#emqx_ft{ft_data = FTData}] -> {ok, FTData};
[] -> none
end.
delete_ft_data(ChanPid) ->
true = ets:delete(?FT_TAB, ChanPid),
ok.
put_ft_data(ChanPid, FTData) ->
true = ets:insert(?FT_TAB, #emqx_ft{chan_pid = ChanPid, ft_data = FTData}),
ok.

View File

@ -19,12 +19,11 @@
-export(
[
store_filemeta/2,
store_segment/3,
assemble/3
store_segment/2,
assemble/2
]
).
-type ctx() :: term().
-type storage() :: emqx_config:config().
-export_type([assemble_callback/0]).
@ -32,14 +31,14 @@
-type assemble_callback() :: fun((ok | {error, term()}) -> any()).
%%--------------------------------------------------------------------
%% behaviour
%% Behaviour
%%--------------------------------------------------------------------
-callback store_filemeta(storage(), emqx_ft:transfer(), emqx_ft:filemeta()) ->
{ok, ctx()} | {error, term()}.
-callback store_segment(storage(), ctx(), emqx_ft:transfer(), emqx_ft:segment()) ->
{ok, ctx()} | {error, term()}.
-callback assemble(storage(), ctx(), emqx_ft:transfer(), assemble_callback()) ->
ok | {error, term()}.
-callback store_segment(storage(), emqx_ft:transfer(), emqx_ft:segment()) ->
ok | {error, term()}.
-callback assemble(storage(), emqx_ft:transfer(), assemble_callback()) ->
{ok, pid()} | {error, term()}.
%%--------------------------------------------------------------------
@ -47,28 +46,28 @@
%%--------------------------------------------------------------------
-spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
{ok, ctx()} | {error, term()}.
ok | {error, term()}.
store_filemeta(Transfer, FileMeta) ->
Mod = mod(),
Mod:store_filemeta(storage(), Transfer, FileMeta).
-spec store_segment(ctx(), emqx_ft:transfer(), emqx_ft:segment()) ->
{ok, ctx()} | {error, term()}.
store_segment(Ctx, Transfer, Segment) ->
-spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
ok | {error, term()}.
store_segment(Transfer, Segment) ->
Mod = mod(),
Mod:store_segment(storage(), Ctx, Transfer, Segment).
Mod:store_segment(storage(), Transfer, Segment).
-spec assemble(ctx(), emqx_ft:transfer(), assemble_callback()) ->
-spec assemble(emqx_ft:transfer(), assemble_callback()) ->
{ok, pid()} | {error, term()}.
assemble(Ctx, Transfer, Callback) ->
assemble(Transfer, Callback) ->
Mod = mod(),
Mod:assemble(storage(), Ctx, Transfer, Callback).
Mod:assemble(storage(), Transfer, Callback).
mod() ->
case storage() of
#{type := local} ->
% emqx_ft_storage_fs
emqx_ft_storage_dummy
emqx_ft_storage_fs
% emqx_ft_storage_dummy
end.
storage() ->

View File

@ -20,16 +20,16 @@
-export([
store_filemeta/3,
store_segment/4,
assemble/4
store_segment/3,
assemble/3
]).
store_filemeta(_Storage, _Transfer, _Meta) ->
{ok, #{}}.
ok.
store_segment(_Storage, Ctx, _Transfer, _Segment) ->
{ok, Ctx}.
store_segment(_Storage, _Transfer, _Segment) ->
ok.
assemble(_Storage, _Ctx, _Transfer, Callback) ->
assemble(_Storage, _Transfer, Callback) ->
Pid = spawn(fun() -> Callback({error, not_implemented}) end),
{ok, Pid}.

View File

@ -22,7 +22,7 @@
-behaviour(emqx_ft_storage).
-export([store_filemeta/3]).
-export([store_segment/4]).
-export([store_segment/3]).
-export([list/2]).
-export([read_segment/5]).
-export([assemble/3]).
@ -84,8 +84,7 @@ store_filemeta(Storage, Transfer, Meta) ->
case read_file(Filepath, fun decode_filemeta/1) of
{ok, Meta} ->
_ = touch_file(Filepath),
%% No context is needed for this implementation.
{ok, #{}};
ok;
{ok, _Conflict} ->
% TODO
% We won't see conflicts in case of concurrent `store_filemeta`
@ -98,18 +97,13 @@ store_filemeta(Storage, Transfer, Meta) ->
%% Store a segment in the backing filesystem.
%% Atomic operation.
-spec store_segment(storage(), emqx_ft_storage:ctx(), transfer(), segment()) ->
-spec store_segment(storage(), transfer(), segment()) ->
% Where is the checksum gets verified? Upper level probably.
% Quota? Some lower level errors?
ok | {error, _TODO}.
store_segment(Storage, Ctx, Transfer, Segment = {_Offset, Content}) ->
store_segment(Storage, Transfer, Segment = {_Offset, Content}) ->
Filepath = mk_filepath(Storage, Transfer, mk_segment_filename(Segment)),
case write_file_atomic(Filepath, Content) of
ok ->
{ok, Ctx};
{error, _} = Error ->
Error
end.
write_file_atomic(Filepath, Content).
-spec list(storage(), transfer()) ->
% Some lower level errors? {error, notfound}?
@ -149,8 +143,9 @@ read_segment(_Storage, _Transfer, Segment, Offset, Size) ->
end.
-spec assemble(storage(), transfer(), fun((ok | {error, term()}) -> any())) ->
% {ok, _Assembler :: pid()} | {error, incomplete} | {error, badrpc} | {error, _TODO}.
{ok, _Assembler :: pid()} | {error, _TODO}.
assemble(Storage, _Ctx, Transfer, Callback) ->
assemble(Storage, Transfer, Callback) ->
emqx_ft_assembler_sup:start_child(Storage, Transfer, Callback).
-type handle() :: {file:name(), io:device(), crypto:hash_state()}.
@ -321,8 +316,8 @@ mk_filedir(Storage, {ClientId, FileId}) ->
mk_filepath(Storage, Transfer, Filename) ->
filename:join(mk_filedir(Storage, Transfer), Filename).
get_storage_root(_Storage) ->
filename:join(emqx:data_dir(), "file_transfer").
get_storage_root(Storage) ->
maps:get(root, Storage, filename:join(emqx:data_dir(), "file_transfer")).
-include_lib("kernel/include/file.hrl").

View File

@ -27,17 +27,7 @@
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
%% sup_flags() = #{strategy => strategy(), % optional
%% intensity => non_neg_integer(), % optional
%% period => pos_integer()} % optional
%% child_spec() = #{id => child_id(), % mandatory
%% start => mfargs(), % mandatory
%% restart => restart(), % optional
%% shutdown => shutdown(), % optional
%% type => worker(), % optional
%% modules => modules()} % optional
init([]) ->
ok = emqx_ft:create_tab(),
SupFlags = #{
strategy => one_for_all,
intensity => 100,
@ -64,5 +54,3 @@ init([]) ->
ChildSpecs = [Responder, AssemblerSup],
{ok, {SupFlags, ChildSpecs}}.
%% internal functions

View File

@ -54,7 +54,7 @@ end_per_testcase(_TC, Config) ->
-define(CLIENTID, <<"thatsme">>).
t_assemble_empty_transfer(Config) ->
Storage = ?config(storage_root, Config),
Storage = storage(Config),
Transfer = {?CLIENTID, mk_fileid()},
Filename = "important.pdf",
Meta = #{
@ -84,7 +84,7 @@ t_assemble_empty_transfer(Config) ->
ok.
t_assemble_complete_local_transfer(Config) ->
Storage = ?config(storage_root, Config),
Storage = storage(Config),
Transfer = {?CLIENTID, mk_fileid()},
Filename = "topsecret.pdf",
TransferSize = 10000 + rand:uniform(50000),
@ -161,3 +161,8 @@ inspect_file(Filename) ->
mk_fileid() ->
integer_to_binary(erlang:system_time(millisecond)).
storage(Config) ->
#{
root => ?config(storage_root, Config)
}.