diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index ed2baec8f..6b96e5150 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -17,6 +17,7 @@ -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). +-include_lib("emqx/include/logger.hrl"). -import(hoconsc, [mk/2, ref/2]). @@ -140,11 +141,7 @@ fields(bridges) -> #{ desc => ?DESC("bridges_webhook"), required => false, - converter => fun(X, _HoconOpts) -> - emqx_bridge_compatible_config:upgrade_pre_ee( - X, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 - ) - end + converter => fun webhook_bridge_converter/2 } )}, {mqtt, @@ -212,3 +209,55 @@ status() -> node_name() -> {"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}. + +webhook_bridge_converter(Conf0, _HoconOpts) -> + Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee( + Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1 + ), + case Conf1 of + undefined -> + undefined; + _ -> + do_convert_webhook_config(Conf1) + end. + +do_convert_webhook_config( + #{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf +) -> + %% ok: same values + Conf; +do_convert_webhook_config( + #{ + <<"request_timeout">> := ReqTRootRaw, + <<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw} + } = Conf0 +) -> + %% different values; we set them to the same, if they are valid + %% durations + MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw), + MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw), + case {MReqTRoot, MReqTResource} of + {{ok, ReqTRoot}, {ok, ReqTResource}} -> + {_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}), + ?SLOG( + debug, + #{ + msg => adjusting_webhook_bridge_request_time, + new_value => ReqTRaw + } + ), + Conf1 = emqx_map_lib:deep_merge( + Conf0, + #{ + <<"request_timeout">> => ReqTRaw, + <<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw} + } + ), + Conf1; + _ -> + %% invalid values; let the type checker complain about + %% that. + Conf0 + end; +do_convert_webhook_config(Conf) -> + Conf. diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 5f863ed63..d242111dc 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -818,6 +818,35 @@ t_metrics(Config) -> ), ok. +%% request_timeout in bridge root should match request_timeout in +%% resource_opts. +t_inconsistent_webhook_request_timeouts(Config) -> + Port = ?config(port, Config), + URL1 = ?URL(Port, "path1"), + Name = ?BRIDGE_NAME, + BadBridgeParams = + emqx_map_lib:deep_merge( + ?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name), + #{ + <<"request_timeout">> => <<"1s">>, + <<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>} + } + ), + {ok, 201, RawResponse} = request( + post, + uri(["bridges"]), + BadBridgeParams + ), + %% note: same value on both fields + ?assertMatch( + #{ + <<"request_timeout">> := <<"2s">>, + <<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>} + }, + emqx_json:decode(RawResponse, [return_maps]) + ), + ok. + operation_path(node, Oper, BridgeID) -> uri(["nodes", node(), "bridges", BridgeID, Oper]); operation_path(cluster, Oper, BridgeID) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl index 5e0b4912f..acafb84ca 100644 --- a/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl +++ b/apps/emqx_bridge/test/emqx_bridge_compatible_config_tests.erl @@ -28,6 +28,7 @@ empty_config_test() -> webhook_config_test() -> Conf1 = parse(webhook_v5011_hocon()), Conf2 = parse(full_webhook_v5011_hocon()), + Conf3 = parse(full_webhook_v5019_hocon()), ?assertMatch( #{ @@ -59,6 +60,26 @@ webhook_config_test() -> check(Conf2) ), + %% the converter should pick the greater of the two + %% request_timeouts and place them in the root and inside + %% resource_opts. + ?assertMatch( + #{ + <<"bridges">> := #{ + <<"webhook">> := #{ + <<"the_name">> := + #{ + <<"method">> := get, + <<"request_timeout">> := 60_000, + <<"resource_opts">> := #{<<"request_timeout">> := 60_000}, + <<"body">> := <<"${payload}">> + } + } + } + }, + check(Conf3) + ), + ok. up(#{<<"bridges">> := Bridges0} = Conf0) -> @@ -124,7 +145,7 @@ bridges{ max_retries = 3 method = \"get\" pool_size = 4 - request_timeout = \"5s\" + request_timeout = \"15s\" ssl {enable = false, verify = \"verify_peer\"} url = \"http://localhost:8080\" } @@ -164,6 +185,41 @@ full_webhook_v5011_hocon() -> "}\n" "". +%% does not contain direction +full_webhook_v5019_hocon() -> + "" + "\n" + "bridges{\n" + " webhook {\n" + " the_name{\n" + " body = \"${payload}\"\n" + " connect_timeout = \"5s\"\n" + " enable_pipelining = 100\n" + " headers {\"content-type\" = \"application/json\"}\n" + " max_retries = 3\n" + " method = \"get\"\n" + " pool_size = 4\n" + " pool_type = \"random\"\n" + " request_timeout = \"1m\"\n" + " resource_opts = {\n" + " request_timeout = \"7s\"\n" + " }\n" + " ssl {\n" + " ciphers = \"\"\n" + " depth = 10\n" + " enable = false\n" + " reuse_sessions = true\n" + " secure_renegotiate = true\n" + " user_lookup_fun = \"emqx_tls_psk:lookup\"\n" + " verify = \"verify_peer\"\n" + " versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]\n" + " }\n" + " url = \"http://localhost:8080\"\n" + " }\n" + " }\n" + "}\n" + "". + %% erlfmt-ignore %% this is a generated from v5.0.11 mqtt_v5011_hocon() -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 0f65b21f4..58a97b5fe 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1544,7 +1544,7 @@ ensure_flush_timer(Data = #{tref := undefined}, 0) -> %% if the batch_time is 0, we don't need to start a timer, which %% can be costly at high rates. Ref = make_ref(), - self() ! {flush, {Ref, Ref}}, + self() ! {flush, Ref}, Data#{tref => {Ref, Ref}}; ensure_flush_timer(Data = #{tref := undefined}, T) -> Ref = make_ref(),