Merge pull request #1788 from tigercl/emqx30

Support retain as published in subscription options
This commit is contained in:
turtleDeng 2018-09-07 21:43:56 +08:00 committed by GitHub
commit 5ca61dd45c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 77 additions and 58 deletions

View File

@ -76,6 +76,9 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length,
error(mqtt_frame_too_large);
parse_remaining_len(<<>>, Header, Multiplier, Length, Options) ->
{more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end};
%% Match DISCONNECT without payload
parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, _Options) ->
wrap(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}, Rest);
%% Match PINGREQ.
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 0, Options);
@ -233,8 +236,8 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
parse_will_message(Packet, Bin) ->
{Packet, Bin}.
protocol_approved(Ver, Name) ->
lists:member({Ver, Name}, ?PROTOCOL_NAMES).
% protocol_approved(Ver, Name) ->
% lists:member({Ver, Name}, ?PROTOCOL_NAMES).
parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
{PacketId, Rest}.

View File

@ -34,32 +34,33 @@
%% Packets sent and received of broker
-define(PACKET_METRICS, [
{counter, 'packets/received'}, % All Packets received
{counter, 'packets/sent'}, % All Packets sent
{counter, 'packets/connect'}, % CONNECT Packets received
{counter, 'packets/connack'}, % CONNACK Packets sent
{counter, 'packets/publish/received'}, % PUBLISH packets received
{counter, 'packets/publish/sent'}, % PUBLISH packets sent
{counter, 'packets/puback/received'}, % PUBACK packets received
{counter, 'packets/puback/sent'}, % PUBACK packets sent
{counter, 'packets/puback/missed'}, % PUBACK packets missed
{counter, 'packets/pubrec/received'}, % PUBREC packets received
{counter, 'packets/pubrec/sent'}, % PUBREC packets sent
{counter, 'packets/pubrec/missed'}, % PUBREC packets missed
{counter, 'packets/pubrel/received'}, % PUBREL packets received
{counter, 'packets/pubrel/sent'}, % PUBREL packets sent
{counter, 'packets/pubrel/missed'}, % PUBREL packets missed
{counter, 'packets/pubcomp/received'}, % PUBCOMP packets received
{counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent
{counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed
{counter, 'packets/subscribe'}, % SUBSCRIBE Packets received
{counter, 'packets/suback'}, % SUBACK packets sent
{counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received
{counter, 'packets/unsuback'}, % UNSUBACK Packets sent
{counter, 'packets/pingreq'}, % PINGREQ packets received
{counter, 'packets/pingresp'}, % PINGRESP Packets sent
{counter, 'packets/disconnect'}, % DISCONNECT Packets received
{counter, 'packets/auth'} % Auth Packets received
{counter, 'packets/received'}, % All Packets received
{counter, 'packets/sent'}, % All Packets sent
{counter, 'packets/connect'}, % CONNECT Packets received
{counter, 'packets/connack'}, % CONNACK Packets sent
{counter, 'packets/publish/received'}, % PUBLISH packets received
{counter, 'packets/publish/sent'}, % PUBLISH packets sent
{counter, 'packets/puback/received'}, % PUBACK packets received
{counter, 'packets/puback/sent'}, % PUBACK packets sent
{counter, 'packets/puback/missed'}, % PUBACK packets missed
{counter, 'packets/pubrec/received'}, % PUBREC packets received
{counter, 'packets/pubrec/sent'}, % PUBREC packets sent
{counter, 'packets/pubrec/missed'}, % PUBREC packets missed
{counter, 'packets/pubrel/received'}, % PUBREL packets received
{counter, 'packets/pubrel/sent'}, % PUBREL packets sent
{counter, 'packets/pubrel/missed'}, % PUBREL packets missed
{counter, 'packets/pubcomp/received'}, % PUBCOMP packets received
{counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent
{counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed
{counter, 'packets/subscribe'}, % SUBSCRIBE Packets received
{counter, 'packets/suback'}, % SUBACK packets sent
{counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received
{counter, 'packets/unsuback'}, % UNSUBACK Packets sent
{counter, 'packets/pingreq'}, % PINGREQ packets received
{counter, 'packets/pingresp'}, % PINGRESP Packets sent
{counter, 'packets/disconnect/received'}, % DISCONNECT Packets received
{counter, 'packets/disconnect/sent'}, % DISCONNECT Packets sent
{counter, 'packets/auth'} % Auth Packets received
]).
%% Messages sent and received of broker
@ -194,7 +195,7 @@ received2(?UNSUBSCRIBE) ->
received2(?PINGREQ) ->
inc('packets/pingreq');
received2(?DISCONNECT) ->
inc('packets/disconnect');
inc('packets/disconnect/received');
received2(_) ->
ignore.
qos_received(?QOS_0) ->
@ -233,6 +234,8 @@ sent2(?UNSUBACK) ->
inc('packets/unsuback');
sent2(?PINGRESP) ->
inc('packets/pingresp');
sent2(?DISCONNECT) ->
inc('packets/disconnect/sent');
sent2(_Type) ->
ignore.
qos_sent(?QOS_0) ->

View File

@ -32,8 +32,8 @@
-type(reason_code() :: 0..16#FF).
-type(packet_id() :: 1..16#FFFF).
-type(properties() :: #{atom() => term()}).
-type(subopts() :: #{rh := 0 | 1,
rap := 0 | 1 | 2,
-type(subopts() :: #{rh := 0 | 1 | 2,
rap := 0 | 1,
nl := 0 | 1,
qos := qos(),
rc => reason_code()

View File

@ -208,6 +208,9 @@ received(Packet = ?PACKET(Type), PState) ->
true ->
{Packet1, PState1} = preprocess_properties(Packet, PState),
process_packet(Packet1, inc_stats(recv, Type, PState1));
{'EXIT', {topic_filters_invalid, _Stacktrace}} ->
deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState),
{error, topic_filters_invalid, PState};
{'EXIT', {Reason, _Stacktrace}} ->
deliver({disconnect, ?RC_MALFORMED_PACKET}, PState),
{error, Reason, PState}
@ -356,9 +359,17 @@ process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session =
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = Mountpoint}) ->
PState = #pstate{session = SPid, mountpoint = Mountpoint, proto_ver = ProtoVer, is_bridge = IsBridge}) ->
RawTopicFilters1 = if ProtoVer < ?MQTT_PROTO_V5 ->
case IsBridge of
true -> [{RawTopic, SubOpts#{rap => 1}} || {RawTopic, SubOpts} <- RawTopicFilters];
false -> [{RawTopic, SubOpts#{rap => 0}} || {RawTopic, SubOpts} <- RawTopicFilters]
end;
true ->
RawTopicFilters
end,
case check_subscribe(
parse_topic_filters(?SUBSCRIBE, RawTopicFilters), PState) of
parse_topic_filters(?SUBSCRIBE, RawTopicFilters1), PState) of
{ok, TopicFilters} ->
case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of
{ok, TopicFilters1} ->
@ -396,9 +407,11 @@ process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
process_packet(?PACKET(?PINGREQ), PState) ->
send(?PACKET(?PINGRESP), PState);
process_packet(?PACKET(?DISCONNECT), PState) ->
process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) ->
%% Clean willmsg
{stop, normal, PState#pstate{will_msg = undefined}}.
{stop, normal, PState#pstate{will_msg = undefined}};
process_packet(?DISCONNECT_PACKET(_), PState) ->
{stop, normal, PState}.
%%------------------------------------------------------------------------------
%% ConnAck --> Client
@ -485,10 +498,10 @@ deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
deliver({connack, ReasonCode, SP}, PState) ->
send(?CONNACK_PACKET(ReasonCode, SP), PState);
deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) ->
deliver({publish, PacketId, Msg}, PState = #pstate{mountpoint = MountPoint}) ->
_ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg),
Msg1 = emqx_message:update_expiry(Msg),
Msg2 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg1)),
Msg2 = emqx_mountpoint:unmount(MountPoint, Msg1),
send(emqx_packet:from_message(PacketId, Msg2), PState);
deliver({puback, PacketId, ReasonCode}, PState) ->
@ -736,18 +749,6 @@ parse_topic_filters(?SUBSCRIBE, RawTopicFilters) ->
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) ->
lists:map(fun emqx_topic:parse/1, RawTopicFilters).
%%-----------------------------------------------------------------------------
%% The retained flag should be propagated for bridge.
%%-----------------------------------------------------------------------------
clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers}) ->
case maps:get(retained, Headers, false) of
true -> Msg;
false -> emqx_message:set_flag(retain, false, Msg)
end;
clean_retain(_IsBridge, Msg) ->
Msg.
%%------------------------------------------------------------------------------
%% Update mountpoint

View File

@ -448,11 +448,11 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
%% Why???
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => true}]),
maps:put(Topic, SubOpts, SubMap)
end}
end, {[], Subscriptions}, TopicFilters),
@ -548,10 +548,10 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
%% Dispatch message
handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) ->
noreply(case maps:find(Topic, SubMap) of
{ok, #{nl := Nl, qos := QoS, subid := SubId}} ->
run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State);
{ok, #{nl := Nl, qos := QoS}} ->
run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State);
{ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}, {subid, SubId}], Msg, State);
{ok, #{nl := Nl, qos := QoS, rap := Rap}} ->
run_dispatch_steps([{nl, Nl}, {qos, QoS}, {rap, Rap}], Msg, State);
error ->
dispatch(emqx_message:unset_flag(dup, Msg), State)
end);
@ -726,6 +726,11 @@ run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State =
run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State);
run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) ->
run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State);
run_dispatch_steps([{rap, 0}|Steps], Msg = #message{flags = Flags}, State = #state{}) ->
Flags1 = maps:put(retain, false, Flags),
run_dispatch_steps(Steps, Msg#message{flags = Flags1}, State);
run_dispatch_steps([{rap, _}|Steps], Msg, State) ->
run_dispatch_steps(Steps, Msg, State);
run_dispatch_steps([{subid, SubId}|Steps], Msg, State) ->
run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).

View File

@ -184,9 +184,16 @@ parse(Topic = <<"$share/", _/binary>>, #{share := _Group}) ->
error({invalid_topic, Topic});
parse(<<"$queue/", Topic1/binary>>, Options) ->
parse(Topic1, maps:put(share, <<"$queue">>, Options));
parse(<<"$share/", Topic1/binary>>, Options) ->
[Group, Topic2] = binary:split(Topic1, <<"/">>),
{Topic2, maps:put(share, Group, Options)};
parse(Topic = <<"$share/", Topic1/binary>>, Options) ->
case binary:split(Topic1, <<"/">>) of
[<<>>] -> error({invalid_topic, Topic});
[_] -> error({invalid_topic, Topic});
[Group, Topic2] ->
case binary:match(Group, [<<"/">>, <<"+">>, <<"#">>]) of
nomatch -> {Topic2, maps:put(share, Group, Options)};
_ -> error({invalid_topic, Topic})
end
end;
parse(Topic, Options) ->
{Topic, Options}.