feat: update wehbook's request_timeout into resource_opts

This commit is contained in:
某文 2023-05-16 15:32:43 +08:00
parent ba72695f04
commit 7d7c069257
4 changed files with 42 additions and 63 deletions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_bridge, [ {application, emqx_bridge, [
{description, "EMQX bridges"}, {description, "EMQX bridges"},
{vsn, "0.1.18"}, {vsn, "0.1.19"},
{registered, [emqx_bridge_sup]}, {registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}}, {mod, {emqx_bridge_app, []}},
{applications, [ {applications, [

View File

@ -165,20 +165,20 @@ create(BridgeId, Conf) ->
create(Type, Name, Conf) -> create(Type, Name, Conf) ->
create(Type, Name, Conf, #{}). create(Type, Name, Conf, #{}).
create(Type, Name, Conf, Opts0) -> create(Type, Name, Conf, Opts) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "create bridge", msg => "create bridge",
type => Type, type => Type,
name => Name, name => Name,
config => emqx_utils:redact(Conf) config => emqx_utils:redact(Conf)
}), }),
Opts = override_start_after_created(Conf, Opts0), TypeBin = bin(Type),
{ok, _Data} = emqx_resource:create_local( {ok, _Data} = emqx_resource:create_local(
resource_id(Type, Name), resource_id(Type, Name),
<<"emqx_bridge">>, <<"emqx_bridge">>,
bridge_to_resource_type(Type), bridge_to_resource_type(Type),
parse_confs(bin(Type), Name, Conf), parse_confs(TypeBin, Name, Conf),
Opts parse_opts(TypeBin, Conf, Opts)
), ),
ok. ok.
@ -189,7 +189,7 @@ update(BridgeId, {OldConf, Conf}) ->
update(Type, Name, {OldConf, Conf}) -> update(Type, Name, {OldConf, Conf}) ->
update(Type, Name, {OldConf, Conf}, #{}). update(Type, Name, {OldConf, Conf}, #{}).
update(Type, Name, {OldConf, Conf}, Opts0) -> update(Type, Name, {OldConf, Conf}, Opts) ->
%% TODO: sometimes its not necessary to restart the bridge connection. %% TODO: sometimes its not necessary to restart the bridge connection.
%% %%
%% - if the connection related configs like `servers` is updated, we should restart/start %% - if the connection related configs like `servers` is updated, we should restart/start
@ -198,7 +198,6 @@ update(Type, Name, {OldConf, Conf}, Opts0) ->
%% the `method` or `headers` of a WebHook is changed, then the bridge can be updated %% the `method` or `headers` of a WebHook is changed, then the bridge can be updated
%% without restarting the bridge. %% without restarting the bridge.
%% %%
Opts = override_start_after_created(Conf, Opts0),
case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of
false -> false ->
?SLOG(info, #{ ?SLOG(info, #{
@ -241,11 +240,12 @@ recreate(Type, Name, Conf) ->
recreate(Type, Name, Conf, #{}). recreate(Type, Name, Conf, #{}).
recreate(Type, Name, Conf, Opts) -> recreate(Type, Name, Conf, Opts) ->
TypeBin = bin(Type),
emqx_resource:recreate_local( emqx_resource:recreate_local(
resource_id(Type, Name), resource_id(Type, Name),
bridge_to_resource_type(Type), bridge_to_resource_type(Type),
parse_confs(bin(Type), Name, Conf), parse_confs(TypeBin, Name, Conf),
Opts parse_opts(TypeBin, Conf, Opts)
). ).
create_dry_run(Type, Conf0) -> create_dry_run(Type, Conf0) ->
@ -402,6 +402,16 @@ bin(Bin) when is_binary(Bin) -> Bin;
bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Str) when is_list(Str) -> list_to_binary(Str);
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
parse_opts(Type, Conf, Opts0) ->
Opts1 = override_start_after_created(Conf, Opts0),
override_resource_request_timeout(Type, Conf, Opts1).
%% Put webhook's http request_timeout into the resource options
override_resource_request_timeout(<<"webhook">>, #{request_timeout := ReqTimeout}, Opts) ->
Opts#{request_timeout => ReqTimeout};
override_resource_request_timeout(_Type, _Conf, Opts) ->
Opts.
override_start_after_created(Config, Opts) -> override_start_after_created(Config, Opts) ->
Enabled = maps:get(enable, Config, true), Enabled = maps:get(enable, Config, true),
StartAfterCreated = Enabled andalso maps:get(start_after_created, Opts, Enabled), StartAfterCreated = Enabled andalso maps:get(start_after_created, Opts, Enabled),

View File

@ -223,51 +223,6 @@ node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
webhook_bridge_converter(Conf0, _HoconOpts) -> webhook_bridge_converter(Conf0, _HoconOpts) ->
Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee( emqx_bridge_compatible_config:upgrade_pre_ee(
Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
), ).
case Conf1 of
undefined ->
undefined;
_ ->
maps:map(
fun(_Name, Conf) ->
do_convert_webhook_config(Conf)
end,
Conf1
)
end.
do_convert_webhook_config(
#{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf
) ->
%% ok: same values
Conf;
do_convert_webhook_config(
#{
<<"request_timeout">> := ReqTRootRaw,
<<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw}
} = Conf0
) ->
%% different values; we set them to the same, if they are valid
%% durations
MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw),
MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw),
case {MReqTRoot, MReqTResource} of
{{ok, ReqTRoot}, {ok, ReqTResource}} ->
{_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}),
Conf1 = emqx_utils_maps:deep_merge(
Conf0,
#{
<<"request_timeout">> => ReqTRaw,
<<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw}
}
),
Conf1;
_ ->
%% invalid values; let the type checker complain about
%% that.
Conf0
end;
do_convert_webhook_config(Conf) ->
Conf.

View File

@ -40,12 +40,15 @@ fields("put") ->
fields("get") -> fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post"); emqx_bridge_schema:status_fields() ++ fields("post");
fields("creation_opts") -> fields("creation_opts") ->
lists:filter( [
fun({K, _V}) -> deprecated_request_timeout()
not lists:member(K, unsupported_opts()) | lists:filter(
end, fun({K, _V}) ->
emqx_resource_schema:fields("creation_opts") not lists:member(K, unsupported_opts())
). end,
emqx_resource_schema:fields("creation_opts")
)
].
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
@ -163,7 +166,8 @@ unsupported_opts() ->
[ [
enable_batch, enable_batch,
batch_size, batch_size,
batch_time batch_time,
request_timeout
]. ].
%%====================================================================================== %%======================================================================================
@ -190,3 +194,13 @@ name_field() ->
method() -> method() ->
enum([post, put, get, delete]). enum([post, put, get, delete]).
deprecated_request_timeout() ->
{request_timeout,
mk(
hoconsc:union([infinity, emqx_schema:duration_ms()]),
#{
default => <<"15s">>,
deprecated => {since, "5.0.26"}
}
)}.