From 92fd9c1ecaf925996abc53031e583700757ab925 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 17 Dec 2021 15:24:05 +0800 Subject: [PATCH] chore(gw): fix adding subscription --- .../src/coap/emqx_coap_channel.erl | 2 +- apps/emqx_gateway/src/emqx_gateway_api.erl | 20 +++++++-------- .../src/emqx_gateway_api_clients.erl | 25 +++++++++++-------- apps/emqx_gateway/src/emqx_gateway_http.erl | 14 +++++------ .../src/exproto/emqx_exproto_channel.erl | 25 +++++++++++-------- .../src/lwm2m/emqx_lwm2m_channel.erl | 2 +- .../src/mqttsn/emqx_sn_channel.erl | 4 +-- .../src/stomp/emqx_stomp_channel.erl | 2 +- apps/emqx_gateway/test/emqx_stomp_SUITE.erl | 2 +- 9 files changed, 52 insertions(+), 44 deletions(-) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index c75eda47c..9b1e5821c 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -250,7 +250,7 @@ handle_call({subscribe, Topic, SubOpts}, _From, Result = emqx_coap_session:process_subscribe( SubReq, TempMsg, #{}, Session), NSession = maps:get(session, Result), - {reply, ok, Channel#channel{session = NSession}}; + {reply, {ok, {MountedTopic, NSubOpts}}, Channel#channel{session = NSession}}; handle_call({unsubscribe, Topic}, _From, Channel = #channel{ diff --git a/apps/emqx_gateway/src/emqx_gateway_api.erl b/apps/emqx_gateway/src/emqx_gateway_api.erl index 7bf52b4ec..3ee209f19 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api.erl @@ -497,11 +497,11 @@ examples_gateway_confs() -> , auto_observe => false , update_msg_publish_condition => <<"always">> , translators => - #{ command => #{topic => <<"/dn/#">>} - , response => #{topic => <<"/up/resp">>} - , notify => #{topic => <<"/up/notify">>} - , register => #{topic => <<"/up/resp">>} - , update => #{topic => <<"/up/resp">>} + #{ command => #{topic => <<"dn/#">>} + , response => #{topic => <<"up/resp">>} + , notify => #{topic => <<"up/notify">>} + , register => #{topic => <<"up/resp">>} + , update => #{topic => <<"up/resp">>} } , listeners => [ #{ type => <<"udp">> @@ -599,11 +599,11 @@ examples_update_gateway_confs() -> , auto_observe => false , update_msg_publish_condition => <<"always">> , translators => - #{ command => #{topic => <<"/dn/#">>} - , response => #{topic => <<"/up/resp">>} - , notify => #{topic => <<"/up/notify">>} - , register => #{topic => <<"/up/resp">>} - , update => #{topic => <<"/up/resp">>} + #{ command => #{topic => <<"dn/#">>} + , response => #{topic => <<"up/resp">>} + , notify => #{topic => <<"up/notify">>} + , register => #{topic => <<"up/resp">>} + , update => #{topic => <<"up/resp">>} } } } diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index ef59cfb19..69a06be61 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -168,9 +168,9 @@ subscriptions(post, #{ bindings := #{name := Name0, case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of {undefined, _} -> return_http_error(400, "Miss topic property"); - {Topic, QoS} -> + {Topic, SubOpts} -> case emqx_gateway_http:client_subscribe( - GwName, ClientId, Topic, QoS) of + GwName, ClientId, Topic, SubOpts) of {error, nosupport} -> return_http_error( 405, @@ -181,8 +181,8 @@ subscriptions(post, #{ bindings := #{name := Name0, <<"Not implemented now">>); {error, Reason} -> return_http_error(404, Reason); - ok -> - {204} + {ok, {NTopic, NSubOpts}}-> + {201, maps:merge(NSubOpts, #{topic => NTopic})} end end end); @@ -204,12 +204,16 @@ subscriptions(delete, #{ bindings := #{name := Name0, %% Utils subopts(Req) -> - #{ qos => maps:get(<<"qos">>, Req, 0) - , rap => maps:get(<<"rap">>, Req, 0) - , nl => maps:get(<<"nl">>, Req, 0) - , rh => maps:get(<<"rh">>, Req, 0) - , sub_props => extra_sub_props(maps:get(<<"sub_props">>, Req, #{})) - }. + SubOpts = #{ qos => maps:get(<<"qos">>, Req, 0) + , rap => maps:get(<<"rap">>, Req, 0) + , nl => maps:get(<<"nl">>, Req, 0) + , rh => maps:get(<<"rh">>, Req, 1) + }, + SubProps = extra_sub_props(maps:get(<<"sub_props">>, Req, #{})), + case maps:size(SubProps) of + 0 -> SubOpts; + _ -> maps:put(sub_props, SubProps, SubOpts) + end. extra_sub_props(Props) -> maps:filter( @@ -888,5 +892,4 @@ example_general_subscription() -> , nl => 0 , rap => 0 , rh => 0 - , sub_props => #{} }. diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 6e97865a0..810a79987 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -235,7 +235,7 @@ confexp({error, already_exist}) -> %%-------------------------------------------------------------------- -spec lookup_client(gateway_name(), - emqx_type:clientid(), {atom(), atom()}) -> list(). + emqx_types:clientid(), {atom(), atom()}) -> list(). lookup_client(GwName, ClientId, FormatFun) -> lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) || Node <- mria_mnesia:running_nodes()]). @@ -253,7 +253,7 @@ lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) -> rpc_call(Node, lookup_client, [Node, GwName, {clientid, ClientId}, FormatFun]). --spec kickout_client(gateway_name(), emqx_type:clientid()) +-spec kickout_client(gateway_name(), emqx_types:clientid()) -> {error, any()} | ok. kickout_client(GwName, ClientId) -> @@ -270,7 +270,7 @@ kickout_client(Node, GwName, ClientId) when Node =:= node() -> kickout_client(Node, GwName, ClientId) -> rpc_call(Node, kickout_client, [Node, GwName, ClientId]). --spec list_client_subscriptions(gateway_name(), emqx_type:clientid()) +-spec list_client_subscriptions(gateway_name(), emqx_types:clientid()) -> {error, any()} | {ok, list()}. list_client_subscriptions(GwName, ClientId) -> @@ -288,10 +288,10 @@ list_client_subscriptions(GwName, ClientId) -> end end). --spec client_subscribe(gateway_name(), emqx_type:clientid(), - emqx_type:topic(), emqx_type:subopts()) +-spec client_subscribe(gateway_name(), emqx_types:clientid(), + emqx_types:topic(), emqx_types:subopts()) -> {error, any()} - | ok. + | {ok, {emqx_types:topic(), emqx_types:subopts()}}. client_subscribe(GwName, ClientId, Topic, SubOpts) -> with_channel(GwName, ClientId, fun(Pid) -> @@ -302,7 +302,7 @@ client_subscribe(GwName, ClientId, Topic, SubOpts) -> end). -spec client_unsubscribe(gateway_name(), - emqx_type:clientid(), emqx_type:topic()) + emqx_types:clientid(), emqx_types:topic()) -> {error, any()} | ok. client_unsubscribe(GwName, ClientId, Topic) -> diff --git a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl index bb72d24e6..bb8072358 100644 --- a/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl +++ b/apps/emqx_gateway/src/exproto/emqx_exproto_channel.erl @@ -334,13 +334,14 @@ handle_call({subscribe_from_client, TopicFilter, Qos}, _From, deny -> {reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel}; _ -> - {ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel), + {ok, _, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel), {reply, ok, NChannel} end; handle_call({subscribe, Topic, SubOpts}, _From, Channel) -> - {ok, NChannel} = do_subscribe([{Topic, SubOpts}], Channel), - {reply, ok, NChannel}; + {ok, + [{NTopicFilter, NSubOpts}], NChannel} = do_subscribe([{Topic, SubOpts}], Channel), + {reply, {ok, {NTopicFilter, NSubOpts}}, NChannel}; handle_call({unsubscribe_from_client, TopicFilter}, _From, Channel = #channel{conn_state = connected}) -> @@ -437,11 +438,12 @@ terminate(Reason, Channel) -> %%-------------------------------------------------------------------- do_subscribe(TopicFilters, Channel) -> - NChannel = lists:foldl( - fun({TopicFilter, SubOpts}, ChannelAcc) -> - do_subscribe(TopicFilter, SubOpts, ChannelAcc) - end, Channel, parse_topic_filters(TopicFilters)), - {ok, NChannel}. + {MadeSubs, NChannel} = lists:foldl( + fun({TopicFilter, SubOpts}, {MadeSubs, ChannelAcc}) -> + {Sub, Channel1} = do_subscribe(TopicFilter, SubOpts, ChannelAcc), + {MadeSubs ++ [Sub], Channel1} + end, {[], Channel}, parse_topic_filters(TopicFilters)), + {ok, MadeSubs, NChannel}. %% @private do_subscribe(TopicFilter, SubOpts, Channel = @@ -451,17 +453,20 @@ do_subscribe(TopicFilter, SubOpts, Channel = NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter), NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts), SubId = maps:get(clientid, ClientInfo, undefined), + %% XXX: is_new? IsNew = not maps:is_key(NTopicFilter, Subs), case IsNew of true -> ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts), ok = emqx_hooks:run('session.subscribed', [ClientInfo, NTopicFilter, NSubOpts#{is_new => IsNew}]), - Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}; + {{NTopicFilter, NSubOpts}, + Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}}; _ -> %% Update subopts ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts), - Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}} + {{NTopicFilter, NSubOpts}, + Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}} end. do_unsubscribe(TopicFilters, Channel) -> diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index 3edee0f47..c01c6adc5 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -219,7 +219,7 @@ handle_call({subscribe, Topic, SubOpts}, _From, Subs = emqx_lwm2m_session:info(subscriptions, Session), NSubs = maps:put(MountedTopic, NSubOpts, Subs), NSession = emqx_lwm2m_session:set_subscriptions(NSubs, Session), - {reply, ok, Channel#channel{session = NSession}}; + {reply, {ok, {MountedTopic, NSubOpts}}, Channel#channel{session = NSession}}; handle_call({unsubscribe, Topic}, _From, Channel = #channel{ diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl index 0b1bdde73..74c8d2c12 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_channel.erl @@ -1194,8 +1194,8 @@ handle_call({subscribe, Topic, SubOpts}, _From, Channel) -> 2 -> case do_subscribe({?SN_INVALID_TOPIC_ID, Topic, SubOpts}, Channel) of - {ok, _, NChannel} -> - reply(ok, NChannel); + {ok, {_, NTopicName, NSubOpts}, NChannel} -> + reply({ok, {NTopicName, NSubOpts}}, NChannel); {error, ?SN_EXCEED_LIMITATION} -> reply({error, exceed_limitation}, Channel) end; diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index 17a7644ec..dfca57cf1 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -660,7 +660,7 @@ handle_call({subscribe, Topic, SubOpts}, _From, ), NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts}|Subs], NChannel1 = NChannel#channel{subscriptions = NSubs}, - reply(ok, NChannel1); + reply({ok, {MountedTopic, NSubOpts}}, NChannel1); {error, ErrMsg, NChannel} -> ?SLOG(error, #{ msg => "failed_to_subscribe_topic" , topic => Topic diff --git a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl index 179471127..6dd6860b7 100644 --- a/apps/emqx_gateway/test/emqx_stomp_SUITE.erl +++ b/apps/emqx_gateway/test/emqx_stomp_SUITE.erl @@ -395,7 +395,7 @@ t_rest_clienit_info(_) -> ?assertEqual(1, length(Subs)), assert_feilds_apperence([topic, qos], lists:nth(1, Subs)), - {204, _} = request(post, ClientPath ++ "/subscriptions", + {201, _} = request(post, ClientPath ++ "/subscriptions", #{topic => <<"t/a">>, qos => 1, sub_props => #{subid => <<"1001">>}}),