feat: client apisupport mqueue_len/mqueue_dropped filter
This commit is contained in:
parent
03c1efa439
commit
3df5c15819
|
@ -35,6 +35,10 @@
|
||||||
{<<"_gte_created_at">>, timestamp},
|
{<<"_gte_created_at">>, timestamp},
|
||||||
{<<"_lte_created_at">>, timestamp},
|
{<<"_lte_created_at">>, timestamp},
|
||||||
{<<"_gte_connected_at">>, timestamp},
|
{<<"_gte_connected_at">>, timestamp},
|
||||||
|
{<<"_gte_mqueue_len">>, integer},
|
||||||
|
{<<"_lte_mqueue_len">>, integer},
|
||||||
|
{<<"_gte_mqueue_dropped">>, integer},
|
||||||
|
{<<"_lte_mqueue_dropped">>, integer},
|
||||||
{<<"_lte_connected_at">>, timestamp}]}).
|
{<<"_lte_connected_at">>, timestamp}]}).
|
||||||
|
|
||||||
-rest_api(#{name => list_clients,
|
-rest_api(#{name => list_clients,
|
||||||
|
@ -320,14 +324,14 @@ format_acl_cache({{PubSub, Topic}, {AclResult, Timestamp}}) ->
|
||||||
%% Query Functions
|
%% Query Functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
query({Qs, []}, Start, Limit) ->
|
|
||||||
Ms = qs2ms(Qs),
|
|
||||||
emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format_channel_info/1);
|
|
||||||
|
|
||||||
query({Qs, Fuzzy}, Start, Limit) ->
|
query({Qs, Fuzzy}, Start, Limit) ->
|
||||||
Ms = qs2ms(Qs),
|
case qs2ms(Qs) of
|
||||||
MatchFun = match_fun(Ms, Fuzzy),
|
{Ms, []}when Fuzzy =:= [] ->
|
||||||
emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format_channel_info/1).
|
emqx_mgmt_api:select_table(emqx_channel_info, Ms, Start, Limit, fun format_channel_info/1);
|
||||||
|
{Ms, FuzzyStats} ->
|
||||||
|
MatchFun = match_fun(Ms, Fuzzy ++ FuzzyStats),
|
||||||
|
emqx_mgmt_api:traverse_table(emqx_channel_info, MatchFun, Start, Limit, fun format_channel_info/1)
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Match funcs
|
%% Match funcs
|
||||||
|
@ -351,27 +355,48 @@ run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr}|Fu
|
||||||
undefined -> <<>>;
|
undefined -> <<>>;
|
||||||
V -> V
|
V -> V
|
||||||
end,
|
end,
|
||||||
binary:match(Val, SubStr) /= nomatch andalso run_fuzzy_match(E, Fuzzy).
|
binary:match(Val, SubStr) /= nomatch andalso run_fuzzy_match(E, Fuzzy);
|
||||||
|
run_fuzzy_match(E = {_, _, Stats}, [{Key, '>=', Int}|Fuzzy]) ->
|
||||||
|
case lists:keyfind(Key, 1, Stats) of
|
||||||
|
{_, Val} when Val >= Int -> run_fuzzy_match(E, Fuzzy);
|
||||||
|
_ -> false
|
||||||
|
end;
|
||||||
|
run_fuzzy_match(E = {_, _, Stats}, [{Key, '=<', Int}|Fuzzy]) ->
|
||||||
|
case lists:keyfind(Key, 1, Stats) of
|
||||||
|
{_, Val} when Val =< Int -> run_fuzzy_match(E, Fuzzy);
|
||||||
|
_ -> false
|
||||||
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% QueryString to Match Spec
|
%% QueryString to Match Spec
|
||||||
|
|
||||||
-spec qs2ms(list()) -> ets:match_spec().
|
-spec qs2ms(list()) -> ets:match_spec().
|
||||||
qs2ms(Qs) ->
|
qs2ms(Qs) ->
|
||||||
{MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}),
|
{MatchHead, Conds, FuzzyStats} = qs2ms(Qs, 2, #{}, [], []),
|
||||||
[{{'$1', MtchHead, '_'}, Conds, ['$_']}].
|
{[{{'$1', MatchHead, '_'}, Conds, ['$_']}], FuzzyStats}.
|
||||||
|
|
||||||
qs2ms([], _, {MtchHead, Conds}) ->
|
qs2ms([], _, MatchHead, Conds, FuzzyStats) ->
|
||||||
{MtchHead, lists:reverse(Conds)};
|
{MatchHead, lists:reverse(Conds), FuzzyStats};
|
||||||
|
|
||||||
qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) ->
|
qs2ms([{Key, '=:=', Value} | Rest], N, MatchHead, Conds, FuzzyStats) ->
|
||||||
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)),
|
NMatchHead = emqx_mgmt_util:merge_maps(MatchHead, ms(Key, Value)),
|
||||||
qs2ms(Rest, N, {NMtchHead, Conds});
|
qs2ms(Rest, N, NMatchHead, Conds, FuzzyStats);
|
||||||
qs2ms([Qs | Rest], N, {MtchHead, Conds}) ->
|
qs2ms([Qs | Rest], N, MatchHead, Conds, FuzzyStats) ->
|
||||||
Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
|
Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8),
|
||||||
NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)),
|
case ms(element(1, Qs), Holder) of
|
||||||
NConds = put_conds(Qs, Holder, Conds),
|
fuzzy_stats ->
|
||||||
qs2ms(Rest, N+1, {NMtchHead, NConds}).
|
FuzzyStats1 =
|
||||||
|
case Qs of
|
||||||
|
{_Key, _Symbol, _Val} -> [Qs | FuzzyStats];
|
||||||
|
{Key, Symbol1, Val1, Symbol2, Val2} ->
|
||||||
|
[{Key, Symbol1, Val1}, {Key, Symbol2, Val2} | FuzzyStats]
|
||||||
|
end,
|
||||||
|
qs2ms(Rest, N, MatchHead, Conds, FuzzyStats1);
|
||||||
|
Ms ->
|
||||||
|
NMatchHead = emqx_mgmt_util:merge_maps(MatchHead, Ms),
|
||||||
|
NConds = put_conds(Qs, Holder, Conds),
|
||||||
|
qs2ms(Rest, N+1, NMatchHead, NConds, FuzzyStats)
|
||||||
|
end.
|
||||||
|
|
||||||
put_conds({_, Op, V}, Holder, Conds) ->
|
put_conds({_, Op, V}, Holder, Conds) ->
|
||||||
[{Op, Holder, V} | Conds];
|
[{Op, Holder, V} | Conds];
|
||||||
|
@ -398,7 +423,11 @@ ms(proto_ver, X) ->
|
||||||
ms(connected_at, X) ->
|
ms(connected_at, X) ->
|
||||||
#{conninfo => #{connected_at => X}};
|
#{conninfo => #{connected_at => X}};
|
||||||
ms(created_at, X) ->
|
ms(created_at, X) ->
|
||||||
#{session => #{created_at => X}}.
|
#{session => #{created_at => X}};
|
||||||
|
ms(mqueue_len, _X) ->
|
||||||
|
fuzzy_stats;
|
||||||
|
ms(mqueue_dropped, _X) ->
|
||||||
|
fuzzy_stats.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% EUnits
|
%% EUnits
|
||||||
|
@ -421,6 +450,10 @@ params2qs_test() ->
|
||||||
{<<"_lte_created_at">>, 5},
|
{<<"_lte_created_at">>, 5},
|
||||||
{<<"_gte_connected_at">>, 1},
|
{<<"_gte_connected_at">>, 1},
|
||||||
{<<"_lte_connected_at">>, 5},
|
{<<"_lte_connected_at">>, 5},
|
||||||
|
{<<"_lte_mqueue_len">>, 10},
|
||||||
|
{<<"_gte_mqueue_len">>, 5},
|
||||||
|
{<<"_lte_mqueue_dropped">>, 100},
|
||||||
|
{<<"_gte_mqueue_dropped">>, 50},
|
||||||
{<<"_like_clientid">>, <<"a">>},
|
{<<"_like_clientid">>, <<"a">>},
|
||||||
{<<"_like_username">>, <<"e">>}
|
{<<"_like_username">>, <<"e">>}
|
||||||
],
|
],
|
||||||
|
@ -440,12 +473,17 @@ params2qs_test() ->
|
||||||
{'=<','$2', 5000},
|
{'=<','$2', 5000},
|
||||||
{'>=','$3', 1000},
|
{'>=','$3', 1000},
|
||||||
{'=<','$3', 5000}],
|
{'=<','$3', 5000}],
|
||||||
{10, {Qs1, []}} = emqx_mgmt_api:params2qs(Params, QsSchema),
|
ExpectedFuzzyStats = [{mqueue_dropped,'=<',100},
|
||||||
[{{'$1', MtchHead, _}, Condi, _}] = qs2ms(Qs1),
|
{mqueue_dropped,'>=',50},
|
||||||
|
{mqueue_len,'=<',10},
|
||||||
|
{mqueue_len,'>=',5}],
|
||||||
|
{12, {Qs1, []}} = emqx_mgmt_api:params2qs(Params, QsSchema),
|
||||||
|
{[{{'$1', MtchHead, _}, Condi, _}], FuzzyStats} = qs2ms(Qs1),
|
||||||
?assertEqual(ExpectedMtchHead, MtchHead),
|
?assertEqual(ExpectedMtchHead, MtchHead),
|
||||||
?assertEqual(ExpectedCondi, Condi),
|
?assertEqual(ExpectedCondi, Condi),
|
||||||
|
?assertEqual(ExpectedFuzzyStats, lists:sort(FuzzyStats)),
|
||||||
|
|
||||||
[{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]).
|
{[{{'$1', #{}, '_'}, [], ['$_']}], []} = qs2ms([]).
|
||||||
|
|
||||||
fuzzy_match_test() ->
|
fuzzy_match_test() ->
|
||||||
Info = {emqx_channel_info,
|
Info = {emqx_channel_info,
|
||||||
|
@ -467,4 +505,31 @@ fuzzy_match_test() ->
|
||||||
true = run_fuzzy_match(Info, [{clientid, like, <<"de">>},
|
true = run_fuzzy_match(Info, [{clientid, like, <<"de">>},
|
||||||
{username, like, <<"[]">>}]).
|
{username, like, <<"[]">>}]).
|
||||||
|
|
||||||
|
fuzzy_stats_test() ->
|
||||||
|
Fun = fun(Len, Dropped) ->
|
||||||
|
{emqx_channel_info,
|
||||||
|
#{clientinfo =>
|
||||||
|
#{ clientid => <<"abcde">>, username => <<"abc\\name*[]()">>}},
|
||||||
|
[{mqueue_len, Len}, {mqueue_max,1000}, {mqueue_dropped, Dropped}]
|
||||||
|
}
|
||||||
|
end,
|
||||||
|
false = run_fuzzy_match(Fun(0, 100), [{mqueue_len, '>=', 1}]),
|
||||||
|
true = run_fuzzy_match(Fun(1, 100), [{mqueue_len, '>=', 1}]),
|
||||||
|
false = run_fuzzy_match(Fun(1, 100), [{mqueue_len, '>=', 2}]),
|
||||||
|
true = run_fuzzy_match(Fun(99, 100), [{mqueue_len, '=<', 100}, {mqueue_len, '>=', 98}]),
|
||||||
|
false = run_fuzzy_match(Fun(99, 100), [{mqueue_len, '=<', 101}, {mqueue_len, '>=', 100}]),
|
||||||
|
|
||||||
|
false = run_fuzzy_match(Fun(1000, 0), [{mqueue_dropped, '>=', 1}]),
|
||||||
|
true = run_fuzzy_match(Fun(1000, 1), [{mqueue_dropped, '>=', 1}]),
|
||||||
|
false = run_fuzzy_match(Fun(1000, 1), [{mqueue_dropped, '>=', 2}]),
|
||||||
|
true = run_fuzzy_match(Fun(1000, 99), [{mqueue_dropped, '=<', 100}, {mqueue_dropped, '>=', 98}]),
|
||||||
|
false = run_fuzzy_match(Fun(1000, 99), [{mqueue_dropped, '=<', 98}, {mqueue_dropped, '>=', 97}]),
|
||||||
|
false = run_fuzzy_match(Fun(1000, 102), [{mqueue_dropped, '=<', 104}, {mqueue_dropped, '>=', 103}]),
|
||||||
|
|
||||||
|
true = run_fuzzy_match(Fun(1000, 103), [{mqueue_dropped, '=<', 104}, {mqueue_dropped, '>=', 103},
|
||||||
|
{mqueue_len, '>=', 1000}]),
|
||||||
|
false = run_fuzzy_match(Fun(1000, 199), [{mqueue_dropped, '>=', 198},
|
||||||
|
{mqueue_len, '=<', 99}, {mqueue_len, '>=', 1}]),
|
||||||
|
ok.
|
||||||
|
|
||||||
-endif.
|
-endif.
|
||||||
|
|
Loading…
Reference in New Issue