Merge pull request #7457 from thalesmg/telemetry-revamp-part3
feat(telemetry): add authn and authz info to telemetry data
This commit is contained in:
commit
1df850710e
|
@ -2406,9 +2406,9 @@ str(S) when is_list(S) ->
|
||||||
S.
|
S.
|
||||||
|
|
||||||
authentication(Desc) ->
|
authentication(Desc) ->
|
||||||
%% authentication schemais lazy to make it more 'plugable'
|
%% authentication schema is lazy to make it more 'plugable'
|
||||||
%% the type checks are done in emqx_auth application when it boots.
|
%% the type checks are done in emqx_auth application when it boots.
|
||||||
%% and in emqx_authentication_config module for rutime changes.
|
%% and in emqx_authentication_config module for runtime changes.
|
||||||
Default = hoconsc:lazy(hoconsc:union([typerefl:map(), hoconsc:array(typerefl:map())])),
|
Default = hoconsc:lazy(hoconsc:union([typerefl:map(), hoconsc:array(typerefl:map())])),
|
||||||
%% as the type is lazy, the runtime module injection
|
%% as the type is lazy, the runtime module injection
|
||||||
%% from EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY
|
%% from EMQX_AUTHENTICATION_SCHEMA_MODULE_PT_KEY
|
||||||
|
|
|
@ -34,6 +34,8 @@
|
||||||
-define(CONF_NS_ATOM, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
|
-define(CONF_NS_ATOM, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM).
|
||||||
-define(CONF_NS_BINARY, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY).
|
-define(CONF_NS_BINARY, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY).
|
||||||
|
|
||||||
|
-type authenticator_id() :: binary().
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-define(RESOURCE_GROUP, <<"emqx_authn">>).
|
-define(RESOURCE_GROUP, <<"emqx_authn">>).
|
||||||
|
|
|
@ -20,7 +20,9 @@
|
||||||
providers/0,
|
providers/0,
|
||||||
check_config/1,
|
check_config/1,
|
||||||
check_config/2,
|
check_config/2,
|
||||||
check_configs/1
|
check_configs/1,
|
||||||
|
%% for telemetry information
|
||||||
|
get_enabled_authns/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-include("emqx_authn.hrl").
|
-include("emqx_authn.hrl").
|
||||||
|
@ -77,3 +79,37 @@ atom(Bin) ->
|
||||||
_:_ ->
|
_:_ ->
|
||||||
throw({unknown_auth_provider, Bin})
|
throw({unknown_auth_provider, Bin})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec get_enabled_authns() ->
|
||||||
|
#{
|
||||||
|
authenticators => [authenticator_id()],
|
||||||
|
overridden_listeners => #{authenticator_id() => pos_integer()}
|
||||||
|
}.
|
||||||
|
get_enabled_authns() ->
|
||||||
|
%% at the moment of writing, `emqx_authentication:list_chains/0'
|
||||||
|
%% result is always wrapped in `{ok, _}', and it cannot return any
|
||||||
|
%% error values.
|
||||||
|
{ok, Chains} = emqx_authentication:list_chains(),
|
||||||
|
AuthnTypes = lists:usort([
|
||||||
|
Type
|
||||||
|
|| #{authenticators := As} <- Chains,
|
||||||
|
#{id := Type} <- As
|
||||||
|
]),
|
||||||
|
OverriddenListeners =
|
||||||
|
lists:foldl(
|
||||||
|
fun
|
||||||
|
(#{name := ?GLOBAL}, Acc) ->
|
||||||
|
Acc;
|
||||||
|
(#{authenticators := As}, Acc) ->
|
||||||
|
lists:foldl(fun tally_authenticators/2, Acc, As)
|
||||||
|
end,
|
||||||
|
#{},
|
||||||
|
Chains
|
||||||
|
),
|
||||||
|
#{
|
||||||
|
authenticators => AuthnTypes,
|
||||||
|
overridden_listeners => OverriddenListeners
|
||||||
|
}.
|
||||||
|
|
||||||
|
tally_authenticators(#{id := AuthenticatorName}, Acc) ->
|
||||||
|
maps:update_with(AuthenticatorName, fun(N) -> N + 1 end, 1, Acc).
|
||||||
|
|
|
@ -33,7 +33,9 @@
|
||||||
lookup/1,
|
lookup/1,
|
||||||
move/2,
|
move/2,
|
||||||
update/2,
|
update/2,
|
||||||
authorize/5
|
authorize/5,
|
||||||
|
%% for telemetry information
|
||||||
|
get_enabled_authzs/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([post_config_update/5, pre_config_update/3]).
|
-export([post_config_update/5, pre_config_update/3]).
|
||||||
|
@ -336,6 +338,9 @@ do_authorize(
|
||||||
Matched -> {Matched, Type}
|
Matched -> {Matched, Type}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
get_enabled_authzs() ->
|
||||||
|
lists:usort([Type || #{type := Type} <- lookup()]).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal function
|
%% Internal function
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -389,7 +394,7 @@ maybe_write_files(NewSource) ->
|
||||||
|
|
||||||
write_acl_file(#{<<"rules">> := Rules} = Source) ->
|
write_acl_file(#{<<"rules">> := Rules} = Source) ->
|
||||||
NRules = check_acl_file_rules(Rules),
|
NRules = check_acl_file_rules(Rules),
|
||||||
Path = acl_conf_file(),
|
Path = ?MODULE:acl_conf_file(),
|
||||||
{ok, _Filename} = write_file(Path, NRules),
|
{ok, _Filename} = write_file(Path, NRules),
|
||||||
maps:without([<<"rules">>], Source#{<<"path">> => Path}).
|
maps:without([<<"rules">>], Source#{<<"path">> => Path}).
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ end_per_suite(_Config) ->
|
||||||
}
|
}
|
||||||
),
|
),
|
||||||
ok = stop_apps([emqx_resource]),
|
ok = stop_apps([emqx_resource]),
|
||||||
emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
|
emqx_common_test_helpers:stop_apps([emqx_connector, emqx_authz, emqx_conf]),
|
||||||
meck:unload(emqx_resource),
|
meck:unload(emqx_resource),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -279,5 +279,12 @@ t_move_source(_) ->
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_get_enabled_authzs_none_enabled(_Config) ->
|
||||||
|
?assertEqual([], emqx_authz:get_enabled_authzs()).
|
||||||
|
|
||||||
|
t_get_enabled_authzs_some_enabled(_Config) ->
|
||||||
|
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE4]),
|
||||||
|
?assertEqual([postgresql], emqx_authz:get_enabled_authzs()).
|
||||||
|
|
||||||
stop_apps(Apps) ->
|
stop_apps(Apps) ->
|
||||||
lists:foreach(fun application:stop/1, Apps).
|
lists:foreach(fun application:stop/1, Apps).
|
||||||
|
|
|
@ -158,14 +158,16 @@ auth_header_() ->
|
||||||
{"Authorization", "Basic " ++ Basic}.
|
{"Authorization", "Basic " ++ Basic}.
|
||||||
|
|
||||||
restart_monitor() ->
|
restart_monitor() ->
|
||||||
erlang:exit(erlang:whereis(emqx_dashboard_monitor), killed),
|
OldMonitor = erlang:whereis(emqx_dashboard_monitor),
|
||||||
?assertEqual(ok, wait_new_monitor(10)).
|
erlang:exit(OldMonitor, killed),
|
||||||
|
?assertEqual(ok, wait_new_monitor(OldMonitor, 10)).
|
||||||
|
|
||||||
wait_new_monitor(Count) when Count =< 0 -> timeout;
|
wait_new_monitor(_OldMonitor, Count) when Count =< 0 -> timeout;
|
||||||
wait_new_monitor(Count) ->
|
wait_new_monitor(OldMonitor, Count) ->
|
||||||
case is_pid(erlang:whereis(emqx_dashboard_monitor)) of
|
NewMonitor = erlang:whereis(emqx_dashboard_monitor),
|
||||||
|
case is_pid(NewMonitor) andalso NewMonitor =/= OldMonitor of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false ->
|
false ->
|
||||||
timer:sleep(100),
|
timer:sleep(100),
|
||||||
wait_new_monitor(Count - 1)
|
wait_new_monitor(OldMonitor, Count - 1)
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -37,9 +37,7 @@ maybe_enable_modules() ->
|
||||||
RewriteEnabled = length(emqx_conf:get([rewrite], [])) > 0,
|
RewriteEnabled = length(emqx_conf:get([rewrite], [])) > 0,
|
||||||
RetainerEnabled = emqx_conf:get([retainer, enable], false),
|
RetainerEnabled = emqx_conf:get([retainer, enable], false),
|
||||||
AutoSubscribeEnabled = length(emqx_conf:get([auto_subscribe, topics], [])) > 0,
|
AutoSubscribeEnabled = length(emqx_conf:get([auto_subscribe, topics], [])) > 0,
|
||||||
application:set_env(
|
emqx_modules:set_advanced_mqtt_features_in_use(
|
||||||
emqx_modules,
|
|
||||||
advanced_mqtt_features_in_use,
|
|
||||||
#{
|
#{
|
||||||
delayed => DelayedEnabled,
|
delayed => DelayedEnabled,
|
||||||
topic_rewrite => RewriteEnabled,
|
topic_rewrite => RewriteEnabled,
|
||||||
|
|
|
@ -348,7 +348,8 @@ get_telemetry(State0 = #state{uuid = UUID}) ->
|
||||||
{build_info, build_info()},
|
{build_info, build_info()},
|
||||||
{vm_specs, vm_specs()},
|
{vm_specs, vm_specs()},
|
||||||
{mqtt_runtime_insights, MQTTRTInsights},
|
{mqtt_runtime_insights, MQTTRTInsights},
|
||||||
{advanced_mqtt_features, advanced_mqtt_features()}
|
{advanced_mqtt_features, advanced_mqtt_features()},
|
||||||
|
{authn_authz, get_authn_authz_info()}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
report_telemetry(State0 = #state{url = URL}) ->
|
report_telemetry(State0 = #state{url = URL}) ->
|
||||||
|
@ -452,6 +453,18 @@ advanced_mqtt_features() ->
|
||||||
AdvancedFeatures = emqx_modules:get_advanced_mqtt_features_in_use(),
|
AdvancedFeatures = emqx_modules:get_advanced_mqtt_features_in_use(),
|
||||||
maps:map(fun(_K, V) -> bool2int(V) end, AdvancedFeatures).
|
maps:map(fun(_K, V) -> bool2int(V) end, AdvancedFeatures).
|
||||||
|
|
||||||
|
get_authn_authz_info() ->
|
||||||
|
#{
|
||||||
|
authenticators := AuthnTypes,
|
||||||
|
overridden_listeners := OverriddenListeners
|
||||||
|
} = emqx_authn:get_enabled_authns(),
|
||||||
|
AuthzTypes = emqx_authz:get_enabled_authzs(),
|
||||||
|
#{
|
||||||
|
authn => AuthnTypes,
|
||||||
|
authn_listener => OverriddenListeners,
|
||||||
|
authz => AuthzTypes
|
||||||
|
}.
|
||||||
|
|
||||||
bin(L) when is_list(L) ->
|
bin(L) when is_list(L) ->
|
||||||
list_to_binary(L);
|
list_to_binary(L);
|
||||||
bin(A) when is_atom(A) ->
|
bin(A) when is_atom(A) ->
|
||||||
|
|
|
@ -29,19 +29,35 @@ all() -> emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
init_per_suite(Config) ->
|
init_per_suite(Config) ->
|
||||||
snabbkaffe:fix_ct_logging(),
|
snabbkaffe:fix_ct_logging(),
|
||||||
emqx_common_test_helpers:start_apps([emqx_conf, emqx_modules]),
|
meck:expect(
|
||||||
|
emqx_authz,
|
||||||
|
acl_conf_file,
|
||||||
|
fun() ->
|
||||||
|
emqx_common_test_helpers:deps_path(emqx_authz, "etc/acl.conf")
|
||||||
|
end
|
||||||
|
),
|
||||||
|
emqx_common_test_helpers:start_apps(
|
||||||
|
[emqx_conf, emqx_authn, emqx_authz, emqx_modules],
|
||||||
|
fun set_special_configs/1
|
||||||
|
),
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_modules]).
|
{ok, _} = emqx:update_config(
|
||||||
|
[authorization],
|
||||||
|
#{
|
||||||
|
<<"no_match">> => <<"allow">>,
|
||||||
|
<<"cache">> => #{<<"enable">> => <<"true">>},
|
||||||
|
<<"sources">> => []
|
||||||
|
}
|
||||||
|
),
|
||||||
|
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_authn, emqx_authz, emqx_modules]),
|
||||||
|
meck:unload(emqx_authz),
|
||||||
|
ok.
|
||||||
|
|
||||||
init_per_testcase(t_get_telemetry, Config) ->
|
init_per_testcase(t_get_telemetry, Config) ->
|
||||||
DataDir = ?config(data_dir, Config),
|
DataDir = ?config(data_dir, Config),
|
||||||
TestPID = self(),
|
mock_httpc(),
|
||||||
ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
|
|
||||||
ok = meck:expect(httpc, request, fun(Method, URL, Headers, Body) ->
|
|
||||||
TestPID ! {request, Method, URL, Headers, Body}
|
|
||||||
end),
|
|
||||||
ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
|
ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(
|
ok = meck:expect(
|
||||||
emqx_telemetry,
|
emqx_telemetry,
|
||||||
|
@ -75,6 +91,14 @@ init_per_testcase(t_advanced_mqtt_features, Config) ->
|
||||||
auto_subscribe => false
|
auto_subscribe => false
|
||||||
}),
|
}),
|
||||||
[{old_values, OldValues} | Config];
|
[{old_values, OldValues} | Config];
|
||||||
|
init_per_testcase(t_authn_authz_info, Config) ->
|
||||||
|
mock_httpc(),
|
||||||
|
{ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
|
||||||
|
create_authn('mqtt:global', built_in_database),
|
||||||
|
create_authn('tcp:default', redis),
|
||||||
|
create_authn('ws:default', redis),
|
||||||
|
create_authz(postgresql),
|
||||||
|
Config;
|
||||||
init_per_testcase(_Testcase, Config) ->
|
init_per_testcase(_Testcase, Config) ->
|
||||||
TestPID = self(),
|
TestPID = self(),
|
||||||
ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
|
ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
|
||||||
|
@ -91,6 +115,19 @@ end_per_testcase(t_get_telemetry, _Config) ->
|
||||||
end_per_testcase(t_advanced_mqtt_features, Config) ->
|
end_per_testcase(t_advanced_mqtt_features, Config) ->
|
||||||
OldValues = ?config(old_values, Config),
|
OldValues = ?config(old_values, Config),
|
||||||
emqx_modules:set_advanced_mqtt_features_in_use(OldValues);
|
emqx_modules:set_advanced_mqtt_features_in_use(OldValues);
|
||||||
|
end_per_testcase(t_authn_authz_info, _Config) ->
|
||||||
|
meck:unload([httpc]),
|
||||||
|
emqx_authz:update({delete, postgresql}, #{}),
|
||||||
|
lists:foreach(
|
||||||
|
fun(ChainName) ->
|
||||||
|
catch emqx_authn_test_lib:delete_authenticators(
|
||||||
|
[authentication],
|
||||||
|
ChainName
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
['mqtt:global', 'tcp:default', 'ws:default']
|
||||||
|
),
|
||||||
|
ok;
|
||||||
end_per_testcase(_Testcase, _Config) ->
|
end_per_testcase(_Testcase, _Config) ->
|
||||||
meck:unload([httpc]),
|
meck:unload([httpc]),
|
||||||
ok.
|
ok.
|
||||||
|
@ -152,6 +189,7 @@ t_get_telemetry(_Config) ->
|
||||||
?assert(is_number(maps:get(messages_sent_rate, MQTTRTInsights))),
|
?assert(is_number(maps:get(messages_sent_rate, MQTTRTInsights))),
|
||||||
?assert(is_number(maps:get(messages_received_rate, MQTTRTInsights))),
|
?assert(is_number(maps:get(messages_received_rate, MQTTRTInsights))),
|
||||||
?assert(is_integer(maps:get(num_topics, MQTTRTInsights))),
|
?assert(is_integer(maps:get(num_topics, MQTTRTInsights))),
|
||||||
|
?assert(is_map(get_value(authn_authz, TelemetryData))),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_advanced_mqtt_features(_) ->
|
t_advanced_mqtt_features(_) ->
|
||||||
|
@ -183,6 +221,22 @@ t_advanced_mqtt_features(_) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_authn_authz_info(_) ->
|
||||||
|
{ok, TelemetryData} = emqx_telemetry:get_telemetry(),
|
||||||
|
AuthnAuthzInfo = get_value(authn_authz, TelemetryData),
|
||||||
|
?assertEqual(
|
||||||
|
#{
|
||||||
|
authn =>
|
||||||
|
[
|
||||||
|
<<"password_based:built_in_database">>,
|
||||||
|
<<"password_based:redis">>
|
||||||
|
],
|
||||||
|
authn_listener => #{<<"password_based:redis">> => 2},
|
||||||
|
authz => [postgresql]
|
||||||
|
},
|
||||||
|
AuthnAuthzInfo
|
||||||
|
).
|
||||||
|
|
||||||
t_enable(_) ->
|
t_enable(_) ->
|
||||||
ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
|
ok = meck:new(emqx_telemetry, [non_strict, passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end),
|
ok = meck:expect(emqx_telemetry, official_version, fun(_) -> true end),
|
||||||
|
@ -265,3 +319,71 @@ bin(L) when is_list(L) ->
|
||||||
list_to_binary(L);
|
list_to_binary(L);
|
||||||
bin(B) when is_binary(B) ->
|
bin(B) when is_binary(B) ->
|
||||||
B.
|
B.
|
||||||
|
|
||||||
|
mock_httpc() ->
|
||||||
|
TestPID = self(),
|
||||||
|
ok = meck:new(httpc, [non_strict, passthrough, no_history, no_link]),
|
||||||
|
ok = meck:expect(httpc, request, fun(
|
||||||
|
Method, {URL, Headers, _ContentType, Body}, _HTTPOpts, _Opts
|
||||||
|
) ->
|
||||||
|
TestPID ! {request, Method, URL, Headers, Body}
|
||||||
|
end).
|
||||||
|
|
||||||
|
create_authn(ChainName, built_in_database) ->
|
||||||
|
emqx_authentication:initialize_authentication(
|
||||||
|
ChainName,
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
mechanism => password_based,
|
||||||
|
backend => built_in_database,
|
||||||
|
enable => true,
|
||||||
|
user_id_type => username,
|
||||||
|
password_hash_algorithm => #{
|
||||||
|
name => plain,
|
||||||
|
salt_position => suffix
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
);
|
||||||
|
create_authn(ChainName, redis) ->
|
||||||
|
emqx_authentication:initialize_authentication(
|
||||||
|
ChainName,
|
||||||
|
[
|
||||||
|
#{
|
||||||
|
mechanism => password_based,
|
||||||
|
backend => redis,
|
||||||
|
enable => true,
|
||||||
|
user_id_type => username,
|
||||||
|
cmd => "HMGET mqtt_user:${username} password_hash salt is_superuser",
|
||||||
|
password_hash_algorithm => #{
|
||||||
|
name => plain,
|
||||||
|
salt_position => suffix
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
).
|
||||||
|
|
||||||
|
create_authz(postgresql) ->
|
||||||
|
emqx_authz:update(
|
||||||
|
append,
|
||||||
|
#{
|
||||||
|
<<"type">> => <<"postgresql">>,
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"server">> => <<"127.0.0.1:27017">>,
|
||||||
|
<<"pool_size">> => 1,
|
||||||
|
<<"database">> => <<"mqtt">>,
|
||||||
|
<<"username">> => <<"xx">>,
|
||||||
|
<<"password">> => <<"ee">>,
|
||||||
|
<<"auto_reconnect">> => true,
|
||||||
|
<<"ssl">> => #{<<"enable">> => false},
|
||||||
|
<<"query">> => <<"abcb">>
|
||||||
|
}
|
||||||
|
).
|
||||||
|
|
||||||
|
set_special_configs(emqx_authz) ->
|
||||||
|
{ok, _} = emqx:update_config([authorization, cache, enable], false),
|
||||||
|
{ok, _} = emqx:update_config([authorization, no_match], deny),
|
||||||
|
{ok, _} = emqx:update_config([authorization, sources], []),
|
||||||
|
ok;
|
||||||
|
set_special_configs(_App) ->
|
||||||
|
ok.
|
||||||
|
|
Loading…
Reference in New Issue