From 4764a7707c7ec553df840b72109d4de5526b9e90 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Mon, 16 Sep 2019 14:17:36 +0800 Subject: [PATCH] Rewrite the emqx_packet module and improve channel pipeline (#2903) Add use_username_as_clientid/1 function and Improve function 'pipeline/3' --- src/emqx_channel.erl | 191 +++++++---------------- src/emqx_connection.erl | 2 +- src/emqx_message.erl | 32 +++- src/emqx_misc.erl | 19 +-- src/emqx_packet.erl | 298 +++++++++++++++++++++++------------- src/emqx_ws_connection.erl | 2 +- src/emqx_zone.erl | 7 +- test/emqx_message_SUITE.erl | 13 ++ test/emqx_packet_SUITE.erl | 208 ++++++++++++++++--------- 9 files changed, 439 insertions(+), 333 deletions(-) diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 4c918253b..1e780eb08 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -50,8 +50,7 @@ ]). -import(emqx_misc, - [ run_fold/2 - , run_fold/3 + [ run_fold/3 , pipeline/3 , maybe_apply/2 ]). @@ -106,7 +105,7 @@ %% Init the channel %%-------------------------------------------------------------------- --spec(init(emqx_types:conn(), proplists:proplist()) -> channel()). +-spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()). init(ConnInfo, Options) -> Zone = proplists:get_value(zone, Options), Peercert = maps:get(peercert, ConnInfo, undefined), @@ -216,8 +215,7 @@ handle_in(?CONNECT_PACKET(_), Channel = #channel{connected = true}) -> handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> - case pipeline([fun validate_packet/2, - fun check_connect/2, + case pipeline([fun check_connpkt/2, fun init_protocol/2, fun enrich_client/2, fun set_logger_meta/2, @@ -232,7 +230,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{protocol = Protocol}) -> - case pipeline([fun validate_packet/2, + case pipeline([fun emqx_packet:check/1, fun process_alias/2, fun check_publish/2], Packet, Channel) of {ok, NPacket, NChannel} -> @@ -302,22 +300,27 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S {ok, Channel} end; -handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), Channel) -> - case validate_packet(Packet, Channel) of - ok -> - TopicFilters = preprocess_subscribe(Properties, RawTopicFilters, Channel), - {ReasonCodes, NChannel} = process_subscribe(TopicFilters, Channel), - handle_out({suback, PacketId, ReasonCodes}, NChannel); +handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), + Channel = #channel{client = Client}) -> + case emqx_packet:check(Packet) of + ok -> TopicFilters1 = emqx_hooks:run_fold('client.subscribe', + [Client, Properties], + parse_topic_filters(TopicFilters)), + TopicFilters2 = enrich_subid(Properties, TopicFilters1), + {ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel), + handle_out({suback, PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> handle_out({disconnect, ReasonCode}, Channel) end; -handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), Channel) -> - case validate_packet(Packet, Channel) of - ok -> - TopicFilters = preprocess_unsubscribe(Properties, RawTopicFilters, Channel), - {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters, Channel), - handle_out({unsuback, PacketId, ReasonCodes}, NChannel); +handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), + Channel = #channel{client = Client}) -> + case emqx_packet:check(Packet) of + ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', + [Client, Properties], + parse_topic_filters(TopicFilters)), + {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), + handle_out({unsuback, PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> handle_out({disconnect, ReasonCode}, Channel) end; @@ -421,24 +424,18 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_2}, handle_out({pubrec, PacketId, RC}, Channel) end. -publish_to_msg(Packet, #channel{client = Client = #{mountpoint := MountPoint}}) -> +publish_to_msg(Packet, #channel{client = Client = #{mountpoint := MountPoint}, + protocol = Protocol}) -> Msg = emqx_packet:to_message(Client, Packet), Msg1 = emqx_message:set_flag(dup, false, Msg), - emqx_mountpoint:mount(MountPoint, Msg1). + ProtoVer = emqx_protocol:info(proto_ver, Protocol), + Msg2 = emqx_message:set_header(proto_ver, ProtoVer, Msg1), + emqx_mountpoint:mount(MountPoint, Msg2). %%-------------------------------------------------------------------- %% Process Subscribe %%-------------------------------------------------------------------- --compile({inline, [preprocess_subscribe/3]}). -preprocess_subscribe(Properties, RawTopicFilters, #channel{client = Client}) -> - RunHook = fun(TopicFilters) -> - emqx_hooks:run_fold('client.subscribe', - [Client, Properties], TopicFilters) - end, - Enrich = fun(TopicFilters) -> enrich_subid(Properties, TopicFilters) end, - run_fold([fun parse_topic_filters/1, RunHook, Enrich], RawTopicFilters). - process_subscribe(TopicFilters, Channel) -> process_subscribe(TopicFilters, [], Channel). @@ -468,14 +465,6 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = %% Process Unsubscribe %%-------------------------------------------------------------------- --compile({inline, [preprocess_unsubscribe/3]}). -preprocess_unsubscribe(Properties, RawTopicFilter, #channel{client = Client}) -> - RunHook = fun(TopicFilters) -> - emqx_hooks:run_fold('client.unsubscribe', - [Client, Properties], TopicFilters) - end, - run_fold([fun parse_topic_filters/1, RunHook], RawTopicFilter). - -compile({inline, [process_unsubscribe/2]}). process_unsubscribe(TopicFilters, Channel) -> process_unsubscribe(TopicFilters, [], Channel). @@ -578,7 +567,7 @@ handle_out({publish, PacketId, Msg}, Channel = Msg1 = emqx_message:update_expiry(Msg), Msg2 = emqx_hooks:run_fold('message.delivered', [Client], Msg1), Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2), - {ok, emqx_packet:from_message(PacketId, Msg3), Channel}; + {ok, emqx_message:to_packet(PacketId, Msg3), Channel}; handle_out({puback, PacketId, ReasonCode}, Channel) -> {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel}; @@ -671,16 +660,18 @@ handle_cast(Msg, Channel) -> -spec(handle_info(Info :: term(), channel()) -> {ok, channel()} | {stop, Reason :: term(), channel()}). -handle_info({subscribe, RawTopicFilters}, Channel) -> - TopicFilters = preprocess_subscribe(#{'Internal' => true}, - RawTopicFilters, Channel), - {_ReasonCodes, NChannel} = process_subscribe(TopicFilters, Channel), +handle_info({subscribe, TopicFilters}, Channel = #channel{client = Client}) -> + TopicFilters1 = emqx_hooks:run_fold('client.subscribe', + [Client, #{'Internal' => true}], + parse_topic_filters(TopicFilters)), + {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel), {ok, NChannel}; -handle_info({unsubscribe, RawTopicFilters}, Channel) -> - TopicFilters = preprocess_unsubscribe(#{'Internal' => true}, - RawTopicFilters, Channel), - {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters, Channel), +handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) -> + TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', + [Client, #{'Internal' => true}], + parse_topic_filters(TopicFilters)), + {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {ok, NChannel}; handle_info(disconnected, Channel = #channel{connected = undefined}) -> @@ -833,125 +824,51 @@ received(Oct, Channel) -> sent(Oct, Channel) -> ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)). -%%TODO: Improve will msg:) +%% TODO: Improve will msg:) publish_will_msg(undefined) -> ok; publish_will_msg(Msg) -> emqx_broker:publish(Msg). -%% @doc Validate incoming packet. --spec(validate_packet(emqx_types:packet(), channel()) - -> ok | {error, emqx_types:reason_code()}). -validate_packet(Packet, _Channel) -> - try emqx_packet:validate(Packet) of - true -> ok - catch - error:protocol_error -> - {error, ?RC_PROTOCOL_ERROR}; - error:subscription_identifier_invalid -> - {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED}; - error:topic_alias_invalid -> - {error, ?RC_TOPIC_ALIAS_INVALID}; - error:topic_filters_invalid -> - {error, ?RC_TOPIC_FILTER_INVALID}; - error:topic_name_invalid -> - {error, ?RC_TOPIC_NAME_INVALID}; - error:_Reason -> - {error, ?RC_MALFORMED_PACKET} - end. - -%%-------------------------------------------------------------------- -%% Check connect packet -%%-------------------------------------------------------------------- - -check_connect(ConnPkt, Channel) -> - pipeline([fun check_proto_ver/2, - fun check_client_id/2, - fun check_will_topic/2, - fun check_will_retain/2], ConnPkt, Channel). - -check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, - proto_name = Name}, _Channel) -> - case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of - true -> ok; - false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} - end. - -%% MQTT3.1 does not allow null clientId -check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3, - client_id = <<>> - }, _Channel) -> - {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; - -%% Issue#599: Null clientId and clean_start = false -check_client_id(#mqtt_packet_connect{client_id = <<>>, - clean_start = false}, _Channel) -> - {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; - -check_client_id(#mqtt_packet_connect{client_id = <<>>, - clean_start = true}, _Channel) -> - ok; - -check_client_id(#mqtt_packet_connect{client_id = ClientId}, - #channel{client = #{zone := Zone}}) -> - Len = byte_size(ClientId), - MaxLen = emqx_zone:get_env(Zone, max_clientid_len), - case (1 =< Len) andalso (Len =< MaxLen) of - true -> ok; - false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} - end. - -check_will_topic(#mqtt_packet_connect{will_flag = false}, _Channel) -> - ok; -check_will_topic(#mqtt_packet_connect{will_topic = WillTopic}, _Channel) -> - try emqx_topic:validate(WillTopic) of - true -> ok - catch error:_Error -> - {error, ?RC_TOPIC_NAME_INVALID} - end. - -check_will_retain(#mqtt_packet_connect{will_retain = false}, _Channel) -> - ok; -check_will_retain(#mqtt_packet_connect{will_retain = true}, - #channel{client = #{zone := Zone}}) -> - case emqx_zone:get_env(Zone, mqtt_retain_available, true) of - true -> ok; - false -> {error, ?RC_RETAIN_NOT_SUPPORTED} - end. +%% @doc Check connect packet. +check_connpkt(ConnPkt, #channel{client = #{zone := Zone}}) -> + emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)). +%% @doc Init protocol record. init_protocol(ConnPkt, Channel = #channel{client = #{zone := Zone}}) -> {ok, Channel#channel{protocol = emqx_protocol:init(ConnPkt, Zone)}}. -%%-------------------------------------------------------------------- -%% Enrich client -%%-------------------------------------------------------------------- - -enrich_client(ConnPkt = #mqtt_packet_connect{is_bridge = IsBridge}, - Channel = #channel{client = Client}) -> +%% @doc Enrich client +enrich_client(ConnPkt, Channel = #channel{client = Client}) -> {ok, NConnPkt, NClient} = pipeline([fun set_username/2, + fun set_bridge_mode/2, fun maybe_username_as_clientid/2, fun maybe_assign_clientid/2, fun fix_mountpoint/2 ], ConnPkt, Client), - {ok, NConnPkt, Channel#channel{client = NClient#{is_bridge => IsBridge}}}. + {ok, NConnPkt, Channel#channel{client = NClient}}. -%% Username may be not undefined if peer_cert_as_username set_username(#mqtt_packet_connect{username = Username}, Client = #{username := undefined}) -> {ok, Client#{username => Username}}; set_username(_ConnPkt, Client) -> {ok, Client}. +set_bridge_mode(#mqtt_packet_connect{is_bridge = true}, Client) -> + {ok, Client#{is_bridge => true}}; +set_bridge_mode(_ConnPkt, _Client) -> ok. + maybe_username_as_clientid(_ConnPkt, Client = #{username := undefined}) -> {ok, Client}; maybe_username_as_clientid(_ConnPkt, Client = #{zone := Zone, username := Username}) -> - case emqx_zone:get_env(Zone, use_username_as_clientid, false) of + case emqx_zone:use_username_as_clientid(Zone) of true -> {ok, Client#{client_id => Username}}; false -> ok end. maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, Client) -> - RandClientId = emqx_guid:to_base62(emqx_guid:gen()), - {ok, Client#{client_id => RandClientId}}; + %% Generate a rand clientId + RandId = emqx_guid:to_base62(emqx_guid:gen()), + {ok, Client#{client_id => RandId}}; maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, Client) -> {ok, Client#{client_id => ClientId}}. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 47acdb8fe..302a1b4c5 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -104,7 +104,7 @@ info(CPid) when is_pid(CPid) -> info(Conn = #connection{chan_state = ChanState}) -> ConnInfo = info(?INFO_KEYS, Conn), ChanInfo = emqx_channel:info(ChanState), - maps:merge(ChanInfo, #{connection => maps:from_list(ConnInfo)}). + maps:merge(ChanInfo, #{conninfo => maps:from_list(ConnInfo)}). info(Keys, Conn) when is_list(Keys) -> [{Key, info(Key, Conn)} || Key <- Keys]; diff --git a/src/emqx_message.erl b/src/emqx_message.erl index d0059cde0..6fbd3bbf5 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -57,7 +57,8 @@ , update_expiry/1 ]). --export([ to_map/1 +-export([ to_packet/2 + , to_map/1 , to_list/1 ]). @@ -188,6 +189,34 @@ update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, end; update_expiry(Msg) -> Msg. +%% @doc Message to PUBLISH Packet. +-spec(to_packet(emqx_types:packet_id(), emqx_types:message()) + -> emqx_types:packet()). +to_packet(PacketId, #message{qos = QoS, flags = Flags, headers = Headers, + topic = Topic, payload = Payload}) -> + Flags1 = if Flags =:= undefined -> #{}; + true -> Flags + end, + Dup = maps:get(dup, Flags1, false), + Retain = maps:get(retain, Flags1, false), + Publish = #mqtt_packet_publish{topic_name = Topic, + packet_id = PacketId, + properties = publish_props(Headers)}, + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + dup = Dup, + qos = QoS, + retain = Retain}, + variable = Publish, payload = Payload}. + +publish_props(Headers) -> + maps:with(['Payload-Format-Indicator', + 'Response-Topic', + 'Correlation-Data', + 'User-Property', + 'Subscription-Identifier', + 'Content-Type', + 'Message-Expiry-Interval'], Headers). + %% @doc Message to map -spec(to_map(emqx_types:message()) -> map()). to_map(#message{ @@ -228,3 +257,4 @@ format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); format(headers, Headers) -> io_lib:format("~p", [Headers]). + diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 193fc9243..34717c804 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -20,7 +20,6 @@ -export([ merge_opts/2 , maybe_apply/2 - , run_fold/2 , run_fold/3 , pipeline/3 , start_timer/2 @@ -60,11 +59,6 @@ maybe_apply(_Fun, undefined) -> maybe_apply(Fun, Arg) when is_function(Fun) -> erlang:apply(Fun, [Arg]). -run_fold([], Acc) -> - Acc; -run_fold([Fun|More], Acc) -> - run_fold(More, Fun(Acc)). - %% @doc RunFold run_fold([], Acc, _State) -> Acc; @@ -76,18 +70,25 @@ pipeline([], Input, State) -> {ok, Input, State}; pipeline([Fun|More], Input, State) -> - case Fun(Input, State) of + case apply_fun(Fun, Input, State) of ok -> pipeline(More, Input, State); {ok, NState} -> pipeline(More, Input, NState); - {ok, NInput, NState} -> - pipeline(More, NInput, NState); + {ok, Output, NState} -> + pipeline(More, Output, NState); {error, Reason} -> {error, Reason, State}; {error, Reason, NState} -> {error, Reason, NState} end. +-compile({inline, [apply_fun/3]}). +apply_fun(Fun, Input, State) -> + case erlang:fun_info(Fun, arity) of + {arity, 1} -> Fun(Input); + {arity, 2} -> Fun(Input, State) + end. + -spec(start_timer(integer(), term()) -> reference()). start_timer(Interval, Msg) -> start_timer(Interval, self(), Msg). diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index a6876c1ef..b06f526af 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -19,143 +19,224 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). +%% Header APIs -export([ type/1 + , type_name/1 + , dup/1 , qos/1 + , retain/1 ]). -export([ proto_name/1 - , type_name/1 - , validate/1 - , format/1 - , to_message/2 - , from_message/2 + , proto_ver/1 + ]). + +%% Check API +-export([ check/1 + , check/2 + ]). + +-export([ to_message/2 , will_msg/1 ]). --compile(inline). +-export([format/1]). +-type(connect() :: #mqtt_packet_connect{}). +-type(publish() :: #mqtt_packet_publish{}). +-type(subscribe() :: #mqtt_packet_subscribe{}). +-type(unsubscribe() :: #mqtt_packet_unsubscribe{}). + +%%-------------------------------------------------------------------- +%% MQTT Packet Type and Flags. +%%-------------------------------------------------------------------- + +%% @doc MQTT packet type. +-spec(type(emqx_types:packet()) -> emqx_types:packet_type()). type(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) -> Type. +%% @doc Name of MQTT packet type. +-spec(type_name(emqx_types:packet()) -> atom()). +type_name(Packet) when is_record(Packet, mqtt_packet) -> + lists:nth(type(Packet), ?TYPE_NAMES). + +%% @doc Dup flag of MQTT packet. +-spec(dup(emqx_types:packet()) -> boolean()). +dup(#mqtt_packet{header = #mqtt_packet_header{dup = Dup}}) -> + Dup. + +%% @doc QoS of MQTT packet type. +-spec(qos(emqx_types:packet()) -> emqx_types:qos()). qos(#mqtt_packet{header = #mqtt_packet_header{qos = QoS}}) -> QoS. -%% @doc Protocol name of the version. --spec(proto_name(emqx_types:version()) -> binary()). -proto_name(?MQTT_PROTO_V3) -> - <<"MQIsdp">>; -proto_name(?MQTT_PROTO_V4) -> - <<"MQTT">>; -proto_name(?MQTT_PROTO_V5) -> - <<"MQTT">>. - -%% @doc Name of MQTT packet type. --spec(type_name(emqx_types:packet_type()) -> atom()). -type_name(Type) when ?RESERVED < Type, Type =< ?AUTH -> - lists:nth(Type, ?TYPE_NAMES). +%% @doc Retain flag of MQTT packet. +-spec(retain(emqx_types:packet()) -> boolean()). +retain(#mqtt_packet{header = #mqtt_packet_header{retain = Retain}}) -> + Retain. %%-------------------------------------------------------------------- -%% Validate MQTT Packet +%% Protocol name and version of MQTT CONNECT Packet. %%-------------------------------------------------------------------- --spec(validate(emqx_types:packet()) -> true). -validate(?SUBSCRIBE_PACKET(_PacketId, _Properties, [])) -> - error(topic_filters_invalid); -validate(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters)) -> - validate_packet_id(PacketId) - andalso validate_properties(?SUBSCRIBE, Properties) - andalso ok == lists:foreach(fun validate_subscription/1, TopicFilters); +%% @doc Protocol name of the CONNECT Packet. +-spec(proto_name(emqx_types:packet()|connect()) -> binary()). +proto_name(?CONNECT_PACKET(ConnPkt)) -> + proto_name(ConnPkt); +proto_name(#mqtt_packet_connect{proto_name = Name}) -> + Name. -validate(?UNSUBSCRIBE_PACKET(_PacketId, [])) -> - error(topic_filters_invalid); -validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) -> - validate_packet_id(PacketId) - andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters); +%% @doc Protocol version of the CONNECT Packet. +-spec(proto_ver(emqx_types:packet()|connect()) -> emqx_types:version()). +proto_ver(?CONNACK_PACKET(ConnPkt)) -> + proto_ver(ConnPkt); +proto_ver(#mqtt_packet_connect{proto_ver = Ver}) -> + Ver. -validate(?PUBLISH_PACKET(_QoS, <<>>, _, #{'Topic-Alias':= _I}, _)) -> - true; -validate(?PUBLISH_PACKET(_QoS, <<>>, _, _, _)) -> - error(protocol_error); -validate(?PUBLISH_PACKET(QoS, Topic, _, Properties, _)) -> - ((not (QoS =:= 3)) orelse error(qos_invalid)) - andalso ((not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid)) - andalso validate_properties(?PUBLISH, Properties); +%%-------------------------------------------------------------------- +%% Check MQTT Packet +%%-------------------------------------------------------------------- -validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = Properties})) -> - validate_properties(?CONNECT, Properties); +%% @doc Check PubSub Packet. +-spec(check(emqx_types:packet()|publish()|subscribe()|unsubscribe()) + -> ok | {error, emqx_types:reason_code()}). +check(#mqtt_packet{variable = PubPkt}) when is_record(PubPkt, mqtt_packet_publish) -> + check(PubPkt); -validate(_Packet) -> - true. +check(#mqtt_packet{variable = SubPkt}) when is_record(SubPkt, mqtt_packet_subscribe) -> + check(SubPkt); -validate_packet_id(0) -> - error(packet_id_invalid); -validate_packet_id(_) -> - true. +check(#mqtt_packet{variable = UnsubPkt}) when is_record(UnsubPkt, mqtt_packet_unsubscribe) -> + check(UnsubPkt); -validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) - when I =< 0; I >= 16#FFFFFFF -> - error(subscription_identifier_invalid); -validate_properties(?PUBLISH, #{'Topic-Alias':= 0}) -> - error(topic_alias_invalid); -validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) -> - error(protocol_error); -validate_properties(?PUBLISH, #{'Response-Topic' := ResponseTopic}) -> - case emqx_topic:wildcard(ResponseTopic) of - true -> - error(protocol_error); - false -> - true +check(#mqtt_packet_publish{topic_name = TopicName, properties = Props}) -> + try emqx_topic:validate(name, TopicName) of + true -> check_pub_props(Props) + catch + error:_Error -> + {error, ?RC_TOPIC_NAME_INVALID} end; -validate_properties(?CONNECT, #{'Receive-Maximum' := 0}) -> - error(protocol_error); -validate_properties(?CONNECT, #{'Request-Response-Information' := ReqRespInfo}) - when ReqRespInfo =/= 0, ReqRespInfo =/= 1 -> - error(protocol_error); -validate_properties(?CONNECT, #{'Request-Problem-Information' := ReqProInfo}) + +check(#mqtt_packet_subscribe{properties = #{'Subscription-Identifier' := I}}) + when I =< 0; I >= 16#FFFFFFF -> + {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED}; + +check(#mqtt_packet_subscribe{topic_filters = []}) -> + {error, ?RC_TOPIC_FILTER_INVALID}; + +check(#mqtt_packet_subscribe{topic_filters = TopicFilters}) -> + try validate_topic_filters(TopicFilters) + catch + error:_Error -> + {error, ?RC_TOPIC_FILTER_INVALID} + end; + +check(#mqtt_packet_unsubscribe{topic_filters = []}) -> + {error, ?RC_TOPIC_FILTER_INVALID}; + +check(#mqtt_packet_unsubscribe{topic_filters = TopicFilters}) -> + try validate_topic_filters(TopicFilters) + catch + error:_Error -> + {error, ?RC_TOPIC_FILTER_INVALID} + end. + +check_pub_props(#{'Topic-Alias' := 0}) -> + {error, ?RC_TOPIC_ALIAS_INVALID}; + +check_pub_props(#{'Subscription-Identifier' := 0}) -> + {error, ?RC_PROTOCOL_ERROR}; + +check_pub_props(#{'Response-Topic' := ResponseTopic}) -> + try emqx_topic:validate(name, ResponseTopic) of + true -> ok + catch + error:_Error -> + {error, ?RC_PROTOCOL_ERROR} + end; +check_pub_props(_Props) -> ok. + +%% @doc Check CONNECT Packet. +-spec(check(emqx_types:packet()|connect(), Opts :: map()) + -> ok | {error, emqx_types:reason_code()}). +check(?CONNECT_PACKET(ConnPkt), Opts) -> + check(ConnPkt, Opts); +check(ConnPkt, Opts) when is_record(ConnPkt, mqtt_packet_connect) -> + run_checks([fun check_proto_ver/2, + fun check_client_id/2, + fun check_conn_props/2, + fun check_will_msg/2], ConnPkt, Opts). + +check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, + proto_name = Name}, _Opts) -> + case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of + true -> ok; + false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} + end. + +%% MQTT3.1 does not allow null clientId +check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3, + client_id = <<>>}, _Opts) -> + {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; +%% Issue#599: Null clientId and clean_start = false +check_client_id(#mqtt_packet_connect{client_id = <<>>, + clean_start = false}, _Opts) -> + {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; +check_client_id(#mqtt_packet_connect{client_id = <<>>, + clean_start = true}, _Opts) -> + ok; +check_client_id(#mqtt_packet_connect{client_id = ClientId}, + _Opts = #{max_clientid_len := MaxLen}) -> + case (1 =< (Len = byte_size(ClientId))) andalso (Len =< MaxLen) of + true -> ok; + false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} + end. + +check_conn_props(#mqtt_packet_connect{properties = undefined}, _Opts) -> + ok; +check_conn_props(#mqtt_packet_connect{properties = #{'Receive-Maximum' := 0}}, _Opts) -> + {error, ?RC_PROTOCOL_ERROR}; +check_conn_props(#mqtt_packet_connect{properties = #{'Request-Response-Information' := ReqRespInfo}}, _Opts) + when ReqRespInfo =/= 0, ReqRespInfo =/= 1 -> + {error, ?RC_PROTOCOL_ERROR}; +check_conn_props(#mqtt_packet_connect{properties = #{'Request-Problem-Information' := ReqProInfo}}, _Opts) when ReqProInfo =/= 0, ReqProInfo =/= 1 -> - error(protocol_error); -validate_properties(_, _) -> - true. + {error, ?RC_PROTOCOL_ERROR}; +check_conn_props(_ConnPkt, _Opts) -> ok. -validate_subscription({Topic, #{qos := QoS}}) -> - emqx_topic:validate(filter, Topic) andalso validate_qos(QoS). +check_will_msg(#mqtt_packet_connect{will_flag = false}, _Caps) -> + ok; +check_will_msg(#mqtt_packet_connect{will_retain = true}, + _Opts = #{mqtt_retain_available := false}) -> + {error, ?RC_RETAIN_NOT_SUPPORTED}; +check_will_msg(#mqtt_packet_connect{will_topic = WillTopic}, _Opts) -> + try emqx_topic:validate(name, WillTopic) of + true -> ok + catch error:_Error -> + {error, ?RC_TOPIC_NAME_INVALID} + end. -validate_qos(QoS) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> - true; -validate_qos(_) -> error(bad_qos). +run_checks([], _Packet, _Options) -> + ok; +run_checks([Check|More], Packet, Options) -> + case Check(Packet, Options) of + ok -> run_checks(More, Packet, Options); + {error, Reason} -> {error, Reason} + end. -%% @doc From message to packet --spec(from_message(emqx_types:packet_id(), emqx_types:message()) - -> emqx_types:packet()). -from_message(PacketId, #message{qos = QoS, flags = Flags, headers = Headers, - topic = Topic, payload = Payload}) -> - Flags1 = if Flags =:= undefined -> - #{}; - true -> Flags - end, - Dup = maps:get(dup, Flags1, false), - Retain = maps:get(retain, Flags1, false), - Publish = #mqtt_packet_publish{topic_name = Topic, - packet_id = PacketId, - properties = publish_props(Headers)}, - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - dup = Dup, - qos = QoS, - retain = Retain}, - variable = Publish, payload = Payload}. +%% @doc Validate MQTT Packet +%% @private +validate_topic_filters(TopicFilters) -> + lists:foreach( + fun({TopicFilter, _SubOpts}) -> + emqx_topic:validate(TopicFilter); + (TopicFilter) -> + emqx_topic:validate(TopicFilter) + end, TopicFilters). -publish_props(Headers) -> - maps:with(['Payload-Format-Indicator', - 'Response-Topic', - 'Correlation-Data', - 'User-Property', - 'Subscription-Identifier', - 'Content-Type', - 'Message-Expiry-Interval'], Headers). - -%% @doc Message from Packet --spec(to_message(emqx_types:client(), emqx_ypes:packet()) - -> emqx_types:message()). +%% @doc Publish Packet to Message. +-spec(to_message(emqx_types:client(), emqx_ypes:packet()) -> emqx_types:message()). to_message(#{client_id := ClientId, username := Username, peername := Peername}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, retain = Retain, @@ -199,9 +280,10 @@ format_header(#mqtt_packet_header{type = Type, retain = Retain}, S) -> S1 = if S == undefined -> <<>>; - true -> [", ", S] + true -> [", ", S] end, - io_lib:format("~s(Q~p, R~p, D~p~s)", [type_name(Type), QoS, i(Retain), i(Dup), S1]). + io_lib:format("~s(Q~p, R~p, D~p~s)", + [lists:nth(Type, ?TYPE_NAMES), QoS, i(Retain), i(Dup), S1]). format_variable(undefined, _) -> undefined; diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index ff0f618ab..f9527634c 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -97,7 +97,7 @@ attrs(WsPid) when is_pid(WsPid) -> attrs(WsConn = #ws_connection{chan_state = ChanState}) -> ConnAttrs = info(?ATTR_KEYS, WsConn), ChanAttrs = emqx_channel:attrs(ChanState), - maps:merge(ChanAttrs, #{connection => maps:from_list(ConnAttrs)}). + maps:merge(ChanAttrs, #{conninfo => maps:from_list(ConnAttrs)}). -spec(stats(pid()|ws_connection()) -> emqx_types:stats()). stats(WsPid) when is_pid(WsPid) -> diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 586fa7ad7..2b696c937 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -27,7 +27,8 @@ %% APIs -export([start_link/0, stop/0]). --export([ enable_acl/1 +-export([ use_username_as_clientid/1 + , enable_acl/1 , enable_banned/1 , enable_flapping_detect/1 ]). @@ -67,6 +68,10 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec(use_username_as_clientid(zone()) -> boolean()). +use_username_as_clientid(Zone) -> + get_env(Zone, use_username_as_clientid, false). + -spec(enable_acl(zone()) -> boolean()). enable_acl(Zone) -> get_env(Zone, enable_acl, true). diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index 9c699b65d..e9f51db08 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -27,6 +27,7 @@ , t_header/1 , t_format/1 , t_expired/1 + , t_to_packet/1 , t_to_map/1 ]). @@ -91,6 +92,18 @@ t_expired(_) -> Msg2 = emqx_message:update_expiry(Msg1), ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). +t_to_packet(_) -> + Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + qos = ?QOS_0, + retain = false, + dup = false}, + variable = #mqtt_packet_publish{topic_name = <<"topic">>, + packet_id = 10, + properties = #{}}, + payload = <<"payload">>}, + Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>), + ?assertEqual(Pkt, emqx_message:to_packet(10, Msg)). + t_to_map(_) -> Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>), List = [{id, emqx_message:id(Msg)}, diff --git a/test/emqx_packet_SUITE.erl b/test/emqx_packet_SUITE.erl index f3857ff2e..f802e7718 100644 --- a/test/emqx_packet_SUITE.erl +++ b/test/emqx_packet_SUITE.erl @@ -26,63 +26,127 @@ all() -> emqx_ct:all(?MODULE). -t_proto_name(_) -> - ?assertEqual(<<"MQIsdp">>, emqx_packet:proto_name(3)), - ?assertEqual(<<"MQTT">>, emqx_packet:proto_name(4)), - ?assertEqual(<<"MQTT">>, emqx_packet:proto_name(5)). +t_type(_) -> + ?assertEqual(?CONNECT, emqx_packet:type(?CONNECT_PACKET(#mqtt_packet_connect{}))), + ?assertEqual(?CONNACK, emqx_packet:type(?CONNACK_PACKET(?RC_SUCCESS))), + ?assertEqual(?PUBLISH, emqx_packet:type(?PUBLISH_PACKET(?QOS_1))), + ?assertEqual(?PUBACK, emqx_packet:type(?PUBACK_PACKET(1))), + ?assertEqual(?PUBREC, emqx_packet:type(?PUBREC_PACKET(1))), + ?assertEqual(?PUBREL, emqx_packet:type(?PUBREL_PACKET(1))), + ?assertEqual(?PUBCOMP, emqx_packet:type(?PUBCOMP_PACKET(1))), + ?assertEqual(?SUBSCRIBE, emqx_packet:type(?SUBSCRIBE_PACKET(1, []))), + ?assertEqual(?SUBACK, emqx_packet:type(?SUBACK_PACKET(1, [0]))), + ?assertEqual(?UNSUBSCRIBE, emqx_packet:type(?UNSUBSCRIBE_PACKET(1, []))), + ?assertEqual(?UNSUBACK, emqx_packet:type(?UNSUBACK_PACKET(1))), + ?assertEqual(?DISCONNECT, emqx_packet:type(?DISCONNECT_PACKET(?RC_SUCCESS))), + ?assertEqual(?AUTH, emqx_packet:type(?AUTH_PACKET())). t_type_name(_) -> - ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)), - ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). + ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT_PACKET(#mqtt_packet_connect{}))), + ?assertEqual('CONNACK', emqx_packet:type_name(?CONNACK_PACKET(?RC_SUCCESS))), + ?assertEqual('PUBLISH', emqx_packet:type_name(?PUBLISH_PACKET(?QOS_1))), + ?assertEqual('PUBACK', emqx_packet:type_name(?PUBACK_PACKET(1))), + ?assertEqual('PUBREC', emqx_packet:type_name(?PUBREC_PACKET(1))), + ?assertEqual('PUBREL', emqx_packet:type_name(?PUBREL_PACKET(1))), + ?assertEqual('PUBCOMP', emqx_packet:type_name(?PUBCOMP_PACKET(1))), + ?assertEqual('SUBSCRIBE', emqx_packet:type_name(?SUBSCRIBE_PACKET(1, []))), + ?assertEqual('SUBACK', emqx_packet:type_name(?SUBACK_PACKET(1, [0]))), + ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE_PACKET(1, []))), + ?assertEqual('UNSUBACK', emqx_packet:type_name(?UNSUBACK_PACKET(1))), + ?assertEqual('DISCONNECT', emqx_packet:type_name(?DISCONNECT_PACKET(?RC_SUCCESS))), + ?assertEqual('AUTH', emqx_packet:type_name(?AUTH_PACKET())). -t_validate(_) -> - ?assert(emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, - [{<<"topic">>, #{qos => ?QOS_0}}]))), - ?assert(emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))), - ?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))), +t_dup(_) -> + ?assertEqual(false, emqx_packet:dup(?PUBLISH_PACKET(?QOS_1))). + +t_qos(_) -> + ?assertEqual(?QOS_0, emqx_packet:qos(?PUBLISH_PACKET(?QOS_0))), + ?assertEqual(?QOS_1, emqx_packet:qos(?PUBLISH_PACKET(?QOS_1))), + ?assertEqual(?QOS_2, emqx_packet:qos(?PUBLISH_PACKET(?QOS_2))). + +t_retain(_) -> + ?assertEqual(false, emqx_packet:retain(?PUBLISH_PACKET(?QOS_1))). + +t_proto_name(_) -> + lists:foreach( + fun({Ver, Name}) -> + ConnPkt = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = Ver, + proto_name = Name}), + ?assertEqual(Name, emqx_packet:proto_name(ConnPkt)) + end, ?PROTOCOL_NAMES). + +t_proto_ver(_) -> + lists:foreach( + fun(Ver) -> + ?assertEqual(Ver, emqx_packet:proto_ver(#mqtt_packet_connect{proto_ver = Ver})) + end, [?MQTT_PROTO_V3, ?MQTT_PROTO_V4, ?MQTT_PROTO_V5]). + +t_check_publish(_) -> Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1}, - ?assert(emqx_packet:validate(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>))), - ?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 1}}))), - ?assertError(subscription_identifier_invalid, - emqx_packet:validate( - ?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1}, - [{<<"topic">>, #{qos => ?QOS_0}}]))), - ?assertError(topic_filters_invalid, - emqx_packet:validate(?UNSUBSCRIBE_PACKET(1,[]))), - ?assertError(protocol_error, - emqx_packet:validate(?PUBLISH_PACKET(1,<<>>,1,#{},<<"payload">>))), - ?assertError(topic_name_invalid, - emqx_packet:validate(?PUBLISH_PACKET - (1, <<"+/+">>, 1, #{}, <<"payload">>))), - ?assertError(topic_alias_invalid, - emqx_packet:validate( - ?PUBLISH_PACKET - (1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>))), - ?assertError(protocol_error, - emqx_packet:validate( - ?PUBLISH_PACKET(1, <<"topic">>, 1, - #{'Subscription-Identifier' => 10}, <<"payload">>))), - ?assertError(protocol_error, - emqx_packet:validate( - ?PUBLISH_PACKET(1, <<"topic">>, 1, - #{'Response-Topic' => <<"+/+">>}, <<"payload">>))), - ?assertError(protocol_error, - emqx_packet:validate( - ?CONNECT_PACKET(#mqtt_packet_connect{ - properties = - #{'Request-Response-Information' => -1}}))), - ?assertError(protocol_error, - emqx_packet:validate( - ?CONNECT_PACKET(#mqtt_packet_connect{ - properties = - #{'Request-Problem-Information' => 2}}))), - ?assertError(protocol_error, - emqx_packet:validate( - ?CONNECT_PACKET(#mqtt_packet_connect{ - properties = - #{'Receive-Maximum' => 0}}))). + ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)), + ok = emqx_packet:check(#mqtt_packet_publish{packet_id = 1, topic_name = <<"t">>}), + {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1,<<>>,1,#{},<<"payload">>)), + {error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(?PUBLISH_PACKET(1, <<"+/+">>, 1, #{}, <<"payload">>)), + {error, ?RC_TOPIC_ALIAS_INVALID} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>)), + %% TODO:: + %% {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)), + ok = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)), + {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Response-Topic' => <<"+/+">>}, <<"payload">>)). + +t_check_subscribe(_) -> + ok = emqx_packet:check(?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 1}, + [{<<"topic">>, #{qos => ?QOS_0}}])), + {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED} = + emqx_packet:check(?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => -1}, + [{<<"topic">>, #{qos => ?QOS_0, rp => 0}}])). + +t_check_unsubscribe(_) -> + ok = emqx_packet:check(?UNSUBSCRIBE_PACKET(1, [<<"topic">>])), + {error, ?RC_TOPIC_FILTER_INVALID} = emqx_packet:check(?UNSUBSCRIBE_PACKET(1,[])). + +t_check_connect(_) -> + Opts = #{max_clientid_len => 5, mqtt_retain_available => false}, + ok = emqx_packet:check(#mqtt_packet_connect{}, Opts), + ok = emqx_packet:check(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 1}}), Opts), + ConnPkt1 = #mqtt_packet_connect{proto_name = <<"MQIsdp">>, + proto_ver = ?MQTT_PROTO_V5 + }, + {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} = emqx_packet:check(ConnPkt1, Opts), + + ConnPkt2 = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3, + proto_name = <<"MQIsdp">>, + client_id = <<>> + }, + {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} = emqx_packet:check(ConnPkt2, Opts), + + ConnPkt3 = #mqtt_packet_connect{client_id = <<"123456">>}, + {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} = emqx_packet:check(ConnPkt3, Opts), + + ConnPkt4 = #mqtt_packet_connect{will_flag = true, + will_retain = true + }, + {error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_packet:check(ConnPkt4, Opts), + + ConnPkt5 = #mqtt_packet_connect{will_flag = true, + will_topic = <<"#">> + }, + {error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(ConnPkt5, Opts), + + ConnPkt6 = ?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Request-Response-Information' => -1}}), + {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(ConnPkt6, Opts), + + {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check( + ?CONNECT_PACKET(#mqtt_packet_connect{ + properties = #{'Request-Problem-Information' => 2}}), Opts), + {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check( + ?CONNECT_PACKET(#mqtt_packet_connect{ + properties = #{'Receive-Maximum' => 0}}), Opts). t_from_to_message(_) -> + ExpectedMsg = emqx_message:set_headers( + #{peername => {{127,0,0,1}, 9527}, username => <<"test">>}, + emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>)), + ExpectedMsg1 = emqx_message:set_flag(retain, false, ExpectedMsg), Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = ?QOS_0, retain = false, @@ -91,30 +155,12 @@ t_from_to_message(_) -> packet_id = 10, properties = #{}}, payload = <<"payload">>}, - Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>), - Msg2 = emqx_message:set_flag(retain, false, Msg), - Pkt = emqx_packet:from_message(10, Msg2), - Msg3 = emqx_message:set_header( - peername, {{127,0,0,1}, 9527}, - emqx_message:set_header(username, "test", Msg2) - ), - Msg4 = emqx_packet:to_message(#{client_id => <<"clientid">>, - username => "test", - peername => {{127,0,0,1}, 9527}}, Pkt), - Msg5 = Msg4#message{timestamp = Msg3#message.timestamp, id = Msg3#message.id}, - Msg5 = Msg3. - -t_packet_format(_) -> - io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), - io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), - io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), - io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), - io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))]), - io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]), - io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), - io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). + MsgFromPkt = emqx_packet:to_message(#{client_id => <<"clientid">>, + username => <<"test">>, + peername => {{127,0,0,1}, 9527}}, Pkt), + ?assertEqual(ExpectedMsg1, MsgFromPkt#message{id = emqx_message:id(ExpectedMsg), + timestamp = emqx_message:timestamp(ExpectedMsg) + }). t_will_msg(_) -> Pkt = #mqtt_packet_connect{will_flag = true, @@ -130,3 +176,15 @@ t_will_msg(_) -> ?assertEqual(<<"clientid">>, Msg#message.from), ?assertEqual(<<"topic">>, Msg#message.topic). +t_format(_) -> + io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), + io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), + io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), + io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), + io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), + io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), + io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))]), + io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]), + io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), + io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). +