Make codes compliance with mqtt protocol specifications (#2790)

* Make codes compliance with mqtt protocol specifications

* Fix test case

* Remove emqx_protocol:puback/4
This commit is contained in:
tigercl 2019-08-17 09:15:43 +08:00 committed by turtleDeng
parent 2e26cd244a
commit c1fd5f89f1
8 changed files with 65 additions and 87 deletions

View File

@ -267,9 +267,9 @@ connected(enter, _PrevSt, State = #state{proto_state = ProtoState}) ->
shutdown(Reason, NState) shutdown(Reason, NState)
end; end;
connected(cast, {incoming, Packet = ?PACKET(?CONNECT)}, State) -> connected(cast, {incoming, ?PACKET(?CONNECT)}, State) ->
?LOG(warning, "Unexpected connect: ~p", [Packet]), Shutdown = fun(NewSt) -> shutdown(?RC_PROTOCOL_ERROR, NewSt) end,
shutdown(unexpected_incoming_connect, State); handle_outgoing(?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), Shutdown, State);
connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) ->
handle_incoming(Packet, fun keep_state/1, State); handle_incoming(Packet, fun keep_state/1, State);

View File

@ -122,6 +122,8 @@ critical(Metadata, Format, Args) when is_map(Metadata) ->
logger:critical(Format, Args, Metadata). logger:critical(Format, Args, Metadata).
-spec(set_metadata_client_id(emqx_types:client_id()) -> ok). -spec(set_metadata_client_id(emqx_types:client_id()) -> ok).
set_metadata_client_id(<<>>) ->
ok;
set_metadata_client_id(ClientId) -> set_metadata_client_id(ClientId) ->
set_proc_metadata(#{client_id => ClientId}). set_proc_metadata(#{client_id => ClientId}).

View File

@ -45,25 +45,21 @@ on_client_connected(#{client_id := ClientId,
username := Username, username := Username,
peername := {IpAddr, _} peername := {IpAddr, _}
}, ConnAck, }, ConnAck,
#{session := #{clean_start := CleanStart, #{session := Session,
expiry_interval := Interval
},
proto_name := ProtoName, proto_name := ProtoName,
proto_ver := ProtoVer, proto_ver := ProtoVer,
keepalive := Keepalive keepalive := Keepalive
}, Env) -> }, Env) ->
case emqx_json:safe_encode(#{clientid => ClientId, case emqx_json:safe_encode(maps:merge(#{clientid => ClientId,
username => Username, username => Username,
ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)),
proto_name => ProtoName, proto_name => ProtoName,
proto_ver => ProtoVer, proto_ver => ProtoVer,
keepalive => Keepalive, keepalive => Keepalive,
clean_start => CleanStart, connack => ConnAck,
expiry_interval => Interval, ts => erlang:system_time(millisecond)
connack => ConnAck, }, maps:with([clean_start, expiry_interval], Session))) of
ts => erlang:system_time(millisecond)
}) of
{ok, Payload} -> {ok, Payload} ->
emqx:publish(message(qos(Env), topic(connected, ClientId), Payload)); emqx:publish(message(qos(Env), topic(connected, ClientId), Payload));
{error, Reason} -> {error, Reason} ->

View File

