diff --git a/include/emqx.hrl b/include/emqx.hrl index acca359c4..bbeb5defd 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -62,13 +62,15 @@ %% Message from from :: atom() | binary(), %% Message flags - flags :: #{atom() => boolean()}, - %% Message headers, or MQTT 5.0 Properties - headers :: map(), + flags = #{} :: emqx_types:flags(), + %% Message headers. May contain any metadata. e.g. the + %% protocol version number, username, peerhost or + %% the PUBLISH properties (MQTT 5.0). + headers = #{} :: emqx_types:headers(), %% Topic that the message is published to - topic :: binary(), + topic :: emqx_types:topic(), %% Message Payload - payload :: binary(), + payload :: emqx_types:payload(), %% Timestamp (Unit: millisecond) timestamp :: integer() }). diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 152846a1b..078baf301 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -219,9 +219,9 @@ will_qos = ?QOS_0, will_retain = false, keepalive = 0, - properties = undefined, + properties = #{}, clientid = <<>>, - will_props = undefined, + will_props = #{}, will_topic = undefined, will_payload = undefined, username = undefined, @@ -231,53 +231,53 @@ -record(mqtt_packet_connack, { ack_flags, reason_code, - properties + properties = #{} }). -record(mqtt_packet_publish, { topic_name, packet_id, - properties + properties = #{} }). -record(mqtt_packet_puback, { packet_id, reason_code, - properties + properties = #{} }). -record(mqtt_packet_subscribe, { packet_id, - properties, + properties = #{}, topic_filters }). -record(mqtt_packet_suback, { packet_id, - properties, + properties = #{}, reason_codes }). -record(mqtt_packet_unsubscribe, { packet_id, - properties, + properties = #{}, topic_filters }). -record(mqtt_packet_unsuback, { packet_id, - properties, + properties = #{}, reason_codes }). -record(mqtt_packet_disconnect, { reason_code, - properties + properties = #{} }). -record(mqtt_packet_auth, { reason_code, - properties + properties = #{} }). %%-------------------------------------------------------------------- diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index 990f8a76e..77d2b60a0 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -171,9 +171,10 @@ encode_alarm({AlarmId, AlarmDesc}) -> }). alarm_msg(Topic, Payload) -> - Msg = emqx_message:make(?MODULE, Topic, Payload), - emqx_message:set_headers(#{'Content-Type' => <<"application/json">>}, - emqx_message:set_flag(sys, Msg)). + emqx_message:make(?MODULE, 0, Topic, Payload, + #{sys => true}, + #{properties => #{'Content-Type' => <<"application/json">>}} + ). topic(alert) -> emqx_topic:systop(<<"alarms/alert">>); diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index ff964eff1..ff1ca4661 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -281,14 +281,14 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_out(disconnect, ReasonCode, Channel) end; -handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel +handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> case emqx_session:puback(PacketId, Session) of {ok, Msg, NSession} -> - ok = after_message_acked(ClientInfo, Msg), + ok = after_message_acked(ClientInfo, Msg, Properties), {ok, Channel#channel{session = NSession}}; {ok, Msg, Publishes, NSession} -> - ok = after_message_acked(ClientInfo, Msg), + ok = after_message_acked(ClientInfo, Msg, Properties), handle_out(publish, Publishes, Channel#channel{session = NSession}); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), @@ -300,11 +300,11 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel {ok, Channel} end; -handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel +handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> case emqx_session:pubrec(PacketId, Session) of {ok, Msg, NSession} -> - ok = after_message_acked(ClientInfo, Msg), + ok = after_message_acked(ClientInfo, Msg, Properties), NChannel = Channel#channel{session = NSession}, handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> @@ -347,12 +347,12 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> case emqx_packet:check(Packet) of ok -> TopicFilters1 = parse_topic_filters(TopicFilters), - TopicFilters2 = enrich_subid(Properties, TopicFilters1), + TopicFilters2 = put_subid_in_subopts(Properties, TopicFilters1), TopicFilters3 = run_hooks('client.subscribe', [ClientInfo, Properties], TopicFilters2 ), - {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel), + {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Properties, Channel), case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso lists:any(fun(ReasonCode) -> ReasonCode =:= ?RC_NOT_AUTHORIZED @@ -373,7 +373,7 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), [ClientInfo, Properties], parse_topic_filters(TopicFilters) ), - {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), + {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Properties, Channel), handle_out(unsuback, {PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) @@ -382,8 +382,8 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(?PACKET(?PINGREQ), Channel) -> {ok, ?PACKET(?PINGRESP), Channel}; -handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel) -> - NChannel = maybe_clean_will_msg(ReasonCode, Channel), +handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) -> + NChannel = maybe_clean_will_msg(ReasonCode, Channel#channel{conninfo = ConnInfo#{disconn_props => Properties}}), process_disconnect(ReasonCode, Properties, NChannel); handle_in(?AUTH_PACKET(), Channel) -> @@ -437,7 +437,7 @@ process_connect(AckProps, Channel = #channel{conninfo = #{clean_start := CleanSt %% Process Publish %%-------------------------------------------------------------------- -process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), +process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{clientinfo = #{zone := Zone}}) -> case pipeline([fun process_alias/2, fun check_pub_alias/2, @@ -466,12 +466,23 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), handle_out(disconnect, ReasonCode, NChannel) end. -packet_to_message(Packet, #channel{conninfo = #{proto_ver := ProtoVer}, - clientinfo = ClientInfo = - #{mountpoint := MountPoint}}) -> - emqx_mountpoint:mount( - MountPoint, emqx_packet:to_message( - ClientInfo, #{proto_ver => ProtoVer}, Packet)). +packet_to_message(Packet, #channel{ + conninfo = #{proto_ver := ProtoVer}, + clientinfo = #{ + protocol := Protocol, + clientid := ClientId, + username := Username, + peerhost := PeerHost, + mountpoint := MountPoint + } + }) -> + emqx_mountpoint:mount(MountPoint, + emqx_packet:to_message( + Packet, ClientId, + #{proto_ver => ProtoVer, + protocol => Protocol, + username => Username, + peerhost => PeerHost})). do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) -> _ = emqx_broker:publish(Msg), @@ -504,25 +515,26 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; puback_reason_code([_|_]) -> ?RC_SUCCESS. --compile({inline, [after_message_acked/2]}). -after_message_acked(ClientInfo, Msg) -> +-compile({inline, [after_message_acked/3]}). +after_message_acked(ClientInfo, Msg, PubAckProps) -> ok = emqx_metrics:inc('messages.acked'), - emqx_hooks:run('message.acked', [ClientInfo, Msg]). + emqx_hooks:run('message.acked', [ClientInfo, + emqx_message:set_header(puback_props, PubAckProps, Msg)]). %%-------------------------------------------------------------------- %% Process Subscribe %%-------------------------------------------------------------------- --compile({inline, [process_subscribe/2]}). -process_subscribe(TopicFilters, Channel) -> - process_subscribe(TopicFilters, [], Channel). +-compile({inline, [process_subscribe/3]}). +process_subscribe(TopicFilters, SubProps, Channel) -> + process_subscribe(TopicFilters, SubProps, Channel, []). -process_subscribe([], Acc, Channel) -> +process_subscribe([], _SubProps, Channel, Acc) -> {lists:reverse(Acc), Channel}; -process_subscribe([{TopicFilter, SubOpts}|More], Acc, Channel) -> - {RC, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel), - process_subscribe(More, [RC|Acc], NChannel). +process_subscribe([{TopicFilter, SubOpts}|More], SubProps, Channel, Acc) -> + {RC, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel), + process_subscribe(More, SubProps, NChannel, [RC|Acc]). do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, @@ -557,22 +569,22 @@ process_force_subscribe(Subscriptions, Channel = %% Process Unsubscribe %%-------------------------------------------------------------------- --compile({inline, [process_unsubscribe/2]}). -process_unsubscribe(TopicFilters, Channel) -> - process_unsubscribe(TopicFilters, [], Channel). +-compile({inline, [process_unsubscribe/3]}). +process_unsubscribe(TopicFilters, UnSubProps, Channel) -> + process_unsubscribe(TopicFilters, UnSubProps, Channel, []). -process_unsubscribe([], Acc, Channel) -> +process_unsubscribe([], _UnSubProps, Channel, Acc) -> {lists:reverse(Acc), Channel}; -process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, Channel) -> - {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts, Channel), - process_unsubscribe(More, [RC|Acc], NChannel). +process_unsubscribe([{TopicFilter, SubOpts}|More], UnSubProps, Channel, Acc) -> + {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts#{unsub_props => UnSubProps}, Channel), + process_unsubscribe(More, UnSubProps, NChannel, [RC|Acc]). -do_unsubscribe(TopicFilter, _SubOpts, Channel = +do_unsubscribe(TopicFilter, SubOpts, Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, session = Session}) -> TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), - case emqx_session:unsubscribe(ClientInfo, TopicFilter1, Session) of + case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of {ok, NSession} -> {?RC_SUCCESS, Channel#channel{session = NSession}}; {error, RC} -> {RC, Channel} @@ -582,9 +594,9 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = process_force_unsubscribe(Subscriptions, Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, session = Session}) -> - lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) -> + lists:foldl(fun({TopicFilter, SubOpts}, {ReasonCodes, ChannelAcc}) -> NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter), - case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of + case emqx_session:unsubscribe(ClientInfo, NTopicFilter, SubOpts, Session) of {ok, NSession} -> {ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}}; {error, ReasonCode} -> @@ -844,7 +856,7 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf [ClientInfo, #{'Internal' => true}], parse_topic_filters(TopicFilters) ), - {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel), + {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, #{}, Channel), {ok, NChannel}; handle_info({force_subscribe, TopicFilters}, Channel) -> @@ -856,7 +868,7 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI [ClientInfo, #{'Internal' => true}], parse_topic_filters(TopicFilters) ), - {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), + {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, #{}, Channel), {ok, NChannel}; handle_info({force_unsubscribe, TopicFilters}, Channel) -> @@ -1329,9 +1341,9 @@ check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) -> %%-------------------------------------------------------------------- %% Enrich SubId -enrich_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) -> +put_subid_in_subopts(#{'Subscription-Identifier' := SubId}, TopicFilters) -> [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters]; -enrich_subid(_Properties, TopicFilters) -> TopicFilters. +put_subid_in_subopts(_Properties, TopicFilters) -> TopicFilters. %%-------------------------------------------------------------------- %% Enrich SubOpts @@ -1490,7 +1502,7 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> end. will_delay_interval(WillMsg) -> - emqx_message:get_header('Will-Delay-Interval', WillMsg, 0). + maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0). publish_will_msg(Msg) -> emqx_broker:publish(Msg). diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 9e5f85dfc..dce697054 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -307,7 +307,7 @@ parse_packet_id(<>) -> {PacketId, Rest}. parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 -> - {undefined, Bin}; + {#{}, Bin}; %% TODO: version mess? parse_properties(<<>>, ?MQTT_PROTO_V5) -> {#{}, <<>>}; diff --git a/src/emqx_message.erl b/src/emqx_message.erl index f7fbd6021..0b5cbb56f 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -26,6 +26,8 @@ -export([ make/2 , make/3 , make/4 + , make/6 + , make/7 ]). %% Fields @@ -69,8 +71,6 @@ -export([format/1]). --type(flag() :: atom()). - -spec(make(emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). make(Topic, Payload) -> make(undefined, Topic, Payload). @@ -95,6 +95,47 @@ make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> timestamp = Now }. +-spec(make(emqx_types:clientid(), + emqx_types:qos(), + emqx_topic:topic(), + emqx_types:payload(), + emqx_types:flags(), + emqx_types:headers()) -> emqx_types:message()). +make(From, QoS, Topic, Payload, Flags, Headers) + when ?QOS_0 =< QoS, QoS =< ?QOS_2, + is_map(Flags), is_map(Headers) -> + Now = erlang:system_time(millisecond), + #message{id = emqx_guid:gen(), + qos = QoS, + from = From, + flags = Flags, + headers = Headers, + topic = Topic, + payload = Payload, + timestamp = Now + }. + +-spec(make(MsgId :: binary(), + emqx_types:clientid(), + emqx_types:qos(), + emqx_topic:topic(), + emqx_types:payload(), + emqx_types:flags(), + emqx_types:headers()) -> emqx_types:message()). +make(MsgId, From, QoS, Topic, Payload, Flags, Headers) + when ?QOS_0 =< QoS, QoS =< ?QOS_2, + is_map(Flags), is_map(Headers) -> + Now = erlang:system_time(millisecond), + #message{id = MsgId, + qos = QoS, + from = From, + flags = Flags, + headers = Headers, + topic = Topic, + payload = Payload, + timestamp = Now + }. + -spec(id(emqx_types:message()) -> maybe(binary())). id(#message{id = Id}) -> Id. @@ -126,39 +167,29 @@ clean_dup(Msg = #message{flags = Flags = #{dup := true}}) -> clean_dup(Msg) -> Msg. -spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()). -set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) -> - Msg#message{flags = Flags}; set_flags(New, Msg = #message{flags = Old}) when is_map(New) -> Msg#message{flags = maps:merge(Old, New)}. --spec(get_flag(flag(), emqx_types:message()) -> boolean()). -get_flag(_Flag, #message{flags = undefined}) -> - false; +-spec(get_flag(emqx_types:flag(), emqx_types:message()) -> boolean()). get_flag(Flag, Msg) -> get_flag(Flag, Msg, false). -get_flag(_Flag, #message{flags = undefined}, Default) -> - Default; get_flag(Flag, #message{flags = Flags}, Default) -> maps:get(Flag, Flags, Default). -spec(get_flags(emqx_types:message()) -> maybe(map())). get_flags(#message{flags = Flags}) -> Flags. --spec(set_flag(flag(), emqx_types:message()) -> emqx_types:message()). -set_flag(Flag, Msg = #message{flags = undefined}) when is_atom(Flag) -> - Msg#message{flags = #{Flag => true}}; +-spec(set_flag(emqx_types:flag(), emqx_types:message()) -> emqx_types:message()). set_flag(Flag, Msg = #message{flags = Flags}) when is_atom(Flag) -> Msg#message{flags = maps:put(Flag, true, Flags)}. --spec(set_flag(flag(), boolean() | integer(), emqx_types:message()) +-spec(set_flag(emqx_types:flag(), boolean() | integer(), emqx_types:message()) -> emqx_types:message()). -set_flag(Flag, Val, Msg = #message{flags = undefined}) when is_atom(Flag) -> - Msg#message{flags = #{Flag => Val}}; set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) -> Msg#message{flags = maps:put(Flag, Val, Flags)}. --spec(unset_flag(flag(), emqx_types:message()) -> emqx_types:message()). +-spec(unset_flag(emqx_types:flag(), emqx_types:message()) -> emqx_types:message()). unset_flag(Flag, Msg = #message{flags = Flags}) -> case maps:is_key(Flag, Flags) of true -> Msg#message{flags = maps:remove(Flag, Flags)}; @@ -166,8 +197,6 @@ unset_flag(Flag, Msg = #message{flags = Flags}) -> end. -spec(set_headers(map(), emqx_types:message()) -> emqx_types:message()). -set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) -> - Msg#message{headers = Headers}; set_headers(New, Msg = #message{headers = Old}) when is_map(New) -> Msg#message{headers = maps:merge(Old, New)}. @@ -175,25 +204,17 @@ set_headers(New, Msg = #message{headers = Old}) when is_map(New) -> get_headers(Msg) -> Msg#message.headers. -spec(get_header(term(), emqx_types:message()) -> term()). -get_header(_Hdr, #message{headers = undefined}) -> - undefined; get_header(Hdr, Msg) -> get_header(Hdr, Msg, undefined). -spec(get_header(term(), emqx_types:message(), term()) -> term()). -get_header(_Hdr, #message{headers = undefined}, Default) -> - Default; get_header(Hdr, #message{headers = Headers}, Default) -> maps:get(Hdr, Headers, Default). -spec(set_header(term(), term(), emqx_types:message()) -> emqx_types:message()). -set_header(Hdr, Val, Msg = #message{headers = undefined}) -> - Msg#message{headers = #{Hdr => Val}}; set_header(Hdr, Val, Msg = #message{headers = Headers}) -> Msg#message{headers = maps:put(Hdr, Val, Headers)}. -spec(remove_header(term(), emqx_types:message()) -> emqx_types:message()). -remove_header(_Hdr, Msg = #message{headers = undefined}) -> - Msg; remove_header(Hdr, Msg = #message{headers = Headers}) -> case maps:is_key(Hdr, Headers) of true -> Msg#message{headers = maps:remove(Hdr, Headers)}; @@ -201,18 +222,18 @@ remove_header(Hdr, Msg = #message{headers = Headers}) -> end. -spec(is_expired(emqx_types:message()) -> boolean()). -is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, +is_expired(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, timestamp = CreatedAt}) -> elapsed(CreatedAt) > timer:seconds(Interval); is_expired(_Msg) -> false. -spec(update_expiry(emqx_types:message()) -> emqx_types:message()). -update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, +update_expiry(Msg = #message{headers = #{properties := Props = #{'Message-Expiry-Interval' := Interval}}, timestamp = CreatedAt}) -> case elapsed(CreatedAt) of Elapsed when Elapsed > 0 -> Interval1 = max(1, Interval - (Elapsed div 1000)), - set_header('Message-Expiry-Interval', Interval1, Msg); + set_header(properties, Props#{'Message-Expiry-Interval' => Interval1}, Msg); _ -> Msg end; update_expiry(Msg) -> Msg. @@ -229,21 +250,11 @@ to_packet(PacketId, Msg = #message{qos = QoS, headers = Headers, }, variable = #mqtt_packet_publish{topic_name = Topic, packet_id = PacketId, - properties = props(Headers) + properties = maps:get(properties, Headers, #{}) }, payload = Payload }. -props(undefined) -> undefined; -props(Headers) -> maps:with(['Payload-Format-Indicator', - 'Response-Topic', - 'Correlation-Data', - 'User-Property', - 'Subscription-Identifier', - 'Content-Type', - 'Message-Expiry-Interval' - ], Headers). - %% @doc Message to map -spec(to_map(emqx_types:message()) -> map()). to_map(#message{ diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 768bf79e1..43fb4b51e 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -373,34 +373,26 @@ validate_topic_filters(TopicFilters) -> emqx_topic:validate(TopicFilter) end, TopicFilters). --spec(to_message(emqx_types:clientinfo(), emqx_ypes:packet()) -> emqx_types:message()). -to_message(ClientInfo, Packet) -> - to_message(ClientInfo, #{}, Packet). +-spec(to_message(emqx_types:packet(), emqx_types:clientid()) -> emqx_types:message()). +to_message(Packet, ClientId) -> + to_message(Packet, ClientId, #{}). %% @doc Transform Publish Packet to Message. --spec(to_message(emqx_types:clientinfo(), map(), emqx_ypes:packet()) - -> emqx_types:message()). -to_message(#{protocol := Protocol, - clientid := ClientId, - username := Username, - peerhost := PeerHost - }, Headers, - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - retain = Retain, - qos = QoS, - dup = Dup - }, - variable = #mqtt_packet_publish{topic_name = Topic, - properties = Props - }, - payload = Payload - }) -> +-spec(to_message(emqx_types:packet(), emqx_types:clientid(), map()) -> emqx_types:message()). +to_message(#mqtt_packet{ + header = #mqtt_packet_header{ + type = ?PUBLISH, + retain = Retain, + qos = QoS, + dup = Dup}, + variable = #mqtt_packet_publish{ + topic_name = Topic, + properties = Props}, + payload = Payload + }, ClientId, Headers) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), - Headers1 = merge_props(Headers#{protocol => Protocol, - username => Username, - peerhost => PeerHost - }, Props), - Msg#message{flags = #{dup => Dup, retain => Retain}, headers = Headers1}. + Msg#message{flags = #{dup => Dup, retain => Retain}, + headers = Headers#{properties => Props}}. -spec(will_msg(#mqtt_packet_connect{}) -> emqx_types:message()). will_msg(#mqtt_packet_connect{will_flag = false}) -> @@ -413,13 +405,8 @@ will_msg(#mqtt_packet_connect{clientid = ClientId, will_props = Props, will_payload = Payload}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), - Headers = merge_props(#{username => Username}, Props), - Msg#message{flags = #{dup => false, retain => Retain}, headers = Headers}. - -merge_props(Headers, undefined) -> - Headers; -merge_props(Headers, Props) -> - maps:merge(Headers, Props). + Msg#message{flags = #{dup => false, retain => Retain}, + headers = #{username => Username, properties => Props}}. %% @doc Format packet -spec(format(emqx_types:packet()) -> iolist()). diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 684589ff2..1e7a4dc7a 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -63,7 +63,7 @@ ]). -export([ subscribe/4 - , unsubscribe/3 + , unsubscribe/4 ]). -export([ publish/3 @@ -261,13 +261,13 @@ is_subscriptions_full(#session{subscriptions = Subs, %% Client -> Broker: UNSUBSCRIBE %%-------------------------------------------------------------------- --spec(unsubscribe(emqx_types:clientinfo(), emqx_types:topic(), session()) +-spec(unsubscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). -unsubscribe(ClientInfo, TopicFilter, Session = #session{subscriptions = Subs}) -> +unsubscribe(ClientInfo, TopicFilter, UnSubOpts, Session = #session{subscriptions = Subs}) -> case maps:find(TopicFilter, Subs) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(TopicFilter), - ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, SubOpts]), + ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]), {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}}; error -> {error, ?RC_NO_SUBSCRIPTION_EXISTED} @@ -523,7 +523,8 @@ enrich_subopts([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, S enrich_subopts([{rap, 0}|Opts], Msg, Session) -> enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session); enrich_subopts([{subid, SubId}|Opts], Msg, Session) -> - Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg), + Props = emqx_message:get_header(properties, Msg, #{}), + Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg), enrich_subopts(Opts, Msg1, Session). %%-------------------------------------------------------------------- diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 31f9ad65c..2475ad88b 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -63,6 +63,9 @@ -export_type([ payload/0 , message/0 + , flag/0 + , flags/0 + , headers/0 ]). -export_type([ deliver/0 @@ -179,6 +182,9 @@ -type(subscriber() :: {pid(), subid()}). -type(payload() :: binary() | iodata()). -type(message() :: #message{}). +-type(flag() :: atom()). +-type(flags() :: #{flag() := boolean()}). +-type(headers() :: map()). -type(banned() :: #banned{}). -type(deliver() :: {deliver, topic(), message()}). -type(delivery() :: #delivery{}). diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 086ff557e..2d8465b26 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -279,7 +279,7 @@ t_handle_in_subscribe(_) -> t_handle_in_unsubscribe(_) -> ok = meck:expect(emqx_session, unsubscribe, - fun(_, _, Session) -> + fun(_, _, _, Session) -> {ok, Session} end), Channel = channel(#{conn_state => connected}), @@ -345,12 +345,12 @@ t_process_publish_qos1(_) -> t_process_subscribe(_) -> ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end), TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], - {[?RC_SUCCESS], _Channel} = emqx_channel:process_subscribe(TopicFilters, channel()). + {[?RC_SUCCESS], _Channel} = emqx_channel:process_subscribe(TopicFilters, #{}, channel()). t_process_unsubscribe(_) -> - ok = meck:expect(emqx_session, unsubscribe, fun(_, _, Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end), TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], - {[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, channel()). + {[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, #{}, channel()). %%-------------------------------------------------------------------- %% Test cases for handle_deliver @@ -465,7 +465,7 @@ t_handle_info_subscribe(_) -> {ok, _Chan} = emqx_channel:handle_info({subscribe, topic_filters()}, channel()). t_handle_info_unsubscribe(_) -> - ok = meck:expect(emqx_session, unsubscribe, fun(_, _, Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end), {ok, _Chan} = emqx_channel:handle_info({unsubscribe, topic_filters()}, channel()). t_handle_info_sock_closed(_) -> @@ -541,7 +541,7 @@ t_packing_alias(_) -> ?assertEqual(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}}}, RePacket2), {RePacket3, _} = emqx_channel:packing_alias(Packet2, NChannel2), - ?assertEqual(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"y">>, properties = undefined}}, RePacket3), + ?assertEqual(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"y">>, properties = #{}}}, RePacket3), ?assertMatch({#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}}, _}, emqx_channel:packing_alias(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}}, channel())). @@ -644,7 +644,7 @@ connpkt() -> is_bridge = false, clean_start = true, keepalive = 30, - properties = undefined, + properties = #{}, clientid = <<"clientid">>, username = <<"username">>, password = <<"passwd">> diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 96e744a5d..acec61faa 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -141,10 +141,6 @@ prop_serialize_parse_connect() -> ?FORALL(Opts = #{version := ProtoVer}, parse_opts(), begin ProtoName = proplists:get_value(ProtoVer, ?PROTOCOL_NAMES), - DefaultProps = if ProtoVer == ?MQTT_PROTO_V5 -> - #{}; - true -> undefined - end, Packet = ?CONNECT_PACKET(#mqtt_packet_connect{ proto_name = ProtoName, proto_ver = ProtoVer, @@ -153,10 +149,10 @@ prop_serialize_parse_connect() -> will_flag = true, will_retain = true, will_topic = <<"will">>, - will_props = DefaultProps, + will_props = #{}, will_payload = <<"bye">>, clean_start = true, - properties = DefaultProps + properties = #{} }), ok == ?assertEqual(Packet, parse_serialize(Packet, Opts)) end). diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index 3bf53e683..1472cc072 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -48,7 +48,7 @@ t_restart_listeners(_) -> ok = emqx_listeners:stop(). render_config_file() -> - Path = local_path(["etc", "emqx.conf"]), + Path = local_path(["..", "..", "..", "..", "etc", "emqx.conf"]), {ok, Temp} = file:read_file(Path), Vars0 = mustache_vars(), Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0], diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index ecac9366f..2eebdc1dc 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -86,7 +86,7 @@ t_clean_dup(_) -> ?assertNot(emqx_message:get_flag(dup, Msg2)). t_get_set_flags(_) -> - Msg = #message{id = <<"id">>, qos = ?QOS_1, flags = undefined}, + Msg = #message{id = <<"id">>, qos = ?QOS_1}, Msg1 = emqx_message:set_flags(#{retain => true}, Msg), ?assertEqual(#{retain => true}, emqx_message:get_flags(Msg1)), Msg2 = emqx_message:set_flags(#{dup => true}, Msg1), @@ -109,7 +109,7 @@ t_get_set_flag(_) -> Msg6 = emqx_message:set_flags(#{dup => true, retain => true}, Msg5), ?assert(emqx_message:get_flag(dup, Msg6)), ?assert(emqx_message:get_flag(retain, Msg6)), - Msg7 = #message{id = <<"id">>, qos = ?QOS_1, flags = undefined}, + Msg7 = #message{id = <<"id">>, qos = ?QOS_1}, Msg8 = emqx_message:set_flag(retain, Msg7), Msg9 = emqx_message:set_flag(retain, true, Msg7), ?assertEqual(#{retain => true}, emqx_message:get_flags(Msg8)), @@ -135,7 +135,7 @@ t_get_set_header(_) -> ?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg4)). t_undefined_headers(_) -> - Msg = #message{id = <<"id">>, qos = ?QOS_0, headers = undefined}, + Msg = #message{id = <<"id">>, qos = ?QOS_0}, Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg), ?assertEqual(1, emqx_message:get_header(a, Msg1)), Msg2 = emqx_message:set_header(c, 3, Msg), @@ -144,14 +144,14 @@ t_undefined_headers(_) -> t_format(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), io:format("~s~n", [emqx_message:format(Msg)]), - Msg1 = emqx_message:set_header('Subscription-Identifier', 1, + Msg1 = emqx_message:set_header(properties, #{'Subscription-Identifier' => 1}, emqx_message:set_flag(dup, Msg)), io:format("~s~n", [emqx_message:format(Msg1)]). t_is_expired(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), ?assertNot(emqx_message:is_expired(Msg)), - Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg), + Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg), timer:sleep(500), ?assertNot(emqx_message:is_expired(Msg1)), timer:sleep(600), @@ -159,7 +159,8 @@ t_is_expired(_) -> timer:sleep(1000), Msg = emqx_message:update_expiry(Msg), Msg2 = emqx_message:update_expiry(Msg1), - ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). + Props = emqx_message:get_header(properties, Msg2), + ?assertEqual(1, maps:get('Message-Expiry-Interval', Props)). % t_to_list(_) -> % error('TODO'). @@ -172,7 +173,7 @@ t_to_packet(_) -> }, variable = #mqtt_packet_publish{topic_name = <<"topic">>, packet_id = 10, - properties = undefined + properties = #{} }, payload = <<"payload">> }, @@ -193,7 +194,7 @@ t_to_packet_with_props(_) -> payload = <<"payload">> }, Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>), - Msg1 = emqx_message:set_header('Subscription-Identifier', 1, Msg), + Msg1 = emqx_message:set_header(properties, #{'Subscription-Identifier' => 1}, Msg), ?assertEqual(Pkt, emqx_message:to_packet(10, Msg1)). t_to_map(_) -> @@ -201,8 +202,8 @@ t_to_map(_) -> List = [{id, emqx_message:id(Msg)}, {qos, ?QOS_1}, {from, <<"clientid">>}, - {flags, undefined}, - {headers, undefined}, + {flags, #{}}, + {headers, #{}}, {topic, <<"topic">>}, {payload, <<"payload">>}, {timestamp, emqx_message:timestamp(Msg)}], diff --git a/test/emqx_packet_SUITE.erl b/test/emqx_packet_SUITE.erl index bada2d85a..00ab6e0c5 100644 --- a/test/emqx_packet_SUITE.erl +++ b/test/emqx_packet_SUITE.erl @@ -85,7 +85,6 @@ t_connect_info(_) -> will_retain = true, will_qos = ?QOS_2, will_topic = <<"topic">>, - will_props = undefined, will_payload = <<"payload">> }, ?assertEqual(<<"MQTT">>, emqx_packet:info(proto_name, ConnPkt)), @@ -96,9 +95,9 @@ t_connect_info(_) -> ?assertEqual(?QOS_2, emqx_packet:info(will_qos, ConnPkt)), ?assertEqual(true, emqx_packet:info(will_retain, ConnPkt)), ?assertEqual(0, emqx_packet:info(keepalive, ConnPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, ConnPkt)), + ?assertEqual(#{}, emqx_packet:info(properties, ConnPkt)), ?assertEqual(<<"clientid">>, emqx_packet:info(clientid, ConnPkt)), - ?assertEqual(undefined, emqx_packet:info(will_props, ConnPkt)), + ?assertEqual(#{}, emqx_packet:info(will_props, ConnPkt)), ?assertEqual(<<"topic">>, emqx_packet:info(will_topic, ConnPkt)), ?assertEqual(<<"payload">>, emqx_packet:info(will_payload, ConnPkt)), ?assertEqual(<<"username">>, emqx_packet:info(username, ConnPkt)), @@ -108,54 +107,54 @@ t_connack_info(_) -> AckPkt = #mqtt_packet_connack{ack_flags = 0, reason_code = 0}, ?assertEqual(0, emqx_packet:info(ack_flags, AckPkt)), ?assertEqual(0, emqx_packet:info(reason_code, AckPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, AckPkt)). t_publish_info(_) -> PubPkt = #mqtt_packet_publish{topic_name = <<"t">>, packet_id = 1}, ?assertEqual(1, emqx_packet:info(packet_id, PubPkt)), ?assertEqual(<<"t">>, emqx_packet:info(topic_name, PubPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, PubPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, PubPkt)). t_puback_info(_) -> AckPkt = #mqtt_packet_puback{packet_id = 1, reason_code = 0}, ?assertEqual(1, emqx_packet:info(packet_id, AckPkt)), ?assertEqual(0, emqx_packet:info(reason_code, AckPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, AckPkt)). t_subscribe_info(_) -> TopicFilters = [{<<"t/#">>, #{}}], SubPkt = #mqtt_packet_subscribe{packet_id = 1, topic_filters = TopicFilters}, ?assertEqual(1, emqx_packet:info(packet_id, SubPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, SubPkt)), + ?assertEqual(#{}, emqx_packet:info(properties, SubPkt)), ?assertEqual(TopicFilters, emqx_packet:info(topic_filters, SubPkt)). t_suback_info(_) -> SubackPkt = #mqtt_packet_suback{packet_id = 1, reason_codes = [0]}, ?assertEqual(1, emqx_packet:info(packet_id, SubackPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, SubackPkt)), + ?assertEqual(#{}, emqx_packet:info(properties, SubackPkt)), ?assertEqual([0], emqx_packet:info(reason_codes, SubackPkt)). t_unsubscribe_info(_) -> UnsubPkt = #mqtt_packet_unsubscribe{packet_id = 1, topic_filters = [<<"t/#">>]}, ?assertEqual(1, emqx_packet:info(packet_id, UnsubPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, UnsubPkt)), + ?assertEqual(#{}, emqx_packet:info(properties, UnsubPkt)), ?assertEqual([<<"t/#">>], emqx_packet:info(topic_filters, UnsubPkt)). t_unsuback_info(_) -> AckPkt = #mqtt_packet_unsuback{packet_id = 1, reason_codes = [0]}, ?assertEqual(1, emqx_packet:info(packet_id, AckPkt)), ?assertEqual([0], emqx_packet:info(reason_codes, AckPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, AckPkt)). t_disconnect_info(_) -> DisconnPkt = #mqtt_packet_disconnect{reason_code = 0}, ?assertEqual(0, emqx_packet:info(reason_code, DisconnPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, DisconnPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, DisconnPkt)). t_auth_info(_) -> AuthPkt = #mqtt_packet_auth{reason_code = 0}, ?assertEqual(0, emqx_packet:info(reason_code, AuthPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, AuthPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, AuthPkt)). t_set_props(_) -> Pkts = [#mqtt_packet_connect{}, #mqtt_packet_connack{}, #mqtt_packet_publish{}, @@ -245,6 +244,7 @@ t_from_to_message(_) -> ExpectedMsg1 = emqx_message:set_flags(#{dup => false, retain => false}, ExpectedMsg), ExpectedMsg2 = emqx_message:set_headers(#{peerhost => {127,0,0,1}, protocol => mqtt, + properties => #{}, username => <<"test">> }, ExpectedMsg1), Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, @@ -255,10 +255,10 @@ t_from_to_message(_) -> packet_id = 10, properties = #{}}, payload = <<"payload">>}, - MsgFromPkt = emqx_packet:to_message(#{protocol => mqtt, - clientid => <<"clientid">>, - username => <<"test">>, - peerhost => {127,0,0,1}}, Pkt), + MsgFromPkt = emqx_packet:to_message(Pkt, <<"clientid">>, + #{protocol => mqtt, + username => <<"test">>, + peerhost => {127,0,0,1}}), ?assertEqual(ExpectedMsg2, MsgFromPkt#message{id = emqx_message:id(ExpectedMsg), timestamp = emqx_message:timestamp(ExpectedMsg) }). @@ -283,7 +283,6 @@ t_will_msg(_) -> will_retain = true, will_qos = ?QOS_2, will_topic = <<"topic">>, - will_props = undefined, will_payload = <<"payload">> }, Msg2 = emqx_packet:will_msg(Pkt2), @@ -297,7 +296,6 @@ t_format(_) -> will_retain = true, will_qos = ?QOS_2, will_topic = <<"topic">>, - will_props = undefined, will_payload = <<"payload">>}))]), io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}))]), io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index d8710eb58..5f3461ca5 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -116,9 +116,9 @@ t_is_subscriptions_full_true(_) -> t_unsubscribe(_) -> ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end), Session = session(#{subscriptions => #{<<"#">> => subopts()}}), - {ok, Session1} = emqx_session:unsubscribe(clientinfo(), <<"#">>, Session), + {ok, Session1} = emqx_session:unsubscribe(clientinfo(), <<"#">>, #{}, Session), {error, ?RC_NO_SUBSCRIPTION_EXISTED} = - emqx_session:unsubscribe(clientinfo(), <<"#">>, Session1). + emqx_session:unsubscribe(clientinfo(), <<"#">>, #{}, Session1). t_publish_qos0(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index 09c9d5dd7..eccdcd677 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -304,7 +304,7 @@ t_parse_incoming(_) -> St = ?ws_conn:parse_incoming(<<48,3>>, st()), St1 = ?ws_conn:parse_incoming(<<0,1,116>>, St), Packet = ?PUBLISH_PACKET(?QOS_0, <<"t">>, undefined, <<>>), - [{incoming, Packet}] = ?ws_conn:info(postponed, St1). + ?assertMatch([{incoming, Packet}], ?ws_conn:info(postponed, St1)). t_parse_incoming_frame_error(_) -> St = ?ws_conn:parse_incoming(<<3,2,1,0>>, st()),