chore(gw): fix adding subscription

This commit is contained in:
JianBo He 2021-12-17 15:24:05 +08:00
parent 52c78974d6
commit 92fd9c1eca
9 changed files with 52 additions and 44 deletions

View File

@ -250,7 +250,7 @@ handle_call({subscribe, Topic, SubOpts}, _From,
Result = emqx_coap_session:process_subscribe( Result = emqx_coap_session:process_subscribe(
SubReq, TempMsg, #{}, Session), SubReq, TempMsg, #{}, Session),
NSession = maps:get(session, Result), NSession = maps:get(session, Result),
{reply, ok, Channel#channel{session = NSession}}; {reply, {ok, {MountedTopic, NSubOpts}}, Channel#channel{session = NSession}};
handle_call({unsubscribe, Topic}, _From, handle_call({unsubscribe, Topic}, _From,
Channel = #channel{ Channel = #channel{

View File

@ -497,11 +497,11 @@ examples_gateway_confs() ->
, auto_observe => false , auto_observe => false
, update_msg_publish_condition => <<"always">> , update_msg_publish_condition => <<"always">>
, translators => , translators =>
#{ command => #{topic => <<"/dn/#">>} #{ command => #{topic => <<"dn/#">>}
, response => #{topic => <<"/up/resp">>} , response => #{topic => <<"up/resp">>}
, notify => #{topic => <<"/up/notify">>} , notify => #{topic => <<"up/notify">>}
, register => #{topic => <<"/up/resp">>} , register => #{topic => <<"up/resp">>}
, update => #{topic => <<"/up/resp">>} , update => #{topic => <<"up/resp">>}
} }
, listeners => , listeners =>
[ #{ type => <<"udp">> [ #{ type => <<"udp">>
@ -599,11 +599,11 @@ examples_update_gateway_confs() ->
, auto_observe => false , auto_observe => false
, update_msg_publish_condition => <<"always">> , update_msg_publish_condition => <<"always">>
, translators => , translators =>
#{ command => #{topic => <<"/dn/#">>} #{ command => #{topic => <<"dn/#">>}
, response => #{topic => <<"/up/resp">>} , response => #{topic => <<"up/resp">>}
, notify => #{topic => <<"/up/notify">>} , notify => #{topic => <<"up/notify">>}
, register => #{topic => <<"/up/resp">>} , register => #{topic => <<"up/resp">>}
, update => #{topic => <<"/up/resp">>} , update => #{topic => <<"up/resp">>}
} }
} }
} }

View File

