Merge pull request #10713 from zhongwencool/put-webhook-request-timeout-into-resource-opts
feat: update wehbook's request_timeout into resource_opts
This commit is contained in:
commit
ea8ac877b0
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_bridge, [
|
{application, emqx_bridge, [
|
||||||
{description, "EMQX bridges"},
|
{description, "EMQX bridges"},
|
||||||
{vsn, "0.1.18"},
|
{vsn, "0.1.19"},
|
||||||
{registered, [emqx_bridge_sup]},
|
{registered, [emqx_bridge_sup]},
|
||||||
{mod, {emqx_bridge_app, []}},
|
{mod, {emqx_bridge_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -892,11 +892,18 @@ fill_defaults(Type, RawConf) ->
|
||||||
pack_bridge_conf(Type, RawConf) ->
|
pack_bridge_conf(Type, RawConf) ->
|
||||||
#{<<"bridges">> => #{bin(Type) => #{<<"foo">> => RawConf}}}.
|
#{<<"bridges">> => #{bin(Type) => #{<<"foo">> => RawConf}}}.
|
||||||
|
|
||||||
unpack_bridge_conf(Type, PackedConf) ->
|
%% Hide webhook's resource_opts.request_timeout from user.
|
||||||
#{<<"bridges">> := Bridges} = PackedConf,
|
filter_raw_conf(<<"webhook">>, RawConf0) ->
|
||||||
#{<<"foo">> := RawConf} = maps:get(bin(Type), Bridges),
|
emqx_utils_maps:deep_remove([<<"resource_opts">>, <<"request_timeout">>], RawConf0);
|
||||||
|
filter_raw_conf(_TypeBin, RawConf) ->
|
||||||
RawConf.
|
RawConf.
|
||||||
|
|
||||||
|
unpack_bridge_conf(Type, PackedConf) ->
|
||||||
|
TypeBin = bin(Type),
|
||||||
|
#{<<"bridges">> := Bridges} = PackedConf,
|
||||||
|
#{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges),
|
||||||
|
filter_raw_conf(TypeBin, RawConf).
|
||||||
|
|
||||||
is_ok(ok) ->
|
is_ok(ok) ->
|
||||||
ok;
|
ok;
|
||||||
is_ok(OkResult = {ok, _}) ->
|
is_ok(OkResult = {ok, _}) ->
|
||||||
|
|
|
@ -165,20 +165,20 @@ create(BridgeId, Conf) ->
|
||||||
create(Type, Name, Conf) ->
|
create(Type, Name, Conf) ->
|
||||||
create(Type, Name, Conf, #{}).
|
create(Type, Name, Conf, #{}).
|
||||||
|
|
||||||
create(Type, Name, Conf, Opts0) ->
|
create(Type, Name, Conf, Opts) ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
msg => "create bridge",
|
msg => "create bridge",
|
||||||
type => Type,
|
type => Type,
|
||||||
name => Name,
|
name => Name,
|
||||||
config => emqx_utils:redact(Conf)
|
config => emqx_utils:redact(Conf)
|
||||||
}),
|
}),
|
||||||
Opts = override_start_after_created(Conf, Opts0),
|
TypeBin = bin(Type),
|
||||||
{ok, _Data} = emqx_resource:create_local(
|
{ok, _Data} = emqx_resource:create_local(
|
||||||
resource_id(Type, Name),
|
resource_id(Type, Name),
|
||||||
<<"emqx_bridge">>,
|
<<"emqx_bridge">>,
|
||||||
bridge_to_resource_type(Type),
|
bridge_to_resource_type(Type),
|
||||||
parse_confs(bin(Type), Name, Conf),
|
parse_confs(TypeBin, Name, Conf),
|
||||||
Opts
|
parse_opts(Conf, Opts)
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -189,7 +189,7 @@ update(BridgeId, {OldConf, Conf}) ->
|
||||||
update(Type, Name, {OldConf, Conf}) ->
|
update(Type, Name, {OldConf, Conf}) ->
|
||||||
update(Type, Name, {OldConf, Conf}, #{}).
|
update(Type, Name, {OldConf, Conf}, #{}).
|
||||||
|
|
||||||
update(Type, Name, {OldConf, Conf}, Opts0) ->
|
update(Type, Name, {OldConf, Conf}, Opts) ->
|
||||||
%% TODO: sometimes its not necessary to restart the bridge connection.
|
%% TODO: sometimes its not necessary to restart the bridge connection.
|
||||||
%%
|
%%
|
||||||
%% - if the connection related configs like `servers` is updated, we should restart/start
|
%% - if the connection related configs like `servers` is updated, we should restart/start
|
||||||
|
@ -198,7 +198,6 @@ update(Type, Name, {OldConf, Conf}, Opts0) ->
|
||||||
%% the `method` or `headers` of a WebHook is changed, then the bridge can be updated
|
%% the `method` or `headers` of a WebHook is changed, then the bridge can be updated
|
||||||
%% without restarting the bridge.
|
%% without restarting the bridge.
|
||||||
%%
|
%%
|
||||||
Opts = override_start_after_created(Conf, Opts0),
|
|
||||||
case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of
|
case emqx_utils_maps:if_only_to_toggle_enable(OldConf, Conf) of
|
||||||
false ->
|
false ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
|
@ -241,11 +240,12 @@ recreate(Type, Name, Conf) ->
|
||||||
recreate(Type, Name, Conf, #{}).
|
recreate(Type, Name, Conf, #{}).
|
||||||
|
|
||||||
recreate(Type, Name, Conf, Opts) ->
|
recreate(Type, Name, Conf, Opts) ->
|
||||||
|
TypeBin = bin(Type),
|
||||||
emqx_resource:recreate_local(
|
emqx_resource:recreate_local(
|
||||||
resource_id(Type, Name),
|
resource_id(Type, Name),
|
||||||
bridge_to_resource_type(Type),
|
bridge_to_resource_type(Type),
|
||||||
parse_confs(bin(Type), Name, Conf),
|
parse_confs(TypeBin, Name, Conf),
|
||||||
Opts
|
parse_opts(Conf, Opts)
|
||||||
).
|
).
|
||||||
|
|
||||||
create_dry_run(Type, Conf0) ->
|
create_dry_run(Type, Conf0) ->
|
||||||
|
@ -402,6 +402,9 @@ bin(Bin) when is_binary(Bin) -> Bin;
|
||||||
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
bin(Str) when is_list(Str) -> list_to_binary(Str);
|
||||||
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8).
|
||||||
|
|
||||||
|
parse_opts(Conf, Opts0) ->
|
||||||
|
override_start_after_created(Conf, Opts0).
|
||||||
|
|
||||||
override_start_after_created(Config, Opts) ->
|
override_start_after_created(Config, Opts) ->
|
||||||
Enabled = maps:get(enable, Config, true),
|
Enabled = maps:get(enable, Config, true),
|
||||||
StartAfterCreated = Enabled andalso maps:get(start_after_created, Opts, Enabled),
|
StartAfterCreated = Enabled andalso maps:get(start_after_created, Opts, Enabled),
|
||||||
|
|
|
@ -238,36 +238,10 @@ webhook_bridge_converter(Conf0, _HoconOpts) ->
|
||||||
)
|
)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% We hide resource_opts.request_timeout from user.
|
||||||
do_convert_webhook_config(
|
do_convert_webhook_config(
|
||||||
#{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf
|
#{<<"request_timeout">> := ReqT, <<"resource_opts">> := ResOpts} = Conf
|
||||||
) ->
|
) ->
|
||||||
%% ok: same values
|
Conf#{<<"resource_opts">> => ResOpts#{<<"request_timeout">> => ReqT}};
|
||||||
Conf;
|
|
||||||
do_convert_webhook_config(
|
|
||||||
#{
|
|
||||||
<<"request_timeout">> := ReqTRootRaw,
|
|
||||||
<<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw}
|
|
||||||
} = Conf0
|
|
||||||
) ->
|
|
||||||
%% different values; we set them to the same, if they are valid
|
|
||||||
%% durations
|
|
||||||
MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw),
|
|
||||||
MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw),
|
|
||||||
case {MReqTRoot, MReqTResource} of
|
|
||||||
{{ok, ReqTRoot}, {ok, ReqTResource}} ->
|
|
||||||
{_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}),
|
|
||||||
Conf1 = emqx_utils_maps:deep_merge(
|
|
||||||
Conf0,
|
|
||||||
#{
|
|
||||||
<<"request_timeout">> => ReqTRaw,
|
|
||||||
<<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw}
|
|
||||||
}
|
|
||||||
),
|
|
||||||
Conf1;
|
|
||||||
_ ->
|
|
||||||
%% invalid values; let the type checker complain about
|
|
||||||
%% that.
|
|
||||||
Conf0
|
|
||||||
end;
|
|
||||||
do_convert_webhook_config(Conf) ->
|
do_convert_webhook_config(Conf) ->
|
||||||
Conf.
|
Conf.
|
||||||
|
|
|
@ -40,12 +40,15 @@ fields("put") ->
|
||||||
fields("get") ->
|
fields("get") ->
|
||||||
emqx_bridge_schema:status_fields() ++ fields("post");
|
emqx_bridge_schema:status_fields() ++ fields("post");
|
||||||
fields("creation_opts") ->
|
fields("creation_opts") ->
|
||||||
lists:filter(
|
[
|
||||||
fun({K, _V}) ->
|
hidden_request_timeout()
|
||||||
not lists:member(K, unsupported_opts())
|
| lists:filter(
|
||||||
end,
|
fun({K, _V}) ->
|
||||||
emqx_resource_schema:fields("creation_opts")
|
not lists:member(K, unsupported_opts())
|
||||||
).
|
end,
|
||||||
|
emqx_resource_schema:fields("creation_opts")
|
||||||
|
)
|
||||||
|
].
|
||||||
|
|
||||||
desc("config") ->
|
desc("config") ->
|
||||||
?DESC("desc_config");
|
?DESC("desc_config");
|
||||||
|
@ -163,7 +166,8 @@ unsupported_opts() ->
|
||||||
[
|
[
|
||||||
enable_batch,
|
enable_batch,
|
||||||
batch_size,
|
batch_size,
|
||||||
batch_time
|
batch_time,
|
||||||
|
request_timeout
|
||||||
].
|
].
|
||||||
|
|
||||||
%%======================================================================================
|
%%======================================================================================
|
||||||
|
@ -190,3 +194,13 @@ name_field() ->
|
||||||
|
|
||||||
method() ->
|
method() ->
|
||||||
enum([post, put, get, delete]).
|
enum([post, put, get, delete]).
|
||||||
|
|
||||||
|
hidden_request_timeout() ->
|
||||||
|
{request_timeout,
|
||||||
|
mk(
|
||||||
|
hoconsc:union([infinity, emqx_schema:duration_ms()]),
|
||||||
|
#{
|
||||||
|
required => false,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
|
}
|
||||||
|
)}.
|
||||||
|
|
|
@ -1284,21 +1284,43 @@ t_inconsistent_webhook_request_timeouts(Config) ->
|
||||||
<<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
|
<<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
?assertMatch(
|
{ok, 201, #{
|
||||||
{ok, 201, #{
|
<<"request_timeout">> := <<"1s">>,
|
||||||
%% note: same value on both fields
|
<<"resource_opts">> := ResourceOpts
|
||||||
<<"request_timeout">> := <<"2s">>,
|
}} =
|
||||||
<<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>}
|
|
||||||
}},
|
|
||||||
request_json(
|
request_json(
|
||||||
post,
|
post,
|
||||||
uri(["bridges"]),
|
uri(["bridges"]),
|
||||||
BadBridgeParams,
|
BadBridgeParams,
|
||||||
Config
|
Config
|
||||||
)
|
),
|
||||||
),
|
?assertNot(maps:is_key(<<"request_timeout">>, ResourceOpts)),
|
||||||
|
validate_resource_request_timeout(proplists:get_value(group, Config), 1000, Name),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
validate_resource_request_timeout(single, Timeout, Name) ->
|
||||||
|
SentData = #{payload => <<"Hello EMQX">>, timestamp => 1668602148000},
|
||||||
|
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
|
||||||
|
ResId = emqx_bridge_resource:resource_id(<<"webhook">>, Name),
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
{ok, Res} =
|
||||||
|
?wait_async_action(
|
||||||
|
emqx_bridge:send_message(BridgeID, SentData),
|
||||||
|
#{?snk_kind := async_query},
|
||||||
|
1000
|
||||||
|
),
|
||||||
|
?assertMatch({ok, #{id := ResId, query_opts := #{timeout := Timeout}}}, Res)
|
||||||
|
end,
|
||||||
|
fun(Trace0) ->
|
||||||
|
Trace = ?of_kind(async_query, Trace0),
|
||||||
|
?assertMatch([#{query_opts := #{timeout := Timeout}}], Trace),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
);
|
||||||
|
validate_resource_request_timeout(_Cluster, _Timeout, _Name) ->
|
||||||
|
ignore.
|
||||||
|
|
||||||
%%
|
%%
|
||||||
|
|
||||||
request(Method, URL, Config) ->
|
request(Method, URL, Config) ->
|
||||||
|
|
|
@ -59,27 +59,21 @@ webhook_config_test() ->
|
||||||
},
|
},
|
||||||
check(Conf2)
|
check(Conf2)
|
||||||
),
|
),
|
||||||
|
#{
|
||||||
%% the converter should pick the greater of the two
|
<<"bridges">> := #{
|
||||||
%% request_timeouts and place them in the root and inside
|
<<"webhook">> := #{
|
||||||
%% resource_opts.
|
<<"the_name">> :=
|
||||||
?assertMatch(
|
#{
|
||||||
#{
|
<<"method">> := get,
|
||||||
<<"bridges">> := #{
|
<<"request_timeout">> := RequestTime,
|
||||||
<<"webhook">> := #{
|
<<"resource_opts">> := ResourceOpts,
|
||||||
<<"the_name">> :=
|
<<"body">> := <<"${payload}">>
|
||||||
#{
|
}
|
||||||
<<"method">> := get,
|
|
||||||
<<"request_timeout">> := 60_000,
|
|
||||||
<<"resource_opts">> := #{<<"request_timeout">> := 60_000},
|
|
||||||
<<"body">> := <<"${payload}">>
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
check(Conf3)
|
} = check(Conf3),
|
||||||
),
|
?assertEqual(60_000, RequestTime),
|
||||||
|
?assertMatch(#{<<"request_timeout">> := 60_000}, ResourceOpts),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
up(#{<<"bridges">> := Bridges0} = Conf0) ->
|
up(#{<<"bridges">> := Bridges0} = Conf0) ->
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
We hide the request_timeout in resource_option of the webhook to keep it consistent with the http request_timeout of the webhook.
|
||||||
|
From now on, when configuring a webhook through API or configuration files,
|
||||||
|
it is no longer necessary to configure the request_timeout of the resource. Only configuring the http request_timeout is sufficient, and the request_timeout in the resource will automatically be consistent with the http request_timeout.
|
Loading…
Reference in New Issue