commit
1ea2fbc949
|
@ -1,3 +1,13 @@
|
||||||
|
# 5.0.4
|
||||||
|
|
||||||
|
## Bug fixes
|
||||||
|
|
||||||
|
* The `data/configs/cluster-override.conf` is cleared to 0KB if `hocon_pp:do/2` failed [commits/71f64251](https://github.com/emqx/emqx/pull/8443/commits/71f642518a683cc91a32fd542aafaac6ef915720)
|
||||||
|
* Improve the health_check for webhooks. [commits/6b45d2ea](https://github.com/emqx/emqx/commit/6b45d2ea9fde6d3b4a5b007f7a8c5a1c573d141e)
|
||||||
|
Prior to this change, the webhook only checks the connectivity of the TCP port using `gen_tcp:connect/2`, so
|
||||||
|
if it's a HTTPs server, we didn't check if TLS handshake was successful.
|
||||||
|
* The `create_at` field of rules is missing after emqx restarts. [commits/5fc09e6b](https://github.com/emqx/emqx/commit/5fc09e6b950c340243d7be627a0ce1700691221c)
|
||||||
|
|
||||||
# 5.0.3
|
# 5.0.3
|
||||||
|
|
||||||
## Bug fixes
|
## Bug fixes
|
||||||
|
|
|
@ -3,7 +3,7 @@
|
||||||
{id, "emqx"},
|
{id, "emqx"},
|
||||||
{description, "EMQX Core"},
|
{description, "EMQX Core"},
|
||||||
% strict semver, bump manually!
|
% strict semver, bump manually!
|
||||||
{vsn, "5.0.3"},
|
{vsn, "5.0.4"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -508,14 +508,15 @@ get_schema_mod(RootName) ->
|
||||||
get_root_names() ->
|
get_root_names() ->
|
||||||
maps:get(names, persistent_term:get(?PERSIS_SCHEMA_MODS, #{names => []})).
|
maps:get(names, persistent_term:get(?PERSIS_SCHEMA_MODS, #{names => []})).
|
||||||
|
|
||||||
-spec save_configs(app_envs(), config(), raw_config(), raw_config(), update_opts()) ->
|
-spec save_configs(app_envs(), config(), raw_config(), raw_config(), update_opts()) -> ok.
|
||||||
ok | {error, term()}.
|
|
||||||
save_configs(_AppEnvs, Conf, RawConf, OverrideConf, Opts) ->
|
save_configs(_AppEnvs, Conf, RawConf, OverrideConf, Opts) ->
|
||||||
|
%% We first try to save to override.conf, because saving to files is more error prone
|
||||||
|
%% than saving into memory.
|
||||||
|
ok = save_to_override_conf(OverrideConf, Opts),
|
||||||
%% We may need also support hot config update for the apps that use application envs.
|
%% We may need also support hot config update for the apps that use application envs.
|
||||||
%% If that is the case uncomment the following line to update the configs to app env
|
%% If that is the case uncomment the following line to update the configs to app env
|
||||||
%save_to_app_env(AppEnvs),
|
%save_to_app_env(_AppEnvs),
|
||||||
save_to_config_map(Conf, RawConf),
|
save_to_config_map(Conf, RawConf).
|
||||||
save_to_override_conf(OverrideConf, Opts).
|
|
||||||
|
|
||||||
-spec save_to_app_env([tuple()]) -> ok.
|
-spec save_to_app_env([tuple()]) -> ok.
|
||||||
save_to_app_env(AppEnvs) ->
|
save_to_app_env(AppEnvs) ->
|
||||||
|
|
|
@ -278,24 +278,11 @@ check_and_save_configs(
|
||||||
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
|
case do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, #{}) of
|
||||||
{ok, Result0} ->
|
{ok, Result0} ->
|
||||||
remove_from_local_if_cluster_change(ConfKeyPath, Opts),
|
remove_from_local_if_cluster_change(ConfKeyPath, Opts),
|
||||||
case
|
ok = emqx_config:save_configs(AppEnvs, NewConf, NewRawConf, OverrideConf, Opts),
|
||||||
save_configs(
|
Result1 = return_change_result(ConfKeyPath, UpdateArgs),
|
||||||
ConfKeyPath,
|
|
||||||
AppEnvs,
|
|
||||||
NewConf,
|
|
||||||
NewRawConf,
|
|
||||||
OverrideConf,
|
|
||||||
UpdateArgs,
|
|
||||||
Opts
|
|
||||||
)
|
|
||||||
of
|
|
||||||
{ok, Result1} ->
|
|
||||||
{ok, Result1#{post_config_update => Result0}};
|
{ok, Result1#{post_config_update => Result0}};
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end;
|
|
||||||
Error ->
|
|
||||||
Error
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result) ->
|
do_post_config_update(ConfKeyPath, Handlers, OldConf, NewConf, AppEnvs, UpdateArgs, Result) ->
|
||||||
|
@ -432,12 +419,6 @@ call_post_config_update(
|
||||||
) ->
|
) ->
|
||||||
{ok, Result}.
|
{ok, Result}.
|
||||||
|
|
||||||
save_configs(ConfKeyPath, AppEnvs, CheckedConf, NewRawConf, OverrideConf, UpdateArgs, Opts) ->
|
|
||||||
case emqx_config:save_configs(AppEnvs, CheckedConf, NewRawConf, OverrideConf, Opts) of
|
|
||||||
ok -> {ok, return_change_result(ConfKeyPath, UpdateArgs)};
|
|
||||||
{error, Reason} -> {error, {save_configs, Reason}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% The default callback of config handlers
|
%% The default callback of config handlers
|
||||||
%% the behaviour is overwriting the old config if:
|
%% the behaviour is overwriting the old config if:
|
||||||
%% 1. the old config is undefined
|
%% 1. the old config is undefined
|
||||||
|
@ -452,8 +433,9 @@ merge_to_old_config(UpdateReq, _RawConf) ->
|
||||||
%% local-override.conf priority is higher than cluster-override.conf
|
%% local-override.conf priority is higher than cluster-override.conf
|
||||||
%% If we want cluster to take effect, we must remove the local.
|
%% If we want cluster to take effect, we must remove the local.
|
||||||
remove_from_local_if_cluster_change(BinKeyPath, #{override_to := cluster} = Opts) ->
|
remove_from_local_if_cluster_change(BinKeyPath, #{override_to := cluster} = Opts) ->
|
||||||
Local = remove_from_override_config(BinKeyPath, Opts#{override_to => local}),
|
Opts1 = Opts#{override_to => local},
|
||||||
_ = emqx_config:save_to_override_conf(Local, Opts),
|
Local = remove_from_override_config(BinKeyPath, Opts1),
|
||||||
|
_ = emqx_config:save_to_override_conf(Local, Opts1),
|
||||||
ok;
|
ok;
|
||||||
remove_from_local_if_cluster_change(_BinKeyPath, _Opts) ->
|
remove_from_local_if_cluster_change(_BinKeyPath, _Opts) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_authn, [
|
{application, emqx_authn, [
|
||||||
{description, "EMQX Authentication"},
|
{description, "EMQX Authentication"},
|
||||||
{vsn, "0.1.2"},
|
{vsn, "0.1.3"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_authn_sup, emqx_authn_registry]},
|
{registered, [emqx_authn_sup, emqx_authn_registry]},
|
||||||
{applications, [kernel, stdlib, emqx_resource, ehttpc, epgsql, mysql, jose]},
|
{applications, [kernel, stdlib, emqx_resource, ehttpc, epgsql, mysql, jose]},
|
||||||
|
|
|
@ -71,15 +71,17 @@ on_query(_InstId, {update, Opts}, AfterQuery, #{pool_name := PoolName}) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
on_get_status(_InstId, #{pool_name := PoolName}) ->
|
on_get_status(_InstId, #{pool_name := PoolName}) ->
|
||||||
emqx_plugin_libs_pool:get_status(
|
Func =
|
||||||
PoolName,
|
fun(Conn) ->
|
||||||
fun(Pid) ->
|
case emqx_authn_jwks_client:get_jwks(Conn) of
|
||||||
case emqx_authn_jwks_client:get_jwks(Pid) of
|
|
||||||
{ok, _} -> true;
|
{ok, _} -> true;
|
||||||
_ -> false
|
_ -> false
|
||||||
end
|
end
|
||||||
end
|
end,
|
||||||
).
|
case emqx_plugin_libs_pool:health_check_ecpool_workers(PoolName, Func) of
|
||||||
|
true -> connected;
|
||||||
|
false -> disconnected
|
||||||
|
end.
|
||||||
|
|
||||||
connect(Opts) ->
|
connect(Opts) ->
|
||||||
ConnectorOpts = proplists:get_value(connector_opts, Opts),
|
ConnectorOpts = proplists:get_value(connector_opts, Opts),
|
||||||
|
|
|
@ -71,16 +71,17 @@ end_per_testcase(_Case, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
listener_mqtt_tcp_conf(Port, EnableAuthn) ->
|
listener_mqtt_tcp_conf(Port, EnableAuthn) ->
|
||||||
|
PortS = integer_to_binary(Port),
|
||||||
#{
|
#{
|
||||||
acceptors => 16,
|
<<"acceptors">> => 16,
|
||||||
zone => default,
|
<<"zone">> => <<"default">>,
|
||||||
access_rules => ["allow all"],
|
<<"access_rules">> => ["allow all"],
|
||||||
bind => {{0, 0, 0, 0}, Port},
|
<<"bind">> => <<"0.0.0.0:", PortS/binary>>,
|
||||||
max_connections => 1024000,
|
<<"max_connections">> => 1024000,
|
||||||
mountpoint => <<>>,
|
<<"mountpoint">> => <<>>,
|
||||||
proxy_protocol => false,
|
<<"proxy_protocol">> => false,
|
||||||
proxy_protocol_timeout => 3000,
|
<<"proxy_protocol_timeout">> => 3000,
|
||||||
enable_authn => EnableAuthn
|
<<"enable_authn">> => EnableAuthn
|
||||||
}.
|
}.
|
||||||
|
|
||||||
t_enable_authn(_Config) ->
|
t_enable_authn(_Config) ->
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_connector, [
|
{application, emqx_connector, [
|
||||||
{description, "An OTP application"},
|
{description, "An OTP application"},
|
||||||
{vsn, "0.1.1"},
|
{vsn, "0.1.2"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_connector_app, []}},
|
{mod, {emqx_connector_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -309,27 +309,42 @@ on_query(
|
||||||
end,
|
end,
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
on_get_status(_InstId, #{host := Host, port := Port, connect_timeout := Timeout} = State) ->
|
on_get_status(_InstId, #{pool_name := PoolName, connect_timeout := Timeout} = State) ->
|
||||||
case do_get_status(Host, Port, Timeout) of
|
case do_get_status(PoolName, Timeout) of
|
||||||
ok ->
|
true ->
|
||||||
connected;
|
connected;
|
||||||
{error, Reason} ->
|
false ->
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "http_connector_get_status_failed",
|
msg => "http_connector_get_status_failed",
|
||||||
reason => Reason,
|
state => State
|
||||||
host => Host,
|
|
||||||
port => Port
|
|
||||||
}),
|
}),
|
||||||
{disconnected, State, Reason}
|
disconnected
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_get_status(Host, Port, Timeout) ->
|
do_get_status(PoolName, Timeout) ->
|
||||||
case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of
|
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(PoolName)],
|
||||||
{ok, Sock} ->
|
DoPerWorker =
|
||||||
gen_tcp:close(Sock),
|
fun(Worker) ->
|
||||||
ok;
|
case ehttpc:health_check(Worker, Timeout) of
|
||||||
|
ok ->
|
||||||
|
true;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{error, Reason}
|
?SLOG(error, #{
|
||||||
|
msg => "ehttpc_health_check_failed",
|
||||||
|
reason => Reason,
|
||||||
|
worker => Worker
|
||||||
|
}),
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
|
||||||
|
[_ | _] = Status ->
|
||||||
|
lists:all(fun(St) -> St =:= true end, Status);
|
||||||
|
[] ->
|
||||||
|
false
|
||||||
|
catch
|
||||||
|
exit:timeout ->
|
||||||
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -240,32 +240,18 @@ on_get_status(InstId, #{poolname := PoolName} = _State) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
health_check(PoolName) ->
|
health_check(PoolName) ->
|
||||||
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
emqx_plugin_libs_pool:health_check_ecpool_workers(
|
||||||
try
|
PoolName, fun ?MODULE:check_worker_health/1, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
|
||||||
emqx_misc:pmap(
|
).
|
||||||
fun check_worker_health/1, Workers, ?HEALTH_CHECK_TIMEOUT + timer:seconds(1)
|
|
||||||
)
|
|
||||||
of
|
|
||||||
[_ | _] = Status ->
|
|
||||||
lists:all(fun(St) -> St =:= true end, Status);
|
|
||||||
[] ->
|
|
||||||
false
|
|
||||||
catch
|
|
||||||
exit:timeout ->
|
|
||||||
false
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
|
||||||
check_worker_health(Worker) ->
|
check_worker_health(Conn) ->
|
||||||
case ecpool_worker:client(Worker) of
|
|
||||||
{ok, Conn} ->
|
|
||||||
%% we don't care if this returns something or not, we just to test the connection
|
%% we don't care if this returns something or not, we just to test the connection
|
||||||
try do_test_query(Conn) of
|
try do_test_query(Conn) of
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "mongo_connection_get_status_error",
|
msg => "mongo_connection_get_status_error",
|
||||||
worker => Worker,
|
|
||||||
reason => Reason
|
reason => Reason
|
||||||
}),
|
}),
|
||||||
false;
|
false;
|
||||||
|
@ -275,19 +261,10 @@ check_worker_health(Worker) ->
|
||||||
Class:Error ->
|
Class:Error ->
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
msg => "mongo_connection_get_status_exception",
|
msg => "mongo_connection_get_status_exception",
|
||||||
worker => Worker,
|
|
||||||
class => Class,
|
class => Class,
|
||||||
error => Error
|
error => Error
|
||||||
}),
|
}),
|
||||||
false
|
false
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
?SLOG(warning, #{
|
|
||||||
msg => "mongo_connection_get_status_error",
|
|
||||||
worker => Worker,
|
|
||||||
reason => worker_not_found
|
|
||||||
}),
|
|
||||||
false
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_test_query(Conn) ->
|
do_test_query(Conn) ->
|
||||||
|
|
|
@ -169,9 +169,9 @@ on_query(
|
||||||
mysql_function(sql) -> query;
|
mysql_function(sql) -> query;
|
||||||
mysql_function(prepared_query) -> execute.
|
mysql_function(prepared_query) -> execute.
|
||||||
|
|
||||||
on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = State) ->
|
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn} = State) ->
|
||||||
case emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn) of
|
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
|
||||||
connected ->
|
true ->
|
||||||
case do_check_prepares(State) of
|
case do_check_prepares(State) of
|
||||||
ok ->
|
ok ->
|
||||||
connected;
|
connected;
|
||||||
|
@ -180,15 +180,10 @@ on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn} = S
|
||||||
{connected, NState};
|
{connected, NState};
|
||||||
{error, _Reason} ->
|
{error, _Reason} ->
|
||||||
%% do not log error, it is logged in prepare_sql_to_conn
|
%% do not log error, it is logged in prepare_sql_to_conn
|
||||||
case AutoReconn of
|
conn_status(AutoReconn)
|
||||||
true ->
|
|
||||||
connecting;
|
|
||||||
false ->
|
|
||||||
disconnected
|
|
||||||
end
|
|
||||||
end;
|
end;
|
||||||
ConnectStatus ->
|
false ->
|
||||||
ConnectStatus
|
conn_status(AutoReconn)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_get_status(Conn) ->
|
do_get_status(Conn) ->
|
||||||
|
@ -207,6 +202,9 @@ do_check_prepares(State = #{poolname := PoolName, prepare_statement := {error, P
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
conn_status(_AutoReconn = true) -> connecting;
|
||||||
|
conn_status(_AutoReconn = false) -> disconnected.
|
||||||
|
|
||||||
reconn_interval(true) -> 15;
|
reconn_interval(true) -> 15;
|
||||||
reconn_interval(false) -> false.
|
reconn_interval(false) -> false.
|
||||||
|
|
||||||
|
|
|
@ -139,13 +139,19 @@ on_query(InstId, {Type, NameOrSQL, Params}, AfterQuery, #{poolname := PoolName}
|
||||||
end,
|
end,
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) ->
|
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) ->
|
||||||
emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn).
|
case emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1) of
|
||||||
|
true -> connected;
|
||||||
|
false -> conn_status(AutoReconn)
|
||||||
|
end.
|
||||||
|
|
||||||
do_get_status(Conn) ->
|
do_get_status(Conn) ->
|
||||||
ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
|
ok == element(1, epgsql:squery(Conn, "SELECT count(1) AS T")).
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
conn_status(_AutoReconn = true) -> connecting;
|
||||||
|
conn_status(_AutoReconn = false) -> disconnected.
|
||||||
|
|
||||||
reconn_interval(true) -> 15;
|
reconn_interval(true) -> 15;
|
||||||
reconn_interval(false) -> false.
|
reconn_interval(false) -> false.
|
||||||
|
|
||||||
|
|
|
@ -225,8 +225,9 @@ on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect :
|
||||||
false ->
|
false ->
|
||||||
disconnect
|
disconnect
|
||||||
end;
|
end;
|
||||||
on_get_status(_InstId, #{poolname := PoolName, auto_reconnect := AutoReconn}) ->
|
on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) ->
|
||||||
emqx_plugin_libs_pool:get_status(PoolName, fun ?MODULE:do_get_status/1, AutoReconn).
|
Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1),
|
||||||
|
status_result(Health, AutoReconn).
|
||||||
|
|
||||||
do_get_status(Conn) ->
|
do_get_status(Conn) ->
|
||||||
case eredis:q(Conn, ["PING"]) of
|
case eredis:q(Conn, ["PING"]) of
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_plugin_libs, [
|
{application, emqx_plugin_libs, [
|
||||||
{description, "EMQX Plugin utility libs"},
|
{description, "EMQX Plugin utility libs"},
|
||||||
{vsn, "4.3.1"},
|
{vsn, "4.3.2"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{applications, [kernel, stdlib]},
|
{applications, [kernel, stdlib]},
|
||||||
{env, []}
|
{env, []}
|
||||||
|
|
|
@ -20,12 +20,14 @@
|
||||||
start_pool/3,
|
start_pool/3,
|
||||||
stop_pool/1,
|
stop_pool/1,
|
||||||
pool_name/1,
|
pool_name/1,
|
||||||
get_status/2,
|
health_check_ecpool_workers/2,
|
||||||
get_status/3
|
health_check_ecpool_workers/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
|
||||||
|
-define(HEALTH_CHECK_TIMEOUT, 15000).
|
||||||
|
|
||||||
pool_name(ID) when is_binary(ID) ->
|
pool_name(ID) when is_binary(ID) ->
|
||||||
list_to_atom(binary_to_list(ID)).
|
list_to_atom(binary_to_list(ID)).
|
||||||
|
|
||||||
|
@ -61,29 +63,26 @@ stop_pool(Name) ->
|
||||||
error({stop_pool_failed, Name, Reason})
|
error({stop_pool_failed, Name, Reason})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_status(PoolName, CheckFunc) ->
|
health_check_ecpool_workers(PoolName, CheckFunc) ->
|
||||||
get_status(PoolName, CheckFunc, false).
|
health_check_ecpool_workers(PoolName, CheckFunc, ?HEALTH_CHECK_TIMEOUT).
|
||||||
|
|
||||||
get_status(PoolName, CheckFunc, AutoReconn) when is_function(CheckFunc) ->
|
health_check_ecpool_workers(PoolName, CheckFunc, Timeout) when is_function(CheckFunc) ->
|
||||||
Status = [
|
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
||||||
begin
|
DoPerWorker =
|
||||||
|
fun(Worker) ->
|
||||||
case ecpool_worker:client(Worker) of
|
case ecpool_worker:client(Worker) of
|
||||||
{ok, Conn} ->
|
{ok, Conn} ->
|
||||||
erlang:is_process_alive(Conn) andalso CheckFunc(Conn);
|
erlang:is_process_alive(Conn) andalso CheckFunc(Conn);
|
||||||
_ ->
|
_ ->
|
||||||
false
|
false
|
||||||
end
|
end
|
||||||
end
|
end,
|
||||||
|| {_WorkerName, Worker} <- ecpool:workers(PoolName)
|
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
|
||||||
],
|
[_ | _] = Status ->
|
||||||
case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of
|
lists:all(fun(St) -> St =:= true end, Status);
|
||||||
true ->
|
[] ->
|
||||||
connected;
|
false
|
||||||
false ->
|
catch
|
||||||
case AutoReconn of
|
exit:timeout ->
|
||||||
true ->
|
false
|
||||||
connecting;
|
|
||||||
false ->
|
|
||||||
disconnect
|
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -70,6 +70,17 @@ counter of the function action or the bridge channel will increase.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rules_metadata {
|
||||||
|
desc {
|
||||||
|
en: "Rule metadata, do not change manually"
|
||||||
|
zh: "规则的元数据,不要手动修改"
|
||||||
|
}
|
||||||
|
label: {
|
||||||
|
en: "Rule metadata"
|
||||||
|
zh: "规则的元数据"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rules_description {
|
rules_description {
|
||||||
desc {
|
desc {
|
||||||
en: "The description of the rule"
|
en: "The description of the rule"
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
{application, emqx_rule_engine, [
|
{application, emqx_rule_engine, [
|
||||||
{description, "EMQX Rule Engine"},
|
{description, "EMQX Rule Engine"},
|
||||||
% strict semver, bump manually!
|
% strict semver, bump manually!
|
||||||
{vsn, "5.0.0"},
|
{vsn, "5.0.1"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
|
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
|
||||||
{applications, [kernel, stdlib, rulesql, getopt]},
|
{applications, [kernel, stdlib, rulesql, getopt]},
|
||||||
|
|
|
@ -66,6 +66,8 @@
|
||||||
%% exported for `emqx_telemetry'
|
%% exported for `emqx_telemetry'
|
||||||
-export([get_basic_usage_info/0]).
|
-export([get_basic_usage_info/0]).
|
||||||
|
|
||||||
|
-export([now_ms/0]).
|
||||||
|
|
||||||
%% gen_server Callbacks
|
%% gen_server Callbacks
|
||||||
-export([
|
-export([
|
||||||
init/1,
|
init/1,
|
||||||
|
@ -137,16 +139,22 @@ post_config_update(_, _Req, NewRules, OldRules, _AppEnvs) ->
|
||||||
-spec load_rules() -> ok.
|
-spec load_rules() -> ok.
|
||||||
load_rules() ->
|
load_rules() ->
|
||||||
maps_foreach(
|
maps_foreach(
|
||||||
fun({Id, Rule}) ->
|
fun
|
||||||
{ok, _} = create_rule(Rule#{id => bin(Id)})
|
({Id, #{metadata := #{created_at := CreatedAt}} = Rule}) ->
|
||||||
|
create_rule(Rule#{id => bin(Id)}, CreatedAt);
|
||||||
|
({Id, Rule}) ->
|
||||||
|
create_rule(Rule#{id => bin(Id)})
|
||||||
end,
|
end,
|
||||||
emqx:get_config([rule_engine, rules], #{})
|
emqx:get_config([rule_engine, rules], #{})
|
||||||
).
|
).
|
||||||
|
|
||||||
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
|
-spec create_rule(map()) -> {ok, rule()} | {error, term()}.
|
||||||
create_rule(Params = #{id := RuleId}) when is_binary(RuleId) ->
|
create_rule(Params) ->
|
||||||
|
create_rule(Params, now_ms()).
|
||||||
|
|
||||||
|
create_rule(Params = #{id := RuleId}, CreatedAt) when is_binary(RuleId) ->
|
||||||
case get_rule(RuleId) of
|
case get_rule(RuleId) of
|
||||||
not_found -> parse_and_insert(Params, now_ms());
|
not_found -> parse_and_insert(Params, CreatedAt);
|
||||||
{ok, _} -> {error, already_exists}
|
{ok, _} -> {error, already_exists}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -233,11 +233,6 @@ param_path_id() ->
|
||||||
%% Rules API
|
%% Rules API
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
%% To get around the hocon bug, we replace crlf with spaces
|
|
||||||
replace_sql_clrf(#{<<"sql">> := SQL} = Params) ->
|
|
||||||
NewSQL = re:replace(SQL, "[\r\n]", " ", [{return, binary}, global]),
|
|
||||||
Params#{<<"sql">> => NewSQL}.
|
|
||||||
|
|
||||||
'/rule_events'(get, _Params) ->
|
'/rule_events'(get, _Params) ->
|
||||||
{200, emqx_rule_events:event_info()}.
|
{200, emqx_rule_events:event_info()}.
|
||||||
|
|
||||||
|
@ -249,7 +244,7 @@ replace_sql_clrf(#{<<"sql">> := SQL} = Params) ->
|
||||||
<<>> ->
|
<<>> ->
|
||||||
{400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
|
{400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
|
||||||
Id ->
|
Id ->
|
||||||
Params = filter_out_request_body(replace_sql_clrf(Params0)),
|
Params = filter_out_request_body(add_metadata(Params0)),
|
||||||
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
|
ConfPath = emqx_rule_engine:config_key_path() ++ [Id],
|
||||||
case emqx_rule_engine:get_rule(Id) of
|
case emqx_rule_engine:get_rule(Id) of
|
||||||
{ok, _Rule} ->
|
{ok, _Rule} ->
|
||||||
|
@ -491,6 +486,13 @@ aggregate_metrics(AllMetrics) ->
|
||||||
get_one_rule(AllRules, Id) ->
|
get_one_rule(AllRules, Id) ->
|
||||||
[R || R = #{id := Id0} <- AllRules, Id0 == Id].
|
[R || R = #{id := Id0} <- AllRules, Id0 == Id].
|
||||||
|
|
||||||
|
add_metadata(Params) ->
|
||||||
|
Params#{
|
||||||
|
<<"metadata">> => #{
|
||||||
|
<<"created_at">> => emqx_rule_engine:now_ms()
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
filter_out_request_body(Conf) ->
|
filter_out_request_body(Conf) ->
|
||||||
ExtraConfs = [
|
ExtraConfs = [
|
||||||
<<"id">>,
|
<<"id">>,
|
||||||
|
|
|
@ -91,7 +91,8 @@ fields("rules") ->
|
||||||
example => "Some description",
|
example => "Some description",
|
||||||
default => <<>>
|
default => <<>>
|
||||||
}
|
}
|
||||||
)}
|
)},
|
||||||
|
{"metadata", ?HOCON(map(), #{desc => ?DESC("rules_metadata")})}
|
||||||
];
|
];
|
||||||
fields("builtin_action_republish") ->
|
fields("builtin_action_republish") ->
|
||||||
[
|
[
|
||||||
|
|
2
mix.exs
2
mix.exs
|
@ -47,7 +47,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
{:lc, github: "emqx/lc", tag: "0.3.1"},
|
{:lc, github: "emqx/lc", tag: "0.3.1"},
|
||||||
{:redbug, "2.0.7"},
|
{:redbug, "2.0.7"},
|
||||||
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
|
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
|
||||||
{:ehttpc, github: "emqx/ehttpc", tag: "0.2.0"},
|
{:ehttpc, github: "emqx/ehttpc", tag: "0.2.1"},
|
||||||
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
|
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
|
||||||
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
|
||||||
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
|
||||||
|
|
|
@ -49,7 +49,7 @@
|
||||||
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
, {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
|
||||||
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
|
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
|
||||||
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
|
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.7"}}}
|
||||||
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}}
|
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.1"}}}
|
||||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
|
||||||
|
|
Loading…
Reference in New Issue