Merge remote-tracking branch 'origin/master' into 0621-merge-release-51-to-master

This commit is contained in:
Zaiming (Stone) Shi 2023-06-21 16:08:41 +02:00
commit 5fa87091c4
79 changed files with 718 additions and 278 deletions

View File

@ -29,7 +29,7 @@
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.3"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.3"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
@ -44,7 +44,7 @@
{meck, "0.9.2"}, {meck, "0.9.2"},
{proper, "1.4.0"}, {proper, "1.4.0"},
{bbmustache, "1.10.0"}, {bbmustache, "1.10.0"},
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}} {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.6"}}}
]}, ]},
{extra_src_dirs, [{"test", [recursive]}]} {extra_src_dirs, [{"test", [recursive]}]}
]} ]}

View File

@ -270,6 +270,9 @@ check(#mqtt_packet_subscribe{topic_filters = TopicFilters}) ->
try try
validate_topic_filters(TopicFilters) validate_topic_filters(TopicFilters)
catch catch
%% Known Specificed Reason Code
error:{error, RC} ->
{error, RC};
error:_Error -> error:_Error ->
{error, ?RC_TOPIC_FILTER_INVALID} {error, ?RC_TOPIC_FILTER_INVALID}
end; end;
@ -413,6 +416,10 @@ run_checks([Check | More], Packet, Options) ->
validate_topic_filters(TopicFilters) -> validate_topic_filters(TopicFilters) ->
lists:foreach( lists:foreach(
fun fun
%% Protocol Error and Should Disconnect
%% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1]
({<<?SHARE, "/", _Rest/binary>>, #{nl := 1}}) ->
error({error, ?RC_PROTOCOL_ERROR});
({TopicFilter, _SubOpts}) -> ({TopicFilter, _SubOpts}) ->
emqx_topic:validate(TopicFilter); emqx_topic:validate(TopicFilter);
(TopicFilter) -> (TopicFilter) ->

View File

@ -1758,7 +1758,7 @@ base_listener(Bind) ->
)}, )},
{"bind", {"bind",
sc( sc(
hoconsc:union([ip_port(), integer()]), ip_port(),
#{ #{
default => Bind, default => Bind,
required => true, required => true,
@ -2418,13 +2418,13 @@ mk_duration(Desc, OverrideMeta) ->
to_duration(Str) -> to_duration(Str) ->
case hocon_postprocess:duration(Str) of case hocon_postprocess:duration(Str) of
I when is_integer(I) -> {ok, I}; I when is_integer(I) -> {ok, I};
_ -> {error, Str} _ -> to_integer(Str)
end. end.
to_duration_s(Str) -> to_duration_s(Str) ->
case hocon_postprocess:duration(Str) of case hocon_postprocess:duration(Str) of
I when is_number(I) -> {ok, ceiling(I / 1000)}; I when is_number(I) -> {ok, ceiling(I / 1000)};
_ -> {error, Str} _ -> to_integer(Str)
end. end.
-spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when -spec to_duration_ms(Input) -> {ok, integer()} | {error, Input} when
@ -2432,7 +2432,7 @@ to_duration_s(Str) ->
to_duration_ms(Str) -> to_duration_ms(Str) ->
case hocon_postprocess:duration(Str) of case hocon_postprocess:duration(Str) of
I when is_number(I) -> {ok, ceiling(I)}; I when is_number(I) -> {ok, ceiling(I)};
_ -> {error, Str} _ -> to_integer(Str)
end. end.
-spec to_timeout_duration(Input) -> {ok, timeout_duration()} | {error, Input} when -spec to_timeout_duration(Input) -> {ok, timeout_duration()} | {error, Input} when
@ -2473,7 +2473,7 @@ do_to_timeout_duration(Str, Fn, Max, Unit) ->
to_bytesize(Str) -> to_bytesize(Str) ->
case hocon_postprocess:bytesize(Str) of case hocon_postprocess:bytesize(Str) of
I when is_integer(I) -> {ok, I}; I when is_integer(I) -> {ok, I};
_ -> {error, Str} _ -> to_integer(Str)
end. end.
to_wordsize(Str) -> to_wordsize(Str) ->
@ -2483,6 +2483,13 @@ to_wordsize(Str) ->
Error -> Error Error -> Error
end. end.
to_integer(Str) ->
case string:to_integer(Str) of
{Int, []} -> {ok, Int};
{Int, <<>>} -> {ok, Int};
_ -> {error, Str}
end.
to_percent(Str) -> to_percent(Str) ->
{ok, hocon_postprocess:percent(Str)}. {ok, hocon_postprocess:percent(Str)}.
@ -2525,9 +2532,9 @@ to_ip_port(Str) ->
case split_ip_port(Str) of case split_ip_port(Str) of
{"", Port} -> {"", Port} ->
%% this is a local address %% this is a local address
{ok, list_to_integer(Port)}; {ok, parse_port(Port)};
{MaybeIp, Port} -> {MaybeIp, Port} ->
PortVal = list_to_integer(Port), PortVal = parse_port(Port),
case inet:parse_address(MaybeIp) of case inet:parse_address(MaybeIp) of
{ok, IpTuple} -> {ok, IpTuple} ->
{ok, {IpTuple, PortVal}}; {ok, {IpTuple, PortVal}};
@ -2543,18 +2550,11 @@ split_ip_port(Str0) ->
case lists:split(string:rchr(Str, $:), Str) of case lists:split(string:rchr(Str, $:), Str) of
%% no colon %% no colon
{[], Str} -> {[], Str} ->
try {"", Str};
%% if it's just a port number, then return as-is
_ = list_to_integer(Str),
{"", Str}
catch
_:_ ->
error
end;
{IpPlusColon, PortString} -> {IpPlusColon, PortString} ->
IpStr0 = lists:droplast(IpPlusColon), IpStr0 = lists:droplast(IpPlusColon),
case IpStr0 of case IpStr0 of
%% dropp head/tail brackets %% drop head/tail brackets
[$[ | S] -> [$[ | S] ->
case lists:last(S) of case lists:last(S) of
$] -> {lists:droplast(S), PortString}; $] -> {lists:droplast(S), PortString};

View File

@ -227,8 +227,11 @@ find_managed_files(Filter, Dir) ->
false -> false ->
Acc Acc
end; end;
(AbsPath, {error, enoent}, Acc) when AbsPath == Dir ->
Acc;
(AbsPath, {error, Reason}, Acc) -> (AbsPath, {error, Reason}, Acc) ->
?SLOG(notice, "filesystem_object_inaccessible", #{ ?SLOG(notice, #{
msg => "filesystem_object_inaccessible",
abspath => AbsPath, abspath => AbsPath,
reason => Reason reason => Reason
}), }),

View File

@ -654,10 +654,13 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
%% Extras app starting handler. It is the second arg passed to emqx_common_test_helpers:start_apps/2 %% Extras app starting handler. It is the second arg passed to emqx_common_test_helpers:start_apps/2
env_handler => fun((AppName :: atom()) -> term()), env_handler => fun((AppName :: atom()) -> term()),
%% Application env preset before calling `emqx_common_test_helpers:start_apps/2` %% Application env preset before calling `emqx_common_test_helpers:start_apps/2`
env => {AppName :: atom(), Key :: atom(), Val :: term()}, env => [{AppName :: atom(), Key :: atom(), Val :: term()}],
%% Whether to execute `emqx_config:init_load(SchemaMod)` %% Whether to execute `emqx_config:init_load(SchemaMod)`
%% default: true %% default: true
load_schema => boolean(), load_schema => boolean(),
%% Which node in the cluster to join to.
%% default: first core node
join_to => node(),
%% If we want to exercise the scenario where a node joins an %% If we want to exercise the scenario where a node joins an
%% existing cluster where there has already been some %% existing cluster where there has already been some
%% configuration changes (via cluster rpc), then we need to enable %% configuration changes (via cluster rpc), then we need to enable
@ -692,28 +695,38 @@ emqx_cluster(Specs0, CommonOpts) ->
]), ]),
%% Set the default node of the cluster: %% Set the default node of the cluster:
CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs], CoreNodes = [node_name(Name) || {{core, Name, _}, _} <- Specs],
JoinTo0 = JoinTo =
case CoreNodes of case CoreNodes of
[First | _] -> First; [First | _] -> First;
_ -> undefined _ -> undefined
end, end,
JoinTo = NodeOpts = fun(Number) ->
case maps:find(join_to, CommonOpts) of #{
{ok, true} -> JoinTo0; base_port => base_port(Number),
{ok, JT} -> JT; env => [
error -> JoinTo0 {mria, core_nodes, CoreNodes},
end, {gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
[ ]
{Name, }
merge_opts(Opts, #{ end,
base_port => base_port(Number), RoleOpts = fun
(core) ->
#{
join_to => JoinTo, join_to => JoinTo,
env => [ env => [
{mria, core_nodes, CoreNodes}, {mria, node_role, core}
{mria, node_role, Role},
{gen_rpc, client_config_per_node, {internal, GenRpcPorts}}
] ]
})} };
(replicant) ->
#{
env => [
{mria, node_role, replicant},
{ekka, cluster_discovery, {static, [{seeds, CoreNodes}]}}
]
}
end,
[
{Name, merge_opts(merge_opts(NodeOpts(Number), RoleOpts(Role)), Opts)}
|| {{Role, Name, Opts}, Number} <- Specs || {{Role, Name, Opts}, Number} <- Specs
]. ].

View File

@ -497,17 +497,24 @@ t_update_config(_Config) ->
emqx_config_handler:start_link(), emqx_config_handler:start_link(),
{ok, Pid} = emqx_crl_cache:start_link(), {ok, Pid} = emqx_crl_cache:start_link(),
Conf = #{ Conf = #{
refresh_interval => timer:minutes(5), refresh_interval => <<"5m">>,
http_timeout => timer:minutes(10), http_timeout => <<"10m">>,
capacity => 123 capacity => 123
}, },
?assertMatch({ok, _}, emqx:update_config([<<"crl_cache">>], Conf)), ?assertMatch({ok, _}, emqx:update_config([<<"crl_cache">>], Conf)),
State = sys:get_state(Pid), State = sys:get_state(Pid),
?assertEqual(Conf, #{ ?assertEqual(
refresh_interval => element(3, State), #{
http_timeout => element(4, State), refresh_interval => timer:minutes(5),
capacity => element(7, State) http_timeout => timer:minutes(10),
}), capacity => 123
},
#{
refresh_interval => element(3, State),
http_timeout => element(4, State),
capacity => element(7, State)
}
),
emqx_config:erase(<<"crl_cache">>), emqx_config:erase(<<"crl_cache">>),
emqx_config_handler:stop(), emqx_config_handler:stop(),
ok. ok.

View File

