Merge pull request #7983 from zhongwencool/clients-api-mqueue-len-dropped-support
feat: client api support mqueue_len/mqueue_dropped filter
This commit is contained in:
commit
144efca79c
|
@ -35,6 +35,10 @@
|
|||
{<<"_gte_created_at">>, timestamp},
|
||||
{<<"_lte_created_at">>, timestamp},
|
||||
{<<"_gte_connected_at">>, timestamp},
|
||||
{<<"_gte_mqueue_len">>, integer},
|
||||
{<<"_lte_mqueue_len">>, integer},
|
||||
{<<"_gte_mqueue_dropped">>, integer},
|
||||
{<<"_lte_mqueue_dropped">>, integer},
|
||||
{<<"_lte_connected_at">>, timestamp}]}).
|
||||
|
||||
-rest_api(#{name => list_clients,
|
||||
|
@ -320,14 +324,14 @@ format_acl_cache({{PubSub, Topic}, {AclResult, Timestamp}}) ->
|
|||
%% Query Functions
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
query({Qs, []}, Start, Limit) ->
|
||||
Ms = qs2ms(Qs),
|
||||
emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format_channel_info/1);
|
||||
|
||||
query({Qs, Fuzzy}, Start, Limit) ->
|
||||
Ms = qs2ms(Qs),
|
||||
MatchFun = match_fun(Ms, Fuzzy),
|
||||
emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format_channel_info/1).
|
||||
case qs2ms(Qs) of
|
||||
{Ms, []}when Fuzzy =:= [] ->
|
||||
emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format_channel_info/1);
|
||||
{Ms, FuzzyStats} ->
|
||||
MatchFun = match_fun(Ms, Fuzzy ++ FuzzyStats),
|
||||
emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format_channel_info/1)
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Match funcs
|
||||
|
@ -351,27 +355,48 @@ run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr}|Fu
|
|||
undefined -> <<>>;
|
||||
V -> V
|
||||
end,
|
||||
binary:match(Val, SubStr) /= nomatch andalso run_fuzzy_match(E, Fuzzy).
|
||||
binary:match(Val, SubStr) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
|
||||
run_fuzzy_match(E = {_, _, Stats}, [{Key, '>=', Int}|Fuzzy]) ->
|
||||
case lists:keyfind(Key, 1, Stats) of
|
||||
{_, Val} when Val >= Int -> run_fuzzy_match(E, Fuzzy);
|
||||
_ -> false
|
||||
end;
|
||||
run_fuzzy_match(E = {_, _, Stats}, [{Key, '=<', Int}|Fuzzy]) ->
|
||||
case lists:keyfind(Key, 1, Stats) of
|
||||
{_, Val} when Val =< Int -> run_fuzzy_match(E, Fuzzy);
|
||||
_ -> false
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% QueryString to Match Spec
|
||||
|
||||
-spec qs2ms(list()) -> ets:match_spec().
|
||||
qs2ms(Qs) ->
|
||||
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
|
||||
[{{'$1', MtchHead, '_'}, Conds, ['$_']}].
|
||||
{MatchHead, Conds, FuzzyStats} = qs2ms(Qs, 2, #{}, [], []),
|
||||
{[{{'$1', MatchHead, '_'}, Conds, ['$_']}], FuzzyStats}.
|
||||
|
||||
qs2ms([], _, {MtchHead, Conds}) ->
|
||||
{MtchHead, lists:reverse(Conds)};
|
||||
qs2ms([], _, MatchHead, Conds, FuzzyStats) ->
|
||||
{MatchHead, lists:reverse(Conds), FuzzyStats};
|
||||
|
||||
qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) ->
|
||||
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)),
|
||||
qs2ms(Rest, N, {NMtchHead, Conds});
|
||||
qs2ms([Qs | Rest], N, {MtchHead, Conds}) ->
|
||||
qs2ms([{Key, '=:=', Value} | Rest], N, MatchHead, Conds, FuzzyStats) ->
|
||||
NMatchHead = emqx_mgmt_util:merge_maps(MatchHead, ms(Key, Value)),
|
||||
qs2ms(Rest, N, NMatchHead, Conds, FuzzyStats);
|
||||
qs2ms([Qs | Rest], N, MatchHead, Conds, FuzzyStats) ->
|
||||
Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
|
||||
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)),
|
||||
case ms(element(1, Qs), Holder) of
|
||||
fuzzy_stats ->
|
||||
FuzzyStats1 =
|
||||
case Qs of
|
||||
{_Key, _Symbol, _Val} -> [Qs | FuzzyStats];
|
||||
{Key, Symbol1, Val1, Symbol2, Val2} ->
|
||||
[{Key, Symbol1, Val1}, {Key, Symbol2, Val2} | FuzzyStats]
|
||||
end,
|
||||
qs2ms(Rest, N, MatchHead, Conds, FuzzyStats1);
|
||||
Ms ->
|
||||
NMatchHead = emqx_mgmt_util:merge_maps(MatchHead, Ms),
|
||||
NConds = put_conds(Qs, Holder, Conds),
|
||||
qs2ms(Rest, N+1, {NMtchHead, NConds}).
|
||||
qs2ms(Rest, N+1, NMatchHead, NConds, FuzzyStats)
|
||||
end.
|
||||
|
||||
put_conds({_, Op, V}, Holder, Conds) ->
|
||||
[{Op, Holder, V} | Conds];
|
||||
|
@ -398,7 +423,11 @@ ms(proto_ver, X) ->
|
|||
ms(connected_at, X) ->
|
||||
#{conninfo => #{connected_at => X}};
|
||||
ms(created_at, X) ->
|
||||
#{session => #{created_at => X}}.
|
||||
#{session => #{created_at => X}};
|
||||
ms(mqueue_len, _X) ->
|
||||
fuzzy_stats;
|
||||
ms(mqueue_dropped, _X) ->
|
||||
fuzzy_stats.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% EUnits
|
||||
|
@ -421,6 +450,10 @@ params2qs_test() ->
|
|||
{<<"_lte_created_at">>, 5},
|
||||
{<<"_gte_connected_at">>, 1},
|
||||
{<<"_lte_connected_at">>, 5},
|
||||
{<<"_lte_mqueue_len">>, 10},
|
||||
{<<"_gte_mqueue_len">>, 5},
|
||||
{<<"_lte_mqueue_dropped">>, 100},
|
||||
{<<"_gte_mqueue_dropped">>, 50},
|
||||
{<<"_like_clientid">>, <<"a">>},
|
||||
{<<"_like_username">>, <<"e">>}
|
||||
],
|
||||
|
@ -440,12 +473,17 @@ params2qs_test() ->
|
|||
{'=<','$2', 5000},
|
||||
{'>=','$3', 1000},
|
||||
{'=<','$3', 5000}],
|
||||
{10, {Qs1, []}} = emqx_mgmt_api:params2qs(Params, QsSchema),
|
||||
[{{'$1', MtchHead, _}, Condi, _}] = qs2ms(Qs1),
|
||||
ExpectedFuzzyStats = [{mqueue_dropped,'=<',100},
|
||||
{mqueue_dropped,'>=',50},
|
||||
{mqueue_len,'=<',10},
|
||||
{mqueue_len,'>=',5}],
|
||||
{12, {Qs1, []}} = emqx_mgmt_api:params2qs(Params, QsSchema),
|
||||
{[{{'$1', MtchHead, _}, Condi, _}], FuzzyStats} = qs2ms(Qs1),
|
||||
?assertEqual(ExpectedMtchHead, MtchHead),
|
||||
?assertEqual(ExpectedCondi, Condi),
|
||||
?assertEqual(ExpectedFuzzyStats, lists:sort(FuzzyStats)),
|
||||
|
||||
[{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]).
|
||||
{[{{'$1', #{}, '_'}, [], ['$_']}], []} = qs2ms([]).
|
||||
|
||||
fuzzy_match_test() ->
|
||||
Info = {emqx_channel_info,
|
||||
|
@ -467,4 +505,31 @@ fuzzy_match_test() ->
|
|||
true = run_fuzzy_match(Info, [{clientid, like, <<"de">>},
|
||||
{username, like, <<"[]">>}]).
|
||||
|
||||
fuzzy_stats_test() ->
|
||||
Fun = fun(Len, Dropped) ->
|
||||
{emqx_channel_info,
|
||||
#{clientinfo =>
|
||||
#{ clientid => <<"abcde">>, username => <<"abc\\name*[]()">>}},
|
||||
[{mqueue_len, Len}, {mqueue_max,1000}, {mqueue_dropped, Dropped}]
|
||||
}
|
||||
end,
|
||||
false = run_fuzzy_match(Fun(0, 100), [{mqueue_len, '>=', 1}]),
|
||||
true = run_fuzzy_match(Fun(1, 100), [{mqueue_len, '>=', 1}]),
|
||||
false = run_fuzzy_match(Fun(1, 100), [{mqueue_len, '>=', 2}]),
|
||||
true = run_fuzzy_match(Fun(99, 100), [{mqueue_len, '=<', 100}, {mqueue_len, '>=', 98}]),
|
||||
false = run_fuzzy_match(Fun(99, 100), [{mqueue_len, '=<', 101}, {mqueue_len, '>=', 100}]),
|
||||
|
||||
false = run_fuzzy_match(Fun(1000, 0), [{mqueue_dropped, '>=', 1}]),
|
||||
true = run_fuzzy_match(Fun(1000, 1), [{mqueue_dropped, '>=', 1}]),
|
||||
false = run_fuzzy_match(Fun(1000, 1), [{mqueue_dropped, '>=', 2}]),
|
||||
true = run_fuzzy_match(Fun(1000, 99), [{mqueue_dropped, '=<', 100}, {mqueue_dropped, '>=', 98}]),
|
||||
false = run_fuzzy_match(Fun(1000, 99), [{mqueue_dropped, '=<', 98}, {mqueue_dropped, '>=', 97}]),
|
||||
false = run_fuzzy_match(Fun(1000, 102), [{mqueue_dropped, '=<', 104}, {mqueue_dropped, '>=', 103}]),
|
||||
|
||||
true = run_fuzzy_match(Fun(1000, 103), [{mqueue_dropped, '=<', 104}, {mqueue_dropped, '>=', 103},
|
||||
{mqueue_len, '>=', 1000}]),
|
||||
false = run_fuzzy_match(Fun(1000, 199), [{mqueue_dropped, '>=', 198},
|
||||
{mqueue_len, '=<', 99}, {mqueue_len, '>=', 1}]),
|
||||
ok.
|
||||
|
||||
-endif.
|
||||
|
|
Loading…
Reference in New Issue