diff --git a/apps/emqx/src/emqx_exclusive_subscription.erl b/apps/emqx/src/emqx_exclusive_subscription.erl index 3a9dc8014..095233fbb 100644 --- a/apps/emqx/src/emqx_exclusive_subscription.erl +++ b/apps/emqx/src/emqx_exclusive_subscription.erl @@ -117,6 +117,13 @@ try_subscribe(ClientId, Topic) -> write ), allow; + [#exclusive_subscription{clientid = ClientId, topic = Topic}] -> + %% Fixed the issue-13476 + %% In this feature, the user must manually call `unsubscribe` to release the lock, + %% but sometimes the node may go down for some reason, + %% then the client will reconnect to this node and resubscribe. + %% We need to allow resubscription, otherwise the lock will never be released. + allow; [_] -> deny end. diff --git a/apps/emqx/test/emqx_exclusive_sub_SUITE.erl b/apps/emqx/test/emqx_exclusive_sub_SUITE.erl index abbdb5f44..a859612b2 100644 --- a/apps/emqx/test/emqx_exclusive_sub_SUITE.erl +++ b/apps/emqx/test/emqx_exclusive_sub_SUITE.erl @@ -56,6 +56,8 @@ t_exclusive_sub(_) -> {ok, _} = emqtt:connect(C1), ?CHECK_SUB(C1, 0), + ?CHECK_SUB(C1, 0), + {ok, C2} = emqtt:start_link([ {clientid, <<"client2">>}, {clean_start, false}, diff --git a/apps/emqx_connector/src/emqx_connector_jwt.erl b/apps/emqx_connector/src/emqx_connector_jwt.erl index dd74754ba..60b35ddbb 100644 --- a/apps/emqx_connector/src/emqx_connector_jwt.erl +++ b/apps/emqx_connector/src/emqx_connector_jwt.erl @@ -141,5 +141,5 @@ store_jwt(#{resource_id := ResourceId, table := TId}, JWT) -> is_about_to_expire(JWT) -> #jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT), Now = erlang:system_time(seconds), - GraceExp = Exp - timer:seconds(5), + GraceExp = Exp - 5, Now >= GraceExp. diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 50b2132e2..8cb61793d 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -352,8 +352,10 @@ safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8); safe_atom(Atom) when is_atom(Atom) -> Atom. parse_opts(Conf, Opts0) -> - Opts1 = override_start_after_created(Conf, Opts0), - set_no_buffer_workers(Opts1). + Opts1 = emqx_resource:fetch_creation_opts(Conf), + Opts2 = maps:merge(Opts1, Opts0), + Opts = override_start_after_created(Conf, Opts2), + set_no_buffer_workers(Opts). override_start_after_created(Config, Opts) -> Enabled = maps:get(enable, Config, true), diff --git a/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl b/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl index 6469614f8..aef0e660c 100644 --- a/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl @@ -125,7 +125,7 @@ t_ensure_jwt(_Config) -> JWT0 = emqx_connector_jwt:ensure_jwt(JWTConfig), ?assertNot(is_expired(JWT0)), %% should refresh 5 s before expiration - ct:sleep(Expiration - 5500), + ct:sleep(Expiration - 3000), JWT1 = emqx_connector_jwt:ensure_jwt(JWTConfig), ?assertNot(is_expired(JWT1)), %% fully expired diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 3f7f24604..ee6a83ab1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -252,6 +252,9 @@ timezone_to_offset_seconds/1 ]). +%% System functions +-export([getenv/1]). + %% See extra_functions_module/0 and set_extra_functions_module/1 in the %% emqx_rule_engine module -callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}. @@ -1262,3 +1265,9 @@ convert_timestamp(MillisecondsTimestamp) -> uuid_str(UUID, DisplayOpt) -> uuid:uuid_to_string(UUID, DisplayOpt). + +%%------------------------------------------------------------------------------ +%% System Funcs +%%------------------------------------------------------------------------------ +getenv(Env) -> + emqx_variform_bif:getenv(Env). diff --git a/apps/emqx_utils/src/emqx_variform_bif.erl b/apps/emqx_utils/src/emqx_variform_bif.erl index f30db8f7a..e66b8e47d 100644 --- a/apps/emqx_utils/src/emqx_variform_bif.erl +++ b/apps/emqx_utils/src/emqx_variform_bif.erl @@ -79,6 +79,12 @@ %% Number compare functions -export([num_comp/2, num_eq/2, num_lt/2, num_lte/2, num_gt/2, num_gte/2]). +%% System +-export([getenv/1]). + +-define(CACHE(Key), {?MODULE, Key}). +-define(ENV_CACHE(Env), ?CACHE({env, Env})). + %%------------------------------------------------------------------------------ %% String Funcs %%------------------------------------------------------------------------------ @@ -569,3 +575,24 @@ num_lte(A, B) -> num_gte(A, B) -> R = num_comp(A, B), R =:= gt orelse R =:= eq. + +%%------------------------------------------------------------------------------ +%% System +%%------------------------------------------------------------------------------ +getenv(Bin) when is_binary(Bin) -> + EnvKey = ?ENV_CACHE(Bin), + case persistent_term:get(EnvKey, undefined) of + undefined -> + Name = erlang:binary_to_list(Bin), + Result = + case os:getenv(Name) of + false -> + <<>>; + Value -> + erlang:list_to_binary(Value) + end, + persistent_term:put(EnvKey, Result), + Result; + Result -> + Result + end. diff --git a/apps/emqx_utils/test/emqx_variform_bif_tests.erl b/apps/emqx_utils/test/emqx_variform_bif_tests.erl index 92144ff43..36235be40 100644 --- a/apps/emqx_utils/test/emqx_variform_bif_tests.erl +++ b/apps/emqx_utils/test/emqx_variform_bif_tests.erl @@ -72,3 +72,10 @@ base64_encode_decode_test() -> RandBytes = crypto:strong_rand_bytes(100), Encoded = emqx_variform_bif:base64_encode(RandBytes), ?assertEqual(RandBytes, emqx_variform_bif:base64_decode(Encoded)). + +system_test() -> + EnvName = erlang:atom_to_list(?MODULE), + EnvVal = erlang:atom_to_list(?FUNCTION_NAME), + EnvNameBin = erlang:list_to_binary(EnvName), + os:putenv(EnvName, EnvVal), + ?assertEqual(erlang:list_to_binary(EnvVal), emqx_variform_bif:getenv(EnvNameBin)). diff --git a/changes/ce/feat-13507.en.md b/changes/ce/feat-13507.en.md new file mode 100644 index 000000000..115fa49a9 --- /dev/null +++ b/changes/ce/feat-13507.en.md @@ -0,0 +1,2 @@ +Added a new builtin function `getenv` in the rule engine and variform expression to access the environment variables. +Note this value is immutable once loaded from the environment. diff --git a/changes/ce/fix-13503.en.md b/changes/ce/fix-13503.en.md new file mode 100644 index 000000000..a4f0eb811 --- /dev/null +++ b/changes/ce/fix-13503.en.md @@ -0,0 +1 @@ +Fixed an issue where a connector wouldn't respect the configured health check interval when first starting up, and would need an update/restart for the correct value to take effect. diff --git a/changes/ce/fix-13515.en.md b/changes/ce/fix-13515.en.md new file mode 100644 index 000000000..775c21848 --- /dev/null +++ b/changes/ce/fix-13515.en.md @@ -0,0 +1 @@ +Fixed an issue where the same client could not subscribe to the same exclusive topic when the node was down for some reason.