From a9866fede4cb4feba41a78d79885843ade0ad729 Mon Sep 17 00:00:00 2001 From: Andrew Mayorov Date: Tue, 25 Apr 2023 14:42:26 +0300 Subject: [PATCH] feat(ft-api): support paging in S3 storage exporter --- .../src/emqx_ft_storage_exporter_s3.erl | 118 ++++++++++++------ 1 file changed, 82 insertions(+), 36 deletions(-) diff --git a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl index adf000346..c7110c74a 100644 --- a/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl +++ b/apps/emqx_ft/src/emqx_ft_storage_exporter_s3.erl @@ -23,7 +23,7 @@ -export([write/2]). -export([complete/2]). -export([discard/1]). --export([list/1]). +-export([list/2]). -export([ start/1, @@ -43,6 +43,10 @@ filemeta => filemeta() }. +-type query() :: emqx_ft_storage:query(cursor()). +-type page(T) :: emqx_ft_storage:page(T, cursor()). +-type cursor() :: iodata(). + -type export_st() :: #{ pid := pid(), filemeta := filemeta(), @@ -92,10 +96,10 @@ complete(#{pid := Pid} = _ExportSt, _Checksum) -> discard(#{pid := Pid} = _ExportSt) -> emqx_s3_uploader:abort(Pid). --spec list(options()) -> - {ok, [exportinfo()]} | {error, term()}. -list(Options) -> - emqx_s3:with_client(?S3_PROFILE_ID, fun(Client) -> list(Client, Options) end). +-spec list(options(), query()) -> + {ok, page(exportinfo())} | {error, term()}. +list(Options, Query) -> + emqx_s3:with_client(?S3_PROFILE_ID, fun(Client) -> list(Client, Options, Query) end). %%-------------------------------------------------------------------- %% Exporter behaviour (lifecycle) @@ -117,12 +121,11 @@ update(_OldOptions, NewOptions) -> %% Internal functions %% ------------------------------------------------------------------- -s3_key({ClientId, FileId} = _Transfer, #{name := Filename}) -> - filename:join([ - emqx_ft_fs_util:escape_filename(ClientId), - emqx_ft_fs_util:escape_filename(FileId), - Filename - ]). +s3_key(Transfer, #{name := Filename}) -> + s3_prefix(Transfer) ++ "/" ++ Filename. + +s3_prefix({ClientId, FileId} = _Transfer) -> + emqx_ft_fs_util:escape_filename(ClientId) ++ "/" ++ emqx_ft_fs_util:escape_filename(FileId). s3_headers({ClientId, FileId}, Filemeta) -> #{ @@ -137,54 +140,97 @@ s3_headers({ClientId, FileId}, Filemeta) -> s3_header_filemeta(Filemeta) -> emqx_utils_json:encode(emqx_ft:encode_filemeta(Filemeta), [force_utf8, uescape]). -list(Client, Options) -> - case list_key_info(Client, Options) of - {ok, KeyInfos} -> - MaybeExportInfos = lists:map( - fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo, Options) end, KeyInfos - ), - ExportInfos = [ExportInfo || {ok, ExportInfo} <- MaybeExportInfos], - {ok, ExportInfos}; +list(Client, _Options, #{transfer := Transfer}) -> + case list_key_info(Client, [{prefix, s3_prefix(Transfer)}, {max_keys, ?S3_LIST_LIMIT}]) of + {ok, {Exports, _Marker}} -> + {ok, #{items => Exports}}; + {error, _Reason} = Error -> + Error + end; +list(Client, _Options, Query) -> + Limit = maps:get(limit, Query, undefined), + Marker = emqx_maybe:apply(fun decode_cursor/1, maps:get(cursor, Query, undefined)), + case list_pages(Client, Marker, Limit, []) of + {ok, {Exports, undefined}} -> + {ok, #{items => Exports}}; + {ok, {Exports, NextMarker}} -> + {ok, #{items => Exports, cursor => encode_cursor(NextMarker)}}; {error, _Reason} = Error -> Error end. -list_key_info(Client, Options) -> - list_key_info(Client, Options, _Marker = [], _Acc = []). +list_pages(Client, Marker, Limit, Acc) -> + MaxKeys = min(?S3_LIST_LIMIT, Limit), + ListOptions = [{marker, Marker} || Marker =/= undefined], + case list_key_info(Client, [{max_keys, MaxKeys} | ListOptions]) of + {ok, {Exports, NextMarker}} -> + list_accumulate(Client, Limit, NextMarker, [Exports | Acc]); + {error, _Reason} = Error -> + Error + end. -list_key_info(Client, Options, Marker, Acc) -> - ListOptions = [{max_keys, ?S3_LIST_LIMIT}] ++ Marker, +list_accumulate(_Client, _Limit, undefined, Acc) -> + {ok, {flatten_pages(Acc), undefined}}; +list_accumulate(Client, undefined, Marker, Acc) -> + list_pages(Client, Marker, undefined, Acc); +list_accumulate(Client, Limit, Marker, Acc = [Exports | _]) -> + case Limit - length(Exports) of + 0 -> + {ok, {flatten_pages(Acc), Marker}}; + Left -> + list_pages(Client, Marker, Left, Acc) + end. + +flatten_pages(Pages) -> + lists:append(lists:reverse(Pages)). + +list_key_info(Client, ListOptions) -> case emqx_s3_client:list(Client, ListOptions) of {ok, Result} -> ?SLOG(debug, #{msg => "list_key_info", result => Result}), KeyInfos = proplists:get_value(contents, Result, []), - case proplists:get_value(is_truncated, Result, false) of - true -> - NewMarker = next_marker(KeyInfos), - list_key_info(Client, Options, NewMarker, [KeyInfos | Acc]); - false -> - {ok, lists:append(lists:reverse([KeyInfos | Acc]))} - end; + Exports = lists:filtermap( + fun(KeyInfo) -> key_info_to_exportinfo(Client, KeyInfo) end, KeyInfos + ), + Marker = + case proplists:get_value(is_truncated, Result, false) of + true -> + next_marker(KeyInfos); + false -> + undefined + end, + {ok, {Exports, Marker}}; {error, _Reason} = Error -> Error end. -next_marker(KeyInfos) -> - [{marker, proplists:get_value(key, lists:last(KeyInfos))}]. +encode_cursor(Key) -> + unicode:characters_to_binary(Key). -key_info_to_exportinfo(Client, KeyInfo, _Options) -> +decode_cursor(Cursor) -> + case unicode:characters_to_list(Cursor) of + Key when is_list(Key) -> + Key; + _ -> + error({badarg, cursor}) + end. + +next_marker(KeyInfos) -> + proplists:get_value(key, lists:last(KeyInfos)). + +key_info_to_exportinfo(Client, KeyInfo) -> Key = proplists:get_value(key, KeyInfo), case parse_transfer_and_name(Key) of {ok, {Transfer, Name}} -> - {ok, #{ + {true, #{ transfer => Transfer, name => unicode:characters_to_binary(Name), uri => emqx_s3_client:uri(Client, Key), timestamp => datetime_to_epoch_second(proplists:get_value(last_modified, KeyInfo)), size => proplists:get_value(size, KeyInfo) }}; - {error, _Reason} = Error -> - Error + {error, _Reason} -> + false end. -define(EPOCH_START, 62167219200).