fix(authz): http source create and update
This commit is contained in:
parent
f21d670751
commit
68af284570
|
@ -284,7 +284,7 @@ replace_placeholders([{K, V0} | More], Credential, Acc) ->
|
||||||
undefined ->
|
undefined ->
|
||||||
error({cannot_get_variable, V0});
|
error({cannot_get_variable, V0});
|
||||||
V ->
|
V ->
|
||||||
replace_placeholders(More, Credential, [{K, emqx_authn_utils:bin(V)} | Acc])
|
replace_placeholders(More, Credential, [{K, to_bin(V)} | Acc])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
append_query(Path, []) ->
|
append_query(Path, []) ->
|
||||||
|
|
|
@ -207,7 +207,12 @@ check_dup_types([Source | Sources], Checked) ->
|
||||||
create_dry_run(T, Source) ->
|
create_dry_run(T, Source) ->
|
||||||
case is_connector_source(T) of
|
case is_connector_source(T) of
|
||||||
true ->
|
true ->
|
||||||
[NSource] = check_sources([Source]),
|
[CheckedSource] = check_sources([Source]),
|
||||||
|
case T of
|
||||||
|
http ->
|
||||||
|
URIMap = maps:get(url, CheckedSource),
|
||||||
|
NSource = maps:put(base_url, maps:remove(query, URIMap), CheckedSource)
|
||||||
|
end,
|
||||||
emqx_resource:create_dry_run(connector_module(T), NSource);
|
emqx_resource:create_dry_run(connector_module(T), NSource);
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
|
|
|
@ -440,7 +440,7 @@ read_certs(#{<<"ssl">> := SSL} = Source) ->
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?SLOG(error, Reason#{msg => failed_to_readd_ssl_file}),
|
?SLOG(error, Reason#{msg => failed_to_readd_ssl_file}),
|
||||||
throw(failed_to_readd_ssl_file);
|
throw(failed_to_readd_ssl_file);
|
||||||
NewSSL ->
|
{ok, NewSSL} ->
|
||||||
Source#{<<"ssl">> => NewSSL}
|
Source#{<<"ssl">> => NewSSL}
|
||||||
end;
|
end;
|
||||||
read_certs(Source) -> Source.
|
read_certs(Source) -> Source.
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
%% AuthZ Callbacks
|
%% AuthZ Callbacks
|
||||||
-export([ authorize/4
|
-export([ authorize/4
|
||||||
, description/0
|
, description/0
|
||||||
|
, parse_url/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
@ -36,7 +37,7 @@ description() ->
|
||||||
|
|
||||||
authorize(Client, PubSub, Topic,
|
authorize(Client, PubSub, Topic,
|
||||||
#{type := http,
|
#{type := http,
|
||||||
url := #{path := Path} = Url,
|
url := #{path := Path} = URL,
|
||||||
headers := Headers,
|
headers := Headers,
|
||||||
method := Method,
|
method := Method,
|
||||||
request_timeout := RequestTimeout,
|
request_timeout := RequestTimeout,
|
||||||
|
@ -44,7 +45,7 @@ authorize(Client, PubSub, Topic,
|
||||||
} = Source) ->
|
} = Source) ->
|
||||||
Request = case Method of
|
Request = case Method of
|
||||||
get ->
|
get ->
|
||||||
Query = maps:get(query, Url, ""),
|
Query = maps:get(query, URL, ""),
|
||||||
Path1 = replvar(Path ++ "?" ++ Query, PubSub, Topic, Client),
|
Path1 = replvar(Path ++ "?" ++ Query, PubSub, Topic, Client),
|
||||||
{Path1, maps:to_list(Headers)};
|
{Path1, maps:to_list(Headers)};
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -56,10 +57,29 @@ authorize(Client, PubSub, Topic,
|
||||||
Path1 = replvar(Path, PubSub, Topic, Client),
|
Path1 = replvar(Path, PubSub, Topic, Client),
|
||||||
{Path1, maps:to_list(Headers), Body1}
|
{Path1, maps:to_list(Headers), Body1}
|
||||||
end,
|
end,
|
||||||
case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of
|
case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of
|
||||||
{ok, 204, _Headers} -> {matched, allow};
|
{ok, 200, _Headers} ->
|
||||||
{ok, 200, _Headers, _Body} -> {matched, allow};
|
{matched, allow};
|
||||||
_ -> nomatch
|
{ok, 204, _Headers} ->
|
||||||
|
{matched, allow};
|
||||||
|
{ok, 200, _Headers, _Body} ->
|
||||||
|
{matched, allow};
|
||||||
|
{ok, _Status, _Headers, _Body} ->
|
||||||
|
nomatch;
|
||||||
|
{error, Reason} ->
|
||||||
|
?SLOG(error, #{msg => "http_server_query_failed",
|
||||||
|
resource => ResourceID,
|
||||||
|
reason => Reason}),
|
||||||
|
ignore
|
||||||
|
end.
|
||||||
|
|
||||||
|
parse_url(URL) ->
|
||||||
|
{ok, URIMap} = emqx_http_lib:uri_parse(URL),
|
||||||
|
case maps:get(query, URIMap, undefined) of
|
||||||
|
undefined ->
|
||||||
|
URIMap#{query => ""};
|
||||||
|
_ ->
|
||||||
|
URIMap
|
||||||
end.
|
end.
|
||||||
|
|
||||||
query_string(Body) ->
|
query_string(Body) ->
|
||||||
|
@ -88,7 +108,7 @@ replvar(Str0, PubSub, Topic,
|
||||||
is_binary(Str0) ->
|
is_binary(Str0) ->
|
||||||
NTopic = emqx_http_lib:uri_encode(Topic),
|
NTopic = emqx_http_lib:uri_encode(Topic),
|
||||||
Str1 = re:replace( Str0, emqx_authz:ph_to_re(?PH_S_CLIENTID)
|
Str1 = re:replace( Str0, emqx_authz:ph_to_re(?PH_S_CLIENTID)
|
||||||
, Clientid, [global, {return, binary}]),
|
, bin(Clientid), [global, {return, binary}]),
|
||||||
Str2 = re:replace( Str1, emqx_authz:ph_to_re(?PH_S_USERNAME)
|
Str2 = re:replace( Str1, emqx_authz:ph_to_re(?PH_S_USERNAME)
|
||||||
, bin(Username), [global, {return, binary}]),
|
, bin(Username), [global, {return, binary}]),
|
||||||
Str3 = re:replace( Str2, emqx_authz:ph_to_re(?PH_S_HOST)
|
Str3 = re:replace( Str2, emqx_authz:ph_to_re(?PH_S_HOST)
|
||||||
|
@ -96,9 +116,9 @@ replvar(Str0, PubSub, Topic,
|
||||||
Str4 = re:replace( Str3, emqx_authz:ph_to_re(?PH_S_PROTONAME)
|
Str4 = re:replace( Str3, emqx_authz:ph_to_re(?PH_S_PROTONAME)
|
||||||
, bin(Protocol), [global, {return, binary}]),
|
, bin(Protocol), [global, {return, binary}]),
|
||||||
Str5 = re:replace( Str4, emqx_authz:ph_to_re(?PH_S_MOUNTPOINT)
|
Str5 = re:replace( Str4, emqx_authz:ph_to_re(?PH_S_MOUNTPOINT)
|
||||||
, Mountpoint, [global, {return, binary}]),
|
, bin(Mountpoint), [global, {return, binary}]),
|
||||||
Str6 = re:replace( Str5, emqx_authz:ph_to_re(?PH_S_TOPIC)
|
Str6 = re:replace( Str5, emqx_authz:ph_to_re(?PH_S_TOPIC)
|
||||||
, NTopic, [global, {return, binary}]),
|
, bin(NTopic), [global, {return, binary}]),
|
||||||
Str7 = re:replace( Str6, emqx_authz:ph_to_re(?PH_S_ACTION)
|
Str7 = re:replace( Str6, emqx_authz:ph_to_re(?PH_S_ACTION)
|
||||||
, bin(PubSub), [global, {return, binary}]),
|
, bin(PubSub), [global, {return, binary}]),
|
||||||
Str7.
|
Str7.
|
||||||
|
|
|
@ -32,10 +32,15 @@
|
||||||
-export([ namespace/0
|
-export([ namespace/0
|
||||||
, roots/0
|
, roots/0
|
||||||
, fields/1
|
, fields/1
|
||||||
|
, validations/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-import(emqx_schema, [mk_duration/2]).
|
-import(emqx_schema, [mk_duration/2]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Hocon Schema
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
namespace() -> authz.
|
namespace() -> authz.
|
||||||
|
|
||||||
%% @doc authorization schema is not exported
|
%% @doc authorization schema is not exported
|
||||||
|
@ -98,66 +103,13 @@ and the new rules will override all rules from the old config file.
|
||||||
}}
|
}}
|
||||||
];
|
];
|
||||||
fields(http_get) ->
|
fields(http_get) ->
|
||||||
[ {type, #{type => http}}
|
[ {method, #{type => get, default => post}}
|
||||||
, {enable, #{type => boolean(),
|
, {headers, fun headers_no_content_type/1}
|
||||||
default => true}}
|
] ++ http_common_fields();
|
||||||
, {url, #{type => url()}}
|
|
||||||
, {method, #{type => get, default => get }}
|
|
||||||
, {headers, #{type => map(),
|
|
||||||
default => #{ <<"accept">> => <<"application/json">>
|
|
||||||
, <<"cache-control">> => <<"no-cache">>
|
|
||||||
, <<"connection">> => <<"keep-alive">>
|
|
||||||
, <<"keep-alive">> => <<"timeout=5">>
|
|
||||||
},
|
|
||||||
converter => fun (Headers0) ->
|
|
||||||
Headers1 = maps:fold(fun(K0, V, AccIn) ->
|
|
||||||
K1 = iolist_to_binary(string:to_lower(to_list(K0))),
|
|
||||||
maps:put(K1, V, AccIn)
|
|
||||||
end, #{}, Headers0),
|
|
||||||
maps:merge(#{ <<"accept">> => <<"application/json">>
|
|
||||||
, <<"cache-control">> => <<"no-cache">>
|
|
||||||
, <<"connection">> => <<"keep-alive">>
|
|
||||||
, <<"keep-alive">> => <<"timeout=5">>
|
|
||||||
}, Headers1)
|
|
||||||
end
|
|
||||||
}
|
|
||||||
}
|
|
||||||
, {request_timeout, mk_duration("request timeout", #{default => "30s"})}
|
|
||||||
] ++ proplists:delete(base_url, emqx_connector_http:fields(config));
|
|
||||||
fields(http_post) ->
|
fields(http_post) ->
|
||||||
[ {type, #{type => http}}
|
[ {method, #{type => post, default => post}}
|
||||||
, {enable, #{type => boolean(),
|
, {headers, fun headers/1}
|
||||||
default => true}}
|
] ++ http_common_fields();
|
||||||
, {url, #{type => url()}}
|
|
||||||
, {method, #{type => post,
|
|
||||||
default => get}}
|
|
||||||
, {headers, #{type => map(),
|
|
||||||
default => #{ <<"accept">> => <<"application/json">>
|
|
||||||
, <<"cache-control">> => <<"no-cache">>
|
|
||||||
, <<"connection">> => <<"keep-alive">>
|
|
||||||
, <<"content-type">> => <<"application/json">>
|
|
||||||
, <<"keep-alive">> => <<"timeout=5">>
|
|
||||||
},
|
|
||||||
converter => fun (Headers0) ->
|
|
||||||
Headers1 = maps:fold(fun(K0, V, AccIn) ->
|
|
||||||
K1 = iolist_to_binary(string:to_lower(binary_to_list(K0))),
|
|
||||||
maps:put(K1, V, AccIn)
|
|
||||||
end, #{}, Headers0),
|
|
||||||
maps:merge(#{ <<"accept">> => <<"application/json">>
|
|
||||||
, <<"cache-control">> => <<"no-cache">>
|
|
||||||
, <<"connection">> => <<"keep-alive">>
|
|
||||||
, <<"content-type">> => <<"application/json">>
|
|
||||||
, <<"keep-alive">> => <<"timeout=5">>
|
|
||||||
}, Headers1)
|
|
||||||
end
|
|
||||||
}
|
|
||||||
}
|
|
||||||
, {request_timeout, mk_duration("request timeout", #{default => "30s"})}
|
|
||||||
, {body, #{type => map(),
|
|
||||||
nullable => true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
] ++ proplists:delete(base_url, emqx_connector_http:fields(config));
|
|
||||||
fields(mnesia) ->
|
fields(mnesia) ->
|
||||||
[ {type, #{type => 'built-in-database'}}
|
[ {type, #{type => 'built-in-database'}}
|
||||||
, {enable, #{type => boolean(),
|
, {enable, #{type => boolean(),
|
||||||
|
@ -203,10 +155,73 @@ fields(redis_cluster) ->
|
||||||
connector_fields(redis, cluster) ++
|
connector_fields(redis, cluster) ++
|
||||||
[ {cmd, query()} ].
|
[ {cmd, query()} ].
|
||||||
|
|
||||||
|
http_common_fields() ->
|
||||||
|
[ {type, #{type => http}}
|
||||||
|
, {enable, #{type => boolean(), default => true}}
|
||||||
|
, {url, #{type => url()}}
|
||||||
|
, {request_timeout, mk_duration("request timeout", #{default => "30s"})}
|
||||||
|
, {body, #{type => map(), nullable => true}}
|
||||||
|
] ++ proplists:delete(base_url, emqx_connector_http:fields(config)).
|
||||||
|
|
||||||
|
validations() ->
|
||||||
|
[ {check_ssl_opts, fun check_ssl_opts/1}
|
||||||
|
, {check_headers, fun check_headers/1}
|
||||||
|
].
|
||||||
|
|
||||||
|
headers(type) -> map();
|
||||||
|
headers(converter) ->
|
||||||
|
fun(Headers) ->
|
||||||
|
maps:merge(default_headers(), transform_header_name(Headers))
|
||||||
|
end;
|
||||||
|
headers(default) -> default_headers();
|
||||||
|
headers(_) -> undefined.
|
||||||
|
|
||||||
|
headers_no_content_type(type) -> map();
|
||||||
|
headers_no_content_type(converter) ->
|
||||||
|
fun(Headers) ->
|
||||||
|
maps:merge(default_headers_no_content_type(), transform_header_name(Headers))
|
||||||
|
end;
|
||||||
|
headers_no_content_type(default) -> default_headers_no_content_type();
|
||||||
|
headers_no_content_type(_) -> undefined.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
default_headers() ->
|
||||||
|
maps:put(<<"content-type">>,
|
||||||
|
<<"application/json">>,
|
||||||
|
default_headers_no_content_type()).
|
||||||
|
|
||||||
|
default_headers_no_content_type() ->
|
||||||
|
#{ <<"accept">> => <<"application/json">>
|
||||||
|
, <<"cache-control">> => <<"no-cache">>
|
||||||
|
, <<"connection">> => <<"keep-alive">>
|
||||||
|
, <<"keep-alive">> => <<"timeout=5">>
|
||||||
|
}.
|
||||||
|
|
||||||
|
transform_header_name(Headers) ->
|
||||||
|
maps:fold(fun(K0, V, Acc) ->
|
||||||
|
K = list_to_binary(string:to_lower(to_list(K0))),
|
||||||
|
maps:put(K, V, Acc)
|
||||||
|
end, #{}, Headers).
|
||||||
|
|
||||||
|
check_ssl_opts(Conf) ->
|
||||||
|
case emqx_authz_http:parse_url(hocon_schema:get_value("config.url", Conf)) of
|
||||||
|
#{scheme := https} ->
|
||||||
|
case hocon_schema:get_value("config.ssl.enable", Conf) of
|
||||||
|
true -> ok;
|
||||||
|
false -> false
|
||||||
|
end;
|
||||||
|
#{scheme := http} ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
check_headers(Conf) ->
|
||||||
|
Method = to_bin(hocon_schema:get_value("config.method", Conf)),
|
||||||
|
Headers = hocon_schema:get_value("config.headers", Conf),
|
||||||
|
Method =:= <<"post">> orelse (not maps:is_key(<<"content-type">>, Headers)).
|
||||||
|
|
||||||
union_array(Item) when is_list(Item) ->
|
union_array(Item) when is_list(Item) ->
|
||||||
hoconsc:array(hoconsc:union(Item)).
|
hoconsc:array(hoconsc:union(Item)).
|
||||||
|
|
||||||
|
@ -229,8 +244,8 @@ connector_fields(DB, Fields) ->
|
||||||
catch
|
catch
|
||||||
error:badarg ->
|
error:badarg ->
|
||||||
list_to_atom(Mod0);
|
list_to_atom(Mod0);
|
||||||
Error ->
|
error:Reason ->
|
||||||
erlang:error(Error)
|
erlang:error(Reason)
|
||||||
end,
|
end,
|
||||||
[ {type, #{type => DB}}
|
[ {type, #{type => DB}}
|
||||||
, {enable, #{type => boolean(),
|
, {enable, #{type => boolean(),
|
||||||
|
@ -241,3 +256,10 @@ to_list(A) when is_atom(A) ->
|
||||||
atom_to_list(A);
|
atom_to_list(A);
|
||||||
to_list(B) when is_binary(B) ->
|
to_list(B) when is_binary(B) ->
|
||||||
binary_to_list(B).
|
binary_to_list(B).
|
||||||
|
|
||||||
|
to_bin(A) when is_atom(A) ->
|
||||||
|
atom_to_binary(A);
|
||||||
|
to_bin(B) when is_binary(B) ->
|
||||||
|
B;
|
||||||
|
to_bin(L) when is_list(L) ->
|
||||||
|
list_to_binary(L).
|
||||||
|
|
Loading…
Reference in New Issue