Merge remote-tracking branch 'origin/release-57' into sync-release-57-20240728-022010
This commit is contained in:
commit
980a884382
|
@ -0,0 +1,61 @@
|
|||
# LDAP authentication
|
||||
|
||||
To run manual tests with the default docker-compose files.
|
||||
|
||||
Expose openldap container port by uncommenting the `ports` config in `docker-compose-ldap.yaml `
|
||||
|
||||
To start openldap:
|
||||
|
||||
```
|
||||
docker-compose -f ./.ci/docker-compose-file/docker-compose.yaml -f ./.ci/docker-compose-file/docker-compose-ldap.yaml up -docker
|
||||
```
|
||||
|
||||
## LDAP database
|
||||
|
||||
LDAP database is populated from below files:
|
||||
```
|
||||
apps/emqx_ldap/test/data/emqx.io.ldif /usr/local/etc/openldap/schema/emqx.io.ldif
|
||||
apps/emqx_ldap/test/data/emqx.schema /usr/local/etc/openldap/schema/emqx.schema
|
||||
```
|
||||
|
||||
## Minimal EMQX config
|
||||
|
||||
```
|
||||
authentication = [
|
||||
{
|
||||
backend = ldap
|
||||
base_dn = "uid=${username},ou=testdevice,dc=emqx,dc=io"
|
||||
filter = "(& (objectClass=mqttUser) (uid=${username}))"
|
||||
mechanism = password_based
|
||||
method {
|
||||
is_superuser_attribute = isSuperuser
|
||||
password_attribute = userPassword
|
||||
type = hash
|
||||
}
|
||||
password = public
|
||||
pool_size = 8
|
||||
query_timeout = "5s"
|
||||
request_timeout = "10s"
|
||||
server = "localhost:1389"
|
||||
username = "cn=root,dc=emqx,dc=io"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
## Example ldapsearch command
|
||||
|
||||
```
|
||||
ldapsearch -x -H ldap://localhost:389 -D "cn=root,dc=emqx,dc=io" -W -b "uid=mqttuser0007,ou=testdevice,dc=emqx,dc=io" "(&(objectClass=mqttUser)(uid=mqttuser0007))"
|
||||
```
|
||||
|
||||
## Example mqttx command
|
||||
|
||||
The client password hashes are generated from their username.
|
||||
|
||||
```
|
||||
# disabled user
|
||||
mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0006 -P mqttuser0006
|
||||
|
||||
# enabled super-user
|
||||
mqttx pub -t 't/1' -h localhost -p 1883 -m x -u mqttuser0007 -P mqttuser0007
|
||||
```
|
|
@ -117,6 +117,13 @@ try_subscribe(ClientId, Topic) ->
|
|||
write
|
||||
),
|
||||
allow;
|
||||
[#exclusive_subscription{clientid = ClientId, topic = Topic}] ->
|
||||
%% Fixed the issue-13476
|
||||
%% In this feature, the user must manually call `unsubscribe` to release the lock,
|
||||
%% but sometimes the node may go down for some reason,
|
||||
%% then the client will reconnect to this node and resubscribe.
|
||||
%% We need to allow resubscription, otherwise the lock will never be released.
|
||||
allow;
|
||||
[_] ->
|
||||
deny
|
||||
end.
|
||||
|
|
|
@ -56,6 +56,8 @@ t_exclusive_sub(_) ->
|
|||
{ok, _} = emqtt:connect(C1),
|
||||
?CHECK_SUB(C1, 0),
|
||||
|
||||
?CHECK_SUB(C1, 0),
|
||||
|
||||
{ok, C2} = emqtt:start_link([
|
||||
{clientid, <<"client2">>},
|
||||
{clean_start, false},
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
-include_lib("emqx_auth/include/emqx_authn.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-define(LDAP_HOST, "ldap").
|
||||
-define(LDAP_DEFAULT_PORT, 389).
|
||||
|
@ -46,13 +47,6 @@ init_per_suite(Config) ->
|
|||
Apps = emqx_cth_suite:start([emqx, emqx_conf, emqx_auth, emqx_auth_ldap], #{
|
||||
work_dir => ?config(priv_dir, Config)
|
||||
}),
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?LDAP_RESOURCE,
|
||||
?AUTHN_RESOURCE_GROUP,
|
||||
emqx_ldap,
|
||||
ldap_config(),
|
||||
#{}
|
||||
),
|
||||
[{apps, Apps} | Config];
|
||||
false ->
|
||||
{skip, no_ldap}
|
||||
|
@ -63,7 +57,6 @@ end_per_suite(Config) ->
|
|||
[authentication],
|
||||
?GLOBAL
|
||||
),
|
||||
ok = emqx_resource:remove_local(?LDAP_RESOURCE),
|
||||
ok = emqx_cth_suite:stop(?config(apps, Config)).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -128,6 +121,87 @@ t_create_invalid(_Config) ->
|
|||
InvalidConfigs
|
||||
).
|
||||
|
||||
t_authenticate_timeout_cause_reconnect(_Config) ->
|
||||
TestPid = self(),
|
||||
meck:new(eldap, [non_strict, no_link, passthrough]),
|
||||
try
|
||||
%% cause eldap process to be killed
|
||||
meck:expect(
|
||||
eldap,
|
||||
search,
|
||||
fun
|
||||
(Pid, [{base, <<"uid=mqttuser0007", _/binary>>} | _]) ->
|
||||
TestPid ! {eldap_pid, Pid},
|
||||
{error, {gen_tcp_error, timeout}};
|
||||
(Pid, Args) ->
|
||||
meck:passthrough([Pid, Args])
|
||||
end
|
||||
),
|
||||
|
||||
Credentials = fun(Username) ->
|
||||
#{
|
||||
username => Username,
|
||||
password => Username,
|
||||
listener => 'tcp:default',
|
||||
protocol => mqtt
|
||||
}
|
||||
end,
|
||||
|
||||
SpecificConfigParams = #{},
|
||||
Result = {ok, #{is_superuser => true}},
|
||||
|
||||
Timeout = 1000,
|
||||
Config0 = raw_ldap_auth_config(),
|
||||
Config = Config0#{
|
||||
<<"pool_size">> => 1,
|
||||
<<"request_timeout">> => Timeout
|
||||
},
|
||||
AuthConfig = maps:merge(Config, SpecificConfigParams),
|
||||
{ok, _} = emqx:update_config(
|
||||
?PATH,
|
||||
{create_authenticator, ?GLOBAL, AuthConfig}
|
||||
),
|
||||
|
||||
%% 0006 is a disabled user
|
||||
?assertEqual(
|
||||
{error, user_disabled},
|
||||
emqx_access_control:authenticate(Credentials(<<"mqttuser0006">>))
|
||||
),
|
||||
?assertEqual(
|
||||
{error, not_authorized},
|
||||
emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>))
|
||||
),
|
||||
ok = wait_for_ldap_pid(1000),
|
||||
[#{id := ResourceID}] = emqx_resource_manager:list_all(),
|
||||
?retry(1_000, 10, {ok, connected} = emqx_resource_manager:health_check(ResourceID)),
|
||||
%% turn back to normal
|
||||
meck:expect(
|
||||
eldap,
|
||||
search,
|
||||
2,
|
||||
fun(Pid2, Query) ->
|
||||
meck:passthrough([Pid2, Query])
|
||||
end
|
||||
),
|
||||
%% expect eldap process to be restarted
|
||||
?assertEqual(Result, emqx_access_control:authenticate(Credentials(<<"mqttuser0007">>))),
|
||||
emqx_authn_test_lib:delete_authenticators(
|
||||
[authentication],
|
||||
?GLOBAL
|
||||
)
|
||||
after
|
||||
meck:unload(eldap)
|
||||
end.
|
||||
|
||||
wait_for_ldap_pid(After) ->
|
||||
receive
|
||||
{eldap_pid, Pid} ->
|
||||
?assertNot(is_process_alive(Pid)),
|
||||
ok
|
||||
after After ->
|
||||
error(timeout)
|
||||
end.
|
||||
|
||||
t_authenticate(_Config) ->
|
||||
ok = lists:foreach(
|
||||
fun(Sample) ->
|
||||
|
@ -300,6 +374,3 @@ user_seeds() ->
|
|||
|
||||
ldap_server() ->
|
||||
iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])).
|
||||
|
||||
ldap_config() ->
|
||||
emqx_ldap_SUITE:ldap_config([]).
|
||||
|
|
|
@ -44,7 +44,6 @@ init_per_suite(Config) ->
|
|||
],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||
),
|
||||
ok = create_ldap_resource(),
|
||||
[{apps, Apps} | Config];
|
||||
false ->
|
||||
{skip, no_ldap}
|
||||
|
@ -167,21 +166,8 @@ setup_config(SpecialParams) ->
|
|||
ldap_server() ->
|
||||
iolist_to_binary(io_lib:format("~s:~B", [?LDAP_HOST, ?LDAP_DEFAULT_PORT])).
|
||||
|
||||
ldap_config() ->
|
||||
emqx_ldap_SUITE:ldap_config([]).
|
||||
|
||||
start_apps(Apps) ->
|
||||
lists:foreach(fun application:ensure_all_started/1, Apps).
|
||||
|
||||
stop_apps(Apps) ->
|
||||
lists:foreach(fun application:stop/1, Apps).
|
||||
|
||||
create_ldap_resource() ->
|
||||
{ok, _} = emqx_resource:create_local(
|
||||
?LDAP_RESOURCE,
|
||||
?AUTHZ_RESOURCE_GROUP,
|
||||
emqx_ldap,
|
||||
ldap_config(),
|
||||
#{}
|
||||
),
|
||||
ok.
|
||||
|
|
|
@ -141,5 +141,5 @@ store_jwt(#{resource_id := ResourceId, table := TId}, JWT) ->
|
|||
is_about_to_expire(JWT) ->
|
||||
#jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT),
|
||||
Now = erlang:system_time(seconds),
|
||||
GraceExp = Exp - timer:seconds(5),
|
||||
GraceExp = Exp - 5,
|
||||
Now >= GraceExp.
|
||||
|
|
|
@ -352,8 +352,10 @@ safe_atom(Bin) when is_binary(Bin) -> binary_to_existing_atom(Bin, utf8);
|
|||
safe_atom(Atom) when is_atom(Atom) -> Atom.
|
||||
|
||||
parse_opts(Conf, Opts0) ->
|
||||
Opts1 = override_start_after_created(Conf, Opts0),
|
||||
set_no_buffer_workers(Opts1).
|
||||
Opts1 = emqx_resource:fetch_creation_opts(Conf),
|
||||
Opts2 = maps:merge(Opts1, Opts0),
|
||||
Opts = override_start_after_created(Conf, Opts2),
|
||||
set_no_buffer_workers(Opts).
|
||||
|
||||
override_start_after_created(Config, Opts) ->
|
||||
Enabled = maps:get(enable, Config, true),
|
||||
|
|
|
@ -125,7 +125,7 @@ t_ensure_jwt(_Config) ->
|
|||
JWT0 = emqx_connector_jwt:ensure_jwt(JWTConfig),
|
||||
?assertNot(is_expired(JWT0)),
|
||||
%% should refresh 5 s before expiration
|
||||
ct:sleep(Expiration - 5500),
|
||||
ct:sleep(Expiration - 3000),
|
||||
JWT1 = emqx_connector_jwt:ensure_jwt(JWTConfig),
|
||||
?assertNot(is_expired(JWT1)),
|
||||
%% fully expired
|
||||
|
|
|
@ -260,7 +260,15 @@ convert_certs(_Dir, Conf) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
|
||||
save_jwks_file(Dir, Content) ->
|
||||
case filelib:is_file(Content) of
|
||||
true ->
|
||||
{ok, Content};
|
||||
_ ->
|
||||
Path = filename:join([emqx_tls_lib:pem_dir(Dir), "client_jwks"]),
|
||||
write_jwks_file(Path, Content)
|
||||
end.
|
||||
|
||||
write_jwks_file(Path, Content) ->
|
||||
case filelib:ensure_dir(Path) of
|
||||
ok ->
|
||||
case file:write_file(Path, Content) of
|
||||
|
@ -288,11 +296,18 @@ maybe_require_pkce(true, Opts) ->
|
|||
}.
|
||||
|
||||
init_client_jwks(#{client_jwks := #{type := file, file := File}}) ->
|
||||
try
|
||||
case jose_jwk:from_file(File) of
|
||||
{error, _} ->
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "failed_to_initialize_jwks", reason => Reason}),
|
||||
none;
|
||||
Jwks ->
|
||||
Jwks
|
||||
end
|
||||
catch
|
||||
_:CReason ->
|
||||
?SLOG(error, #{msg => "failed_to_initialize_jwks", reason => CReason}),
|
||||
none
|
||||
end;
|
||||
init_client_jwks(_) ->
|
||||
none.
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
|
||||
-export([code_callback/2, make_callback_url/1]).
|
||||
|
||||
-define(BAD_REQUEST, 'BAD_REQUEST').
|
||||
-define(BAD_USERNAME_OR_PWD, 'BAD_USERNAME_OR_PWD').
|
||||
-define(BACKEND_NOT_FOUND, 'BACKEND_NOT_FOUND').
|
||||
|
||||
|
@ -62,6 +63,7 @@ schema("/sso/oidc/callback") ->
|
|||
desc => ?DESC(code_callback),
|
||||
responses => #{
|
||||
200 => emqx_dashboard_api:fields([token, version, license]),
|
||||
400 => response_schema(400),
|
||||
401 => response_schema(401),
|
||||
404 => response_schema(404)
|
||||
},
|
||||
|
@ -78,8 +80,9 @@ code_callback(get, #{query_string := QS}) ->
|
|||
?SLOG(info, #{
|
||||
msg => "dashboard_sso_login_successful"
|
||||
}),
|
||||
|
||||
{302, ?RESPHEADERS#{<<"location">> => Target}, ?REDIRECT_BODY};
|
||||
{error, invalid_query_string_param} ->
|
||||
{400, #{code => ?BAD_REQUEST, message => <<"Invalid query string">>}};
|
||||
{error, invalid_backend} ->
|
||||
{404, #{code => ?BACKEND_NOT_FOUND, message => <<"Backend not found">>}};
|
||||
{error, Reason} ->
|
||||
|
@ -93,11 +96,14 @@ code_callback(get, #{query_string := QS}) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% internal
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
response_schema(400) ->
|
||||
emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>);
|
||||
response_schema(401) ->
|
||||
emqx_dashboard_swagger:error_codes([?BAD_USERNAME_OR_PWD], ?DESC(login_failed401));
|
||||
emqx_dashboard_swagger:error_codes(
|
||||
[?BAD_USERNAME_OR_PWD], ?DESC(emqx_dashboard_api, login_failed401)
|
||||
);
|
||||
response_schema(404) ->
|
||||
emqx_dashboard_swagger:error_codes([?BACKEND_NOT_FOUND], ?DESC(backend_not_found)).
|
||||
emqx_dashboard_swagger:error_codes([?BACKEND_NOT_FOUND], <<"Backend not found">>).
|
||||
|
||||
reason_to_message(Bin) when is_binary(Bin) ->
|
||||
Bin;
|
||||
|
@ -119,7 +125,9 @@ ensure_oidc_state(#{<<"state">> := State} = QS, Cfg) ->
|
|||
retrieve_token(QS, Cfg, Data);
|
||||
_ ->
|
||||
{error, session_not_exists}
|
||||
end.
|
||||
end;
|
||||
ensure_oidc_state(_, _Cfg) ->
|
||||
{error, invalid_query_string_param}.
|
||||
|
||||
retrieve_token(
|
||||
#{<<"code">> := Code},
|
||||
|
|
|
@ -41,6 +41,7 @@
|
|||
-export([namespace/0, roots/0, fields/1, desc/1]).
|
||||
|
||||
-export([do_get_status/1, get_status_with_poolname/1]).
|
||||
-export([search/2]).
|
||||
|
||||
-define(LDAP_HOST_OPTIONS, #{
|
||||
default_port => 389
|
||||
|
@ -273,6 +274,22 @@ on_query(
|
|||
Error
|
||||
end.
|
||||
|
||||
search(Pid, SearchOptions) ->
|
||||
case eldap:search(Pid, SearchOptions) of
|
||||
{error, ldap_closed} ->
|
||||
%% ldap server closing the socket does not result in
|
||||
%% process restart, so we need to kill it to trigger a quick reconnect
|
||||
%% instead of waiting for the next health-check
|
||||
_ = exit(Pid, kill),
|
||||
{error, ldap_closed};
|
||||
{error, {gen_tcp_error, _} = Reason} ->
|
||||
%% kill the process to trigger reconnect
|
||||
_ = exit(Pid, kill),
|
||||
{error, Reason};
|
||||
Result ->
|
||||
Result
|
||||
end.
|
||||
|
||||
do_ldap_query(
|
||||
InstId,
|
||||
SearchOptions,
|
||||
|
@ -283,7 +300,7 @@ do_ldap_query(
|
|||
case
|
||||
ecpool:pick_and_do(
|
||||
PoolName,
|
||||
{eldap, search, [SearchOptions]},
|
||||
{?MODULE, search, [SearchOptions]},
|
||||
handover
|
||||
)
|
||||
of
|
||||
|
@ -319,7 +336,7 @@ do_ldap_query(
|
|||
?SLOG(
|
||||
error,
|
||||
LogMeta#{
|
||||
msg => "ldap_connector_do_query_failed",
|
||||
msg => "ldap_connector_query_failed",
|
||||
reason => emqx_utils:redact(Reason)
|
||||
}
|
||||
),
|
||||
|
|
|
@ -252,6 +252,9 @@
|
|||
timezone_to_offset_seconds/1
|
||||
]).
|
||||
|
||||
%% System functions
|
||||
-export([getenv/1]).
|
||||
|
||||
%% See extra_functions_module/0 and set_extra_functions_module/1 in the
|
||||
%% emqx_rule_engine module
|
||||
-callback handle_rule_function(atom(), list()) -> any() | {error, no_match_for_function}.
|
||||
|
@ -1262,3 +1265,9 @@ convert_timestamp(MillisecondsTimestamp) ->
|
|||
|
||||
uuid_str(UUID, DisplayOpt) ->
|
||||
uuid:uuid_to_string(UUID, DisplayOpt).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% System Funcs
|
||||
%%------------------------------------------------------------------------------
|
||||
getenv(Env) ->
|
||||
emqx_variform_bif:getenv(Env).
|
||||
|
|
|
@ -79,6 +79,12 @@
|
|||
%% Number compare functions
|
||||
-export([num_comp/2, num_eq/2, num_lt/2, num_lte/2, num_gt/2, num_gte/2]).
|
||||
|
||||
%% System
|
||||
-export([getenv/1]).
|
||||
|
||||
-define(CACHE(Key), {?MODULE, Key}).
|
||||
-define(ENV_CACHE(Env), ?CACHE({env, Env})).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% String Funcs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -569,3 +575,24 @@ num_lte(A, B) ->
|
|||
num_gte(A, B) ->
|
||||
R = num_comp(A, B),
|
||||
R =:= gt orelse R =:= eq.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% System
|
||||
%%------------------------------------------------------------------------------
|
||||
getenv(Bin) when is_binary(Bin) ->
|
||||
EnvKey = ?ENV_CACHE(Bin),
|
||||
case persistent_term:get(EnvKey, undefined) of
|
||||
undefined ->
|
||||
Name = erlang:binary_to_list(Bin),
|
||||
Result =
|
||||
case os:getenv(Name) of
|
||||
false ->
|
||||
<<>>;
|
||||
Value ->
|
||||
erlang:list_to_binary(Value)
|
||||
end,
|
||||
persistent_term:put(EnvKey, Result),
|
||||
Result;
|
||||
Result ->
|
||||
Result
|
||||
end.
|
||||
|
|
|
@ -72,3 +72,10 @@ base64_encode_decode_test() ->
|
|||
RandBytes = crypto:strong_rand_bytes(100),
|
||||
Encoded = emqx_variform_bif:base64_encode(RandBytes),
|
||||
?assertEqual(RandBytes, emqx_variform_bif:base64_decode(Encoded)).
|
||||
|
||||
system_test() ->
|
||||
EnvName = erlang:atom_to_list(?MODULE),
|
||||
EnvVal = erlang:atom_to_list(?FUNCTION_NAME),
|
||||
EnvNameBin = erlang:list_to_binary(EnvName),
|
||||
os:putenv(EnvName, EnvVal),
|
||||
?assertEqual(erlang:list_to_binary(EnvVal), emqx_variform_bif:getenv(EnvNameBin)).
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
Added a new builtin function `getenv` in the rule engine and variform expression to access the environment variables.
|
||||
Note this value is immutable once loaded from the environment.
|
|
@ -0,0 +1,4 @@
|
|||
Fix LDAP query timeout issue.
|
||||
|
||||
Previously, LDAP query timeout may cause the underlying connection to be unusable.
|
||||
Fixed to always reconnect if timeout happens.
|
|
@ -0,0 +1 @@
|
|||
Fixed an issue where a connector wouldn't respect the configured health check interval when first starting up, and would need an update/restart for the correct value to take effect.
|
|
@ -0,0 +1 @@
|
|||
Fixed an issue where the same client could not subscribe to the same exclusive topic when the node was down for some reason.
|
|
@ -0,0 +1,19 @@
|
|||
authentication = [
|
||||
{
|
||||
backend = ldap
|
||||
base_dn = "uid=${username},ou=testdevice,dc=emqx,dc=io"
|
||||
filter = "(& (objectClass=mqttUser) (uid=${username}))"
|
||||
mechanism = password_based
|
||||
method {
|
||||
is_superuser_attribute = isSuperuser
|
||||
password_attribute = userPassword
|
||||
type = hash
|
||||
}
|
||||
password = public
|
||||
pool_size = 8
|
||||
query_timeout = "5s"
|
||||
request_timeout = "10s"
|
||||
server = "localhost:1389"
|
||||
username = "cn=root,dc=emqx,dc=io"
|
||||
}
|
||||
]
|
Loading…
Reference in New Issue