@ -33,9 +33,9 @@ init_per_suite(Config) ->
<<"enable">> => true, <<"enable">> => true,
<<"max_count">> => 3, <<"max_count">> => 3,
% 0.1s % 0.1s
<<"window_time">> => 100, <<"window_time">> => <<"100ms">>,
%% 2s %% 2s
<<"ban_time">> => "2s" <<"ban_time">> => <<"2s">>
} }
), ),
Config. Config.
@ -119,16 +119,16 @@ t_conf_update(_) ->
?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(zone_1)), ?assertEqual(Global#{window_time := 100}, emqx_flapping:get_policy(zone_1)),
Zones = #{ Zones = #{
<<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => 123}}, <<"zone_1">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"123s">>}},
<<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => 456}} <<"zone_2">> => #{<<"flapping_detect">> => #{<<"window_time">> => <<"456s">>}}
}, },
?assertMatch({ok, _}, emqx:update_config([zones], Zones)), ?assertMatch({ok, _}, emqx:update_config([zones], Zones)),
%% new_zone is already deleted %% new_zone is already deleted
?assertError({config_not_found, _}, get_policy(new_zone)), ?assertError({config_not_found, _}, get_policy(new_zone)),
%% update zone(zone_1) has default. %% update zone(zone_1) has default.
?assertEqual(Global#{window_time := 123}, emqx_flapping:get_policy(zone_1)), ?assertEqual(Global#{window_time := 123000}, emqx_flapping:get_policy(zone_1)),
%% create zone(zone_2) has default %% create zone(zone_2) has default
?assertEqual(Global#{window_time := 456}, emqx_flapping:get_policy(zone_2)), ?assertEqual(Global#{window_time := 456000}, emqx_flapping:get_policy(zone_2)),
%% reset to default(empty) andalso get default from global %% reset to default(empty) andalso get default from global
?assertMatch({ok, _}, emqx:update_config([zones], #{})), ?assertMatch({ok, _}, emqx:update_config([zones], #{})),
?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])), ?assertEqual(Global, emqx:get_config([zones, default, flapping_detect])),
@ -172,13 +172,13 @@ validate_timer(Lists) ->
ok. ok.
t_window_compatibility_check(_Conf) -> t_window_compatibility_check(_Conf) ->
Flapping = emqx:get_config([flapping_detect]), Flapping = emqx:get_raw_config([flapping_detect]),
ok = emqx_config:init_load(emqx_schema, <<"flapping_detect {window_time = disable}">>), ok = emqx_config:init_load(emqx_schema, <<"flapping_detect {window_time = disable}">>),
?assertMatch(#{window_time := 60000, enable := false}, emqx:get_config([flapping_detect])), ?assertMatch(#{window_time := 60000, enable := false}, emqx:get_config([flapping_detect])),
%% reset %% reset
FlappingBin = iolist_to_binary(["flapping_detect {", hocon_pp:do(Flapping, #{}), "}"]), FlappingBin = iolist_to_binary(["flapping_detect {", hocon_pp:do(Flapping, #{}), "}"]),
ok = emqx_config:init_load(emqx_schema, FlappingBin), ok = emqx_config:init_load(emqx_schema, FlappingBin),
?assertEqual(Flapping, emqx:get_config([flapping_detect])), ?assertEqual(Flapping, emqx:get_raw_config([flapping_detect])),
ok. ok.
get_policy(Zone) -> get_policy(Zone) ->

View File

@ -985,3 +985,18 @@ t_shared_subscriptions_client_terminates_when_qos_eq_2(Config) ->
?assertEqual(1, counters:get(CRef, 1)), ?assertEqual(1, counters:get(CRef, 1)),
process_flag(trap_exit, false). process_flag(trap_exit, false).
t_share_subscribe_no_local(Config) ->
ConnFun = ?config(conn_fun, Config),
process_flag(trap_exit, true),
ShareTopic = <<"$share/sharename/TopicA">>,
{ok, Client} = emqtt:start_link([{proto_ver, v5} | Config]),
{ok, _} = emqtt:ConnFun(Client),
%% MQTT-5.0 [MQTT-3.8.3-4] and [MQTT-4.13.1-1] (Disconnect)
case catch emqtt:subscribe(Client, #{}, [{ShareTopic, [{nl, true}, {qos, 1}]}]) of
{'EXIT', {Reason, _Stk}} ->
?assertEqual({disconnected, ?RC_PROTOCOL_ERROR, #{}}, Reason)
end,
process_flag(trap_exit, false).

View File

@ -137,15 +137,19 @@ init_per_testcase(t_ocsp_responder_error_responses, Config) ->
enable_ocsp_stapling => true, enable_ocsp_stapling => true,
responder_url => <<"http://localhost:9877/">>, responder_url => <<"http://localhost:9877/">>,
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"), issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"),
refresh_http_timeout => 15_000, refresh_http_timeout => <<"15s">>,
refresh_interval => 1_000 refresh_interval => <<"1s">>
} }
} }
}, },
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}}, Conf = #{listeners => #{Type => #{Name => ListenerOpts}}},
ConfBin = emqx_utils_maps:binary_key_map(Conf), ConfBin = emqx_utils_maps:binary_key_map(Conf),
hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}), CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts), required => false, atom_keys => false
}),
Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf),
ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2),
snabbkaffe:start_trace(), snabbkaffe:start_trace(),
_Heir = spawn_dummy_heir(), _Heir = spawn_dummy_heir(),
{ok, CachePid} = emqx_ocsp_cache:start_link(), {ok, CachePid} = emqx_ocsp_cache:start_link(),
@ -179,15 +183,19 @@ init_per_testcase(_TestCase, Config) ->
enable_ocsp_stapling => true, enable_ocsp_stapling => true,
responder_url => <<"http://localhost:9877/">>, responder_url => <<"http://localhost:9877/">>,
issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"), issuer_pem => filename:join(DataDir, "ocsp-issuer.pem"),
refresh_http_timeout => 15_000, refresh_http_timeout => <<"15s">>,
refresh_interval => 1_000 refresh_interval => <<"1s">>
} }
} }
}, },
Conf = #{listeners => #{Type => #{Name => ListenerOpts}}}, Conf = #{listeners => #{Type => #{Name => ListenerOpts}}},
ConfBin = emqx_utils_maps:binary_key_map(Conf), ConfBin = emqx_utils_maps:binary_key_map(Conf),
hocon_tconf:check_plain(emqx_schema, ConfBin, #{required => false, atom_keys => false}), CheckedConf = hocon_tconf:check_plain(emqx_schema, ConfBin, #{
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts), required => false, atom_keys => false
}),
Conf2 = emqx_utils_maps:unsafe_atom_key_map(CheckedConf),
ListenerOpts2 = emqx_utils_maps:deep_get([listeners, Type, Name], Conf2),
emqx_config:put_listener_conf(Type, Name, [], ListenerOpts2),
[ [
{cache_pid, CachePid} {cache_pid, CachePid}
| Config | Config

View File

@ -1316,8 +1316,8 @@ authenticator_examples() ->
<<"password">> => ?PH_PASSWORD <<"password">> => ?PH_PASSWORD
}, },
pool_size => 8, pool_size => 8,
connect_timeout => 5000, connect_timeout => <<"5s">>,
request_timeout => 5000, request_timeout => <<"5s">>,
enable_pipelining => 100, enable_pipelining => 100,
ssl => #{enable => false} ssl => #{enable => false}
} }

View File

@ -80,7 +80,7 @@ listener_mqtt_tcp_conf(Port, EnableAuthn) ->
<<"max_connections">> => 1024000, <<"max_connections">> => 1024000,
<<"mountpoint">> => <<>>, <<"mountpoint">> => <<>>,
<<"proxy_protocol">> => false, <<"proxy_protocol">> => false,
<<"proxy_protocol_timeout">> => 3000, <<"proxy_protocol_timeout">> => <<"3s">>,
<<"enable_authn">> => EnableAuthn <<"enable_authn">> => EnableAuthn
}. }.

View File

@ -105,7 +105,7 @@ set_special_configs(_App) ->
<<"headers">> => #{}, <<"headers">> => #{},
<<"ssl">> => #{<<"enable">> => true}, <<"ssl">> => #{<<"enable">> => true},
<<"method">> => <<"get">>, <<"method">> => <<"get">>,
<<"request_timeout">> => 5000 <<"request_timeout">> => <<"5s">>
}). }).
-define(SOURCE2, #{ -define(SOURCE2, #{
<<"type">> => <<"mongodb">>, <<"type">> => <<"mongodb">>,

View File

@ -70,7 +70,7 @@ t_api(_) ->
<<"cache">> => #{ <<"cache">> => #{
<<"enable">> => false, <<"enable">> => false,
<<"max_size">> => 32, <<"max_size">> => 32,
<<"ttl">> => 60000 <<"ttl">> => <<"60s">>
} }
}, },
@ -84,7 +84,7 @@ t_api(_) ->
<<"cache">> => #{ <<"cache">> => #{
<<"enable">> => true, <<"enable">> => true,
<<"max_size">> => 32, <<"max_size">> => 32,
<<"ttl">> => 60000 <<"ttl">> => <<"60s">>
} }
}, },

View File

@ -140,7 +140,7 @@ mk_cluster_specs(Config, Opts) ->
{core, emqx_bridge_api_SUITE1, #{}}, {core, emqx_bridge_api_SUITE1, #{}},
{core, emqx_bridge_api_SUITE2, #{}} {core, emqx_bridge_api_SUITE2, #{}}
], ],
CommonOpts = #{ CommonOpts = Opts#{
env => [{emqx, boot_modules, [broker]}], env => [{emqx, boot_modules, [broker]}],
apps => [], apps => [],
% NOTE % NOTE
@ -157,7 +157,6 @@ mk_cluster_specs(Config, Opts) ->
load_apps => ?SUITE_APPS ++ [emqx_dashboard], load_apps => ?SUITE_APPS ++ [emqx_dashboard],
env_handler => fun load_suite_config/1, env_handler => fun load_suite_config/1,
load_schema => false, load_schema => false,
join_to => maps:get(join_to, Opts, true),
priv_data_dir => ?config(priv_dir, Config) priv_data_dir => ?config(priv_dir, Config)
}, },
emqx_common_test_helpers:emqx_cluster(Specs, CommonOpts). emqx_common_test_helpers:emqx_cluster(Specs, CommonOpts).

View File

