feat(mongo srv): support srv for mongodb authentication

This commit is contained in:
zhouzb 2021-11-12 15:59:35 +08:00
parent d305111929
commit 25f504c90a
4 changed files with 121 additions and 20 deletions

View File

@ -5,7 +5,13 @@
## MongoDB Topology Type.
##
## Value: single | unknown | sharded | rs
auth.mongo.type = single
auth.mongo.type =
## Whether to use SRV and TXT records.
##
## Value: true | false
## Default: false
auth.mongo.srv_record = false
## The set name if type is rs.
##
@ -37,7 +43,6 @@ auth.mongo.pool = 8
## MongoDB AuthSource
##
## Value: String
## Default: mqtt
## auth.mongo.auth_source = admin
## MongoDB database

View File

@ -6,8 +6,12 @@
{datatype, {enum, [single, unknown, sharded, rs]}}
]}.
{mapping, "auth.mongo.srv_record", "emqx_auth_mongo.server", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{mapping, "auth.mongo.rs_set_name", "emqx_auth_mongo.server", [
{default, "mqtt"},
{datatype, string}
]}.
@ -41,7 +45,6 @@
]}.
{mapping, "auth.mongo.auth_source", "emqx_auth_mongo.server", [
{default, "mqtt"},
{datatype, string}
]}.
@ -101,9 +104,9 @@
]}.
{translation, "emqx_auth_mongo.server", fun(Conf) ->
H = cuttlefish:conf_get("auth.mongo.server", Conf),
Hosts = string:tokens(H, ","),
Type0 = cuttlefish:conf_get("auth.mongo.type", Conf),
SrvRecord = cuttlefish:conf_get("auth.mongo.srv_record", Conf, false),
Server = cuttlefish:conf_get("auth.mongo.server", Conf),
Type = cuttlefish:conf_get("auth.mongo.type", Conf),
Pool = cuttlefish:conf_get("auth.mongo.pool", Conf),
%% FIXME: compatible with 4.0-4.2 version format, plan to delete in 5.0
Login = cuttlefish:conf_get("auth.mongo.username", Conf,
@ -111,7 +114,10 @@
),
Passwd = cuttlefish:conf_get("auth.mongo.password", Conf),
DB = cuttlefish:conf_get("auth.mongo.database", Conf),
AuthSrc = cuttlefish:conf_get("auth.mongo.auth_source", Conf),
AuthSource = case cuttlefish:conf_get("auth.mongo.auth_source", Conf, undefined) of
undefined -> [];
AuthSource0 -> [{auth_source, list_to_binary(AuthSource0)}]
end,
R = cuttlefish:conf_get("auth.mongo.w_mode", Conf),
W = cuttlefish:conf_get("auth.mongo.r_mode", Conf),
Login0 = case Login =:= [] of
@ -156,8 +162,8 @@
false -> []
end,
WorkerOptions = [{database, list_to_binary(DB)}, {auth_source, list_to_binary(AuthSrc)}]
++ Login0 ++ Passwd0 ++ W0 ++ R0 ++ Ssl,
WorkerOptions = [{database, list_to_binary(DB)}]
++ Login0 ++ Passwd0 ++ W0 ++ R0 ++ Ssl ++ AuthSource,
Vars = cuttlefish_variable:fuzzy_matches(["auth", "mongo", "topology", "$name"], Conf),
Options = lists:map(fun({_, Name}) ->
@ -174,16 +180,17 @@
{list_to_atom(Name2), cuttlefish:conf_get("auth.mongo.topology."++Name, Conf)}
end, Vars),
Type = case Type0 =:= rs of
true -> {Type0, list_to_binary(cuttlefish:conf_get("auth.mongo.rs_set_name", Conf))};
false -> Type0
end,
[{type, Type},
{hosts, Hosts},
ReplicaSet = case cuttlefish:conf_get("auth.mongo.rs_set_name", Conf, undefined) of
undefined -> [];
ReplicaSet0 -> [{rs_set_name, list_to_binary(ReplicaSet0)}]
end,
[{srv_record, SrvRecord},
{type, Type},
{server, Server},
{options, Options},
{worker_options, WorkerOptions},
{auto_reconnect, 1},
{pool_size, Pool}]
{pool_size, Pool}] ++ ReplicaSet
end}.
%% The mongodb operation timeout is specified by the value of `cursor_timeout` from application config,

