feat(ft): improve remote reader

This commit is contained in:
Ilya Averyanov 2023-02-07 17:49:14 +02:00
parent 0aefd4a8c7
commit b7d0bad970
2 changed files with 14 additions and 5 deletions

View File

@ -53,6 +53,9 @@ table(ReaderPid) ->
[Data | fun() -> NextFun(Pid) end]; [Data | fun() -> NextFun(Pid) end];
{error, Reason} -> {error, Reason} ->
?SLOG(warning, #{msg => "file_read_error", reason => Reason}), ?SLOG(warning, #{msg => "file_read_error", reason => Reason}),
[];
{BadRPC, Reason} when BadRPC =:= badrpc orelse BadRPC =:= badtcp ->
?SLOG(warning, #{msg => "file_read_rpc_error", kind => BadRPC, reason => Reason}),
[] []
end end
catch catch
@ -84,13 +87,15 @@ read(Pid) ->
gen_server:call(Pid, read). gen_server:call(Pid, read).
init([CallerPid, Filename, ChunkSize]) -> init([CallerPid, Filename, ChunkSize]) ->
true = link(CallerPid), MRef = erlang:monitor(process, CallerPid),
case file:open(Filename, [read, raw, binary]) of case file:open(Filename, [read, raw, binary]) of
{ok, File} -> {ok, File} ->
{ok, #{ {ok, #{
filename => Filename, filename => Filename,
file => File, file => File,
chunk_size => ChunkSize chunk_size => ChunkSize,
caller_pid => CallerPid,
mref => MRef
}}; }};
{error, Reason} -> {error, Reason} ->
{stop, Reason} {stop, Reason}
@ -99,10 +104,10 @@ init([CallerPid, Filename, ChunkSize]) ->
handle_call(read, _From, #{file := File, chunk_size := ChunkSize} = State) -> handle_call(read, _From, #{file := File, chunk_size := ChunkSize} = State) ->
case file:read(File, ChunkSize) of case file:read(File, ChunkSize) of
{ok, Data} -> {ok, Data} ->
?SLOG(warning, #{msg => "read", bytes => byte_size(Data)}), ?SLOG(debug, #{msg => "read", bytes => byte_size(Data)}),
{reply, {ok, Data}, State}; {reply, {ok, Data}, State};
eof -> eof ->
?SLOG(warning, #{msg => "read", eof => true}), ?SLOG(debug, #{msg => "read", eof => true}),
{stop, normal, eof, State}; {stop, normal, eof, State};
{error, Reason} = Error -> {error, Reason} = Error ->
{stop, Reason, Error, State} {stop, Reason, Error, State}
@ -110,6 +115,10 @@ handle_call(read, _From, #{file := File, chunk_size := ChunkSize} = State) ->
handle_call(Msg, _From, State) -> handle_call(Msg, _From, State) ->
{stop, {bad_call, Msg}, {bad_call, Msg}, State}. {stop, {bad_call, Msg}, {bad_call, Msg}, State}.
handle_info(
{'DOWN', MRef, process, CallerPid, _Reason}, #{mref := MRef, caller_pid := CallerPid} = State
) ->
{stop, {caller_down, CallerPid}, State};
handle_info(Msg, State) -> handle_info(Msg, State) ->
?SLOG(warning, #{msg => "unexpected_message", info_msg => Msg}), ?SLOG(warning, #{msg => "unexpected_message", info_msg => Msg}),
{noreply, State}. {noreply, State}.

View File

@ -30,4 +30,4 @@ introduced_in() ->
-spec read(node(), pid()) -> -spec read(node(), pid()) ->
{ok, binary()} | eof | {error, term()} | no_return(). {ok, binary()} | eof | {error, term()} | no_return().
read(Node, Pid) -> read(Node, Pid) ->
erpc:call(Node, emqx_ft_storage_fs_reader, read, [Pid]). emqx_rpc:call(Node, emqx_ft_storage_fs_reader, read, [Pid]).