@ -105,29 +105,32 @@ parse_and_check(Config, ConfigString, Name) ->
resource_id(Config) -> resource_id(Config) ->
BridgeType = ?config(bridge_type, Config), BridgeType = ?config(bridge_type, Config),
Name = ?config(bridge_name, Config), BridgeName = ?config(bridge_name, Config),
emqx_bridge_resource:resource_id(BridgeType, Name). emqx_bridge_resource:resource_id(BridgeType, BridgeName).
create_bridge(Config) -> create_bridge(Config) ->
create_bridge(Config, _Overrides = #{}). create_bridge(Config, _Overrides = #{}).
create_bridge(Config, Overrides) -> create_bridge(Config, Overrides) ->
BridgeType = ?config(bridge_type, Config), BridgeType = ?config(bridge_type, Config),
Name = ?config(bridge_name, Config), BridgeName = ?config(bridge_name, Config),
BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig0 = ?config(bridge_config, Config),
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
ct:pal("creating bridge with config: ~p", [BridgeConfig]), ct:pal("creating bridge with config: ~p", [BridgeConfig]),
emqx_bridge:create(BridgeType, Name, BridgeConfig). emqx_bridge:create(BridgeType, BridgeName, BridgeConfig).
create_bridge_api(Config) -> create_bridge_api(Config) ->
create_bridge_api(Config, _Overrides = #{}). create_bridge_api(Config, _Overrides = #{}).
create_bridge_api(Config, Overrides) -> create_bridge_api(Config, Overrides) ->
BridgeType = ?config(bridge_type, Config), BridgeType = ?config(bridge_type, Config),
Name = ?config(bridge_name, Config), BridgeName = ?config(bridge_name, Config),
BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig0 = ?config(bridge_config, Config),
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, create_bridge_api(BridgeType, BridgeName, BridgeConfig).
create_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
Path = emqx_mgmt_api_test_util:api_path(["bridges"]), Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(), AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true}, Opts = #{return_all => true},
@ -164,14 +167,38 @@ update_bridge_api(Config, Overrides) ->
ct:pal("bridge update result: ~p", [Res]), ct:pal("bridge update result: ~p", [Res]),
Res. Res.
op_bridge_api(Op, BridgeType, BridgeName) ->
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]),
Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of
{ok, {Status = {_, 204, _}, Headers, Body}} ->
{ok, {Status, Headers, Body}};
{ok, {Status, Headers, Body}} ->
{ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
{error, {Status, Headers, Body}} ->
{error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
Error ->
Error
end,
ct:pal("bridge op result: ~p", [Res]),
Res.
probe_bridge_api(Config) -> probe_bridge_api(Config) ->
probe_bridge_api(Config, _Overrides = #{}). probe_bridge_api(Config, _Overrides = #{}).
probe_bridge_api(Config, _Overrides) -> probe_bridge_api(Config, Overrides) ->
BridgeType = ?config(bridge_type, Config), BridgeType = ?config(bridge_type, Config),
Name = ?config(bridge_name, Config), BridgeName = ?config(bridge_name, Config),
BridgeConfig = ?config(bridge_config, Config), BridgeConfig0 = ?config(bridge_config, Config),
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
probe_bridge_api(BridgeType, BridgeName, BridgeConfig).
probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(), AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true}, Opts = #{return_all => true},
@ -289,10 +316,34 @@ t_create_via_http(Config) ->
t_start_stop(Config, StopTracePoint) -> t_start_stop(Config, StopTracePoint) ->
BridgeType = ?config(bridge_type, Config), BridgeType = ?config(bridge_type, Config),
BridgeName = ?config(bridge_name, Config), BridgeName = ?config(bridge_name, Config),
ResourceId = resource_id(Config), BridgeConfig = ?config(bridge_config, Config),
t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint).
t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) ->
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
?check_trace( ?check_trace(
begin begin
?assertMatch({ok, _}, create_bridge(Config)), %% Check that the bridge probe API doesn't leak atoms.
ProbeRes0 = probe_bridge_api(
BridgeType,
BridgeName,
BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
AtomsBefore = erlang:system_info(atom_count),
%% Probe again; shouldn't have created more atoms.
ProbeRes1 = probe_bridge_api(
BridgeType,
BridgeName,
BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),
?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)),
%% Since the connection process is async, we give it some time to %% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness. %% stabilize and avoid flakiness.
?retry( ?retry(
@ -301,24 +352,48 @@ t_start_stop(Config, StopTracePoint) ->
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
), ),
%% Check that the bridge probe API doesn't leak atoms. %% `start` bridge to trigger `already_started`
ProbeRes0 = probe_bridge_api( ?assertMatch(
Config, {ok, {{_, 204, _}, _Headers, []}},
#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName)
),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
AtomsBefore = erlang:system_info(atom_count),
%% Probe again; shouldn't have created more atoms.
ProbeRes1 = probe_bridge_api(
Config,
#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
), ),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),
%% Now stop the bridge. ?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(
emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName),
#{?snk_kind := StopTracePoint},
5_000
)
),
?assertEqual(
{error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
),
?assertMatch(
{ok, {{_, 204, _}, _Headers, []}},
emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName)
),
?assertEqual(
{error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
),
?assertMatch(
{ok, {{_, 204, _}, _Headers, []}},
emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName)
),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
%% Disable the bridge, which will also stop it.
?assertMatch( ?assertMatch(
{{ok, _}, {ok, _}}, {{ok, _}, {ok, _}},
?wait_async_action( ?wait_async_action(
@ -331,8 +406,11 @@ t_start_stop(Config, StopTracePoint) ->
ok ok
end, end,
fun(Trace) -> fun(Trace) ->
%% one for each probe, one for real %% one for each probe, two for real
?assertMatch([_, _, #{instance_id := ResourceId}], ?of_kind(StopTracePoint, Trace)), ?assertMatch(
[_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}],
?of_kind(StopTracePoint, Trace)
),
ok ok
end end
), ),

View File

@ -28,6 +28,9 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(BRIDGE_TYPE, <<"webhook">>).
-define(BRIDGE_NAME, atom_to_binary(?MODULE)).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -36,15 +39,13 @@ groups() ->
init_per_suite(_Config) -> init_per_suite(_Config) ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf), emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector), {ok, _} = application:ensure_all_started(emqx_connector),
[]. [].
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = emqx_config:put([bridges], #{}), ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]),
ok = emqx_config:put_raw([bridges], #{}),
ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector), _ = application:stop(emqx_connector),
_ = application:stop(emqx_bridge), _ = application:stop(emqx_bridge),
@ -53,10 +54,22 @@ end_per_suite(_Config) ->
suite() -> suite() ->
[{timetrap, {seconds, 60}}]. [{timetrap, {seconds, 60}}].
init_per_testcase(t_bad_bridge_config, Config) ->
Config;
init_per_testcase(t_send_async_connection_timeout, Config) ->
ResponseDelayMS = 500,
Server = start_http_server(#{response_delay_ms => ResponseDelayMS}),
[{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config];
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Config. Server = start_http_server(#{response_delay_ms => 0}),
[{http_server, Server} | Config].
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, Config) ->
case ?config(http_server, Config) of
undefined -> ok;
Server -> stop_http_server(Server)
end,
emqx_bridge_testlib:delete_all_bridges(),
emqx_common_test_helpers:call_janitor(), emqx_common_test_helpers:call_janitor(),
ok. ok.
@ -65,13 +78,14 @@ end_per_testcase(_TestCase, _Config) ->
%% (Orginally copied from emqx_bridge_api_SUITE) %% (Orginally copied from emqx_bridge_api_SUITE)
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
start_http_server(HTTPServerConfig) -> start_http_server(HTTPServerConfig) ->
ct:pal("Start server\n"),
process_flag(trap_exit, true), process_flag(trap_exit, true),
Parent = self(), Parent = self(),
ct:pal("Starting server for ~p", [Parent]),
{ok, {Port, Sock}} = listen_on_random_port(), {ok, {Port, Sock}} = listen_on_random_port(),
Acceptor = spawn(fun() -> Acceptor = spawn(fun() ->
accept_loop(Sock, Parent, HTTPServerConfig) accept_loop(Sock, Parent, HTTPServerConfig)
end), end),
ct:pal("Started server on port ~p", [Port]),
timer:sleep(100), timer:sleep(100),
#{port => Port, sock => Sock, acceptor => Acceptor}. #{port => Port, sock => Sock, acceptor => Acceptor}.
@ -160,25 +174,25 @@ parse_http_request_assertive(ReqStr0) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
bridge_async_config(#{port := Port} = Config) -> bridge_async_config(#{port := Port} = Config) ->
Type = maps:get(type, Config, <<"webhook">>), Type = maps:get(type, Config, ?BRIDGE_TYPE),
Name = maps:get(name, Config, atom_to_binary(?MODULE)), Name = maps:get(name, Config, ?BRIDGE_NAME),
PoolSize = maps:get(pool_size, Config, 1), PoolSize = maps:get(pool_size, Config, 1),
QueryMode = maps:get(query_mode, Config, "async"), QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, 1), ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
RequestTimeout = maps:get(request_timeout, Config, 10000), RequestTimeout = maps:get(request_timeout, Config, "10s"),
ResumeInterval = maps:get(resume_interval, Config, "1s"), ResumeInterval = maps:get(resume_interval, Config, "1s"),
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"), ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
ConfigString = io_lib:format( ConfigString = io_lib:format(
"bridges.~s.~s {\n" "bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n" " url = \"http://localhost:~p\"\n"
" connect_timeout = \"~ps\"\n" " connect_timeout = \"~p\"\n"
" enable = true\n" " enable = true\n"
" enable_pipelining = 100\n" " enable_pipelining = 100\n"
" max_retries = 2\n" " max_retries = 2\n"
" method = \"post\"\n" " method = \"post\"\n"
" pool_size = ~p\n" " pool_size = ~p\n"
" pool_type = \"random\"\n" " pool_type = \"random\"\n"
" request_timeout = \"~ps\"\n" " request_timeout = \"~s\"\n"
" body = \"${id}\"" " body = \"${id}\""
" resource_opts {\n" " resource_opts {\n"
" inflight_window = 100\n" " inflight_window = 100\n"
@ -217,8 +231,8 @@ parse_and_check(ConfigString, BridgeType, Name) ->
RetConfig. RetConfig.
make_bridge(Config) -> make_bridge(Config) ->
Type = <<"webhook">>, Type = ?BRIDGE_TYPE,
Name = atom_to_binary(?MODULE), Name = ?BRIDGE_NAME,
BridgeConfig = bridge_async_config(Config#{ BridgeConfig = bridge_async_config(Config#{
name => Name, name => Name,
type => Type type => Type
@ -236,16 +250,15 @@ make_bridge(Config) ->
%% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed. %% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed.
%% When the connection time out all the queued requests where dropped in %% When the connection time out all the queued requests where dropped in
t_send_async_connection_timeout(_Config) -> t_send_async_connection_timeout(Config) ->
ResponseDelayMS = 90, ResponseDelayMS = ?config(response_delay_ms, Config),
#{port := Port} = Server = start_http_server(#{response_delay_ms => 900}), #{port := Port} = ?config(http_server, Config),
% Port = 9000,
BridgeID = make_bridge(#{ BridgeID = make_bridge(#{
port => Port, port => Port,
pool_size => 1, pool_size => 1,
query_mode => "async", query_mode => "async",
connect_timeout => ResponseDelayMS * 2, connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "s",
request_timeout => 10000, request_timeout => "10s",
resource_request_ttl => "infinity" resource_request_ttl => "infinity"
}), }),
NumberOfMessagesToSend = 10, NumberOfMessagesToSend = 10,
@ -257,17 +270,16 @@ t_send_async_connection_timeout(_Config) ->
ct:pal("Sent messages\n"), ct:pal("Sent messages\n"),
MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void), MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void),
receive_request_notifications(MessageIDs, ResponseDelayMS), receive_request_notifications(MessageIDs, ResponseDelayMS),
stop_http_server(Server),
ok. ok.
t_async_free_retries(_Config) -> t_async_free_retries(Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}), #{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{ BridgeID = make_bridge(#{
port => Port, port => Port,
pool_size => 1, pool_size => 1,
query_mode => "sync", query_mode => "sync",
connect_timeout => 1_000, connect_timeout => "1s",
request_timeout => 10_000, request_timeout => "10s",
resource_request_ttl => "10000s" resource_request_ttl => "10000s"
}), }),
%% Fail 5 times then succeed. %% Fail 5 times then succeed.
@ -285,15 +297,15 @@ t_async_free_retries(_Config) ->
do_t_async_retries(Context, {error, {shutdown, normal}}, Fn), do_t_async_retries(Context, {error, {shutdown, normal}}, Fn),
ok. ok.
t_async_common_retries(_Config) -> t_async_common_retries(Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}), #{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{ BridgeID = make_bridge(#{
port => Port, port => Port,
pool_size => 1, pool_size => 1,
query_mode => "sync", query_mode => "sync",
resume_interval => "100ms", resume_interval => "100ms",
connect_timeout => 1_000, connect_timeout => "1s",
request_timeout => 10_000, request_timeout => "10s",
resource_request_ttl => "10000s" resource_request_ttl => "10000s"
}), }),
%% Keeps failing until connector gives up. %% Keeps failing until connector gives up.
@ -323,6 +335,39 @@ t_async_common_retries(_Config) ->
do_t_async_retries(Context, {error, something_else}, FnFail), do_t_async_retries(Context, {error, something_else}, FnFail),
ok. ok.
t_bad_bridge_config(_Config) ->
BridgeConfig = bridge_async_config(#{port => 12345}),
?assertMatch(
{ok,
{{_, 201, _}, _Headers, #{
<<"status">> := <<"disconnected">>,
<<"status_reason">> := <<"Connection refused">>
}}},
emqx_bridge_testlib:create_bridge_api(
?BRIDGE_TYPE,
?BRIDGE_NAME,
BridgeConfig
)
),
%% try `/start` bridge
?assertMatch(
{error, {{_, 400, _}, _Headers, #{<<"message">> := <<"Connection refused">>}}},
emqx_bridge_testlib:op_bridge_api("start", ?BRIDGE_TYPE, ?BRIDGE_NAME)
),
ok.
t_start_stop(Config) ->
#{port := Port} = ?config(http_server, Config),
BridgeConfig = bridge_async_config(#{
type => ?BRIDGE_TYPE,
name => ?BRIDGE_NAME,
port => Port
}),
emqx_bridge_testlib:t_start_stop(
?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, emqx_connector_http_stopped
).
%% helpers
do_t_async_retries(TestContext, Error, Fn) -> do_t_async_retries(TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext, #{error_attempts := ErrorAttempts} = TestContext,
persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0), persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0),

