emqx/apps/emqx_ldap/src/emqx_ldap.erl

367 lines
12 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_ldap).
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("eldap/include/eldap.hrl").
-behaviour(emqx_resource).
%% callbacks of behaviour emqx_resource
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_query/3,
on_get_status/2
]).
%% ecpool connect & reconnect
-export([connect/1]).
-export([namespace/0, roots/0, fields/1, desc/1]).
-export([do_get_status/1, get_status_with_poolname/1]).
-define(LDAP_HOST_OPTIONS, #{
default_port => 389
}).
-define(REDACT_VAL, "******").
-type params_tokens() :: #{atom() => list()}.
-type state() ::
#{
pool_name := binary(),
base_tokens := params_tokens(),
filter_tokens := params_tokens()
}.
%%=====================================================================
%% Hocon schema
namespace() -> "ldap".
roots() ->
[{config, #{type => hoconsc:ref(?MODULE, config)}}].
fields(config) ->
[
{server, server()},
{pool_size, fun emqx_connector_schema_lib:pool_size/1},
{username, fun ensure_username/1},
{password, emqx_connector_schema_lib:password_field()},
{base_dn,
?HOCON(binary(), #{
desc => ?DESC(base_dn),
required => true,
example => <<"uid=${username},ou=testdevice,dc=emqx,dc=io">>,
validator => fun emqx_schema:non_empty_string/1
})},
{filter,
?HOCON(
binary(),
#{
desc => ?DESC(filter),
default => <<"(objectClass=mqttUser)">>,
example => <<"(& (objectClass=mqttUser) (uid=${username}))">>,
validator => fun emqx_schema:non_empty_string/1
}
)},
{request_timeout,
?HOCON(emqx_schema:timeout_duration_ms(), #{
desc => ?DESC(request_timeout),
default => <<"10s">>
})},
{ssl,
?HOCON(?R_REF(?MODULE, ssl), #{
default => #{<<"enable">> => false},
desc => ?DESC(emqx_connector_schema_lib, "ssl")
})}
];
fields(ssl) ->
Schema = emqx_schema:client_ssl_opts_schema(#{}),
lists:keydelete("user_lookup_fun", 1, Schema);
fields(bind_opts) ->
[
{bind_password,
?HOCON(
binary(),
#{
desc => ?DESC(bind_password),
default => <<"${password}">>,
example => <<"${password}">>,
sensitive => true,
validator => fun emqx_schema:non_empty_string/1
}
)}
].
desc(ssl) ->
?DESC(emqx_connector_schema_lib, "ssl");
desc(_) ->
undefined.
server() ->
Meta = #{desc => ?DESC("server")},
emqx_schema:servers_sc(Meta, ?LDAP_HOST_OPTIONS).
ensure_username(required) ->
true;
ensure_username(Field) ->
emqx_connector_schema_lib:username(Field).
%% ===================================================================
callback_mode() -> always_sync.
-spec on_start(binary(), hocon:config()) -> {ok, state()} | {error, _}.
on_start(
InstId,
#{
server := Server,
pool_size := PoolSize,
ssl := SSL
} = Config
) ->
HostPort = emqx_schema:parse_server(Server, ?LDAP_HOST_OPTIONS),
?SLOG(info, #{
msg => "starting_ldap_connector",
connector => InstId,
config => emqx_utils:redact(Config)
}),
Config2 = maps:merge(Config, HostPort),
Config3 =
case maps:get(enable, SSL) of
true ->
Config2#{sslopts => emqx_tls_lib:to_client_opts(SSL)};
false ->
Config2
end,
Options = [
{pool_size, PoolSize},
{auto_reconnect, ?AUTO_RECONNECT_INTERVAL},
{options, Config3}
],
case emqx_resource_pool:start(InstId, ?MODULE, [{log_tag, "eldap_info"} | Options]) of
ok ->
emqx_ldap_bind_worker:on_start(
InstId,
Config,
[{log_tag, "eldap_bind_info"} | Options],
prepare_template(Config, #{pool_name => InstId})
);
{error, Reason} ->
?tp(
ldap_connector_start_failed,
#{error => emqx_utils:redact(Reason)}
),
{error, Reason}
end.
on_stop(InstId, State) ->
?SLOG(info, #{
msg => "stopping_ldap_connector",
connector => InstId
}),
ok = emqx_ldap_bind_worker:on_stop(InstId, State),
emqx_resource_pool:stop(InstId).
on_query(InstId, {query, Data}, State) ->
on_query(InstId, {query, Data}, [], State);
on_query(InstId, {query, Data, Attrs}, State) ->
on_query(InstId, {query, Data}, [{attributes, Attrs}], State);
on_query(InstId, {query, Data, Attrs, Timeout}, State) ->
on_query(InstId, {query, Data}, [{attributes, Attrs}, {timeout, Timeout}], State);
on_query(InstId, {bind, _DN, _Data} = Req, State) ->
emqx_ldap_bind_worker:on_query(InstId, Req, State).
on_get_status(InstId, #{pool_name := PoolName} = State) ->
case get_status_with_poolname(PoolName) of
connected ->
emqx_ldap_bind_worker:on_get_status(InstId, State);
disconnected ->
disconnected
end.
get_status_with_poolname(PoolName) ->
case emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1) of
true ->
connected;
false ->
%% Note: here can only return `disconnected` not `connecting`
%% because the LDAP socket/connection can't be reused
%% searching on a died socket will never return until timeout
disconnected
end.
do_get_status(Conn) ->
%% search with an invalid base object
%% if the server is down, the result is {error, ldap_closed}
%% otherwise is {error, invalidDNSyntax/timeout}
{error, ldap_closed} =/=
eldap:search(Conn, [{base, "cn=checkalive"}, {filter, eldap:'approxMatch'("", "")}]).
%% ===================================================================
connect(Options) ->
#{
hostname := Host,
username := Username,
password := Password,
request_timeout := RequestTimeout
} =
Conf = proplists:get_value(options, Options),
OpenOpts = maps:to_list(maps:with([port, sslopts], Conf)),
LogTag = proplists:get_value(log_tag, Options),
case eldap:open([Host], [{log, mk_log_func(LogTag)}, {timeout, RequestTimeout} | OpenOpts]) of
{ok, Handle} = Ret ->
%% TODO: teach `eldap` to accept 0-arity closures as passwords.
case eldap:simple_bind(Handle, Username, emqx_secret:unwrap(Password)) of
ok -> Ret;
Error -> Error
end;
Error ->
Error
end.
on_query(
InstId,
{query, Data},
SearchOptions,
#{base_tokens := BaseTks, filter_tokens := FilterTks} = State
) ->
Base = emqx_placeholder:proc_tmpl(BaseTks, Data),
FilterBin = emqx_placeholder:proc_tmpl(FilterTks, Data, #{
return => full_binary, var_trans => fun filter_escape/1
}),
case emqx_ldap_filter_parser:scan_and_parse(FilterBin) of
{ok, Filter} ->
do_ldap_query(
InstId,
[{base, Base}, {filter, Filter} | SearchOptions],
State
);
{error, Reason} = Error ->
?SLOG(error, #{
msg => "filter_parse_failed",
filter => FilterBin,
reason => Reason
}),
Error
end.
do_ldap_query(
InstId,
SearchOptions,
#{pool_name := PoolName} = State
) ->
LogMeta = #{connector => InstId, search => SearchOptions, state => emqx_utils:redact(State)},
?TRACE("QUERY", "ldap_connector_received_query", LogMeta),
case
ecpool:pick_and_do(
PoolName,
{eldap, search, [SearchOptions]},
handover
)
of
{ok, Result} ->
?tp(
ldap_connector_query_return,
#{result => Result}
),
Entries = Result#eldap_search_result.entries,
Count = length(Entries),
case Count =< 1 of
true ->
{ok, Entries};
false ->
%% Accept only a single exact match.
%% Multiple matches likely indicate:
%% 1. A misconfiguration in EMQX, allowing overly broad query conditions.
%% 2. Indistinguishable entries in the LDAP database.
%% Neither scenario should be allowed to proceed.
Msg = "ldap_query_found_more_than_one_match",
?SLOG(
error,
LogMeta#{
msg => "ldap_query_found_more_than_one_match",
count => Count
}
),
{error, {unrecoverable_error, Msg}}
end;
{error, 'noSuchObject'} ->
{ok, []};
{error, Reason} ->
?SLOG(
error,
LogMeta#{
msg => "ldap_connector_do_query_failed",
reason => emqx_utils:redact(Reason)
}
),
{error, {unrecoverable_error, Reason}}
end.
%% Note: the value of the `_Level` here always is 2
mk_log_func(LogTag) ->
fun(_Level, Format, Args) ->
?SLOG(
debug,
#{
msg => LogTag,
log => io_lib:format(Format, [redact_ldap_log(Arg) || Arg <- Args])
}
)
end.
redact_ldap_log({'BindRequest', Version, Name, {simple, _}}) ->
{'BindRequest', Version, Name, {simple, ?REDACT_VAL}};
redact_ldap_log(Arg) ->
Arg.
prepare_template(Config, State) ->
maps:fold(fun prepare_template/3, State, Config).
prepare_template(base_dn, V, State) ->
State#{base_tokens => emqx_placeholder:preproc_tmpl(V)};
prepare_template(filter, V, State) ->
State#{filter_tokens => emqx_placeholder:preproc_tmpl(V)};
prepare_template(_Entry, _, State) ->
State.
filter_escape(Binary) when is_binary(Binary) ->
filter_escape(erlang:binary_to_list(Binary));
filter_escape([Char | T]) ->
case lists:member(Char, filter_special_chars()) of
true ->
[$\\, Char | filter_escape(T)];
_ ->
[Char | filter_escape(T)]
end;
filter_escape([]) ->
[].
filter_special_chars() ->
[$(, $), $&, $|, $=, $!, $~, $>, $<, $:, $*, $\t, $\n, $\r, $\\].