Merge pull request #6855 from savonarola/ph-interpolation
refactor(authn,authz): unify variable interpolation
This commit is contained in:
commit
5ed27f92b7
|
@ -19,8 +19,12 @@
|
|||
-include_lib("emqx/include/emqx_placeholder.hrl").
|
||||
|
||||
-export([ check_password_from_selected_map/3
|
||||
, replace_placeholders/2
|
||||
, replace_placeholder/2
|
||||
, parse_deep/1
|
||||
, parse_str/1
|
||||
, parse_sql/2
|
||||
, render_deep/2
|
||||
, render_str/2
|
||||
, render_sql_params/2
|
||||
, is_superuser/1
|
||||
, bin/1
|
||||
, ensure_apps_started/1
|
||||
|
@ -30,6 +34,13 @@
|
|||
|
||||
-define(RESOURCE_GROUP, <<"emqx_authn">>).
|
||||
|
||||
-define(AUTHN_PLACEHOLDERS, [?PH_USERNAME,
|
||||
?PH_CLIENTID,
|
||||
?PH_PASSWORD,
|
||||
?PH_PEERHOST,
|
||||
?PH_CERT_SUBJECT,
|
||||
?PH_CERT_CN_NAME]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -45,33 +56,35 @@ check_password_from_selected_map(
|
|||
{error, bad_username_or_password}
|
||||
end.
|
||||
|
||||
replace_placeholders(PlaceHolders, Data) ->
|
||||
replace_placeholders(PlaceHolders, Data, []).
|
||||
parse_deep(Template) ->
|
||||
emqx_placeholder:preproc_tmpl_deep(Template, #{placeholders => ?AUTHN_PLACEHOLDERS}).
|
||||
|
||||
replace_placeholders([], _Credential, Acc) ->
|
||||
lists:reverse(Acc);
|
||||
replace_placeholders([Placeholder | More], Credential, Acc) ->
|
||||
case replace_placeholder(Placeholder, Credential) of
|
||||
undefined ->
|
||||
error({cannot_get_variable, Placeholder});
|
||||
V ->
|
||||
replace_placeholders(More, Credential, [convert_to_sql_param(V) | Acc])
|
||||
end.
|
||||
parse_str(Template) ->
|
||||
emqx_placeholder:preproc_tmpl(Template, #{placeholders => ?AUTHN_PLACEHOLDERS}).
|
||||
|
||||
replace_placeholder(?PH_USERNAME, Credential) ->
|
||||
maps:get(username, Credential, undefined);
|
||||
replace_placeholder(?PH_CLIENTID, Credential) ->
|
||||
maps:get(clientid, Credential, undefined);
|
||||
replace_placeholder(?PH_PASSWORD, Credential) ->
|
||||
maps:get(password, Credential, undefined);
|
||||
replace_placeholder(?PH_PEERHOST, Credential) ->
|
||||
maps:get(peerhost, Credential, undefined);
|
||||
replace_placeholder(?PH_CERT_SUBJECT, Credential) ->
|
||||
maps:get(dn, Credential, undefined);
|
||||
replace_placeholder(?PH_CERT_CN_NAME, Credential) ->
|
||||
maps:get(cn, Credential, undefined);
|
||||
replace_placeholder(Constant, _) ->
|
||||
Constant.
|
||||
parse_sql(Template, ReplaceWith) ->
|
||||
emqx_placeholder:preproc_sql(
|
||||
Template,
|
||||
#{replace_with => ReplaceWith,
|
||||
placeholders => ?AUTHN_PLACEHOLDERS}).
|
||||
|
||||
render_deep(Template, Credential) ->
|
||||
emqx_placeholder:proc_tmpl_deep(
|
||||
Template,
|
||||
Credential,
|
||||
#{return => full_binary, var_trans => fun handle_var/2}).
|
||||
|
||||
render_str(Template, Credential) ->
|
||||
emqx_placeholder:proc_tmpl(
|
||||
Template,
|
||||
Credential,
|
||||
#{return => full_binary, var_trans => fun handle_var/2}).
|
||||
|
||||
render_sql_params(ParamList, Credential) ->
|
||||
emqx_placeholder:proc_tmpl(
|
||||
ParamList,
|
||||
Credential,
|
||||
#{return => rawlist, var_trans => fun handle_sql_var/2}).
|
||||
|
||||
is_superuser(#{<<"is_superuser">> := <<"">>}) ->
|
||||
#{is_superuser => false};
|
||||
|
@ -113,7 +126,12 @@ make_resource_id(Name) ->
|
|||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
convert_to_sql_param(undefined) ->
|
||||
null;
|
||||
convert_to_sql_param(V) ->
|
||||
bin(V).
|
||||
handle_var({var, Name}, undefined) ->
|
||||
error({cannot_get_variable, Name});
|
||||
handle_var(_, Value) ->
|
||||
emqx_placeholder:bin(Value).
|
||||
|
||||
handle_sql_var({var, Name}, undefined) ->
|
||||
error({cannot_get_variable, Name});
|
||||
handle_sql_var(_, Value) ->
|
||||
emqx_placeholder:sql_data(Value).
|
||||
|
|
|
@ -126,12 +126,14 @@ create(#{method := Method,
|
|||
{BsaeUrlWithPath, Query} = parse_fullpath(RawURL),
|
||||
URIMap = parse_url(BsaeUrlWithPath),
|
||||
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
|
||||
State = #{method => Method,
|
||||
path => maps:get(path, URIMap),
|
||||
base_query => cow_qs:parse_qs(to_bin(Query)),
|
||||
headers => maps:to_list(Headers),
|
||||
body => maps:to_list(Body),
|
||||
request_timeout => RequestTimeout,
|
||||
State = #{method => Method,
|
||||
path => maps:get(path, URIMap),
|
||||
base_query_template => emqx_authn_utils:parse_deep(
|
||||
cow_qs:parse_qs(to_bin(Query))),
|
||||
headers => maps:to_list(Headers),
|
||||
body_template => emqx_authn_utils:parse_deep(
|
||||
maps:to_list(Body)),
|
||||
request_timeout => RequestTimeout,
|
||||
resource_id => ResourceId},
|
||||
case emqx_resource:create_local(ResourceId,
|
||||
emqx_connector_http,
|
||||
|
@ -259,11 +261,11 @@ parse_url(URL) ->
|
|||
|
||||
generate_request(Credential, #{method := Method,
|
||||
path := Path,
|
||||
base_query := BaseQuery,
|
||||
base_query_template := BaseQueryTemplate,
|
||||
headers := Headers,
|
||||
body := Body0}) ->
|
||||
Body = replace_placeholders(Body0, Credential),
|
||||
NBaseQuery = replace_placeholders(BaseQuery, Credential),
|
||||
body_template := BodyTemplate}) ->
|
||||
Body = emqx_authn_utils:render_deep(BodyTemplate, Credential),
|
||||
NBaseQuery = emqx_authn_utils:render_deep(BaseQueryTemplate, Credential),
|
||||
case Method of
|
||||
get ->
|
||||
NPath = append_query(Path, NBaseQuery ++ Body),
|
||||
|
@ -275,19 +277,6 @@ generate_request(Credential, #{method := Method,
|
|||
{NPath, Headers, NBody}
|
||||
end.
|
||||
|
||||
replace_placeholders(KVs, Credential) ->
|
||||
replace_placeholders(KVs, Credential, []).
|
||||
|
||||
replace_placeholders([], _Credential, Acc) ->
|
||||
lists:reverse(Acc);
|
||||
replace_placeholders([{K, V0} | More], Credential, Acc) ->
|
||||
case emqx_authn_utils:replace_placeholder(V0, Credential) of
|
||||
undefined ->
|
||||
error({cannot_get_variable, V0});
|
||||
V ->
|
||||
replace_placeholders(More, Credential, [{K, to_bin(V)} | Acc])
|
||||
end.
|
||||
|
||||
append_query(Path, []) ->
|
||||
Path;
|
||||
append_query(Path, Query) ->
|
||||
|
|
|
@ -97,7 +97,7 @@ create(_AuthenticatorID, Config) ->
|
|||
create(Config).
|
||||
|
||||
create(#{selector := Selector} = Config) ->
|
||||
NSelector = parse_selector(Selector),
|
||||
SelectorTemplate = emqx_authn_utils:parse_deep(Selector),
|
||||
State = maps:with(
|
||||
[collection,
|
||||
password_hash_field,
|
||||
|
@ -110,7 +110,7 @@ create(#{selector := Selector} = Config) ->
|
|||
ok = emqx_authn_password_hashing:init(Algorithm),
|
||||
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
|
||||
NState = State#{
|
||||
selector => NSelector,
|
||||
selector_template => SelectorTemplate,
|
||||
resource_id => ResourceId},
|
||||
case emqx_resource:create_local(ResourceId, emqx_connector_mongo, Config) of
|
||||
{ok, already_created} ->
|
||||
|
@ -134,17 +134,16 @@ authenticate(#{auth_method := _}, _) ->
|
|||
ignore;
|
||||
authenticate(#{password := Password} = Credential,
|
||||
#{collection := Collection,
|
||||
selector := Selector0,
|
||||
selector_template := SelectorTemplate,
|
||||
resource_id := ResourceId} = State) ->
|
||||
Selector1 = replace_placeholders(Selector0, Credential),
|
||||
Selector2 = normalize_selector(Selector1),
|
||||
case emqx_resource:query(ResourceId, {find_one, Collection, Selector2, #{}}) of
|
||||
Selector = emqx_authn_utils:render_deep(SelectorTemplate, Credential),
|
||||
case emqx_resource:query(ResourceId, {find_one, Collection, Selector, #{}}) of
|
||||
undefined -> ignore;
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "mongodb_query_failed",
|
||||
resource => ResourceId,
|
||||
collection => Collection,
|
||||
selector => Selector2,
|
||||
selector => Selector,
|
||||
reason => Reason}),
|
||||
ignore;
|
||||
Doc ->
|
||||
|
@ -155,7 +154,7 @@ authenticate(#{password := Password} = Credential,
|
|||
?SLOG(error, #{msg => "cannot_find_password_hash_field",
|
||||
resource => ResourceId,
|
||||
collection => Collection,
|
||||
selector => Selector2,
|
||||
selector => Selector,
|
||||
password_hash_field => PasswordHashField}),
|
||||
ignore;
|
||||
{error, Reason} ->
|
||||
|
@ -171,31 +170,6 @@ destroy(#{resource_id := ResourceId}) ->
|
|||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
parse_selector(Selector) ->
|
||||
NSelector = emqx_json:encode(Selector),
|
||||
Tokens = re:split(NSelector, "(" ++ ?RE_PLACEHOLDER ++ ")", [{return, binary}, group, trim]),
|
||||
parse_selector(Tokens, []).
|
||||
|
||||
parse_selector([], Acc) ->
|
||||
lists:reverse(Acc);
|
||||
parse_selector([[Constant, Placeholder] | Tokens], Acc) ->
|
||||
parse_selector(Tokens, [{placeholder, Placeholder}, {constant, Constant} | Acc]);
|
||||
parse_selector([[Constant] | Tokens], Acc) ->
|
||||
parse_selector(Tokens, [{constant, Constant} | Acc]).
|
||||
|
||||
replace_placeholders(Selector, Credential) ->
|
||||
lists:map(fun({constant, Constant}) ->
|
||||
Constant;
|
||||
({placeholder, Placeholder}) ->
|
||||
case emqx_authn_utils:replace_placeholder(Placeholder, Credential) of
|
||||
undefined -> error({cannot_get_variable, Placeholder});
|
||||
Value -> Value
|
||||
end
|
||||
end, Selector).
|
||||
|
||||
normalize_selector(Selector) ->
|
||||
emqx_json:decode(iolist_to_binary(Selector), [return_maps]).
|
||||
|
||||
check_password(undefined, _Selected, _State) ->
|
||||
{error, bad_username_or_password};
|
||||
check_password(Password,
|
||||
|
|
|
@ -75,7 +75,7 @@ create(#{password_hash_algorithm := Algorithm,
|
|||
query_timeout := QueryTimeout
|
||||
} = Config) ->
|
||||
ok = emqx_authn_password_hashing:init(Algorithm),
|
||||
{Query, PlaceHolders} = parse_query(Query0),
|
||||
{Query, PlaceHolders} = emqx_authn_utils:parse_sql(Query0, '?'),
|
||||
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
|
||||
State = #{password_hash_algorithm => Algorithm,
|
||||
query => Query,
|
||||
|
@ -108,7 +108,7 @@ authenticate(#{password := Password} = Credential,
|
|||
query_timeout := Timeout,
|
||||
resource_id := ResourceId,
|
||||
password_hash_algorithm := Algorithm}) ->
|
||||
Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
|
||||
Params = emqx_authn_utils:render_sql_params(PlaceHolders, Credential),
|
||||
case emqx_resource:query(ResourceId, {sql, Query, Params, Timeout}) of
|
||||
{ok, _Columns, []} -> ignore;
|
||||
{ok, Columns, [Row | _]} ->
|
||||
|
@ -133,18 +133,3 @@ authenticate(#{password := Password} = Credential,
|
|||
destroy(#{resource_id := ResourceId}) ->
|
||||
_ = emqx_resource:remove_local(ResourceId),
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
%% TODO: Support prepare
|
||||
parse_query(Query) ->
|
||||
case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of
|
||||
{match, Captured} ->
|
||||
PlaceHolders = [PlaceHolder || [PlaceHolder] <- Captured],
|
||||
NQuery = re:replace(Query, ?RE_PLACEHOLDER, "?", [global, {return, binary}]),
|
||||
{NQuery, PlaceHolders};
|
||||
nomatch ->
|
||||
{Query, []}
|
||||
end.
|
||||
|
|
|
@ -74,7 +74,7 @@ create(_AuthenticatorID, Config) ->
|
|||
create(#{query := Query0,
|
||||
password_hash_algorithm := Algorithm} = Config) ->
|
||||
ok = emqx_authn_password_hashing:init(Algorithm),
|
||||
{Query, PlaceHolders} = parse_query(Query0),
|
||||
{Query, PlaceHolders} = emqx_authn_utils:parse_sql(Query0, '$n'),
|
||||
ResourceId = emqx_authn_utils:make_resource_id(?MODULE),
|
||||
State = #{placeholders => PlaceHolders,
|
||||
password_hash_algorithm => Algorithm,
|
||||
|
@ -103,7 +103,7 @@ authenticate(#{password := Password} = Credential,
|
|||
#{placeholders := PlaceHolders,
|
||||
resource_id := ResourceId,
|
||||
password_hash_algorithm := Algorithm}) ->
|
||||
Params = emqx_authn_utils:replace_placeholders(PlaceHolders, Credential),
|
||||
Params = emqx_authn_utils:render_sql_params(PlaceHolders, Credential),
|
||||
case emqx_resource:query(ResourceId, {prepared_query, ResourceId, Params}) of
|
||||
{ok, _Columns, []} -> ignore;
|
||||
{ok, Columns, [Row | _]} ->
|
||||
|
@ -127,20 +127,3 @@ authenticate(#{password := Password} = Credential,
|
|||
destroy(#{resource_id := ResourceId}) ->
|
||||
_ = emqx_resource:remove_local(ResourceId),
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
parse_query(Query) ->
|
||||
case re:run(Query, ?RE_PLACEHOLDER, [global, {capture, all, binary}]) of
|
||||
{match, Captured} ->
|
||||
PlaceHolders = [PlaceHolder || [PlaceHolder] <- Captured],
|
||||
Replacements = ["$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))],
|
||||
NQuery = lists:foldl(fun({PlaceHolder, Replacement}, Query0) ->
|
||||
re:replace(Query0, "\\" ++ PlaceHolder, Replacement, [{return, binary}])
|
||||
end, Query, lists:zip(PlaceHolders, Replacements)),
|
||||
{NQuery, PlaceHolders};
|
||||
nomatch ->
|
||||
{Query, []}
|
||||
end.
|
||||
|
|
|
@ -120,10 +120,10 @@ update(Config, State) ->
|
|||
authenticate(#{auth_method := _}, _) ->
|
||||
ignore;
|
||||
authenticate(#{password := Password} = Credential,
|
||||
#{cmd := {Command, Key, Fields},
|
||||
#{cmd := {Command, KeyTemplate, Fields},
|
||||
resource_id := ResourceId,
|
||||
password_hash_algorithm := Algorithm}) ->
|
||||
NKey = binary_to_list(iolist_to_binary(replace_placeholders(Key, Credential))),
|
||||
NKey = emqx_authn_utils:render_str(KeyTemplate, Credential),
|
||||
case emqx_resource:query(ResourceId, {cmd, [Command, NKey | Fields]}) of
|
||||
{ok, []} -> ignore;
|
||||
{ok, Values} ->
|
||||
|
@ -168,8 +168,8 @@ parse_cmd(Cmd) ->
|
|||
[Command, Key, Field | Fields] when Command =:= "HGET" orelse Command =:= "HMGET" ->
|
||||
NFields = [Field | Fields],
|
||||
check_fields(NFields),
|
||||
NKey = parse_key(Key),
|
||||
{Command, NKey, NFields};
|
||||
KeyTemplate = emqx_authn_utils:parse_str(list_to_binary(Key)),
|
||||
{Command, KeyTemplate, NFields};
|
||||
_ ->
|
||||
error({unsupported_cmd, Cmd})
|
||||
end.
|
||||
|
@ -185,27 +185,6 @@ check_fields(Fields) ->
|
|||
{false, _} -> error(missing_password_hash)
|
||||
end.
|
||||
|
||||
parse_key(Key) ->
|
||||
Tokens = re:split(Key, "(" ++ ?RE_PLACEHOLDER ++ ")", [{return, binary}, group, trim]),
|
||||
parse_key(Tokens, []).
|
||||
|
||||
parse_key([], Acc) ->
|
||||
lists:reverse(Acc);
|
||||
parse_key([[Constant, Placeholder] | Tokens], Acc) ->
|
||||
parse_key(Tokens, [{placeholder, Placeholder}, {constant, Constant} | Acc]);
|
||||
parse_key([[Constant] | Tokens], Acc) ->
|
||||
parse_key(Tokens, [{constant, Constant} | Acc]).
|
||||
|
||||
replace_placeholders(Key, Credential) ->
|
||||
lists:map(fun({constant, Constant}) ->
|
||||
Constant;
|
||||
({placeholder, Placeholder}) ->
|
||||
case emqx_authn_utils:replace_placeholder(Placeholder, Credential) of
|
||||
undefined -> error({cannot_get_variable, Placeholder});
|
||||
Value -> Value
|
||||
end
|
||||
end, Key).
|
||||
|
||||
merge(Fields, Value) when not is_list(Value) ->
|
||||
merge(Fields, [Value]);
|
||||
merge(Fields, Values) ->
|
||||
|
|
|
@ -31,7 +31,7 @@
|
|||
-define(PATH, [authentication]).
|
||||
|
||||
all() ->
|
||||
[{group, require_seeds}, t_create_invalid, t_parse_query].
|
||||
[{group, require_seeds}, t_create_invalid].
|
||||
|
||||
groups() ->
|
||||
[{require_seeds, [], [t_create, t_authenticate, t_update, t_destroy, t_is_superuser]}].
|
||||
|
@ -252,18 +252,6 @@ test_is_superuser({Field, Value, ExpectedValue}) ->
|
|||
{ok, #{is_superuser => ExpectedValue}},
|
||||
emqx_access_control:authenticate(Credentials)).
|
||||
|
||||
|
||||
t_parse_query(_) ->
|
||||
Query1 = ?PH_USERNAME,
|
||||
?assertEqual({<<"$1">>, [?PH_USERNAME]}, emqx_authn_pgsql:parse_query(Query1)),
|
||||
|
||||
Query2 = <<?PH_USERNAME/binary, ", ", ?PH_CLIENTID/binary>>,
|
||||
?assertEqual({<<"$1, $2">>, [?PH_USERNAME, ?PH_CLIENTID]},
|
||||
emqx_authn_pgsql:parse_query(Query2)),
|
||||
|
||||
Query3 = <<"nomatch">>,
|
||||
?assertEqual({<<"nomatch">>, []}, emqx_authn_pgsql:parse_query(Query3)).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helpers
|
||||
%%------------------------------------------------------------------------------
|
||||
|
|
|
@ -42,6 +42,8 @@
|
|||
]
|
||||
}).
|
||||
|
||||
-define(IS_TRUE(Val), ((Val =:= true) or (Val =:= <<"true">>))).
|
||||
|
||||
-export([ get_raw_sources/0
|
||||
, get_raw_source/1
|
||||
]).
|
||||
|
@ -462,8 +464,7 @@ read_certs(#{<<"ssl">> := SSL} = Source) ->
|
|||
end;
|
||||
read_certs(Source) -> Source.
|
||||
|
||||
maybe_write_certs(#{<<"ssl">> := #{<<"enable">> := True} = SSL} = Source)
|
||||
when (True =:= true) or (True =:= <<"true">>) ->
|
||||
maybe_write_certs(#{<<"ssl">> := #{<<"enable">> := True} = SSL} = Source) when ?IS_TRUE(True) ->
|
||||
Type = maps:get(<<"type">>, Source),
|
||||
{ok, Return} = emqx_tls_lib:ensure_ssl_files(filename:join(["authz", Type]), SSL),
|
||||
maps:put(<<"ssl">>, Return, Source);
|
||||
|
|
|
@ -37,6 +37,14 @@
|
|||
-compile(nowarn_export_all).
|
||||
-endif.
|
||||
|
||||
-define(PLACEHOLDERS, [?PH_USERNAME,
|
||||
?PH_CLIENTID,
|
||||
?PH_PEERHOST,
|
||||
?PH_PROTONAME,
|
||||
?PH_MOUNTPOINT,
|
||||
?PH_TOPIC,
|
||||
?PH_ACTION]).
|
||||
|
||||
description() ->
|
||||
"AuthZ with http".
|
||||
|
||||
|
@ -87,12 +95,16 @@ parse_config(#{ url := URL
|
|||
} = Conf) ->
|
||||
{BaseURLWithPath, Query} = parse_fullpath(URL),
|
||||
BaseURLMap = parse_url(BaseURLWithPath),
|
||||
Conf#{ method => Method
|
||||
, base_url => maps:remove(query, BaseURLMap)
|
||||
, base_query => cow_qs:parse_qs(bin(Query))
|
||||
, body => maps:get(body, Conf, #{})
|
||||
, headers => Headers
|
||||
, request_timeout => ReqTimeout
|
||||
Conf#{ method => Method
|
||||
, base_url => maps:remove(query, BaseURLMap)
|
||||
, base_query_template => emqx_authz_utils:parse_deep(
|
||||
cow_qs:parse_qs(bin(Query)),
|
||||
?PLACEHOLDERS)
|
||||
, body_template => emqx_authz_utils:parse_deep(
|
||||
maps:to_list(maps:get(body, Conf, #{})),
|
||||
?PLACEHOLDERS)
|
||||
, headers => Headers
|
||||
, request_timeout => ReqTimeout
|
||||
}.
|
||||
|
||||
parse_fullpath(RawURL) ->
|
||||
|
@ -115,12 +127,13 @@ generate_request( PubSub
|
|||
, Client
|
||||
, #{ method := Method
|
||||
, base_url := #{path := Path}
|
||||
, base_query := BaseQuery
|
||||
, base_query_template := BaseQueryTemplate
|
||||
, headers := Headers
|
||||
, body := Body0
|
||||
, body_template := BodyTemplate
|
||||
}) ->
|
||||
Body = replace_placeholders(maps:to_list(Body0), PubSub, Topic, Client),
|
||||
NBaseQuery = replace_placeholders(BaseQuery, PubSub, Topic, Client),
|
||||
Values = client_vars(Client, PubSub, Topic),
|
||||
Body = emqx_authz_utils:render_deep(BodyTemplate, Values),
|
||||
NBaseQuery = emqx_authz_utils:render_deep(BaseQueryTemplate, Values),
|
||||
case Method of
|
||||
get ->
|
||||
NPath = append_query(Path, NBaseQuery ++ Body),
|
||||
|
@ -159,36 +172,11 @@ serialize_body(<<"application/json">>, Body) ->
|
|||
serialize_body(<<"application/x-www-form-urlencoded">>, Body) ->
|
||||
query_string(Body).
|
||||
|
||||
replace_placeholders(KVs, PubSub, Topic, Client) ->
|
||||
replace_placeholders(KVs, PubSub, Topic, Client, []).
|
||||
|
||||
replace_placeholders([], _PubSub, _Topic, _Client, Acc) ->
|
||||
lists:reverse(Acc);
|
||||
replace_placeholders([{K, V0} | More], PubSub, Topic, Client, Acc) ->
|
||||
case replace_placeholder(V0, PubSub, Topic, Client) of
|
||||
undefined ->
|
||||
error({cannot_get_variable, V0});
|
||||
V ->
|
||||
replace_placeholders(More, PubSub, Topic, Client, [{bin(K), bin(V)} | Acc])
|
||||
end.
|
||||
|
||||
replace_placeholder(?PH_USERNAME, _PubSub, _Topic, Client) ->
|
||||
bin(maps:get(username, Client, undefined));
|
||||
replace_placeholder(?PH_CLIENTID, _PubSub, _Topic, Client) ->
|
||||
bin(maps:get(clientid, Client, undefined));
|
||||
replace_placeholder(?PH_HOST, _PubSub, _Topic, Client) ->
|
||||
inet_parse:ntoa(maps:get(peerhost, Client, undefined));
|
||||
replace_placeholder(?PH_PROTONAME, _PubSub, _Topic, Client) ->
|
||||
bin(maps:get(protocol, Client, undefined));
|
||||
replace_placeholder(?PH_MOUNTPOINT, _PubSub, _Topic, Client) ->
|
||||
bin(maps:get(mountpoint, Client, undefined));
|
||||
replace_placeholder(?PH_TOPIC, _PubSub, Topic, _Client) ->
|
||||
bin(emqx_http_lib:uri_encode(Topic));
|
||||
replace_placeholder(?PH_ACTION, PubSub, _Topic, _Client) ->
|
||||
bin(PubSub);
|
||||
|
||||
replace_placeholder(Constant, _, _, _) ->
|
||||
Constant.
|
||||
client_vars(Client, PubSub, Topic) ->
|
||||
Client#{
|
||||
action => PubSub,
|
||||
topic => Topic
|
||||
}.
|
||||
|
||||
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
||||
bin(B) when is_binary(B) -> B;
|
||||
|
|
|
@ -36,13 +36,20 @@
|
|||
-compile(nowarn_export_all).
|
||||
-endif.
|
||||
|
||||
-define(PLACEHOLDERS, [?PH_USERNAME,
|
||||
?PH_CLIENTID,
|
||||
?PH_PEERHOST]).
|
||||
|
||||
description() ->
|
||||
"AuthZ with MongoDB".
|
||||
|
||||
init(Source) ->
|
||||
init(#{selector := Selector} = Source) ->
|
||||
case emqx_authz_utils:create_resource(emqx_connector_mongo, Source) of
|
||||
{error, Reason} -> error({load_config_error, Reason});
|
||||
{ok, Id} -> Source#{annotations => #{id => Id}}
|
||||
{ok, Id} -> Source#{annotations => #{id => Id},
|
||||
selector_template => emqx_authz_utils:parse_deep(
|
||||
Selector,
|
||||
?PLACEHOLDERS)}
|
||||
end.
|
||||
|
||||
dry_run(Source) ->
|
||||
|
@ -53,10 +60,10 @@ destroy(#{annotations := #{id := Id}}) ->
|
|||
|
||||
authorize(Client, PubSub, Topic,
|
||||
#{collection := Collection,
|
||||
selector := Selector,
|
||||
selector_template := SelectorTemplate,
|
||||
annotations := #{id := ResourceID}
|
||||
}) ->
|
||||
RenderedSelector = replvar(Selector, Client),
|
||||
RenderedSelector = emqx_authz_utils:render_deep(SelectorTemplate, Client),
|
||||
Result = try
|
||||
emqx_resource:query(ResourceID, {find, Collection, RenderedSelector, #{}})
|
||||
catch
|
||||
|
@ -87,33 +94,3 @@ do_authorize(Client, PubSub, Topic, [Rule | Tail]) ->
|
|||
{matched, Permission} -> {matched, Permission};
|
||||
nomatch -> do_authorize(Client, PubSub, Topic, Tail)
|
||||
end.
|
||||
|
||||
replvar(Selector, #{clientid := Clientid,
|
||||
username := Username,
|
||||
peerhost := IpAddress
|
||||
}) ->
|
||||
Fun = fun
|
||||
InFun(K, V, AccIn) when is_map(V) ->
|
||||
maps:put(K, maps:fold(InFun, AccIn, V), AccIn);
|
||||
InFun(K, V, AccIn) when is_list(V) ->
|
||||
maps:put(K, [ begin
|
||||
[{K1, V1}] = maps:to_list(M),
|
||||
InFun(K1, V1, AccIn)
|
||||
end || M <- V],
|
||||
AccIn);
|
||||
InFun(K, V, AccIn) when is_binary(V) ->
|
||||
V1 = re:replace( V, emqx_authz:ph_to_re(?PH_S_CLIENTID)
|
||||
, bin(Clientid), [global, {return, binary}]),
|
||||
V2 = re:replace( V1, emqx_authz:ph_to_re(?PH_S_USERNAME)
|
||||
, bin(Username), [global, {return, binary}]),
|
||||
V3 = re:replace( V2, emqx_authz:ph_to_re(?PH_S_PEERHOST)
|
||||
, inet_parse:ntoa(IpAddress), [global, {return, binary}]),
|
||||
maps:put(K, V3, AccIn);
|
||||
InFun(K, V, AccIn) -> maps:put(K, V, AccIn)
|
||||
end,
|
||||
maps:fold(Fun, #{}, Selector).
|
||||
|
||||
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
||||
bin(B) when is_binary(B) -> B;
|
||||
bin(L) when is_list(L) -> list_to_binary(L);
|
||||
bin(X) -> X.
|
||||
|
|
|
@ -36,6 +36,12 @@
|
|||
-compile(nowarn_export_all).
|
||||
-endif.
|
||||
|
||||
-define(PLACEHOLDERS, [?PH_USERNAME,
|
||||
?PH_CLIENTID,
|
||||
?PH_PEERHOST,
|
||||
?PH_CERT_CN_NAME,
|
||||
?PH_CERT_SUBJECT]).
|
||||
|
||||
description() ->
|
||||
"AuthZ with Mysql".
|
||||
|
||||
|
@ -44,7 +50,10 @@ init(#{query := SQL} = Source) ->
|
|||
{error, Reason} -> error({load_config_error, Reason});
|
||||
{ok, Id} -> Source#{annotations =>
|
||||
#{id => Id,
|
||||
query => parse_query(SQL)}}
|
||||
query => emqx_authz_utils:parse_sql(
|
||||
SQL,
|
||||
'?',
|
||||
?PLACEHOLDERS)}}
|
||||
end.
|
||||
|
||||
dry_run(Source) ->
|
||||
|
@ -58,7 +67,7 @@ authorize(Client, PubSub, Topic,
|
|||
query := {Query, Params}
|
||||
}
|
||||
}) ->
|
||||
RenderParams = replvar(Params, Client),
|
||||
RenderParams = emqx_authz_utils:render_sql_params(Params, Client),
|
||||
case emqx_resource:query(ResourceID, {sql, Query, RenderParams}) of
|
||||
{ok, _Columns, []} -> nomatch;
|
||||
{ok, Columns, Rows} ->
|
||||
|
@ -72,14 +81,6 @@ authorize(Client, PubSub, Topic,
|
|||
nomatch
|
||||
end.
|
||||
|
||||
parse_query(Sql) ->
|
||||
case re:run(Sql, ?RE_PLACEHOLDER, [global, {capture, all, list}]) of
|
||||
{match, Variables} ->
|
||||
Params = [Var || [Var] <- Variables],
|
||||
{re:replace(Sql, ?RE_PLACEHOLDER, "?", [global, {return, list}]), Params};
|
||||
nomatch ->
|
||||
{Sql, []}
|
||||
end.
|
||||
|
||||
do_authorize(_Client, _PubSub, _Topic, _Columns, []) ->
|
||||
nomatch;
|
||||
|
@ -102,30 +103,3 @@ index(Elem, List) ->
|
|||
index(_Elem, [], _Index) -> {error, not_found};
|
||||
index(Elem, [ Elem | _List], Index) -> Index;
|
||||
index(Elem, [ _ | List], Index) -> index(Elem, List, Index + 1).
|
||||
|
||||
replvar(Params, ClientInfo) ->
|
||||
replvar(Params, ClientInfo, []).
|
||||
|
||||
replvar([], _ClientInfo, Acc) ->
|
||||
lists:reverse(Acc);
|
||||
|
||||
replvar([?PH_S_USERNAME | Params], ClientInfo, Acc) ->
|
||||
replvar(Params, ClientInfo, [safe_get(username, ClientInfo) | Acc]);
|
||||
replvar([?PH_S_CLIENTID | Params], ClientInfo = #{clientid := _ClientId}, Acc) ->
|
||||
replvar(Params, ClientInfo, [safe_get(clientid, ClientInfo) | Acc]);
|
||||
replvar([?PH_S_PEERHOST | Params], ClientInfo = #{peerhost := IpAddr}, Acc) ->
|
||||
replvar(Params, ClientInfo, [inet_parse:ntoa(IpAddr) | Acc]);
|
||||
replvar([?PH_S_CERT_CN_NAME | Params], ClientInfo, Acc) ->
|
||||
replvar(Params, ClientInfo, [safe_get(cn, ClientInfo) | Acc]);
|
||||
replvar([?PH_S_CERT_SUBJECT | Params], ClientInfo, Acc) ->
|
||||
replvar(Params, ClientInfo, [safe_get(dn, ClientInfo) | Acc]);
|
||||
replvar([Param | Params], ClientInfo, Acc) ->
|
||||
replvar(Params, ClientInfo, [Param | Acc]).
|
||||
|
||||
safe_get(K, ClientInfo) ->
|
||||
bin(maps:get(K, ClientInfo, "undefined")).
|
||||
|
||||
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
||||
bin(B) when is_binary(B) -> B;
|
||||
bin(L) when is_list(L) -> list_to_binary(L);
|
||||
bin(X) -> X.
|
||||
|
|
|
@ -36,11 +36,20 @@
|
|||
-compile(nowarn_export_all).
|
||||
-endif.
|
||||
|
||||
-define(PLACEHOLDERS, [?PH_USERNAME,
|
||||
?PH_CLIENTID,
|
||||
?PH_PEERHOST,
|
||||
?PH_CERT_CN_NAME,
|
||||
?PH_CERT_SUBJECT]).
|
||||
|
||||
description() ->
|
||||
"AuthZ with Postgresql".
|
||||
|
||||
init(#{query := SQL0} = Source) ->
|
||||
{SQL, PlaceHolders} = parse_query(SQL0),
|
||||
{SQL, PlaceHolders} = emqx_authz_utils:parse_sql(
|
||||
SQL0,
|
||||
'$n',
|
||||
?PLACEHOLDERS),
|
||||
ResourceID = emqx_authz_utils:make_resource_id(emqx_connector_pgsql),
|
||||
case emqx_resource:create_local(
|
||||
ResourceID,
|
||||
|
@ -60,27 +69,12 @@ destroy(#{annotations := #{id := Id}}) ->
|
|||
dry_run(Source) ->
|
||||
emqx_resource:create_dry_run_local(emqx_connector_pgsql, Source).
|
||||
|
||||
parse_query(Sql) ->
|
||||
case re:run(Sql, ?RE_PLACEHOLDER, [global, {capture, all, list}]) of
|
||||
{match, Captured} ->
|
||||
PlaceHolders = [PlaceHolder || [PlaceHolder] <- Captured],
|
||||
Replacements = ["$" ++ integer_to_list(I) || I <- lists:seq(1, length(PlaceHolders))],
|
||||
NSql = lists:foldl(
|
||||
fun({PlaceHolder, Replacement}, S) ->
|
||||
re:replace(
|
||||
S, emqx_authz:ph_to_re(PlaceHolder), Replacement, [{return, list}])
|
||||
end, Sql, lists:zip(PlaceHolders, Replacements)),
|
||||
{NSql, PlaceHolders};
|
||||
nomatch ->
|
||||
{Sql, []}
|
||||
end.
|
||||
|
||||
authorize(Client, PubSub, Topic,
|
||||
#{annotations := #{id := ResourceID,
|
||||
placeholders := Placeholders
|
||||
}
|
||||
}) ->
|
||||
RenderedParams = replvar(Placeholders, Client),
|
||||
RenderedParams = emqx_authz_utils:render_sql_params(Placeholders, Client),
|
||||
case emqx_resource:query(ResourceID, {prepared_query, ResourceID, RenderedParams}) of
|
||||
{ok, _Columns, []} -> nomatch;
|
||||
{ok, Columns, Rows} ->
|
||||
|
@ -115,30 +109,3 @@ index(Key, N, TupleList) when is_integer(N) ->
|
|||
index(_Tuple, [], _Index) -> {error, not_found};
|
||||
index(Tuple, [Tuple | _TupleList], Index) -> Index;
|
||||
index(Tuple, [_ | TupleList], Index) -> index(Tuple, TupleList, Index + 1).
|
||||
|
||||
replvar(Params, ClientInfo) ->
|
||||
replvar(Params, ClientInfo, []).
|
||||
|
||||
replvar([], _ClientInfo, Acc) ->
|
||||
lists:reverse(Acc);
|
||||
|
||||
replvar([?PH_S_USERNAME | Params], ClientInfo, Acc) ->
|
||||
replvar(Params, ClientInfo, [safe_get(username, ClientInfo) | Acc]);
|
||||
replvar([?PH_S_CLIENTID | Params], ClientInfo = #{clientid := ClientId}, Acc) ->
|
||||
replvar(Params, ClientInfo, [ClientId | Acc]);
|
||||
replvar([?PH_S_PEERHOST | Params], ClientInfo = #{peerhost := IpAddr}, Acc) ->
|
||||
replvar(Params, ClientInfo, [inet_parse:ntoa(IpAddr) | Acc]);
|
||||
replvar([?PH_S_CERT_CN_NAME | Params], ClientInfo, Acc) ->
|
||||
replvar(Params, ClientInfo, [safe_get(cn, ClientInfo) | Acc]);
|
||||
replvar([?PH_S_CERT_SUBJECT | Params], ClientInfo, Acc) ->
|
||||
replvar(Params, ClientInfo, [safe_get(dn, ClientInfo) | Acc]);
|
||||
replvar([Param | Params], ClientInfo, Acc) ->
|
||||
replvar(Params, ClientInfo, [Param | Acc]).
|
||||
|
||||
safe_get(K, ClientInfo) ->
|
||||
bin(maps:get(K, ClientInfo, "undefined")).
|
||||
|
||||
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
||||
bin(B) when is_binary(B) -> B;
|
||||
bin(L) when is_list(L) -> list_to_binary(L);
|
||||
bin(X) -> X.
|
||||
|
|
|
@ -36,13 +36,22 @@
|
|||
-compile(nowarn_export_all).
|
||||
-endif.
|
||||
|
||||
-define(PLACEHOLDERS, [?PH_CERT_CN_NAME,
|
||||
?PH_CERT_SUBJECT,
|
||||
?PH_PEERHOST,
|
||||
?PH_CLIENTID,
|
||||
?PH_USERNAME]).
|
||||
|
||||
description() ->
|
||||
"AuthZ with Redis".
|
||||
|
||||
init(Source) ->
|
||||
init(#{cmd := CmdStr} = Source) ->
|
||||
Cmd = tokens(CmdStr),
|
||||
CmdTemplate = emqx_authz_utils:parse_deep(Cmd, ?PLACEHOLDERS),
|
||||
case emqx_authz_utils:create_resource(emqx_connector_redis, Source) of
|
||||
{error, Reason} -> error({load_config_error, Reason});
|
||||
{ok, Id} -> Source#{annotations => #{id => Id}}
|
||||
{ok, Id} -> Source#{annotations => #{id => Id},
|
||||
cmd_template => CmdTemplate}
|
||||
end.
|
||||
|
||||
destroy(#{annotations := #{id := Id}}) ->
|
||||
|
@ -52,18 +61,18 @@ dry_run(Source) ->
|
|||
emqx_resource:create_dry_run_local(emqx_connector_redis, Source).
|
||||
|
||||
authorize(Client, PubSub, Topic,
|
||||
#{cmd := CMD,
|
||||
#{cmd_template := CmdTemplate,
|
||||
annotations := #{id := ResourceID}
|
||||
}) ->
|
||||
NCMD = string:tokens(replvar(CMD, Client), " "),
|
||||
case emqx_resource:query(ResourceID, {cmd, NCMD}) of
|
||||
Cmd = emqx_authz_utils:render_deep(CmdTemplate, Client),
|
||||
case emqx_resource:query(ResourceID, {cmd, Cmd}) of
|
||||
{ok, []} -> nomatch;
|
||||
{ok, Rows} ->
|
||||
do_authorize(Client, PubSub, Topic, Rows);
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{ msg => "query_redis_error"
|
||||
, reason => Reason
|
||||
, cmd => NCMD
|
||||
, cmd => Cmd
|
||||
, resource_id => ResourceID}),
|
||||
nomatch
|
||||
end.
|
||||
|
@ -79,22 +88,6 @@ do_authorize(Client, PubSub, Topic, [TopicFilter, Action | Tail]) ->
|
|||
nomatch -> do_authorize(Client, PubSub, Topic, Tail)
|
||||
end.
|
||||
|
||||
replvar(Cmd, Client = #{cn := CN}) ->
|
||||
replvar(repl(Cmd, ?PH_S_CERT_CN_NAME, CN), maps:remove(cn, Client));
|
||||
replvar(Cmd, Client = #{dn := DN}) ->
|
||||
replvar(repl(Cmd, ?PH_S_CERT_SUBJECT, DN), maps:remove(dn, Client));
|
||||
replvar(Cmd, Client = #{peerhost := IpAddr}) ->
|
||||
replvar(repl(Cmd, ?PH_S_PEERHOST, inet_parse:ntoa(IpAddr)), maps:remove(peerhost, Client));
|
||||
replvar(Cmd, Client = #{clientid := ClientId}) ->
|
||||
replvar(repl(Cmd, ?PH_S_CLIENTID, ClientId), maps:remove(clientid, Client));
|
||||
replvar(Cmd, Client = #{username := Username}) ->
|
||||
replvar(repl(Cmd, ?PH_S_USERNAME, Username), maps:remove(username, Client));
|
||||
replvar(Cmd, _) ->
|
||||
Cmd.
|
||||
|
||||
repl(S, _VarPH, undefined) ->
|
||||
S;
|
||||
repl(S, VarPH, Val) ->
|
||||
NVal = re:replace(Val, "&", "\\\\&", [global, {return, list}]),
|
||||
NVarPH = emqx_authz:ph_to_re(VarPH),
|
||||
re:replace(S, NVarPH, NVal, [{return, list}]).
|
||||
tokens(Query) ->
|
||||
Tokens = binary:split(Query, <<" ">>, [global]),
|
||||
[Token || Token <- Tokens, size(Token) > 0].
|
||||
|
|
|
@ -22,6 +22,10 @@
|
|||
, make_resource_id/1
|
||||
, create_resource/2
|
||||
, update_config/2
|
||||
, parse_deep/2
|
||||
, parse_sql/3
|
||||
, render_deep/2
|
||||
, render_sql_params/2
|
||||
]).
|
||||
|
||||
-define(RESOURCE_GROUP, <<"emqx_authz">>).
|
||||
|
@ -51,10 +55,56 @@ update_config(Path, ConfigRequest) ->
|
|||
emqx_conf:update(Path, ConfigRequest, #{rawconf_with_defaults => true,
|
||||
override_to => cluster}).
|
||||
|
||||
parse_deep(Template, PlaceHolders) ->
|
||||
emqx_placeholder:preproc_tmpl_deep(Template, #{placeholders => PlaceHolders}).
|
||||
|
||||
parse_sql(Template, ReplaceWith, PlaceHolders) ->
|
||||
emqx_placeholder:preproc_sql(
|
||||
Template,
|
||||
#{replace_with => ReplaceWith,
|
||||
placeholders => PlaceHolders}).
|
||||
|
||||
render_deep(Template, Values) ->
|
||||
emqx_placeholder:proc_tmpl_deep(
|
||||
Template,
|
||||
client_vars(Values),
|
||||
#{return => full_binary, var_trans => fun handle_var/2}).
|
||||
|
||||
render_sql_params(ParamList, Values) ->
|
||||
emqx_placeholder:proc_tmpl(
|
||||
ParamList,
|
||||
client_vars(Values),
|
||||
#{return => rawlist, var_trans => fun handle_sql_var/2}).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
client_vars(ClientInfo) ->
|
||||
maps:from_list(
|
||||
lists:map(
|
||||
fun convert_client_var/1,
|
||||
maps:to_list(ClientInfo))).
|
||||
|
||||
convert_client_var({cn, CN}) -> {cert_common_name, CN};
|
||||
convert_client_var({dn, DN}) -> {cert_subject, DN};
|
||||
convert_client_var({protocol, Proto}) -> {proto_name, Proto};
|
||||
convert_client_var(Other) -> Other.
|
||||
|
||||
handle_var({var, _Name}, undefined) ->
|
||||
"undefined";
|
||||
handle_var({var, <<"peerhost">>}, IpAddr) ->
|
||||
inet_parse:ntoa(IpAddr);
|
||||
handle_var(_Name, Value) ->
|
||||
emqx_placeholder:bin(Value).
|
||||
|
||||
handle_sql_var({var, _Name}, undefined) ->
|
||||
"undefined";
|
||||
handle_sql_var({var, <<"peerhost">>}, IpAddr) ->
|
||||
inet_parse:ntoa(IpAddr);
|
||||
handle_sql_var(_Name, Value) ->
|
||||
emqx_placeholder:sql_data(Value).
|
||||
|
||||
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
||||
bin(L) when is_list(L) -> list_to_binary(L);
|
||||
bin(X) -> X.
|
||||
|
|
|
@ -159,7 +159,7 @@ t_query_params(_Config) ->
|
|||
#{<<"url">> => <<"http://127.0.0.1:33333/authz/users/?"
|
||||
"username=${username}&"
|
||||
"clientid=${clientid}&"
|
||||
"peerhost=${host}&"
|
||||
"peerhost=${peerhost}&"
|
||||
"proto_name=${proto_name}&"
|
||||
"mountpoint=${mountpoint}&"
|
||||
"topic=${topic}&"
|
||||
|
@ -204,7 +204,7 @@ t_json_body(_Config) ->
|
|||
#{<<"method">> => <<"post">>,
|
||||
<<"body">> => #{<<"username">> => <<"${username}">>,
|
||||
<<"CLIENT">> => <<"${clientid}">>,
|
||||
<<"peerhost">> => <<"${host}">>,
|
||||
<<"peerhost">> => <<"${peerhost}">>,
|
||||
<<"proto_name">> => <<"${proto_name}">>,
|
||||
<<"mountpoint">> => <<"${mountpoint}">>,
|
||||
<<"topic">> => <<"${topic}">>,
|
||||
|
@ -250,7 +250,7 @@ t_form_body(_Config) ->
|
|||
#{<<"method">> => <<"post">>,
|
||||
<<"body">> => #{<<"username">> => <<"${username}">>,
|
||||
<<"clientid">> => <<"${clientid}">>,
|
||||
<<"peerhost">> => <<"${host}">>,
|
||||
<<"peerhost">> => <<"${peerhost}">>,
|
||||
<<"proto_name">> => <<"${proto_name}">>,
|
||||
<<"mountpoint">> => <<"${mountpoint}">>,
|
||||
<<"topic">> => <<"${topic}">>,
|
||||
|
|
|
@ -86,7 +86,7 @@ t_topic_rules(_Config) ->
|
|||
|
||||
|
||||
t_lookups(_Config) ->
|
||||
ClientInfo = #{clientid => <<"clientid">>,
|
||||
ClientInfo = #{clientid => <<"client id">>,
|
||||
cn => <<"cn">>,
|
||||
dn => <<"dn">>,
|
||||
username => <<"username">>,
|
||||
|
@ -95,7 +95,7 @@ t_lookups(_Config) ->
|
|||
listener => {tcp, default}
|
||||
},
|
||||
|
||||
ByClientid = #{<<"mqtt_user:clientid">> =>
|
||||
ByClientid = #{<<"mqtt_user:client id">> =>
|
||||
#{<<"a">> => <<"all">>}},
|
||||
|
||||
ok = setup_sample(ByClientid),
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
%% preprocess and process template string with place holders
|
||||
-export([ preproc_tmpl/1
|
||||
, preproc_tmpl/2
|
||||
, proc_tmpl/2
|
||||
, proc_tmpl/3
|
||||
, preproc_cmd/1
|
||||
|
@ -28,43 +29,66 @@
|
|||
, proc_sql/2
|
||||
, proc_sql_param_str/2
|
||||
, proc_cql_param_str/2
|
||||
]).
|
||||
, preproc_tmpl_deep/1
|
||||
, preproc_tmpl_deep/2
|
||||
, proc_tmpl_deep/2
|
||||
, proc_tmpl_deep/3
|
||||
|
||||
-import(emqx_plugin_libs_rule, [bin/1]).
|
||||
, bin/1
|
||||
, sql_data/1
|
||||
]).
|
||||
|
||||
-define(EX_PLACE_HOLDER, "(\\$\\{[a-zA-Z0-9\\._]+\\})").
|
||||
-define(EX_WITHE_CHARS, "\\s"). %% Space and CRLF
|
||||
|
||||
-type(tmpl_token() :: list({var, binary()} | {str, binary()})).
|
||||
-type tmpl_token() :: list({var, binary()} | {str, binary()}).
|
||||
|
||||
-type(tmpl_cmd() :: list(tmpl_token())).
|
||||
-type tmpl_cmd() :: list(tmpl_token()).
|
||||
|
||||
-type(prepare_statement_key() :: binary()).
|
||||
-type prepare_statement_key() :: binary().
|
||||
|
||||
%% preprocess template string with place holders
|
||||
-spec(preproc_tmpl(binary()) -> tmpl_token()).
|
||||
-type var_trans() ::
|
||||
fun((FoundValue :: term()) -> binary()) |
|
||||
fun((Placeholder :: term(), FoundValue :: term()) -> binary()).
|
||||
|
||||
-type preproc_tmpl_opts() :: #{placeholders => list(binary())}.
|
||||
|
||||
-type preproc_sql_opts() :: #{placeholders => list(binary()),
|
||||
replace_with => '?' | '$n'}.
|
||||
|
||||
-type preproc_deep_opts() :: #{placeholders => list(binary()),
|
||||
process_keys => boolean()}.
|
||||
|
||||
-type proc_tmpl_opts() :: #{return => rawlist | full_binary,
|
||||
var_trans => var_trans()}.
|
||||
|
||||
-type deep_template() ::
|
||||
#{deep_template() => deep_template()} |
|
||||
{tuple, [deep_template()]} |
|
||||
[deep_template()] |
|
||||
{tmpl, tmpl_token()} |
|
||||
{value, term()}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
-spec preproc_tmpl(binary()) -> tmpl_token().
|
||||
preproc_tmpl(Str) ->
|
||||
Tokens = re:split(Str, ?EX_PLACE_HOLDER, [{return,binary},group,trim]),
|
||||
preproc_tmpl(Tokens, []).
|
||||
preproc_tmpl(Str, #{}).
|
||||
|
||||
preproc_tmpl([], Acc) ->
|
||||
lists:reverse(Acc);
|
||||
preproc_tmpl([[Str, Phld] | Tokens], Acc) ->
|
||||
preproc_tmpl(Tokens,
|
||||
put_head(var, parse_nested(unwrap(Phld)),
|
||||
put_head(str, Str, Acc)));
|
||||
preproc_tmpl([[Str] | Tokens], Acc) ->
|
||||
preproc_tmpl(Tokens, put_head(str, Str, Acc)).
|
||||
-spec preproc_tmpl(binary(), preproc_tmpl_opts()) -> tmpl_token().
|
||||
preproc_tmpl(Str, Opts) ->
|
||||
RE = preproc_var_re(Opts),
|
||||
Tokens = re:split(Str, RE, [{return, binary}, group, trim]),
|
||||
do_preproc_tmpl(Tokens, []).
|
||||
|
||||
put_head(_Type, <<>>, List) -> List;
|
||||
put_head(Type, Term, List) ->
|
||||
[{Type, Term} | List].
|
||||
|
||||
-spec(proc_tmpl(tmpl_token(), map()) -> binary()).
|
||||
-spec proc_tmpl(tmpl_token(), map()) -> binary().
|
||||
proc_tmpl(Tokens, Data) ->
|
||||
proc_tmpl(Tokens, Data, #{return => full_binary}).
|
||||
|
||||
-spec(proc_tmpl(tmpl_token(), map(), map()) -> binary() | list()).
|
||||
-spec proc_tmpl(tmpl_token(), map(), proc_tmpl_opts()) -> binary() | list().
|
||||
proc_tmpl(Tokens, Data, Opts = #{return := full_binary}) ->
|
||||
Trans = maps:get(var_trans, Opts, fun emqx_plugin_libs_rule:bin/1),
|
||||
list_to_binary(
|
||||
|
@ -74,54 +98,134 @@ proc_tmpl(Tokens, Data, Opts = #{return := rawlist}) ->
|
|||
Trans = maps:get(var_trans, Opts, undefined),
|
||||
lists:map(
|
||||
fun ({str, Str}) -> Str;
|
||||
({var, Phld}) when is_function(Trans) ->
|
||||
({var, Phld}) when is_function(Trans, 1) ->
|
||||
Trans(get_phld_var(Phld, Data));
|
||||
({var, Phld}) when is_function(Trans, 2) ->
|
||||
Trans(Phld, get_phld_var(Phld, Data));
|
||||
({var, Phld}) ->
|
||||
get_phld_var(Phld, Data)
|
||||
end, Tokens).
|
||||
|
||||
|
||||
-spec(preproc_cmd(binary()) -> tmpl_cmd()).
|
||||
-spec preproc_cmd(binary()) -> tmpl_cmd().
|
||||
preproc_cmd(Str) ->
|
||||
SubStrList = re:split(Str, ?EX_WITHE_CHARS, [{return,binary},trim]),
|
||||
[preproc_tmpl(SubStr) || SubStr <- SubStrList].
|
||||
|
||||
-spec(proc_cmd([tmpl_token()], map()) -> binary() | list()).
|
||||
-spec proc_cmd([tmpl_token()], map()) -> binary() | list().
|
||||
proc_cmd(Tokens, Data) ->
|
||||
proc_cmd(Tokens, Data, #{return => full_binary}).
|
||||
-spec(proc_cmd([tmpl_token()], map(), map()) -> list()).
|
||||
-spec proc_cmd([tmpl_token()], map(), map()) -> list().
|
||||
proc_cmd(Tokens, Data, Opts) ->
|
||||
[proc_tmpl(Tks, Data, Opts) || Tks <- Tokens].
|
||||
|
||||
|
||||
%% preprocess SQL with place holders
|
||||
-spec(preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}).
|
||||
-spec preproc_sql(Sql::binary()) -> {prepare_statement_key(), tmpl_token()}.
|
||||
preproc_sql(Sql) ->
|
||||
preproc_sql(Sql, '?').
|
||||
|
||||
-spec(preproc_sql(Sql::binary(), ReplaceWith :: '?' | '$n')
|
||||
-> {prepare_statement_key(), tmpl_token()}).
|
||||
-spec preproc_sql(binary(), '?' | '$n' | preproc_sql_opts()) ->
|
||||
{prepare_statement_key(), tmpl_token()}.
|
||||
preproc_sql(Sql, ReplaceWith) when is_atom(ReplaceWith) ->
|
||||
preproc_sql(Sql, #{replace_with => ReplaceWith});
|
||||
|
||||
preproc_sql(Sql, ReplaceWith) ->
|
||||
case re:run(Sql, ?EX_PLACE_HOLDER, [{capture, all_but_first, binary}, global]) of
|
||||
preproc_sql(Sql, Opts) ->
|
||||
RE = preproc_var_re(Opts),
|
||||
ReplaceWith = maps:get(replace_with, Opts, '?'),
|
||||
case re:run(Sql, RE, [{capture, all_but_first, binary}, global]) of
|
||||
{match, PlaceHolders} ->
|
||||
PhKs = [parse_nested(unwrap(Phld)) || [Phld | _] <- PlaceHolders],
|
||||
{replace_with(Sql, ReplaceWith), [{var, Phld} || Phld <- PhKs]};
|
||||
{replace_with(Sql, RE, ReplaceWith), [{var, Phld} || Phld <- PhKs]};
|
||||
nomatch ->
|
||||
{Sql, []}
|
||||
end.
|
||||
|
||||
-spec(proc_sql(tmpl_token(), map()) -> list()).
|
||||
|
||||
-spec proc_sql(tmpl_token(), map()) -> list().
|
||||
proc_sql(Tokens, Data) ->
|
||||
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => fun sql_data/1}).
|
||||
|
||||
-spec(proc_sql_param_str(tmpl_token(), map()) -> binary()).
|
||||
|
||||
-spec proc_sql_param_str(tmpl_token(), map()) -> binary().
|
||||
proc_sql_param_str(Tokens, Data) ->
|
||||
proc_param_str(Tokens, Data, fun quote_sql/1).
|
||||
|
||||
-spec(proc_cql_param_str(tmpl_token(), map()) -> binary()).
|
||||
|
||||
-spec proc_cql_param_str(tmpl_token(), map()) -> binary().
|
||||
proc_cql_param_str(Tokens, Data) ->
|
||||
proc_param_str(Tokens, Data, fun quote_cql/1).
|
||||
|
||||
|
||||
-spec preproc_tmpl_deep(term()) -> deep_template().
|
||||
preproc_tmpl_deep(Data) ->
|
||||
preproc_tmpl_deep(Data, #{process_keys => true}).
|
||||
|
||||
-spec preproc_tmpl_deep(term(), preproc_deep_opts()) -> deep_template().
|
||||
preproc_tmpl_deep(List, Opts) when is_list(List) ->
|
||||
[preproc_tmpl_deep(El, Opts) || El <- List];
|
||||
|
||||
preproc_tmpl_deep(Map, Opts) when is_map(Map) ->
|
||||
maps:from_list(
|
||||
lists:map(
|
||||
fun({K, V}) ->
|
||||
{preproc_tmpl_deep_map_key(K, Opts),
|
||||
preproc_tmpl_deep(V, Opts)}
|
||||
end,
|
||||
maps:to_list(Map)));
|
||||
|
||||
preproc_tmpl_deep(Binary, Opts) when is_binary(Binary) ->
|
||||
{tmpl, preproc_tmpl(Binary, Opts)};
|
||||
|
||||
preproc_tmpl_deep(Tuple, Opts) when is_tuple(Tuple) ->
|
||||
{tuple, preproc_tmpl_deep(tuple_to_list(Tuple), Opts)};
|
||||
|
||||
preproc_tmpl_deep(Other, _Opts) ->
|
||||
{value, Other}.
|
||||
|
||||
|
||||
-spec proc_tmpl_deep(deep_template(), map()) -> term().
|
||||
proc_tmpl_deep(DeepTmpl, Data) ->
|
||||
proc_tmpl_deep(DeepTmpl, Data, #{return => full_binary}).
|
||||
|
||||
-spec proc_tmpl_deep(deep_template(), map(), proc_tmpl_opts()) -> term().
|
||||
proc_tmpl_deep(List, Data, Opts) when is_list(List) ->
|
||||
[proc_tmpl_deep(El, Data, Opts) || El <- List];
|
||||
|
||||
proc_tmpl_deep(Map, Data, Opts) when is_map(Map) ->
|
||||
maps:from_list(
|
||||
lists:map(
|
||||
fun({K, V}) ->
|
||||
{proc_tmpl_deep(K, Data, Opts),
|
||||
proc_tmpl_deep(V, Data, Opts)}
|
||||
end,
|
||||
maps:to_list(Map)));
|
||||
|
||||
proc_tmpl_deep({value, Value}, _Data, _Opts) -> Value;
|
||||
|
||||
proc_tmpl_deep({tmpl, Tokens}, Data, Opts) -> proc_tmpl(Tokens, Data, Opts);
|
||||
|
||||
proc_tmpl_deep({tuple, Elements}, Data, Opts) ->
|
||||
list_to_tuple([proc_tmpl_deep(El, Data, Opts) || El <- Elements]).
|
||||
|
||||
|
||||
-spec sql_data(term()) -> term().
|
||||
sql_data(undefined) -> null;
|
||||
sql_data(List) when is_list(List) -> List;
|
||||
sql_data(Bin) when is_binary(Bin) -> Bin;
|
||||
sql_data(Num) when is_number(Num) -> Num;
|
||||
sql_data(Bool) when is_boolean(Bool) -> Bool;
|
||||
sql_data(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
|
||||
sql_data(Map) when is_map(Map) -> emqx_json:encode(Map).
|
||||
|
||||
|
||||
-spec bin(term()) -> binary().
|
||||
bin(Val) -> emqx_plugin_libs_rule:bin(Val).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
proc_param_str(Tokens, Data, Quote) ->
|
||||
iolist_to_binary(
|
||||
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})).
|
||||
|
@ -132,10 +236,42 @@ get_phld_var(Fun, Data) when is_function(Fun) ->
|
|||
get_phld_var(Phld, Data) ->
|
||||
emqx_rule_maps:nested_get(Phld, Data).
|
||||
|
||||
replace_with(Tmpl, '?') ->
|
||||
re:replace(Tmpl, ?EX_PLACE_HOLDER, "?", [{return, binary}, global]);
|
||||
replace_with(Tmpl, '$n') ->
|
||||
Parts = re:split(Tmpl, ?EX_PLACE_HOLDER, [{return, binary}, trim, group]),
|
||||
preproc_var_re(#{placeholders := PHs}) ->
|
||||
"(" ++ string:join([ph_to_re(PH) || PH <- PHs], "|") ++ ")";
|
||||
preproc_var_re(#{}) ->
|
||||
?EX_PLACE_HOLDER.
|
||||
|
||||
ph_to_re(VarPH) ->
|
||||
re:replace(VarPH, "[\\$\\{\\}]", "\\\\&", [global, {return, list}]).
|
||||
|
||||
do_preproc_tmpl([], Acc) ->
|
||||
lists:reverse(Acc);
|
||||
do_preproc_tmpl([[Str, Phld] | Tokens], Acc) ->
|
||||
do_preproc_tmpl(
|
||||
Tokens,
|
||||
put_head(
|
||||
var,
|
||||
parse_nested(unwrap(Phld)),
|
||||
put_head(str, Str, Acc)));
|
||||
do_preproc_tmpl([[Str] | Tokens], Acc) ->
|
||||
do_preproc_tmpl(
|
||||
Tokens,
|
||||
put_head(str, Str, Acc)).
|
||||
|
||||
put_head(_Type, <<>>, List) -> List;
|
||||
put_head(Type, Term, List) ->
|
||||
[{Type, Term} | List].
|
||||
|
||||
preproc_tmpl_deep_map_key(Key, #{process_keys := true} = Opts) ->
|
||||
preproc_tmpl_deep(Key, Opts);
|
||||
|
||||
preproc_tmpl_deep_map_key(Key, _) ->
|
||||
{value, Key}.
|
||||
|
||||
replace_with(Tmpl, RE, '?') ->
|
||||
re:replace(Tmpl, RE, "?", [{return, binary}, global]);
|
||||
replace_with(Tmpl, RE, '$n') ->
|
||||
Parts = re:split(Tmpl, RE, [{return, binary}, trim, group]),
|
||||
{Res, _} =
|
||||
lists:foldl(
|
||||
fun([Tkn, _Phld], {Acc, Seq}) ->
|
||||
|
@ -155,14 +291,6 @@ parse_nested(Attr) ->
|
|||
unwrap(<<"${", Val/binary>>) ->
|
||||
binary:part(Val, {0, byte_size(Val)-1}).
|
||||
|
||||
sql_data(undefined) -> null;
|
||||
sql_data(List) when is_list(List) -> List;
|
||||
sql_data(Bin) when is_binary(Bin) -> Bin;
|
||||
sql_data(Num) when is_number(Num) -> Num;
|
||||
sql_data(Bool) when is_boolean(Bool) -> Bool;
|
||||
sql_data(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
|
||||
sql_data(Map) when is_map(Map) -> emqx_json:encode(Map).
|
||||
|
||||
quote_sql(Str) ->
|
||||
quote(Str, <<"\\\\'">>).
|
||||
|
||||
|
|
|
@ -30,6 +30,18 @@ t_proc_tmpl(_) ->
|
|||
?assertEqual(<<"a:1,b:1,c:1.0,d:{\"d1\":\"hi\"}">>,
|
||||
emqx_placeholder:proc_tmpl(Tks, Selected)).
|
||||
|
||||
t_proc_tmpl_path(_) ->
|
||||
Selected = #{d => #{d1 => <<"hi">>}},
|
||||
Tks = emqx_placeholder:preproc_tmpl(<<"d.d1:${d.d1}">>),
|
||||
?assertEqual(<<"d.d1:hi">>,
|
||||
emqx_placeholder:proc_tmpl(Tks, Selected)).
|
||||
|
||||
t_proc_tmpl_custom_ph(_) ->
|
||||
Selected = #{a => <<"a">>, b => <<"b">>},
|
||||
Tks = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b}">>, #{placeholders => [<<"${a}">>]}),
|
||||
?assertEqual(<<"a:a,b:${b}">>,
|
||||
emqx_placeholder:proc_tmpl(Tks, Selected)).
|
||||
|
||||
t_proc_tmpl1(_) ->
|
||||
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
||||
Tks = emqx_placeholder:preproc_tmpl(<<"a:$a,b:b},c:{c},d:${d">>),
|
||||
|
@ -59,6 +71,7 @@ t_preproc_sql1(_) ->
|
|||
?assertEqual(<<"a:$1,b:$2,c:$3,d:$4">>, PrepareStatement),
|
||||
?assertEqual([<<"1">>,1,1.0,<<"{\"d1\":\"hi\"}">>],
|
||||
emqx_placeholder:proc_sql(ParamsTokens, Selected)).
|
||||
|
||||
t_preproc_sql2(_) ->
|
||||
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
||||
{PrepareStatement, ParamsTokens} =
|
||||
|
@ -89,3 +102,29 @@ t_preproc_sql5(_) ->
|
|||
ParamsTokens = emqx_placeholder:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
|
||||
?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
|
||||
emqx_placeholder:proc_cql_param_str(ParamsTokens, Selected)).
|
||||
|
||||
t_preproc_sql6(_) ->
|
||||
Selected = #{a => <<"a">>, b => <<"b">>},
|
||||
{PrepareStatement, ParamsTokens} = emqx_placeholder:preproc_sql(
|
||||
<<"a:${a},b:${b}">>,
|
||||
#{replace_with => '$n',
|
||||
placeholders => [<<"${a}">>]}),
|
||||
?assertEqual(<<"a:$1,b:${b}">>, PrepareStatement),
|
||||
?assertEqual([<<"a">>],
|
||||
emqx_placeholder:proc_sql(ParamsTokens, Selected)).
|
||||
|
||||
t_preproc_tmpl_deep(_) ->
|
||||
Selected = #{a => <<"1">>, b => 1, c => 1.0, d => #{d1 => <<"hi">>}},
|
||||
|
||||
Tmpl0 = emqx_placeholder:preproc_tmpl_deep(
|
||||
#{<<"${a}">> => [<<"${b}">>, "c", 2, 3.0, '${d}', {[<<"${c}">>], 0}]}),
|
||||
?assertEqual(
|
||||
#{<<"1">> => [<<"1">>, "c", 2, 3.0, '${d}', {[<<"1.0">>], 0}]},
|
||||
emqx_placeholder:proc_tmpl_deep(Tmpl0, Selected)),
|
||||
|
||||
Tmpl1 = emqx_placeholder:preproc_tmpl_deep(
|
||||
#{<<"${a}">> => [<<"${b}">>, "c", 2, 3.0, '${d}', {[<<"${c}">>], 0}]},
|
||||
#{process_keys => false}),
|
||||
?assertEqual(
|
||||
#{<<"${a}">> => [<<"1">>, "c", 2, 3.0, '${d}', {[<<"1.0">>], 0}]},
|
||||
emqx_placeholder:proc_tmpl_deep(Tmpl1, Selected)).
|
||||
|
|
Loading…
Reference in New Issue