feat(ft): use new utils application
This commit is contained in:
parent
b951de4c6e
commit
69c4ba2a62
|
@ -98,7 +98,7 @@ unhook() ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
decode_filemeta(Payload) when is_binary(Payload) ->
|
decode_filemeta(Payload) when is_binary(Payload) ->
|
||||||
case emqx_json:safe_decode(Payload, [return_maps]) of
|
case emqx_utils_json:safe_decode(Payload, [return_maps]) of
|
||||||
{ok, Map} ->
|
{ok, Map} ->
|
||||||
decode_filemeta(Map);
|
decode_filemeta(Map);
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
|
@ -116,7 +116,7 @@ decode_filemeta(Map) when is_map(Map) ->
|
||||||
|
|
||||||
encode_filemeta(Meta = #{}) ->
|
encode_filemeta(Meta = #{}) ->
|
||||||
Schema = emqx_ft_schema:schema(filemeta),
|
Schema = emqx_ft_schema:schema(filemeta),
|
||||||
hocon_tconf:make_serializable(Schema, emqx_map_lib:binary_key_map(Meta), #{}).
|
hocon_tconf:make_serializable(Schema, emqx_utils_maps:binary_key_map(Meta), #{}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Hooks
|
%% Hooks
|
||||||
|
|
|
@ -75,7 +75,7 @@ schema("/file_transfer/files") ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
error_msg(Code, Msg) ->
|
error_msg(Code, Msg) ->
|
||||||
#{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
|
#{code => Code, message => emqx_utils:readable_error_msg(Msg)}.
|
||||||
|
|
||||||
roots() ->
|
roots() ->
|
||||||
[].
|
[].
|
||||||
|
|
|
@ -63,18 +63,18 @@ storage() ->
|
||||||
-spec gc_interval(_Storage) -> milliseconds().
|
-spec gc_interval(_Storage) -> milliseconds().
|
||||||
gc_interval(_Storage) ->
|
gc_interval(_Storage) ->
|
||||||
Conf = assert_storage(local),
|
Conf = assert_storage(local),
|
||||||
emqx_map_lib:deep_get([segments, gc, interval], Conf, ?DEFAULT_GC_INTERVAL).
|
emqx_utils_maps:deep_get([segments, gc, interval], Conf, ?DEFAULT_GC_INTERVAL).
|
||||||
|
|
||||||
-spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}.
|
-spec segments_ttl(_Storage) -> {_Min :: seconds(), _Max :: seconds()}.
|
||||||
segments_ttl(_Storage) ->
|
segments_ttl(_Storage) ->
|
||||||
Conf = assert_storage(local),
|
Conf = assert_storage(local),
|
||||||
{
|
{
|
||||||
emqx_map_lib:deep_get(
|
emqx_utils_maps:deep_get(
|
||||||
[segments, gc, minimum_segments_ttl],
|
[segments, gc, minimum_segments_ttl],
|
||||||
Conf,
|
Conf,
|
||||||
?DEFAULT_MIN_SEGMENTS_TTL
|
?DEFAULT_MIN_SEGMENTS_TTL
|
||||||
),
|
),
|
||||||
emqx_map_lib:deep_get(
|
emqx_utils_maps:deep_get(
|
||||||
[segments, gc, maximum_segments_ttl],
|
[segments, gc, maximum_segments_ttl],
|
||||||
Conf,
|
Conf,
|
||||||
?DEFAULT_MAX_SEGMENTS_TTL
|
?DEFAULT_MAX_SEGMENTS_TTL
|
||||||
|
|
|
@ -315,10 +315,10 @@ list(_Options) ->
|
||||||
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
|
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
|
||||||
|
|
||||||
encode_filemeta(Meta) ->
|
encode_filemeta(Meta) ->
|
||||||
emqx_json:encode(?PRELUDE(_Vsn = 1, emqx_ft:encode_filemeta(Meta))).
|
emqx_utils_json:encode(?PRELUDE(_Vsn = 1, emqx_ft:encode_filemeta(Meta))).
|
||||||
|
|
||||||
decode_filemeta(Binary) when is_binary(Binary) ->
|
decode_filemeta(Binary) when is_binary(Binary) ->
|
||||||
?PRELUDE(_Vsn = 1, Map) = emqx_json:decode(Binary, [return_maps]),
|
?PRELUDE(_Vsn = 1, Map) = emqx_utils_json:decode(Binary, [return_maps]),
|
||||||
case emqx_ft:decode_filemeta(Map) of
|
case emqx_ft:decode_filemeta(Map) of
|
||||||
{ok, Meta} ->
|
{ok, Meta} ->
|
||||||
Meta;
|
Meta;
|
||||||
|
|
|
@ -134,7 +134,7 @@ fields(file_node) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
error_msg(Code, Msg) ->
|
error_msg(Code, Msg) ->
|
||||||
#{code => Code, message => emqx_misc:readable_error_msg(Msg)}.
|
#{code => Code, message => emqx_utils:readable_error_msg(Msg)}.
|
||||||
|
|
||||||
-spec mk_export_uri(node(), file:name()) ->
|
-spec mk_export_uri(node(), file:name()) ->
|
||||||
uri_string:uri_string().
|
uri_string:uri_string().
|
||||||
|
@ -150,7 +150,7 @@ mk_export_uri(Node, Filepath) ->
|
||||||
%%
|
%%
|
||||||
|
|
||||||
parse_node(NodeBin) ->
|
parse_node(NodeBin) ->
|
||||||
case emqx_misc:safe_to_existing_atom(NodeBin) of
|
case emqx_utils:safe_to_existing_atom(NodeBin) of
|
||||||
{ok, Node} ->
|
{ok, Node} ->
|
||||||
Node;
|
Node;
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
|
|
|
@ -135,7 +135,7 @@ s3_headers({ClientId, FileId}, Filemeta) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
s3_header_filemeta(Filemeta) ->
|
s3_header_filemeta(Filemeta) ->
|
||||||
emqx_json:encode(emqx_ft:encode_filemeta(Filemeta), [force_utf8, uescape]).
|
emqx_utils_json:encode(emqx_ft:encode_filemeta(Filemeta), [force_utf8, uescape]).
|
||||||
|
|
||||||
list(Client, Options) ->
|
list(Client, Options) ->
|
||||||
case list_key_info(Client, Options) of
|
case list_key_info(Client, Options) of
|
||||||
|
|
|
@ -271,7 +271,7 @@ read_transferinfo(Storage, Transfer, Acc) ->
|
||||||
-spec get_root(storage()) ->
|
-spec get_root(storage()) ->
|
||||||
file:name().
|
file:name().
|
||||||
get_root(Storage) ->
|
get_root(Storage) ->
|
||||||
case emqx_map_lib:deep_find([segments, root], Storage) of
|
case emqx_utils_maps:deep_find([segments, root], Storage) of
|
||||||
{ok, Root} ->
|
{ok, Root} ->
|
||||||
Root;
|
Root;
|
||||||
{not_found, _, _} ->
|
{not_found, _, _} ->
|
||||||
|
@ -296,10 +296,10 @@ get_subdirs_for(temporary) ->
|
||||||
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
|
-define(PRELUDE(Vsn, Meta), [<<"filemeta">>, Vsn, Meta]).
|
||||||
|
|
||||||
encode_filemeta(Meta) ->
|
encode_filemeta(Meta) ->
|
||||||
emqx_json:encode(?PRELUDE(_Vsn = 1, emqx_ft:encode_filemeta(Meta))).
|
emqx_utils_json:encode(?PRELUDE(_Vsn = 1, emqx_ft:encode_filemeta(Meta))).
|
||||||
|
|
||||||
decode_filemeta(Binary) when is_binary(Binary) ->
|
decode_filemeta(Binary) when is_binary(Binary) ->
|
||||||
?PRELUDE(_Vsn = 1, Map) = emqx_json:decode(Binary, [return_maps]),
|
?PRELUDE(_Vsn = 1, Map) = emqx_utils_json:decode(Binary, [return_maps]),
|
||||||
case emqx_ft:decode_filemeta(Map) of
|
case emqx_ft:decode_filemeta(Map) of
|
||||||
{ok, Meta} ->
|
{ok, Meta} ->
|
||||||
Meta;
|
Meta;
|
||||||
|
@ -347,7 +347,7 @@ read_file(Filepath, DecodeFun) ->
|
||||||
|
|
||||||
write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) ->
|
write_file_atomic(Storage, Transfer, Filepath, Content) when is_binary(Content) ->
|
||||||
TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)),
|
TempFilepath = mk_temp_filepath(Storage, Transfer, filename:basename(Filepath)),
|
||||||
Result = emqx_misc:pipeline(
|
Result = emqx_utils:pipeline(
|
||||||
[
|
[
|
||||||
fun filelib:ensure_dir/1,
|
fun filelib:ensure_dir/1,
|
||||||
fun write_contents/2,
|
fun write_contents/2,
|
||||||
|
|
|
@ -135,7 +135,7 @@ maybe_report(#gcstats{} = _Stats, #st{storage = _Storage}) ->
|
||||||
start_timer(St = #st{storage = Storage, next_gc_timer = undefined}) ->
|
start_timer(St = #st{storage = Storage, next_gc_timer = undefined}) ->
|
||||||
case emqx_ft_conf:gc_interval(Storage) of
|
case emqx_ft_conf:gc_interval(Storage) of
|
||||||
Delay when Delay > 0 ->
|
Delay when Delay > 0 ->
|
||||||
St#st{next_gc_timer = emqx_misc:start_timer(Delay, collect)};
|
St#st{next_gc_timer = emqx_utils:start_timer(Delay, collect)};
|
||||||
0 ->
|
0 ->
|
||||||
?SLOG(warning, #{msg => "periodic_gc_disabled"}),
|
?SLOG(warning, #{msg => "periodic_gc_disabled"}),
|
||||||
St
|
St
|
||||||
|
@ -144,7 +144,7 @@ start_timer(St = #st{storage = Storage, next_gc_timer = undefined}) ->
|
||||||
reset_timer(St = #st{next_gc_timer = undefined}) ->
|
reset_timer(St = #st{next_gc_timer = undefined}) ->
|
||||||
start_timer(St);
|
start_timer(St);
|
||||||
reset_timer(St = #st{next_gc_timer = TRef}) ->
|
reset_timer(St = #st{next_gc_timer = TRef}) ->
|
||||||
ok = emqx_misc:cancel_timer(TRef),
|
ok = emqx_utils:cancel_timer(TRef),
|
||||||
start_timer(St#st{next_gc_timer = undefined}).
|
start_timer(St#st{next_gc_timer = undefined}).
|
||||||
|
|
||||||
gc_enabled(St) ->
|
gc_enabled(St) ->
|
||||||
|
|
|
@ -63,7 +63,7 @@ set_special_configs(Config) ->
|
||||||
% NOTE
|
% NOTE
|
||||||
% Inhibit local fs GC to simulate it isn't fast enough to collect
|
% Inhibit local fs GC to simulate it isn't fast enough to collect
|
||||||
% complete transfers.
|
% complete transfers.
|
||||||
storage => emqx_map_lib:deep_merge(
|
storage => emqx_utils_maps:deep_merge(
|
||||||
Storage,
|
Storage,
|
||||||
#{segments => #{gc => #{interval => 0}}}
|
#{segments => #{gc => #{interval => 0}}}
|
||||||
)
|
)
|
||||||
|
@ -325,7 +325,7 @@ t_invalid_meta(Config) ->
|
||||||
|
|
||||||
%% Invalid schema
|
%% Invalid schema
|
||||||
Meta = #{foo => <<"bar">>},
|
Meta = #{foo => <<"bar">>},
|
||||||
MetaPayload = emqx_json:encode(Meta),
|
MetaPayload = emqx_utils_json:encode(Meta),
|
||||||
?assertRCName(
|
?assertRCName(
|
||||||
unspecified_error,
|
unspecified_error,
|
||||||
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
|
emqtt:publish(C, mk_init_topic(FileId), MetaPayload, 1)
|
||||||
|
@ -667,7 +667,7 @@ meta(FileName, Data) ->
|
||||||
}.
|
}.
|
||||||
|
|
||||||
encode_meta(Meta) ->
|
encode_meta(Meta) ->
|
||||||
emqx_json:encode(emqx_ft:encode_filemeta(Meta)).
|
emqx_utils_json:encode(emqx_ft:encode_filemeta(Meta)).
|
||||||
|
|
||||||
list_files(ClientId) ->
|
list_files(ClientId) ->
|
||||||
{ok, Files} = emqx_ft_storage:files(),
|
{ok, Files} = emqx_ft_storage:files(),
|
||||||
|
|
|
@ -137,7 +137,7 @@ request(Method, Url, Decoder) when is_function(Decoder) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
json(Body) when is_binary(Body) ->
|
json(Body) when is_binary(Body) ->
|
||||||
emqx_json:decode(Body, [return_maps]).
|
emqx_utils_json:decode(Body, [return_maps]).
|
||||||
|
|
||||||
query(Params) ->
|
query(Params) ->
|
||||||
KVs = lists:map(fun({K, V}) -> uri_encode(K) ++ "=" ++ uri_encode(V) end, maps:to_list(Params)),
|
KVs = lists:map(fun({K, V}) -> uri_encode(K) ++ "=" ++ uri_encode(V) end, maps:to_list(Params)),
|
||||||
|
|
|
@ -127,7 +127,7 @@ t_gc_complete_transfers(_Config) ->
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
% 1. Start all transfers
|
% 1. Start all transfers
|
||||||
TransferSizes = emqx_misc:pmap(
|
TransferSizes = emqx_utils:pmap(
|
||||||
fun(Transfer) -> start_transfer(Storage, Transfer) end,
|
fun(Transfer) -> start_transfer(Storage, Transfer) end,
|
||||||
Transfers
|
Transfers
|
||||||
),
|
),
|
||||||
|
@ -162,7 +162,7 @@ t_gc_complete_transfers(_Config) ->
|
||||||
),
|
),
|
||||||
?assertEqual(
|
?assertEqual(
|
||||||
[ok, ok],
|
[ok, ok],
|
||||||
emqx_misc:pmap(
|
emqx_utils:pmap(
|
||||||
fun({Transfer, Size}) -> complete_transfer(Storage, Transfer, Size) end,
|
fun({Transfer, Size}) -> complete_transfer(Storage, Transfer, Size) end,
|
||||||
[{T2, S2}, {T3, S3}]
|
[{T2, S2}, {T3, S3}]
|
||||||
)
|
)
|
||||||
|
@ -221,7 +221,7 @@ t_gc_incomplete_transfers(_Config) ->
|
||||||
}
|
}
|
||||||
],
|
],
|
||||||
% 1. Start transfers, send all the segments but don't trigger completion.
|
% 1. Start transfers, send all the segments but don't trigger completion.
|
||||||
_ = emqx_misc:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers),
|
_ = emqx_utils:pmap(fun(Transfer) -> start_transfer(Storage, Transfer) end, Transfers),
|
||||||
% 2. Enable periodic GC every 0.5 seconds.
|
% 2. Enable periodic GC every 0.5 seconds.
|
||||||
ok = set_gc_config(interval, 500),
|
ok = set_gc_config(interval, 500),
|
||||||
ok = emqx_ft_storage_fs_gc:reset(Storage),
|
ok = emqx_ft_storage_fs_gc:reset(Storage),
|
||||||
|
|
|
@ -82,7 +82,7 @@ upload_file(ClientId, FileId, Name, Data, Node) ->
|
||||||
expire_at => erlang:system_time(_Unit = second) + 3600,
|
expire_at => erlang:system_time(_Unit = second) + 3600,
|
||||||
size => Size
|
size => Size
|
||||||
},
|
},
|
||||||
MetaPayload = emqx_json:encode(emqx_ft:encode_filemeta(Meta)),
|
MetaPayload = emqx_utils_json:encode(emqx_ft:encode_filemeta(Meta)),
|
||||||
|
|
||||||
ct:pal("MetaPayload = ~ts", [MetaPayload]),
|
ct:pal("MetaPayload = ~ts", [MetaPayload]),
|
||||||
|
|
||||||
|
|
|
@ -351,7 +351,7 @@ http_config(
|
||||||
].
|
].
|
||||||
|
|
||||||
maybe_ipv6_probe(TransportOpts, true) ->
|
maybe_ipv6_probe(TransportOpts, true) ->
|
||||||
emqx_misc:ipv6_probe(TransportOpts);
|
emqx_utils:ipv6_probe(TransportOpts);
|
||||||
maybe_ipv6_probe(TransportOpts, false) ->
|
maybe_ipv6_probe(TransportOpts, false) ->
|
||||||
TransportOpts.
|
TransportOpts.
|
||||||
|
|
||||||
|
|
|
@ -159,7 +159,7 @@ profile_config(Config) ->
|
||||||
?config(bucket, Config),
|
?config(bucket, Config),
|
||||||
ProfileConfig0
|
ProfileConfig0
|
||||||
),
|
),
|
||||||
ProfileConfig2 = emqx_map_lib:deep_put(
|
ProfileConfig2 = emqx_utils_maps:deep_put(
|
||||||
[transport_options, pool_type],
|
[transport_options, pool_type],
|
||||||
ProfileConfig1,
|
ProfileConfig1,
|
||||||
?config(pool_type, Config)
|
?config(pool_type, Config)
|
||||||
|
|
|
@ -62,7 +62,7 @@ t_regular_outdated_pool_cleanup(Config) ->
|
||||||
[OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
|
[OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
|
||||||
|
|
||||||
ProfileBaseConfig = ?config(profile_config, Config),
|
ProfileBaseConfig = ?config(profile_config, Config),
|
||||||
ProfileConfig = emqx_map_lib:deep_put(
|
ProfileConfig = emqx_utils_maps:deep_put(
|
||||||
[transport_options, pool_size], ProfileBaseConfig, 16
|
[transport_options, pool_size], ProfileBaseConfig, 16
|
||||||
),
|
),
|
||||||
ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
|
ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
|
||||||
|
@ -110,7 +110,7 @@ t_timeout_pool_cleanup(Config) ->
|
||||||
|
|
||||||
[OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
|
[OldPool] = emqx_s3_profile_http_pools:all(profile_id()),
|
||||||
|
|
||||||
NewProfileConfig = emqx_map_lib:deep_put(
|
NewProfileConfig = emqx_utils_maps:deep_put(
|
||||||
[transport_options, pool_size], ProfileConfig, 16
|
[transport_options, pool_size], ProfileConfig, 16
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -153,7 +153,7 @@ t_httpc_pool_update_error(Config) ->
|
||||||
meck:expect(ehttpc_pool, init, fun(_) -> meck:raise(error, badarg) end),
|
meck:expect(ehttpc_pool, init, fun(_) -> meck:raise(error, badarg) end),
|
||||||
|
|
||||||
ProfileBaseConfig = ?config(profile_config, Config),
|
ProfileBaseConfig = ?config(profile_config, Config),
|
||||||
NewProfileConfig = emqx_map_lib:deep_put(
|
NewProfileConfig = emqx_utils_maps:deep_put(
|
||||||
[transport_options, pool_size], ProfileBaseConfig, 16
|
[transport_options, pool_size], ProfileBaseConfig, 16
|
||||||
),
|
),
|
||||||
|
|
||||||
|
@ -237,7 +237,7 @@ t_checkout_client(Config) ->
|
||||||
%% Now change config for the profile
|
%% Now change config for the profile
|
||||||
ProfileBaseConfig = ?config(profile_config, Config),
|
ProfileBaseConfig = ?config(profile_config, Config),
|
||||||
NewProfileConfig0 = ProfileBaseConfig#{bucket => <<"new_bucket">>},
|
NewProfileConfig0 = ProfileBaseConfig#{bucket => <<"new_bucket">>},
|
||||||
NewProfileConfig1 = emqx_map_lib:deep_put(
|
NewProfileConfig1 = emqx_utils_maps:deep_put(
|
||||||
[transport_options, pool_size], NewProfileConfig0, 16
|
[transport_options, pool_size], NewProfileConfig0, 16
|
||||||
),
|
),
|
||||||
ok = emqx_s3:update_profile(profile_id(), NewProfileConfig1),
|
ok = emqx_s3:update_profile(profile_id(), NewProfileConfig1),
|
||||||
|
|
|
@ -526,7 +526,7 @@ t_tls_error(Config) ->
|
||||||
_ = process_flag(trap_exit, true),
|
_ = process_flag(trap_exit, true),
|
||||||
|
|
||||||
ProfileBaseConfig = ?config(profile_config, Config),
|
ProfileBaseConfig = ?config(profile_config, Config),
|
||||||
ProfileConfig = emqx_map_lib:deep_put(
|
ProfileConfig = emqx_utils_maps:deep_put(
|
||||||
[transport_options, ssl, server_name_indication], ProfileBaseConfig, "invalid-hostname"
|
[transport_options, ssl, server_name_indication], ProfileBaseConfig, "invalid-hostname"
|
||||||
),
|
),
|
||||||
ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
|
ok = emqx_s3:update_profile(profile_id(), ProfileConfig),
|
||||||
|
|
Loading…
Reference in New Issue