Merge pull request #10737 from HJianBo/fix-mqtt-sn-bugs
Avoid urldecode clientid twice
This commit is contained in:
commit
48ceb14755
|
@ -145,10 +145,9 @@ clients(get, #{
|
||||||
clients_insta(get, #{
|
clients_insta(get, #{
|
||||||
bindings := #{
|
bindings := #{
|
||||||
name := Name0,
|
name := Name0,
|
||||||
clientid := ClientId0
|
clientid := ClientId
|
||||||
}
|
}
|
||||||
}) ->
|
}) ->
|
||||||
ClientId = emqx_mgmt_util:urldecode(ClientId0),
|
|
||||||
with_gateway(Name0, fun(GwName, _) ->
|
with_gateway(Name0, fun(GwName, _) ->
|
||||||
case
|
case
|
||||||
emqx_gateway_http:lookup_client(
|
emqx_gateway_http:lookup_client(
|
||||||
|
@ -172,10 +171,9 @@ clients_insta(get, #{
|
||||||
clients_insta(delete, #{
|
clients_insta(delete, #{
|
||||||
bindings := #{
|
bindings := #{
|
||||||
name := Name0,
|
name := Name0,
|
||||||
clientid := ClientId0
|
clientid := ClientId
|
||||||
}
|
}
|
||||||
}) ->
|
}) ->
|
||||||
ClientId = emqx_mgmt_util:urldecode(ClientId0),
|
|
||||||
with_gateway(Name0, fun(GwName, _) ->
|
with_gateway(Name0, fun(GwName, _) ->
|
||||||
_ = emqx_gateway_http:kickout_client(GwName, ClientId),
|
_ = emqx_gateway_http:kickout_client(GwName, ClientId),
|
||||||
{204}
|
{204}
|
||||||
|
@ -185,10 +183,9 @@ clients_insta(delete, #{
|
||||||
subscriptions(get, #{
|
subscriptions(get, #{
|
||||||
bindings := #{
|
bindings := #{
|
||||||
name := Name0,
|
name := Name0,
|
||||||
clientid := ClientId0
|
clientid := ClientId
|
||||||
}
|
}
|
||||||
}) ->
|
}) ->
|
||||||
ClientId = emqx_mgmt_util:urldecode(ClientId0),
|
|
||||||
with_gateway(Name0, fun(GwName, _) ->
|
with_gateway(Name0, fun(GwName, _) ->
|
||||||
case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
|
case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
|
@ -203,11 +200,10 @@ subscriptions(get, #{
|
||||||
subscriptions(post, #{
|
subscriptions(post, #{
|
||||||
bindings := #{
|
bindings := #{
|
||||||
name := Name0,
|
name := Name0,
|
||||||
clientid := ClientId0
|
clientid := ClientId
|
||||||
},
|
},
|
||||||
body := Body
|
body := Body
|
||||||
}) ->
|
}) ->
|
||||||
ClientId = emqx_mgmt_util:urldecode(ClientId0),
|
|
||||||
with_gateway(Name0, fun(GwName, _) ->
|
with_gateway(Name0, fun(GwName, _) ->
|
||||||
case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of
|
case {maps:get(<<"topic">>, Body, undefined), subopts(Body)} of
|
||||||
{undefined, _} ->
|
{undefined, _} ->
|
||||||
|
@ -231,12 +227,10 @@ subscriptions(post, #{
|
||||||
subscriptions(delete, #{
|
subscriptions(delete, #{
|
||||||
bindings := #{
|
bindings := #{
|
||||||
name := Name0,
|
name := Name0,
|
||||||
clientid := ClientId0,
|
clientid := ClientId,
|
||||||
topic := Topic0
|
topic := Topic
|
||||||
}
|
}
|
||||||
}) ->
|
}) ->
|
||||||
ClientId = emqx_mgmt_util:urldecode(ClientId0),
|
|
||||||
Topic = emqx_mgmt_util:urldecode(Topic0),
|
|
||||||
with_gateway(Name0, fun(GwName, _) ->
|
with_gateway(Name0, fun(GwName, _) ->
|
||||||
_ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
|
_ = emqx_gateway_http:client_unsubscribe(GwName, ClientId, Topic),
|
||||||
{204}
|
{204}
|
||||||
|
|
|
@ -110,14 +110,12 @@ listeners(post, #{bindings := #{name := Name0}, body := LConf}) ->
|
||||||
end
|
end
|
||||||
end).
|
end).
|
||||||
|
|
||||||
listeners_insta(delete, #{bindings := #{name := Name0, id := ListenerId0}}) ->
|
listeners_insta(delete, #{bindings := #{name := Name0, id := ListenerId}}) ->
|
||||||
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
|
|
||||||
with_gateway(Name0, fun(_GwName, _) ->
|
with_gateway(Name0, fun(_GwName, _) ->
|
||||||
ok = emqx_gateway_http:remove_listener(ListenerId),
|
ok = emqx_gateway_http:remove_listener(ListenerId),
|
||||||
{204}
|
{204}
|
||||||
end);
|
end);
|
||||||
listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) ->
|
listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId}}) ->
|
||||||
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
|
|
||||||
with_gateway(Name0, fun(_GwName, _) ->
|
with_gateway(Name0, fun(_GwName, _) ->
|
||||||
case emqx_gateway_conf:listener(ListenerId) of
|
case emqx_gateway_conf:listener(ListenerId) of
|
||||||
{ok, Listener} ->
|
{ok, Listener} ->
|
||||||
|
@ -130,9 +128,8 @@ listeners_insta(get, #{bindings := #{name := Name0, id := ListenerId0}}) ->
|
||||||
end);
|
end);
|
||||||
listeners_insta(put, #{
|
listeners_insta(put, #{
|
||||||
body := LConf,
|
body := LConf,
|
||||||
bindings := #{name := Name0, id := ListenerId0}
|
bindings := #{name := Name0, id := ListenerId}
|
||||||
}) ->
|
}) ->
|
||||||
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
|
|
||||||
with_gateway(Name0, fun(_GwName, _) ->
|
with_gateway(Name0, fun(_GwName, _) ->
|
||||||
{ok, RespConf} = emqx_gateway_http:update_listener(ListenerId, LConf),
|
{ok, RespConf} = emqx_gateway_http:update_listener(ListenerId, LConf),
|
||||||
{200, RespConf}
|
{200, RespConf}
|
||||||
|
@ -141,10 +138,9 @@ listeners_insta(put, #{
|
||||||
listeners_insta_authn(get, #{
|
listeners_insta_authn(get, #{
|
||||||
bindings := #{
|
bindings := #{
|
||||||
name := Name0,
|
name := Name0,
|
||||||
id := ListenerId0
|
id := ListenerId
|
||||||
}
|
}
|
||||||
}) ->
|
}) ->
|
||||||
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
|
|
||||||
with_gateway(Name0, fun(GwName, _) ->
|
with_gateway(Name0, fun(GwName, _) ->
|
||||||
try emqx_gateway_http:authn(GwName, ListenerId) of
|
try emqx_gateway_http:authn(GwName, ListenerId) of
|
||||||
Authn -> {200, Authn}
|
Authn -> {200, Authn}
|
||||||
|
@ -157,10 +153,9 @@ listeners_insta_authn(post, #{
|
||||||
body := Conf,
|
body := Conf,
|
||||||
bindings := #{
|
bindings := #{
|
||||||
name := Name0,
|
name := Name0,
|
||||||
id := ListenerId0
|
id := ListenerId
|
||||||
}
|
}
|
||||||
}) ->
|
}) ->
|
||||||
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
|
|
||||||
with_gateway(Name0, fun(GwName, _) ->
|
with_gateway(Name0, fun(GwName, _) ->
|
||||||
{ok, Authn} = emqx_gateway_http:add_authn(GwName, ListenerId, Conf),
|
{ok, Authn} = emqx_gateway_http:add_authn(GwName, ListenerId, Conf),
|
||||||
{201, Authn}
|
{201, Authn}
|
||||||
|
@ -169,10 +164,9 @@ listeners_insta_authn(put, #{
|
||||||
body := Conf,
|
body := Conf,
|
||||||
bindings := #{
|
bindings := #{
|
||||||
name := Name0,
|
name := Name0,
|
||||||
id := ListenerId0
|
id := ListenerId
|
||||||
}
|
}
|
||||||
}) ->
|
}) ->
|
||||||
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
|
|
||||||
with_gateway(Name0, fun(GwName, _) ->
|
with_gateway(Name0, fun(GwName, _) ->
|
||||||
{ok, Authn} = emqx_gateway_http:update_authn(
|
{ok, Authn} = emqx_gateway_http:update_authn(
|
||||||
GwName, ListenerId, Conf
|
GwName, ListenerId, Conf
|
||||||
|
@ -182,10 +176,9 @@ listeners_insta_authn(put, #{
|
||||||
listeners_insta_authn(delete, #{
|
listeners_insta_authn(delete, #{
|
||||||
bindings := #{
|
bindings := #{
|
||||||
name := Name0,
|
name := Name0,
|
||||||
id := ListenerId0
|
id := ListenerId
|
||||||
}
|
}
|
||||||
}) ->
|
}) ->
|
||||||
ListenerId = emqx_mgmt_util:urldecode(ListenerId0),
|
|
||||||
with_gateway(Name0, fun(GwName, _) ->
|
with_gateway(Name0, fun(GwName, _) ->
|
||||||
ok = emqx_gateway_http:remove_authn(GwName, ListenerId),
|
ok = emqx_gateway_http:remove_authn(GwName, ListenerId),
|
||||||
{204}
|
{204}
|
||||||
|
|
|
@ -1791,14 +1791,14 @@ message_to_packet(
|
||||||
handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
|
handle_call({subscribe, Topic, SubOpts}, _From, Channel) ->
|
||||||
case do_subscribe({?SN_INVALID_TOPIC_ID, Topic, SubOpts}, Channel) of
|
case do_subscribe({?SN_INVALID_TOPIC_ID, Topic, SubOpts}, Channel) of
|
||||||
{ok, {_, NTopicName, NSubOpts}, NChannel} ->
|
{ok, {_, NTopicName, NSubOpts}, NChannel} ->
|
||||||
reply({ok, {NTopicName, NSubOpts}}, NChannel);
|
reply_and_update({ok, {NTopicName, NSubOpts}}, NChannel);
|
||||||
{error, ?SN_RC2_EXCEED_LIMITATION} ->
|
{error, ?SN_RC2_EXCEED_LIMITATION} ->
|
||||||
reply({error, exceed_limitation}, Channel)
|
reply({error, exceed_limitation}, Channel)
|
||||||
end;
|
end;
|
||||||
handle_call({unsubscribe, Topic}, _From, Channel) ->
|
handle_call({unsubscribe, Topic}, _From, Channel) ->
|
||||||
TopicFilters = [emqx_topic:parse(Topic)],
|
TopicFilters = [emqx_topic:parse(Topic)],
|
||||||
{ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel),
|
{ok, _, NChannel} = do_unsubscribe(TopicFilters, Channel),
|
||||||
reply(ok, NChannel);
|
reply_and_update(ok, NChannel);
|
||||||
handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
|
handle_call(subscriptions, _From, Channel = #channel{session = Session}) ->
|
||||||
reply({ok, maps:to_list(emqx_session:info(subscriptions, Session))}, Channel);
|
reply({ok, maps:to_list(emqx_session:info(subscriptions, Session))}, Channel);
|
||||||
handle_call(kick, _From, Channel) ->
|
handle_call(kick, _From, Channel) ->
|
||||||
|
@ -2192,6 +2192,9 @@ terminate(_Reason, _Channel) ->
|
||||||
reply(Reply, Channel) ->
|
reply(Reply, Channel) ->
|
||||||
{reply, Reply, Channel}.
|
{reply, Reply, Channel}.
|
||||||
|
|
||||||
|
reply_and_update(Reply, Channel) ->
|
||||||
|
{reply, Reply, [{event, updated}], Channel}.
|
||||||
|
|
||||||
shutdown(Reason, Channel) ->
|
shutdown(Reason, Channel) ->
|
||||||
{shutdown, Reason, Channel}.
|
{shutdown, Reason, Channel}.
|
||||||
|
|
||||||
|
|
|
@ -2259,6 +2259,46 @@ t_clients_subscription_api(_) ->
|
||||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
gen_udp:close(Socket).
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_clients_api_complex_id(_) ->
|
||||||
|
ClientId = <<"!@#$%^&*()_+{}:\"<>?/">>,
|
||||||
|
ClientIdUriEncoded = cow_qs:urlencode(ClientId),
|
||||||
|
Path = "/gateways/mqttsn/clients/" ++ binary_to_list(ClientIdUriEncoded),
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
send_connect_msg(Socket, ClientId),
|
||||||
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
%% get
|
||||||
|
{200, Client} = request(get, Path),
|
||||||
|
?assertMatch(#{clientid := ClientId}, Client),
|
||||||
|
%% subscription list
|
||||||
|
{200, []} = request(get, Path ++ "/subscriptions"),
|
||||||
|
%% kickout
|
||||||
|
{204, _} = request(delete, Path),
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
|
t_update_info_after_subscribed_via_api(_) ->
|
||||||
|
ClientId = <<"client_id_test1">>,
|
||||||
|
Path = "/gateways/mqttsn/clients/client_id_test1/subscriptions",
|
||||||
|
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||||
|
send_connect_msg(Socket, ClientId),
|
||||||
|
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||||
|
%% create
|
||||||
|
SubReq = #{
|
||||||
|
topic => <<"tx">>,
|
||||||
|
qos => 1,
|
||||||
|
nl => 0,
|
||||||
|
rap => 0,
|
||||||
|
rh => 0
|
||||||
|
},
|
||||||
|
{201, _SubsResp} = request(post, Path, SubReq),
|
||||||
|
timer:sleep(500),
|
||||||
|
%% assert
|
||||||
|
{200, Client} = request(get, "/gateways/mqttsn/clients/client_id_test1"),
|
||||||
|
?assertMatch(#{subscriptions_cnt := 1}, Client),
|
||||||
|
|
||||||
|
send_disconnect_msg(Socket, undefined),
|
||||||
|
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||||
|
gen_udp:close(Socket).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Helper funcs
|
%% Helper funcs
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
Fix the issue where the HTTP API interface of Gateway cannot handle ClientIDs with
|
||||||
|
special characters, such as: `!@#$%^&*()_+{}:"<>?/`.
|
Loading…
Reference in New Issue