206 lines
6.1 KiB
Erlang
206 lines
6.1 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_ft_storage).
|
|
|
|
-include_lib("emqx/include/types.hrl").
|
|
|
|
%% Transfer lifecycle operations, forwarded to the configured storage backend.
-export([
    store_filemeta/2,
    store_segment/2,
    assemble/3,
    kickoff/1
]).

%% Listing of files known to the storage backend.
-export([
    files/0,
    files/1
]).

%% Running a function against the storage, provided the backend type matches.
-export([
    with_storage_type/2,
    with_storage_type/3
]).

%% Backend resolution and configuration change handling.
-export([
    backend/0,
    update_config/2
]).
|
|
|
|
%% Storage configuration; an alias for the generic EMQX config term.
-type config() :: emqx_config:config().

%% What gets handed to backend callbacks as their first argument.
-type storage() :: config().

%% Known backend types. `local` is the only one handled by `mod/1`.
-type type() :: local.

%% A backend type paired with the storage it operates on.
-type backend() :: {type(), storage()}.
|
|
|
|
%% Types made available to other modules.
-export_type([
    backend/0,
    assemble_callback/0,
    query/1,
    page/2,
    file_info/0,
    export_data/0,
    reader/0
]).
|
|
|
|
%% A fun taking either `ok` or `{error, Reason}`; its return value is unconstrained.
-type assemble_callback() ::
    fun((ok | {error, term()}) -> any()).
|
|
|
|
%% Query accepted by `files/1`. It is one of two map shapes, all keys optional:
%%  * a map that may carry a `transfer`, or
%%  * a map that may carry a `limit` and a `following` cursor
%%    (presumably for paging through results -- confirm against the backend).
-type query(Cursor) ::
    #{transfer => emqx_ft:transfer()}
    | #{limit => non_neg_integer(), following => Cursor}.
|
|
|
|
%% One page of results: `items` is always present, `cursor` is optional.
-type page(Item, Cursor) ::
    #{
        items := [Item],
        cursor => Cursor
    }.
|
|
|
|
%% Description of a single file as returned by `files/0,1`.
%% `transfer`, `name`, `size` and `timestamp` are mandatory; `uri` and `meta`
%% are optional.
-type file_info() ::
    #{
        %% Transfer the file belongs to.
        transfer := emqx_ft:transfer(),
        name := file:name(),
        %% Size in bytes.
        size := _Bytes :: non_neg_integer(),
        %% Expressed in epoch seconds.
        timestamp := emqx_utils_calendar:epoch_second(),
        uri => uri_string:uri_string(),
        meta => emqx_ft:filemeta()
    }.
|
|
|
|
%% Exported content: either an in-memory binary or a QLC query handle.
-type export_data() ::
    binary()
    | qlc:query_handle().

%% A reader is identified by its process id.
-type reader() :: pid().
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Behaviour
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% NOTE
%% An async task waits for a `kickoff` message before it starts processing, which
%% gives the caller some time to set up monitors, etc. An async task does not
%% explicitly report the processing result: you are expected to receive and handle
%% the exit reason of the process, which is
%% -type result() :: `{shutdown, ok | {error, _}}`.
|
|
|
|
%% Callbacks a storage backend module has to implement. Each of them receives
%% the storage as its first argument.

%% The three transfer operations below share one result shape: `ok`,
%% `{async, Pid}` or `{error, Reason}`.
-callback store_filemeta(
    _Storage :: storage(),
    _Transfer :: emqx_ft:transfer(),
    _Meta :: emqx_ft:filemeta()
) ->
    ok | {async, pid()} | {error, term()}.

-callback store_segment(
    _Storage :: storage(),
    _Transfer :: emqx_ft:transfer(),
    _Segment :: emqx_ft:segment()
) ->
    ok | {async, pid()} | {error, term()}.

-callback assemble(
    _Storage :: storage(),
    _Transfer :: emqx_ft:transfer(),
    _Size :: emqx_ft:bytes(),
    _FinOpts :: emqx_ft:finopts()
) ->
    ok | {async, pid()} | {error, term()}.

%% Lists files matching the query, one page at a time.
-callback files(storage(), query(Cursor)) ->
    {ok, page(file_info(), Cursor)} | {error, term()}.

%% Invoked when a backend is started / stopped; the return value is not constrained.
-callback start(_Storage :: storage()) -> any().
-callback stop(_Storage :: storage()) -> any().

%% Invoked with the old and the new storage, either of which may be absent.
-callback update_config(
    _OldConfig :: option(storage()),
    _NewConfig :: option(storage())
) ->
    any().
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% API
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Stores the metadata of a transfer by forwarding the request to the
%% `store_filemeta` callback of the currently configured backend.
%% Returns whatever that callback returns.
-spec store_filemeta(emqx_ft:transfer(), emqx_ft:filemeta()) ->
    ok | {async, pid()} | {error, term()}.
store_filemeta(Transfer, FileMeta) ->
    CallArgs = [Transfer, FileMeta],
    dispatch(store_filemeta, CallArgs).
|
|
|
|
%% Stores one segment of a transfer by forwarding the request to the
%% `store_segment` callback of the currently configured backend.
%% Returns whatever that callback returns.
-spec store_segment(emqx_ft:transfer(), emqx_ft:segment()) ->
    ok | {async, pid()} | {error, term()}.
store_segment(Transfer, Segment) ->
    CallArgs = [Transfer, Segment],
    dispatch(store_segment, CallArgs).
|
|
|
|
%% Asks the currently configured backend to assemble a transfer of the given
%% size with the given finalization options, via its `assemble` callback.
%% Returns whatever that callback returns.
-spec assemble(emqx_ft:transfer(), emqx_ft:bytes(), emqx_ft:finopts()) ->
    ok | {async, pid()} | {error, term()}.
