Compare commits

...

11 Commits

Author SHA1 Message Date
id 6236320ab3 Merge remote-tracking branch 'origin/release-57' into sync-release-57-20240726-021823 2024-07-26 02:18:23 +00:00
lafirest 1925ed2f55
Merge pull request #13507 from lafirest/feat/env_func
feat(variform): add a builtin function to get env vars
2024-07-25 22:46:50 +08:00
lafirest a45f817f0e
Merge pull request #13515 from lafirest/fix/exclusive
fix(exclusive): allow the same client to resubscribe to an existing exclusive topic
2024-07-25 21:26:08 +08:00
firest 57959ac7d4 chore: update changes 2024-07-25 18:59:40 +08:00
firest 79020b2436 feat(variform): add a builtin function to get env vars 2024-07-25 18:50:53 +08:00
firest 4f21594707 chore: update changes 2024-07-25 09:40:20 +08:00
firest 117c8197d7 fix(exclusive): allow the same client to resubscribe to an existing exclusive topic 2024-07-25 09:40:15 +08:00
Thales Macedo Garitezi c728b98e79
Merge pull request #13510 from thalesmg/20240723-r57-fix-jwt-about-to-expire-check
fix(jwt): fix grace period for renewal check
2024-07-24 16:52:35 -03:00
Thales Macedo Garitezi 9e65e0d048
Merge pull request #13503 from thalesmg/20240722-r57-resource-manager-hc-interval-startup
fix(connector resource): use configuration `resource_opts` for health check interval when starting up
2024-07-24 09:15:47 -03:00
Thales Macedo Garitezi 7374123c5c fix(jwt): fix grace period for renewal check 2024-07-23 17:25:29 -03:00
Thales Macedo Garitezi 8ae54ac325 fix(connector resource): use configuration `resource_opts` for health check interval when starting up
Fixes https://emqx.atlassian.net/browse/EMQX-12738
2024-07-22 11:34:10 -03:00
11 changed files with 62 additions and 4 deletions

View File

@ -117,6 +117,13 @@ try_subscribe(ClientId, Topic) ->
write
),
allow;
[#exclusive_subscription{clientid = ClientId, topic = Topic}] ->
%% Fixed the issue-13476
%% In this feature, the user must manually call `unsubscribe` to release the lock,
%% but sometimes the node may go down for some reason,
%% then the client will reconnect to this node and resubscribe.
%% We need to allow resubscription, otherwise the lock will never be released.
allow;
[_] ->
deny
end.

View File

@ -56,6 +56,8 @@ t_exclusive_sub(_) ->
{ok, _} = emqtt:connect(C1),
?CHECK_SUB(C1, 0),
?CHECK_SUB(C1, 0),
{ok, C2} = emqtt:start_link([
{clientid, <<"client2">>},
{clean_start, false},

View File

@ -141,5 +141,5 @@ store_jwt(#{resource_id := ResourceId, table := TId}, JWT) ->
is_about_to_expire(JWT) ->
#jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT),
Now = erlang:system_time(seconds),
GraceExp = Exp - timer:seconds(5),
GraceExp = Exp - 5,
Now >= GraceExp.

View File

@ -352,8 +352,10 @@ safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8);
safe_atom(Atom) when is_atom(Atom) -> Atom.
parse_opts(Conf, Opts0) ->
Opts1 = override_start_after_created(Conf, Opts0),
set_no_buffer_workers(Opts1).
Opts1 = emqx_resource:fetch_creation_opts(Conf),
Opts2 = maps:merge(Opts1, Opts0),
Opts = override_start_after_created(Conf, Opts2),
set_no_buffer_workers(Opts).
override_start_after_created(Config, Opts) ->
Enabled = maps:get(enable, Config, true),

View File

@ -125,7 +125,7 @@ t_ensure_jwt(_Config) ->
JWT0 = emqx_connector_jwt:ensure_jwt(JWTConfig),
?assertNot(is_expired(JWT0)),
%% should refresh 5 s before expiration
ct:sleep(Expiration - 5500),
ct:sleep(Expiration - 3000),
JWT1 = emqx_connector_jwt:ensure_jwt(JWTConfig),
?assertNot(is_expired(JWT1)),
%% fully expired

View File

@ -252,6 +252,9 @@
timezone_to_offset_seconds/1
]).
%% System functions
-export([getenv/1]).
%% See extra_functions_module/0 and set_extra_functions_module/1 in the
%% emqx_rule_engine module
-callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
@ -1262,3 +1265,9 @@ convert_timestamp(MillisecondsTimestamp) ->
uuid_str(UUID, DisplayOpt) ->
uuid:uuid_to_string(UUID, DisplayOpt).
%%------------------------------------------------------------------------------
%% System Funcs
%%------------------------------------------------------------------------------
getenv(Env) ->
emqx_variform_bif:getenv(Env).

View File

@ -79,6 +79,12 @@
%% Number compare functions
-export([num_comp/2, num_eq/2, num_lt/2, num_lte/2, num_gt/2, num_gte/2]).
%% System
-export([getenv/1]).
-define(CACHE(Key), {?MODULE, Key}).
-define(ENV_CACHE(Env), ?CACHE({env, Env})).
%%------------------------------------------------------------------------------
%% String Funcs
%%------------------------------------------------------------------------------
@ -569,3 +575,24 @@ num_lte(A, B) ->
num_gte(A, B) ->
R = num_comp(A, B),
R =:= gt orelse R =:= eq.
%%------------------------------------------------------------------------------
%% System
%%------------------------------------------------------------------------------
getenv(Bin) when is_binary(Bin) ->
EnvKey = ?ENV_CACHE(Bin),
case persistent_term:get(EnvKey, undefined) of
undefined ->
Name = erlang:binary_to_list(Bin),
Result =
case os:getenv(Name) of
false ->
<<>>;
Value ->
erlang:list_to_binary(Value)
end,
persistent_term:put(EnvKey, Result),
Result;
Result ->
Result
end.

View File

@ -72,3 +72,10 @@ base64_encode_decode_test() ->
RandBytes = crypto:strong_rand_bytes(100),
Encoded = emqx_variform_bif:base64_encode(RandBytes),
?assertEqual(RandBytes, emqx_variform_bif:base64_decode(Encoded)).
system_test() ->
EnvName = erlang:atom_to_list(?MODULE),
EnvVal = erlang:atom_to_list(?FUNCTION_NAME),
EnvNameBin = erlang:list_to_binary(EnvName),
os:putenv(EnvName, EnvVal),
?assertEqual(erlang:list_to_binary(EnvVal), emqx_variform_bif:getenv(EnvNameBin)).

View File

@ -0,0 +1,2 @@
Added a new builtin function `getenv` in the rule engine and variform expression to access the environment variables.
Note this value is immutable once loaded from the environment.

View File

@ -0,0 +1 @@
Fixed an issue where a connector wouldn't respect the configured health check interval when first starting up, and would need an update/restart for the correct value to take effect.

View File

@ -0,0 +1 @@
Fixed an issue where the same client could not subscribe to the same exclusive topic when the node was down for some reason.