View File

@ -635,9 +635,9 @@ t_bad_sql_parameter(Config) ->
Config, Config,
#{ #{
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"request_ttl">> => 500, <<"request_ttl">> => <<"500ms">>,
<<"resume_interval">> => 100, <<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => 100 <<"health_check_interval">> => <<"100ms">>
} }
} }
) )

View File

@ -184,7 +184,7 @@ clickhouse_config() ->
] ]
) )
), ),
connect_timeout => 10000 connect_timeout => <<"10s">>
}, },
#{<<"config">> => Config}. #{<<"config">> => Config}.

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; -*- %% -*- mode: erlang; -*-
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.6.8-emqx-1"}}} {deps, [ {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}}
, {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_resource, {path, "../../apps/emqx_resource"}}
, {emqx_bridge, {path, "../../apps/emqx_bridge"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -135,8 +135,8 @@ bridge_config(TestCase, _TestGroup, Config) ->
" iotdb_version = \"~s\"\n" " iotdb_version = \"~s\"\n"
" pool_size = 1\n" " pool_size = 1\n"
" resource_opts = {\n" " resource_opts = {\n"
" health_check_interval = 5000\n" " health_check_interval = \"5s\"\n"
" request_ttl = 30000\n" " request_ttl = 30s\n"
" query_mode = \"async\"\n" " query_mode = \"async\"\n"
" worker_pool_size = 1\n" " worker_pool_size = 1\n"
" }\n" " }\n"

View File

@ -590,7 +590,7 @@ kafka_config(TestCase, _KafkaType, Config) ->
" kafka {\n" " kafka {\n"
" max_batch_bytes = 896KB\n" " max_batch_bytes = 896KB\n"
" max_rejoin_attempts = 5\n" " max_rejoin_attempts = 5\n"
" offset_commit_interval_seconds = 3\n" " offset_commit_interval_seconds = 3s\n"
%% todo: matrix this %% todo: matrix this
" offset_reset_policy = latest\n" " offset_reset_policy = latest\n"
" }\n" " }\n"

View File

@ -307,7 +307,7 @@ bridges.kafka_consumer.my_consumer {
kafka { kafka {
max_batch_bytes = 896KB max_batch_bytes = 896KB
max_rejoin_attempts = 5 max_rejoin_attempts = 5
offset_commit_interval_seconds = 3 offset_commit_interval_seconds = 3s
offset_reset_policy = latest offset_reset_policy = latest
} }
topic_mapping = [ topic_mapping = [

View File

@ -298,9 +298,9 @@ t_write_timeout(Config) ->
Config, Config,
#{ #{
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"request_ttl">> => 500, <<"request_ttl">> => <<"500ms">>,
<<"resume_interval">> => 100, <<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => 100 <<"health_check_interval">> => <<"100ms">>
} }
} }
), ),

View File

@ -456,9 +456,9 @@ t_write_timeout(Config) ->
Config, Config,
#{ #{
<<"resource_opts">> => #{ <<"resource_opts">> => #{
<<"request_ttl">> => 500, <<"request_ttl">> => <<"500ms">>,
<<"resume_interval">> => 100, <<"resume_interval">> => <<"100ms">>,
<<"health_check_interval">> => 100 <<"health_check_interval">> => <<"100ms">>
} }
} }
), ),

View File

@ -247,7 +247,6 @@ cluster(Specs, Config) ->
{env, Env}, {env, Env},
{apps, [emqx_conf]}, {apps, [emqx_conf]},
{load_schema, false}, {load_schema, false},
{join_to, true},
{priv_data_dir, PrivDataDir}, {priv_data_dir, PrivDataDir},
{env_handler, fun {env_handler, fun
(emqx) -> (emqx) ->

View File

@ -55,7 +55,7 @@ log.console_handler {
burst_limit { burst_limit {
enable = true enable = true
max_count = 10000 max_count = 10000
window_time = 1000 window_time = 1s
} }
chars_limit = unlimited chars_limit = unlimited
drop_mode_qlen = 3000 drop_mode_qlen = 3000
@ -66,9 +66,9 @@ log.console_handler {
max_depth = 100 max_depth = 100
overload_kill { overload_kill {
enable = true enable = true
mem_size = 31457280 mem_size = \"30MB\"
qlen = 20000 qlen = 20000
restart_after = 5000 restart_after = \"5s\"
} }
single_line = true single_line = true
supervisor_reports = error supervisor_reports = error
@ -80,7 +80,7 @@ log.file_handlers {
burst_limit { burst_limit {
enable = true enable = true
max_count = 10000 max_count = 10000
window_time = 1000 window_time = 1s
} }
chars_limit = unlimited chars_limit = unlimited
drop_mode_qlen = 3000 drop_mode_qlen = 3000
@ -93,9 +93,9 @@ log.file_handlers {
max_size = \"1024MB\" max_size = \"1024MB\"
overload_kill { overload_kill {
enable = true enable = true
mem_size = 31457280 mem_size = \"30MB\"
qlen = 20000 qlen = 20000
restart_after = 5000 restart_after = \"5s\"
} }
rotation {count = 20, enable = true} rotation {count = 20, enable = true}
single_line = true single_line = true

View File

@ -35,7 +35,7 @@ t_run_gc(_) ->
node => #{ node => #{
cookie => <<"cookie">>, cookie => <<"cookie">>,
data_dir => <<"data">>, data_dir => <<"data">>,
global_gc_interval => 1000 global_gc_interval => <<"1s">>
} }
}, },
emqx_common_test_helpers:load_config(emqx_conf_schema, Conf0), emqx_common_test_helpers:load_config(emqx_conf_schema, Conf0),

View File

@ -16,11 +16,10 @@
-module(emqx_connector_http). -module(emqx_connector_http).
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(emqx_resource). -behaviour(emqx_resource).
@ -219,10 +218,31 @@ on_start(
base_path => BasePath, base_path => BasePath,
request => preprocess_request(maps:get(request, Config, undefined)) request => preprocess_request(maps:get(request, Config, undefined))
}, },
case ehttpc_sup:start_pool(InstId, PoolOpts) of case start_pool(InstId, PoolOpts) of
{ok, _} -> {ok, State}; ok ->
{error, {already_started, _}} -> {ok, State}; case do_get_status(InstId, ConnectTimeout) of
{error, Reason} -> {error, Reason} ok ->
{ok, State};
Error ->
ok = ehttpc_sup:stop_pool(InstId),
Error
end;
Error ->
Error
end.
start_pool(PoolName, PoolOpts) ->
case ehttpc_sup:start_pool(PoolName, PoolOpts) of
{ok, _} ->
ok;
{error, {already_started, _}} ->
?SLOG(warning, #{
msg => "emqx_connector_on_start_already_started",
pool_name => PoolName
}),
ok;
Error ->
Error
end. end.
on_stop(InstId, _State) -> on_stop(InstId, _State) ->
@ -230,7 +250,9 @@ on_stop(InstId, _State) ->
msg => "stopping_http_connector", msg => "stopping_http_connector",
connector => InstId connector => InstId
}), }),
ehttpc_sup:stop_pool(InstId). Res = ehttpc_sup:stop_pool(InstId),
?tp(emqx_connector_http_stopped, #{instance_id => InstId}),
Res.
on_query(InstId, {send_message, Msg}, State) -> on_query(InstId, {send_message, Msg}, State) ->
case maps:get(request, State, undefined) of case maps:get(request, State, undefined) of

View File

@ -24,6 +24,8 @@ wrap_auth_headers_test_() ->
fun() -> fun() ->
meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}), meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}),
meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end), meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end),
meck:expect(ehttpc, workers, 1, [{self, self()}]),
meck:expect(ehttpc, health_check, 2, ok),
meck:expect(ehttpc_pool, pick_worker, 1, self()), meck:expect(ehttpc_pool, pick_worker, 1, self()),
meck:expect(emqx_resource, allocate_resource, 3, ok), meck:expect(emqx_resource, allocate_resource, 3, ok),
[ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource] [ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource]

View File

