diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index e408250be..d2bf0f0c2 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.18"}, + {vsn, "0.1.19"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index 0d2feef83..a756f535e 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -165,20 +165,20 @@ create(BridgeId, Conf) -> create(Type, Name, Conf) -> create(Type, Name, Conf, #{}). -create(Type, Name, Conf, Opts0) -> +create(Type, Name, Conf, Opts) -> ?SLOG(info, #{ msg => "create bridge", type => Type, name => Name, config => emqx_utils:redact(Conf) }), - Opts = override_start_after_created(Conf, Opts0), + TypeBin = bin(Type), {ok, _Data} = emqx_resource:create_local( resource_id(Type, Name), <<"emqx_bridge">>, bridge_to_resource_type(Type), - parse_confs(bin(Type), Name, Conf), - Opts + parse_confs(TypeBin, Name, Conf), + parse_opts(TypeBin, Conf, Opts) ), ok. @@ -189,7 +189,7 @@ update(BridgeId, {OldConf, Conf}) -> update(Type, Name, {OldConf, Conf}) -> update(Type, Name, {OldConf, Conf}, #{}). -update(Type, Name, {OldConf, Conf}, Opts0) -> +update(Type, Name, {OldConf, Conf}, Opts) -> %% TODO: sometimes its not necessary to restart the bridge connection. %% %% - if the connection related configs like `servers` is updated, we should restart/start @@ -198,7 +198,6 @@ update(Type, Name, {OldConf, Conf}, Opts0) -> %% the `method` or `headers` of a WebHook is changed, then the bridge can be updated %% without restarting the bridge. %% - Opts = override_start_after_created(Conf, Opts0), case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of false -> ?SLOG(info, #{ @@ -241,11 +240,12 @@ recreate(Type, Name, Conf) -> recreate(Type, Name, Conf, #{}). recreate(Type, Name, Conf, Opts) -> + TypeBin = bin(Type), emqx_resource:recreate_local( resource_id(Type, Name), bridge_to_resource_type(Type), - parse_confs(bin(Type), Name, Conf), - Opts + parse_confs(TypeBin, Name, Conf), + parse_opts(TypeBin, Conf, Opts) ). create_dry_run(Type, Conf0) -> @@ -402,6 +402,16 @@ bin(Bin) when is_binary(Bin) -> Bin; bin(Str) when is_list(Str) -> list_to_binary(Str); bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8). +parse_opts(Type, Conf, Opts0) -> + Opts1 = override_start_after_created(Conf, Opts0), + override_resource_request_timeout(Type, Conf, Opts1). + +%% Put webhook's http request_timeout into the resource options +override_resource_request_timeout(<<"webhook">>, #{request_timeout := ReqTimeout}, Opts) -> + Opts#{request_timeout => ReqTimeout}; +override_resource_request_timeout(_Type, _Conf, Opts) -> + Opts. + override_start_after_created(Config, Opts) -> Enabled = maps:get(enable, Config, true), StartAfterCreated = Enabled andalso maps:get(start_after_created, Opts, Enabled), diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index f58805b6b..d1755bf73 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -223,51 +223,6 @@ node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. webhook_bridge_converter(Conf0, _HoconOpts) -> - Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee( + emqx_bridge_compatible_config:upgrade_pre_ee( Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 - ), - case Conf1 of - undefined -> - undefined; - _ -> - maps:map( - fun(_Name, Conf) -> - do_convert_webhook_config(Conf) - end, - Conf1 - ) - end. - -do_convert_webhook_config( - #{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf -) -> - %% ok: same values - Conf; -do_convert_webhook_config( - #{ - <<"request_timeout">> := ReqTRootRaw, - <<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw} - } = Conf0 -) -> - %% different values; we set them to the same, if they are valid - %% durations - MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw), - MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw), - case {MReqTRoot, MReqTResource} of - {{ok, ReqTRoot}, {ok, ReqTResource}} -> - {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}), - Conf1 = emqx_utils_maps:deep_merge( - Conf0, - #{ - <<"request_timeout">> => ReqTRaw, - <<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw} - } - ), - Conf1; - _ -> - %% invalid values; let the type checker complain about - %% that. - Conf0 - end; -do_convert_webhook_config(Conf) -> - Conf. + ). diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl index 1540f77bf..83a3dba9b 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl @@ -40,12 +40,15 @@ fields("put") -> fields("get") -> emqx_bridge_schema:status_fields() ++ fields("post"); fields("creation_opts") -> - lists:filter( - fun({K, _V}) -> - not lists:member(K, unsupported_opts()) - end, - emqx_resource_schema:fields("creation_opts") - ). + [ + deprecated_request_timeout() + | lists:filter( + fun({K, _V}) -> + not lists:member(K, unsupported_opts()) + end, + emqx_resource_schema:fields("creation_opts") + ) + ]. desc("config") -> ?DESC("desc_config"); @@ -163,7 +166,8 @@ unsupported_opts() -> [ enable_batch, batch_size, - batch_time + batch_time, + request_timeout ]. %%====================================================================================== @@ -190,3 +194,13 @@ name_field() -> method() -> enum([post, put, get, delete]). + +deprecated_request_timeout() -> + {request_timeout, + mk( + hoconsc:union([infinity, emqx_schema:duration_ms()]), + #{ + default => <<"15s">>, + deprecated => {since, "5.0.26"} + } + )}.