Use username replace id(issue#1737) (#1961)

* Use username replace id(issue#1737)

* Add test case for issue#1737

* Make with_connection/2 support batch connect
This commit is contained in:
tigercl 2018-11-21 22:51:33 +08:00 committed by turtleDeng
parent a2c658ba19
commit 14b8036576
4 changed files with 131 additions and 20 deletions

View File

@ -649,6 +649,12 @@ zone.external.mqueue_store_qos0 = true
## Value: String
## zone.external.mountpoint = devicebound/
## Whether use username replace client id
##
## Value: boolean
## Default: false
zone.external.use_username_as_clientid = false
##--------------------------------------------------------------------
## Internal Zone
@ -708,6 +714,12 @@ zone.internal.mqueue_store_qos0 = true
## Value: String
## zone.internal.mountpoint = cloudbound/
## Whether use username replace client id
##
## Value: boolean
## Default: false
zone.internal.use_username_as_clientid = false
##--------------------------------------------------------------------
## Listeners
##--------------------------------------------------------------------

View File

@ -794,6 +794,12 @@ end}.
{datatype, string}
]}.
%% @doc Use username replace client id
{mapping, "zone.$name.use_username_as_clientid", "emqx.zones", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqx.zones", fun(Conf) ->
Mapping = fun("retain_available", Val) ->
{mqtt_retain_available, Val};

View File

@ -289,13 +289,17 @@ process_packet(?CONNECT_PACKET(
client_id = ClientId,
username = Username,
password = Password} = Connect), PState) ->
emqx_logger:add_metadata_client_id(ClientId),
NewClientId = maybe_use_username_as_clientid(ClientId, Username, PState),
emqx_logger:add_metadata_client_id(NewClientId),
%% TODO: Mountpoint...
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
WillMsg = make_will_msg(Connect),
PState1 = set_username(Username,
PState#pstate{client_id = ClientId,
PState#pstate{client_id = NewClientId,
proto_ver = ProtoVer,
proto_name = ProtoName,
clean_start = CleanStart,
@ -607,6 +611,19 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun
{error, Reason}
end.
%%------------------------------------------------------------------------------
%% Maybe use username replace client id
maybe_use_username_as_clientid(ClientId, undefined, _PState) ->
ClientId;
maybe_use_username_as_clientid(ClientId, Username, #pstate{zone = Zone}) ->
case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
true ->
Username;
false ->
ClientId
end.
%%------------------------------------------------------------------------------
%% Assign a clientid

View File

@ -60,22 +60,45 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
with_connection(DoFun) ->
batch_connect(NumberOfConnections) ->
batch_connect([], NumberOfConnections).
batch_connect(Socks, 0) ->
Socks;
batch_connect(Socks, NumberOfConnections) ->
{ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
[binary, {packet, raw},
{active, false}], 3000),
[binary, {packet, raw}, {active, false}],
3000),
batch_connect([Sock | Socks], NumberOfConnections - 1).
with_connection(DoFun, NumberOfConnections) ->
Socks = batch_connect(NumberOfConnections),
try
DoFun(Sock)
DoFun(Socks)
after
emqx_client_sock:close(Sock)
lists:foreach(fun(Sock) ->
emqx_client_sock:close(Sock)
end, Socks)
end.
with_connection(DoFun) ->
with_connection(DoFun, 1).
% {ok, Sock} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
% [binary, {packet, raw},
% {active, false}], 3000),
% try
% DoFun(Sock)
% after
% emqx_client_sock:close(Sock)
% end.
connect_v4(_) ->
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock, raw_send_serialize(?PACKET(?PUBLISH))),
{error, closed} =gen_tcp:recv(Sock, 0)
end),
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
ConnectPacket = raw_send_serialize(?CONNECT_PACKET
(#mqtt_packet_connect{
client_id = <<"mqttv4_client">>,
@ -94,7 +117,7 @@ connect_v4(_) ->
connect_v5(_) ->
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(#mqtt_packet_connect{
@ -105,7 +128,7 @@ connect_v5(_) ->
{ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
@ -117,7 +140,7 @@ connect_v5(_) ->
{ok, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5)
end),
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
@ -133,7 +156,7 @@ connect_v5(_) ->
end),
% test clean start
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
@ -153,7 +176,7 @@ connect_v5(_) ->
timer:sleep(1000),
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
@ -169,7 +192,7 @@ connect_v5(_) ->
end),
% test will message publish and cancel
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
@ -250,6 +273,59 @@ connect_v5(_) ->
emqx_client_sock:close(Sock2)
end),
% duplicate client id
with_connection(fun([Sock, Sock1]) ->
emqx_zone:set_env(external, use_username_as_clientid, true),
emqx_client_sock:send(Sock,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = true,
client_id = <<"myclient">>,
properties =
#{'Session-Expiry-Interval' => 10}})
)),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock1,
raw_send_serialize(
?CONNECT_PACKET(
#mqtt_packet_connect{
proto_ver = ?MQTT_PROTO_V5,
clean_start = false,
client_id = <<"myclient">>,
username = <<"admin">>,
password = <<"public">>,
properties =
#{'Session-Expiry-Interval' => 10}})
)),
{ok, Data1} = gen_tcp:recv(Sock1, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data1, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5})),
{ok, SubData} = gen_tcp:recv(Sock, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock1, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5})),
{ok, SubData1} = gen_tcp:recv(Sock1, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData1, ?MQTT_PROTO_V5)
end, 2),
ok.
do_connect(Sock, ProtoVer) ->
@ -263,7 +339,7 @@ do_connect(Sock, ProtoVer) ->
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_parse(Data, ProtoVer).
subscribe_v4(_) ->
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
do_connect(Sock, ?MQTT_PROTO_V4),
SubPacket = raw_send_serialize(
?SUBSCRIBE_PACKET(15,
@ -279,7 +355,7 @@ subscribe_v4(_) ->
ok.
subscribe_v5(_) ->
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
do_connect(Sock, ?MQTT_PROTO_V5),
SubPacket = raw_send_serialize(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1},[]),
#{version => ?MQTT_PROTO_V5}),
@ -288,7 +364,7 @@ subscribe_v5(_) ->
{ok, ?DISCONNECT_PACKET(?RC_TOPIC_FILTER_INVALID), _} =
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)
end),
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
do_connect(Sock, ?MQTT_PROTO_V5),
SubPacket = raw_send_serialize(
?SUBSCRIBE_PACKET(0, #{}, [{<<"TopicQos0">>,
@ -301,7 +377,7 @@ subscribe_v5(_) ->
{ok, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _} =
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)
end),
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
do_connect(Sock, ?MQTT_PROTO_V5),
SubPacket = raw_send_serialize(
?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 0},
@ -315,7 +391,7 @@ subscribe_v5(_) ->
{ok, ?DISCONNECT_PACKET(?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED), _} =
raw_recv_parse(DisConnData, ?MQTT_PROTO_V5)
end),
with_connection(fun(Sock) ->
with_connection(fun([Sock]) ->
do_connect(Sock, ?MQTT_PROTO_V5),
SubPacket = raw_send_serialize(
?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 1},