diff --git a/apps/emqx_gateway/src/emqx_gateway_api_client.erl b/apps/emqx_gateway/src/emqx_gateway_api_client.erl index 0a94cd906..d2c3b466c 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_client.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_client.erl @@ -21,11 +21,21 @@ %% minirest behaviour callbacks -export([api_spec/0]). +%% http handlers -export([ clients/2 , clients_insta/2 , subscriptions/2 ]). +%% internal exports (for client query) +-export([ query/4 + , format_channel_info/1 + ]). + +%%-------------------------------------------------------------------- +%% APIs +%%-------------------------------------------------------------------- + api_spec() -> {metadata(apis()), []}. @@ -36,8 +46,49 @@ apis() -> , {"/gateway/:name/clients/:clientid/subscriptions/:topic", subscriptions} ]. -clients(get, _Req) -> - {200, []}. + +-define(CLIENT_QS_SCHEMA, + [ {<<"node">>, atom} + , {<<"clientid">>, binary} + , {<<"username">>, binary} + %%, {<<"zone">>, atom} + , {<<"ip_address">>, ip} + , {<<"conn_state">>, atom} + , {<<"clean_start">>, atom} + %%, {<<"proto_name">>, binary} + %%, {<<"proto_ver">>, integer} + , {<<"like_clientid">>, binary} + , {<<"like_username">>, binary} + , {<<"gte_created_at">>, timestamp} + , {<<"lte_created_at">>, timestamp} + , {<<"gte_connected_at">>, timestamp} + , {<<"lte_connected_at">>, timestamp} + ]). + +-define(query_fun, {?MODULE, query}). +-define(format_fun, {?MODULE, format_channel_info}). + +clients(get, #{ bindings := #{name := GwName0} + , query_string := Qs + }) -> + GwName = binary_to_existing_atom(GwName0), + TabName = emqx_gateway_cm:tabname(info, GwName), + case maps:get(<<"node">>, Qs, undefined) of + undefined -> + Response = emqx_mgmt_api:cluster_query( + Qs, TabName, + ?CLIENT_QS_SCHEMA, ?query_fun + ), + {200, Response}; + Node1 -> + Node = binary_to_atom(Node1, utf8), + ParamsWithoutNode = maps:without([<<"node">>], Qs), + Response = emqx_mgmt_api:node_query( + Node, ParamsWithoutNode, + TabName, ?CLIENT_QS_SCHEMA, ?query_fun + ), + {200, Response} + end. clients_insta(get, _Req) -> {200, <<"{}">>}; @@ -49,6 +100,145 @@ subscriptions(get, _Req) -> subscriptions(delete, _Req) -> {200}. +%%-------------------------------------------------------------------- +%% query funcs + +query(Tab, {Qs, []}, Start, Limit) -> + Ms = qs2ms(Qs), + emqx_mgmt_api:select_table(Tab, Ms, Start, Limit, + fun format_channel_info/1); + +query(Tab, {Qs, Fuzzy}, Start, Limit) -> + Ms = qs2ms(Qs), + MatchFun = match_fun(Ms, Fuzzy), + emqx_mgmt_api:traverse_table(Tab, MatchFun, Start, Limit, + fun format_channel_info/1). + +qs2ms(Qs) -> + {MtchHead, Conds} = qs2ms(Qs, 2, {#{}, []}), + [{{'$1', MtchHead, '_'}, Conds, ['$_']}]. + +qs2ms([], _, {MtchHead, Conds}) -> + {MtchHead, lists:reverse(Conds)}; + +qs2ms([{Key, '=:=', Value} | Rest], N, {MtchHead, Conds}) -> + NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(Key, Value)), + qs2ms(Rest, N, {NMtchHead, Conds}); +qs2ms([Qs | Rest], N, {MtchHead, Conds}) -> + Holder = binary_to_atom(iolist_to_binary(["$", integer_to_list(N)]), utf8), + NMtchHead = emqx_mgmt_util:merge_maps(MtchHead, ms(element(1, Qs), Holder)), + NConds = put_conds(Qs, Holder, Conds), + qs2ms(Rest, N+1, {NMtchHead, NConds}). + +put_conds({_, Op, V}, Holder, Conds) -> + [{Op, Holder, V} | Conds]; +put_conds({_, Op1, V1, Op2, V2}, Holder, Conds) -> + [{Op2, Holder, V2}, + {Op1, Holder, V1} | Conds]. + +ms(clientid, X) -> + #{clientinfo => #{clientid => X}}; +ms(username, X) -> + #{clientinfo => #{username => X}}; +ms(zone, X) -> + #{clientinfo => #{zone => X}}; +ms(ip_address, X) -> + #{clientinfo => #{peerhost => X}}; +ms(conn_state, X) -> + #{conn_state => X}; +ms(clean_start, X) -> + #{conninfo => #{clean_start => X}}; +ms(proto_name, X) -> + #{conninfo => #{proto_name => X}}; +ms(proto_ver, X) -> + #{conninfo => #{proto_ver => X}}; +ms(connected_at, X) -> + #{conninfo => #{connected_at => X}}; +ms(created_at, X) -> + #{session => #{created_at => X}}. + +%%-------------------------------------------------------------------- +%% Match funcs + +match_fun(Ms, Fuzzy) -> + MsC = ets:match_spec_compile(Ms), + REFuzzy = lists:map(fun({K, like, S}) -> + {ok, RE} = re:compile(S), + {K, like, RE} + end, Fuzzy), + fun(Rows) -> + case ets:match_spec_run(Rows, MsC) of + [] -> []; + Ls -> + lists:filter(fun(E) -> + run_fuzzy_match(E, REFuzzy) + end, Ls) + end + end. + +run_fuzzy_match(_, []) -> + true; +run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE}|Fuzzy]) -> + Val = case maps:get(Key, ClientInfo, "") of + undefined -> ""; + V -> V + end, + re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_match(E, Fuzzy). + +%%-------------------------------------------------------------------- +%% format funcs + +format_channel_info({_, ClientInfo, ClientStats}) -> + Fun = + fun + (_Key, Value, Current) when is_map(Value) -> + maps:merge(Current, Value); + (Key, Value, Current) -> + maps:put(Key, Value, Current) + end, + StatsMap = maps:without([memory, next_pkt_id, total_heap_size], + maps:from_list(ClientStats)), + ClientInfoMap0 = maps:fold(Fun, #{}, ClientInfo), + IpAddress = peer_to_binary(maps:get(peername, ClientInfoMap0)), + Connected = maps:get(conn_state, ClientInfoMap0) =:= connected, + ClientInfoMap1 = maps:merge(StatsMap, ClientInfoMap0), + ClientInfoMap2 = maps:put(node, node(), ClientInfoMap1), + ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2), + ClientInfoMap = maps:put(connected, Connected, ClientInfoMap3), + RemoveList = [ + auth_result + , peername + , sockname + , peerhost + , conn_state + , send_pend + , conn_props + , peercert + , sockstate + , subscriptions + , receive_maximum + , protocol + , is_superuser + , sockport + , anonymous + , mountpoint + , socktype + , active_n + , await_rel_timeout + , conn_mod + , sockname + , retry_interval + , upgrade_qos + ], + maps:without(RemoveList, ClientInfoMap). + +peer_to_binary({Addr, Port}) -> + AddrBinary = list_to_binary(inet:ntoa(Addr)), + PortBinary = integer_to_binary(Port), + <>; +peer_to_binary(Addr) -> + list_to_binary(inet:ntoa(Addr)). + %%-------------------------------------------------------------------- %% Swagger defines %%-------------------------------------------------------------------- @@ -112,7 +302,7 @@ swagger("/gateway/:name/clients/:clientid/subscriptions", post) -> }; swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) -> #{ description => <<"Unsubscribe the topic for client">> - , parameters => params_client_insta() ++ params_topic_name_in_path() + , parameters => params_topic_name_in_path() ++ params_client_insta() , responses => #{ <<"404">> => schema_not_found() , <<"204">> => schema_no_content() @@ -120,13 +310,13 @@ swagger("/gateway/:name/clients/:clientid/subscriptions/:topic", delete) -> }. params_client_query() -> - params_client_searching_in_qs() - ++ emqx_mgmt_util:page_params() - ++ params_gateway_name_in_path(). + params_gateway_name_in_path() + ++ params_client_searching_in_qs() + ++ emqx_mgmt_util:page_params(). params_client_insta() -> - params_gateway_name_in_path() - ++ params_clientid_in_path(). + params_clientid_in_path() + ++ params_gateway_name_in_path(). params_client_searching_in_qs() -> queries( @@ -183,11 +373,10 @@ schema_no_content() -> #{description => <<"No Content">>}. schema_clients_list() -> - emqx_mgmt_util:array_schema( + emqx_mgmt_util:page_schema( #{ type => object , properties => properties_client() - }, - <<"Client lists">> + } ). schema_client() ->