@ -168,9 +168,9 @@ subscriptions(post, #{ bindings := #{name := Name0,
case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of
{undefined, _} -> {undefined, _} ->
return_http_error(400, "Miss topic property"); return_http_error(400, "Miss topic property");
{Topic, QoS} -> {Topic, SubOpts} ->
case emqx_gateway_http:client_subscribe( case emqx_gateway_http:client_subscribe(
GwName, ClientId, Topic, QoS) of GwName, ClientId, Topic, SubOpts) of
{error, nosupport} -> {error, nosupport} ->
return_http_error( return_http_error(
405, 405,
@ -181,8 +181,8 @@ subscriptions(post, #{ bindings := #{name := Name0,
<<"Not implemented now">>); <<"Not implemented now">>);
{error, Reason} -> {error, Reason} ->
return_http_error(404, Reason); return_http_error(404, Reason);
ok -> {ok, {NTopic, NSubOpts}}->
{204} {201, maps:merge(NSubOpts, #{topic => NTopic})}
end end
end end
end); end);
@ -204,12 +204,16 @@ subscriptions(delete, #{ bindings := #{name := Name0,
%% Utils %% Utils
subopts(Req) -> subopts(Req) ->
#{ qos => maps:get(<<"qos">>, Req, 0) SubOpts = #{ qos => maps:get(<<"qos">>, Req, 0)
, rap => maps:get(<<"rap">>, Req, 0) , rap => maps:get(<<"rap">>, Req, 0)
, nl => maps:get(<<"nl">>, Req, 0) , nl => maps:get(<<"nl">>, Req, 0)
, rh => maps:get(<<"rh">>, Req, 0) , rh => maps:get(<<"rh">>, Req, 1)
, sub_props => extra_sub_props(maps:get(<<"sub_props">>, Req, #{})) },
}. SubProps = extra_sub_props(maps:get(<<"sub_props">>, Req, #{})),
case maps:size(SubProps) of
0 -> SubOpts;
_ -> maps:put(sub_props, SubProps, SubOpts)
end.
extra_sub_props(Props) -> extra_sub_props(Props) ->
maps:filter( maps:filter(
@ -888,5 +892,4 @@ example_general_subscription() ->
, nl => 0 , nl => 0
, rap => 0 , rap => 0
, rh => 0 , rh => 0
, sub_props => #{}
}. }.

View File

@ -235,7 +235,7 @@ confexp({error, already_exist}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec lookup_client(gateway_name(), -spec lookup_client(gateway_name(),
emqx_type:clientid(), {atom(), atom()}) -> list(). emqx_types:clientid(), {atom(), atom()}) -> list().
lookup_client(GwName, ClientId, FormatFun) -> lookup_client(GwName, ClientId, FormatFun) ->
lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) lists:append([lookup_client(Node, GwName, {clientid, ClientId}, FormatFun)
|| Node <- mria_mnesia:running_nodes()]). || Node <- mria_mnesia:running_nodes()]).
@ -253,7 +253,7 @@ lookup_client(Node, GwName, {clientid, ClientId}, FormatFun) ->
rpc_call(Node, lookup_client, rpc_call(Node, lookup_client,
[Node, GwName, {clientid, ClientId}, FormatFun]). [Node, GwName, {clientid, ClientId}, FormatFun]).
-spec kickout_client(gateway_name(), emqx_type:clientid()) -spec kickout_client(gateway_name(), emqx_types:clientid())
-> {error, any()} -> {error, any()}
| ok. | ok.
kickout_client(GwName, ClientId) -> kickout_client(GwName, ClientId) ->
@ -270,7 +270,7 @@ kickout_client(Node, GwName, ClientId) when Node =:= node() ->
kickout_client(Node, GwName, ClientId) -> kickout_client(Node, GwName, ClientId) ->
rpc_call(Node, kickout_client, [Node, GwName, ClientId]). rpc_call(Node, kickout_client, [Node, GwName, ClientId]).
-spec list_client_subscriptions(gateway_name(), emqx_type:clientid()) -spec list_client_subscriptions(gateway_name(), emqx_types:clientid())
-> {error, any()} -> {error, any()}
| {ok, list()}. | {ok, list()}.
list_client_subscriptions(GwName, ClientId) -> list_client_subscriptions(GwName, ClientId) ->
@ -288,10 +288,10 @@ list_client_subscriptions(GwName, ClientId) ->
end end
end). end).
-spec client_subscribe(gateway_name(), emqx_type:clientid(), -spec client_subscribe(gateway_name(), emqx_types:clientid(),
emqx_type:topic(), emqx_type:subopts()) emqx_types:topic(), emqx_types:subopts())
-> {error, any()} -> {error, any()}
| ok. | {ok, {emqx_types:topic(), emqx_types:subopts()}}.
client_subscribe(GwName, ClientId, Topic, SubOpts) -> client_subscribe(GwName, ClientId, Topic, SubOpts) ->
with_channel(GwName, ClientId, with_channel(GwName, ClientId,
fun(Pid) -> fun(Pid) ->
@ -302,7 +302,7 @@ client_subscribe(GwName, ClientId, Topic, SubOpts) ->
end). end).
-spec client_unsubscribe(gateway_name(), -spec client_unsubscribe(gateway_name(),
emqx_type:clientid(), emqx_type:topic()) emqx_types:clientid(), emqx_types:topic())
-> {error, any()} -> {error, any()}
| ok. | ok.
client_unsubscribe(GwName, ClientId, Topic) -> client_unsubscribe(GwName, ClientId, Topic) ->

View File

@ -334,13 +334,14 @@ handle_call({subscribe_from_client, TopicFilter, Qos}, _From,
deny -> deny ->
{reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel}; {reply, {error, ?RESP_PERMISSION_DENY, <<"Authorization deny">>}, Channel};
_ -> _ ->
{ok, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel), {ok, _, NChannel} = do_subscribe([{TopicFilter, #{qos => Qos}}], Channel),
{reply, ok, NChannel} {reply, ok, NChannel}
end; end;
handle_call({subscribe, Topic, SubOpts}, _From, Channel) -> handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
{ok, NChannel} = do_subscribe([{Topic, SubOpts}], Channel), {ok,
{reply, ok, NChannel}; [{NTopicFilter, NSubOpts}], NChannel} = do_subscribe([{Topic, SubOpts}], Channel),
{reply, {ok, {NTopicFilter, NSubOpts}}, NChannel};
handle_call({unsubscribe_from_client, TopicFilter}, _From, handle_call({unsubscribe_from_client, TopicFilter}, _From,
Channel = #channel{conn_state = connected}) -> Channel = #channel{conn_state = connected}) ->
@ -437,11 +438,12 @@ terminate(Reason, Channel) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
do_subscribe(TopicFilters, Channel) -> do_subscribe(TopicFilters, Channel) ->
NChannel = lists:foldl( {MadeSubs, NChannel} = lists:foldl(
fun({TopicFilter, SubOpts}, ChannelAcc) -> fun({TopicFilter, SubOpts}, {MadeSubs, ChannelAcc}) ->
do_subscribe(TopicFilter, SubOpts, ChannelAcc) {Sub, Channel1} = do_subscribe(TopicFilter, SubOpts, ChannelAcc),
end, Channel, parse_topic_filters(TopicFilters)), {MadeSubs ++ [Sub], Channel1}
{ok, NChannel}. end, {[], Channel}, parse_topic_filters(TopicFilters)),
{ok, MadeSubs, NChannel}.
%% @private %% @private
do_subscribe(TopicFilter, SubOpts, Channel = do_subscribe(TopicFilter, SubOpts, Channel =
@ -451,17 +453,20 @@ do_subscribe(TopicFilter, SubOpts, Channel =
NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter), NTopicFilter = emqx_mountpoint:mount(Mountpoint, TopicFilter),
NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts), NSubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts),
SubId = maps:get(clientid, ClientInfo, undefined), SubId = maps:get(clientid, ClientInfo, undefined),
%% XXX: is_new?
IsNew = not maps:is_key(NTopicFilter, Subs), IsNew = not maps:is_key(NTopicFilter, Subs),
case IsNew of case IsNew of
true -> true ->
ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts), ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts),
ok = emqx_hooks:run('session.subscribed', ok = emqx_hooks:run('session.subscribed',
[ClientInfo, NTopicFilter, NSubOpts#{is_new => IsNew}]), [ClientInfo, NTopicFilter, NSubOpts#{is_new => IsNew}]),
Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}; {{NTopicFilter, NSubOpts},
Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}};
_ -> _ ->
%% Update subopts %% Update subopts
ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts), ok = emqx:subscribe(NTopicFilter, SubId, NSubOpts),
Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}} {{NTopicFilter, NSubOpts},
Channel#channel{subscriptions = Subs#{NTopicFilter => NSubOpts}}}
end. end.
do_unsubscribe(TopicFilters, Channel) -> do_unsubscribe(TopicFilters, Channel) ->

View File

@ -219,7 +219,7 @@ handle_call({subscribe, Topic, SubOpts}, _From,
Subs = emqx_lwm2m_session:info(subscriptions, Session), Subs = emqx_lwm2m_session:info(subscriptions, Session),
NSubs = maps:put(MountedTopic, NSubOpts, Subs), NSubs = maps:put(MountedTopic, NSubOpts, Subs),
NSession = emqx_lwm2m_session:set_subscriptions(NSubs, Session), NSession = emqx_lwm2m_session:set_subscriptions(NSubs, Session),
{reply, ok, Channel#channel{session = NSession}}; {reply, {ok, {MountedTopic, NSubOpts}}, Channel#channel{session = NSession}};
handle_call({unsubscribe, Topic}, _From, handle_call({unsubscribe, Topic}, _From,
Channel = #channel{ Channel = #channel{

View File

@ -1194,8 +1194,8 @@ handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
2 -> 2 ->
case do_subscribe({?SN_INVALID_TOPIC_ID, case do_subscribe({?SN_INVALID_TOPIC_ID,
Topic, SubOpts}, Channel) of Topic, SubOpts}, Channel) of
{ok, _, NChannel} -> {ok, {_, NTopicName, NSubOpts}, NChannel} ->
reply(ok, NChannel); reply({ok, {NTopicName, NSubOpts}}, NChannel);
{error, ?SN_EXCEED_LIMITATION} -> {error, ?SN_EXCEED_LIMITATION} ->
reply({error, exceed_limitation}, Channel) reply({error, exceed_limitation}, Channel)
end; end;

View File

@ -660,7 +660,7 @@ handle_call({subscribe, Topic, SubOpts}, _From,
), ),
NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts}|Subs], NSubs = [{SubId, MountedTopic, <<"auto">>, NSubOpts}|Subs],
NChannel1 = NChannel#channel{subscriptions = NSubs}, NChannel1 = NChannel#channel{subscriptions = NSubs},
reply(ok, NChannel1); reply({ok, {MountedTopic, NSubOpts}}, NChannel1);
{error, ErrMsg, NChannel} -> {error, ErrMsg, NChannel} ->
?SLOG(error, #{ msg => "failed_to_subscribe_topic" ?SLOG(error, #{ msg => "failed_to_subscribe_topic"
, topic => Topic , topic => Topic

View File

@ -395,7 +395,7 @@ t_rest_clienit_info(_) ->
?assertEqual(1, length(Subs)), ?assertEqual(1, length(Subs)),
assert_feilds_apperence([topic, qos], lists:nth(1, Subs)), assert_feilds_apperence([topic, qos], lists:nth(1, Subs)),
{204, _} = request(post, ClientPath ++ "/subscriptions", {201, _} = request(post, ClientPath ++ "/subscriptions",
#{topic => <<"t/a">>, qos => 1, #{topic => <<"t/a">>, qos => 1,
sub_props => #{subid => <<"1001">>}}), sub_props => #{subid => <<"1001">>}}),