fix(ft): fix typespecs

This commit is contained in:
Ilya Averyanov 2023-02-08 20:37:56 +02:00
parent 836ec213c9
commit 9e4a37a398
5 changed files with 31 additions and 16 deletions

View File

@ -78,7 +78,7 @@ takeover_finish(ConnMod, ChanPid) ->
erpc:call( erpc:call(
node(ChanPid), node(ChanPid),
emqx_cm, emqx_cm,
takeover_session_finish, takeover_finish,
[ConnMod, ChanPid], [ConnMod, ChanPid],
?T_TAKEOVER * 2 ?T_TAKEOVER * 2
). ).

View File

@ -34,7 +34,7 @@
storage :: _Storage, storage :: _Storage,
transfer :: emqx_ft:transfer(), transfer :: emqx_ft:transfer(),
assembly :: _TODO, assembly :: _TODO,
file :: io:device(), file :: {file:filename(), io:device(), term()} | undefined,
hash, hash,
callback :: fun((ok | {error, term()}) -> any()) callback :: fun((ok | {error, term()}) -> any())
}). }).
@ -120,10 +120,16 @@ handle_event(internal, _, {assemble, [{Node, Segment} | Rest]}, St = #st{}) ->
% TODO: pipelining % TODO: pipelining
case pread(Node, Segment, St) of case pread(Node, Segment, St) of
{ok, Content} -> {ok, Content} ->
{ok, NHandle} = emqx_ft_storage_fs:write(St#st.file, Content), case emqx_ft_storage_fs:write(St#st.file, Content) of
{next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])} {ok, NHandle} ->
% {error, _} -> {next_state, {assemble, Rest}, St#st{file = NHandle}, ?internal([])};
% ... %% TODO: better error handling
{error, Error} ->
error(Error)
end;
{error, Error} ->
%% TODO: better error handling
error(Error)
end; end;
handle_event(internal, _, {assemble, []}, St = #st{}) -> handle_event(internal, _, {assemble, []}, St = #st{}) ->
{next_state, complete, St, ?internal([])}; {next_state, complete, St, ?internal([])};

View File

@ -85,7 +85,7 @@
%% Atomic operation. %% Atomic operation.
-spec store_filemeta(storage(), transfer(), filemeta()) -> -spec store_filemeta(storage(), transfer(), filemeta()) ->
% Quota? Some lower level errors? % Quota? Some lower level errors?
{ok, emqx_ft_storage:ctx()} | {error, conflict} | {error, _TODO}. ok | {error, conflict} | {error, _TODO}.
store_filemeta(Storage, Transfer, Meta) -> store_filemeta(Storage, Transfer, Meta) ->
% TODO safeguard against bad clientids / fileids. % TODO safeguard against bad clientids / fileids.
Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST), Filepath = mk_filepath(Storage, Transfer, [?FRAGDIR], ?MANIFEST),
@ -226,8 +226,16 @@ ready_transfers(_Storage) ->
end, end,
Results Results
), ),
?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}), case {GoodResults, BadResults} of
{ok, [File || {ok, Files} <- GoodResults, File <- Files]}. {[], _} ->
?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}),
{error, no_nodes};
{_, []} ->
{ok, [File || {ok, Files} <- GoodResults, File <- Files]};
{_, _} ->
?SLOG(warning, #{msg => "ready_transfers", failures => BadResults}),
{ok, [File || {ok, Files} <- GoodResults, File <- Files]}
end.
ready_transfers_local(Storage) -> ready_transfers_local(Storage) ->
{ok, Transfers} = transfers(Storage), {ok, Transfers} = transfers(Storage),
@ -323,7 +331,7 @@ open_file(Storage, Transfer, Filemeta) ->
end. end.
-spec write(handle(), iodata()) -> -spec write(handle(), iodata()) ->
ok | {error, _TODO}. {ok, handle()} | {error, _TODO}.
write({Filepath, IoDevice, Ctx}, IoData) -> write({Filepath, IoDevice, Ctx}, IoData) ->
case file:write(IoDevice, IoData) of case file:write(IoDevice, IoData) of
ok -> ok ->

View File

@ -58,12 +58,9 @@ table(ReaderPid, Bytes) when is_pid(ReaderPid) andalso is_integer(Bytes) andalso
eof -> eof ->
[]; [];
{ok, Data} -> {ok, Data} ->
[Data | fun() -> NextFun(Pid) end]; [Data] ++ fun() -> NextFun(Pid) end;
{error, Reason} -> {error, Reason} ->
?SLOG(warning, #{msg => "file_read_error", reason => Reason}), ?SLOG(warning, #{msg => "file_read_error", reason => Reason}),
[];
{BadRPC, Reason} when BadRPC =:= badrpc orelse BadRPC =:= badtcp ->
?SLOG(warning, #{msg => "file_read_rpc_error", kind => BadRPC, reason => Reason}),
[] []
end end
end, end,

View File

@ -51,8 +51,12 @@ pread(Node, Transfer, Frag, Offset, Size) ->
erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]). erpc:call(Node, emqx_ft_storage_fs_proxy, pread_local, [Transfer, Frag, Offset, Size]).
-spec ready_transfers([node()]) -> -spec ready_transfers([node()]) ->
{ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]} [
| {error, term()}. {ok, [{emqx_ft_storage:ready_transfer_id(), emqx_ft_storage:ready_transfer_info()}]}
| {error, term()}
| {exit, term()}
| {throw, term()}
].
ready_transfers(Nodes) -> ready_transfers(Nodes) ->
erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, ready_transfers_local, []). erpc:multicall(Nodes, emqx_ft_storage_fs_proxy, ready_transfers_local, []).