diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index cfb7f81e8..99876c917 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -120,13 +120,22 @@ clients_insta(delete, #{ bindings := #{name := GwName0, emqx_gateway_http:kickout_client(GwName, ClientId), {200}. +%% FIXME: +%% List the subscription without mountpoint, but has SubOpts, +%% for example, share group ... subscriptions(get, #{ bindings := #{name := GwName0, clientid := ClientId0} }) -> GwName = binary_to_existing_atom(GwName0), ClientId = emqx_mgmt_util:urldecode(ClientId0), - {200, emqx_gateway_http:list_client_subscriptions(GwName, ClientId)}; + case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of + {error, Reason} -> + return_http_error(404, Reason); + {ok, Subs} -> + {200, Subs} + end; +%% Create the subscription without mountpoint subscriptions(post, #{ bindings := #{name := GwName0, clientid := ClientId0}, body := Body @@ -147,6 +156,7 @@ subscriptions(post, #{ bindings := #{name := GwName0, end end; +%% Remove the subscription without mountpoint subscriptions(delete, #{ bindings := #{name := GwName0, clientid := ClientId0, topic := Topic0 @@ -166,10 +176,10 @@ subopts(Req) -> , rap => maps:get(<<"rap">>, Req, 0) , nl => maps:get(<<"nl">>, Req, 0) , rh => maps:get(<<"rh">>, Req, 0) - , sub_prop => extra_sub_prop(maps:get(<<"sub_prop">>, Req, #{})) + , sub_props => extra_sub_props(maps:get(<<"sub_props">>, Req, #{})) }. -extra_sub_prop(Props) -> +extra_sub_props(Props) -> maps:filter( fun(_, V) -> V =/= undefined end, #{subid => maps:get(<<"subid">>, Props, undefined)} @@ -595,7 +605,7 @@ properties_client() -> ]). properties_subscription() -> - ExtraProps = [ {subid, integer, + ExtraProps = [ {subid, string, <<"Only stomp protocol, an uniquely identity for " "the subscription. range: 1-65535.">>} ], @@ -610,5 +620,5 @@ properties_subscription() -> <<"Retain as Published option, enum: 0, 1">>} , {rh, integer, <<"Retain Handling option, enum: 0, 1, 2">>} - , {sub_prop, object, ExtraProps} + , {sub_props, object, ExtraProps} ]). diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 7a7ad055d..d8b615fe8 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -48,6 +48,10 @@ , connection_closed/2 ]). +-export([ with_channel/3 + , lookup_channels/2 + ]). + %% Internal funcs for getting tabname by GatewayId -export([cmtabs/1, tabname/2]). diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 130c31243..5b7055d25 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -141,63 +141,44 @@ kickout_client(Node, GwName, ClientId) -> | {ok, list()}. list_client_subscriptions(GwName, ClientId) -> %% Get the subscriptions from session-info - case emqx_gateway_cm:get_chan_info(GwName, ClientId) of - undefined -> - {error, not_found}; - Infos -> - Subs = maps:get(subscriptions, Infos, #{}), - maps:fold(fun(K, V, Acc) -> - [maps:merge( - #{topic => K}, - maps:with([qos, nl, rap, rh], V)) - |Acc] - end, [], Subs) - end. + with_channel(GwName, ClientId, + fun(Pid) -> + Subs = emqx_gateway_conn:call( + Pid, + subscriptions, ?DEFAULT_CALL_TIMEOUT), + {ok, lists:map(fun({Topic, SubOpts}) -> + SubOpts#{topic => Topic} + end, Subs)} + end). -spec client_subscribe(gateway_name(), emqx_type:clientid(), emqx_type:topic(), emqx_type:subopts()) -> {error, any()} | ok. client_subscribe(GwName, ClientId, Topic, SubOpts) -> - case emqx_gateway_cm:lookup_channels(GwName, ClientId) of - [] -> {error, not_found}; - [Pid] -> - %% fixed conn module? + with_channel(GwName, ClientId, + fun(Pid) -> emqx_gateway_conn:call( Pid, {subscribe, Topic, SubOpts}, ?DEFAULT_CALL_TIMEOUT - ); - Pids -> - ?LOG(warning, "More than one client process ~p was found " - "clientid ~s", [Pids, ClientId]), - _ = [ - emqx_gateway_conn:call( - Pid, {subscribe, Topic, SubOpts}, - ?DEFAULT_CALL_TIMEOUT - ) || Pid <- Pids], - ok - end. + ) + end). -spec client_unsubscribe(gateway_name(), emqx_type:clientid(), emqx_type:topic()) -> {error, any()} | ok. client_unsubscribe(GwName, ClientId, Topic) -> - case emqx_gateway_cm:lookup_channels(GwName, ClientId) of - [] -> {error, not_found}; - [Pid] -> + with_channel(GwName, ClientId, + fun(Pid) -> emqx_gateway_conn:call( - Pid, {unsubscribe, Topic}, - ?DEFAULT_CALL_TIMEOUT); - Pids -> - ?LOG(warning, "More than one client process ~p was found " - "clientid ~s", [Pids, ClientId]), - _ = [ - emqx_gateway_conn:call( - Pid, {unsubscribe, Topic}, - ?DEFAULT_CALL_TIMEOUT - ) || Pid <- Pids], - ok + Pid, {unsubscribe, Topic}, ?DEFAULT_CALL_TIMEOUT) + end). + +with_channel(GwName, ClientId, Fun) -> + case emqx_gateway_cm:with_channel(GwName, ClientId, Fun) of + undefined -> {error, not_found}; + Res -> Res end. %%-------------------------------------------------------------------- @@ -206,10 +187,11 @@ client_unsubscribe(GwName, ClientId, Topic) -> -spec return_http_error(integer(), binary()) -> binary(). return_http_error(Code, Msg) -> - emqx_json:encode( - #{code => codestr(Code), - reason => emqx_gateway_utils:stringfy(Msg) - }). + {Code, emqx_json:encode( + #{code => codestr(Code), + reason => emqx_gateway_utils:stringfy(Msg) + }) + }. codestr(404) -> 'RESOURCE_NOT_FOUND'; codestr(401) -> 'NOT_SUPPORTED_NOW'; diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 3300ebf69..2b4e9f0a2 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -209,5 +209,6 @@ default_subopts() -> #{rh => 0, %% Retain Handling rap => 0, %% Retain as Publish nl => 0, %% No Local - qos => 0 %% QoS + qos => 0, %% QoS + is_new => true }. diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 3e7fe8b42..a57c8e667 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -393,11 +393,9 @@ handle_in(?PACKET(?CMD_SUBSCRIBE, Headers), [] -> ErrMsg = "Permission denied", handle_out(error, {receipt_id(Headers), ErrMsg}, Channel); - [MountedTopic|_] -> - NChannel1 = NChannel#channel{ - subscriptions = [{SubId, MountedTopic, Ack} - | Subs] - }, + [{MountedTopic, SubOpts}|_] -> + NSubs = [{SubId, MountedTopic, Ack, SubOpts}|Subs], + NChannel1 = NChannel#channel{subscriptions = NSubs}, handle_out(receipt, receipt_id(Headers), NChannel1) end; {error, ErrMsg, NChannel} -> @@ -415,7 +413,7 @@ handle_in(?PACKET(?CMD_UNSUBSCRIBE, Headers), SubId = header(<<"id">>, Headers), {ok, NChannel} = case lists:keyfind(SubId, 1, Subs) of - {SubId, MountedTopic, _Ack} -> + {SubId, MountedTopic, _Ack, _SubOpts} -> Topic = emqx_mountpoint:unmount(Mountpoint, MountedTopic), %% XXX: eval the return topics? _ = run_hooks(Ctx, 'client.unsubscribe', @@ -539,15 +537,16 @@ trans_pipeline([{Func, Args}|More], Outgoings, Channel) -> %% Subs parse_topic_filter({SubId, Topic}, Channel) -> - TopicFilter = emqx_topic:parse(Topic), - {ok, {SubId, TopicFilter}, Channel}. + {ParsedTopic, SubOpts} = emqx_topic:parse(Topic), + NSubOpts = SubOpts#{sub_props => #{subid => SubId}}, + {ok, {SubId, {ParsedTopic, NSubOpts}}, Channel}. -check_subscribed_status({SubId, TopicFilter}, +check_subscribed_status({SubId, {ParsedTopic, _SubOpts}}, #channel{ subscriptions = Subs, clientinfo = #{mountpoint := Mountpoint} }) -> - MountedTopic = emqx_mountpoint:mount(Mountpoint, TopicFilter), + MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic), case lists:keyfind(SubId, 1, Subs) of {SubId, MountedTopic, _Ack} -> ok; @@ -557,11 +556,11 @@ check_subscribed_status({SubId, TopicFilter}, ok end. -check_sub_acl({_SubId, TopicFilter}, +check_sub_acl({_SubId, {ParsedTopic, _SubOpts}}, #channel{ ctx = Ctx, clientinfo = ClientInfo}) -> - case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, TopicFilter) of + case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, ParsedTopic) of deny -> {error, "ACL Deny"}; allow -> ok end. @@ -571,17 +570,17 @@ do_subscribe(TopicFilters, Channel) -> do_subscribe([], _Channel, Acc) -> lists:reverse(Acc); -do_subscribe([{TopicFilter, Option}|More], +do_subscribe([{ParsedTopic, SubOpts0}|More], Channel = #channel{ ctx = Ctx, clientinfo = ClientInfo = #{clientid := ClientId, mountpoint := Mountpoint}}, Acc) -> - SubOpts = maps:merge(emqx_gateway_utils:default_subopts(), Option), - MountedTopic = emqx_mountpoint:mount(Mountpoint, TopicFilter), + SubOpts = maps:merge(emqx_gateway_utils:default_subopts(), SubOpts0), + MountedTopic = emqx_mountpoint:mount(Mountpoint, ParsedTopic), _ = emqx_broker:subscribe(MountedTopic, ClientId, SubOpts), run_hooks(Ctx, 'session.subscribed', [ClientInfo, MountedTopic, SubOpts]), - do_subscribe(More, Channel, [MountedTopic|Acc]). + do_subscribe(More, Channel, [{MountedTopic, SubOpts}|Acc]). %%-------------------------------------------------------------------- %% Handle outgoing packet @@ -631,7 +630,7 @@ handle_call({subscribe, Topic, SubOpts}, subscriptions = Subs }) -> case maps:get(subid, - maps:get(sub_prop, SubOpts, #{}), + maps:get(sub_props, SubOpts, #{}), undefined) of undefined -> reply({error, no_subid}, Channel); @@ -641,11 +640,12 @@ handle_call({subscribe, Topic, SubOpts}, , fun check_subscribed_status/2 ], {SubId, {Topic, SubOpts}}, Channel) of {ok, {_, TopicFilter}, NChannel} -> - [MountedTopic] = do_subscribe([TopicFilter], NChannel), - NChannel1 = NChannel#channel{ - subscriptions = - [{SubId, MountedTopic, <<"auto">>}|Subs] - }, + [{MountedTopic, NSubOpts}] = do_subscribe( + [TopicFilter], + NChannel + ), + NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts}|Subs], + NChannel1 = NChannel#channel{subscriptions = NSubs}, reply(ok, NChannel1); {error, ErrMsg, NChannel} -> ?LOG(error, "Failed to subscribe topic ~s, reason: ~s", @@ -670,6 +670,14 @@ handle_call({unsubscribe, Topic}, subscriptions = lists:keydelete(MountedTopic, 2, Subs)} ); +%% Reply :: [{emqx_types:topic(), emqx_types:subopts()}] +handle_call(subscriptions, Channel = #channel{subscriptions = Subs}) -> + Reply = lists:map( + fun({_SubId, Topic, _Ack, SubOpts}) -> + {Topic, SubOpts} + end, Subs), + reply(Reply, Channel); + handle_call(kick, Channel) -> NChannel = ensure_disconnected(kicked, Channel), Frame = error_frame(undefined, <<"Kicked out">>),