diff --git a/.ci/docker-compose-file/openldap/README.md b/.ci/docker-compose-file/openldap/README.md new file mode 100644 index 000000000..c91b5c1dc --- /dev/null +++ b/.ci/docker-compose-file/openldap/README.md @@ -0,0 +1,61 @@ +# LDAP authentication + +To run manual tests with the default docker-compose files. + +Expose openldap container port by uncommenting the `ports` config in `docker-compose-ldap.yaml ` + +To start openldap: + +``` +docker-compose -f ./.ci/docker-compose-file/docker-compose.yaml -f ./.ci/docker-compose-file/docker-compose-ldap.yaml up -docker +``` + +## LDAP database + +LDAP database is populated from below files: +``` +apps/emqx_ldap/test/data/emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif +apps/emqx_ldap/test/data/emqx.schema /usr/local/etc/openldap/schema/emqx.schema +``` + +## Minimal EMQX config + +``` +authentication = [ + { + backend = ldap + base_dn = "uid=${username},ou=testdevice,dc=emqx,dc=io" + filter = "(& (objectClass=mqttUser) (uid=${username}))" + mechanism = password_based + method { + is_superuser_attribute = isSuperuser + password_attribute = userPassword + type = hash + } + password = public + pool_size = 8 + query_timeout = "5s" + request_timeout = "10s" + server = "localhost:1389" + username = "cn=root,dc=emqx,dc=io" + } +] +``` + +## Example ldapsearch command + +``` +ldapsearch -x -H ldap://localhost:389 -D "cn=root,dc=emqx,dc=io" -W -b "uid=mqttuser0007,ou=testdevice,dc=emqx,dc=io" "(&(objectClass=mqttUser)(uid=mqttuser0007))" +``` + +## Example mqttx command + +The client password hashes are generated from their username. + +``` +# disabled user +mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0006 -P mqttuser0006 + +# enabled super-user +mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0007 -P mqttuser0007 +``` diff --git a/apps/emqx/src/emqx_exclusive_subscription.erl b/apps/emqx/src/emqx_exclusive_subscription.erl index 3a9dc8014..095233fbb 100644 --- a/apps/emqx/src/emqx_exclusive_subscription.erl +++ b/apps/emqx/src/emqx_exclusive_subscription.erl @@ -117,6 +117,13 @@ try_subscribe(ClientId, Topic) -> write ), allow; + [#exclusive_subscription{clientid = ClientId, topic = Topic}] -> + %% Fixed the issue-13476 + %% In this feature, the user must manually call `unsubscribe` to release the lock, + %% but sometimes the node may go down for some reason, + %% then the client will reconnect to this node and resubscribe. + %% We need to allow resubscription, otherwise the lock will never be released. + allow; [_] -> deny end. diff --git a/apps/emqx/test/emqx_exclusive_sub_SUITE.erl b/apps/emqx/test/emqx_exclusive_sub_SUITE.erl index abbdb5f44..a859612b2 100644 --- a/apps/emqx/test/emqx_exclusive_sub_SUITE.erl +++ b/apps/emqx/test/emqx_exclusive_sub_SUITE.erl @@ -56,6 +56,8 @@ t_exclusive_sub(_) -> {ok, _} = emqtt:connect(C1), ?CHECK_SUB(C1, 0), + ?CHECK_SUB(C1, 0), + {ok, C2} = emqtt:start_link([ {clientid, <<"client2">>}, {clean_start, false}, diff --git a/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl b/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl index ac941f268..af8100c23 100644 --- a/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl +++ b/apps/emqx_auth_ldap/test/emqx_authn_ldap_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("emqx_auth/include/emqx_authn.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(LDAP_HOST, "ldap"). -define(LDAP_DEFAULT_PORT, 389). @@ -46,13 +47,6 @@ init_per_suite(Config) -> Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_auth, emqx_auth_ldap], #{ work_dir => ?config(priv_dir, Config) }), - {ok, _} = emqx_resource:create_local( - ?LDAP_RESOURCE, - ?AUTHN_RESOURCE_GROUP, - emqx_ldap, - ldap_config(), - #{} - ), [{apps, Apps} | Config]; false -> {skip, no_ldap} @@ -63,7 +57,6 @@ end_per_suite(Config) -> [authentication], ?GLOBAL ), - ok = emqx_resource:remove_local(?LDAP_RESOURCE), ok = emqx_cth_suite:stop(?config(apps, Config)). %%------------------------------------------------------------------------------ @@ -128,6 +121,87 @@ t_create_invalid(_Config) -> InvalidConfigs ). +t_authenticate_timeout_cause_reconnect(_Config) -> + TestPid = self(), + meck:new(eldap, [non_strict, no_link, passthrough]), + try + %% cause eldap process to be killed + meck:expect( + eldap, + search, + fun + (Pid, [{base, <<"uid=mqttuser0007", _/binary>>} | _]) -> + TestPid ! {eldap_pid, Pid}, + {error, {gen_tcp_error, timeout}}; + (Pid, Args) -> + meck:passthrough([Pid, Args]) + end + ), + + Credentials = fun(Username) -> + #{ + username => Username, + password => Username, + listener => 'tcp:default', + protocol => mqtt + } + end, + + SpecificConfigParams = #{}, + Result = {ok, #{is_superuser => true}}, + + Timeout = 1000, + Config0 = raw_ldap_auth_config(), + Config = Config0#{ + <<"pool_size">> => 1, + <<"request_timeout">> => Timeout + }, + AuthConfig = maps:merge(Config, SpecificConfigParams), + {ok, _} = emqx:update_config( + ?PATH, + {create_authenticator, ?GLOBAL, AuthConfig} + ), + + %% 0006 is a disabled user + ?assertEqual( + {error, user_disabled}, + emqx_access_control:authenticate(Credentials(<<"mqttuser0006">>)) + ), + ?assertEqual( + {error, not_authorized}, + emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>)) + ), + ok = wait_for_ldap_pid(1000), + [#{id := ResourceID}] = emqx_resource_manager:list_all(), + ?retry(1_000, 10, {ok, connected} = emqx_resource_manager:health_check(ResourceID)), + %% turn back to normal + meck:expect( + eldap, + search, + 2, + fun(Pid2, Query) -> + meck:passthrough([Pid2, Query]) + end + ), + %% expect eldap process to be restarted + ?assertEqual(Result, emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>))), + emqx_authn_test_lib:delete_authenticators( + [authentication], + ?GLOBAL + ) + after + meck:unload(eldap) + end. + +wait_for_ldap_pid(After) -> + receive + {eldap_pid, Pid} -> + ?assertNot(is_process_alive(Pid)), + ok + after After -> + error(timeout) + end. + t_authenticate(_Config) -> ok = lists:foreach( fun(Sample) -> @@ -300,6 +374,3 @@ user_seeds() -> ldap_server() -> iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])). - -ldap_config() -> - emqx_ldap_SUITE:ldap_config([]). diff --git a/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl b/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl index 09875a3fa..7ff6fdebe 100644 --- a/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl +++ b/apps/emqx_auth_ldap/test/emqx_authz_ldap_SUITE.erl @@ -44,7 +44,6 @@ init_per_suite(Config) -> ], #{work_dir => emqx_cth_suite:work_dir(Config)} ), - ok = create_ldap_resource(), [{apps, Apps} | Config]; false -> {skip, no_ldap} @@ -167,21 +166,8 @@ setup_config(SpecialParams) -> ldap_server() -> iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])). -ldap_config() -> - emqx_ldap_SUITE:ldap_config([]). - start_apps(Apps) -> lists:foreach(fun application:ensure_all_started/1, Apps). stop_apps(Apps) -> lists:foreach(fun application:stop/1, Apps). - -create_ldap_resource() -> - {ok, _} = emqx_resource:create_local( - ?LDAP_RESOURCE, - ?AUTHZ_RESOURCE_GROUP, - emqx_ldap, - ldap_config(), - #{} - ), - ok. diff --git a/apps/emqx_connector/src/emqx_connector_jwt.erl b/apps/emqx_connector/src/emqx_connector_jwt.erl index dd74754ba..60b35ddbb 100644 --- a/apps/emqx_connector/src/emqx_connector_jwt.erl +++ b/apps/emqx_connector/src/emqx_connector_jwt.erl @@ -141,5 +141,5 @@ store_jwt(#{resource_id := ResourceId, table := TId}, JWT) -> is_about_to_expire(JWT) -> #jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT), Now = erlang:system_time(seconds), - GraceExp = Exp - timer:seconds(5), + GraceExp = Exp - 5, Now >= GraceExp. diff --git a/apps/emqx_connector/src/emqx_connector_resource.erl b/apps/emqx_connector/src/emqx_connector_resource.erl index 50b2132e2..8cb61793d 100644 --- a/apps/emqx_connector/src/emqx_connector_resource.erl +++ b/apps/emqx_connector/src/emqx_connector_resource.erl @@ -352,8 +352,10 @@ safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8); safe_atom(Atom) when is_atom(Atom) -> Atom. parse_opts(Conf, Opts0) -> - Opts1 = override_start_after_created(Conf, Opts0), - set_no_buffer_workers(Opts1). + Opts1 = emqx_resource:fetch_creation_opts(Conf), + Opts2 = maps:merge(Opts1, Opts0), + Opts = override_start_after_created(Conf, Opts2), + set_no_buffer_workers(Opts). override_start_after_created(Config, Opts) -> Enabled = maps:get(enable, Config, true), diff --git a/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl b/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl index 6469614f8..aef0e660c 100644 --- a/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl @@ -125,7 +125,7 @@ t_ensure_jwt(_Config) -> JWT0 = emqx_connector_jwt:ensure_jwt(JWTConfig), ?assertNot(is_expired(JWT0)), %% should refresh 5 s before expiration - ct:sleep(Expiration - 5500), + ct:sleep(Expiration - 3000), JWT1 = emqx_connector_jwt:ensure_jwt(JWTConfig), ?assertNot(is_expired(JWT1)), %% fully expired diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl index 1d2520d0f..4d1fa9439 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc.erl @@ -260,7 +260,15 @@ convert_certs(_Dir, Conf) -> %%------------------------------------------------------------------------------ save_jwks_file(Dir, Content) -> - Path = filename:join([emqx_tls_lib:pem_dir(Dir), "client_jwks"]), + case filelib:is_file(Content) of + true -> + {ok, Content}; + _ -> + Path = filename:join([emqx_tls_lib:pem_dir(Dir), "client_jwks"]), + write_jwks_file(Path, Content) + end. + +write_jwks_file(Path, Content) -> case filelib:ensure_dir(Path) of ok -> case file:write_file(Path, Content) of @@ -288,11 +296,18 @@ maybe_require_pkce(true, Opts) -> }. init_client_jwks(#{client_jwks := #{type := file, file := File}}) -> - case jose_jwk:from_file(File) of - {error, _} -> - none; - Jwks -> - Jwks + try + case jose_jwk:from_file(File) of + {error, Reason} -> + ?SLOG(error, #{msg => "failed_to_initialize_jwks", reason => Reason}), + none; + Jwks -> + Jwks + end + catch + _:CReason -> + ?SLOG(error, #{msg => "failed_to_initialize_jwks", reason => CReason}), + none end; init_client_jwks(_) -> none. diff --git a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_api.erl b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_api.erl index 3514b4fbb..eb887cce3 100644 --- a/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_api.erl +++ b/apps/emqx_dashboard_sso/src/emqx_dashboard_sso_oidc_api.erl @@ -28,6 +28,7 @@ -export([code_callback/2, make_callback_url/1]). +-define(BAD_REQUEST, 'BAD_REQUEST'). -define(BAD_USERNAME_OR_PWD, 'BAD_USERNAME_OR_PWD'). -define(BACKEND_NOT_FOUND, 'BACKEND_NOT_FOUND'). @@ -62,6 +63,7 @@ schema("/sso/oidc/callback") -> desc => ?DESC(code_callback), responses => #{ 200 => emqx_dashboard_api:fields([token, version, license]), + 400 => response_schema(400), 401 => response_schema(401), 404 => response_schema(404) }, @@ -78,8 +80,9 @@ code_callback(get, #{query_string := QS}) -> ?SLOG(info, #{ msg => "dashboard_sso_login_successful" }), - {302, ?RESPHEADERS#{<<"location">> => Target}, ?REDIRECT_BODY}; + {error, invalid_query_string_param} -> + {400, #{code => ?BAD_REQUEST, message => <<"Invalid query string">>}}; {error, invalid_backend} -> {404, #{code => ?BACKEND_NOT_FOUND, message => <<"Backend not found">>}}; {error, Reason} -> @@ -93,11 +96,14 @@ code_callback(get, #{query_string := QS}) -> %%-------------------------------------------------------------------- %% internal %%-------------------------------------------------------------------- - +response_schema(400) -> + emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>); response_schema(401) -> - emqx_dashboard_swagger:error_codes([?BAD_USERNAME_OR_PWD], ?DESC(login_failed401)); + emqx_dashboard_swagger:error_codes( + [?BAD_USERNAME_OR_PWD], ?DESC(emqx_dashboard_api, login_failed401) + ); response_schema(404) -> - emqx_dashboard_swagger:error_codes([?BACKEND_NOT_FOUND], ?DESC(backend_not_found)). + emqx_dashboard_swagger:error_codes([?BACKEND_NOT_FOUND], <<"Backend not found">>). reason_to_message(Bin) when is_binary(Bin) -> Bin; @@ -119,7 +125,9 @@ ensure_oidc_state(#{<<"state">> := State} = QS, Cfg) -> retrieve_token(QS, Cfg, Data); _ -> {error, session_not_exists} - end. + end; +ensure_oidc_state(_, _Cfg) -> + {error, invalid_query_string_param}. retrieve_token( #{<<"code">> := Code}, diff --git a/apps/emqx_ldap/src/emqx_ldap.erl b/apps/emqx_ldap/src/emqx_ldap.erl index 67b250420..a2b09ccda 100644 --- a/apps/emqx_ldap/src/emqx_ldap.erl +++ b/apps/emqx_ldap/src/emqx_ldap.erl @@ -41,6 +41,7 @@ -export([namespace/0, roots/0, fields/1, desc/1]). -export([do_get_status/1, get_status_with_poolname/1]). +-export([search/2]). -define(LDAP_HOST_OPTIONS, #{ default_port => 389 @@ -273,6 +274,22 @@ on_query( Error end. +search(Pid, SearchOptions) -> + case eldap:search(Pid, SearchOptions) of + {error, ldap_closed} -> + %% ldap server closing the socket does not result in + %% process restart, so we need to kill it to trigger a quick reconnect + %% instead of waiting for the next health-check + _ = exit(Pid, kill), + {error, ldap_closed}; + {error, {gen_tcp_error, _} = Reason} -> + %% kill the process to trigger reconnect + _ = exit(Pid, kill), + {error, Reason}; + Result -> + Result + end. + do_ldap_query( InstId, SearchOptions, @@ -283,7 +300,7 @@ do_ldap_query( case ecpool:pick_and_do( PoolName, - {eldap, search, [SearchOptions]}, + {?MODULE, search, [SearchOptions]}, handover ) of @@ -319,7 +336,7 @@ do_ldap_query( ?SLOG( error, LogMeta#{ - msg => "ldap_connector_do_query_failed", + msg => "ldap_connector_query_failed", reason => emqx_utils:redact(Reason) } ), diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 3f7f24604..ee6a83ab1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -252,6 +252,9 @@ timezone_to_offset_seconds/1 ]). +%% System functions +-export([getenv/1]). + %% See extra_functions_module/0 and set_extra_functions_module/1 in the %% emqx_rule_engine module -callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}. @@ -1262,3 +1265,9 @@ convert_timestamp(MillisecondsTimestamp) -> uuid_str(UUID, DisplayOpt) -> uuid:uuid_to_string(UUID, DisplayOpt). + +%%------------------------------------------------------------------------------ +%% System Funcs +%%------------------------------------------------------------------------------ +getenv(Env) -> + emqx_variform_bif:getenv(Env). diff --git a/apps/emqx_utils/src/emqx_variform_bif.erl b/apps/emqx_utils/src/emqx_variform_bif.erl index f30db8f7a..e66b8e47d 100644 --- a/apps/emqx_utils/src/emqx_variform_bif.erl +++ b/apps/emqx_utils/src/emqx_variform_bif.erl @@ -79,6 +79,12 @@ %% Number compare functions -export([num_comp/2, num_eq/2, num_lt/2, num_lte/2, num_gt/2, num_gte/2]). +%% System +-export([getenv/1]). + +-define(CACHE(Key), {?MODULE, Key}). +-define(ENV_CACHE(Env), ?CACHE({env, Env})). + %%------------------------------------------------------------------------------ %% String Funcs %%------------------------------------------------------------------------------ @@ -569,3 +575,24 @@ num_lte(A, B) -> num_gte(A, B) -> R = num_comp(A, B), R =:= gt orelse R =:= eq. + +%%------------------------------------------------------------------------------ +%% System +%%------------------------------------------------------------------------------ +getenv(Bin) when is_binary(Bin) -> + EnvKey = ?ENV_CACHE(Bin), + case persistent_term:get(EnvKey, undefined) of + undefined -> + Name = erlang:binary_to_list(Bin), + Result = + case os:getenv(Name) of + false -> + <<>>; + Value -> + erlang:list_to_binary(Value) + end, + persistent_term:put(EnvKey, Result), + Result; + Result -> + Result + end. diff --git a/apps/emqx_utils/test/emqx_variform_bif_tests.erl b/apps/emqx_utils/test/emqx_variform_bif_tests.erl index 92144ff43..36235be40 100644 --- a/apps/emqx_utils/test/emqx_variform_bif_tests.erl +++ b/apps/emqx_utils/test/emqx_variform_bif_tests.erl @@ -72,3 +72,10 @@ base64_encode_decode_test() -> RandBytes = crypto:strong_rand_bytes(100), Encoded = emqx_variform_bif:base64_encode(RandBytes), ?assertEqual(RandBytes, emqx_variform_bif:base64_decode(Encoded)). + +system_test() -> + EnvName = erlang:atom_to_list(?MODULE), + EnvVal = erlang:atom_to_list(?FUNCTION_NAME), + EnvNameBin = erlang:list_to_binary(EnvName), + os:putenv(EnvName, EnvVal), + ?assertEqual(erlang:list_to_binary(EnvVal), emqx_variform_bif:getenv(EnvNameBin)). diff --git a/changes/ce/feat-13507.en.md b/changes/ce/feat-13507.en.md new file mode 100644 index 000000000..115fa49a9 --- /dev/null +++ b/changes/ce/feat-13507.en.md @@ -0,0 +1,2 @@ +Added a new builtin function `getenv` in the rule engine and variform expression to access the environment variables. +Note this value is immutable once loaded from the environment. diff --git a/changes/ce/feat-13521.en.md b/changes/ce/feat-13521.en.md new file mode 100644 index 000000000..6d57eee23 --- /dev/null +++ b/changes/ce/feat-13521.en.md @@ -0,0 +1,4 @@ +Fix LDAP query timeout issue. + +Previously, LDAP query timeout may cause the underlying connection to be unusable. +Fixed to always reconnect if timeout happens. diff --git a/changes/ce/fix-13503.en.md b/changes/ce/fix-13503.en.md new file mode 100644 index 000000000..a4f0eb811 --- /dev/null +++ b/changes/ce/fix-13503.en.md @@ -0,0 +1 @@ +Fixed an issue where a connector wouldn't respect the configured health check interval when first starting up, and would need an update/restart for the correct value to take effect. diff --git a/changes/ce/fix-13515.en.md b/changes/ce/fix-13515.en.md new file mode 100644 index 000000000..775c21848 --- /dev/null +++ b/changes/ce/fix-13515.en.md @@ -0,0 +1 @@ +Fixed an issue where the same client could not subscribe to the same exclusive topic when the node was down for some reason. diff --git a/rel/config/ee-examples/ldap-authn.conf b/rel/config/ee-examples/ldap-authn.conf new file mode 100644 index 000000000..633a5cc7b --- /dev/null +++ b/rel/config/ee-examples/ldap-authn.conf @@ -0,0 +1,19 @@ +authentication = [ + { + backend = ldap + base_dn = "uid=${username},ou=testdevice,dc=emqx,dc=io" + filter = "(& (objectClass=mqttUser) (uid=${username}))" + mechanism = password_based + method { + is_superuser_attribute = isSuperuser + password_attribute = userPassword + type = hash + } + password = public + pool_size = 8 + query_timeout = "5s" + request_timeout = "10s" + server = "localhost:1389" + username = "cn=root,dc=emqx,dc=io" + } +]