Upgrade the publish sequence of QoS1/2 messages

This commit is contained in:
Feng Lee 2018-08-29 02:53:22 +08:00
parent 1cf4532947
commit 594819b752
7 changed files with 300 additions and 347 deletions

61
TODO
View File

@ -1,61 +0,0 @@
## MQTT 5.0
1. Topic Alias
2. Subscriber ID
3. Session ensure stats
4. Message Expiration
## Connection
## WebSocket
## Listeners
## Protocol
1. Global ACL cache with limited age and size?
2. Whether to enable ACL for each zone?
## Session
## Bridges
Config
CLI
Remote Bridge
replay queue
## Access Control
Global ACL Cache
Add ACL cache emqx_access_control module
## Zone
## Hooks
The hooks design...
## MQueue
Bound Queue
LastValue Queue
Priority Queue
## Supervisor tree
KernelSup
## Managment
## Dashboard
## Testcases
1. Update the README.md
2. Update the Documentation
3. Shared subscription and dispatch strategy
4. Remove lager syslog:
dep_lager_syslog = git https://github.com/basho/lager_syslog

View File

@ -84,10 +84,8 @@ subscribe(Topic, Subscriber, Options) when is_tuple(Subscriber)->
{SubPid, SubId} = Subscriber,
emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId, Options).
%% @doc Publish Message
-spec(publish(message()) -> {ok, delivery()} | {error, term()}).
publish(Msg) ->
emqx_broker:publish(Msg).
-spec(publish(message()) -> {ok, emqx_types:dispatches()}).
publish(Msg) -> emqx_broker:publish(Msg).
-spec(unsubscribe(topic() | string()) -> ok | {error, term()}).
unsubscribe(Topic) ->

View File

