From 8038a3fd4ac5b0f0b9726869f92d1698be6a703a Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Tue, 7 Feb 2023 23:29:22 +0200 Subject: [PATCH] feat(ft): add tests for remote reader --- apps/emqx_ft/etc/emqx_ft.conf | 5 + .../emqx_ft/src/emqx_ft_storage_fs_reader.erl | 108 +++++++------ .../src/emqx_ft_storage_fs_reader_sup.erl | 6 +- .../emqx_ft_storage_fs_reader_proto_v1.erl | 10 +- .../test/emqx_ft_storage_fs_reader_SUITE.erl | 153 ++++++++++++++++++ 5 files changed, 225 insertions(+), 57 deletions(-) create mode 100644 apps/emqx_ft/etc/emqx_ft.conf create mode 100644 apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl diff --git a/apps/emqx_ft/etc/emqx_ft.conf b/apps/emqx_ft/etc/emqx_ft.conf new file mode 100644 index 000000000..250dca6a9 --- /dev/null +++ b/apps/emqx_ft/etc/emqx_ft.conf @@ -0,0 +1,5 @@ +file_transfer { + storage { + type = local + } +} diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl index 9c4aa5e0c..782959e19 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_reader.erl @@ -19,7 +19,18 @@ -behaviour(gen_server). -include_lib("emqx/include/logger.hrl"). +-include_lib("emqx/include/types.hrl"). +%% API +-export([ + start_link/2, + start_supervised/2, + table/1, + table/2, + read/2 +]). + +%% gen_server callbacks -export([ init/1, handle_call/3, @@ -29,71 +40,68 @@ code_change/3 ]). --export([ - start_link/2, - start_link/3, - start_supervised/2, - start_supervised/3, - read/1 -]). - --export([ - table/1 -]). - -define(DEFAULT_CHUNK_SIZE, 1024). +-define(IS_FILENAME(Filename), (is_list(Filename) or is_binary(Filename))). -table(ReaderPid) -> +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +-spec table(pid()) -> qlc:query_handle(). +table(ReaderPid) when is_pid(ReaderPid) -> + table(ReaderPid, ?DEFAULT_CHUNK_SIZE). + +-spec table(pid(), pos_integer()) -> qlc:query_handle(). +table(ReaderPid, Bytes) when is_pid(ReaderPid) andalso is_integer(Bytes) andalso Bytes > 0 -> NextFun = fun NextFun(Pid) -> - try - case emqx_ft_storage_fs_reader_proto_v1:read(node(Pid), Pid) of - eof -> - []; - {ok, Data} -> - [Data | fun() -> NextFun(Pid) end]; - {error, Reason} -> - ?SLOG(warning, #{msg => "file_read_error", reason => Reason}), - []; - {BadRPC, Reason} when BadRPC =:= badrpc orelse BadRPC =:= badtcp -> - ?SLOG(warning, #{msg => "file_read_rpc_error", kind => BadRPC, reason => Reason}), - [] - end - catch - Class:Error:Stacktrace -> - ?SLOG(warning, #{ - msg => "file_read_error", - class => Class, - reason => Error, - stacktrace => Stacktrace - }), + case emqx_ft_storage_fs_reader_proto_v1:read(node(Pid), Pid, Bytes) of + eof -> + []; + {ok, Data} -> + [Data | fun() -> NextFun(Pid) end]; + {error, Reason} -> + ?SLOG(warning, #{msg => "file_read_error", reason => Reason}), + []; + {BadRPC, Reason} when BadRPC =:= badrpc orelse BadRPC =:= badtcp -> + ?SLOG(warning, #{msg => "file_read_rpc_error", kind => BadRPC, reason => Reason}), [] end end, qlc:table(fun() -> NextFun(ReaderPid) end, []). -start_link(CallerPid, Filename) -> - start_link(CallerPid, Filename, ?DEFAULT_CHUNK_SIZE). +-spec start_link(pid(), filename:filename()) -> startlink_ret(). +start_link(CallerPid, Filename) when + is_pid(CallerPid) andalso + ?IS_FILENAME(Filename) +-> + gen_server:start_link(?MODULE, [CallerPid, Filename], []). -start_link(CallerPid, Filename, ChunkSize) -> - gen_server:start_link(?MODULE, [CallerPid, Filename, ChunkSize], []). +-spec start_supervised(pid(), filename:filename()) -> startlink_ret(). +start_supervised(CallerPid, Filename) when + is_pid(CallerPid) andalso + ?IS_FILENAME(Filename) +-> + emqx_ft_storage_fs_reader_sup:start_child(CallerPid, Filename). -start_supervised(CallerPid, Filename) -> - start_supervised(CallerPid, Filename, ?DEFAULT_CHUNK_SIZE). +-spec read(pid(), pos_integer()) -> {ok, binary()} | eof | {error, term()}. +read(Pid, Bytes) when + is_pid(Pid) andalso + is_integer(Bytes) andalso + Bytes > 0 +-> + gen_server:call(Pid, {read, Bytes}). -start_supervised(CallerPid, Filename, ChunkSize) -> - emqx_ft_storage_fs_reader_sup:start_child(CallerPid, Filename, ChunkSize). +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- -read(Pid) -> - gen_server:call(Pid, read). - -init([CallerPid, Filename, ChunkSize]) -> +init([CallerPid, Filename]) -> MRef = erlang:monitor(process, CallerPid), case file:open(Filename, [read, raw, binary]) of {ok, File} -> {ok, #{ filename => Filename, file => File, - chunk_size => ChunkSize, caller_pid => CallerPid, mref => MRef }}; @@ -101,8 +109,8 @@ init([CallerPid, Filename, ChunkSize]) -> {stop, Reason} end. -handle_call(read, _From, #{file := File, chunk_size := ChunkSize} = State) -> - case file:read(File, ChunkSize) of +handle_call({read, Bytes}, _From, #{file := File} = State) -> + case file:read(File, Bytes) of {ok, Data} -> ?SLOG(debug, #{msg => "read", bytes => byte_size(Data)}), {reply, {ok, Data}, State}; @@ -113,7 +121,7 @@ handle_call(read, _From, #{file := File, chunk_size := ChunkSize} = State) -> {stop, Reason, Error, State} end; handle_call(Msg, _From, State) -> - {stop, {bad_call, Msg}, {bad_call, Msg}, State}. + {reply, {error, {bad_call, Msg}}, State}. handle_info( {'DOWN', MRef, process, CallerPid, _Reason}, #{mref := MRef, caller_pid := CallerPid} = State diff --git a/apps/emqx_ft/src/emqx_ft_storage_fs_reader_sup.erl b/apps/emqx_ft/src/emqx_ft_storage_fs_reader_sup.erl index 7435d7e1c..934e2888c 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_fs_reader_sup.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_fs_reader_sup.erl @@ -21,16 +21,16 @@ -export([ init/1, start_link/0, - start_child/3 + start_child/2 ]). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -start_child(CallerPid, Filename, ChunkSize) -> +start_child(CallerPid, Filename) -> Childspec = #{ id => {CallerPid, Filename}, - start => {emqx_ft_storage_fs_reader, start_link, [CallerPid, Filename, ChunkSize]}, + start => {emqx_ft_storage_fs_reader, start_link, [CallerPid, Filename]}, restart => temporary }, supervisor:start_child(?MODULE, Childspec). diff --git a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl index 982b9ca57..db5e35f94 100644 --- a/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl +++ b/apps/emqx_ft/src/proto/emqx_ft_storage_fs_reader_proto_v1.erl @@ -20,14 +20,16 @@ -export([introduced_in/0]). --export([read/2]). +-export([read/3]). -include_lib("emqx/include/bpapi.hrl"). introduced_in() -> "5.0.17". --spec read(node(), pid()) -> +-spec read(node(), pid(), pos_integer()) -> {ok, binary()} | eof | {error, term()} | no_return(). -read(Node, Pid) -> - emqx_rpc:call(Node, emqx_ft_storage_fs_reader, read, [Pid]). +read(Node, Pid, Bytes) when + is_atom(Node) andalso is_pid(Pid) andalso is_integer(Bytes) andalso Bytes > 0 +-> + emqx_rpc:call(Node, emqx_ft_storage_fs_reader, read, [Pid, Bytes]). diff --git a/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl b/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl new file mode 100644 index 000000000..e979d06fc --- /dev/null +++ b/apps/emqx_ft/test/emqx_ft_storage_fs_reader_SUITE.erl @@ -0,0 +1,153 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_ft_storage_fs_reader_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +all() -> emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + ok = emqx_common_test_helpers:start_apps([emqx_ft]), + Config. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_ft]), + ok. + +init_per_testcase(_Case, Config) -> + file:make_dir(?config(data_dir, Config)), + Data = <<"hello world">>, + Path = expand_path(Config, "test_file"), + ok = mk_test_file(Path, Data), + [{path, Path} | Config]. + +end_per_testcase(_Case, _Config) -> + ok. + +t_successful_read(Config) -> + Path = ?config(path, Config), + + {ok, ReaderPid} = emqx_ft_storage_fs_reader:start_link(self(), Path), + ?assertEqual( + {ok, <<"hello ">>}, + emqx_ft_storage_fs_reader:read(ReaderPid, 6) + ), + ?assertEqual( + {ok, <<"world">>}, + emqx_ft_storage_fs_reader:read(ReaderPid, 6) + ), + ?assertEqual( + eof, + emqx_ft_storage_fs_reader:read(ReaderPid, 6) + ), + ?assertNot(is_process_alive(ReaderPid)). + +t_caller_dead(Config) -> + erlang:process_flag(trap_exit, true), + + Path = ?config(path, Config), + + CallerPid = spawn_link( + fun() -> + receive + stop -> ok + end + end + ), + {ok, ReaderPid} = emqx_ft_storage_fs_reader:start_link(CallerPid, Path), + _ = erlang:monitor(process, ReaderPid), + ?assertEqual( + {ok, <<"hello ">>}, + emqx_ft_storage_fs_reader:read(ReaderPid, 6) + ), + CallerPid ! stop, + receive + {'DOWN', _, process, ReaderPid, _} -> ok + after 1000 -> + ct:fail("Reader process did not die") + end. + +t_tables(Config) -> + Path = ?config(path, Config), + + {ok, ReaderPid0} = emqx_ft_storage_fs_reader:start_link(self(), Path), + + ReaderQH0 = emqx_ft_storage_fs_reader:table(ReaderPid0, 6), + ?assertEqual( + [<<"hello ">>, <<"world">>], + qlc:eval(ReaderQH0) + ), + + {ok, ReaderPid1} = emqx_ft_storage_fs_reader:start_link(self(), Path), + + ReaderQH1 = emqx_ft_storage_fs_reader:table(ReaderPid1), + ?assertEqual( + [<<"hello world">>], + qlc:eval(ReaderQH1) + ). + +t_bad_messages(Config) -> + Path = ?config(path, Config), + + {ok, ReaderPid} = emqx_ft_storage_fs_reader:start_link(self(), Path), + + ReaderPid ! {bad, message}, + gen_server:cast(ReaderPid, {bad, message}), + + ?assertEqual( + {error, {bad_call, {bad, message}}}, + gen_server:call(ReaderPid, {bad, message}) + ). + +t_nonexistent_file(_Config) -> + ?assertEqual( + {error, enoent}, + emqx_ft_storage_fs_reader:start_link(self(), "/a/b/c/bar") + ). + +t_start_supervised(Config) -> + Path = ?config(path, Config), + + {ok, ReaderPid} = emqx_ft_storage_fs_reader:start_supervised(self(), Path), + ?assertEqual( + {ok, <<"hello ">>}, + emqx_ft_storage_fs_reader:read(ReaderPid, 6) + ). + +t_rpc_error(_Config) -> + ReaderQH = emqx_ft_storage_fs_reader:table(fake_remote_pid('dummy@127.0.0.1'), 6), + ?assertEqual( + [], + qlc:eval(ReaderQH) + ). + +mk_test_file(Path, Data) -> + ok = file:write_file(Path, Data). + +expand_path(Config, Filename) -> + filename:join([?config(data_dir, Config), Filename]). + +%% This is a hack to create a pid that is not registered on the local node. +%% https://www.erlang.org/doc/apps/erts/erl_ext_dist.html#new_pid_ext +fake_remote_pid(Node) -> + <<131, NodeAtom/binary>> = term_to_binary(Node), + PidBin = <<131, 88, NodeAtom/binary, 1:32/big, 1:32/big, 1:32/big>>, + binary_to_term(PidBin).