View File

@ -1,6 +1,6 @@
{application, emqx_auth_mongo,
[{description, "EMQ X Authentication/ACL with MongoDB"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.4.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_mongo_sup]},
{applications, [kernel,stdlib,mongodb,ecpool]},

View File

@ -28,7 +28,96 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, PoolEnv} = application:get_env(?APP, server),
PoolSpec = ecpool:pool_spec(?APP, ?APP, ?APP, PoolEnv),
{ok, Opts} = application:get_env(?APP, server),
NOpts = may_parse_srv_and_txt_records(Opts),
PoolSpec = ecpool:pool_spec(?APP, ?APP, ?APP, NOpts),
{ok, {{one_for_all, 10, 100}, [PoolSpec]}}.
may_parse_srv_and_txt_records(Opts) when is_list(Opts) ->
maps:to_list(may_parse_srv_and_txt_records(maps:from_list(Opts)));
may_parse_srv_and_txt_records(#{type := Type,
srv_record := false,
server := Server} = Opts) ->
Hosts = to_hosts(Server),
case Type =:= rs of
true ->
case maps:get(rs_set_name, Opts, undefined) of
undefined ->
error({missing_parameter, rs_set_name});
ReplicaSet ->
Opts#{type => {rs, ReplicaSet},
hosts => Hosts}
end;
false ->
Opts#{hosts => Hosts}
end;
may_parse_srv_and_txt_records(#{type := Type,
srv_record := true,
server := Server,
worker_options := WorkerOptions} = Opts) ->
Hosts = parse_srv_records(Server),
Opts0 = parse_txt_records(Type, Server),
NWorkerOptions = maps:to_list(maps:merge(maps:from_list(WorkerOptions), maps:with([auth_source], Opts0))),
NOpts = Opts#{hosts => Hosts, worker_options => NWorkerOptions},
case Type =:= rs of
true ->
case maps:get(rs_set_name, Opts0, maps:get(rs_set_name, NOpts, undefined)) of
undefined ->
error({missing_parameter, rs_set_name});
ReplicaSet ->
NOpts#{type => {Type, ReplicaSet}}
end;
false ->
NOpts
end.
to_hosts(Server) ->
[string:trim(H) || H <- string:tokens(Server, ",")].
parse_srv_records(Server) ->
case inet_res:lookup("_mongodb._tcp." ++ Server, in, srv) of
[] ->
error(service_not_found);
Services ->
[Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services]
end.
parse_txt_records(Type, Server) ->
case inet_res:lookup(Server, in, txt) of
[] ->
#{};
[[QueryString]] ->
case uri_string:dissect_query(QueryString) of
{error, _, _} ->
error({invalid_txt_record, invalid_query_string});
Options ->
Fields = case Type of
rs -> ["authSource", "replicaSet"];
_ -> ["authSource"]
end,
take_and_convert(Fields, Options)
end;
_ ->
error({invalid_txt_record, multiple_records})
end.
take_and_convert(Fields, Options) ->
take_and_convert(Fields, Options, #{}).
take_and_convert([], [_ | _], _Acc) ->
error({invalid_txt_record, invalid_option});
take_and_convert([], [], Acc) ->
Acc;
take_and_convert([Field | More], Options, Acc) ->
case lists:keytake(Field, 1, Options) of
{value, {"authSource", V}, NOptions} ->
take_and_convert(More, NOptions, Acc#{auth_source => list_to_binary(V)});
{value, {"replicaSet", V}, NOptions} ->
take_and_convert(More, NOptions, Acc#{rs_set_name => list_to_binary(V)});
{value, _, _} ->
error({invalid_txt_record, invalid_option});
false ->
take_and_convert(More, Options, Acc)
end.