From 14b39224d45083d4ddb1b1997e938ca66763354e Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 31 Aug 2021 15:35:53 +0800 Subject: [PATCH] chore(gw): clients http implement skeleton --- apps/emqx_gateway/src/emqx_gateway.erl | 8 +- apps/emqx_gateway/src/emqx_gateway_api.erl | 21 +- ...lient.erl => emqx_gateway_api_clients.erl} | 212 +++++++++++++----- apps/emqx_gateway/src/emqx_gateway_cli.erl | 3 +- ...gateway_intr.erl => emqx_gateway_http.erl} | 74 +++++- apps/emqx_gateway/src/emqx_gateway_utils.erl | 4 + 6 files changed, 245 insertions(+), 77 deletions(-) rename apps/emqx_gateway/src/{emqx_gateway_api_client.erl => emqx_gateway_api_clients.erl} (69%) rename apps/emqx_gateway/src/{emqx_gateway_intr.erl => emqx_gateway_http.erl} (56%) diff --git a/apps/emqx_gateway/src/emqx_gateway.erl b/apps/emqx_gateway/src/emqx_gateway.erl index 79ea5d8a4..596b47547 100644 --- a/apps/emqx_gateway/src/emqx_gateway.erl +++ b/apps/emqx_gateway/src/emqx_gateway.erl @@ -25,7 +25,7 @@ , post_config_update/4 ]). -%% APIs +%% Gateway APIs -export([ registered_gateway/0 , load/2 , unload/1 @@ -48,7 +48,7 @@ registered_gateway() -> emqx_gateway_registry:list(). %%-------------------------------------------------------------------- -%% Gateway Instace APIs +%% Gateway APIs -spec list() -> [gateway()]. list() -> @@ -96,7 +96,8 @@ update_rawconf(RawName, RawConfDiff) -> %%-------------------------------------------------------------------- %% Config Handler --spec pre_config_update(emqx_config:update_request(), emqx_config:raw_config()) -> +-spec pre_config_update(emqx_config:update_request(), + emqx_config:raw_config()) -> {ok, emqx_config:update_request()} | {error, term()}. pre_config_update({RawName, RawConfDiff}, RawConf) -> {ok, emqx_map_lib:deep_merge(RawConf, #{RawName => RawConfDiff})}. @@ -117,4 +118,3 @@ post_config_update({RawName, _}, NewConfig, OldConfig, _AppEnvs) -> %%-------------------------------------------------------------------- %% Internal funcs %%-------------------------------------------------------------------- - diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index b9ae0a234..2ff002fd5 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -20,8 +20,13 @@ -compile(nowarn_unused_function). --import(emqx_mgmt_util, [ schema/1 - ]). +-import(emqx_mgmt_util, + [ schema/1 + ]). + +-import(emqx_gateway_http, + [ return_http_error/2 + ]). %% minirest behaviour callbacks -export([api_spec/0]). @@ -342,7 +347,7 @@ gateway(get, Request) -> undefined -> all; S0 -> binary_to_existing_atom(S0, utf8) end, - {200, emqx_gateway_intr:gateways(Status)}. + {200, emqx_gateway_http:gateways(Status)}. gateway_insta(delete, #{bindings := #{name := Name0}}) -> Name = binary_to_existing_atom(Name0), @@ -380,13 +385,3 @@ gateway_insta(put, #{body := RawConfsIn, gateway_insta_stats(get, _Req) -> return_http_error(401, <<"Implement it later (maybe 5.1)">>). - -return_http_error(Code, Msg) -> - emqx_json:encode( - #{code => codestr(Code), - reason => emqx_gateway_utils:stringfy(Msg) - }). - -codestr(404) -> 'RESOURCE_NOT_FOUND'; -codestr(401) -> 'NOT_SUPPORTED_NOW'; -codestr(500) -> 'UNKNOW_ERROR'. diff --git a/apps/emqx_gateway/src/emqx_gateway_api_client.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl similarity index 69% rename from apps/emqx_gateway/src/emqx_gateway_api_client.erl rename to apps/emqx_gateway/src/emqx_gateway_api_clients.erl index d2c3b466c..ba61a38fc 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_client.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -13,8 +13,8 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- -%% --module(emqx_gateway_api_client). + +-module(emqx_gateway_api_clients). -behaviour(minirest_api). @@ -32,6 +32,10 @@ , format_channel_info/1 ]). +-import(emqx_gateway_http, + [ return_http_error/2 + ]). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- @@ -46,17 +50,14 @@ apis() -> , {"/gateway/:name/clients/:clientid/subscriptions/:topic", subscriptions} ]. - -define(CLIENT_QS_SCHEMA, [ {<<"node">>, atom} , {<<"clientid">>, binary} , {<<"username">>, binary} - %%, {<<"zone">>, atom} , {<<"ip_address">>, ip} , {<<"conn_state">>, atom} , {<<"clean_start">>, atom} - %%, {<<"proto_name">>, binary} - %%, {<<"proto_ver">>, integer} + , {<<"proto_ver">>, integer} , {<<"like_clientid">>, binary} , {<<"like_username">>, binary} , {<<"gte_created_at">>, timestamp} @@ -90,14 +91,69 @@ clients(get, #{ bindings := #{name := GwName0} {200, Response} end. -clients_insta(get, _Req) -> - {200, <<"{}">>}; -clients_insta(delete, _Req) -> +clients_insta(get, #{ bindings := #{name := GwName0, + clientid := ClientId} + }) -> + GwName = binary_to_existing_atom(GwName0), + TabName = emqx_gateway_cm:tabname(info, GwName), + %% XXX: We need a lookuo function for it instead of a query + #{data := Data} = emqx_mgmt_api:cluster_query( + #{<<"clientid">> => ClientId}, + TabName, ?CLIENT_QS_SCHEMA, ?query_fun + ), + case Data of + [ClientInfo] -> + {200, ClientInfo}; + [] -> + return_http_error(404, <<"Gateway or ClientId not found">>) + end; + +clients_insta(delete, #{ bindings := #{name := GwName0, + clientid := ClientId0} + }) -> + GwName = binary_to_existing_atom(GwName0), + ClientId = emqx_mgmt_util:urldecode(ClientId0), + emqx_gateway_http:client_kickout(GwName, ClientId), {200}. -subscriptions(get, _Req) -> +subscriptions(get, #{ bindings := #{name := GwName0, + clientid := ClientId0} + }) -> + GwName = binary_to_existing_atom(GwName0), + ClientId = emqx_mgmt_util:urldecode(ClientId0), + emqx_gateway_http:client_subscriptions(GwName, ClientId), {200, []}; -subscriptions(delete, _Req) -> + +subscriptions(post, #{ bindings := #{name := GwName0, + clientid := ClientId0}, + body := Body + }) -> + GwName = binary_to_existing_atom(GwName0), + ClientId = emqx_mgmt_util:urldecode(ClientId0), + + case {maps:get(<<"topic">>, Body, undefined), + maps:get(<<"qos">>, Body, 0)} of + {undefined, _} -> + %% FIXME: more reasonable error code?? + return_http_error(404, <<"Request paramter missed: topic">>); + {Topic, QoS} -> + case emqx_gateway_http:client_subscribe(GwName, ClientId, Topic, QoS) of + {error, Reason} -> + return_http_error(404, Reason); + ok -> + {200} + end + end; + +subscriptions(delete, #{ bindings := #{name := GwName0, + clientid := ClientId0, + topic := Topic0 + } + }) -> + GwName = binary_to_existing_atom(GwName0), + ClientId = emqx_mgmt_util:urldecode(ClientId0), + Topic = emqx_mgmt_util:urldecode(Topic0), + _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic), {200}. %%-------------------------------------------------------------------- @@ -148,8 +204,6 @@ ms(conn_state, X) -> #{conn_state => X}; ms(clean_start, X) -> #{conninfo => #{clean_start => X}}; -ms(proto_name, X) -> - #{conninfo => #{proto_name => X}}; ms(proto_ver, X) -> #{conninfo => #{proto_ver => X}}; ms(connected_at, X) -> @@ -188,49 +242,79 @@ run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE}|Fuzzy]) - %%-------------------------------------------------------------------- %% format funcs -format_channel_info({_, ClientInfo, ClientStats}) -> - Fun = - fun - (_Key, Value, Current) when is_map(Value) -> - maps:merge(Current, Value); - (Key, Value, Current) -> - maps:put(Key, Value, Current) - end, - StatsMap = maps:without([memory, next_pkt_id, total_heap_size], - maps:from_list(ClientStats)), - ClientInfoMap0 = maps:fold(Fun, #{}, ClientInfo), - IpAddress = peer_to_binary(maps:get(peername, ClientInfoMap0)), - Connected = maps:get(conn_state, ClientInfoMap0) =:= connected, - ClientInfoMap1 = maps:merge(StatsMap, ClientInfoMap0), - ClientInfoMap2 = maps:put(node, node(), ClientInfoMap1), - ClientInfoMap3 = maps:put(ip_address, IpAddress, ClientInfoMap2), - ClientInfoMap = maps:put(connected, Connected, ClientInfoMap3), - RemoveList = [ - auth_result - , peername - , sockname - , peerhost - , conn_state - , send_pend - , conn_props - , peercert - , sockstate - , subscriptions - , receive_maximum - , protocol - , is_superuser - , sockport - , anonymous - , mountpoint - , socktype - , active_n - , await_rel_timeout - , conn_mod - , sockname - , retry_interval - , upgrade_qos - ], - maps:without(RemoveList, ClientInfoMap). +format_channel_info({_, Infos, Stats}) -> + ClientInfo = maps:get(clientinfo, Infos, #{}), + ConnInfo = maps:get(conninfo, Infos, #{}), + SessInfo = maps:get(session, Infos, #{}), + FetchX = [ {node, ClientInfo, node()} + , {clientid, ClientInfo} + , {username, ClientInfo} + , {proto_name, ConnInfo} + , {proto_ver, ConnInfo} + , {ip_address, {peername, ConnInfo, fun peer_to_binary/1}} + , {is_bridge, ClientInfo, false} + , {connected_at, + {connected_at, ConnInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}} + , {disconnected_at, + {disconnected_at, ConnInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}} + , {connected, {conn_state, Infos, fun conn_state_to_connected/1}} + , {keepalive, ClientInfo, 0} + , {clean_start, ConnInfo, true} + , {expiry_interval, ConnInfo, 0} + , {created_at, + {created_at, SessInfo, fun emqx_gateway_utils:unix_ts_to_rfc3339/1}} + , {subscriptions_cnt, Stats, 0} + , {subscriptions_max, Stats, infinity} + , {inflight_cnt, Stats, 0} + , {inflight_max, Stats, infinity} + , {mqueue_len, Stats, 0} + , {mqueue_max, Stats, infinity} + , {mqueue_dropped, Stats, 0} + , {awaiting_rel_cnt, Stats, 0} + , {awaiting_rel_max, Stats, infinity} + , {recv_oct, Stats, 0} + , {recv_cnt, Stats, 0} + , {recv_pkt, Stats, 0} + , {recv_msg, Stats, 0} + , {send_oct, Stats, 0} + , {send_cnt, Stats, 0} + , {send_pkt, Stats, 0} + , {send_msg, Stats, 0} + , {mailbox_len, Stats, 0} + , {heap_size, Stats, 0} + , {reductions, Stats, 0} + ], + eval(FetchX). + +eval(Ls) -> + eval(Ls, #{}). +eval([], AccMap) -> + AccMap; +eval([{K, Vx}|More], AccMap) -> + case valuex_get(K, Vx) of + undefined -> eval(More, AccMap#{K => null}); + Value -> eval(More, AccMap#{K => Value}) + end; +eval([{K, Vx, Default}|More], AccMap) -> + case valuex_get(K, Vx) of + undefined -> eval(More, AccMap#{K => Default}); + Value -> eval(More, AccMap#{K => Value}) + end. + +valuex_get(K, Vx) when is_map(Vx); is_list(Vx) -> + key_get(K, Vx); +valuex_get(_K, {InKey, Obj}) when is_map(Obj); is_list(Obj) -> + key_get(InKey, Obj); +valuex_get(_K, {InKey, Obj, MappingFun}) when is_map(Obj); is_list(Obj) -> + case key_get(InKey, Obj) of + undefined -> undefined; + Val -> MappingFun(Val) + end. + +key_get(K, M) when is_map(M) -> + maps:get(K, M, undefined); +key_get(K, L) when is_list(L) -> + proplists:get_value(K, L). peer_to_binary({Addr, Port}) -> AddrBinary = list_to_binary(inet:ntoa(Addr)), @@ -239,6 +323,9 @@ peer_to_binary({Addr, Port}) -> peer_to_binary(Addr) -> list_to_binary(inet:ntoa(Addr)). +conn_state_to_connected(connected) -> true; +conn_state_to_connected(_) -> false. + %%-------------------------------------------------------------------- %% Swagger defines %%-------------------------------------------------------------------- @@ -325,6 +412,7 @@ params_client_searching_in_qs() -> , {username, string} , {ip_address, string} , {conn_state, string} + , {proto_ver, string} , {clean_start, boolean} , {like_clientid, string} , {like_username, string} @@ -426,6 +514,10 @@ properties_client() -> "when connected is false">>} , {connected, boolean, <<"Whether the client is connected">>} + %% FIXME: the will_msg attribute is not a general attribute + %% for every protocol. But it should be returned to frontend if someone + %% want it + %% %, {will_msg, string, % <<"Client will message">>} %, {zone, string, @@ -488,5 +580,11 @@ properties_subscription() -> [ {topic, string, <<"Topic Fillter">>} , {qos, integer, - <<"QoS level">>} + <<"QoS level, enum: 0, 1, 2">>} + , {nl, integer, %% FIXME: why not boolean? + <<"No Local option, enum: 0, 1">>} + , {rap, integer, + <<"Retain as Published option, enum: 0, 1">>} + , {rh, integer, + <<"Retain Handling option, enum: 0, 1, 2">>} ]). diff --git a/apps/emqx_gateway/src/emqx_gateway_cli.erl b/apps/emqx_gateway/src/emqx_gateway_cli.erl index b446cda92..6ccb444f0 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cli.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cli.erl @@ -51,7 +51,7 @@ is_cmd(Fun) -> gateway(["list"]) -> lists:foreach(fun(#{name := Name} = Gateway) -> - %% XXX: More infos: listeners?, connected? + %% TODO: More infos: listeners?, connected? Status = maps:get(status, Gateway, stopped), emqx_ctl:print("Gateway(name=~s, status=~s)~n", [Name, Status]) @@ -106,6 +106,7 @@ gateway(_) -> ]). 'gateway-clients'(["list", Name]) -> + %% FIXME: page me. for example: --limit 100 --page 10 ??? InfoTab = emqx_gateway_cm:tabname(info, Name), case ets:info(InfoTab) of undefined -> diff --git a/apps/emqx_gateway/src/emqx_gateway_intr.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl similarity index 56% rename from apps/emqx_gateway/src/emqx_gateway_intr.erl rename to apps/emqx_gateway/src/emqx_gateway_http.erl index add37e1c5..a36d97dea 100644 --- a/apps/emqx_gateway/src/emqx_gateway_intr.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -15,11 +15,26 @@ %%-------------------------------------------------------------------- %% @doc Gateway Interface Module for HTTP-APIs --module(emqx_gateway_intr). +-module(emqx_gateway_http). +-include("include/emqx_gateway.hrl"). + +%% Mgmt APIs - gateway -export([ gateways/1 ]). +%% Mgmt APIs - clients +-export([ client_lookup/2 + , client_kickout/2 + , client_subscribe/4 + , client_unsubscribe/3 + , client_subscriptions/2 + ]). + +%% Utils for http, swagger, etc. +-export([ return_http_error/2 + ]). + -type gateway_summary() :: #{ name := binary() , status := running | stopped | unloaded @@ -30,7 +45,7 @@ }. %%-------------------------------------------------------------------- -%% APIs +%% Mgmt APIs - gateway %%-------------------------------------------------------------------- -spec gateways(Status :: all | running | stopped | unloaded) @@ -76,3 +91,58 @@ get_listeners_status(GwName, Config) -> %% @private listener_name(GwName, Type, LisName) -> list_to_atom(lists:concat([GwName, ":", Type, ":", LisName])). + +%%-------------------------------------------------------------------- +%% Mgmt APIs - clients +%%-------------------------------------------------------------------- + +-spec client_lookup(gateway_name(), emqx_type:clientid()) + -> {ok, {emqx_types:infos(), emqx_types:stats()}} + | {error, any()}. +client_lookup(_GwName, _ClientId) -> + %% FIXME: The Gap between `ClientInfo in HTTP-API` and + %% ClientInfo defination + todo. + +-spec client_kickout(gateway_name(), emqx_type:clientid()) + -> {error, any()} + | ok. +client_kickout(GwName, ClientId) -> + emqx_gateway_cm:kick_session(GwName, ClientId). + +-spec client_subscriptions(gateway_name(), emqx_type:clientid()) + -> {error, any()} + | {ok, list()}. %% FIXME: #{<<"t/1">> => + %% #{nl => 0,qos => 0,rap => 0,rh => 0, + %% sub_props => #{}} +client_subscriptions(_GwName, _ClientId) -> + todo. + +-spec client_subscribe(gateway_name(), emqx_type:clientid(), + emqx_type:topic(), emqx_type:qos()) + -> {error, any()} + | ok. +client_subscribe(_GwName, _ClientId, _Topic, _QoS) -> + todo. + +-spec client_unsubscribe(gateway_name(), + emqx_type:clientid(), emqx_type:topic()) + -> {error, any()} + | ok. +client_unsubscribe(_GwName, _ClientId, _Topic) -> + todo. + +%%-------------------------------------------------------------------- +%% Utils +%%-------------------------------------------------------------------- + +-spec return_http_error(integer(), binary()) -> binary(). +return_http_error(Code, Msg) -> + emqx_json:encode( + #{code => codestr(Code), + reason => emqx_gateway_utils:stringfy(Msg) + }). + +codestr(404) -> 'RESOURCE_NOT_FOUND'; +codestr(401) -> 'NOT_SUPPORTED_NOW'; +codestr(500) -> 'UNKNOW_ERROR'. diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index dc4e38e7d..3300ebf69 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -28,6 +28,7 @@ -export([ apply/2 , format_listenon/1 + , unix_ts_to_rfc3339/1 , unix_ts_to_rfc3339/2 ]). @@ -121,6 +122,9 @@ unix_ts_to_rfc3339(Key, Map) -> emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>)} end. +unix_ts_to_rfc3339(Ts) -> + emqx_rule_funcs:unix_ts_to_rfc3339(Ts, <<"millisecond">>). + -spec stringfy(term()) -> binary(). stringfy(T) -> iolist_to_binary(io_lib:format("~0p", [T])).