diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl index a6307765e..9c4aa5e0c 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl @@ -53,6 +53,9 @@ table(ReaderPid) -> [Data | fun() -> NextFun(Pid) end]; {error, Reason} -> ?SLOG(warning, #{msg => "file_read_error", reason => Reason}), + []; + {BadRPC, Reason} when BadRPC =:= badrpc orelse BadRPC =:= badtcp -> + ?SLOG(warning, #{msg => "file_read_rpc_error", kind => BadRPC, reason => Reason}), [] end catch @@ -84,13 +87,15 @@ read(Pid) -> gen_server:call(Pid, read). init([CallerPid, Filename, ChunkSize]) -> - true = link(CallerPid), + MRef = erlang:monitor(process, CallerPid), case file:open(Filename, [read, raw, binary]) of {ok, File} -> {ok, #{ filename => Filename, file => File, - chunk_size => ChunkSize + chunk_size => ChunkSize, + caller_pid => CallerPid, + mref => MRef }}; {error, Reason} -> {stop, Reason} @@ -99,10 +104,10 @@ init([CallerPid, Filename, ChunkSize]) -> handle_call(read, _From, #{file := File, chunk_size := ChunkSize} = State) -> case file:read(File, ChunkSize) of {ok, Data} -> - ?SLOG(warning, #{msg => "read", bytes => byte_size(Data)}), + ?SLOG(debug, #{msg => "read", bytes => byte_size(Data)}), {reply, {ok, Data}, State}; eof -> - ?SLOG(warning, #{msg => "read", eof => true}), + ?SLOG(debug, #{msg => "read", eof => true}), {stop, normal, eof, State}; {error, Reason} = Error -> {stop, Reason, Error, State} @@ -110,6 +115,10 @@ handle_call(read, _From, #{file := File, chunk_size := ChunkSize} = State) -> handle_call(Msg, _From, State) -> {stop, {bad_call, Msg}, {bad_call, Msg}, State}. +handle_info( + {'DOWN', MRef, process, CallerPid, _Reason}, #{mref := MRef, caller_pid := CallerPid} = State +) -> + {stop, {caller_down, CallerPid}, State}; handle_info(Msg, State) -> ?SLOG(warning, #{msg => "unexpected_message", info_msg => Msg}), {noreply, State}. diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl index 1bbb05471..982b9ca57 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl @@ -30,4 +30,4 @@ introduced_in() -> -spec read(node(), pid()) -> {ok, binary()} | eof | {error, term()} | no_return(). read(Node, Pid) -> - erpc:call(Node, emqx_ft_storage_fs_reader, read, [Pid]). + emqx_rpc:call(Node, emqx_ft_storage_fs_reader, read, [Pid]).