@ -195,7 +195,7 @@ enable(Bool) ->
bind(Port) -> bind(Port) ->
{"bind", {"bind",
?HOCON( ?HOCON(
?UNION([non_neg_integer(), emqx_schema:ip_port()]), emqx_schema:ip_port(),
#{ #{
default => 0, default => 0,
required => false, required => false,

View File

@ -225,7 +225,7 @@ t_update_conf(_Config) ->
DeletedConf = Conf#{<<"servers">> => Servers2}, DeletedConf = Conf#{<<"servers">> => Servers2},
validate_servers(Path, DeletedConf, Servers2), validate_servers(Path, DeletedConf, Servers2),
[L1, L2 | Servers3] = Servers, [L1, L2 | Servers3] = Servers,
UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => 1000}, UpdateL2 = L2#{<<"pool_size">> => 1, <<"request_timeout">> => <<"1s">>},
UpdatedServers = [L1, UpdateL2 | Servers3], UpdatedServers = [L1, UpdateL2 | Servers3],
UpdatedConf = Conf#{<<"servers">> => UpdatedServers}, UpdatedConf = Conf#{<<"servers">> => UpdatedServers},
validate_servers(Path, UpdatedConf, UpdatedServers), validate_servers(Path, UpdatedConf, UpdatedServers),

View File

@ -67,7 +67,7 @@ init_per_suite(Config) ->
_ = emqx_exhook_demo_svr:start(), _ = emqx_exhook_demo_svr:start(),
load_cfg(?CONF_DEFAULT), load_cfg(?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_exhook]), emqx_mgmt_api_test_util:init_suite([emqx_exhook]),
[Conf] = emqx:get_config([exhook, servers]), [Conf] = emqx:get_raw_config([exhook, servers]),
[{template, Conf} | Config]. [{template, Conf} | Config].
end_per_suite(Config) -> end_per_suite(Config) ->
@ -157,8 +157,8 @@ t_get(_) ->
t_add(Cfg) -> t_add(Cfg) ->
Template = proplists:get_value(template, Cfg), Template = proplists:get_value(template, Cfg),
Instance = Template#{ Instance = Template#{
name => <<"test1">>, <<"name">> => <<"test1">>,
url => "http://127.0.0.1:9001" <<"url">> => "http://127.0.0.1:9001"
}, },
{ok, Data} = request_api( {ok, Data} = request_api(
post, post,
@ -186,8 +186,8 @@ t_add(Cfg) ->
t_add_duplicate(Cfg) -> t_add_duplicate(Cfg) ->
Template = proplists:get_value(template, Cfg), Template = proplists:get_value(template, Cfg),
Instance = Template#{ Instance = Template#{
name => <<"test1">>, <<"name">> => <<"test1">>,
url => "http://127.0.0.1:9001" <<"url">> => "http://127.0.0.1:9001"
}, },
{error, _Reason} = request_api( {error, _Reason} = request_api(
@ -203,8 +203,8 @@ t_add_duplicate(Cfg) ->
t_add_with_bad_name(Cfg) -> t_add_with_bad_name(Cfg) ->
Template = proplists:get_value(template, Cfg), Template = proplists:get_value(template, Cfg),
Instance = Template#{ Instance = Template#{
name => <<"🤔">>, <<"name">> => <<"🤔">>,
url => "http://127.0.0.1:9001" <<"url">> => "http://127.0.0.1:9001"
}, },
{error, _Reason} = request_api( {error, _Reason} = request_api(
@ -298,7 +298,7 @@ t_hooks(_Cfg) ->
t_update(Cfg) -> t_update(Cfg) ->
Template = proplists:get_value(template, Cfg), Template = proplists:get_value(template, Cfg),
Instance = Template#{enable => false}, Instance = Template#{<<"enable">> => false},
{ok, <<"{\"", _/binary>>} = request_api( {ok, <<"{\"", _/binary>>} = request_api(
put, put,
api_path(["exhooks", "default"]), api_path(["exhooks", "default"]),

View File

@ -19,6 +19,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include("emqx_ft_api.hrl").
%% Swagger specs from hocon schema %% Swagger specs from hocon schema
-export([ -export([
@ -61,7 +62,7 @@ schema("/file_transfer/files") ->
#{ #{
'operationId' => '/file_transfer/files', 'operationId' => '/file_transfer/files',
get => #{ get => #{
tags => [<<"file_transfer">>], tags => ?TAGS,
summary => <<"List all uploaded files">>, summary => <<"List all uploaded files">>,
description => ?DESC("file_list"), description => ?DESC("file_list"),
parameters => [ parameters => [
@ -83,7 +84,7 @@ schema("/file_transfer/files/:clientid/:fileid") ->
#{ #{
'operationId' => '/file_transfer/files/:clientid/:fileid', 'operationId' => '/file_transfer/files/:clientid/:fileid',
get => #{ get => #{
tags => [<<"file_transfer">>], tags => ?TAGS,
summary => <<"List files uploaded in a specific transfer">>, summary => <<"List files uploaded in a specific transfer">>,
description => ?DESC("file_list_transfer"), description => ?DESC("file_list_transfer"),
parameters => [ parameters => [

View File

@ -0,0 +1,10 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-ifndef(__EMQX_FT_API__).
-define(__EMQX_FT_API__, 42).
-define(TAGS, [<<"File Transfer">>]).
-endif.

View File

@ -96,7 +96,7 @@ handle_event(
complete -> complete ->
{next_state, start_assembling, NSt, ?internal([])}; {next_state, start_assembling, NSt, ?internal([])};
{incomplete, _} -> {incomplete, _} ->
Nodes = mria_mnesia:running_nodes() -- [node()], Nodes = emqx:running_nodes() -- [node()],
{next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])}; {next_state, {list_remote_fragments, Nodes}, NSt, ?internal([])};
% TODO: recovery? % TODO: recovery?
{error, _} = Error -> {error, _} = Error ->

View File

@ -361,7 +361,7 @@ list(_Options, Query) ->
end. end.
list(QueryIn) -> list(QueryIn) ->
{Nodes, NodeQuery} = decode_query(QueryIn, lists:sort(mria_mnesia:running_nodes())), {Nodes, NodeQuery} = decode_query(QueryIn, lists:sort(emqx:running_nodes())),
list_nodes(NodeQuery, Nodes, #{items => []}). list_nodes(NodeQuery, Nodes, #{items => []}).
list_nodes(Query, Nodes = [Node | Rest], Acc) -> list_nodes(Query, Nodes = [Node | Rest], Acc) ->

View File

@ -21,6 +21,7 @@
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include("emqx_ft_api.hrl").
%% Swagger specs from hocon schema %% Swagger specs from hocon schema
-export([ -export([
@ -60,7 +61,7 @@ schema("/file_transfer/file") ->
#{ #{
'operationId' => '/file_transfer/file', 'operationId' => '/file_transfer/file',
get => #{ get => #{
tags => [<<"file_transfer">>], tags => ?TAGS,
summary => <<"Download a particular file">>, summary => <<"Download a particular file">>,
description => ?DESC("file_get"), description => ?DESC("file_get"),
parameters => [ parameters => [

View File

@ -77,7 +77,7 @@ set_special_configs(Config) ->
% complete transfers. % complete transfers.
Storage = emqx_utils_maps:deep_merge( Storage = emqx_utils_maps:deep_merge(
emqx_ft_test_helpers:local_storage(Config), emqx_ft_test_helpers:local_storage(Config),
#{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => 0}}}} #{<<"local">> => #{<<"segments">> => #{<<"gc">> => #{<<"interval">> => <<"0s">>}}}}
), ),
emqx_ft_test_helpers:load_config(#{ emqx_ft_test_helpers:load_config(#{
<<"enable">> => true, <<"enable">> => true,

View File

@ -24,6 +24,8 @@
-import(emqx_dashboard_api_test_helpers, [host/0, uri/1]). -import(emqx_dashboard_api_test_helpers, [host/0, uri/1]).
-define(SUITE_APPS, [emqx_conf, emqx_ft]).
all() -> all() ->
[ [
{group, single}, {group, single},
@ -49,10 +51,9 @@ end_per_suite(_Config) ->
init_per_group(Group = cluster, Config) -> init_per_group(Group = cluster, Config) ->
Cluster = mk_cluster_specs(Config), Cluster = mk_cluster_specs(Config),
ct:pal("Starting ~p", [Cluster]), ct:pal("Starting ~p", [Cluster]),
Nodes = [ Nodes = [emqx_common_test_helpers:start_slave(Name, Opts) || {Name, Opts} <- Cluster],
emqx_common_test_helpers:start_slave(Name, Opts#{join_to => node()}) InitResult = erpc:multicall(Nodes, fun() -> init_node(Config) end),
|| {Name, Opts} <- Cluster [] = [{Node, Error} || {Node, {R, Error}} <- lists:zip(Nodes, InitResult), R /= ok],
],
[{group, Group}, {cluster_nodes, Nodes} | Config]; [{group, Group}, {cluster_nodes, Nodes} | Config];
init_per_group(Group, Config) -> init_per_group(Group, Config) ->
[{group, Group} | Config]. [{group, Group} | Config].
@ -65,22 +66,29 @@ end_per_group(cluster, Config) ->
end_per_group(_Group, _Config) -> end_per_group(_Group, _Config) ->
ok. ok.
mk_cluster_specs(Config) -> mk_cluster_specs(_Config) ->
Specs = [ Specs = [
{core, emqx_ft_api_SUITE1, #{listener_ports => [{tcp, 2883}]}}, {core, emqx_ft_api_SUITE1, #{listener_ports => [{tcp, 2883}]}},
{core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}} {core, emqx_ft_api_SUITE2, #{listener_ports => [{tcp, 3883}]}},
], {replicant, emqx_ft_api_SUITE3, #{listener_ports => [{tcp, 4883}]}}
CommOpts = [
{env, [{emqx, boot_modules, [broker, listeners]}]},
{apps, [emqx_ft]},
{conf, [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]},
{env_handler, emqx_ft_test_helpers:env_handler(Config)}
], ],
CommOpts = #{
env => [
{mria, db_backend, rlog},
{emqx, boot_modules, [broker, listeners]}
],
apps => [],
load_apps => ?SUITE_APPS,
conf => [{[listeners, Proto, default, enabled], false} || Proto <- [ssl, ws, wss]]
},
emqx_common_test_helpers:emqx_cluster( emqx_common_test_helpers:emqx_cluster(
Specs, Specs,
CommOpts CommOpts
). ).
init_node(Config) ->
ok = emqx_common_test_helpers:start_apps(?SUITE_APPS, emqx_ft_test_helpers:env_handler(Config)).
init_per_testcase(Case, Config) -> init_per_testcase(Case, Config) ->
[{tc, Case} | Config]. [{tc, Case} | Config].
end_per_testcase(t_ft_disabled, _Config) -> end_per_testcase(t_ft_disabled, _Config) ->
@ -96,7 +104,7 @@ t_list_files(Config) ->
ClientId = client_id(Config), ClientId = client_id(Config),
FileId = <<"f1">>, FileId = <<"f1">>,
Node = lists:last(cluster(Config)), Node = lists:last(test_nodes(Config)),
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node), ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
{ok, 200, #{<<"files">> := Files}} = {ok, 200, #{<<"files">> := Files}} =
@ -124,7 +132,7 @@ t_download_transfer(Config) ->
ClientId = client_id(Config), ClientId = client_id(Config),
FileId = <<"f1">>, FileId = <<"f1">>,
Node = lists:last(cluster(Config)), Node = lists:last(test_nodes(Config)),
ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node), ok = emqx_ft_test_helpers:upload_file(ClientId, FileId, "f1", <<"data">>, Node),
?assertMatch( ?assertMatch(
@ -184,7 +192,7 @@ t_download_transfer(Config) ->
t_list_files_paging(Config) -> t_list_files_paging(Config) ->
ClientId = client_id(Config), ClientId = client_id(Config),
NFiles = 20, NFiles = 20,
Nodes = cluster(Config), Nodes = test_nodes(Config),
Uploads = [ Uploads = [
{mk_file_id("file:", N), mk_file_name(N), pick(N, Nodes)} {mk_file_id("file:", N), mk_file_name(N), pick(N, Nodes)}
|| N <- lists:seq(1, NFiles) || N <- lists:seq(1, NFiles)
@ -280,8 +288,13 @@ t_ft_disabled(_Config) ->
%% Helpers %% Helpers
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
cluster(Config) -> test_nodes(Config) ->
[node() | proplists:get_value(cluster_nodes, Config, [])]. case proplists:get_value(cluster_nodes, Config, []) of
[] ->
[node()];
Nodes ->
Nodes
end.
client_id(Config) -> client_id(Config) ->
iolist_to_binary(io_lib:format("~s.~s", [?config(group, Config), ?config(tc, Config)])). iolist_to_binary(io_lib:format("~s.~s", [?config(group, Config), ?config(tc, Config)])).

View File

@ -36,7 +36,7 @@ start_additional_node(Config, Name) ->
). ).
stop_additional_node(Node) -> stop_additional_node(Node) ->
ok = rpc:call(Node, ekka, leave, []), _ = rpc:call(Node, ekka, leave, []),
ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_ft]]), ok = rpc:call(Node, emqx_common_test_helpers, stop_apps, [[emqx_ft]]),
ok = emqx_common_test_helpers:stop_slave(Node), ok = emqx_common_test_helpers:stop_slave(Node),
ok. ok.

View File

@ -261,7 +261,7 @@ common_listener_opts() ->
)}, )},
{bind, {bind,
sc( sc(
hoconsc:union([ip_port(), integer()]), ip_port(),
#{desc => ?DESC(gateway_common_listener_bind)} #{desc => ?DESC(gateway_common_listener_bind)}
)}, )},
{max_connections, {max_connections,

View File

@ -56,7 +56,7 @@ fields(exproto_grpc_server) ->
[ [
{bind, {bind,
sc( sc(
hoconsc:union([ip_port(), integer()]), ip_port(),
#{ #{
required => true, required => true,
desc => ?DESC(exproto_grpc_server_bind) desc => ?DESC(exproto_grpc_server_bind)

View File

@ -1,4 +1,4 @@
%%-------------------------------------------------------------------- %--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
@ -192,7 +192,7 @@ default_config(Overrides) ->
" xml_dir = \"~s\"\n" " xml_dir = \"~s\"\n"
" lifetime_min = 1s\n" " lifetime_min = 1s\n"
" lifetime_max = 86400s\n" " lifetime_max = 86400s\n"
" qmode_time_window = 22\n" " qmode_time_window = 22s\n"
" auto_observe = ~w\n" " auto_observe = ~w\n"
" mountpoint = \"lwm2m/${username}\"\n" " mountpoint = \"lwm2m/${username}\"\n"
" update_msg_publish_condition = contains_object_list\n" " update_msg_publish_condition = contains_object_list\n"

View File

@ -382,15 +382,17 @@ import_mnesia_tab(BackupDir, TabName, Opts) ->
end. end.
restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) -> restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) ->
BackupNameToImport = MnesiaBackupFileName ++ "_for_import", Validated =
Prepared =
catch mnesia:traverse_backup( catch mnesia:traverse_backup(
MnesiaBackupFileName, BackupNameToImport, fun backup_converter/2, 0 MnesiaBackupFileName, mnesia_backup, dummy, read_only, fun validate_mnesia_backup/2, 0
), ),
try try
case Prepared of case Validated of
{ok, _} -> {ok, _} ->
Restored = mnesia:restore(BackupNameToImport, [{default_op, keep_tables}]), %% As we use keep_tables option, we don't need to modify 'copies' (nodes)
%% in a backup file before restoring it, as `mnsia:restore/2` will ignore
%% backed-up schema and keep the current table schema unchanged
Restored = mnesia:restore(MnesiaBackupFileName, [{default_op, keep_tables}]),
case Restored of case Restored of
{atomic, [TabName]} -> {atomic, [TabName]} ->
ok; ok;
@ -416,30 +418,23 @@ restore_mnesia_tab(BackupDir, MnesiaBackupFileName, TabName, Opts) ->
end end
after after
%% Cleanup files as soon as they are not needed any more for more efficient disk usage %% Cleanup files as soon as they are not needed any more for more efficient disk usage
_ = file:delete(BackupNameToImport),
_ = file:delete(MnesiaBackupFileName) _ = file:delete(MnesiaBackupFileName)
end. end.
backup_converter({schema, Tab, CreateList}, Acc) -> %% NOTE: if backup file is valid, we keep traversing it, though we only need to validate schema.
check_rec_attributes(Tab, CreateList), %% Looks like there is no clean way to abort traversal without triggering any error reporting,
{[{schema, Tab, lists:map(fun convert_copies/1, CreateList)}], Acc}; %% `mnesia_bup:read_schema/2` is an option but its direct usage should also be avoided...
backup_converter(Other, Acc) -> validate_mnesia_backup({schema, Tab, CreateList} = Schema, Acc) ->
{[Other], Acc}.
check_rec_attributes(Tab, CreateList) ->
ImportAttributes = proplists:get_value(attributes, CreateList), ImportAttributes = proplists:get_value(attributes, CreateList),
Attributes = mnesia:table_info(Tab, attributes), Attributes = mnesia:table_info(Tab, attributes),
case ImportAttributes =/= Attributes of case ImportAttributes =/= Attributes of
true -> true ->
throw({error, different_table_schema}); throw({error, different_table_schema});
false -> false ->
ok {[Schema], Acc}
end. end;
validate_mnesia_backup(Other, Acc) ->
convert_copies({K, [_ | _]}) when K == ram_copies; K == disc_copies; K == disc_only_copies -> {[Other], Acc}.
{K, [node()]};
convert_copies(Other) ->
Other.
extract_backup(BackupFileName) -> extract_backup(BackupFileName) ->
BackupDir = root_backup_dir(), BackupDir = root_backup_dir(),

View File

@ -295,7 +295,6 @@ cluster(Specs) ->
{env, Env}, {env, Env},
{apps, [emqx_conf]}, {apps, [emqx_conf]},
{load_schema, false}, {load_schema, false},
{join_to, true},
{env_handler, fun {env_handler, fun
(emqx) -> (emqx) ->
application:set_env(emqx, boot_modules, []), application:set_env(emqx, boot_modules, []),

View File

@ -159,7 +159,6 @@ cluster(Specs) ->
{env, Env}, {env, Env},
{apps, [emqx_conf, emqx_management]}, {apps, [emqx_conf, emqx_management]},
{load_schema, false}, {load_schema, false},
{join_to, true},
{env_handler, fun {env_handler, fun
(emqx) -> (emqx) ->
application:set_env(emqx, boot_modules, []), application:set_env(emqx, boot_modules, []),

View File

@ -444,7 +444,6 @@ cluster(Config) ->
env => [{mria, db_backend, rlog}], env => [{mria, db_backend, rlog}],
load_schema => true, load_schema => true,
start_autocluster => true, start_autocluster => true,
join_to => true,
listener_ports => [], listener_ports => [],
conf => [{[dashboard, listeners, http, bind], 0}], conf => [{[dashboard, listeners, http, bind], 0}],
env_handler => env_handler =>

View File

@ -36,6 +36,7 @@ roots() ->
array("rewrite", #{ array("rewrite", #{
desc => "List of topic rewrite rules.", desc => "List of topic rewrite rules.",
importance => ?IMPORTANCE_HIDDEN, importance => ?IMPORTANCE_HIDDEN,
validator => fun rewrite_validator/1,
default => [] default => []
}), }),
array("topic_metrics", #{ array("topic_metrics", #{
@ -45,6 +46,37 @@ roots() ->
}) })
]. ].
rewrite_validator(Rules) ->
case
lists:foldl(
fun
(#{<<"action">> := subscribe}, Acc) ->
Acc;
(#{<<"dest_topic">> := DestTopic}, InvalidAcc) ->
try
true = emqx_topic:validate(name, DestTopic),
InvalidAcc
catch
_:_ ->
[DestTopic | InvalidAcc]
end
end,
[],
Rules
)
of
[] ->
ok;
InvalidTopics ->
{
error,
#{
msg => "cannot_use_wildcard_for_destination_topic",
invalid_topics => InvalidTopics
}
}
end.
fields("delayed") -> fields("delayed") ->
[ [
{enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})}, {enable, ?HOCON(boolean(), #{default => true, desc => ?DESC(enable)})},

View File

@ -95,6 +95,52 @@ t_mqtt_topic_rewrite_limit(_) ->
) )
). ).
t_mqtt_topic_rewrite_wildcard(_) ->
BadRules = [
#{
<<"source_topic">> => <<"test/#">>,
<<"re">> => <<"^test/(.+)$">>,
<<"dest_topic">> => <<"bad/test/#">>
},
#{
<<"source_topic">> => <<"test/#">>,
<<"re">> => <<"^test/(.+)$">>,
<<"dest_topic">> => <<"bad/#/test">>
},
#{
<<"source_topic">> => <<"test/#">>,
<<"re">> => <<"^test/(.+)$">>,
<<"dest_topic">> => <<"bad/test/+">>
},
#{
<<"source_topic">> => <<"test/#">>,
<<"re">> => <<"^test/(.+)$">>,
<<"dest_topic">> => <<"bad/+/test">>
}
],
Rules = lists:flatten(
lists:map(
fun(Rule) ->
[Rule#{<<"action">> => <<"publish">>}, Rule#{<<"action">> => <<"all">>}]
end,
BadRules
)
),
lists:foreach(
fun(Rule) ->
?assertMatch(
{ok, 500, _},
request(
put,
uri(["mqtt", "topic_rewrite"]),
[Rule]
)
)
end,
Rules
).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helpers %% Helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -468,7 +468,7 @@ fields(rebalance_evacuation_start) ->
)}, )},
{"wait_takeover", {"wait_takeover",
mk( mk(
pos_integer(), emqx_schema:timeout_duration_s(),
#{ #{
desc => ?DESC(wait_takeover), desc => ?DESC(wait_takeover),
required => false required => false
@ -709,24 +709,24 @@ fields(global_status) ->
rebalance_example() -> rebalance_example() ->
#{ #{
wait_health_check => 10, wait_health_check => <<"10s">>,
conn_evict_rate => 10, conn_evict_rate => 10,
sess_evict_rate => 20, sess_evict_rate => 20,
abs_conn_threshold => 10, abs_conn_threshold => 10,
rel_conn_threshold => 1.5, rel_conn_threshold => 1.5,
abs_sess_threshold => 10, abs_sess_threshold => 10,
rel_sess_threshold => 1.5, rel_sess_threshold => 1.5,
wait_takeover => 10, wait_takeover => <<"10s">>,
nodes => [<<"othernode@127.0.0.1">>] nodes => [<<"othernode@127.0.0.1">>]
}. }.
rebalance_evacuation_example() -> rebalance_evacuation_example() ->
#{ #{
wait_health_check => 10, wait_health_check => <<"10s">>,
conn_evict_rate => 100, conn_evict_rate => 100,
sess_evict_rate => 100, sess_evict_rate => 100,
redirect_to => <<"othernode:1883">>, redirect_to => <<"othernode:1883">>,
wait_takeover => 10, wait_takeover => <<"10s">>,
migrate_to => [<<"othernode@127.0.0.1">>] migrate_to => [<<"othernode@127.0.0.1">>]
}. }.

View File

@ -67,7 +67,6 @@ t_start_evacuation_validation(Config) ->
BadOpts = [ BadOpts = [
#{conn_evict_rate => <<"conn">>}, #{conn_evict_rate => <<"conn">>},
#{sess_evict_rate => <<"sess">>}, #{sess_evict_rate => <<"sess">>},
#{redirect_to => 123},
#{wait_takeover => <<"wait">>}, #{wait_takeover => <<"wait">>},
#{wait_health_check => <<"wait">>}, #{wait_health_check => <<"wait">>},
#{migrate_to => []}, #{migrate_to => []},
@ -83,7 +82,8 @@ t_start_evacuation_validation(Config) ->
api_post( api_post(
["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"], ["load_rebalance", atom_to_list(DonorNode), "evacuation", "start"],
Opts Opts
) ),
Opts
) )
end, end,
BadOpts BadOpts
@ -103,8 +103,8 @@ t_start_evacuation_validation(Config) ->
#{ #{
conn_evict_rate => 10, conn_evict_rate => 10,
sess_evict_rate => 10, sess_evict_rate => 10,
wait_takeover => 10, wait_takeover => <<"10s">>,
wait_health_check => 10, wait_health_check => <<"10s">>,
redirect_to => <<"srv">>, redirect_to => <<"srv">>,
migrate_to => [atom_to_binary(RecipientNode)] migrate_to => [atom_to_binary(RecipientNode)]
} }
@ -166,8 +166,8 @@ t_start_rebalance_validation(Config) ->
#{ #{
conn_evict_rate => 10, conn_evict_rate => 10,
sess_evict_rate => 10, sess_evict_rate => 10,
wait_takeover => 10, wait_takeover => <<"10s">>,
wait_health_check => 10, wait_health_check => <<"10s">>,
abs_conn_threshold => 10, abs_conn_threshold => 10,
rel_conn_threshold => 1.001, rel_conn_threshold => 1.001,
abs_sess_threshold => 10, abs_sess_threshold => 10,

View File

@ -223,8 +223,7 @@ restart(ResId, Opts) when is_binary(ResId) ->
start(ResId, Opts) -> start(ResId, Opts) ->
case safe_call(ResId, start, ?T_OPERATION) of case safe_call(ResId, start, ?T_OPERATION) of
ok -> ok ->
_ = wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000)), wait_for_ready(ResId, maps:get(start_timeout, Opts, 5000));
ok;
{error, _Reason} = Error -> {error, _Reason} = Error ->
Error Error
end. end.

View File

@ -88,7 +88,7 @@ resource_opts_meta() ->
desc => ?DESC(<<"resource_opts">>) desc => ?DESC(<<"resource_opts">>)
}. }.
worker_pool_size(type) -> non_neg_integer(); worker_pool_size(type) -> range(1, 1024);
worker_pool_size(desc) -> ?DESC("worker_pool_size"); worker_pool_size(desc) -> ?DESC("worker_pool_size");
worker_pool_size(default) -> ?WORKER_POOL_SIZE; worker_pool_size(default) -> ?WORKER_POOL_SIZE;
worker_pool_size(required) -> false; worker_pool_size(required) -> false;

View File

@ -246,6 +246,9 @@ batch_big_payload({async, ReplyFunAndArgs}, InstId, Batch, State = #{pid := Pid}
on_get_status(_InstId, #{health_check_error := true}) -> on_get_status(_InstId, #{health_check_error := true}) ->
?tp(connector_demo_health_check_error, #{}), ?tp(connector_demo_health_check_error, #{}),
disconnected; disconnected;
on_get_status(_InstId, State = #{health_check_error := {msg, Message}}) ->
?tp(connector_demo_health_check_error, #{}),
{disconnected, State, Message};
on_get_status(_InstId, #{pid := Pid}) -> on_get_status(_InstId, #{pid := Pid}) ->
timer:sleep(300), timer:sleep(300),
case is_process_alive(Pid) of case is_process_alive(Pid) of

View File

@ -40,6 +40,7 @@ groups() ->
init_per_testcase(_, Config) -> init_per_testcase(_, Config) ->
ct:timetrap({seconds, 30}), ct:timetrap({seconds, 30}),
emqx_connector_demo:set_callback_mode(always_sync), emqx_connector_demo:set_callback_mode(always_sync),
snabbkaffe:start_trace(),
Config. Config.
end_per_testcase(_, _Config) -> end_per_testcase(_, _Config) ->
@ -1145,10 +1146,33 @@ t_auto_retry(_) ->
?DEFAULT_RESOURCE_GROUP, ?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource, create_error => true}, #{name => test_resource, create_error => true},
#{auto_retry_interval => 100} #{health_check_interval => 100}
), ),
?assertEqual(ok, Res). ?assertEqual(ok, Res).
%% tests resources that have an asynchronous start: they are created
%% without problems, but later some issue is found when calling the
%% health check.
t_start_throw_error(_Config) ->
Message = "something went wrong",
?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(
emqx_resource:create_local(
?ID,
?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE,
#{name => test_resource, health_check_error => {msg, Message}},
#{health_check_interval => 100}
),
#{?snk_kind := connector_demo_health_check_error},
1_000
)
),
%% Now, if we try to "reconnect" (restart) it, we should get the error
?assertMatch({error, Message}, emqx_resource:start(?ID, _Opts = #{})),
ok.
t_health_check_disconnected(_) -> t_health_check_disconnected(_) ->
?check_trace( ?check_trace(
begin begin
@ -1157,7 +1181,7 @@ t_health_check_disconnected(_) ->
?DEFAULT_RESOURCE_GROUP, ?DEFAULT_RESOURCE_GROUP,
?TEST_RESOURCE, ?TEST_RESOURCE,
#{name => test_resource, create_error => true}, #{name => test_resource, create_error => true},
#{auto_retry_interval => 100} #{health_check_interval => 100}
), ),
?assertEqual( ?assertEqual(
{ok, disconnected}, {ok, disconnected},

View File

@ -74,6 +74,44 @@ health_check_interval_validator_test_() ->
) )
]. ].
worker_pool_size_test_() ->
BaseConf = parse(webhook_bridge_health_check_hocon(<<"15s">>)),
Check = fun(WorkerPoolSize) ->
Conf = emqx_utils_maps:deep_put(
[
<<"bridges">>,
<<"webhook">>,
<<"simple">>,
<<"resource_opts">>,
<<"worker_pool_size">>
],
BaseConf,
WorkerPoolSize
),
#{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
#{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf,
WPS
end,
AssertThrow = fun(WorkerPoolSize) ->
?assertThrow(
{_, [
#{
kind := validation_error,
reason := #{expected_type := _},
value := WorkerPoolSize
}
]},
Check(WorkerPoolSize)
)
end,
[
?_assertEqual(1, Check(1)),
?_assertEqual(100, Check(100)),
?_assertEqual(1024, Check(1024)),
?_test(AssertThrow(0)),
?_test(AssertThrow(1025))
].
%%=========================================================================== %%===========================================================================
%% Helper functions %% Helper functions
%%=========================================================================== %%===========================================================================

View File

@ -30,7 +30,7 @@
{profiles, [ {profiles, [
{test, [ {test, [
{deps, [ {deps, [
{emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}} {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.6"}}}
]} ]}
]} ]}
]}. ]}.

View File

@ -483,26 +483,25 @@ t_clear_expired(_) ->
with_conf(ConfMod, Case). with_conf(ConfMod, Case).
t_max_payload_size(_) -> t_max_payload_size(_) ->
ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := 6} end, ConfMod = fun(Conf) -> Conf#{<<"max_payload_size">> := <<"1kb">>} end,
Case = fun() -> Case = fun() ->
emqx_retainer:clean(), emqx_retainer:clean(),
timer:sleep(500), timer:sleep(500),
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1), {ok, _} = emqtt:connect(C1),
Payload = iolist_to_binary(lists:duplicate(1024, <<"0">>)),
emqtt:publish( emqtt:publish(
C1, C1,
<<"retained/1">>, <<"retained/1">>,
#{}, #{},
<<"1234">>, Payload,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),
emqtt:publish( emqtt:publish(
C1, C1,
<<"retained/2">>, <<"retained/2">>,
#{}, #{},
<<"1234567">>, <<"1", Payload/binary>>,
[{qos, 0}, {retain, true}] [{qos, 0}, {retain, true}]
), ),

View File

@ -70,7 +70,8 @@
Op =:= '-' orelse Op =:= '-' orelse
Op =:= '*' orelse Op =:= '*' orelse
Op =:= '/' orelse Op =:= '/' orelse
Op =:= 'div') Op =:= 'div' orelse
Op =:= 'mod')
). ).
%% Compare operators %% Compare operators

View File

@ -66,6 +66,7 @@ groups() ->
t_sqlselect_with_3rd_party_impl2, t_sqlselect_with_3rd_party_impl2,
t_sqlselect_with_3rd_party_funcs_unknown, t_sqlselect_with_3rd_party_funcs_unknown,
t_sqlselect_001, t_sqlselect_001,
t_sqlselect_002,
t_sqlselect_inject_props, t_sqlselect_inject_props,
t_sqlselect_01, t_sqlselect_01,
t_sqlselect_02, t_sqlselect_02,
@ -1089,6 +1090,36 @@ t_sqlselect_001(_Config) ->
) )
). ).
t_sqlselect_002(_Config) ->
%% Verify that the div and mod can be used both as infix operations and as
%% function calls
Sql =
""
"select 2 mod 2 as mod1,\n"
" mod(3, 2) as mod2,\n"
" 4 div 2 as div1,\n"
" div(7, 2) as div2\n"
" from \"t/#\" "
"",
?assertMatch(
{ok, #{
<<"mod1">> := 0,
<<"mod2">> := 1,
<<"div1">> := 2,
<<"div2">> := 3
}},
emqx_rule_sqltester:test(
#{
sql => Sql,
context =>
#{
payload => #{<<"what">> => 4},
topic => <<"t/a">>
}
}
)
).
t_sqlselect_inject_props(_Config) -> t_sqlselect_inject_props(_Config) ->
SQL = SQL =
"SELECT json_decode(payload) as p, payload, " "SELECT json_decode(payload) as p, payload, "

