From 1748de5ee3948b947254d4e5a5063aed27382938 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 31 Aug 2021 20:15:21 +0800 Subject: [PATCH] feat(gw): support the sub/unsub operation --- .../src/emqx_gateway_api_clients.erl | 50 +++++-- apps/emqx_gateway/src/emqx_gateway_http.erl | 125 ++++++++++++++---- .../src/exproto/emqx_exproto_channel.erl | 18 +-- .../src/exproto/emqx_exproto_gsvr.erl | 4 +- .../src/mqttsn/emqx_sn_channel.erl | 18 +-- .../src/stomp/emqx_stomp_channel.erl | 56 ++++++-- 6 files changed, 200 insertions(+), 71 deletions(-) diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index ba61a38fc..cfb7f81e8 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -18,6 +18,8 @@ -behaviour(minirest_api). +-include_lib("emqx/include/logger.hrl"). + %% minirest behaviour callbacks -export([api_spec/0]). @@ -92,20 +94,22 @@ clients(get, #{ bindings := #{name := GwName0} end. clients_insta(get, #{ bindings := #{name := GwName0, - clientid := ClientId} + clientid := ClientId0} }) -> GwName = binary_to_existing_atom(GwName0), - TabName = emqx_gateway_cm:tabname(info, GwName), - %% XXX: We need a lookuo function for it instead of a query - #{data := Data} = emqx_mgmt_api:cluster_query( - #{<<"clientid">> => ClientId}, - TabName, ?CLIENT_QS_SCHEMA, ?query_fun - ), - case Data of + ClientId = emqx_mgmt_util:urldecode(ClientId0), + + case emqx_gateway_http:lookup_client(GwName, ClientId, + {?MODULE, format_channel_info}) of [ClientInfo] -> {200, ClientInfo}; + [ClientInfo|_More] -> + ?LOG(warning, "More than one client info was returned on ~s", + [ClientId]), + {200, ClientInfo}; [] -> return_http_error(404, <<"Gateway or ClientId not found">>) + end; clients_insta(delete, #{ bindings := #{name := GwName0, @@ -113,7 +117,7 @@ clients_insta(delete, #{ bindings := #{name := GwName0, }) -> GwName = binary_to_existing_atom(GwName0), ClientId = emqx_mgmt_util:urldecode(ClientId0), - emqx_gateway_http:client_kickout(GwName, ClientId), + emqx_gateway_http:kickout_client(GwName, ClientId), {200}. subscriptions(get, #{ bindings := #{name := GwName0, @@ -121,8 +125,7 @@ subscriptions(get, #{ bindings := #{name := GwName0, }) -> GwName = binary_to_existing_atom(GwName0), ClientId = emqx_mgmt_util:urldecode(ClientId0), - emqx_gateway_http:client_subscriptions(GwName, ClientId), - {200, []}; + {200, emqx_gateway_http:list_client_subscriptions(GwName, ClientId)}; subscriptions(post, #{ bindings := #{name := GwName0, clientid := ClientId0}, @@ -131,8 +134,7 @@ subscriptions(post, #{ bindings := #{name := GwName0, GwName = binary_to_existing_atom(GwName0), ClientId = emqx_mgmt_util:urldecode(ClientId0), - case {maps:get(<<"topic">>, Body, undefined), - maps:get(<<"qos">>, Body, 0)} of + case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of {undefined, _} -> %% FIXME: more reasonable error code?? return_http_error(404, <<"Request paramter missed: topic">>); @@ -156,6 +158,23 @@ subscriptions(delete, #{ bindings := #{name := GwName0, _ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic), {200}. +%%-------------------------------------------------------------------- +%% Utils + +subopts(Req) -> + #{ qos => maps:get(<<"qos">>, Req, 0) + , rap => maps:get(<<"rap">>, Req, 0) + , nl => maps:get(<<"nl">>, Req, 0) + , rh => maps:get(<<"rh">>, Req, 0) + , sub_prop => extra_sub_prop(maps:get(<<"sub_prop">>, Req, #{})) + }. + +extra_sub_prop(Props) -> + maps:filter( + fun(_, V) -> V =/= undefined end, + #{subid => maps:get(<<"subid">>, Props, undefined)} + ). + %%-------------------------------------------------------------------- %% query funcs @@ -576,6 +595,10 @@ properties_client() -> ]). properties_subscription() -> + ExtraProps = [ {subid, integer, + <<"Only stomp protocol, an uniquely identity for " + "the subscription. range: 1-65535.">>} + ], emqx_mgmt_util:properties( [ {topic, string, <<"Topic Fillter">>} @@ -587,4 +610,5 @@ properties_subscription() -> <<"Retain as Published option, enum: 0, 1">>} , {rh, integer, <<"Retain Handling option, enum: 0, 1, 2">>} + , {sub_prop, object, ExtraProps} ]). diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index a36d97dea..130c31243 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -18,17 +18,20 @@ -module(emqx_gateway_http). -include("include/emqx_gateway.hrl"). +-include_lib("emqx/include/logger.hrl"). %% Mgmt APIs - gateway -export([ gateways/1 ]). %% Mgmt APIs - clients --export([ client_lookup/2 - , client_kickout/2 +-export([ lookup_client/3 + , lookup_client/4 + , kickout_client/2 + , kickout_client/3 + , list_client_subscriptions/2 , client_subscribe/4 , client_unsubscribe/3 - , client_subscriptions/2 ]). %% Utils for http, swagger, etc. @@ -44,6 +47,8 @@ , listeners => [] }. +-define(DEFAULT_CALL_TIMEOUT, 15000). + %%-------------------------------------------------------------------- %% Mgmt APIs - gateway %%-------------------------------------------------------------------- @@ -96,41 +101,104 @@ listener_name(GwName, Type, LisName) -> %% Mgmt APIs - clients %%-------------------------------------------------------------------- --spec client_lookup(gateway_name(), emqx_type:clientid()) - -> {ok, {emqx_types:infos(), emqx_types:stats()}} - | {error, any()}. -client_lookup(_GwName, _ClientId) -> - %% FIXME: The Gap between `ClientInfo in HTTP-API` and - %% ClientInfo defination - todo. +-spec lookup_client(gateway_name(), emqx_type:clientid(), function()) -> list(). +lookup_client(GwName, ClientId, FormatFun) -> + lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) + || Node <- ekka_mnesia:running_nodes()]). --spec client_kickout(gateway_name(), emqx_type:clientid()) +lookup_client(Node, GwName, {clientid, ClientId}, {M,F}) when Node =:= node() -> + ChanTab = emqx_gateway_cm:tabname(chan, GwName), + InfoTab = emqx_gateway_cm:tabname(info, GwName), + + lists:append(lists:map( + fun(Key) -> + lists:map(fun M:F/1, ets:lookup(InfoTab, Key)) + end, ets:lookup(ChanTab, ClientId))); + +lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) -> + rpc_call(Node, lookup_client, + [Node, GwName, {clientid, ClientId}, FormatFun]). + +-spec kickout_client(gateway_name(), emqx_type:clientid()) -> {error, any()} | ok. -client_kickout(GwName, ClientId) -> - emqx_gateway_cm:kick_session(GwName, ClientId). +kickout_client(GwName, ClientId) -> + Results = [kickout_client(Node, GwName, ClientId) + || Node <- ekka_mnesia:running_nodes()], + case lists:any(fun(Item) -> Item =:= ok end, Results) of + true -> ok; + false -> lists:last(Results) + end. --spec client_subscriptions(gateway_name(), emqx_type:clientid()) +kickout_client(Node, GwName, ClientId) when Node =:= node() -> + emqx_gateway_cm:kick_session(GwName, ClientId); + +kickout_client(Node, GwName, ClientId) -> + rpc_call(Node, kickout_client, [Node, GwName, ClientId]). + +-spec list_client_subscriptions(gateway_name(), emqx_type:clientid()) -> {error, any()} - | {ok, list()}. %% FIXME: #{<<"t/1">> => - %% #{nl => 0,qos => 0,rap => 0,rh => 0, - %% sub_props => #{}} -client_subscriptions(_GwName, _ClientId) -> - todo. + | {ok, list()}. +list_client_subscriptions(GwName, ClientId) -> + %% Get the subscriptions from session-info + case emqx_gateway_cm:get_chan_info(GwName, ClientId) of + undefined -> + {error, not_found}; + Infos -> + Subs = maps:get(subscriptions, Infos, #{}), + maps:fold(fun(K, V, Acc) -> + [maps:merge( + #{topic => K}, + maps:with([qos, nl, rap, rh], V)) + |Acc] + end, [], Subs) + end. -spec client_subscribe(gateway_name(), emqx_type:clientid(), - emqx_type:topic(), emqx_type:qos()) + emqx_type:topic(), emqx_type:subopts()) -> {error, any()} | ok. -client_subscribe(_GwName, _ClientId, _Topic, _QoS) -> - todo. +client_subscribe(GwName, ClientId, Topic, SubOpts) -> + case emqx_gateway_cm:lookup_channels(GwName, ClientId) of + [] -> {error, not_found}; + [Pid] -> + %% fixed conn module? + emqx_gateway_conn:call( + Pid, {subscribe, Topic, SubOpts}, + ?DEFAULT_CALL_TIMEOUT + ); + Pids -> + ?LOG(warning, "More than one client process ~p was found " + "clientid ~s", [Pids, ClientId]), + _ = [ + emqx_gateway_conn:call( + Pid, {subscribe, Topic, SubOpts}, + ?DEFAULT_CALL_TIMEOUT + ) || Pid <- Pids], + ok + end. -spec client_unsubscribe(gateway_name(), emqx_type:clientid(), emqx_type:topic()) -> {error, any()} | ok. -client_unsubscribe(_GwName, _ClientId, _Topic) -> - todo. +client_unsubscribe(GwName, ClientId, Topic) -> + case emqx_gateway_cm:lookup_channels(GwName, ClientId) of + [] -> {error, not_found}; + [Pid] -> + emqx_gateway_conn:call( + Pid, {unsubscribe, Topic}, + ?DEFAULT_CALL_TIMEOUT); + Pids -> + ?LOG(warning, "More than one client process ~p was found " + "clientid ~s", [Pids, ClientId]), + _ = [ + emqx_gateway_conn:call( + Pid, {unsubscribe, Topic}, + ?DEFAULT_CALL_TIMEOUT + ) || Pid <- Pids], + ok + end. %%-------------------------------------------------------------------- %% Utils @@ -146,3 +214,12 @@ return_http_error(Code, Msg) -> codestr(404) -> 'RESOURCE_NOT_FOUND'; codestr(401) -> 'NOT_SUPPORTED_NOW'; codestr(500) -> 'UNKNOW_ERROR'. + +%%-------------------------------------------------------------------- +%% Internal funcs + +rpc_call(Node, Fun, Args) -> + case rpc:call(Node, ?MODULE, Fun, Args) of + {badrpc, Reason} -> {error, Reason}; + Res -> Res + end. diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index b1a1ae027..ace9a7be5 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -310,7 +310,7 @@ handle_call({start_timer, keepalive, Interval}, NChannel = Channel#channel{conninfo = NConnInfo, clientinfo = NClientInfo}, {reply, ok, ensure_keepalive(NChannel)}; -handle_call({subscribe, TopicFilter, Qos}, +handle_call({subscribe_from_client, TopicFilter, Qos}, Channel = #channel{ ctx = Ctx, conn_state = connected, @@ -323,11 +323,19 @@ handle_call({subscribe, TopicFilter, Qos}, {reply, ok, NChannel} end; -handle_call({unsubscribe, TopicFilter}, +handle_call({subscribe, Topic, SubOpts}, Channel) -> + {ok, NChannel} = do_subscribe([{Topic, SubOpts}], Channel), + {reply, ok, NChannel}; + +handle_call({unsubscribe_from_client, TopicFilter}, Channel = #channel{conn_state = connected}) -> {ok, NChannel} = do_unsubscribe([{TopicFilter, #{}}], Channel), {reply, ok, NChannel}; +handle_call({unsubscribe, Topic}, Channel) -> + {ok, NChannel} = do_unsubscribe([Topic], Channel), + {reply, ok, NChannel}; + handle_call({publish, Topic, Qos, Payload}, Channel = #channel{ ctx = Ctx, @@ -363,12 +371,6 @@ handle_cast(Req, Channel) -> -spec handle_info(any(), channel()) -> {ok, channel()} | {shutdown, Reason :: term(), channel()}. -handle_info({subscribe, TopicFilters}, Channel) -> - do_subscribe(TopicFilters, Channel); - -handle_info({unsubscribe, TopicFilters}, Channel) -> - do_unsubscribe(TopicFilters, Channel); - handle_info({sock_closed, Reason}, Channel = #channel{rqueue = Queue, inflight = Inflight}) -> case queue:len(Queue) =:= 0 diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl index 346f87452..0135aa8e3 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_gsvr.erl @@ -96,7 +96,7 @@ publish(Req, Md) -> subscribe(Req = #{conn := Conn, topic := Topic, qos := Qos}, Md) when ?IS_QOS(Qos) -> ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), - {ok, response(call(Conn, {subscribe, Topic, Qos})), Md}; + {ok, response(call(Conn, {subscribe_from_client, Topic, Qos})), Md}; subscribe(Req, Md) -> ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), @@ -107,7 +107,7 @@ subscribe(Req, Md) -> | {error, grpc_cowboy_h:error_response()}. unsubscribe(Req = #{conn := Conn, topic := Topic}, Md) -> ?LOG(debug, "Recv ~p function with request ~0p", [?FUNCTION_NAME, Req]), - {ok, response(call(Conn, {unsubscribe, Topic})), Md}. + {ok, response(call(Conn, {unsubscribe_from_client, Topic})), Md}. %%-------------------------------------------------------------------- %% Internal funcs diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 5bba599c8..0707534b7 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -1102,6 +1102,12 @@ message_to_packet(MsgId, Message, | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}. +handle_call({subscribe, _Topic, _Subopts}, Channel) -> + reply({error, not_supported_now}, Channel); + +handle_call({unsubscribe, _Topic}, Channel) -> + reply({error, not_supported_now}, Channel); + handle_call(kick, Channel) -> NChannel = ensure_disconnected(kicked, Channel), shutdown_and_reply(kicked, ok, NChannel); @@ -1150,18 +1156,6 @@ handle_cast(_Req, Channel) -> -spec handle_info(Info :: term(), channel()) -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. -%% XXX: Received from the emqx-management ??? -%handle_info({subscribe, TopicFilters}, Channel ) -> -% {_, NChannel} = lists:foldl( -% fun({TopicFilter, SubOpts}, {_, ChannelAcc}) -> -% do_subscribe(TopicFilter, SubOpts, ChannelAcc) -% end, {[], Channel}, parse_topic_filters(TopicFilters)), -% {ok, NChannel}; -% -%handle_info({unsubscribe, TopicFilters}, Channel) -> -% {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel), -% {ok, NChannel}; - handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) -> shutdown(Reason, Channel); diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 250f43988..3e7fe8b42 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -626,6 +626,50 @@ handle_out(receipt, ReceiptId, Channel) -> -> {reply, Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), channel()} | {shutdown, Reason :: term(), Reply :: term(), stomp_frame(), channel()}). +handle_call({subscribe, Topic, SubOpts}, + Channel = #channel{ + subscriptions = Subs + }) -> + case maps:get(subid, + maps:get(sub_prop, SubOpts, #{}), + undefined) of + undefined -> + reply({error, no_subid}, Channel); + SubId -> + case emqx_misc:pipeline( + [ fun parse_topic_filter/2 + , fun check_subscribed_status/2 + ], {SubId, {Topic, SubOpts}}, Channel) of + {ok, {_, TopicFilter}, NChannel} -> + [MountedTopic] = do_subscribe([TopicFilter], NChannel), + NChannel1 = NChannel#channel{ + subscriptions = + [{SubId, MountedTopic, <<"auto">>}|Subs] + }, + reply(ok, NChannel1); + {error, ErrMsg, NChannel} -> + ?LOG(error, "Failed to subscribe topic ~s, reason: ~s", + [Topic, ErrMsg]), + reply({error, ErrMsg}, NChannel) + end + end; + +handle_call({unsubscribe, Topic}, + Channel = #channel{ + ctx = Ctx, + clientinfo = ClientInfo = #{mountpoint := Mountpoint}, + subscriptions = Subs + }) -> + {ParsedTopic, _SubOpts} = emqx_topic:parse(Topic), + MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic), + ok = emqx_broker:unsubscribe(MountedTopic), + _ = run_hooks(Ctx, 'session.unsubscribe', + [ClientInfo, MountedTopic, #{}]), + reply(ok, + Channel#channel{ + subscriptions = lists:keydelete(MountedTopic, 2, Subs)} + ); + handle_call(kick, Channel) -> NChannel = ensure_disconnected(kicked, Channel), Frame = error_frame(undefined, <<"Kicked out">>), @@ -678,18 +722,6 @@ handle_cast(_Req, Channel) -> -spec(handle_info(Info :: term(), channel()) -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}). -%% XXX: Received from the emqx-management ??? -%handle_info({subscribe, TopicFilters}, Channel ) -> -% {_, NChannel} = lists:foldl( -% fun({TopicFilter, SubOpts}, {_, ChannelAcc}) -> -% do_subscribe(TopicFilter, SubOpts, ChannelAcc) -% end, {[], Channel}, parse_topic_filters(TopicFilters)), -% {ok, NChannel}; -% -%handle_info({unsubscribe, TopicFilters}, Channel) -> -% {_RC, NChannel} = process_unsubscribe(TopicFilters, #{}, Channel), -% {ok, NChannel}; - handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) -> shutdown(Reason, Channel);