@ -79,10 +79,6 @@ do_check_pub(#{qos := QoS}, #{max_qos_allowed := MaxQoS})
{error, ?RC_QOS_NOT_SUPPORTED}; {error, ?RC_QOS_NOT_SUPPORTED};
do_check_pub(#{retain := true}, #{retain_available := false}) -> do_check_pub(#{retain := true}, #{retain_available := false}) ->
{error, ?RC_RETAIN_NOT_SUPPORTED}; {error, ?RC_RETAIN_NOT_SUPPORTED};
do_check_pub(#{topic_alias := TopicAlias},
#{max_topic_alias := MaxTopicAlias})
when 0 == TopicAlias; TopicAlias >= MaxTopicAlias ->
{error, ?RC_TOPIC_ALIAS_INVALID};
do_check_pub(_Flags, _Caps) -> ok. do_check_pub(_Flags, _Caps) -> ok.
-spec(check_sub(emqx_types:zone(), -spec(check_sub(emqx_types:zone(),

View File

@ -82,8 +82,7 @@ validate_packet_id(_) ->
validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I})
when I =< 0; I >= 16#FFFFFFF -> when I =< 0; I >= 16#FFFFFFF ->
error(subscription_identifier_invalid); error(subscription_identifier_invalid);
validate_properties(?PUBLISH, #{'Topic-Alias':= I}) validate_properties(?PUBLISH, #{'Topic-Alias':= 0}) ->
when I =:= 0 ->
error(topic_alias_invalid); error(topic_alias_invalid);
validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) -> validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) ->
error(protocol_error); error(protocol_error);

View File

@ -137,6 +137,7 @@ init(ConnInfo, Options) ->
MountPoint = emqx_zone:get_env(Zone, mountpoint), MountPoint = emqx_zone:get_env(Zone, mountpoint),
Client = maps:merge(#{zone => Zone, Client = maps:merge(#{zone => Zone,
username => Username, username => Username,
client_id => <<>>,
mountpoint => MountPoint, mountpoint => MountPoint,
is_bridge => false, is_bridge => false,
is_superuser => false is_superuser => false
@ -175,13 +176,14 @@ handle_in(?CONNECT_PACKET(
fun check_connect/2, fun check_connect/2,
fun enrich_client/2, fun enrich_client/2,
fun auth_connect/2], ConnPkt, PState1) of fun auth_connect/2], ConnPkt, PState1) of
{ok, NConnPkt, NPState} -> {ok, NConnPkt, NPState = #protocol{client = #{client_id := ClientId1}}} ->
process_connect(NConnPkt, maybe_assign_clientid(NPState)); ok = emqx_logger:set_metadata_client_id(ClientId1),
process_connect(NConnPkt, NPState);
{error, ReasonCode, NPState} -> {error, ReasonCode, NPState} ->
handle_out({disconnect, ReasonCode}, NPState) handle_out({connack, ReasonCode}, NPState)
end; end;
handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState= #protocol{proto_ver = Ver}) -> handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), PState= #protocol{proto_ver = Ver}) ->
case pipeline([fun validate_in/2, case pipeline([fun validate_in/2,
fun process_alias/2, fun process_alias/2,
fun check_publish/2], Packet, PState) of fun check_publish/2], Packet, PState) of
@ -190,7 +192,7 @@ handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), PState= #protocol{prot
{error, ReasonCode, NPState} -> {error, ReasonCode, NPState} ->
?LOG(warning, "Cannot publish message to ~s due to ~s", ?LOG(warning, "Cannot publish message to ~s due to ~s",
[Topic, emqx_reason_codes:text(ReasonCode, Ver)]), [Topic, emqx_reason_codes:text(ReasonCode, Ver)]),
puback(QoS, PacketId, ReasonCode, NPState) handle_out({disconnect, ReasonCode}, NPState)
end; end;
handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), PState = #protocol{session = Session}) -> handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), PState = #protocol{session = Session}) ->
@ -380,10 +382,6 @@ handle_out({publish, PacketId, Msg}, PState = #protocol{client = Client}) ->
Packet = emqx_packet:from_message(PacketId, unmount(Client, Msg1)), Packet = emqx_packet:from_message(PacketId, unmount(Client, Msg1)),
{ok, Packet, PState}; {ok, Packet, PState};
%% TODO: How to handle the err?
handle_out({puberr, _ReasonCode}, PState) ->
{ok, PState};
handle_out({puback, PacketId, ReasonCode}, PState) -> handle_out({puback, PacketId, ReasonCode}, PState) ->
{ok, ?PUBACK_PACKET(PacketId, ReasonCode), PState}; {ok, ?PUBACK_PACKET(PacketId, ReasonCode), PState};
@ -500,7 +498,7 @@ check_connect(ConnPkt, PState) ->
fun check_banned/2, fun check_banned/2,
fun check_will_topic/2, fun check_will_topic/2,
fun check_will_retain/2], ConnPkt, PState) of fun check_will_retain/2], ConnPkt, PState) of
ok -> {ok, PState}; {ok, NConnPkt, NPState} -> {ok, NConnPkt, NPState};
Error -> Error Error -> Error
end. end.
@ -508,7 +506,7 @@ check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
proto_name = Name}, _PState) -> proto_name = Name}, _PState) ->
case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of
true -> ok; true -> ok;
false -> {error, ?RC_PROTOCOL_ERROR} false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION}
end. end.
%% MQTT3.1 does not allow null clientId %% MQTT3.1 does not allow null clientId
@ -571,29 +569,43 @@ check_will_retain(#mqtt_packet_connect{will_retain = true},
%% Enrich client %% Enrich client
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
enrich_client(#mqtt_packet_connect{client_id = ClientId, enrich_client(ConnPkt, PState) ->
username = Username, case pipeline([fun set_username/2,
is_bridge = IsBridge fun maybe_use_username_as_clientid/2,
}, fun maybe_assign_clientid/2,
PState = #protocol{client = Client}) -> fun set_rest_client_fields/2], ConnPkt, PState) of
Client1 = set_username(Username, Client#{client_id => ClientId, {ok, NConnPkt, NPState} -> {ok, NConnPkt, NPState};
is_bridge => IsBridge Error -> Error
}), end.
{ok, PState#protocol{client = maybe_username_as_clientid(Client1)}}.
maybe_use_username_as_clientid(_ConnPkt, PState = #protocol{client = #{username := undefined}}) ->
{ok, PState};
maybe_use_username_as_clientid(_ConnPkt, PState = #protocol{client = Client = #{zone := Zone,
username := Username}}) ->
NClient =
case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
true -> Client#{client_id => Username};
false -> Client
end,
{ok, PState#protocol{client = NClient}}.
maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>},
PState = #protocol{client = Client, ack_props = AckProps}) ->
ClientId = emqx_guid:to_base62(emqx_guid:gen()),
AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
{ok, PState#protocol{client = Client#{client_id => ClientId}, ack_props = AckProps1}};
maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, PState = #protocol{client = Client}) ->
{ok, PState#protocol{client = Client#{client_id => ClientId}}}.
%% Username maybe not undefined if peer_cert_as_username %% Username maybe not undefined if peer_cert_as_username
set_username(Username, Client = #{username := undefined}) -> set_username(#mqtt_packet_connect{username = Username},
Client#{username => Username}; PState = #protocol{client = Client = #{username := undefined}}) ->
set_username(_Username, Client) -> Client. {ok, PState#protocol{client = Client#{username => Username}}};
set_username(_ConnPkt, PState) ->
{ok, PState}.
maybe_username_as_clientid(Client = #{username := undefined}) -> set_rest_client_fields(#mqtt_packet_connect{is_bridge = IsBridge}, PState = #protocol{client = Client}) ->
Client; {ok, PState#protocol{client = Client#{is_bridge => IsBridge}}}.
maybe_username_as_clientid(Client = #{zone := Zone,
username := Username}) ->
case emqx_zone:get_env(Zone, use_username_as_clientid, false) of
true -> Client#{client_id => Username};
false -> Client
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Auth Connect %% Auth Connect
@ -612,18 +624,6 @@ auth_connect(#mqtt_packet_connect{client_id = ClientId,
{error, emqx_reason_codes:connack_error(Reason)} {error, emqx_reason_codes:connack_error(Reason)}
end. end.
%%--------------------------------------------------------------------
%% Assign a random clientId
%%--------------------------------------------------------------------
maybe_assign_clientid(PState = #protocol{client = Client = #{client_id := <<>>},
ack_props = AckProps}) ->
ClientId = emqx_guid:to_base62(emqx_guid:gen()),
Client1 = Client#{client_id => ClientId},
AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
PState#protocol{client = Client1, ack_props = AckProps1};
maybe_assign_clientid(PState) -> PState.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Process Connect %% Process Connect
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -671,7 +671,7 @@ process_alias(Packet = #mqtt_packet{
{ok, Packet#mqtt_packet{ {ok, Packet#mqtt_packet{
variable = Publish#mqtt_packet_publish{ variable = Publish#mqtt_packet_publish{
topic_name = Topic}}, PState}; topic_name = Topic}}, PState};
false -> {error, ?RC_TOPIC_ALIAS_INVALID} false -> {error, ?RC_PROTOCOL_ERROR}
end; end;
process_alias(#mqtt_packet{ process_alias(#mqtt_packet{
@ -760,17 +760,6 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_2},
handle_out({pubrec, PacketId, ReasonCode}, PState) handle_out({pubrec, PacketId, ReasonCode}, PState)
end. end.
%%--------------------------------------------------------------------
%% Puback
%%--------------------------------------------------------------------
puback(?QOS_0, _PacketId, ReasonCode, PState) ->
handle_out({puberr, ReasonCode}, PState);
puback(?QOS_1, PacketId, ReasonCode, PState) ->
handle_out({puback, PacketId, ReasonCode}, PState);
puback(?QOS_2, PacketId, ReasonCode, PState) ->
handle_out({pubrec, PacketId, ReasonCode}, PState).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Process subscribe request %% Process subscribe request
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -248,6 +248,8 @@ info(created_at, #session{created_at = CreatedAt}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(attrs(session()) -> emqx_types:attrs()). -spec(attrs(session()) -> emqx_types:attrs()).
attrs(undefined) ->
#{};
attrs(#session{clean_start = CleanStart, attrs(#session{clean_start = CleanStart,
expiry_interval = ExpiryInterval, expiry_interval = ExpiryInterval,
created_at = CreatedAt}) -> created_at = CreatedAt}) ->

View File

@ -26,23 +26,17 @@ all() -> emqx_ct:all(?MODULE).
t_check_pub(_) -> t_check_pub(_) ->
PubCaps = #{max_qos_allowed => ?QOS_1, PubCaps = #{max_qos_allowed => ?QOS_1,
retain_available => false, retain_available => false
max_topic_alias => 4
}, },
ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps), ok = emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1, ok = emqx_mqtt_caps:check_pub(zone, #{qos => ?QOS_1,
retain => false, retain => false}),
topic_alias => 1
}),
PubFlags1 = #{qos => ?QOS_2, retain => false}, PubFlags1 = #{qos => ?QOS_2, retain => false},
?assertEqual({error, ?RC_QOS_NOT_SUPPORTED}, ?assertEqual({error, ?RC_QOS_NOT_SUPPORTED},
emqx_mqtt_caps:check_pub(zone, PubFlags1)), emqx_mqtt_caps:check_pub(zone, PubFlags1)),
PubFlags2 = #{qos => ?QOS_1, retain => true}, PubFlags2 = #{qos => ?QOS_1, retain => true},
?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED}, ?assertEqual({error, ?RC_RETAIN_NOT_SUPPORTED},
emqx_mqtt_caps:check_pub(zone, PubFlags2)), emqx_mqtt_caps:check_pub(zone, PubFlags2)),
PubFlags3 = #{qos => ?QOS_1, retain => false, topic_alias => 5},
?assertEqual({error, ?RC_TOPIC_ALIAS_INVALID},
emqx_mqtt_caps:check_pub(zone, PubFlags3)),
true = emqx_zone:unset_env(zone, '$mqtt_pub_caps'). true = emqx_zone:unset_env(zone, '$mqtt_pub_caps').
t_check_sub(_) -> t_check_sub(_) ->