feat: allow mountpoint to use client_attrs
This commit is contained in:
parent
8e8fc6a3d1
commit
3136ec5958
|
@ -1563,9 +1563,7 @@ enrich_client(ConnPkt, Channel = #channel{clientinfo = ClientInfo}) ->
|
||||||
fun maybe_username_as_clientid/2,
|
fun maybe_username_as_clientid/2,
|
||||||
fun maybe_assign_clientid/2,
|
fun maybe_assign_clientid/2,
|
||||||
%% attr init should happen after clientid and username assign
|
%% attr init should happen after clientid and username assign
|
||||||
fun maybe_set_client_initial_attr/2,
|
fun maybe_set_client_initial_attr/2
|
||||||
%% moutpoint fix should happen after attr init
|
|
||||||
fun fix_mountpoint/2
|
|
||||||
],
|
],
|
||||||
ConnPkt,
|
ConnPkt,
|
||||||
ClientInfo
|
ClientInfo
|
||||||
|
@ -1728,11 +1726,11 @@ extract_attr_from_clientinfo(#{extract_from := username, extract_regexp := Regex
|
||||||
extract_attr_from_clientinfo(_Config, _CLientInfo) ->
|
extract_attr_from_clientinfo(_Config, _CLientInfo) ->
|
||||||
ignored.
|
ignored.
|
||||||
|
|
||||||
fix_mountpoint(_ConnPkt, #{mountpoint := undefined}) ->
|
fix_mountpoint(#{mountpoint := undefined} = ClientInfo) ->
|
||||||
ok;
|
ClientInfo;
|
||||||
fix_mountpoint(_ConnPkt, ClientInfo = #{mountpoint := MountPoint}) ->
|
fix_mountpoint(ClientInfo = #{mountpoint := MountPoint}) ->
|
||||||
MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo),
|
MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo),
|
||||||
{ok, ClientInfo#{mountpoint := MountPoint1}}.
|
ClientInfo#{mountpoint := MountPoint1}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Set log metadata
|
%% Set log metadata
|
||||||
|
@ -1853,10 +1851,11 @@ merge_auth_result(ClientInfo, AuthResult0) when is_map(ClientInfo) andalso is_ma
|
||||||
Attrs0 = maps:get(client_attrs, ClientInfo, #{}),
|
Attrs0 = maps:get(client_attrs, ClientInfo, #{}),
|
||||||
Attrs1 = maps:get(client_attrs, AuthResult0, #{}),
|
Attrs1 = maps:get(client_attrs, AuthResult0, #{}),
|
||||||
Attrs = maps:merge(Attrs0, Attrs1),
|
Attrs = maps:merge(Attrs0, Attrs1),
|
||||||
maps:merge(
|
NewClientInfo = maps:merge(
|
||||||
ClientInfo#{client_attrs => Attrs},
|
ClientInfo#{client_attrs => Attrs},
|
||||||
AuthResult#{is_superuser => IsSuperuser}
|
AuthResult#{is_superuser => IsSuperuser}
|
||||||
).
|
),
|
||||||
|
fix_mountpoint(NewClientInfo).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Process Topic Alias
|
%% Process Topic Alias
|
||||||
|
|
|
@ -87,18 +87,12 @@ unmount_maybe_share(MountPoint, TopicFilter = #share{topic = Topic}) when
|
||||||
-spec replvar(option(mountpoint()), map()) -> option(mountpoint()).
|
-spec replvar(option(mountpoint()), map()) -> option(mountpoint()).
|
||||||
replvar(undefined, _Vars) ->
|
replvar(undefined, _Vars) ->
|
||||||
undefined;
|
undefined;
|
||||||
replvar(MountPoint, Vars) ->
|
replvar(MountPoint, Vars0) ->
|
||||||
ClientID = maps:get(clientid, Vars, undefined),
|
Allowed = [clientid, username, endpoint_name, client_attrs],
|
||||||
UserName = maps:get(username, Vars, undefined),
|
Vars = maps:filter(
|
||||||
EndpointName = maps:get(endpoint_name, Vars, undefined),
|
fun(K, V) -> V =/= undefined andalso lists:member(K, Allowed) end,
|
||||||
List = [
|
Vars0
|
||||||
{?PH_CLIENTID, ClientID},
|
),
|
||||||
{?PH_USERNAME, UserName},
|
Template = emqx_template:parse(MountPoint),
|
||||||
{?PH_ENDPOINT_NAME, EndpointName}
|
{String, _Errors} = emqx_template:render(Template, Vars),
|
||||||
],
|
unicode:characters_to_binary(String).
|
||||||
lists:foldl(fun feed_var/2, MountPoint, List).
|
|
||||||
|
|
||||||
feed_var({_PlaceHolder, undefined}, MountPoint) ->
|
|
||||||
MountPoint;
|
|
||||||
feed_var({PlaceHolder, Value}, MountPoint) ->
|
|
||||||
emqx_topic:feed_var(PlaceHolder, Value, MountPoint).
|
|
||||||
|
|
|
@ -143,6 +143,36 @@ t_max_conns_tcp(_Config) ->
|
||||||
)
|
)
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
t_client_attr_as_mountpoint(_Config) ->
|
||||||
|
Port = emqx_common_test_helpers:select_free_port(tcp),
|
||||||
|
ListenerConf = #{
|
||||||
|
<<"bind">> => format_bind({"127.0.0.1", Port}),
|
||||||
|
<<"limiter">> => #{},
|
||||||
|
<<"mountpoint">> => <<"groups/${client_attrs.ns}/">>
|
||||||
|
},
|
||||||
|
emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], #{
|
||||||
|
extract_from => clientid,
|
||||||
|
extract_regexp => <<"^(.+)-.+$">>,
|
||||||
|
extract_as => <<"ns">>
|
||||||
|
}),
|
||||||
|
emqx_logger:set_log_level(debug),
|
||||||
|
with_listener(tcp, attr_as_moutpoint, ListenerConf, fun() ->
|
||||||
|
{ok, Client} = emqtt:start_link(#{
|
||||||
|
hosts => [{"127.0.0.1", Port}],
|
||||||
|
clientid => <<"abc-123">>
|
||||||
|
}),
|
||||||
|
unlink(Client),
|
||||||
|
{ok, _} = emqtt:connect(Client),
|
||||||
|
TopicPrefix = atom_to_binary(?FUNCTION_NAME),
|
||||||
|
SubTopic = <<TopicPrefix/binary, "/#">>,
|
||||||
|
MatchTopic = <<"groups/abc/", TopicPrefix/binary, "/1">>,
|
||||||
|
{ok, _, [1]} = emqtt:subscribe(Client, SubTopic, 1),
|
||||||
|
?assertMatch([_], emqx_router:match_routes(MatchTopic)),
|
||||||
|
emqtt:stop(Client)
|
||||||
|
end),
|
||||||
|
emqx_config:put_zone_conf(default, [mqtt, client_attrs_init], disabled),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_current_conns_tcp(_Config) ->
|
t_current_conns_tcp(_Config) ->
|
||||||
Port = emqx_common_test_helpers:select_free_port(tcp),
|
Port = emqx_common_test_helpers:select_free_port(tcp),
|
||||||
Conf = #{
|
Conf = #{
|
||||||
|
|
|
@ -219,11 +219,11 @@ drop_invalid_attr(Map) when is_map(Map) ->
|
||||||
do_drop_invalid_attr([]) ->
|
do_drop_invalid_attr([]) ->
|
||||||
[];
|
[];
|
||||||
do_drop_invalid_attr([{K, V} | More]) ->
|
do_drop_invalid_attr([{K, V} | More]) ->
|
||||||
case emqx_utils:is_restricted_str(K) andalso emqx_utils:is_restricted_str(V) of
|
case emqx_utils:is_restricted_str(K) of
|
||||||
true ->
|
true ->
|
||||||
[{iolist_to_binary(K), iolist_to_binary(V)} | do_drop_invalid_attr(More)];
|
[{iolist_to_binary(K), iolist_to_binary(V)} | do_drop_invalid_attr(More)];
|
||||||
false ->
|
false ->
|
||||||
?SLOG(debug, #{msg => "invalid_client_attr_dropped", key => K, value => V}, #{
|
?SLOG(debug, #{msg => "invalid_client_attr_dropped", attr_name => K}, #{
|
||||||
tag => "AUTHN"
|
tag => "AUTHN"
|
||||||
}),
|
}),
|
||||||
do_drop_invalid_attr(More)
|
do_drop_invalid_attr(More)
|
||||||
|
|
|
@ -548,8 +548,7 @@ samples() ->
|
||||||
is_superuser => true,
|
is_superuser => true,
|
||||||
client_attrs => #{
|
client_attrs => #{
|
||||||
fid => <<"n11">>,
|
fid => <<"n11">>,
|
||||||
<<"_bad_key">> => <<"v">>,
|
<<"#_bad_key">> => <<"v">>
|
||||||
<<"ok_key">> => <<"but bad value">>
|
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
Req0
|
Req0
|
||||||
|
|
|
@ -1579,7 +1579,8 @@ client_attrs_init {
|
||||||
where `NAME` is the name of the attribute specified in the config `extract_as`.
|
where `NAME` is the name of the attribute specified in the config `extract_as`.
|
||||||
The initialized client attribute will be stored in the `client_attrs` property with the specified name,
|
The initialized client attribute will be stored in the `client_attrs` property with the specified name,
|
||||||
and can be used as a placeholder in a template for authentication and authorization.
|
and can be used as a placeholder in a template for authentication and authorization.
|
||||||
For example, use `${client_attrs.alias}` to render a HTTP POST body when `extract_as = alias`."""
|
For example, use `${client_attrs.alias}` to render an HTTP POST body when `extract_as = alias`,
|
||||||
|
or render listener config `moutpoint = devices/${client_attrs.alias}/` to initialize a per-client topic namespace."""
|
||||||
}
|
}
|
||||||
|
|
||||||
client_attrs_init_extract_from {
|
client_attrs_init_extract_from {
|
||||||
|
|
Loading…
Reference in New Issue