View File

@ -1,6 +1,6 @@
{deps, [ {deps, [
{emqx, {path, "../../apps/emqx"}}, {emqx, {path, "../../apps/emqx"}},
{erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.6.8-emqx-1"}}} {erlcloud, {git, "https://github.com/emqx/erlcloud", {tag, "3.7.0-emqx-1"}}}
]}. ]}.
{project_plugins, [erlfmt]}. {project_plugins, [erlfmt]}.

View File

@ -84,7 +84,7 @@ t_full_config(_Config) ->
<<"min_part_size">> => <<"10mb">>, <<"min_part_size">> => <<"10mb">>,
<<"acl">> => <<"public_read">>, <<"acl">> => <<"public_read">>,
<<"transport_options">> => #{ <<"transport_options">> => #{
<<"connect_timeout">> => 30000, <<"connect_timeout">> => <<"30s">>,
<<"enable_pipelining">> => 200, <<"enable_pipelining">> => 200,
<<"pool_size">> => 10, <<"pool_size">> => 10,
<<"pool_type">> => <<"random">>, <<"pool_type">> => <<"random">>,

View File

@ -64,10 +64,10 @@ base_raw_config(tcp) ->
<<"secret_access_key">> => bin(?SECRET_ACCESS_KEY), <<"secret_access_key">> => bin(?SECRET_ACCESS_KEY),
<<"host">> => ?TCP_HOST, <<"host">> => ?TCP_HOST,
<<"port">> => ?TCP_PORT, <<"port">> => ?TCP_PORT,
<<"max_part_size">> => 10 * 1024 * 1024, <<"max_part_size">> => <<"10MB">>,
<<"transport_options">> => <<"transport_options">> =>
#{ #{
<<"request_timeout">> => 2000 <<"request_timeout">> => <<"2s">>
} }
}; };
base_raw_config(tls) -> base_raw_config(tls) ->
@ -77,10 +77,10 @@ base_raw_config(tls) ->
<<"secret_access_key">> => bin(?SECRET_ACCESS_KEY), <<"secret_access_key">> => bin(?SECRET_ACCESS_KEY),
<<"host">> => ?TLS_HOST, <<"host">> => ?TLS_HOST,
<<"port">> => ?TLS_PORT, <<"port">> => ?TLS_PORT,
<<"max_part_size">> => 10 * 1024 * 1024, <<"max_part_size">> => <<"10MB">>,
<<"transport_options">> => <<"transport_options">> =>
#{ #{
<<"request_timeout">> => 2000, <<"request_timeout">> => <<"2s">>,
<<"ssl">> => #{ <<"ssl">> => #{
<<"enable">> => true, <<"enable">> => true,
<<"cacertfile">> => bin(cert_path("ca.crt")), <<"cacertfile">> => bin(cert_path("ca.crt")),

