Merge remote-tracking branch 'origin/release-57' into sync-release-57-20240726-021823
This commit is contained in:
commit
6236320ab3
|
@ -117,6 +117,13 @@ try_subscribe(ClientId, Topic) ->
|
||||||
write
|
write
|
||||||
),
|
),
|
||||||
allow;
|
allow;
|
||||||
|
[#exclusive_subscription{clientid = ClientId, topic = Topic}] ->
|
||||||
|
%% Fixed the issue-13476
|
||||||
|
%% In this feature, the user must manually call `unsubscribe` to release the lock,
|
||||||
|
%% but sometimes the node may go down for some reason,
|
||||||
|
%% then the client will reconnect to this node and resubscribe.
|
||||||
|
%% We need to allow resubscription, otherwise the lock will never be released.
|
||||||
|
allow;
|
||||||
[_] ->
|
[_] ->
|
||||||
deny
|
deny
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -56,6 +56,8 @@ t_exclusive_sub(_) ->
|
||||||
{ok, _} = emqtt:connect(C1),
|
{ok, _} = emqtt:connect(C1),
|
||||||
?CHECK_SUB(C1, 0),
|
?CHECK_SUB(C1, 0),
|
||||||
|
|
||||||
|
?CHECK_SUB(C1, 0),
|
||||||
|
|
||||||
{ok, C2} = emqtt:start_link([
|
{ok, C2} = emqtt:start_link([
|
||||||
{clientid, <<"client2">>},
|
{clientid, <<"client2">>},
|
||||||
{clean_start, false},
|
{clean_start, false},
|
||||||
|
|
|
@ -141,5 +141,5 @@ store_jwt(#{resource_id := ResourceId, table := TId}, JWT) ->
|
||||||
is_about_to_expire(JWT) ->
|
is_about_to_expire(JWT) ->
|
||||||
#jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT),
|
#jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT),
|
||||||
Now = erlang:system_time(seconds),
|
Now = erlang:system_time(seconds),
|
||||||
GraceExp = Exp - timer:seconds(5),
|
GraceExp = Exp - 5,
|
||||||
Now >= GraceExp.
|
Now >= GraceExp.
|
||||||
|
|
|
@ -352,8 +352,10 @@ safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8);
|
||||||
safe_atom(Atom) when is_atom(Atom) -> Atom.
|
safe_atom(Atom) when is_atom(Atom) -> Atom.
|
||||||
|
|
||||||
parse_opts(Conf, Opts0) ->
|
parse_opts(Conf, Opts0) ->
|
||||||
Opts1 = override_start_after_created(Conf, Opts0),
|
Opts1 = emqx_resource:fetch_creation_opts(Conf),
|
||||||
set_no_buffer_workers(Opts1).
|
Opts2 = maps:merge(Opts1, Opts0),
|
||||||
|
Opts = override_start_after_created(Conf, Opts2),
|
||||||
|
set_no_buffer_workers(Opts).
|
||||||
|
|
||||||
override_start_after_created(Config, Opts) ->
|
override_start_after_created(Config, Opts) ->
|
||||||
Enabled = maps:get(enable, Config, true),
|
Enabled = maps:get(enable, Config, true),
|
||||||
|
|
|
@ -125,7 +125,7 @@ t_ensure_jwt(_Config) ->
|
||||||
JWT0 = emqx_connector_jwt:ensure_jwt(JWTConfig),
|
JWT0 = emqx_connector_jwt:ensure_jwt(JWTConfig),
|
||||||
?assertNot(is_expired(JWT0)),
|
?assertNot(is_expired(JWT0)),
|
||||||
%% should refresh 5 s before expiration
|
%% should refresh 5 s before expiration
|
||||||
ct:sleep(Expiration - 5500),
|
ct:sleep(Expiration - 3000),
|
||||||
JWT1 = emqx_connector_jwt:ensure_jwt(JWTConfig),
|
JWT1 = emqx_connector_jwt:ensure_jwt(JWTConfig),
|
||||||
?assertNot(is_expired(JWT1)),
|
?assertNot(is_expired(JWT1)),
|
||||||
%% fully expired
|
%% fully expired
|
||||||
|
|
|
@ -252,6 +252,9 @@
|
||||||
timezone_to_offset_seconds/1
|
timezone_to_offset_seconds/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% System functions
|
||||||
|
-export([getenv/1]).
|
||||||
|
|
||||||
%% See extra_functions_module/0 and set_extra_functions_module/1 in the
|
%% See extra_functions_module/0 and set_extra_functions_module/1 in the
|
||||||
%% emqx_rule_engine module
|
%% emqx_rule_engine module
|
||||||
-callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
|
-callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
|
||||||
|
@ -1262,3 +1265,9 @@ convert_timestamp(MillisecondsTimestamp) ->
|
||||||
|
|
||||||
uuid_str(UUID, DisplayOpt) ->
|
uuid_str(UUID, DisplayOpt) ->
|
||||||
uuid:uuid_to_string(UUID, DisplayOpt).
|
uuid:uuid_to_string(UUID, DisplayOpt).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% System Funcs
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
getenv(Env) ->
|
||||||
|
emqx_variform_bif:getenv(Env).
|
||||||
|
|
|
@ -79,6 +79,12 @@
|
||||||
%% Number compare functions
|
%% Number compare functions
|
||||||
-export([num_comp/2, num_eq/2, num_lt/2, num_lte/2, num_gt/2, num_gte/2]).
|
-export([num_comp/2, num_eq/2, num_lt/2, num_lte/2, num_gt/2, num_gte/2]).
|
||||||
|
|
||||||
|
%% System
|
||||||
|
-export([getenv/1]).
|
||||||
|
|
||||||
|
-define(CACHE(Key), {?MODULE, Key}).
|
||||||
|
-define(ENV_CACHE(Env), ?CACHE({env, Env})).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% String Funcs
|
%% String Funcs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -569,3 +575,24 @@ num_lte(A, B) ->
|
||||||
num_gte(A, B) ->
|
num_gte(A, B) ->
|
||||||
R = num_comp(A, B),
|
R = num_comp(A, B),
|
||||||
R =:= gt orelse R =:= eq.
|
R =:= gt orelse R =:= eq.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% System
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
getenv(Bin) when is_binary(Bin) ->
|
||||||
|
EnvKey = ?ENV_CACHE(Bin),
|
||||||
|
case persistent_term:get(EnvKey, undefined) of
|
||||||
|
undefined ->
|
||||||
|
Name = erlang:binary_to_list(Bin),
|
||||||
|
Result =
|
||||||
|
case os:getenv(Name) of
|
||||||
|
false ->
|
||||||
|
<<>>;
|
||||||
|
Value ->
|
||||||
|
erlang:list_to_binary(Value)
|
||||||
|
end,
|
||||||
|
persistent_term:put(EnvKey, Result),
|
||||||
|
Result;
|
||||||
|
Result ->
|
||||||
|
Result
|
||||||
|
end.
|
||||||
|
|
|
@ -72,3 +72,10 @@ base64_encode_decode_test() ->
|
||||||
RandBytes = crypto:strong_rand_bytes(100),
|
RandBytes = crypto:strong_rand_bytes(100),
|
||||||
Encoded = emqx_variform_bif:base64_encode(RandBytes),
|
Encoded = emqx_variform_bif:base64_encode(RandBytes),
|
||||||
?assertEqual(RandBytes, emqx_variform_bif:base64_decode(Encoded)).
|
?assertEqual(RandBytes, emqx_variform_bif:base64_decode(Encoded)).
|
||||||
|
|
||||||
|
system_test() ->
|
||||||
|
EnvName = erlang:atom_to_list(?MODULE),
|
||||||
|
EnvVal = erlang:atom_to_list(?FUNCTION_NAME),
|
||||||
|
EnvNameBin = erlang:list_to_binary(EnvName),
|
||||||
|
os:putenv(EnvName, EnvVal),
|
||||||
|
?assertEqual(erlang:list_to_binary(EnvVal), emqx_variform_bif:getenv(EnvNameBin)).
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
Added a new builtin function `getenv` in the rule engine and variform expression to access the environment variables.
|
||||||
|
Note this value is immutable once loaded from the environment.
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed an issue where a connector wouldn't respect the configured health check interval when first starting up, and would need an update/restart for the correct value to take effect.
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed an issue where the same client could not subscribe to the same exclusive topic when the node was down for some reason.
|
Loading…
Reference in New Issue