assemble(Transfer, Size, FinOpts) ->
    CallArgs = [Transfer, Size, FinOpts],
    dispatch(assemble, CallArgs).
|
|
|
|
%% Sends the `kickoff` message to the given process and returns `ok`.
%% Delivery is fire-and-forget: nothing is awaited in response.
-spec kickoff(pid()) -> ok.
kickoff(Pid) ->
    Pid ! kickoff,
    ok.
|
|
|
|
%%
|
|
|
|
%% Lists files using an empty query, i.e. same as `files(#{})`.
-spec files() ->
    {ok, page(file_info(), _)} | {error, term()}.
files() ->
    EmptyQuery = #{},
    files(EmptyQuery).
|
|
|
|
%% Lists files matching `Query` by forwarding the request to the `files`
%% callback of the currently configured backend.
-spec files(query(Cursor)) ->
    {ok, page(file_info(), Cursor)} | {error, term()}.
files(Query) ->
    CallArgs = [Query],
    dispatch(files, CallArgs).
|
|
|
|
%% Calls function `Fun` of the module implementing the current backend.
%% The backend's storage is prepended to `Args`, so the callee is invoked
%% with `length(Args) + 1` arguments.
-spec dispatch(atom(), list(term())) -> any().
dispatch(Fun, Args) when is_atom(Fun) ->
    {Type, Storage} = backend(),
    Module = mod(Type),
    erlang:apply(Module, Fun, [Storage | Args]).
|
|
|
|
%%
|
|
|
|
%% Same as `with_storage_type/3` with no extra arguments.
-spec with_storage_type(atom(), atom() | function()) -> any().
with_storage_type(Type, Fun) ->
    NoExtraArgs = [],
    with_storage_type(Type, Fun, NoExtraArgs).
|
|
|
|
%% Runs `Fun` against the current storage, but only if the current backend is
%% of type `Type`.
%%
%% * When `Fun` is an atom, it names a function of the backend module, called
%%   with the storage prepended to `Args`.
%% * When `Fun` is a fun, it is applied to the storage prepended to `Args`.
%%
%% Any other 2-tuple backend yields `{error, {invalid_storage_backend, Backend}}`.
%% Note that this also covers a matching backend type combined with a `Fun`
%% that is neither an atom nor a fun.
-spec with_storage_type(atom(), atom() | function(), list(term())) -> any().
with_storage_type(Type, Fun, Args) ->
    case backend() of
        {Type, Storage} when is_atom(Fun) ->
            Module = mod(Type),
            erlang:apply(Module, Fun, [Storage | Args]);
        {Type, Storage} when is_function(Fun) ->
            erlang:apply(Fun, [Storage | Args]);
        {_OtherType, _OtherStorage} = Mismatch ->
            {error, {invalid_storage_backend, Mismatch}}
    end.
|
|
|
|
%%
|
|
|
|
%% Resolves the backend from the storage configuration reported by
%% `emqx_ft_conf:storage/0`.
-spec backend() -> backend().
backend() ->
    StorageConfig = emqx_ft_conf:storage(),
    backend(StorageConfig).
|
|
|
|
%% Reacts to a storage configuration change. Both configs are first mapped to
%% backends through `emqx_maybe:apply/2` (presumably leaving an absent config
%% as is -- confirm in `emqx_maybe`), then handed to `on_backend_update/2`.
-spec update_config(_Old :: emqx_maybe:t(config()), _New :: emqx_maybe:t(config())) ->
    ok.
update_config(ConfigOld, ConfigNew) ->
    BackendOld = emqx_maybe:apply(fun backend/1, ConfigOld),
    BackendNew = emqx_maybe:apply(fun backend/1, ConfigNew),
    on_backend_update(BackendOld, BackendNew).
|
|
|
|
%% Applies the transition from the old backend to the new one.
%%
%% * Identical backends: nothing to do.
%% * Same type, different storage: the backend module's `update_config/2` is
%%   called and must return `ok` (anything else is a `badmatch`).
%% * Otherwise (each side being `undefined` or a tuple): the old backend is
%%   stopped, then the new one is started; both results are ignored.
on_backend_update({_, _} = Unchanged, Unchanged) ->
    ok;
on_backend_update({Type, StorageOld}, {Type, StorageNew}) ->
    Module = mod(Type),
    ok = Module:update_config(StorageOld, StorageNew);
on_backend_update(BackendOld, BackendNew) when
    (BackendOld =:= undefined orelse is_tuple(BackendOld)),
    (BackendNew =:= undefined orelse is_tuple(BackendNew))
->
    %% Stop strictly before start, so the old backend is not running while
    %% the new one is being brought up.
    _ = emqx_maybe:apply(fun stop_backend/1, BackendOld),
    _ = emqx_maybe:apply(fun start_backend/1, BackendNew),
    ok.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Local API
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Derives the backend from a storage config; the actual work is done by
%% `emqx_ft_schema:backend/1`.
-spec backend(config()) -> backend().
backend(Config) ->
    Backend = emqx_ft_schema:backend(Config),
    Backend.
|
|
|
|
%% Calls `start/1` of the module implementing the given backend type,
%% passing it the storage. Returns whatever that callback returns.
start_backend({Type, Storage}) ->
    Module = mod(Type),
    Module:start(Storage).
|
|
|
|
%% Calls `stop/1` of the module implementing the given backend type,
%% passing it the storage. Returns whatever that callback returns.
stop_backend({Type, Storage}) ->
    Module = mod(Type),
    Module:stop(Storage).
|
|
|
|
%% Maps a backend type to the module implementing it. `local` is the only
%% supported type; anything else fails with `function_clause`.
mod(Type) when Type =:= local ->
    emqx_ft_storage_fs.
|