View File

@ -139,7 +139,7 @@ settings(get, _) ->
{200, emqx:get_raw_config([slow_subs], #{})}; {200, emqx:get_raw_config([slow_subs], #{})};
settings(put, #{body := Body}) -> settings(put, #{body := Body}) ->
case emqx_slow_subs:update_settings(Body) of case emqx_slow_subs:update_settings(Body) of
{ok, #{config := NewConf}} -> {ok, #{raw_config := NewConf}} ->
{200, NewConf}; {200, NewConf};
{error, Reason} -> {error, Reason} ->
Message = list_to_binary(io_lib:format("Update slow subs config failed ~p", [Reason])), Message = list_to_binary(io_lib:format("Update slow subs config failed ~p", [Reason])),

View File

@ -41,7 +41,7 @@
"{\n" "{\n"
" enable = true\n" " enable = true\n"
" top_k_num = 5,\n" " top_k_num = 5,\n"
" expire_interval = 60000\n" " expire_interval = 60s\n"
" stats_type = whole\n" " stats_type = whole\n"
"}" "}"
"" ""
@ -137,36 +137,33 @@ t_clear(_) ->
?assertEqual(0, ets:info(?TOPK_TAB, size)). ?assertEqual(0, ets:info(?TOPK_TAB, size)).
t_settting(_) -> t_settting(_) ->
Conf = emqx:get_config([slow_subs]), RawConf = emqx:get_raw_config([slow_subs]),
Conf2 = Conf#{stats_type => internal}, RawConf2 = RawConf#{<<"stats_type">> => <<"internal">>},
{ok, Data} = request_api( {ok, Data} = request_api(
put, put,
api_path(["slow_subscriptions", "settings"]), api_path(["slow_subscriptions", "settings"]),
[], [],
auth_header_(), auth_header_(),
Conf2 RawConf2
), ),
Return = decode_json(Data), Return = decode_json(Data),
Expect = emqx_config:fill_defaults(RawConf2),
?assertEqual(Conf2#{stats_type := <<"internal">>}, Return), ?assertEqual(Expect, Return),
timer:sleep(800),
{ok, GetData} = request_api( {ok, GetData} = request_api(
get, get,
api_path(["slow_subscriptions", "settings"]), api_path(["slow_subscriptions", "settings"]),
[], [],
auth_header_() auth_header_()
), ),
timer:sleep(1000),
GetReturn = decode_json(GetData), GetReturn = decode_json(GetData),
?assertEqual(Expect, GetReturn).
?assertEqual(Conf2#{stats_type := <<"internal">>}, GetReturn).
decode_json(Data) -> decode_json(Data) ->
BinJosn = emqx_utils_json:decode(Data, [return_maps]), emqx_utils_json:decode(Data, [return_maps]).
emqx_utils_maps:unsafe_atom_key_map(BinJosn).
request_api(Method, Url, Auth) -> request_api(Method, Url, Auth) ->
request_api(Method, Url, [], Auth, []). request_api(Method, Url, [], Auth, []).

View File

@ -20,6 +20,8 @@
%% [TODO] Cleanup so the instruction below is not necessary. %% [TODO] Cleanup so the instruction below is not necessary.
-elvis([{elvis_style, god_modules, disable}]). -elvis([{elvis_style, god_modules, disable}]).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([ -export([
merge_opts/2, merge_opts/2,
maybe_apply/2, maybe_apply/2,
@ -432,7 +434,7 @@ nolink_apply(Fun) -> nolink_apply(Fun, infinity).
-spec nolink_apply(function(), timer:timeout()) -> term(). -spec nolink_apply(function(), timer:timeout()) -> term().
nolink_apply(Fun, Timeout) when is_function(Fun, 0) -> nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
Caller = self(), Caller = self(),
ResRef = make_ref(), ResRef = alias([reply]),
Middleman = erlang:spawn( Middleman = erlang:spawn(
fun() -> fun() ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
@ -446,7 +448,8 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
C:E:S -> C:E:S ->
{exception, {C, E, S}} {exception, {C, E, S}}
end, end,
_ = erlang:send(Caller, {ResRef, Res}), _ = erlang:send(ResRef, {ResRef, Res}),
?tp(pmap_middleman_sent_response, #{}),
exit(normal) exit(normal)
end end
), ),
@ -460,7 +463,7 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
exit(normal); exit(normal);
{'EXIT', Worker, Reason} -> {'EXIT', Worker, Reason} ->
%% worker exited with some reason other than 'normal' %% worker exited with some reason other than 'normal'
_ = erlang:send(Caller, {ResRef, {'EXIT', Reason}}), _ = erlang:send(ResRef, {ResRef, {'EXIT', Reason}}),
exit(normal) exit(normal)
end end
end end
@ -473,8 +476,21 @@ nolink_apply(Fun, Timeout) when is_function(Fun, 0) ->
{ResRef, {'EXIT', Reason}} -> {ResRef, {'EXIT', Reason}} ->
exit(Reason) exit(Reason)
after Timeout -> after Timeout ->
%% possible race condition: a message was received just as we enter the after
%% block.
?tp(pmap_timeout, #{}),
unalias(ResRef),
exit(Middleman, kill), exit(Middleman, kill),
exit(timeout) receive
{ResRef, {normal, Result}} ->
Result;
{ResRef, {exception, {C, E, S}}} ->
erlang:raise(C, E, S);
{ResRef, {'EXIT', Reason}} ->
exit(Reason)
after 0 ->
exit(timeout)
end
end. end.
safe_to_existing_atom(In) -> safe_to_existing_atom(In) ->

View File

@ -20,6 +20,7 @@
-compile(nowarn_export_all). -compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(SOCKOPTS, [ -define(SOCKOPTS, [
binary, binary,
@ -208,3 +209,34 @@ t_pmap_exception(_) ->
[{2, 3}, {3, 4}, error] [{2, 3}, {3, 4}, error]
) )
). ).
t_pmap_late_reply(_) ->
?check_trace(
begin
?force_ordering(
#{?snk_kind := pmap_middleman_sent_response},
#{?snk_kind := pmap_timeout}
),
Timeout = 100,
Res =
catch emqx_utils:pmap(
fun(_) ->
process_flag(trap_exit, true),
timer:sleep(3 * Timeout),
done
end,
[1, 2, 3],
Timeout
),
receive
{Ref, LateReply} when is_reference(Ref) ->
ct:fail("should not receive late reply: ~p", [LateReply])
after (5 * Timeout) ->
ok
end,
?assertMatch([done, done, done], Res),
ok
end,
[]
),
ok.

View File

@ -0,0 +1 @@
Do not allow wildcards for destination topic in rewrite rules.

View File

@ -0,0 +1 @@
Addressed an inconsistency in the usage of 'div' and 'mod' operations within the rule engine. Previously, the 'div' operation was only usable as an infix operation and 'mod' could only be applied through a function call. With this change, both 'div' and 'mod' can be used via function call syntax and infix syntax.

View File

@ -0,0 +1 @@
When starting an HTTP connector EMQX now returns a descriptive error in case the system is unable to connect to the remote target system.

View File

@ -0,0 +1 @@
Fix to adhere to Protocol spec MQTT-5.0 [MQTT-3.8.3-4].

View File

@ -0,0 +1 @@
Fixed an issue where connection errors in Kafka Producer would not be reported when reconnecting the bridge.

View File

@ -0,0 +1 @@
Updated `erlcloud` dependency.

View File

@ -0,0 +1,3 @@
Added a validation for the maximum number of pool workers of a bridge.
Now the maximum amount is 1024 to avoid large memory consumption from an unreasonable number of workers.

View File

@ -0,0 +1 @@
Upgraded emqtt dependency to avoid sensitive data leakage in the debug log.

View File

@ -23,8 +23,12 @@ AutoReq: 0
%if "%{_arch} %{?rhel}" == "x86_64 7" %if "%{_arch} %{?rhel}" == "x86_64 7"
Requires: openssl11 libatomic procps which findutils Requires: openssl11 libatomic procps which findutils
%else %else
%if "%{?dist}" == ".amzn2023"
Requires: libatomic procps which findutils ncurses util-linux shadow-utils
%else
Requires: libatomic procps which findutils Requires: libatomic procps which findutils
%endif %endif
%endif
%description %description
EMQX, a distributed, massively scalable, highly extensible MQTT message broker. EMQX, a distributed, massively scalable, highly extensible MQTT message broker.

4
dev
View File

@ -158,7 +158,7 @@ export EMQX_LOG_DIR="$BASE_DIR/log"
CONFIGS_DIR="$EMQX_DATA_DIR/configs" CONFIGS_DIR="$EMQX_DATA_DIR/configs"
# Use your cookie so your IDE can connect to it. # Use your cookie so your IDE can connect to it.
COOKIE="${EMQX_NODE__COOKIE:-${EMQX_NODE_COOKIE:-$(cat ~/.erlang.cookie || echo 'emqxsecretcookie')}}" COOKIE="${EMQX_NODE__COOKIE:-${EMQX_NODE_COOKIE:-$(cat ~/.erlang.cookie || echo 'emqxsecretcookie')}}"
mkdir -p "$EMQX_ETC_DIR" "$EMQX_DATA_DIR/patches" "$EMQX_LOG_DIR" "$CONFIGS_DIR" mkdir -p "$EMQX_ETC_DIR" "$EMQX_DATA_DIR/patches" "$EMQX_DATA_DIR/certs" "$EMQX_LOG_DIR" "$CONFIGS_DIR"
if [ $EKKA_EPMD -eq 1 ]; then if [ $EKKA_EPMD -eq 1 ]; then
EPMD_ARGS='-start_epmd false -epmd_module ekka_epmd' EPMD_ARGS='-start_epmd false -epmd_module ekka_epmd'
else else
@ -290,7 +290,7 @@ append_args_file() {
+IOt 4 +IOt 4
+SDio 8 +SDio 8
-shutdown_time 30000 -shutdown_time 30000
-pa '"$EMQX_DATA_DIR/patches"' -pa '$EMQX_DATA_DIR/patches'
-mnesia dump_log_write_threshold 5000 -mnesia dump_log_write_threshold 5000
-mnesia dump_log_time_threshold 60000 -mnesia dump_log_time_threshold 60000
-os_mon start_disksup false -os_mon start_disksup false

15
mix.exs
View File

@ -64,15 +64,15 @@ defmodule EMQXUmbrella.MixProject do
{:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
# maybe forbid to fetch quicer # maybe forbid to fetch quicer
{:emqtt, {:emqtt,
github: "emqx/emqtt", tag: "1.8.5", override: true, system_env: maybe_no_quic_env()}, github: "emqx/emqtt", tag: "1.8.6", override: true, system_env: maybe_no_quic_env()},
{:rulesql, github: "emqx/rulesql", tag: "0.1.6"}, {:rulesql, github: "emqx/rulesql", tag: "0.1.7"},
{:observer_cli, "1.7.1"}, {:observer_cli, "1.7.1"},
{:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"}, {:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
{:telemetry, "1.1.0"}, {:telemetry, "1.1.0"},
# in conflict by emqtt and hocon # in conflict by emqtt and hocon
{:getopt, "1.0.2", override: true}, {:getopt, "1.0.2", override: true},
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
{:hocon, github: "emqx/hocon", tag: "0.39.8", override: true}, {:hocon, github: "emqx/hocon", tag: "0.39.9", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
{:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:esasl, github: "emqx/esasl", tag: "0.2.0"},
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
@ -216,14 +216,7 @@ defmodule EMQXUmbrella.MixProject do
github: "emqx/rabbitmq-server", github: "emqx/rabbitmq-server",
tag: "v3.11.13-emqx", tag: "v3.11.13-emqx",
sparse: "deps/amqp_client", sparse: "deps/amqp_client",
override: true}, override: true}
{:erlcloud, github: "emqx/erlcloud", tag: "3.6.8-emqx-1", override: true},
# erlcloud's rebar.config requires rebar3 and does not support Mix,
# so it tries to fetch deps from git. We need to override this.
{:lhttpc, github: "erlcloud/lhttpc", tag: "1.6.2", override: true},
{:eini, "1.2.9", override: true},
{:base16, "1.0.0", override: true}
# end of erlcloud's deps
] ]
end end

View File

@ -69,13 +69,13 @@
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.4"}}}
, {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}} , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.6"}}}
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.6"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.7"}}}
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x , {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
, {getopt, "1.0.2"} , {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.9"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}