feat(ft): add tests for remote reader
This commit is contained in:
parent
b7d0bad970
commit
8038a3fd4a
|
@ -0,0 +1,5 @@
|
|||
file_transfer {
|
||||
storage {
|
||||
type = local
|
||||
}
|
||||
}
|
|
@ -19,7 +19,18 @@
|
|||
-behaviour(gen_server).
|
||||
|
||||
-include_lib("emqx/include/logger.hrl").
|
||||
-include_lib("emqx/include/types.hrl").
|
||||
|
||||
%% API
|
||||
-export([
|
||||
start_link/2,
|
||||
start_supervised/2,
|
||||
table/1,
|
||||
table/2,
|
||||
read/2
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([
|
||||
init/1,
|
||||
handle_call/3,
|
||||
|
@ -29,71 +40,68 @@
|
|||
code_change/3
|
||||
]).
|
||||
|
||||
-export([
|
||||
start_link/2,
|
||||
start_link/3,
|
||||
start_supervised/2,
|
||||
start_supervised/3,
|
||||
read/1
|
||||
]).
|
||||
|
||||
-export([
|
||||
table/1
|
||||
]).
|
||||
|
||||
-define(DEFAULT_CHUNK_SIZE, 1024).
|
||||
-define(IS_FILENAME(Filename), (is_list(Filename) or is_binary(Filename))).
|
||||
|
||||
table(ReaderPid) ->
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-spec table(pid()) -> qlc:query_handle().
|
||||
table(ReaderPid) when is_pid(ReaderPid) ->
|
||||
table(ReaderPid, ?DEFAULT_CHUNK_SIZE).
|
||||
|
||||
-spec table(pid(), pos_integer()) -> qlc:query_handle().
|
||||
table(ReaderPid, Bytes) when is_pid(ReaderPid) andalso is_integer(Bytes) andalso Bytes > 0 ->
|
||||
NextFun = fun NextFun(Pid) ->
|
||||
try
|
||||
case emqx_ft_storage_fs_reader_proto_v1:read(node(Pid), Pid) of
|
||||
eof ->
|
||||
[];
|
||||
{ok, Data} ->
|
||||
[Data | fun() -> NextFun(Pid) end];
|
||||
{error, Reason} ->
|
||||
?SLOG(warning, #{msg => "file_read_error", reason => Reason}),
|
||||
[];
|
||||
{BadRPC, Reason} when BadRPC =:= badrpc orelse BadRPC =:= badtcp ->
|
||||
?SLOG(warning, #{msg => "file_read_rpc_error", kind => BadRPC, reason => Reason}),
|
||||
[]
|
||||
end
|
||||
catch
|
||||
Class:Error:Stacktrace ->
|
||||
?SLOG(warning, #{
|
||||
msg => "file_read_error",
|
||||
class => Class,
|
||||
reason => Error,
|
||||
stacktrace => Stacktrace
|
||||
}),
|
||||
case emqx_ft_storage_fs_reader_proto_v1:read(node(Pid), Pid, Bytes) of
|
||||
eof ->
|
||||
[];
|
||||
{ok, Data} ->
|
||||
[Data | fun() -> NextFun(Pid) end];
|
||||
{error, Reason} ->
|
||||
?SLOG(warning, #{msg => "file_read_error", reason => Reason}),
|
||||
[];
|
||||
{BadRPC, Reason} when BadRPC =:= badrpc orelse BadRPC =:= badtcp ->
|
||||
?SLOG(warning, #{msg => "file_read_rpc_error", kind => BadRPC, reason => Reason}),
|
||||
[]
|
||||
end
|
||||
end,
|
||||
qlc:table(fun() -> NextFun(ReaderPid) end, []).
|
||||
|
||||
start_link(CallerPid, Filename) ->
|
||||
start_link(CallerPid, Filename, ?DEFAULT_CHUNK_SIZE).
|
||||
-spec start_link(pid(), filename:filename()) -> startlink_ret().
|
||||
start_link(CallerPid, Filename) when
|
||||
is_pid(CallerPid) andalso
|
||||
?IS_FILENAME(Filename)
|
||||
->
|
||||
gen_server:start_link(?MODULE, [CallerPid, Filename], []).
|
||||
|
||||
start_link(CallerPid, Filename, ChunkSize) ->
|
||||
gen_server:start_link(?MODULE, [CallerPid, Filename, ChunkSize], []).
|
||||
-spec start_supervised(pid(), filename:filename()) -> startlink_ret().
|
||||
start_supervised(CallerPid, Filename) when
|
||||
is_pid(CallerPid) andalso
|
||||
?IS_FILENAME(Filename)
|
||||
->
|
||||
emqx_ft_storage_fs_reader_sup:start_child(CallerPid, Filename).
|
||||
|
||||
start_supervised(CallerPid, Filename) ->
|
||||
start_supervised(CallerPid, Filename, ?DEFAULT_CHUNK_SIZE).
|
||||
-spec read(pid(), pos_integer()) -> {ok, binary()} | eof | {error, term()}.
|
||||
read(Pid, Bytes) when
|
||||
is_pid(Pid) andalso
|
||||
is_integer(Bytes) andalso
|
||||
Bytes > 0
|
||||
->
|
||||
gen_server:call(Pid, {read, Bytes}).
|
||||
|
||||
start_supervised(CallerPid, Filename, ChunkSize) ->
|
||||
emqx_ft_storage_fs_reader_sup:start_child(CallerPid, Filename, ChunkSize).
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
read(Pid) ->
|
||||
gen_server:call(Pid, read).
|
||||
|
||||
init([CallerPid, Filename, ChunkSize]) ->
|
||||
init([CallerPid, Filename]) ->
|
||||
MRef = erlang:monitor(process, CallerPid),
|
||||
case file:open(Filename, [read, raw, binary]) of
|
||||
{ok, File} ->
|
||||
{ok, #{
|
||||
filename => Filename,
|
||||
file => File,
|
||||
chunk_size => ChunkSize,
|
||||
caller_pid => CallerPid,
|
||||
mref => MRef
|
||||
}};
|
||||
|
@ -101,8 +109,8 @@ init([CallerPid, Filename, ChunkSize]) ->
|
|||
{stop, Reason}
|
||||
end.
|
||||
|
||||
handle_call(read, _From, #{file := File, chunk_size := ChunkSize} = State) ->
|
||||
case file:read(File, ChunkSize) of
|
||||
handle_call({read, Bytes}, _From, #{file := File} = State) ->
|
||||
case file:read(File, Bytes) of
|
||||
{ok, Data} ->
|
||||
?SLOG(debug, #{msg => "read", bytes => byte_size(Data)}),
|
||||
{reply, {ok, Data}, State};
|
||||
|
@ -113,7 +121,7 @@ handle_call(read, _From, #{file := File, chunk_size := ChunkSize} = State) ->
|
|||
{stop, Reason, Error, State}
|
||||
end;
|
||||
handle_call(Msg, _From, State) ->
|
||||
{stop, {bad_call, Msg}, {bad_call, Msg}, State}.
|
||||
{reply, {error, {bad_call, Msg}}, State}.
|
||||
|
||||
handle_info(
|
||||
{'DOWN', MRef, process, CallerPid, _Reason}, #{mref := MRef, caller_pid := CallerPid} = State
|
||||
|
|
|
@ -21,16 +21,16 @@
|
|||
-export([
|
||||
init/1,
|
||||
start_link/0,
|
||||
start_child/3
|
||||
start_child/2
|
||||
]).
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
start_child(CallerPid, Filename, ChunkSize) ->
|
||||
start_child(CallerPid, Filename) ->
|
||||
Childspec = #{
|
||||
id => {CallerPid, Filename},
|
||||
start => {emqx_ft_storage_fs_reader, start_link, [CallerPid, Filename, ChunkSize]},
|
||||
start => {emqx_ft_storage_fs_reader, start_link, [CallerPid, Filename]},
|
||||
restart => temporary
|
||||
},
|
||||
supervisor:start_child(?MODULE, Childspec).
|
||||
|
|
|
@ -20,14 +20,16 @@
|
|||
|
||||
-export([introduced_in/0]).
|
||||
|
||||
-export([read/2]).
|
||||
-export([read/3]).
|
||||
|
||||
-include_lib("emqx/include/bpapi.hrl").
|
||||
|
||||
introduced_in() ->
|
||||
"5.0.17".
|
||||
|
||||
-spec read(node(), pid()) ->
|
||||
-spec read(node(), pid(), pos_integer()) ->
|
||||
{ok, binary()} | eof | {error, term()} | no_return().
|
||||
read(Node, Pid) ->
|
||||
emqx_rpc:call(Node, emqx_ft_storage_fs_reader, read, [Pid]).
|
||||
read(Node, Pid, Bytes) when
|
||||
is_atom(Node) andalso is_pid(Pid) andalso is_integer(Bytes) andalso Bytes > 0
|
||||
->
|
||||
emqx_rpc:call(Node, emqx_ft_storage_fs_reader, read, [Pid, Bytes]).
|
||||
|
|
|
@ -0,0 +1,153 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_ft_storage_fs_reader_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("stdlib/include/assert.hrl").
|
||||
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_ft]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_ft]),
|
||||
ok.
|
||||
|
||||
init_per_testcase(_Case, Config) ->
|
||||
file:make_dir(?config(data_dir, Config)),
|
||||
Data = <<"hello world">>,
|
||||
Path = expand_path(Config, "test_file"),
|
||||
ok = mk_test_file(Path, Data),
|
||||
[{path, Path} | Config].
|
||||
|
||||
end_per_testcase(_Case, _Config) ->
|
||||
ok.
|
||||
|
||||
t_successful_read(Config) ->
|
||||
Path = ?config(path, Config),
|
||||
|
||||
{ok, ReaderPid} = emqx_ft_storage_fs_reader:start_link(self(), Path),
|
||||
?assertEqual(
|
||||
{ok, <<"hello ">>},
|
||||
emqx_ft_storage_fs_reader:read(ReaderPid, 6)
|
||||
),
|
||||
?assertEqual(
|
||||
{ok, <<"world">>},
|
||||
emqx_ft_storage_fs_reader:read(ReaderPid, 6)
|
||||
),
|
||||
?assertEqual(
|
||||
eof,
|
||||
emqx_ft_storage_fs_reader:read(ReaderPid, 6)
|
||||
),
|
||||
?assertNot(is_process_alive(ReaderPid)).
|
||||
|
||||
t_caller_dead(Config) ->
|
||||
erlang:process_flag(trap_exit, true),
|
||||
|
||||
Path = ?config(path, Config),
|
||||
|
||||
CallerPid = spawn_link(
|
||||
fun() ->
|
||||
receive
|
||||
stop -> ok
|
||||
end
|
||||
end
|
||||
),
|
||||
{ok, ReaderPid} = emqx_ft_storage_fs_reader:start_link(CallerPid, Path),
|
||||
_ = erlang:monitor(process, ReaderPid),
|
||||
?assertEqual(
|
||||
{ok, <<"hello ">>},
|
||||
emqx_ft_storage_fs_reader:read(ReaderPid, 6)
|
||||
),
|
||||
CallerPid ! stop,
|
||||
receive
|
||||
{'DOWN', _, process, ReaderPid, _} -> ok
|
||||
after 1000 ->
|
||||
ct:fail("Reader process did not die")
|
||||
end.
|
||||
|
||||
t_tables(Config) ->
|
||||
Path = ?config(path, Config),
|
||||
|
||||
{ok, ReaderPid0} = emqx_ft_storage_fs_reader:start_link(self(), Path),
|
||||
|
||||
ReaderQH0 = emqx_ft_storage_fs_reader:table(ReaderPid0, 6),
|
||||
?assertEqual(
|
||||
[<<"hello ">>, <<"world">>],
|
||||
qlc:eval(ReaderQH0)
|
||||
),
|
||||
|
||||
{ok, ReaderPid1} = emqx_ft_storage_fs_reader:start_link(self(), Path),
|
||||
|
||||
ReaderQH1 = emqx_ft_storage_fs_reader:table(ReaderPid1),
|
||||
?assertEqual(
|
||||
[<<"hello world">>],
|
||||
qlc:eval(ReaderQH1)
|
||||
).
|
||||
|
||||
t_bad_messages(Config) ->
|
||||
Path = ?config(path, Config),
|
||||
|
||||
{ok, ReaderPid} = emqx_ft_storage_fs_reader:start_link(self(), Path),
|
||||
|
||||
ReaderPid ! {bad, message},
|
||||
gen_server:cast(ReaderPid, {bad, message}),
|
||||
|
||||
?assertEqual(
|
||||
{error, {bad_call, {bad, message}}},
|
||||
gen_server:call(ReaderPid, {bad, message})
|
||||
).
|
||||
|
||||
t_nonexistent_file(_Config) ->
|
||||
?assertEqual(
|
||||
{error, enoent},
|
||||
emqx_ft_storage_fs_reader:start_link(self(), "/a/b/c/bar")
|
||||
).
|
||||
|
||||
t_start_supervised(Config) ->
|
||||
Path = ?config(path, Config),
|
||||
|
||||
{ok, ReaderPid} = emqx_ft_storage_fs_reader:start_supervised(self(), Path),
|
||||
?assertEqual(
|
||||
{ok, <<"hello ">>},
|
||||
emqx_ft_storage_fs_reader:read(ReaderPid, 6)
|
||||
).
|
||||
|
||||
t_rpc_error(_Config) ->
|
||||
ReaderQH = emqx_ft_storage_fs_reader:table(fake_remote_pid('dummy@127.0.0.1'), 6),
|
||||
?assertEqual(
|
||||
[],
|
||||
qlc:eval(ReaderQH)
|
||||
).
|
||||
|
||||
mk_test_file(Path, Data) ->
|
||||
ok = file:write_file(Path, Data).
|
||||
|
||||
expand_path(Config, Filename) ->
|
||||
filename:join([?config(data_dir, Config), Filename]).
|
||||
|
||||
%% This is a hack to create a pid that is not registered on the local node.
|
||||
%% https://www.erlang.org/doc/apps/erts/erl_ext_dist.html#new_pid_ext
|
||||
fake_remote_pid(Node) ->
|
||||
<<131, NodeAtom/binary>> = term_to_binary(Node),
|
||||
PidBin = <<131, 88, NodeAtom/binary, 1:32/big, 1:32/big, 1:32/big>>,
|
||||
binary_to_term(PidBin).
|
Loading…
Reference in New Issue