@ -143,16 +143,18 @@ multi_unsubscribe(Topics, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) -
%% Publish
%%------------------------------------------------------------------------------
-spec(publish(message()) -> delivery()).
-spec(publish(message()) -> {ok, emqx_types:dispatches()}).
publish(Msg) when is_record(Msg, message) ->
_ = emqx_tracer:trace(publish, Msg),
case emqx_hooks:run('message.publish', [], Msg) of
{ok, Msg1 = #message{topic = Topic}} ->
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1));
{stop, Msg1} ->
emqx_logger:warning("Stop publishing: ~p", [Msg]), delivery(Msg1)
end.
{ok, case emqx_hooks:run('message.publish', [], Msg) of
{ok, Msg1 = #message{topic = Topic}} ->
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
Delivery#delivery.flows;
{stop, _} ->
emqx_logger:warning("Stop publishing: ~p", [Msg]), []
end}.
-spec(safe_publish(message()) -> ok).
%% Called internally
safe_publish(Msg) when is_record(Msg, message) ->
try
@ -172,8 +174,8 @@ delivery(Msg) ->
%%------------------------------------------------------------------------------
route([], Delivery = #delivery{message = Msg}) ->
emqx_hooks:run('message.dropped', [undefined, Msg]),
dropped(Msg#message.topic), Delivery;
emqx_hooks:run('message.dropped', [node(), Msg]),
inc_dropped_cnt(Msg#message.topic), Delivery;
route([{To, Node}], Delivery) when Node =:= node() ->
dispatch(To, Delivery);
@ -213,8 +215,8 @@ forward(Node, To, Delivery) ->
dispatch(Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
case subscribers(Topic) of
[] ->
emqx_hooks:run('message.dropped', [undefined, Msg]),
dropped(Topic), Delivery;
emqx_hooks:run('message.dropped', [node(), Msg]),
inc_dropped_cnt(Topic), Delivery;
[Sub] -> %% optimize?
dispatch(Sub, Topic, Msg),
Delivery#delivery{flows = [{dispatch, Topic, 1}|Flows]};
@ -230,9 +232,9 @@ dispatch({SubPid, _SubId}, Topic, Msg) when is_pid(SubPid) ->
dispatch({share, _Group, _Sub}, _Topic, _Msg) ->
ignored.
dropped(<<"$SYS/", _/binary>>) ->
inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
ok;
dropped(_Topic) ->
inc_dropped_cnt(_Topic) ->
emqx_metrics:inc('messages/dropped').
-spec(subscribers(topic()) -> [subscriber()]).

View File

@ -37,30 +37,40 @@ protocol_name(?MQTT_PROTO_V5) ->
type_name(Type) when Type > ?RESERVED andalso Type =< ?AUTH ->
lists:nth(Type, ?TYPE_NAMES).
%%------------------------------------------------------------------------------
%% Validate MQTT Packet
%%------------------------------------------------------------------------------
validate(?SUBSCRIBE_PACKET(_PacketId, _Properties, [])) ->
error(packet_empty_topic_filters);
error(topic_filters_invalid);
validate(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters)) ->
validate_packet_id(PacketId)
andalso validate_properties(?SUBSCRIBE, Properties)
andalso ok == lists:foreach(fun validate_subscription/1, TopicFilters);
validate(?UNSUBSCRIBE_PACKET(_PacketId, [])) ->
error(packet_empty_topic_filters);
error(topic_filters_invalid);
validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) ->
validate_packet_id(PacketId)
andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters);
validate(?PUBLISH_PACKET(_QoS, <<>>, _, _)) ->
error(topic_name_invalid);
validate(?PUBLISH_PACKET(_QoS, Topic, _, _)) ->
emqx_topic:wildcard(Topic) orelse error(topic_name_invalid);
validate(_Packet) ->
true.
validate_packet_id(0) ->
error(bad_packet_id);
error(packet_id_invalid);
validate_packet_id(_) ->
true.
validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := 0}) ->
error(bad_subscription_identifier);
validate_properties(?SUBSCRIBE, _) ->
validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I})
when I =< 0; I >= 16#FFFFFFF ->
error(subscription_identifier_invalid);
validate_properties(_, _) ->
true.
validate_subscription({Topic, #{qos := QoS}}) ->
@ -85,30 +95,35 @@ from_message(PacketId, Msg = #message{qos = QoS, topic = Topic, payload = Payloa
payload = Payload}.
%% @doc Message from Packet
-spec(to_message(client_id(), mqtt_packet()) -> message()).
to_message(ClientId, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
retain = Retain,
qos = QoS,
dup = Dup},
variable = #mqtt_packet_publish{topic_name = Topic,
properties = Props},
payload = Payload}) ->
-spec(to_message(emqx_types:credentials(), mqtt_packet()) -> message()).
to_message(#{client_id := ClientId, username := Username},
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
retain = Retain,
qos = QoS,
dup = Dup},
variable = #mqtt_packet_publish{topic_name = Topic,
properties = Props},
payload = Payload}) ->
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
Msg#message{flags = #{dup => Dup, retain => Retain},
headers = if
Props =:= undefined -> #{};
true -> Props
end};
headers = merge_props(#{username => Username}, Props)};
to_message(_ClientId, #mqtt_packet_connect{will_flag = false}) ->
to_message(_Credentials, #mqtt_packet_connect{will_flag = false}) ->
undefined;
to_message(ClientId, #mqtt_packet_connect{will_retain = Retain,
will_qos = QoS,
will_topic = Topic,
will_props = Props,
will_payload = Payload}) ->
to_message(#{client_id := ClientId, username := Username},
#mqtt_packet_connect{will_retain = Retain,
will_qos = QoS,
will_topic = Topic,
will_props = Props,
will_payload = Payload}) ->
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
Msg#message{flags = #{qos => QoS, retain => Retain}, headers = Props}.
Msg#message{flags = #{dup => false, retain => Retain},
headers = merge_props(#{username => Username}, Props)}.
merge_props(Headers, undefined) ->
Headers;
merge_props(Headers, Props) ->
maps:merge(Headers, Props).
%% @doc Format packet
-spec(format(mqtt_packet()) -> iolist()).

View File

@ -18,11 +18,14 @@
-include("emqx_mqtt.hrl").
-export([init/2, info/1, caps/1, stats/1]).
-export([client_id/1]).
-export([credentials/1]).
-export([client/1, client_id/1]).
-export([session/1]).
-export([parser/1]).
-export([received/2, process/2, deliver/2, send/2]).
-export([session/1]).
-export([received/2]).
-export([process_packet/2]).
-export([deliver/2]).
-export([send/2]).
-export([shutdown/2]).
-record(pstate, {
@ -34,12 +37,13 @@
proto_name,
ackprops,
client_id,
client_pid,
conn_pid,
conn_props,
ack_props,
username,
session,
clean_start,
topic_aliases,
packet_size,
will_msg,
keepalive,
@ -54,9 +58,12 @@
}).
-type(state() :: #pstate{}).
-export_type([state/0]).
-ifdef(TEST).
-compile(export_all).
-endif.
-define(LOG(Level, Format, Args, PState),
emqx_logger:Level([{client, PState#pstate.client_id}], "Client(~s@~s): " ++ Format,
[PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])).
@ -75,10 +82,11 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>,
client_id = <<>>,
client_pid = self(),
conn_pid = self(),
username = init_username(Peercert, Options),
is_super = false,
clean_start = false,
topic_aliases = #{},
packet_size = emqx_zone:get_env(Zone, max_packet_size),
mountpoint = emqx_zone:get_env(Zone, mountpoint),
is_bridge = false,
@ -135,6 +143,9 @@ info(#pstate{zone = Zone,
caps(#pstate{zone = Zone}) ->
emqx_mqtt_caps:get_caps(Zone).
client_id(#pstate{client_id = ClientId}) ->
ClientId.
credentials(#pstate{zone = Zone,
client_id = ClientId,
username = Username,
@ -144,20 +155,6 @@ credentials(#pstate{zone = Zone,
username => Username,
peername => Peername}.
client(#pstate{zone = Zone,
client_id = ClientId,
client_pid = ClientPid,
peername = Peername,
username = Username}) ->
#client{id = ClientId,
pid = ClientPid,
zone = Zone,
peername = Peername,
username = Username}.
client_id(#pstate{client_id = ClientId}) ->
ClientId.
stats(#pstate{recv_stats = #{pkt := RecvPkt, msg := RecvMsg},
send_stats = #{pkt := SendPkt, msg := SendMsg}}) ->
[{recv_pkt, RecvPkt},
@ -177,42 +174,73 @@ parser(#pstate{packet_size = Size, proto_ver = Ver}) ->
-spec(received(mqtt_packet(), state())
-> {ok, state()} | {error, term()} | {error, term(), state()}).
received(?PACKET(Type), PState = #pstate{connected = false})
when Type =/= ?CONNECT ->
received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
{error, proto_not_connected, PState};
received(?PACKET(?CONNECT), PState = #pstate{connected = true}) ->
{error, proto_bad_connect, PState};
{error, proto_unexpected_connect, PState};
received(Packet = ?PACKET(Type), PState) ->
trace(recv, Packet, PState),
case catch emqx_packet:validate(Packet) of
true ->
process(Packet, inc_stats(recv, Type, PState));
{'EXIT', {ReasonCode, _Stacktrace}} when is_integer(ReasonCode) ->
deliver({disconnect, ReasonCode}, PState),
{error, protocol_error, PState};
{Packet1, PState1} = preprocess_properties(Packet, PState),
process_packet(Packet1, inc_stats(recv, Type, PState1));
{'EXIT', {Reason, _Stacktrace}} ->
deliver({disconnect, ?RC_MALFORMED_PACKET}, PState),
{error, Reason, PState}
end.
%%------------------------------------------------------------------------------
%% Process Packet
%% Preprocess MQTT Properties
%%------------------------------------------------------------------------------
process(?CONNECT_PACKET(
#mqtt_packet_connect{proto_name = ProtoName,
proto_ver = ProtoVer,
is_bridge = IsBridge,
clean_start = CleanStart,
keepalive = Keepalive,
properties = ConnProps,
client_id = ClientId,
username = Username,
password = Password} = Connect), PState) ->
%% Subscription Identifier
preprocess_properties(Packet = #mqtt_packet{
variable = Subscribe = #mqtt_packet_subscribe{
properties = #{'Subscription-Identifier' := SubId},
topic_filters = TopicFilters
}
},
PState = #pstate{proto_ver = ?MQTT_PROTO_V5}) ->
TopicFilters1 = [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters],
{Packet#mqtt_packet{variable = Subscribe#mqtt_packet_subscribe{topic_filters = TopicFilters1}}, PState};
io:format("~p~n", [Connect]),
%% Topic Alias Mapping
preprocess_properties(Packet = #mqtt_packet{
variable = Publish = #mqtt_packet_publish{
topic_name = <<>>,
properties = #{'Topic-Alias' := AliasId}}
},
PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) ->
{Packet#mqtt_packet{variable = Publish#mqtt_packet_publish{
topic_name = maps:get(AliasId, Aliases, <<>>)}}, PState};
preprocess_properties(Packet = #mqtt_packet{
variable = #mqtt_packet_publish{
topic_name = Topic,
properties = #{'Topic-Alias' := AliasId}}
},
PState = #pstate{proto_ver = ?MQTT_PROTO_V5, topic_aliases = Aliases}) ->
{Packet, PState#pstate{topic_aliases = maps:put(AliasId, Topic, Aliases)}};
preprocess_properties(Packet, PState) ->
{Packet, PState}.
%%------------------------------------------------------------------------------
%% Process MQTT Packet
%%------------------------------------------------------------------------------
process_packet(?CONNECT_PACKET(
#mqtt_packet_connect{proto_name = ProtoName,
proto_ver = ProtoVer,
is_bridge = IsBridge,
clean_start = CleanStart,
keepalive = Keepalive,
properties = ConnProps,
client_id = ClientId,
username = Username,
password = Password} = Connect), PState) ->
PState1 = set_username(Username,
PState#pstate{client_id = ClientId,
@ -240,10 +268,8 @@ process(?CONNECT_PACKET(
ok = emqx_cm:register_connection(client_id(PState4), info(PState4)),
%% Start keepalive
start_keepalive(Keepalive, PState4),
%% TODO: 'Run hooks' before open_session?
emqx_hooks:run('client.connected', [?RC_SUCCESS], client(PState4)),
%% Success
{?RC_SUCCESS, SP, replvar(PState4)};
{?RC_SUCCESS, SP, PState4};
{error, Error} ->
?LOG(error, "Failed to open session: ~p", [Error], PState1),
{?RC_UNSPECIFIED_ERROR, PState1}
@ -256,7 +282,7 @@ process(?CONNECT_PACKET(
{ReasonCode, PState1}
end);
process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
process_packet(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
case check_publish(Packet, PState) of
{ok, PState1} ->
do_publish(Packet, PState1);
@ -265,7 +291,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload), PState) ->
{ok, PState}
end;
process(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) ->
process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) ->
case check_publish(Packet, PState) of
{ok, PState1} ->
do_publish(Packet, PState1);
@ -273,7 +299,7 @@ process(Packet = ?PUBLISH_PACKET(?QOS_1, PacketId), PState) ->
deliver({puback, PacketId, ReasonCode}, PState)
end;
process(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) ->
process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) ->
case check_publish(Packet, PState) of
{ok, PState1} ->
do_publish(Packet, PState1);
@ -281,30 +307,37 @@ process(Packet = ?PUBLISH_PACKET(?QOS_2, PacketId), PState) ->
deliver({pubrec, PacketId, ReasonCode}, PState)
end;
process(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
ok = emqx_session:puback(SPid, PacketId, ReasonCode),
{ok, PState};
process_packet(?PUBACK_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
{ok = emqx_session:puback(SPid, PacketId, ReasonCode), PState};
process(?PUBREC_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
ok = emqx_session:pubrec(SPid, PacketId, ReasonCode),
send(?PUBREL_PACKET(PacketId), PState);
process_packet(?PUBREC_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
case emqx_session:pubrec(SPid, PacketId, ReasonCode) of
ok ->
send(?PUBREL_PACKET(PacketId), PState);
{error, NotFound} ->
send(?PUBREL_PACKET(PacketId, NotFound), PState)
end;
process(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
ok = emqx_session:pubrel(SPid, PacketId, ReasonCode),
send(?PUBCOMP_PACKET(PacketId), PState);
process_packet(?PUBREL_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
case emqx_session:pubrel(SPid, PacketId, ReasonCode) of
ok ->
send(?PUBCOMP_PACKET(PacketId), PState);
{error, NotFound} ->
send(?PUBCOMP_PACKET(PacketId, NotFound), PState)
end;
process(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode),
{ok, PState};
process_packet(?PUBCOMP_PACKET(PacketId, ReasonCode), PState = #pstate{session = SPid}) ->
{ok = emqx_session:pubcomp(SPid, PacketId, ReasonCode), PState};
process(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{client_id = ClientId, session = SPid}) ->
process_packet(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = Mountpoint}) ->
case check_subscribe(
parse_topic_filters(?SUBSCRIBE, RawTopicFilters), PState) of
{ok, TopicFilters} ->
case emqx_hooks:run('client.subscribe', [ClientId], TopicFilters) of
case emqx_hooks:run('client.subscribe', [credentials(PState)], TopicFilters) of
{ok, TopicFilters1} ->
ok = emqx_session:subscribe(SPid, PacketId, Properties, mount(TopicFilters1, PState)),
ok = emqx_session:subscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(Mountpoint, TopicFilters1)),
{ok, PState};
{stop, _} ->
ReasonCodes = lists:duplicate(length(TopicFilters),
@ -320,12 +353,13 @@ process(?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
deliver({suback, PacketId, ReasonCodes}, PState)
end;
process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{client_id = ClientId, session = SPid}) ->
case emqx_hooks:run('client.unsubscribe', [ClientId],
process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
case emqx_hooks:run('client.unsubscribe', [credentials(PState)],
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters)) of
{ok, TopicFilters} ->
ok = emqx_session:unsubscribe(SPid, PacketId, Properties, mount(TopicFilters, PState)),
ok = emqx_session:unsubscribe(SPid, PacketId, Properties,
emqx_mountpoint:mount(MountPoint, TopicFilters)),
{ok, PState};
{stop, _Acc} ->
ReasonCodes = lists:duplicate(length(RawTopicFilters),
@ -333,19 +367,20 @@ process(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
deliver({unsuback, PacketId, ReasonCodes}, PState)
end;
process(?PACKET(?PINGREQ), PState) ->
process_packet(?PACKET(?PINGREQ), PState) ->
send(?PACKET(?PINGRESP), PState);
process(?PACKET(?DISCONNECT), PState) ->
process_packet(?PACKET(?DISCONNECT), PState) ->
%% Clean willmsg
{stop, normal, PState#pstate{will_msg = undefined}}.
%%------------------------------------------------------------------------------
%% ConnAck -> Client
%% ConnAck --> Client
%%------------------------------------------------------------------------------
connack({?RC_SUCCESS, SP, PState}) ->
deliver({connack, ?RC_SUCCESS, sp(SP)}, PState);
emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS]),
deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState));
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
_ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 ->
@ -360,20 +395,28 @@ connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
%%------------------------------------------------------------------------------
do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
PState = #pstate{client_id = ClientId, session = SPid}) ->
Msg = mount(emqx_packet:to_message(ClientId, Packet), PState),
_ = emqx_session:publish(SPid, PacketId, Msg),
puback(QoS, PacketId, PState).
PState = #pstate{session = SPid, mountpoint = MountPoint}) ->
Msg = emqx_mountpoint:mount(MountPoint,
emqx_packet:to_message(credentials(PState), Packet)),
puback(QoS, PacketId, emqx_session:publish(SPid, PacketId, Msg), PState).
%%------------------------------------------------------------------------------
%% Puback -> Client
%%------------------------------------------------------------------------------
puback(?QOS_0, _PacketId, PState) ->
puback(?QOS_0, _PacketId, _Result, PState) ->
{ok, PState};
puback(?QOS_1, PacketId, PState) ->
puback(?QOS_1, PacketId, {error, ReasonCode}, PState) ->
deliver({puback, PacketId, ReasonCode}, PState);
puback(?QOS_1, PacketId, {ok, []}, PState) ->
deliver({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState);
puback(?QOS_1, PacketId, {ok, _}, PState) ->
deliver({puback, PacketId, ?RC_SUCCESS}, PState);
puback(?QOS_2, PacketId, PState) ->
puback(?QOS_2, PacketId, {error, ReasonCode}, PState) ->
deliver({pubrec, PacketId, ReasonCode}, PState);
puback(?QOS_2, PacketId, {ok, []}, PState) ->
deliver({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState);
puback(?QOS_2, PacketId, {ok, _}, PState) ->
deliver({pubrec, PacketId, ?RC_SUCCESS}, PState).
%%------------------------------------------------------------------------------
@ -386,10 +429,9 @@ deliver({connack, ReasonCode}, PState) ->
deliver({connack, ReasonCode, SP}, PState) ->
send(?CONNACK_PACKET(ReasonCode, SP), PState);
deliver({publish, PacketId, Msg}, PState = #pstate{client_id = ClientId,
is_bridge = IsBridge}) ->
_ = emqx_hooks:run('message.delivered', [ClientId], Msg),
Msg1 = unmount(clean_retain(IsBridge, Msg), PState),
deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) ->
_ = emqx_hooks:run('message.delivered', credentials(PState), Msg),
Msg1 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg)),
send(emqx_packet:from_message(PacketId, Msg1), PState);
deliver({puback, PacketId, ReasonCode}, PState) ->
@ -445,13 +487,13 @@ maybe_assign_client_id(PState) ->
try_open_session(#pstate{zone = Zone,
client_id = ClientId,
client_pid = ClientPid,
conn_pid = ConnPid,
conn_props = ConnProps,
username = Username,
clean_start = CleanStart}) ->
case emqx_sm:open_session(#{zone => Zone,
client_id => ClientId,
client_pid => ClientPid,
conn_pid => ConnPid,
username => Username,
clean_start => CleanStart,
conn_props => ConnProps}) of
@ -592,14 +634,14 @@ shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) ->
true -> ok;
false -> send_willmsg(WillMsg)
end,
emqx_hooks:run('client.disconnected', [Error], client(PState)),
emqx_hooks:run('client.disconnected', [credentials(PState), Error]),
emqx_cm:unregister_connection(ClientId).
willmsg(Packet, PState = #pstate{client_id = ClientId})
willmsg(Packet, #pstate{client_id = ClientId, mountpoint = MountPoint})
when is_record(Packet, mqtt_packet_connect) ->
case emqx_packet:to_message(ClientId, Packet) of
undefined -> undefined;
Msg -> mount(Msg, PState)
Msg -> emqx_mountpoint:mount(MountPoint, Msg)
end.
send_willmsg(undefined) ->
@ -617,14 +659,11 @@ start_keepalive(Secs, #pstate{zone = Zone}) when Secs > 0 ->
%% Parse topic filters
%%-----------------------------------------------------------------------------
parse_topic_filters(?SUBSCRIBE, TopicFilters) ->
[begin
{Topic, TOpts} = emqx_topic:parse(RawTopic),
{Topic, maps:merge(SubOpts, TOpts)}
end || {RawTopic, SubOpts} <- TopicFilters];
parse_topic_filters(?SUBSCRIBE, RawTopicFilters) ->
[emqx_topic:parse(RawTopic, SubOpts) || {RawTopic, SubOpts} <- RawTopicFilters];
parse_topic_filters(?UNSUBSCRIBE, TopicFilters) ->
lists:map(fun emqx_topic:parse/1, TopicFilters).
parse_topic_filters(?UNSUBSCRIBE, RawTopicFilters) ->
lists:map(fun emqx_topic:parse/1, RawTopicFilters).
%%-----------------------------------------------------------------------------
%% The retained flag should be propagated for bridge.
@ -638,37 +677,14 @@ clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers}
clean_retain(_IsBridge, Msg) ->
Msg.
%%-----------------------------------------------------------------------------
%% Mount Point
%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Update mountpoint
mount(Any, #pstate{mountpoint = undefined}) ->
Any;
mount(Msg = #message{topic = Topic}, #pstate{mountpoint = MountPoint}) ->
Msg#message{topic = <<MountPoint/binary, Topic/binary>>};
mount(TopicFilters, #pstate{mountpoint = MountPoint}) when is_list(TopicFilters) ->
[{<<MountPoint/binary, Topic/binary>>, SubOpts} || {Topic, SubOpts} <- TopicFilters].
unmount(Any, #pstate{mountpoint = undefined}) ->
Any;
unmount(Msg = #message{topic = Topic}, #pstate{mountpoint = MountPoint}) ->
case catch split_binary(Topic, byte_size(MountPoint)) of
{MountPoint, Topic1} -> Msg#message{topic = Topic1};
_Other -> Msg
end.
replvar(PState = #pstate{mountpoint = undefined}) ->
update_mountpoint(PState = #pstate{mountpoint = undefined}) ->
PState;
replvar(PState = #pstate{client_id = ClientId, username = Username, mountpoint = MountPoint}) ->
Vars = [{<<"%c">>, ClientId}, {<<"%u">>, Username}],
PState#pstate{mountpoint = lists:foldl(fun feed_var/2, MountPoint, Vars)}.
feed_var({<<"%c">>, ClientId}, MountPoint) ->
emqx_topic:feed_var(<<"%c">>, ClientId, MountPoint);
feed_var({<<"%u">>, undefined}, MountPoint) ->
MountPoint;
feed_var({<<"%u">>, Username}, MountPoint) ->
emqx_topic:feed_var(<<"%u">>, Username, MountPoint).
update_mountpoint(PState = #pstate{mountpoint = MountPoint}) ->
PState#pstate{mountpoint = emqx_mountpoint:replvar(MountPoint, credentials(PState))}.
sp(true) -> 1;
sp(false) -> 0.

View File

@ -133,9 +133,6 @@
%% Stats timer
stats_timer :: reference() | undefined,
%% Ignore loop deliver?
ignore_loop_deliver = false :: boolean(),
%% TODO:
deliver_stats = 0,
@ -169,20 +166,17 @@ start_link(SessAttrs) ->
-spec(subscribe(pid(), list({topic(), map()}) |
{mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok).
subscribe(SPid, TopicFilters) when is_list(TopicFilters) ->
gen_server:cast(SPid, {subscribe, [begin
{Topic, Opts} = emqx_topic:parse(RawTopic),
{Topic, maps:merge(
maps:merge(
?DEFAULT_SUBOPTS, SubOpts), Opts)}
end || {RawTopic, SubOpts} <- TopicFilters]}).
subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts))
|| {RawTopic, SubOpts} <- RawTopicFilters],
subscribe(SPid, undefined, #{}, TopicFilters).
%% for mqtt 5.0
subscribe(SPid, PacketId, Properties, TopicFilters) ->
SubReq = {PacketId, Properties, TopicFilters},
gen_server:cast(SPid, {subscribe, self(), SubReq}).
-spec(publish(pid(), mqtt_packet_id(), message()) -> {ok, delivery()} | {error, term()}).
-spec(publish(pid(), mqtt_packet_id(), message()) -> {ok, emqx_types:dispatches()}).
publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) ->
%% Publish QoS0 message to broker directly
emqx_broker:publish(Msg);
@ -202,27 +196,29 @@ puback(SPid, PacketId) ->
puback(SPid, PacketId, ReasonCode) ->
gen_server:cast(SPid, {puback, PacketId, ReasonCode}).
-spec(pubrec(pid(), mqtt_packet_id()) -> ok).
-spec(pubrec(pid(), mqtt_packet_id()) -> ok | {error, mqtt_reason_code()}).
pubrec(SPid, PacketId) ->
gen_server:cast(SPid, {pubrec, PacketId}).
pubrec(SPid, PacketId, ?RC_SUCCESS).
-spec(pubrec(pid(), mqtt_packet_id(), mqtt_reason_code())
-> ok | {error, mqtt_reason_code()}).
pubrec(SPid, PacketId, ReasonCode) ->
gen_server:cast(SPid, {pubrec, PacketId, ReasonCode}).
gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity).
-spec(pubrel(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok).
-spec(pubrel(pid(), mqtt_packet_id(), mqtt_reason_code())
-> ok | {error, mqtt_reason_code()}).
pubrel(SPid, PacketId, ReasonCode) ->
gen_server:cast(SPid, {pubrel, PacketId, ReasonCode}).
gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity).
-spec(pubcomp(pid(), mqtt_packet_id(), mqtt_reason_code()) -> ok).
pubcomp(SPid, PacketId, ReasonCode) ->
gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}).
-spec(unsubscribe(pid(), {mqtt_packet_id(), mqtt_properties(), topic_table()}) -> ok).
unsubscribe(SPid, TopicFilters) when is_list(TopicFilters) ->
%%TODO: Parse the topic filters?
unsubscribe(SPid, {undefined, #{}, TopicFilters}).
-spec(unsubscribe(pid(), topic_table()) -> ok).
unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
unsubscribe(SPid, undefined, #{}, lists:map(fun emqx_topic:parse/1, RawTopicFilters)).
%% TODO:...
-spec(unsubscribe(pid(), mqtt_packet_id(), mqtt_properties(), topic_table()) -> ok).
unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
UnsubReq = {PacketId, Properties, TopicFilters},
gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
@ -335,7 +331,6 @@ init(#{zone := Zone,
max_awaiting_rel = get_env(Zone, max_awaiting_rel),
expiry_interval = get_env(Zone, session_expiry_interval),
enable_stats = get_env(Zone, enable_stats, true),
ignore_loop_deliver = get_env(Zone, ignore_loop_deliver, false),
deliver_stats = 0,
enqueue_stats = 0,
created_at = os:timestamp()},
@ -359,21 +354,45 @@ handle_call({discard, ClientPid}, _From, State = #state{client_pid = OldClientPi
?LOG(warning, " ~p kickout ~p", [ClientPid, OldClientPid], State),
{stop, {shutdown, conflict}, ok, State};
%% PUBLISH:
handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From,
State = #state{awaiting_rel = AwaitingRel,
await_rel_timer = Timer,
await_rel_timeout = Timeout}) ->
State = #state{awaiting_rel = AwaitingRel}) ->
case is_awaiting_full(State) of
false ->
State1 = case Timer == undefined of
true -> State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)};
false -> State
end,
reply(ok, State1#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)});
case maps:is_key(PacketId, AwaitingRel) of
true ->
reply({error, ?RC_PACKET_IDENTIFIER_IN_USE}, State);
false ->
State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)},
reply(emqx_broker:publish(Msg), ensure_await_rel_timer(State1))
end;
true ->
?LOG(warning, "Dropped QoS2 Message for too many awaiting_rel: ~p", [Msg], State),
emqx_metrics:inc('messages/qos2/dropped'),
reply({error, dropped}, State)
reply({error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State)
end;
%% PUBREC:
handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = Inflight}) ->
case emqx_inflight:contain(PacketId, Inflight) of
true ->
reply(ok, acked(pubrec, PacketId, State));
false ->
?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State),
emqx_metrics:inc('packets/pubrec/missed'),
reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State)
end;
%% PUBREL:
handle_call({pubrel, PacketId, _ReasonCode}, _From,
State = #state{awaiting_rel = AwaitingRel}) ->
case maps:take(PacketId, AwaitingRel) of
{_, AwaitingRel1} ->
reply(ok, State#state{awaiting_rel = AwaitingRel1});
error ->
?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),
emqx_metrics:inc('packets/pubrel/missed'),
reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State)
end;
handle_call(info, _From, State) ->
@ -390,57 +409,38 @@ handle_call(Req, _From, State) ->
{reply, ignored, State}.
%% SUBSCRIBE:
handle_cast({subscribe, TopicFilters}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
Subscriptions1 = lists:foldl(
fun({Topic, SubOpts}, SubMap) ->
case maps:find(Topic, SubMap) of
{ok, _OldOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
?LOG(warning, "Duplicated subscribe: ~s, subopts: ~p", [Topic, SubOpts], State);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts])
end,
maps:put(Topic, SubOpts, SubMap)
end, Subscriptions, TopicFilters),
{noreply, State#state{subscriptions = Subscriptions1}};
handle_cast({subscribe, From, {PacketId, Properties, TopicFilters}},
handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
{[QoS|RcAcc],
case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
?LOG(warning, "Duplicated subscribe: ~s, subopts: ~p", [Topic, SubOpts], State),
SubMap;
{ok, OldOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
?LOG(warning, "Duplicated subscribe ~s, old_opts: ~p, new_opts: ~p", [Topic, OldOpts, SubOpts], State),
maps:put(Topic, with_subid(Properties, SubOpts), SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
maps:put(Topic, with_subid(Properties, SubOpts), SubMap)
end}
{[QoS|RcAcc], case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
SubMap;
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [ClientId, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap)
end}
end, {[], Subscriptions}, TopicFilters),
suback(From, PacketId, ReasonCodes),
suback(FromPid, PacketId, ReasonCodes),
{noreply, State#state{subscriptions = Subscriptions1}};
%% UNSUBSCRIBE:
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, _Opts}, {RcAcc, SubMap}) ->
lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) ->
case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
emqx_broker:unsubscribe(Topic, ClientId),
ok = emqx_broker:unsubscribe(Topic, ClientId),
emqx_hooks:run('session.unsubscribed', [ClientId, Topic, SubOpts]),
{[?RC_SUCCESS|RcAcc], maps:remove(Topic, SubMap)};
{[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
error ->
{[?RC_NO_SUBSCRIPTION_EXISTED|RcAcc], SubMap}
{[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap}
end
end, {[], Subscriptions}, TopicFilters),
unsuback(From, PacketId, ReasonCodes),
@ -452,44 +452,18 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
true ->
{noreply, dequeue(acked(puback, PacketId, State))};
false ->
?LOG(warning, "The PUBACK PacketId is not found: ~p", [PacketId], State),
?LOG(warning, "The PUBACK PacketId is not found: ~w", [PacketId], State),
emqx_metrics:inc('packets/puback/missed'),
{noreply, State}
end;
%% PUBREC: How to handle ReasonCode?
handle_cast({pubrec, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
case emqx_inflight:contain(PacketId, Inflight) of
true ->
{noreply, acked(pubrec, PacketId, State)};
false ->
?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State),
emqx_metrics:inc('packets/pubrec/missed'),
{noreply, State}
end;
%% PUBREL:
handle_cast({pubrel, PacketId, _ReasonCode}, State = #state{awaiting_rel = AwaitingRel}) ->
{noreply,
case maps:take(PacketId, AwaitingRel) of
{Msg, AwaitingRel1} ->
%% Implement Qos2 by method A [MQTT 4.33]
%% Dispatch to subscriber when received PUBREL
emqx_broker:publish(Msg), %% FIXME:
maybe_gc(State#state{awaiting_rel = AwaitingRel1});
error ->
?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),
emqx_metrics:inc('packets/pubrel/missed'),
State
end, hibernate};
%% PUBCOMP:
handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
case emqx_inflight:contain(PacketId, Inflight) of
true ->
{noreply, dequeue(acked(pubcomp, PacketId, State))};
false ->
?LOG(warning, "The PUBCOMP Packet Identifier is not found: ~w", [PacketId], State),
?LOG(warning, "The PUBCOMP PacketId is not found: ~w", [PacketId], State),
emqx_metrics:inc('packets/pubcomp/missed'),
{noreply, State}
end;
@ -542,19 +516,22 @@ handle_cast(Msg, State) ->
emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
{noreply, State}.
%% Batch dispatch
handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
{noreply, lists:foldl(fun(Msg, NewState) ->
element(2, handle_info({dispatch, Topic, Msg}, NewState))
end, State, Msgs)};
%% Ignore messages delivered by self
handle_info({dispatch, _Topic, #message{from = ClientId}},
State = #state{client_id = ClientId, ignore_loop_deliver = true}) ->
{noreply, State};
%% Dispatch Message
handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) ->
{noreply, maybe_gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))};
%% Dispatch message
handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) ->
{noreply, case maps:find(Topic, SubMap) of
{ok, #{nl := Nl, qos := QoS, subid := SubId}} ->
run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State);
{ok, #{nl := Nl, qos := QoS}} ->
run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State);
error ->
dispatch(reset_dup(Msg), State)
end};
%% Do nothing if the client has been disconnected.
handle_info({timeout, _Timer, retry_delivery}, State = #state{client_pid = undefined}) ->
@ -611,11 +588,6 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%------------------------------------------------------------------------------
with_subid(#{'Subscription-Identifier' := SubId}, SubOpts) ->
maps:put(subid, SubId, SubOpts);
with_subid(_Props, SubOpts) ->
SubOpts.
suback(_From, undefined, _ReasonCodes) ->
ignore;
suback(From, PacketId, ReasonCodes) ->
@ -727,6 +699,19 @@ is_awaiting_full(#state{awaiting_rel = AwaitingRel, max_awaiting_rel = MaxLen})
%% Dispatch Messages
%%------------------------------------------------------------------------------
run_dispatch_steps([], Msg, State) ->
dispatch(Msg, State);
run_dispatch_steps([{nl, 1}|_Steps], #message{from = ClientId}, State = #state{client_id = ClientId}) ->
State;
run_dispatch_steps([{nl, 0}|Steps], Msg, State) ->
run_dispatch_steps(Steps, Msg, State);
run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) ->
run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State);
run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = true}) ->
run_dispatch_steps(Steps, Msg#message{qos = max(SubQoS, PubQoS)}, State);
run_dispatch_steps([{subid, SubId}|Steps], Msg, State) ->
run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).
%% Enqueue message if the client has been disconnected
dispatch(Msg, State = #state{client_id = ClientId, client_pid = undefined}) ->
case emqx_hooks:run('message.dropped', [ClientId, Msg]) of
@ -837,19 +822,14 @@ dequeue2(State = #state{mqueue = Q}) ->
dequeue(dispatch(Msg, State#state{mqueue = Q1}))
end.
%%------------------------------------------------------------------------------
%% Tune QoS
tune_qos(Topic, Msg = #message{qos = PubQoS},
#state{subscriptions = SubMap, upgrade_qos = UpgradeQoS}) ->
case maps:find(Topic, SubMap) of
{ok, #{qos := SubQoS}} when UpgradeQoS andalso (SubQoS > PubQoS) ->
Msg#message{qos = SubQoS};
{ok, #{qos := SubQoS}} when (not UpgradeQoS) andalso (SubQoS < PubQoS) ->
Msg#message{qos = SubQoS};
{ok, _} -> Msg;
error -> Msg
end.
%%------------------------------------------------------------------------------
%% Ensure timers
ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) ->
State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)};
ensure_await_rel_timer(State) ->
State.
%%------------------------------------------------------------------------------
%% Reset Dup
@ -889,5 +869,5 @@ reply(Reply, State) ->
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.
maybe_gc(State) -> State.
%%TODO: maybe_gc(State) -> State.

View File

@ -19,7 +19,7 @@
-export_type([startlink_ret/0]).
-export_type([zone/0, client_id/0, username/0, password/0, peername/0,
protocol/0, credentials/0]).
-export_type([payload/0]).
-export_type([topic/0, payload/0, dispatches/0]).
%%-export_type([payload/0, message/0, delivery/0]).
-type(startlink_ret() :: {ok, pid()} | ignore | {error, term()}).
@ -36,7 +36,10 @@
zone => zone(),
atom() => term()}).
-type(topic() :: binary()).
-type(payload() :: binary() | iodata()).
%-type(message() :: #message{}).
%-type(delivery() :: #delivery{}).
-type(dispatches() :: [{route, node(), topic()} | {dispatch, topic(), pos_integer()}]).