fix(authz): merge pull request #6341 from JimMoen/fix-authz
fix authz resources create and update, http and mongo
This commit is contained in:
commit
fa40189209
|
@ -284,7 +284,7 @@ replace_placeholders([{K, V0} | More], Credential, Acc) ->
|
|||
undefined ->
|
||||
error({cannot_get_variable, V0});
|
||||
V ->
|
||||
replace_placeholders(More, Credential, [{K, emqx_authn_utils:bin(V)} | Acc])
|
||||
replace_placeholders(More, Credential, [{K, to_bin(V)} | Acc])
|
||||
end.
|
||||
|
||||
append_query(Path, []) ->
|
||||
|
|
|
@ -207,7 +207,12 @@ check_dup_types([Source | Sources], Checked) ->
|
|||
create_dry_run(T, Source) ->
|
||||
case is_connector_source(T) of
|
||||
true ->
|
||||
[NSource] = check_sources([Source]),
|
||||
[CheckedSource] = check_sources([Source]),
|
||||
case T of
|
||||
http ->
|
||||
URIMap = maps:get(url, CheckedSource),
|
||||
NSource = maps:put(base_url, maps:remove(query, URIMap), CheckedSource)
|
||||
end,
|
||||
emqx_resource:create_dry_run(connector_module(T), NSource);
|
||||
false ->
|
||||
ok
|
||||
|
|
|
@ -440,14 +440,15 @@ read_certs(#{<<"ssl">> := SSL} = Source) ->
|
|||
{error, Reason} ->
|
||||
?SLOG(error, Reason#{msg => failed_to_readd_ssl_file}),
|
||||
throw(failed_to_readd_ssl_file);
|
||||
NewSSL ->
|
||||
{ok, NewSSL} ->
|
||||
Source#{<<"ssl">> => NewSSL}
|
||||
end;
|
||||
read_certs(Source) -> Source.
|
||||
|
||||
maybe_write_certs(#{<<"ssl">> := #{<<"enable">> := true} = SSL} = Source) ->
|
||||
Type = maps:get(<<"type">>, Source),
|
||||
emqx_tls_lib:ensure_ssl_files(filename:join(["authz", Type]), SSL);
|
||||
{ok, Return} = emqx_tls_lib:ensure_ssl_files(filename:join(["authz", Type]), SSL),
|
||||
maps:put(<<"ssl">>, Return, Source);
|
||||
maybe_write_certs(Source) -> Source.
|
||||
|
||||
write_file(Filename, Bytes0) ->
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
%% AuthZ Callbacks
|
||||
-export([ authorize/4
|
||||
, description/0
|
||||
, parse_url/1
|
||||
]).
|
||||
|
||||
-ifdef(TEST).
|
||||
|
@ -36,7 +37,7 @@ description() ->
|
|||
|
||||
authorize(Client, PubSub, Topic,
|
||||
#{type := http,
|
||||
url := #{path := Path} = Url,
|
||||
url := #{path := Path} = URL,
|
||||
headers := Headers,
|
||||
method := Method,
|
||||
request_timeout := RequestTimeout,
|
||||
|
@ -44,7 +45,7 @@ authorize(Client, PubSub, Topic,
|
|||
} = Source) ->
|
||||
Request = case Method of
|
||||
get ->
|
||||
Query = maps:get(query, Url, ""),
|
||||
Query = maps:get(query, URL, ""),
|
||||
Path1 = replvar(Path ++ "?" ++ Query, PubSub, Topic, Client),
|
||||
{Path1, maps:to_list(Headers)};
|
||||
_ ->
|
||||
|
@ -57,9 +58,31 @@ authorize(Client, PubSub, Topic,
|
|||
{Path1, maps:to_list(Headers), Body1}
|
||||
end,
|
||||
case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of
|
||||
{ok, 204, _Headers} -> {matched, allow};
|
||||
{ok, 200, _Headers, _Body} -> {matched, allow};
|
||||
_ -> nomatch
|
||||
{ok, 200, _Headers} ->
|
||||
{matched, allow};
|
||||
{ok, 204, _Headers} ->
|
||||
{matched, allow};
|
||||
{ok, 200, _Headers, _Body} ->
|
||||
{matched, allow};
|
||||
{ok, _Status, _Headers, _Body} ->
|
||||
nomatch;
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{msg => "http_server_query_failed",
|
||||
resource => ResourceID,
|
||||
reason => Reason}),
|
||||
ignore
|
||||
end.
|
||||
|
||||
parse_url(URL)
|
||||
when URL =:= undefined ->
|
||||
#{};
|
||||
parse_url(URL) ->
|
||||
{ok, URIMap} = emqx_http_lib:uri_parse(URL),
|
||||
case maps:get(query, URIMap, undefined) of
|
||||
undefined ->
|
||||
URIMap#{query => ""};
|
||||
_ ->
|
||||
URIMap
|
||||
end.
|
||||
|
||||
query_string(Body) ->
|
||||
|
@ -88,7 +111,7 @@ replvar(Str0, PubSub, Topic,
|
|||
is_binary(Str0) ->
|
||||
NTopic = emqx_http_lib:uri_encode(Topic),
|
||||
Str1 = re:replace( Str0, emqx_authz:ph_to_re(?PH_S_CLIENTID)
|
||||
, Clientid, [global, {return, binary}]),
|
||||
, bin(Clientid), [global, {return, binary}]),
|
||||
Str2 = re:replace( Str1, emqx_authz:ph_to_re(?PH_S_USERNAME)
|
||||
, bin(Username), [global, {return, binary}]),
|
||||
Str3 = re:replace( Str2, emqx_authz:ph_to_re(?PH_S_HOST)
|
||||
|
@ -96,9 +119,9 @@ replvar(Str0, PubSub, Topic,
|
|||
Str4 = re:replace( Str3, emqx_authz:ph_to_re(?PH_S_PROTONAME)
|
||||
, bin(Protocol), [global, {return, binary}]),
|
||||
Str5 = re:replace( Str4, emqx_authz:ph_to_re(?PH_S_MOUNTPOINT)
|
||||
, Mountpoint, [global, {return, binary}]),
|
||||
, bin(Mountpoint), [global, {return, binary}]),
|
||||
Str6 = re:replace( Str5, emqx_authz:ph_to_re(?PH_S_TOPIC)
|
||||
, NTopic, [global, {return, binary}]),
|
||||
, bin(NTopic), [global, {return, binary}]),
|
||||
Str7 = re:replace( Str6, emqx_authz:ph_to_re(?PH_S_ACTION)
|
||||
, bin(PubSub), [global, {return, binary}]),
|
||||
Str7.
|
||||
|
|
|
@ -32,10 +32,15 @@
|
|||
-export([ namespace/0
|
||||
, roots/0
|
||||
, fields/1
|
||||
, validations/0
|
||||
]).
|
||||
|
||||
-import(emqx_schema, [mk_duration/2]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Hocon Schema
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
namespace() -> authz.
|
||||
|
||||
%% @doc authorization schema is not exported
|
||||
|
@ -98,92 +103,24 @@ and the new rules will override all rules from the old config file.
|
|||
}}
|
||||
];
|
||||
fields(http_get) ->
|
||||
[ {type, #{type => http}}
|
||||
, {enable, #{type => boolean(),
|
||||
default => true}}
|
||||
, {url, #{type => url()}}
|
||||
, {method, #{type => get, default => get }}
|
||||
, {headers, #{type => map(),
|
||||
default => #{ <<"accept">> => <<"application/json">>
|
||||
, <<"cache-control">> => <<"no-cache">>
|
||||
, <<"connection">> => <<"keep-alive">>
|
||||
, <<"keep-alive">> => <<"timeout=5">>
|
||||
},
|
||||
converter => fun (Headers0) ->
|
||||
Headers1 = maps:fold(fun(K0, V, AccIn) ->
|
||||
K1 = iolist_to_binary(string:to_lower(to_list(K0))),
|
||||
maps:put(K1, V, AccIn)
|
||||
end, #{}, Headers0),
|
||||
maps:merge(#{ <<"accept">> => <<"application/json">>
|
||||
, <<"cache-control">> => <<"no-cache">>
|
||||
, <<"connection">> => <<"keep-alive">>
|
||||
, <<"keep-alive">> => <<"timeout=5">>
|
||||
}, Headers1)
|
||||
end
|
||||
}
|
||||
}
|
||||
, {request_timeout, mk_duration("request timeout", #{default => "30s"})}
|
||||
] ++ proplists:delete(base_url, emqx_connector_http:fields(config));
|
||||
[ {method, #{type => get, default => post}}
|
||||
, {headers, fun headers_no_content_type/1}
|
||||
] ++ http_common_fields();
|
||||
fields(http_post) ->
|
||||
[ {type, #{type => http}}
|
||||
, {enable, #{type => boolean(),
|
||||
default => true}}
|
||||
, {url, #{type => url()}}
|
||||
, {method, #{type => post,
|
||||
default => get}}
|
||||
, {headers, #{type => map(),
|
||||
default => #{ <<"accept">> => <<"application/json">>
|
||||
, <<"cache-control">> => <<"no-cache">>
|
||||
, <<"connection">> => <<"keep-alive">>
|
||||
, <<"content-type">> => <<"application/json">>
|
||||
, <<"keep-alive">> => <<"timeout=5">>
|
||||
},
|
||||
converter => fun (Headers0) ->
|
||||
Headers1 = maps:fold(fun(K0, V, AccIn) ->
|
||||
K1 = iolist_to_binary(string:to_lower(binary_to_list(K0))),
|
||||
maps:put(K1, V, AccIn)
|
||||
end, #{}, Headers0),
|
||||
maps:merge(#{ <<"accept">> => <<"application/json">>
|
||||
, <<"cache-control">> => <<"no-cache">>
|
||||
, <<"connection">> => <<"keep-alive">>
|
||||
, <<"content-type">> => <<"application/json">>
|
||||
, <<"keep-alive">> => <<"timeout=5">>
|
||||
}, Headers1)
|
||||
end
|
||||
}
|
||||
}
|
||||
, {request_timeout, mk_duration("request timeout", #{default => "30s"})}
|
||||
, {body, #{type => map(),
|
||||
nullable => true
|
||||
}
|
||||
}
|
||||
] ++ proplists:delete(base_url, emqx_connector_http:fields(config));
|
||||
[ {method, #{type => post, default => post}}
|
||||
, {headers, fun headers/1}
|
||||
] ++ http_common_fields();
|
||||
fields(mnesia) ->
|
||||
[ {type, #{type => 'built-in-database'}}
|
||||
, {enable, #{type => boolean(),
|
||||
default => true}}
|
||||
];
|
||||
fields(mongo_single) ->
|
||||
[ {collection, #{type => atom()}}
|
||||
, {selector, #{type => map()}}
|
||||
, {type, #{type => mongodb}}
|
||||
, {enable, #{type => boolean(),
|
||||
default => true}}
|
||||
] ++ emqx_connector_mongo:fields(single);
|
||||
mongo_common_fields() ++ emqx_connector_mongo:fields(single);
|
||||
fields(mongo_rs) ->
|
||||
[ {collection, #{type => atom()}}
|
||||
, {selector, #{type => map()}}
|
||||
, {type, #{type => mongodb}}
|
||||
, {enable, #{type => boolean(),
|
||||
default => true}}
|
||||
] ++ emqx_connector_mongo:fields(rs);
|
||||
mongo_common_fields() ++ emqx_connector_mongo:fields(rs);
|
||||
fields(mongo_sharded) ->
|
||||
[ {collection, #{type => atom()}}
|
||||
, {selector, #{type => map()}}
|
||||
, {type, #{type => mongodb}}
|
||||
, {enable, #{type => boolean(),
|
||||
default => true}}
|
||||
] ++ emqx_connector_mongo:fields(sharded);
|
||||
mongo_common_fields() ++ emqx_connector_mongo:fields(sharded);
|
||||
fields(mysql) ->
|
||||
connector_fields(mysql) ++
|
||||
[ {query, query()} ];
|
||||
|
@ -203,10 +140,87 @@ fields(redis_cluster) ->
|
|||
connector_fields(redis, cluster) ++
|
||||
[ {cmd, query()} ].
|
||||
|
||||
http_common_fields() ->
|
||||
[ {type, #{type => http}}
|
||||
, {enable, #{type => boolean(), default => true}}
|
||||
, {url, #{type => url()}}
|
||||
, {request_timeout, mk_duration("request timeout", #{default => "30s"})}
|
||||
, {body, #{type => map(), nullable => true}}
|
||||
] ++ proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||
|
||||
mongo_common_fields() ->
|
||||
[ {collection, #{type => atom()}}
|
||||
, {selector, #{type => map()}}
|
||||
, {type, #{type => mongodb}}
|
||||
, {enable, #{type => boolean(),
|
||||
default => true}}
|
||||
].
|
||||
|
||||
validations() ->
|
||||
[ {check_ssl_opts, fun check_ssl_opts/1}
|
||||
, {check_headers, fun check_headers/1}
|
||||
].
|
||||
|
||||
headers(type) -> map();
|
||||
headers(converter) ->
|
||||
fun(Headers) ->
|
||||
maps:merge(default_headers(), transform_header_name(Headers))
|
||||
end;
|
||||
headers(default) -> default_headers();
|
||||
headers(_) -> undefined.
|
||||
|
||||
headers_no_content_type(type) -> map();
|
||||
headers_no_content_type(converter) ->
|
||||
fun(Headers) ->
|
||||
maps:merge(default_headers_no_content_type(), transform_header_name(Headers))
|
||||
end;
|
||||
headers_no_content_type(default) -> default_headers_no_content_type();
|
||||
headers_no_content_type(_) -> undefined.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
default_headers() ->
|
||||
maps:put(<<"content-type">>,
|
||||
<<"application/json">>,
|
||||
default_headers_no_content_type()).
|
||||
|
||||
default_headers_no_content_type() ->
|
||||
#{ <<"accept">> => <<"application/json">>
|
||||
, <<"cache-control">> => <<"no-cache">>
|
||||
, <<"connection">> => <<"keep-alive">>
|
||||
, <<"keep-alive">> => <<"timeout=5">>
|
||||
}.
|
||||
|
||||
transform_header_name(Headers) ->
|
||||
maps:fold(fun(K0, V, Acc) ->
|
||||
K = list_to_binary(string:to_lower(to_list(K0))),
|
||||
maps:put(K, V, Acc)
|
||||
end, #{}, Headers).
|
||||
|
||||
check_ssl_opts(Conf)
|
||||
when Conf =:= #{} ->
|
||||
true;
|
||||
check_ssl_opts(Conf) ->
|
||||
case emqx_authz_http:parse_url(hocon_schema:get_value("config.url", Conf)) of
|
||||
#{scheme := https} ->
|
||||
case hocon_schema:get_value("config.ssl.enable", Conf) of
|
||||
true -> ok;
|
||||
false -> false
|
||||
end;
|
||||
#{scheme := http} ->
|
||||
ok
|
||||
end.
|
||||
|
||||
check_headers(Conf)
|
||||
when Conf =:= #{} ->
|
||||
true;
|
||||
check_headers(Conf) ->
|
||||
Method = to_bin(hocon_schema:get_value("config.method", Conf)),
|
||||
Headers = hocon_schema:get_value("config.headers", Conf),
|
||||
Method =:= <<"post">> orelse (not maps:is_key(<<"content-type">>, Headers)).
|
||||
|
||||
union_array(Item) when is_list(Item) ->
|
||||
hoconsc:array(hoconsc:union(Item)).
|
||||
|
||||
|
@ -229,15 +243,22 @@ connector_fields(DB, Fields) ->
|
|||
catch
|
||||
error:badarg ->
|
||||
list_to_atom(Mod0);
|
||||
Error ->
|
||||
erlang:error(Error)
|
||||
error:Reason ->
|
||||
erlang:error(Reason)
|
||||
end,
|
||||
[ {type, #{type => DB}}
|
||||
, {enable, #{type => boolean(),
|
||||
default => true}}
|
||||
] ++ Mod:fields(Fields).
|
||||
] ++ erlang:apply(Mod, fields, [Fields]).
|
||||
|
||||
to_list(A) when is_atom(A) ->
|
||||
atom_to_list(A);
|
||||
to_list(B) when is_binary(B) ->
|
||||
binary_to_list(B).
|
||||
|
||||
to_bin(A) when is_atom(A) ->
|
||||
atom_to_binary(A);
|
||||
to_bin(B) when is_binary(B) ->
|
||||
B;
|
||||
to_bin(L) when is_list(L) ->
|
||||
list_to_binary(L).
|
||||
|
|
|
@ -36,7 +36,8 @@ init_per_suite(Config) ->
|
|||
meck:expect(emqx_resource, remove, fun(_) -> ok end ),
|
||||
|
||||
ok = emqx_common_test_helpers:start_apps(
|
||||
[emqx_conf, emqx_authz], fun set_special_configs/1),
|
||||
[emqx_connector, emqx_conf, emqx_authz],
|
||||
fun set_special_configs/1),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
|
|
|
@ -269,7 +269,7 @@ srv_record(_) -> undefined.
|
|||
parse_servers(Type, Servers) when is_binary(Servers) ->
|
||||
parse_servers(Type, binary_to_list(Servers));
|
||||
parse_servers(Type, Servers) when is_list(Servers) ->
|
||||
case string:split(Servers, ",", trailing) of
|
||||
case string:split(Servers, ",", all) of
|
||||
[Host | _] when Type =:= single ->
|
||||
[Host];
|
||||
Hosts ->
|
||||
|
|
Loading…
Reference in New Issue