diff --git a/README-CN.md b/README-CN.md index 4c8a6e20e..93e9ac645 100644 --- a/README-CN.md +++ b/README-CN.md @@ -65,7 +65,6 @@ cd _rel/emqx && ./bin/emqx console 你可通过以下途径与 EMQ 社区及开发者联系: - [EMQX Slack](http://emqx.slack.com) -- [Mailing Lists]() - [Twitter](https://twitter.com/emqtt) - [Forum](https://groups.google.com/d/forum/emqtt) - [Blog](https://medium.com/@emqtt) diff --git a/README.md b/README.md index 34e1221f2..968697b07 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,6 @@ The [EMQ X Roadmap uses Github milestones](https://github.com/emqx/emqx/mileston You can reach the EMQ community and developers via the following channels: - [EMQX Slack](http://emqx.slack.com) -- [Mailing Lists]() - [Twitter](https://twitter.com/emqtt) - [Forum](https://groups.google.com/d/forum/emqtt) - [Blog](https://medium.com/@emqtt) diff --git a/etc/acl.conf b/etc/acl.conf index b60ea5e7a..af2fb0dd1 100644 --- a/etc/acl.conf +++ b/etc/acl.conf @@ -1,21 +1,19 @@ %%-------------------------------------------------------------------- +%% [ACL](https://docs.emqx.io/broker/v3/en/config.html) %% -%% [ACL](http://emqtt.io/docs/v2/config.html#allow-anonymous-and-acl-file) -%% -%% -type who() :: all | binary() | +%% -type(who() :: all | binary() | %% {ipaddr, esockd_access:cidr()} | %% {client, binary()} | -%% {user, binary()}. +%% {user, binary()}). %% -%% -type access() :: subscribe | publish | pubsub. +%% -type(access() :: subscribe | publish | pubsub). %% -%% -type topic() :: binary(). +%% -type(topic() :: binary()). %% -%% -type rule() :: {allow, all} | +%% -type(rule() :: {allow, all} | %% {allow, who(), access(), list(topic())} | %% {deny, all} | -%% {deny, who(), access(), list(topic())}. -%% +%% {deny, who(), access(), list(topic())}). %%-------------------------------------------------------------------- {allow, {user, "dashboard"}, subscribe, ["$SYS/#"]}. @@ -25,3 +23,4 @@ {deny, all, subscribe, ["$SYS/#", {eq, "#"}]}. {allow, all}. + diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 3a138f393..b1c974c44 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -204,7 +204,7 @@ %%-------------------------------------------------------------------- -define(DEFAULT_SUBOPTS, #{rh => 0, %% Retain Handling - rap => 0, %% Retain as Publish + rap => 1, %% Retain as Publish nl => 0, %% No Local qos => 0 %% QoS }). diff --git a/include/logger.hrl b/include/logger.hrl index 6f046e3c2..539371301 100644 --- a/include/logger.hrl +++ b/include/logger.hrl @@ -43,6 +43,6 @@ -define(LOG(Level, Format, Args), begin - (logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end})) + (logger:log(Level,#{},#{report_cb => fun(_) -> {'$logger_header'()++(Format), (Args)} end, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY}})) end). diff --git a/src/emqx.app.src b/src/emqx.app.src index 0d58e4dd3..9c00ca6bb 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -1,14 +1,16 @@ -{application, emqx, [ +{application, emqx, + [{description, "EMQ X Broker"}, {id, "emqx"}, {vsn, "git"}, - {description, "EMQ X Broker"}, {modules, []}, {registered, []}, {applications, [kernel,stdlib,jsx,gproc,gen_rpc,esockd,cowboy, sasl,os_mon]}, - {env, []}, {mod, {emqx_app,[]}}, - {maintainers, ["Feng Lee "]}, + {env, []}, {licenses, ["Apache-2.0"]}, - {links, [{"Github", "https://github.com/emqx/emqx"}]} + {maintainers, ["EMQ X Team "]}, + {links, [{"Homepage", "https://emqx.io/"}, + {"Github", "https://github.com/emqx/emqx"} + ]} ]}. diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 95588d6ed..aa001cb01 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -33,8 +33,8 @@ , caps/1 ]). -%% for tests --export([set/3]). +%% Exports for unit tests:( +-export([set_field/3]). -export([ handle_in/2 , handle_out/2 @@ -45,14 +45,16 @@ , terminate/2 ]). -%% Ensure timer --export([ensure_timer/2]). +-export([ received/2 + , sent/2 + ]). --export([gc/3]). - --import(emqx_misc, [maybe_apply/2]). - --import(emqx_access_control, [check_acl/3]). +-import(emqx_misc, + [ run_fold/2 + , run_fold/3 + , pipeline/3 + , maybe_apply/2 + ]). -export_type([channel/0]). @@ -68,15 +70,14 @@ %% Timers timers :: #{atom() => disabled | maybe(reference())}, %% GC State - gc_state :: emqx_gc:gc_state(), + gc_state :: maybe(emqx_gc:gc_state()), %% OOM Policy - oom_policy :: emqx_oom:oom_policy(), + oom_policy :: maybe(emqx_oom:oom_policy()), %% Connected - connected :: boolean(), - %% Disonnected - disconnected :: boolean(), + connected :: undefined | boolean(), %% Connected at connected_at :: erlang:timestamp(), + %% Disconnected at disconnected_at :: erlang:timestamp(), %% Takeover takeover :: boolean(), @@ -97,6 +98,10 @@ will_timer => will_message }). +-define(ATTR_KEYS, [client, session, protocol, connected, connected_at, disconnected_at]). + +-define(INFO_KEYS, ?ATTR_KEYS ++ [keepalive, gc_state, disconnected_at]). + %%-------------------------------------------------------------------- %% Init the channel %%-------------------------------------------------------------------- @@ -117,22 +122,22 @@ init(ConnInfo, Options) -> client_id => <<>>, mountpoint => MountPoint, is_bridge => false, - is_superuser => false}, ConnInfo), + is_superuser => false + }, ConnInfo), EnableStats = emqx_zone:get_env(Zone, enable_stats, true), StatsTimer = if EnableStats -> undefined; ?Otherwise -> disabled end, - GcState = emqx_gc:init(emqx_zone:get_env(Zone, force_gc_policy, false)), - OomPolicy = emqx_oom:init(emqx_zone:get_env(Zone, force_shutdown_policy)), + GcState = maybe_apply(fun emqx_gc:init/1, + emqx_zone:get_env(Zone, force_gc_policy)), + OomPolicy = maybe_apply(fun emqx_oom:init/1, + emqx_zone:get_env(Zone, force_shutdown_policy)), #channel{client = Client, - session = undefined, - protocol = undefined, gc_state = GcState, oom_policy = OomPolicy, timers = #{stats_timer => StatsTimer}, - connected = false, - disconnected = false, + connected = undefined, takeover = false, resuming = false, pendings = [] @@ -145,27 +150,14 @@ peer_cert_as_username(Options) -> %% Info, Attrs and Caps %%-------------------------------------------------------------------- +%% @doc Get infos of the channel. -spec(info(channel()) -> emqx_types:infos()). -info(#channel{client = Client, - session = Session, - protocol = Protocol, - keepalive = Keepalive, - gc_state = GCState, - oom_policy = OomPolicy, - connected = Connected, - connected_at = ConnectedAt - }) -> - #{client => Client, - session => maybe_apply(fun emqx_session:info/1, Session), - protocol => maybe_apply(fun emqx_protocol:info/1, Protocol), - keepalive => maybe_apply(fun emqx_keepalive:info/1, Keepalive), - gc_state => emqx_gc:info(GCState), - oom_policy => emqx_oom:info(OomPolicy), - connected => Connected, - connected_at => ConnectedAt - }. +info(Channel) -> + maps:from_list(info(?INFO_KEYS, Channel)). --spec(info(atom(), channel()) -> term()). +-spec(info(list(atom())|atom(), channel()) -> term()). +info(Keys, Channel) when is_list(Keys) -> + [{Key, info(Key, Channel)} || Key <- Keys]; info(client, #channel{client = Client}) -> Client; info(session, #channel{session = Session}) -> @@ -174,10 +166,10 @@ info(protocol, #channel{protocol = Protocol}) -> maybe_apply(fun emqx_protocol:info/1, Protocol); info(keepalive, #channel{keepalive = Keepalive}) -> maybe_apply(fun emqx_keepalive:info/1, Keepalive); -info(gc_state, #channel{gc_state = GCState}) -> - emqx_gc:info(GCState); -info(oom_policy, #channel{oom_policy = Policy}) -> - emqx_oom:info(Policy); +info(gc_state, #channel{gc_state = GcState}) -> + maybe_apply(fun emqx_gc:info/1, GcState); +info(oom_policy, #channel{oom_policy = OomPolicy}) -> + maybe_apply(fun emqx_oom:info/1, OomPolicy); info(connected, #channel{connected = Connected}) -> Connected; info(connected_at, #channel{connected_at = ConnectedAt}) -> @@ -185,20 +177,17 @@ info(connected_at, #channel{connected_at = ConnectedAt}) -> info(disconnected_at, #channel{disconnected_at = DisconnectedAt}) -> DisconnectedAt. +%% @doc Get attrs of the channel. -spec(attrs(channel()) -> emqx_types:attrs()). -attrs(#channel{client = Client, - session = Session, - protocol = Protocol, - connected = Connected, - connected_at = ConnectedAt}) -> - #{client => Client, - session => maybe_apply(fun emqx_session:attrs/1, Session), - protocol => maybe_apply(fun emqx_protocol:attrs/1, Protocol), - connected => Connected, - connected_at => ConnectedAt - }. +attrs(Channel) -> + maps:from_list([{Key, attr(Key, Channel)} || Key <- ?ATTR_KEYS]). + +attr(protocol, #channel{protocol = Proto}) -> + maybe_apply(fun emqx_protocol:attrs/1, Proto); +attr(session, #channel{session = Session}) -> + maybe_apply(fun emqx_session:attrs/1, Session); +attr(Key, Channel) -> info(Key, Channel). -%%TODO: ChanStats? -spec(stats(channel()) -> emqx_types:stats()). stats(#channel{session = Session}) -> emqx_session:stats(Session). @@ -207,16 +196,11 @@ stats(#channel{session = Session}) -> caps(#channel{client = #{zone := Zone}}) -> emqx_mqtt_caps:get_caps(Zone). -%%-------------------------------------------------------------------- -%% For unit tests -%%-------------------------------------------------------------------- - -set(client, Client, Channel) -> - Channel#channel{client = Client}; -set(session, Session, Channel) -> - Channel#channel{session = Session}; -set(protocol, Protocol, Channel) -> - Channel#channel{protocol = Protocol}. +%% For tests +set_field(Name, Val, Channel) -> + Fields = record_info(fields, channel), + Pos = emqx_misc:index_of(Name, Fields), + setelement(Pos+1, Channel, Val). %%-------------------------------------------------------------------- %% Handle incoming packet @@ -244,7 +228,8 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> handle_out({connack, ReasonCode}, NChannel) end; -handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{protocol = Protocol}) -> +handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), + Channel = #channel{protocol = Protocol}) -> case pipeline([fun validate_packet/2, fun process_alias/2, fun check_publish/2], Packet, Channel) of @@ -255,41 +240,52 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{p ?LOG(warning, "Cannot publish message to ~s due to ~s", [Topic, emqx_reason_codes:text(ReasonCode, ProtoVer)]), handle_out({disconnect, ReasonCode}, NChannel) - % case QoS of - % ?QOS_0 -> handle_out({puberr, ReasonCode}, NChannel); - % ?QOS_1 -> handle_out({puback, PacketId, ReasonCode}, NChannel); - % ?QOS_2 -> handle_out({pubrec, PacketId, ReasonCode}, NChannel) - % end end; -%%TODO: How to handle the ReasonCode? -handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> +handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), + Channel = #channel{client = Client, session = Session}) -> case emqx_session:puback(PacketId, Session) of - {ok, Publishes, NSession} -> + {ok, Msg, Publishes, NSession} -> + ok = emqx_hooks:run('message.acked', [Client, Msg]), handle_out({publish, Publishes}, Channel#channel{session = NSession}); - {ok, NSession} -> + {ok, Msg, NSession} -> + ok = emqx_hooks:run('message.acked', [Client, Msg]), {ok, Channel#channel{session = NSession}}; - {error, _NotFound} -> - %%TODO: How to handle NotFound, inc metrics? + {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> + ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), + ok = emqx_metrics:inc('packets.puback.inuse'), + {ok, Channel}; + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> + ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), + ok = emqx_metrics:inc('packets.puback.missed'), {ok, Channel} end; -%%TODO: How to handle the ReasonCode? -handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> +handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), + Channel = #channel{client = Client, session = Session}) -> case emqx_session:pubrec(PacketId, Session) of - {ok, NSession} -> - handle_out({pubrel, PacketId, ?RC_SUCCESS}, Channel#channel{session = NSession}); - {error, ReasonCode} -> - handle_out({pubrel, PacketId, ReasonCode}, Channel) + {ok, Msg, NSession} -> + ok = emqx_hooks:run('message.acked', [Client, Msg]), + NChannel = Channel#channel{session = NSession}, + handle_out({pubrel, PacketId, ?RC_SUCCESS}, NChannel); + {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> + ?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]), + ok = emqx_metrics:inc('packets.pubrec.inuse'), + handle_out({pubrel, PacketId, RC}, Channel); + {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> + ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]), + ok = emqx_metrics:inc('packets.pubrec.missed'), + handle_out({pubrel, PacketId, RC}, Channel) end; -%%TODO: How to handle the ReasonCode? handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> case emqx_session:pubrel(PacketId, Session) of {ok, NSession} -> handle_out({pubcomp, PacketId, ?RC_SUCCESS}, Channel#channel{session = NSession}); - {error, ReasonCode} -> - handle_out({pubcomp, PacketId, ReasonCode}, Channel) + {error, NotFound} -> + ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), + ok = emqx_metrics:inc('packets.pubrel.missed'), + handle_out({pubcomp, PacketId, NotFound}, Channel) end; handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> @@ -298,36 +294,27 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S handle_out({publish, Publishes}, Channel#channel{session = NSession}); {ok, NSession} -> {ok, Channel#channel{session = NSession}}; - {error, _NotFound} -> - %% TODO: how to handle NotFound? + {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> + ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]), + ok = emqx_metrics:inc('packets.pubcomp.missed'), {ok, Channel} end; -handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - Channel = #channel{client = Client}) -> +handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), Channel) -> case validate_packet(Packet, Channel) of ok -> - TopicFilters1 = [emqx_topic:parse(TopicFilter, SubOpts) - || {TopicFilter, SubOpts} <- TopicFilters], - TopicFilters2 = emqx_hooks:run_fold('client.subscribe', - [Client, Properties], - TopicFilters1), - TopicFilters3 = enrich_subid(Properties, TopicFilters2), - {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel), + TopicFilters = preprocess_subscribe(Properties, RawTopicFilters, Channel), + {ReasonCodes, NChannel} = process_subscribe(TopicFilters, Channel), handle_out({suback, PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> handle_out({disconnect, ReasonCode}, Channel) end; -handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), - Channel = #channel{client = Client}) -> +handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), Channel) -> case validate_packet(Packet, Channel) of ok -> - TopicFilters1 = lists:map(fun emqx_topic:parse/1, TopicFilters), - TopicFilters2 = emqx_hooks:run_fold('client.unsubscribe', - [Client, Properties], - TopicFilters1), - {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters2, Channel), + TopicFilters = preprocess_unsubscribe(Properties, RawTopicFilters, Channel), + {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters, Channel), handle_out({unsuback, PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> handle_out({disconnect, ReasonCode}, Channel) @@ -347,7 +334,7 @@ handle_in(?DISCONNECT_PACKET(RC, Properties), Channel = #channel{session = Sessi ?RC_SUCCESS -> Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)}; _ -> Channel end, - Channel2 = ensure_disconnected(Channel1#channel{session = emqx_session:update_expiry_interval(Interval, Session)}), + Channel2 = Channel1#channel{session = emqx_session:update_expiry_interval(Interval, Session)}, case Interval of ?UINT_MAX -> {ok, ensure_timer(will_timer, Channel2)}; @@ -382,6 +369,7 @@ process_connect(ConnPkt, Channel) -> NChannel = Channel#channel{session = Session}, handle_out({connack, ?RC_SUCCESS, sp(false)}, NChannel); {ok, #{session := Session, present := true, pendings := Pendings}} -> + %%TODO: improve later. NPendings = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())), NChannel = Channel#channel{session = Session, resuming = true, @@ -397,39 +385,58 @@ process_connect(ConnPkt, Channel) -> %% Process Publish %%-------------------------------------------------------------------- -%% Process Publish -process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), - Channel = #channel{client = Client, protocol = Protocol}) -> - Msg = emqx_packet:to_message(Client, Packet), - %%TODO: Improve later. - Msg1 = emqx_message:set_flag(dup, false, emqx_message:set_header(proto_ver, emqx_protocol:info(proto_ver, Protocol), Msg)), - process_publish(PacketId, mount(Client, Msg1), Channel). +process_publish(Packet = ?PUBLISH_PACKET(_QoS, _Topic, PacketId), Channel) -> + Msg = publish_to_msg(Packet, Channel), + process_publish(PacketId, Msg, Channel). process_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) -> _ = emqx_broker:publish(Msg), {ok, Channel}; process_publish(PacketId, Msg = #message{qos = ?QOS_1}, Channel) -> - Deliveries = emqx_broker:publish(Msg), - ReasonCode = emqx_reason_codes:puback(Deliveries), + ReasonCode = case emqx_broker:publish(Msg) of + [] -> ?RC_NO_MATCHING_SUBSCRIBERS; + _ -> ?RC_SUCCESS + end, handle_out({puback, PacketId, ReasonCode}, Channel); process_publish(PacketId, Msg = #message{qos = ?QOS_2}, Channel = #channel{session = Session}) -> case emqx_session:publish(PacketId, Msg, Session) of - {ok, Deliveries, NSession} -> - ReasonCode = emqx_reason_codes:puback(Deliveries), + {ok, Results, NSession} -> + RC = case Results of + [] -> ?RC_NO_MATCHING_SUBSCRIBERS; + _ -> ?RC_SUCCESS + end, NChannel = Channel#channel{session = NSession}, - handle_out({pubrec, PacketId, ReasonCode}, - ensure_timer(await_timer, NChannel)); - {error, ReasonCode} -> - handle_out({pubrec, PacketId, ReasonCode}, Channel) + handle_out({pubrec, PacketId, RC}, ensure_timer(await_timer, NChannel)); + {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> + ok = emqx_metrics:inc('packets.publish.inuse'), + handle_out({pubrec, PacketId, RC}, Channel); + {error, RC = ?RC_RECEIVE_MAXIMUM_EXCEEDED} -> + ?LOG(warning, "Dropped qos2 packet ~w due to awaiting_rel is full", [PacketId]), + ok = emqx_metrics:inc('messages.qos2.dropped'), + handle_out({pubrec, PacketId, RC}, Channel) end. +publish_to_msg(Packet, #channel{client = Client = #{mountpoint := MountPoint}}) -> + Msg = emqx_packet:to_message(Client, Packet), + Msg1 = emqx_message:set_flag(dup, false, Msg), + emqx_mountpoint:mount(MountPoint, Msg1). + %%-------------------------------------------------------------------- %% Process Subscribe %%-------------------------------------------------------------------- +-compile({inline, [preprocess_subscribe/3]}). +preprocess_subscribe(Properties, RawTopicFilters, #channel{client = Client}) -> + RunHook = fun(TopicFilters) -> + emqx_hooks:run_fold('client.subscribe', + [Client, Properties], TopicFilters) + end, + Enrich = fun(TopicFilters) -> enrich_subid(Properties, TopicFilters) end, + run_fold([fun parse_topic_filters/1, RunHook, Enrich], RawTopicFilters). + process_subscribe(TopicFilters, Channel) -> process_subscribe(TopicFilters, [], Channel). @@ -440,16 +447,18 @@ process_subscribe([{TopicFilter, SubOpts}|More], Acc, Channel) -> {RC, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel), process_subscribe(More, [RC|Acc], NChannel). -do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, - Channel = #channel{client = Client, session = Session}) -> +do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = + #channel{client = Client = #{mountpoint := MountPoint}, + session = Session}) -> case check_subscribe(TopicFilter, SubOpts, Channel) of - ok -> TopicFilter1 = mount(Client, TopicFilter), - SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel), - case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of - {ok, NSession} -> - {QoS, Channel#channel{session = NSession}}; - {error, RC} -> {RC, Channel} - end; + ok -> + TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), + SubOpts1 = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel), + case emqx_session:subscribe(Client, TopicFilter1, SubOpts1, Session) of + {ok, NSession} -> + {QoS, Channel#channel{session = NSession}}; + {error, RC} -> {RC, Channel} + end; {error, RC} -> {RC, Channel} end. @@ -457,6 +466,15 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, %% Process Unsubscribe %%-------------------------------------------------------------------- +-compile({inline, [preprocess_unsubscribe/3]}). +preprocess_unsubscribe(Properties, RawTopicFilter, #channel{client = Client}) -> + RunHook = fun(TopicFilters) -> + emqx_hooks:run_fold('client.unsubscribe', + [Client, Properties], TopicFilters) + end, + run_fold([fun parse_topic_filters/1, RunHook], RawTopicFilter). + +-compile({inline, [process_unsubscribe/2]}). process_unsubscribe(TopicFilters, Channel) -> process_unsubscribe(TopicFilters, [], Channel). @@ -464,12 +482,14 @@ process_unsubscribe([], Acc, Channel) -> {lists:reverse(Acc), Channel}; process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, Channel) -> - {RC, Channel1} = do_unsubscribe(TopicFilter, SubOpts, Channel), - process_unsubscribe(More, [RC|Acc], Channel1). + {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts, Channel), + process_unsubscribe(More, [RC|Acc], NChannel). -do_unsubscribe(TopicFilter, _SubOpts, - Channel = #channel{client = Client, session = Session}) -> - case emqx_session:unsubscribe(Client, mount(Client, TopicFilter), Session) of +do_unsubscribe(TopicFilter, _SubOpts, Channel = + #channel{client = Client = #{mountpoint := MountPoint}, + session = Session}) -> + TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), + case emqx_session:unsubscribe(Client, TopicFilter1, Session) of {ok, NSession} -> {?RC_SUCCESS, Channel#channel{session = NSession}}; {error, RC} -> {RC, Channel} @@ -479,15 +499,15 @@ do_unsubscribe(TopicFilter, _SubOpts, %% Handle outgoing packet %%-------------------------------------------------------------------- +%%TODO: RunFold or Pipeline handle_out({connack, ?RC_SUCCESS, SP}, Channel = #channel{client = Client}) -> - ok = emqx_hooks:run('client.connected', - [Client, ?RC_SUCCESS, attrs(Channel)]), - AckProps = emqx_misc:run_fold([fun enrich_caps/2, - fun enrich_server_keepalive/2, - fun enrich_assigned_clientid/2 - ], #{}, Channel), - AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), + AckProps = run_fold([fun enrich_caps/2, + fun enrich_server_keepalive/2, + fun enrich_assigned_clientid/2 + ], #{}, Channel), Channel1 = ensure_keepalive(AckProps, ensure_connected(Channel)), + ok = emqx_hooks:run('client.connected', [Client, ?RC_SUCCESS, attrs(Channel1)]), + AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), case maybe_resume_session(Channel1) of ignore -> {ok, AckPacket, Channel1}; {ok, Publishes, NSession} -> @@ -502,7 +522,10 @@ handle_out({connack, ReasonCode}, Channel = #channel{client = Client, protocol = Protocol }) -> ok = emqx_hooks:run('client.connected', [Client, ReasonCode, attrs(Channel)]), - ProtoVer = emqx_protocol:info(proto_ver, Protocol), + ProtoVer = case Protocol of + undefined -> undefined; + _ -> emqx_protocol:info(proto_ver, Protocol) + end, ReasonCode1 = if ProtoVer == ?MQTT_PROTO_V5 -> ReasonCode; true -> emqx_reason_codes:compat(connack, ReasonCode) @@ -510,9 +533,10 @@ handle_out({connack, ReasonCode}, Channel = #channel{client = Client, Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer), {stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel}; -handle_out({deliver, Delivers}, Channel = #channel{session = Session, +handle_out({deliver, Delivers}, Channel = #channel{session = Session, connected = false}) -> - {ok, Channel#channel{session = emqx_session:enqueue(Delivers, Session)}}; + NSession = emqx_session:enqueue(Delivers, Session), + {ok, Channel#channel{session = NSession}}; handle_out({deliver, Delivers}, Channel = #channel{takeover = true, pendings = Pendings}) -> @@ -527,23 +551,33 @@ handle_out({deliver, Delivers}, Channel = #channel{session = Session}) -> {ok, Channel#channel{session = NSession}} end; -handle_out({publish, Publishes}, Channel) -> - Packets = lists:map( - fun(Publish) -> - element(2, handle_out(Publish, Channel)) - end, Publishes), - {ok, Packets, Channel}; +handle_out({publish, [Publish]}, Channel) -> + handle_out(Publish, Channel); -handle_out({publish, PacketId, Msg}, Channel = #channel{client = Client}) -> - Msg1 = emqx_hooks:run_fold('message.deliver', [Client], - emqx_message:update_expiry(Msg)), - Packet = emqx_packet:from_message(PacketId, unmount(Client, Msg1)), - {ok, Packet, Channel}; +handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> + Packets = lists:foldl( + fun(Publish, Acc) -> + case handle_out(Publish, Channel) of + {ok, Packet, _Ch} -> + [Packet|Acc]; + {ok, _Ch} -> Acc + end + end, [], Publishes), + {ok, lists:reverse(Packets), Channel}; -%% TODO: How to handle the puberr? -handle_out({puberr, _ReasonCode}, Channel) -> +%% Ignore loop deliver +handle_out({publish, _PacketId, #message{from = ClientId, + flags = #{nl := true}}}, + Channel = #channel{client = #{client_id := ClientId}}) -> {ok, Channel}; +handle_out({publish, PacketId, Msg}, Channel = + #channel{client = Client = #{mountpoint := MountPoint}}) -> + Msg1 = emqx_message:update_expiry(Msg), + Msg2 = emqx_hooks:run_fold('message.delivered', [Client], Msg1), + Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2), + {ok, emqx_packet:from_message(PacketId, Msg3), Channel}; + handle_out({puback, PacketId, ReasonCode}, Channel) -> {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel}; @@ -556,25 +590,21 @@ handle_out({pubrec, PacketId, ReasonCode}, Channel) -> handle_out({pubcomp, PacketId, ReasonCode}, Channel) -> {ok, ?PUBCOMP_PACKET(PacketId, ReasonCode), Channel}; -handle_out({suback, PacketId, ReasonCodes}, - Channel = #channel{protocol = Protocol}) -> - ReasonCodes1 = - case emqx_protocol:info(proto_ver, Protocol) of - ?MQTT_PROTO_V5 -> ReasonCodes; - _Ver -> - [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes] - end, +handle_out({suback, PacketId, ReasonCodes}, Channel = #channel{protocol = Protocol}) -> + ReasonCodes1 = case emqx_protocol:info(proto_ver, Protocol) of + ?MQTT_PROTO_V5 -> ReasonCodes; + _Ver -> + [emqx_reason_codes:compat(suback, RC) || RC <- ReasonCodes] + end, {ok, ?SUBACK_PACKET(PacketId, ReasonCodes1), Channel}; -handle_out({unsuback, PacketId, ReasonCodes}, - Channel = #channel{protocol = Protocol}) -> - Packet = case emqx_protocol:info(proto_ver, Protocol) of - ?MQTT_PROTO_V5 -> - ?UNSUBACK_PACKET(PacketId, ReasonCodes); - %% Ignore reason codes if not MQTT5 - _Ver -> ?UNSUBACK_PACKET(PacketId) - end, - {ok, Packet, Channel}; +handle_out({unsuback, PacketId, ReasonCodes}, Channel = #channel{protocol = Protocol}) -> + Unsuback = case emqx_protocol:info(proto_ver, Protocol) of + ?MQTT_PROTO_V5 -> + ?UNSUBACK_PACKET(PacketId, ReasonCodes); + _Ver -> ?UNSUBACK_PACKET(PacketId) + end, + {ok, Unsuback, Channel}; handle_out({disconnect, ReasonCode}, Channel = #channel{protocol = Protocol}) -> case emqx_protocol:info(proto_ver, Protocol) of @@ -595,6 +625,12 @@ handle_out({Type, Data}, Channel) -> %% Handle call %%-------------------------------------------------------------------- +handle_call(kick, Channel) -> + {stop, {shutdown, kicked}, ok, Channel}; + +handle_call(discard, Channel) -> + {stop, {shutdown, discarded}, ok, Channel}; + %% Session Takeover handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> {ok, Session, Channel#channel{takeover = true}}; @@ -613,6 +649,13 @@ handle_call(Req, Channel) -> %% Handle cast %%-------------------------------------------------------------------- +-spec(handle_cast(Msg :: term(), channel()) + -> ok | {ok, channel()} | {stop, Reason :: term(), channel()}). +handle_cast({register, Attrs, Stats}, #channel{client = #{client_id := ClientId}}) -> + ok = emqx_cm:register_channel(ClientId), + emqx_cm:set_chan_attrs(ClientId, Attrs), + emqx_cm:set_chan_stats(ClientId, Stats); + handle_cast(Msg, Channel) -> ?LOG(error, "Unexpected cast: ~p", [Msg]), {ok, Channel}. @@ -623,26 +666,24 @@ handle_cast(Msg, Channel) -> -spec(handle_info(Info :: term(), channel()) -> {ok, channel()} | {stop, Reason :: term(), channel()}). -handle_info({subscribe, TopicFilters}, Channel = #channel{client = Client}) -> - TopicFilters1 = emqx_hooks:run_fold('client.subscribe', - [Client, #{'Internal' => true}], - parse(subscribe, TopicFilters)), - {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel), +handle_info({subscribe, RawTopicFilters}, Channel) -> + TopicFilters = preprocess_subscribe(#{'Internal' => true}, + RawTopicFilters, Channel), + {_ReasonCodes, NChannel} = process_subscribe(TopicFilters, Channel), {ok, NChannel}; -handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) -> - TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', - [Client, #{'Internal' => true}], - parse(unsubscribe, TopicFilters)), - {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), +handle_info({unsubscribe, RawTopicFilters}, Channel) -> + TopicFilters = preprocess_unsubscribe(#{'Internal' => true}, + RawTopicFilters, Channel), + {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters, Channel), {ok, NChannel}; -handle_info(sock_closed, Channel = #channel{disconnected = true}) -> - {ok, Channel}; -handle_info(sock_closed, Channel = #channel{connected = false}) -> +handle_info(disconnected, Channel = #channel{connected = undefined}) -> shutdown(closed, Channel); -handle_info(sock_closed, Channel = #channel{protocol = Protocol, - session = Session}) -> + +handle_info(disconnected, Channel = #channel{protocol = Protocol, + session = Session}) -> + %% TODO: Why handle will_msg here? publish_will_msg(emqx_protocol:info(will_msg, Protocol)), NChannel = Channel#channel{protocol = emqx_protocol:clear_will_msg(Protocol)}, Interval = emqx_session:info(expiry_interval, Session), @@ -779,23 +820,20 @@ terminate(Reason, #channel{client = Client, true -> publish_will_msg(emqx_protocol:info(will_msg, Protocol)) end. +-spec(received(pos_integer(), channel()) -> channel()). +received(Oct, Channel) -> + ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)). + +-spec(sent(pos_integer(), channel()) -> channel()). +sent(Oct, Channel) -> + ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)). + %%TODO: Improve will msg:) publish_will_msg(undefined) -> ok; publish_will_msg(Msg) -> emqx_broker:publish(Msg). -%%-------------------------------------------------------------------- -%% GC the channel. -%%-------------------------------------------------------------------- - -gc(_Cnt, _Oct, Channel = #channel{gc_state = undefined}) -> - Channel; -gc(Cnt, Oct, Channel = #channel{gc_state = GCSt}) -> - {Ok, GCSt1} = emqx_gc:run(Cnt, Oct, GCSt), - Ok andalso emqx_metrics:inc('channel.gc.cnt'), - Channel#channel{gc_state = GCSt1}. - %% @doc Validate incoming packet. -spec(validate_packet(emqx_types:packet(), channel()) -> ok | {error, emqx_types:reason_code()}). @@ -899,42 +937,38 @@ init_protocol(ConnPkt, Channel) -> %% Enrich client %%-------------------------------------------------------------------- -enrich_client(ConnPkt, Channel) -> - pipeline([fun set_username/2, - fun maybe_use_username_as_clientid/2, - fun maybe_assign_clientid/2, - fun set_rest_client_fields/2], ConnPkt, Channel). +enrich_client(ConnPkt = #mqtt_packet_connect{is_bridge = IsBridge}, + Channel = #channel{client = Client}) -> + {ok, NConnPkt, NClient} = pipeline([fun set_username/2, + fun maybe_username_as_clientid/2, + fun maybe_assign_clientid/2, + fun fix_mountpoint/2 + ], ConnPkt, Client), + {ok, NConnPkt, Channel#channel{client = NClient#{is_bridge => IsBridge}}}. -maybe_use_username_as_clientid(_ConnPkt, Channel = #channel{client = #{username := undefined}}) -> - {ok, Channel}; -maybe_use_username_as_clientid(_ConnPkt, Channel = #channel{client = Client = #{zone := Zone, - username := Username}}) -> - NClient = - case emqx_zone:get_env(Zone, use_username_as_clientid, false) of - true -> Client#{client_id => Username}; - false -> Client - end, - {ok, Channel#channel{client = NClient}}. +%% Username may be not undefined if peer_cert_as_username +set_username(#mqtt_packet_connect{username = Username}, Client = #{username := undefined}) -> + {ok, Client#{username => Username}}; +set_username(_ConnPkt, Client) -> + {ok, Client}. -maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, - Channel = #channel{client = Client}) -> +maybe_username_as_clientid(_ConnPkt, Client = #{username := undefined}) -> + {ok, Client}; +maybe_username_as_clientid(_ConnPkt, Client = #{zone := Zone, username := Username}) -> + case emqx_zone:get_env(Zone, use_username_as_clientid, false) of + true -> {ok, Client#{client_id => Username}}; + false -> ok + end. + +maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, Client) -> RandClientId = emqx_guid:to_base62(emqx_guid:gen()), - {ok, Channel#channel{client = Client#{client_id => RandClientId}}}; + {ok, Client#{client_id => RandClientId}}; +maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, Client) -> + {ok, Client#{client_id => ClientId}}. -maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, - Channel = #channel{client = Client}) -> - {ok, Channel#channel{client = Client#{client_id => ClientId}}}. - -%% Username maybe not undefined if peer_cert_as_username -set_username(#mqtt_packet_connect{username = Username}, - Channel = #channel{client = Client = #{username := undefined}}) -> - {ok, Channel#channel{client = Client#{username => Username}}}; -set_username(_ConnPkt, Channel) -> - {ok, Channel}. - -set_rest_client_fields(#mqtt_packet_connect{is_bridge = IsBridge}, - Channel = #channel{client = Client}) -> - {ok, Channel#channel{client = Client#{is_bridge => IsBridge}}}. +fix_mountpoint(_ConnPkt, #{mountpoint := undefined}) -> ok; +fix_mountpoint(_ConnPkt, Client = #{mountpoint := Mountpoint}) -> + {ok, Client#{mountpoint := emqx_mountpoint:replvar(Mountpoint, Client)}}. %% @doc Set logger metadata. set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) -> @@ -1016,7 +1050,8 @@ check_publish(Packet, Channel) -> %% Check Pub ACL check_pub_acl(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = Topic}}, #channel{client = Client}) -> - case is_acl_enabled(Client) andalso check_acl(Client, publish, Topic) of + case is_acl_enabled(Client) andalso + emqx_access_control:check_acl(Client, publish, Topic) of false -> ok; allow -> ok; deny -> {error, ?RC_NOT_AUTHORIZED} @@ -1057,7 +1092,7 @@ check_subscribe(TopicFilter, SubOpts, Channel) -> %% Check Sub ACL check_sub_acl(TopicFilter, #channel{client = Client}) -> case is_acl_enabled(Client) andalso - check_acl(Client, subscribe, TopicFilter) of + emqx_access_control:check_acl(Client, subscribe, TopicFilter) of false -> allow; Result -> Result end. @@ -1116,10 +1151,10 @@ enrich_assigned_clientid(AckProps, #channel{client = #{client_id := ClientId}, end. ensure_connected(Channel) -> - Channel#channel{connected = true, connected_at = os:timestamp(), disconnected = false}. + Channel#channel{connected = true, connected_at = os:timestamp(), disconnected_at = undefined}. ensure_disconnected(Channel) -> - Channel#channel{connected = false, disconnected_at = os:timestamp(), disconnected = true}. + Channel#channel{connected = false, disconnected_at = os:timestamp()}. ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel) -> ensure_keepalive_timer(Interval, Channel); @@ -1147,53 +1182,32 @@ maybe_resume_session(#channel{session = Session, {ok, lists:append(Publishes, More), Session2} end. -%%-------------------------------------------------------------------- -%% Is ACL enabled? -%%-------------------------------------------------------------------- - +%% @doc Is ACL enabled? is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> (not IsSuperuser) andalso emqx_zone:get_env(Zone, enable_acl, true). -%%-------------------------------------------------------------------- -%% Parse Topic Filters -%%-------------------------------------------------------------------- - -parse(subscribe, TopicFilters) -> - [emqx_topic:parse(TopicFilter, SubOpts) || {TopicFilter, SubOpts} <- TopicFilters]; - -parse(unsubscribe, TopicFilters) -> +%% @doc Parse Topic Filters +-compile({inline, [parse_topic_filters/1]}). +parse_topic_filters(TopicFilters) -> lists:map(fun emqx_topic:parse/1, TopicFilters). %%-------------------------------------------------------------------- -%% Mount/Unmount +%% Maybe GC and Check OOM %%-------------------------------------------------------------------- -mount(Client = #{mountpoint := MountPoint}, TopicOrMsg) -> - emqx_mountpoint:mount( - emqx_mountpoint:replvar(MountPoint, Client), TopicOrMsg). +maybe_gc_and_check_oom(_Oct, Channel = #channel{gc_state = undefined}) -> + Channel; +maybe_gc_and_check_oom(Oct, Channel = #channel{gc_state = GCSt, + oom_policy = OomPolicy}) -> + {IsGC, GCSt1} = emqx_gc:run(1, Oct, GCSt), + IsGC andalso emqx_metrics:inc('channel.gc.cnt'), + IsGC andalso maybe_apply(fun check_oom/1, OomPolicy), + Channel#channel{gc_state = GCSt1}. -unmount(Client = #{mountpoint := MountPoint}, TopicOrMsg) -> - emqx_mountpoint:unmount( - emqx_mountpoint:replvar(MountPoint, Client), TopicOrMsg). - -%%-------------------------------------------------------------------- -%% Pipeline -%%-------------------------------------------------------------------- - -pipeline([], Packet, Channel) -> - {ok, Packet, Channel}; - -pipeline([Fun|More], Packet, Channel) -> - case Fun(Packet, Channel) of - ok -> pipeline(More, Packet, Channel); - {ok, NChannel} -> - pipeline(More, Packet, NChannel); - {ok, NPacket, NChannel} -> - pipeline(More, NPacket, NChannel); - {error, ReasonCode} -> - {error, ReasonCode, Channel}; - {error, ReasonCode, NChannel} -> - {error, ReasonCode, NChannel} +check_oom(OomPolicy) -> + case emqx_oom:check(OomPolicy) of + ok -> ok; + Shutdown -> self() ! Shutdown end. %%-------------------------------------------------------------------- diff --git a/src/emqx_cli.erl b/src/emqx_cli.erl deleted file mode 100644 index 3deb50b07..000000000 --- a/src/emqx_cli.erl +++ /dev/null @@ -1,40 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_cli). - --export([ print/1 - , print/2 - , usage/1 - , usage/2 - ]). - -print(Msg) -> - io:format(Msg), lists:flatten(io_lib:format("~p", [Msg])). - -print(Format, Args) -> - io:format(Format, Args), lists:flatten(io_lib:format(Format, Args)). - -usage(CmdList) -> - lists:map( - fun({Cmd, Descr}) -> - io:format("~-48s# ~s~n", [Cmd, Descr]), - lists:flatten(io_lib:format("~-48s# ~s~n", [Cmd, Descr])) - end, CmdList). - -usage(Format, Args) -> - usage([{Format, Args}]). - diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 3673085ac..5f003807f 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -174,7 +174,7 @@ open_session(false, Client = #{client_id := ClientId}, Options) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientId, Session), - Pendings = ConnMod:takeover(ChanPid, 'end'), + Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), {ok, #{session => Session, present => true, pendings => Pendings}}; @@ -205,7 +205,7 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chan_attrs(ClientId, ChanPid) of #{client := #{conn_mod := ConnMod}} -> - Session = ConnMod:takeover(ChanPid, 'begin'), + Session = ConnMod:call(ChanPid, {takeover, 'begin'}), {ok, ConnMod, ChanPid, Session}; undefined -> {error, not_found} @@ -234,7 +234,7 @@ discard_session(ClientId) when is_binary(ClientId) -> discard_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chan_attrs(ClientId, ChanPid) of #{client := #{conn_mod := ConnMod}} -> - ConnMod:discard(ChanPid); + ConnMod:call(ChanPid, discard); undefined -> ok end; diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 93daf4cba..54da647fb 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -32,21 +32,15 @@ -export([ info/1 , attrs/1 , stats/1 + , state/1 ]). -%% For Debug --export([get_state/1]). - --export([ kick/1 - , discard/1 - , takeover/2 - ]). +-export([call/2]). %% state callbacks -export([ idle/3 , connected/3 , disconnected/3 - , takeovering/3 ]). %% gen_statem callbacks @@ -56,28 +50,44 @@ , terminate/3 ]). --record(state, { - transport :: esockd:transport(), - socket :: esockd:socket(), - peername :: emqx_types:peername(), - sockname :: emqx_types:peername(), - conn_state :: running | blocked, - active_n :: pos_integer(), - rate_limit :: maybe(esockd_rate_limit:bucket()), - pub_limit :: maybe(esockd_rate_limit:bucket()), +-record(connection, { + %% TCP/TLS Transport + transport :: esockd:transport(), + %% TCP/TLS Socket + socket :: esockd:socket(), + %% Peername of the connection + peername :: emqx_types:peername(), + %% Sockname of the connection + sockname :: emqx_types:peername(), + %% The {active, N} option + active_n :: pos_integer(), + %% The active state + active_state :: running | blocked, + %% Rate Limit + rate_limit :: maybe(esockd_rate_limit:bucket()), + %% Publish Limit + pub_limit :: maybe(esockd_rate_limit:bucket()), + %% Limit Timer limit_timer :: maybe(reference()), + %% Parser State parse_state :: emqx_frame:parse_state(), - serialize :: fun((emqx_types:packet()) -> iodata()), - chan_state :: emqx_channel:channel() + %% Serialize function + serialize :: fun((emqx_types:packet()) -> iodata()), + %% Channel State + chan_state :: emqx_channel:channel() }). --type(state() :: #state{}). +-type(connection() :: #connection{}). -define(ACTIVE_N, 100). -define(HANDLE(T, C, D), handle((T), (C), (D))). +-define(ATTR_KEYS, [socktype, peername, sockname]). +-define(INFO_KEYS, [socktype, peername, sockname, active_n, active_state, + rate_limit, pub_limit]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). +%% @doc Start the connection. -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) -> {ok, pid()}). start_link(Transport, Socket, Options) -> @@ -87,56 +97,53 @@ start_link(Transport, Socket, Options) -> %% API %%-------------------------------------------------------------------- -%% @doc Get infos of the channel. --spec(info(pid() | state()) -> emqx_types:infos()). +%% @doc Get infos of the connection. +-spec(info(pid()|connection()) -> emqx_types:infos()). info(CPid) when is_pid(CPid) -> call(CPid, info); -info(#state{transport = Transport, - socket = Socket, - peername = Peername, - sockname = Sockname, - conn_state = ConnState, - active_n = ActiveN, - rate_limit = RateLimit, - pub_limit = PubLimit, - chan_state = ChanState}) -> - ConnInfo = #{socktype => Transport:type(Socket), - peername => Peername, - sockname => Sockname, - conn_state => ConnState, - active_n => ActiveN, - rate_limit => limit_info(RateLimit), - pub_limit => limit_info(PubLimit) - }, +info(Conn = #connection{chan_state = ChanState}) -> + ConnInfo = info(?INFO_KEYS, Conn), ChanInfo = emqx_channel:info(ChanState), - maps:merge(ConnInfo, ChanInfo). + maps:merge(ChanInfo, #{connection => maps:from_list(ConnInfo)}). + +info(Keys, Conn) when is_list(Keys) -> + [{Key, info(Key, Conn)} || Key <- Keys]; +info(socktype, #connection{transport = Transport, socket = Socket}) -> + Transport:type(Socket); +info(peername, #connection{peername = Peername}) -> + Peername; +info(sockname, #connection{sockname = Sockname}) -> + Sockname; +info(active_n, #connection{active_n = ActiveN}) -> + ActiveN; +info(active_state, #connection{active_state = ActiveSt}) -> + ActiveSt; +info(rate_limit, #connection{rate_limit = RateLimit}) -> + limit_info(RateLimit); +info(pub_limit, #connection{pub_limit = PubLimit}) -> + limit_info(PubLimit); +info(chan_state, #connection{chan_state = ChanState}) -> + emqx_channel:info(ChanState). limit_info(Limit) -> emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit). -%% @doc Get attrs of the channel. --spec(attrs(pid() | state()) -> emqx_types:attrs()). +%% @doc Get attrs of the connection. +-spec(attrs(pid()|connection()) -> emqx_types:attrs()). attrs(CPid) when is_pid(CPid) -> call(CPid, attrs); -attrs(#state{transport = Transport, - socket = Socket, - peername = Peername, - sockname = Sockname, - chan_state = ChanState}) -> - ConnAttrs = #{socktype => Transport:type(Socket), - peername => Peername, - sockname => Sockname - }, +attrs(Conn = #connection{chan_state = ChanState}) -> + ConnAttrs = info(?ATTR_KEYS, Conn), ChanAttrs = emqx_channel:attrs(ChanState), - maps:merge(ConnAttrs, ChanAttrs). + maps:merge(ChanAttrs, #{connection => maps:from_list(ConnAttrs)}). %% @doc Get stats of the channel. --spec(stats(pid() | state()) -> emqx_types:stats()). +-spec(stats(pid()|connection()) -> emqx_types:stats()). stats(CPid) when is_pid(CPid) -> call(CPid, stats); -stats(#state{transport = Transport, - socket = Socket, - chan_state = ChanState}) -> +stats(#connection{transport = Transport, + socket = Socket, + chan_state = ChanState}) -> ProcStats = emqx_misc:proc_stats(), SockStats = case Transport:getstat(Socket, ?SOCK_STATS) of {ok, Ss} -> Ss; @@ -146,25 +153,13 @@ stats(#state{transport = Transport, ChanStats = emqx_channel:stats(ChanState), lists:append([ProcStats, SockStats, ConnStats, ChanStats]). --spec(get_state(pid()) -> state()). -get_state(CPid) -> - call(CPid, get_state). +%% For debug +-spec(state(pid()) -> connection()). +state(CPid) -> call(CPid, state). --spec(kick(pid()) -> ok). -kick(CPid) -> - call(CPid, kick). - --spec(discard(pid()) -> ok). -discard(CPid) -> - gen_statem:cast(CPid, discard). - --spec(takeover(pid(), 'begin'|'end') -> Result :: term()). -takeover(CPid, Phase) -> - gen_statem:call(CPid, {takeover, Phase}). - -%% @private -call(CPid, Req) -> - gen_statem:call(CPid, Req, infinity). +%% kick|discard|takeover +-spec(call(pid(), Req :: term()) -> Reply :: term()). +call(CPid, Req) -> gen_statem:call(CPid, Req). %%-------------------------------------------------------------------- %% gen_statem callbacks @@ -187,22 +182,21 @@ init({Transport, RawSocket, Options}) -> peercert => Peercert, conn_mod => ?MODULE}, Options), IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000), - State = #state{transport = Transport, - socket = Socket, - peername = Peername, - sockname = Sockname, - conn_state = running, - active_n = ActiveN, - rate_limit = RateLimit, - pub_limit = PubLimit, - parse_state = ParseState, - chan_state = ChanState - }, + State = #connection{transport = Transport, + socket = Socket, + peername = Peername, + sockname = Sockname, + active_n = ActiveN, + active_state = running, + rate_limit = RateLimit, + pub_limit = PubLimit, + parse_state = ParseState, + chan_state = ChanState + }, gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}], idle, State, self(), [IdleTimout]). -init_limiter(undefined) -> - undefined; +init_limiter(undefined) -> undefined; init_limiter({Rate, Burst}) -> esockd_rate_limit:new(Rate, Burst). @@ -220,16 +214,13 @@ idle(enter, _, State) -> end; idle(timeout, _Timeout, State) -> - stop(idle_timeout, State); + shutdown(idle_timeout, State); -idle(cast, {incoming, Packet = ?CONNECT_PACKET( - #mqtt_packet_connect{ - proto_ver = ProtoVer} - )}, State) -> - State1 = State#state{serialize = serialize_fun(ProtoVer)}, - handle_incoming(Packet, fun(NewSt) -> - {next_state, connected, NewSt} - end, State1); +idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> + #mqtt_packet_connect{proto_ver = ProtoVer} = ConnPkt, + NState = State#connection{serialize = serialize_fun(ProtoVer)}, + SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end, + handle_incoming(Packet, SuccFun, NState); idle(cast, {incoming, Packet}, State) -> ?LOG(warning, "Unexpected incoming: ~p", [Packet]), @@ -241,17 +232,10 @@ idle(EventType, Content, State) -> %%-------------------------------------------------------------------- %% Connected State -connected(enter, _PrevSt, State = #state{chan_state = ChanState}) -> - #{client_id := ClientId} = emqx_channel:info(client, ChanState), - ok = emqx_cm:register_channel(ClientId), - ok = emqx_cm:set_chan_attrs(ClientId, attrs(State)), +connected(enter, _PrevSt, State) -> + ok = register_self(State), keep_state_and_data; -connected(cast, {incoming, Packet = ?PACKET(?CONNECT)}, State) -> - ?LOG(warning, "Unexpected connect: ~p", [Packet]), - Shutdown = fun(NewSt) -> shutdown(?RC_PROTOCOL_ERROR, NewSt) end, - handle_outgoing(?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), Shutdown, State); - connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> handle_incoming(Packet, fun keep_state/1, State); @@ -264,10 +248,14 @@ connected(EventType, Content, State) -> %%-------------------------------------------------------------------- %% Disconnected State -disconnected(enter, _, _State) -> - %% TODO: What to do? - %% CleanStart is true - keep_state_and_data; +disconnected(enter, _, State = #connection{chan_state = ChanState}) -> + case emqx_channel:handle_info(disconnected, ChanState) of + {ok, NChanState} -> + ok = register_self(State#connection{chan_state = NChanState}), + keep_state(State#connection{chan_state = NChanState}); + {stop, Reason, NChanState} -> + stop(Reason, State#connection{chan_state = NChanState}) + end; disconnected(info, Deliver = {deliver, _Topic, _Msg}, State) -> handle_deliver([Deliver], State); @@ -276,15 +264,8 @@ disconnected(EventType, Content, State) -> ?HANDLE(EventType, Content, State). %%-------------------------------------------------------------------- -%% Takeovering State - -takeovering(enter, _PreState, State) -> - {keep_state, State}; - -takeovering(EventType, Content, State) -> - ?HANDLE(EventType, Content, State). - %% Handle call + handle({call, From}, info, State) -> reply(From, info(State), State); @@ -294,60 +275,53 @@ handle({call, From}, attrs, State) -> handle({call, From}, stats, State) -> reply(From, stats(State), State); -handle({call, From}, get_state, State) -> +handle({call, From}, state, State) -> reply(From, State, State); -handle({call, From}, kick, State) -> - ok = gen_statem:reply(From, ok), - shutdown(kicked, State); - -handle({call, From}, Req, State = #state{chan_state = ChanState}) -> +handle({call, From}, Req, State = #connection{chan_state = ChanState}) -> case emqx_channel:handle_call(Req, ChanState) of {ok, Reply, NChanState} -> - reply(From, Reply, State#state{chan_state = NChanState}); + reply(From, Reply, State#connection{chan_state = NChanState}); {stop, Reason, Reply, NChanState} -> ok = gen_statem:reply(From, Reply), - stop(Reason, State#state{chan_state = NChanState}) + stop(Reason, State#connection{chan_state = NChanState}) end; -handle(cast, discard, State) -> - shutdown(discarded, State); - +%%-------------------------------------------------------------------- %% Handle cast -handle(cast, Msg, State = #state{chan_state = ChanState}) -> + +handle(cast, Msg, State = #connection{chan_state = ChanState}) -> case emqx_channel:handle_cast(Msg, ChanState) of {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); + keep_state(State#connection{chan_state = NChanState}); {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) + stop(Reason, State#connection{chan_state = NChanState}) end; +%%-------------------------------------------------------------------- +%% Handle info + %% Handle incoming data -handle(info, {Inet, _Sock, Data}, State = #state{chan_state = ChanState}) +handle(info, {Inet, _Sock, Data}, State = #connection{chan_state = ChanState}) when Inet == tcp; Inet == ssl -> - Oct = iolist_size(Data), ?LOG(debug, "RECV ~p", [Data]), + Oct = iolist_size(Data), emqx_pd:update_counter(incoming_bytes, Oct), ok = emqx_metrics:inc('bytes.received', Oct), - NChanState = emqx_channel:ensure_timer( - stats_timer, emqx_channel:gc(1, Oct, ChanState)), - process_incoming(Data, State#state{chan_state = NChanState}); + NChanState = emqx_channel:received(Oct, ChanState), + NState = State#connection{chan_state = NChanState}, + process_incoming(Data, NState); handle(info, {Error, _Sock, Reason}, State) when Error == tcp_error; Error == ssl_error -> shutdown(Reason, State); -handle(info, {Closed, _Sock}, State = #state{chan_state = ChanState}) +handle(info, {Closed, _Sock}, State) when Closed == tcp_closed; Closed == ssl_closed -> - case emqx_channel:handle_info(sock_closed, ChanState) of - {ok, NChanState} -> - {next_state, disconnected, State#state{chan_state = NChanState}}; - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end; + {next_state, disconnected, State}; -handle(info, {Passive, _Sock}, State) when Passive == tcp_passive; - Passive == ssl_passive -> +handle(info, {Passive, _Sock}, State) + when Passive == tcp_passive; Passive == ssl_passive -> %% Rate limit here:) NState = ensure_rate_limit(State), case activate_socket(NState) of @@ -358,25 +332,24 @@ handle(info, {Passive, _Sock}, State) when Passive == tcp_passive; handle(info, activate_socket, State) -> %% Rate limit timer expired. - NState = State#state{conn_state = running}, + NState = State#connection{active_state = running, + limit_timer = undefined + }, case activate_socket(NState) of - ok -> - keep_state(NState#state{limit_timer = undefined}); + ok -> keep_state(NState); {error, Reason} -> shutdown(Reason, NState) end; -handle(info, {inet_reply, _Sock, ok}, State = #state{chan_state = ChanState}) -> +handle(info, {inet_reply, _Sock, ok}, _State) -> %% something sent - NChanState = emqx_channel:ensure_timer(stats_timer, ChanState), - keep_state(State#state{chan_state = NChanState}); + keep_state_and_data; handle(info, {inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); handle(info, {timeout, TRef, keepalive}, - State = #state{transport = Transport, socket = Socket}) - when is_reference(TRef) -> + State = #connection{transport = Transport, socket = Socket}) -> case Transport:getstat(Socket, [recv_oct]) of {ok, [{recv_oct, RecvOct}]} -> handle_timeout(TRef, {keepalive, RecvOct}, State); @@ -384,37 +357,41 @@ handle(info, {timeout, TRef, keepalive}, shutdown(Reason, State) end; -handle(info, {timeout, TRef, emit_stats}, State) when is_reference(TRef) -> +handle(info, {timeout, TRef, emit_stats}, State) -> handle_timeout(TRef, {emit_stats, stats(State)}, State); -handle(info, {timeout, TRef, Msg}, State) when is_reference(TRef) -> +handle(info, {timeout, TRef, Msg}, State) -> handle_timeout(TRef, Msg, State); -handle(info, {shutdown, conflict, {ClientId, NewPid}}, State) -> - ?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]), - shutdown(conflict, State); - handle(info, {shutdown, Reason}, State) -> shutdown(Reason, State); -handle(info, Info, State = #state{chan_state = ChanState}) -> +handle(info, Info, State = #connection{chan_state = ChanState}) -> case emqx_channel:handle_info(Info, ChanState) of {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); + keep_state(State#connection{chan_state = NChanState}); {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) + stop(Reason, State#connection{chan_state = NChanState}) end. code_change(_Vsn, State, Data, _Extra) -> {ok, State, Data}. -terminate(Reason, _StateName, #state{transport = Transport, - socket = Socket, - chan_state = ChanState}) -> +terminate(Reason, _StateName, State) -> + #connection{transport = Transport, + socket = Socket, + chan_state = ChanState} = State, ?LOG(debug, "Terminated for ~p", [Reason]), ok = Transport:fast_close(Socket), emqx_channel:terminate(Reason, ChanState). +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +register_self(State = #connection{chan_state = ChanState}) -> + emqx_channel:handle_cast({register, attrs(State), stats(State)}, ChanState). + %%-------------------------------------------------------------------- %% Process incoming data @@ -425,13 +402,13 @@ process_incoming(Data, State) -> process_incoming(<<>>, Packets, State) -> {keep_state, State, next_incoming_events(Packets)}; -process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> +process_incoming(Data, Packets, State = #connection{parse_state = ParseState, chan_state = ChanState}) -> try emqx_frame:parse(Data, ParseState) of {ok, NParseState} -> - NState = State#state{parse_state = NParseState}, + NState = State#connection{parse_state = NParseState}, {keep_state, NState, next_incoming_events(Packets)}; {ok, Packet, Rest, NParseState} -> - NState = State#state{parse_state = NParseState}, + NState = State#connection{parse_state = NParseState}, process_incoming(Rest, [Packet|Packets], NState); {error, Reason} -> shutdown(Reason, State) @@ -439,54 +416,64 @@ process_incoming(Data, Packets, State = #state{parse_state = ParseState}) -> error:Reason:Stk -> ?LOG(error, "Parse failed for ~p~n\ Stacktrace:~p~nError data:~p", [Reason, Stk, Data]), - shutdown(parse_error, State) + case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of + {stop, Reason0, OutPackets, NChanState} -> + Shutdown = fun(NewSt) -> stop(Reason0, NewSt) end, + NState = State#connection{chan_state = NChanState}, + handle_outgoing(OutPackets, Shutdown, NState); + {stop, Reason0, NChanState} -> + stop(Reason0, State#connection{chan_state = NChanState}) + end end. -next_incoming_events(Packets) when is_list(Packets) -> +-compile({inline, [next_incoming_events/1]}). +next_incoming_events(Packets) -> [next_event(cast, {incoming, Packet}) || Packet <- Packets]. %%-------------------------------------------------------------------- %% Handle incoming packet handle_incoming(Packet = ?PACKET(Type), SuccFun, - State = #state{chan_state = ChanState}) -> + State = #connection{chan_state = ChanState}) -> _ = inc_incoming_stats(Type), ok = emqx_metrics:inc_recv(Packet), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), case emqx_channel:handle_in(Packet, ChanState) of {ok, NChanState} -> - SuccFun(State#state{chan_state= NChanState}); + SuccFun(State#connection{chan_state= NChanState}); {ok, OutPackets, NChanState} -> - handle_outgoing(OutPackets, SuccFun, State#state{chan_state = NChanState}); + handle_outgoing(OutPackets, SuccFun, + State#connection{chan_state = NChanState}); {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}); - {stop, Reason, OutPacket, NChanState} -> - Shutdown = fun(NewSt) -> shutdown(Reason, NewSt) end, - handle_outgoing(OutPacket, Shutdown, State#state{chan_state = NChanState}) + stop(Reason, State#connection{chan_state = NChanState}); + {stop, Reason, OutPackets, NChanState} -> + Shutdown = fun(NewSt) -> stop(Reason, NewSt) end, + NState = State#connection{chan_state = NChanState}, + handle_outgoing(OutPackets, Shutdown, NState) end. %%------------------------------------------------------------------- %% Handle deliver -handle_deliver(Delivers, State = #state{chan_state = ChanState}) -> +handle_deliver(Delivers, State = #connection{chan_state = ChanState}) -> case emqx_channel:handle_out({deliver, Delivers}, ChanState) of {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); + keep_state(State#connection{chan_state = NChanState}); {ok, Packets, NChanState} -> - NState = State#state{chan_state = NChanState}, + NState = State#connection{chan_state = NChanState}, handle_outgoing(Packets, fun keep_state/1, NState); {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) + stop(Reason, State#connection{chan_state = NChanState}) end. %%-------------------------------------------------------------------- %% Handle outgoing packets -handle_outgoing(Packets, SuccFun, State = #state{serialize = Serialize}) +handle_outgoing(Packets, SuccFun, State = #connection{serialize = Serialize}) when is_list(Packets) -> send(lists:map(Serialize, Packets), SuccFun, State); -handle_outgoing(Packet, SuccFun, State = #state{serialize = Serialize}) -> +handle_outgoing(Packet, SuccFun, State = #connection{serialize = Serialize}) -> send(Serialize(Packet), SuccFun, State). %%-------------------------------------------------------------------- @@ -496,18 +483,21 @@ serialize_fun(ProtoVer) -> fun(Packet = ?PACKET(Type)) -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), _ = inc_outgoing_stats(Type), + _ = emqx_metrics:inc_sent(Packet), emqx_frame:serialize(Packet, ProtoVer) end. %%-------------------------------------------------------------------- %% Send data -send(IoData, SuccFun, State = #state{transport = Transport, - socket = Socket}) -> +send(IoData, SuccFun, State = #connection{transport = Transport, + socket = Socket, + chan_state = ChanState}) -> Oct = iolist_size(IoData), ok = emqx_metrics:inc('bytes.sent', Oct), case Transport:async_send(Socket, IoData) of - ok -> SuccFun(State); + ok -> NChanState = emqx_channel:sent(Oct, ChanState), + SuccFun(State#connection{chan_state = NChanState}); {error, Reason} -> shutdown(Reason, State) end. @@ -515,49 +505,51 @@ send(IoData, SuccFun, State = #state{transport = Transport, %%-------------------------------------------------------------------- %% Handle timeout -handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> +handle_timeout(TRef, Msg, State = #connection{chan_state = ChanState}) -> case emqx_channel:timeout(TRef, Msg, ChanState) of {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); + keep_state(State#connection{chan_state = NChanState}); {ok, Packets, NChanState} -> handle_outgoing(Packets, fun keep_state/1, - State#state{chan_state = NChanState}); + State#connection{chan_state = NChanState}); {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) + stop(Reason, State#connection{chan_state = NChanState}) end. - %%-------------------------------------------------------------------- %% Ensure rate limit -ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl}) -> - Limiters = [{Pl, #state.pub_limit, emqx_pd:reset_counter(incoming_pubs)}, - {Rl, #state.rate_limit, emqx_pd:reset_counter(incoming_bytes)}], +-define(ENABLED(Rl), (Rl =/= undefined)). + +ensure_rate_limit(State = #connection{rate_limit = Rl, pub_limit = Pl}) -> + Pubs = emqx_pd:reset_counter(incoming_pubs), + Bytes = emqx_pd:reset_counter(incoming_bytes), + Limiters = [{Pl, #connection.pub_limit, Pubs} || ?ENABLED(Pl)] ++ + [{Rl, #connection.rate_limit, Bytes} || ?ENABLED(Rl)], ensure_rate_limit(Limiters, State). ensure_rate_limit([], State) -> State; -ensure_rate_limit([{undefined, _Pos, _Cnt}|Limiters], State) -> - ensure_rate_limit(Limiters, State); ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) -> case esockd_rate_limit:check(Cnt, Rl) of {0, Rl1} -> ensure_rate_limit(Limiters, setelement(Pos, State, Rl1)); {Pause, Rl1} -> - ?LOG(debug, "Rate limit pause connection ~pms", [Pause]), + ?LOG(debug, "Pause ~pms due to rate limit", [Pause]), TRef = erlang:send_after(Pause, self(), activate_socket), - setelement(Pos, State#state{conn_state = blocked, - limit_timer = TRef}, Rl1) + NState = State#connection{active_state = blocked, + limit_timer = TRef}, + setelement(Pos, NState, Rl1) end. %%-------------------------------------------------------------------- %% Activate Socket -activate_socket(#state{conn_state = blocked}) -> +activate_socket(#connection{active_state = blocked}) -> ok; -activate_socket(#state{transport = Transport, - socket = Socket, - active_n = N}) -> +activate_socket(#connection{transport = Transport, + socket = Socket, + active_n = N}) -> Transport:setopts(Socket, [{active, N}]). %%-------------------------------------------------------------------- @@ -570,11 +562,11 @@ activate_socket(#state{transport = Transport, inc_incoming_stats(Type) -> emqx_pd:update_counter(recv_pkt, 1), - case Type == ?PUBLISH of - true -> + if + Type == ?PUBLISH -> emqx_pd:update_counter(recv_msg, 1), emqx_pd:update_counter(incoming_pubs, 1); - false -> ok + true -> ok end. inc_outgoing_stats(Type) -> diff --git a/src/emqx_gc.erl b/src/emqx_gc.erl index 974d5b48e..7d47b7071 100644 --- a/src/emqx_gc.erl +++ b/src/emqx_gc.erl @@ -50,45 +50,38 @@ -define(ENABLED(X), (is_integer(X) andalso X > 0)). %% @doc Initialize force GC state. --spec(init(opts() | false) -> maybe(gc_state())). +-spec(init(opts()) -> gc_state()). init(#{count := Count, bytes := Bytes}) -> Cnt = [{cnt, {Count, Count}} || ?ENABLED(Count)], Oct = [{oct, {Bytes, Bytes}} || ?ENABLED(Bytes)], - ?GCS(maps:from_list(Cnt ++ Oct)); -init(false) -> undefined. + ?GCS(maps:from_list(Cnt ++ Oct)). %% @doc Try to run GC based on reduntions of count or bytes. -spec(run(pos_integer(), pos_integer(), gc_state()) -> {boolean(), gc_state()}). run(Cnt, Oct, ?GCS(St)) -> {Res, St1} = run([{cnt, Cnt}, {oct, Oct}], St), - {Res, ?GCS(St1)}; -run(_Cnt, _Oct, undefined) -> - {false, undefined}. + {Res, ?GCS(St1)}. run([], St) -> {false, St}; run([{K, N}|T], St) -> case dec(K, N, St) of {true, St1} -> - {true, do_gc(St1)}; + true = erlang:garbage_collect(), + {true, do_reset(St1)}; {false, St1} -> run(T, St1) end. %% @doc Info of GC state. --spec(info(gc_state()) -> maybe(map())). -info(?GCS(St)) -> - St; -info(undefined) -> - undefined. +-spec(info(maybe(gc_state())) -> maybe(map())). +info(?GCS(St)) -> St. %% @doc Reset counters to zero. --spec(reset(gc_state()) -> gc_state()). +-spec(reset(maybe(gc_state())) -> gc_state()). reset(?GCS(St)) -> - ?GCS(do_reset(St)); -reset(undefined) -> - undefined. + ?GCS(do_reset(St)). %%-------------------------------------------------------------------- %% Internal functions @@ -105,10 +98,6 @@ dec(Key, Num, St) -> {true, St} end. -do_gc(St) -> - true = erlang:garbage_collect(), - do_reset(St). - do_reset(St) -> do_reset(cnt, do_reset(oct, St)). diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index a325dc94b..193fc9243 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -20,13 +20,16 @@ -export([ merge_opts/2 , maybe_apply/2 + , run_fold/2 , run_fold/3 + , pipeline/3 , start_timer/2 , start_timer/3 , cancel_timer/1 , proc_name/2 , proc_stats/0 , proc_stats/1 + , index_of/2 ]). -export([ drain_deliver/0 @@ -40,7 +43,7 @@ ]}). %% @doc Merge options --spec(merge_opts(list(), list()) -> list()). +-spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()). merge_opts(Defaults, Options) -> lists:foldl( fun({Opt, Val}, Acc) -> @@ -57,11 +60,34 @@ maybe_apply(_Fun, undefined) -> maybe_apply(Fun, Arg) when is_function(Fun) -> erlang:apply(Fun, [Arg]). +run_fold([], Acc) -> + Acc; +run_fold([Fun|More], Acc) -> + run_fold(More, Fun(Acc)). + +%% @doc RunFold run_fold([], Acc, _State) -> Acc; run_fold([Fun|More], Acc, State) -> run_fold(More, Fun(Acc, State), State). +%% @doc Pipeline +pipeline([], Input, State) -> + {ok, Input, State}; + +pipeline([Fun|More], Input, State) -> + case Fun(Input, State) of + ok -> pipeline(More, Input, State); + {ok, NState} -> + pipeline(More, Input, NState); + {ok, NInput, NState} -> + pipeline(More, NInput, NState); + {error, Reason} -> + {error, Reason, State}; + {error, Reason, NState} -> + {error, Reason, NState} + end. + -spec(start_timer(integer(), term()) -> reference()). start_timer(Interval, Msg) -> start_timer(Interval, self(), Msg). @@ -123,3 +149,14 @@ drain_down(Cnt, Acc) -> drain_down(0, Acc) end. +%% lists:index_of/2 +index_of(E, L) -> + index_of(E, 1, L). + +index_of(_E, _I, []) -> + error(badarg); +index_of(E, I, [E|_]) -> + I; +index_of(E, I, [_|L]) -> + index_of(E, I+1, L). + diff --git a/src/emqx_mod_presence.erl b/src/emqx_mod_presence.erl index 013a335cd..3947c2c8c 100644 --- a/src/emqx_mod_presence.erl +++ b/src/emqx_mod_presence.erl @@ -51,7 +51,6 @@ on_client_connected(#{client_id := ClientId, proto_ver := ProtoVer, keepalive := Keepalive }, Env) -> - case emqx_json:safe_encode(maps:merge(#{clientid => ClientId, username => Username, ipaddress => iolist_to_binary(esockd_net:ntoa(IpAddr)), @@ -67,6 +66,9 @@ on_client_connected(#{client_id := ClientId, ?LOG(error, "Encoding connected event error: ~p", [Reason]) end. + + + on_client_disconnected(#{client_id := ClientId, username := Username}, Reason, Env) -> case emqx_json:safe_encode(#{clientid => ClientId, diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 5cd5af323..17cff974a 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -21,6 +21,11 @@ -include_lib("emqx.hrl"). -include_lib("emqx_mqtt.hrl"). +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. + %% APIs -export([ rewrite_subscribe/4 , rewrite_unsubscribe/4 @@ -86,4 +91,3 @@ compile(Rules) -> {ok, MP} = re:compile(Re), {rewrite, Topic, MP, Dest} end, Rules). - diff --git a/src/emqx_oom.erl b/src/emqx_oom.erl index 5e14547f0..8d3344402 100644 --- a/src/emqx_oom.erl +++ b/src/emqx_oom.erl @@ -41,8 +41,7 @@ -define(DISABLED, 0). %% @doc Init the OOM policy. --spec(init(maybe(opts())) -> oom_policy()). -init(undefined) -> undefined; +-spec(init(opts()) -> oom_policy()). init(#{message_queue_len := MaxQLen, max_heap_size := MaxHeapSizeInBytes}) -> MaxHeapSize = MaxHeapSizeInBytes div erlang:system_info(wordsize), @@ -60,8 +59,7 @@ init(#{message_queue_len := MaxQLen, %% `ok': There is nothing out of the ordinary. %% `shutdown': Some numbers (message queue length hit the limit), %% hence shutdown for greater good (system stability). --spec(check(maybe(oom_policy())) -> ok | {shutdown, reason()}). -check(undefined) -> ok; +-spec(check(oom_policy()) -> ok | {shutdown, reason()}). check({oom_policy, #{message_queue_len := MaxQLen, max_heap_size := MaxHeapSize}}) -> Qlength = proc_info(message_queue_len), @@ -71,18 +69,15 @@ check({oom_policy, #{message_queue_len := MaxQLen, {fun() -> is_exceeded(HeapSize, MaxHeapSize) end, {shutdown, proc_heap_too_large}}]). -do_check([]) -> - ok; +do_check([]) -> ok; do_check([{Pred, Result} | Rest]) -> case Pred() of true -> Result; false -> do_check(Rest) end. --spec(info(maybe(oom_policy())) -> maybe(opts())). -info(undefined) -> undefined; -info({oom_policy, Opts}) -> - Opts. +-spec(info(oom_policy()) -> opts()). +info({oom_policy, Opts}) -> Opts. -compile({inline, [ is_exceeded/2 diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 4b0912d3f..9f9fc7610 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -19,6 +19,10 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). +-export([ type/1 + , qos/1 + ]). + -export([ proto_name/1 , type_name/1 , validate/1 @@ -30,6 +34,12 @@ -compile(inline). +type(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) -> + Type. + +qos(#mqtt_packet{header = #mqtt_packet_header{qos = QoS}}) -> + QoS. + %% @doc Protocol name of the version. -spec(proto_name(emqx_types:version()) -> binary()). proto_name(?MQTT_PROTO_V3) -> @@ -167,11 +177,10 @@ will_msg(#mqtt_packet_connect{client_id = ClientId, will_qos = QoS, will_topic = Topic, will_props = Properties, - will_payload = Payload, - proto_ver = ProtoVer}) -> + will_payload = Payload}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), Msg#message{flags = #{dup => false, retain => Retain}, - headers = merge_props(#{username => Username, proto_ver => ProtoVer}, Properties)}. + headers = merge_props(#{username => Username}, Properties)}. merge_props(Headers, undefined) -> Headers; diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 6d11ce5b6..7f9d69666 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -56,25 +56,19 @@ -opaque(protocol() :: #protocol{}). +-define(INFO_KEYS, record_info(fields, protocol)). + +-define(ATTR_KEYS, [proto_name, proto_ver, clean_start, keepalive]). + -spec(init(#mqtt_packet_connect{}) -> protocol()). init(#mqtt_packet_connect{proto_name = ProtoName, proto_ver = ProtoVer, - will_props = WillProps, clean_start = CleanStart, keepalive = Keepalive, properties = Properties, client_id = ClientId, - username = Username - } = ConnPkt) -> - WillMsg = emqx_packet:will_msg( - case ProtoVer of - ?MQTT_PROTO_V5 -> - WillDelayInterval = get_property('Will-Delay-Interval', WillProps, 0), - ConnPkt#mqtt_packet_connect{ - will_props = set_property('Will-Delay-Interval', WillDelayInterval, WillProps)}; - _ -> - ConnPkt - end), + username = Username} = ConnPkt) -> + WillMsg = emqx_packet:will_msg(ConnPkt), #protocol{proto_name = ProtoName, proto_ver = ProtoVer, clean_start = CleanStart, @@ -85,30 +79,15 @@ init(#mqtt_packet_connect{proto_name = ProtoName, conn_props = Properties }. -info(#protocol{proto_name = ProtoName, - proto_ver = ProtoVer, - clean_start = CleanStart, - keepalive = Keepalive, - client_id = ClientId, - username = Username, - will_msg = WillMsg, - conn_props = ConnProps, - topic_aliases = Aliases }) -> - #{proto_name => ProtoName, - proto_ver => ProtoVer, - clean_start => CleanStart, - keepalive => Keepalive, - client_id => ClientId, - username => Username, - will_msg => WillMsg, - conn_props => ConnProps, - topic_aliases => Aliases - }. +-spec(info(protocol()) -> emqx_types:infos()). +info(Proto) -> + maps:from_list(info(?INFO_KEYS, Proto)). +-spec(info(atom()|list(atom()), protocol()) -> term()). +info(Keys, Proto) when is_list(Keys) -> + [{Key, info(Key, Proto)} || Key <- Keys]; info(proto_name, #protocol{proto_name = ProtoName}) -> ProtoName; -info(proto_ver, undefined) -> - ?MQTT_PROTO_V4; info(proto_ver, #protocol{proto_ver = ProtoVer}) -> ProtoVer; info(clean_start, #protocol{clean_start = CleanStart}) -> @@ -130,35 +109,23 @@ info(conn_props, #protocol{conn_props = ConnProps}) -> info(topic_aliases, #protocol{topic_aliases = Aliases}) -> Aliases. -attrs(#protocol{proto_name = ProtoName, - proto_ver = ProtoVer, - clean_start = CleanStart, - keepalive = Keepalive}) -> - #{proto_name => ProtoName, - proto_ver => ProtoVer, - clean_start => CleanStart, - keepalive => Keepalive - }. +-spec(attrs(protocol()) -> emqx_types:attrs()). +attrs(Proto) -> + maps:from_list(info(?ATTR_KEYS, Proto)). +-spec(find_alias(emqx_types:alias_id(), protocol()) + -> {ok, emqx_types:topic()} | false). find_alias(_AliasId, #protocol{topic_aliases = undefined}) -> false; find_alias(AliasId, #protocol{topic_aliases = Aliases}) -> maps:find(AliasId, Aliases). -save_alias(AliasId, Topic, Protocol = #protocol{topic_aliases = undefined}) -> - Protocol#protocol{topic_aliases = #{AliasId => Topic}}; -save_alias(AliasId, Topic, Protocol = #protocol{topic_aliases = Aliases}) -> - Protocol#protocol{topic_aliases = maps:put(AliasId, Topic, Aliases)}. +-spec(save_alias(emqx_types:alias_id(), emqx_types:topic(), protocol()) + -> protocol()). +save_alias(AliasId, Topic, Proto = #protocol{topic_aliases = undefined}) -> + Proto#protocol{topic_aliases = #{AliasId => Topic}}; +save_alias(AliasId, Topic, Proto = #protocol{topic_aliases = Aliases}) -> + Proto#protocol{topic_aliases = maps:put(AliasId, Topic, Aliases)}. clear_will_msg(Protocol) -> - Protocol#protocol{will_msg = undefined}. - -set_property(Name, Value, undefined) -> - #{Name => Value}; -set_property(Name, Value, Props) -> - Props#{Name => Value}. - -get_property(_Name, undefined, Default) -> - Default; -get_property(Name, Props, Default) -> - maps:get(Name, Props, Default). + Protocol#protocol{will_msg = undefined}. \ No newline at end of file diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index 387208ebe..6aad26a31 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -24,7 +24,7 @@ , text/1 , text/2 , connack_error/1 - , puback/1 + , mqtt_frame_error/1 ]). -export([compat/2]). @@ -176,6 +176,5 @@ connack_error(banned) -> ?RC_BANNED; connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD; connack_error(_) -> ?RC_NOT_AUTHORIZED. -%%TODO: This function should be removed. -puback([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; -puback(L) when is_list(L) -> ?RC_SUCCESS. +mqtt_frame_error(mqtt_frame_too_large) -> ?RC_PACKET_TOO_LARGE; +mqtt_frame_error(_) -> ?RC_MALFORMED_PACKET. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index cddaabe35..d569386cc 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -58,6 +58,9 @@ , stats/1 ]). +%% Exports for unit tests +-export([set_field/3]). + -export([update_expiry_interval/2]). -export([ subscribe/4 @@ -85,9 +88,6 @@ -export_type([session/0]). -%% For test case --export([set_pkt_id/2]). - -import(emqx_zone, [get_env/3]). -record(session, { @@ -118,8 +118,11 @@ await_rel_timeout :: timeout(), %% Session Expiry Interval expiry_interval :: timeout(), + %% Enqueue Count + enqueue_cnt :: non_neg_integer(), %% Created at created_at :: erlang:timestamp() + }). -opaque(session() :: #session{}). @@ -127,6 +130,14 @@ -type(publish() :: {publish, emqx_types:packet_id(), emqx_types:message()}). -define(DEFAULT_BATCH_N, 1000). +-define(ATTR_KEYS, [expiry_interval, created_at]). +-define(INFO_KEYS, [subscriptions, max_subscriptions, upgrade_qos, inflight, + max_inflight, retry_interval, mqueue_len, max_mqueue, + mqueue_dropped, next_pkt_id, awaiting_rel, max_awaiting_rel, + await_rel_timeout, expiry_interval, created_at]). +-define(STATS_KEYS, [subscriptions_cnt, max_subscriptions, inflight, max_inflight, + mqueue_len, max_mqueue, mqueue_dropped, awaiting_rel, + max_awaiting_rel, enqueue_cnt]). %%-------------------------------------------------------------------- %% Init a session @@ -147,6 +158,7 @@ init(#{zone := Zone}, #{max_inflight := MaxInflight, max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), await_rel_timeout = get_env(Zone, await_rel_timeout, 3600*1000), expiry_interval = ExpiryInterval, + enqueue_cnt = 0, created_at = os:timestamp() }. @@ -157,42 +169,26 @@ init_mqueue(Zone) -> default_priority => get_env(Zone, mqueue_default_priority, lowest) }). -%%-------------------------------------------------------------------- -%% Infos of the session -%%-------------------------------------------------------------------- - +%% @doc Get infos of the session. -spec(info(session()) -> emqx_types:infos()). -info(#session{max_subscriptions = MaxSubscriptions, - subscriptions = Subscriptions, - upgrade_qos = UpgradeQoS, - inflight = Inflight, - retry_interval = RetryInterval, - mqueue = MQueue, - next_pkt_id = PacketId, - max_awaiting_rel = MaxAwaitingRel, - awaiting_rel = AwaitingRel, - await_rel_timeout = AwaitRelTimeout, - expiry_interval = ExpiryInterval, - created_at = CreatedAt}) -> - #{subscriptions => Subscriptions, - max_subscriptions => MaxSubscriptions, - upgrade_qos => UpgradeQoS, - inflight => emqx_inflight:size(Inflight), - max_inflight => emqx_inflight:max_size(Inflight), - retry_interval => RetryInterval, - mqueue_len => emqx_mqueue:len(MQueue), - max_mqueue => emqx_mqueue:max_len(MQueue), - mqueue_dropped => emqx_mqueue:dropped(MQueue), - next_pkt_id => PacketId, - awaiting_rel => maps:size(AwaitingRel), - max_awaiting_rel => MaxAwaitingRel, - await_rel_timeout => AwaitRelTimeout, - expiry_interval => ExpiryInterval, - created_at => CreatedAt - }. +info(Session) -> + maps:from_list(info(?INFO_KEYS, Session)). +%% Get attrs of the session. +-spec(attrs(session()) -> emqx_types:attrs()). +attrs(Session) -> + maps:from_list(info(?ATTR_KEYS, Session)). + +%% @doc Get stats of the session. +-spec(stats(session()) -> emqx_types:stats()). +stats(Session) -> info(?STATS_KEYS, Session). + +info(Keys, Session) when is_list(Keys) -> + [{Key, info(Key, Session)} || Key <- Keys]; info(subscriptions, #session{subscriptions = Subs}) -> Subs; +info(subscriptions_cnt, #session{subscriptions = Subs}) -> + maps:size(Subs); info(max_subscriptions, #session{max_subscriptions = MaxSubs}) -> MaxSubs; info(upgrade_qos, #session{upgrade_qos = UpgradeQoS}) -> @@ -219,47 +215,20 @@ info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> Timeout; info(expiry_interval, #session{expiry_interval = Interval}) -> Interval; +info(enqueue_cnt, #session{enqueue_cnt = Cnt}) -> + Cnt; info(created_at, #session{created_at = CreatedAt}) -> CreatedAt. +%% For tests +set_field(Name, Val, Channel) -> + Fields = record_info(fields, session), + Pos = emqx_misc:index_of(Name, Fields), + setelement(Pos+1, Channel, Val). + update_expiry_interval(ExpiryInterval, Session) -> Session#session{expiry_interval = ExpiryInterval}. -%%-------------------------------------------------------------------- -%% Attrs of the session -%%-------------------------------------------------------------------- - --spec(attrs(session()) -> emqx_types:attrs()). -attrs(undefined) -> - #{}; -attrs(#session{expiry_interval = ExpiryInterval, - created_at = CreatedAt}) -> - #{expiry_interval => ExpiryInterval, - created_at => CreatedAt - }. - -%%-------------------------------------------------------------------- -%% Stats of the session -%%-------------------------------------------------------------------- - -%% @doc Get stats of the session. --spec(stats(session()) -> emqx_types:stats()). -stats(#session{subscriptions = Subscriptions, - max_subscriptions = MaxSubscriptions, - inflight = Inflight, - mqueue = MQueue, - awaiting_rel = AwaitingRel, - max_awaiting_rel = MaxAwaitingRel}) -> - [{subscriptions, maps:size(Subscriptions)}, - {max_subscriptions, MaxSubscriptions}, - {inflight, emqx_inflight:size(Inflight)}, - {max_inflight, emqx_inflight:max_size(Inflight)}, - {mqueue_len, emqx_mqueue:len(MQueue)}, - {max_mqueue, emqx_mqueue:max_len(MQueue)}, - {mqueue_dropped, emqx_mqueue:dropped(MQueue)}, - {awaiting_rel, maps:size(AwaitingRel)}, - {max_awaiting_rel, MaxAwaitingRel}]. - -spec(takeover(session()) -> ok). takeover(#session{subscriptions = Subs}) -> lists:foreach(fun({TopicFilter, _SubOpts}) -> @@ -268,7 +237,6 @@ takeover(#session{subscriptions = Subs}) -> -spec(resume(emqx_types:client_id(), session()) -> ok). resume(ClientId, #session{subscriptions = Subs}) -> - ?LOG(info, "Session is resumed."), %% 1. Subscribe again. lists:foreach(fun({TopicFilter, SubOpts}) -> ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts) @@ -295,8 +263,8 @@ redeliver(Session = #session{inflight = Inflight}) -> %% Client -> Broker: SUBSCRIBE %%-------------------------------------------------------------------- --spec(subscribe(emqx_types:client(), emqx_types:topic(), emqx_types:subopts(), - session()) -> {ok, session()} | {error, emqx_types:reason_code()}). +-spec(subscribe(emqx_types:client(), emqx_types:topic(), emqx_types:subopts(), session()) + -> {ok, session()} | {error, emqx_types:reason_code()}). subscribe(Client, TopicFilter, SubOpts, Session = #session{subscriptions = Subs}) -> case is_subscriptions_full(Session) andalso (not maps:is_key(TopicFilter, Subs)) of @@ -311,8 +279,9 @@ is_subscriptions_full(#session{max_subscriptions = MaxLimit, subscriptions = Subs}) -> maps:size(Subs) >= MaxLimit. -do_subscribe(Client = #{client_id := ClientId}, - TopicFilter, SubOpts, Session = #session{subscriptions = Subs}) -> +-compile({inline, [do_subscribe/4]}). +do_subscribe(Client = #{client_id := ClientId}, TopicFilter, SubOpts, + Session = #session{subscriptions = Subs}) -> case IsNew = (not maps:is_key(TopicFilter, Subs)) of true -> ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts); @@ -320,9 +289,8 @@ do_subscribe(Client = #{client_id := ClientId}, _ = emqx_broker:set_subopts(TopicFilter, SubOpts) end, ok = emqx_hooks:run('session.subscribed', - [Client, TopicFilter, SubOpts#{new => IsNew}]), - Subs1 = maps:put(TopicFilter, SubOpts, Subs), - {ok, Session#session{subscriptions = Subs1}}. + [Client, TopicFilter, SubOpts#{is_new => IsNew}]), + {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}}. %%-------------------------------------------------------------------- %% Client -> Broker: UNSUBSCRIBE @@ -353,8 +321,6 @@ publish(PacketId, Msg = #message{qos = ?QOS_2}, Session) -> false -> do_publish(PacketId, Msg, Session); true -> - ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]), - ok = emqx_metrics:inc('messages.qos2.dropped'), {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED} end; @@ -386,42 +352,39 @@ do_publish(PacketId, Msg = #message{timestamp = Ts}, %%-------------------------------------------------------------------- -spec(puback(emqx_types:packet_id(), session()) - -> {ok, session()} | {ok, list(publish()), session()} | - {error, emqx_types:reason_code()}). + -> {ok, emqx_types:message(), session()} + | {ok, emqx_types:message(), list(publish()), session()} + | {error, emqx_types:reason_code()}). puback(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - ok = emqx_hooks:run('message.acked', [Msg]), Inflight1 = emqx_inflight:delete(PacketId, Inflight), - dequeue(Session#session{inflight = Inflight1}); - {value, {_OtherPub, _Ts}} -> - ?LOG(warning, "The PacketId has been used, PacketId: ~p", [PacketId]), + return_with(Msg, dequeue(Session#session{inflight = Inflight1})); + {value, {_Pubrel, _Ts}} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> - ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]), - ok = emqx_metrics:inc('packets.puback.missed'), {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. +return_with(Msg, {ok, Session}) -> + {ok, Msg, Session}; +return_with(Msg, {ok, Publishes, Session}) -> + {ok, Msg, Publishes, Session}. + %%-------------------------------------------------------------------- %% Client -> Broker: PUBREC %%-------------------------------------------------------------------- -spec(pubrec(emqx_types:packet_id(), session()) - -> {ok, session()} | {error, emqx_types:reason_code()}). + -> {ok, emqx_types:message(), session()} | {error, emqx_types:reason_code()}). pubrec(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - ok = emqx_hooks:run('message.acked', [Msg]), Inflight1 = emqx_inflight:update(PacketId, {pubrel, os:timestamp()}, Inflight), - {ok, Session#session{inflight = Inflight1}}; + {ok, Msg, Session#session{inflight = Inflight1}}; {value, {pubrel, _Ts}} -> - ?LOG(warning, "The PUBREC ~w is duplicated", [PacketId]), - ok = emqx_metrics:inc('packets.pubrec.inuse'), {error, ?RC_PACKET_IDENTIFIER_IN_USE}; none -> - ?LOG(warning, "The PUBREC ~w is not found.", [PacketId]), - ok = emqx_metrics:inc('packets.pubrec.missed'), {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. @@ -436,8 +399,6 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> {_Ts, AwaitingRel1} -> {ok, Session#session{awaiting_rel = AwaitingRel1}}; error -> - ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]), - ok = emqx_metrics:inc('packets.pubrel.missed'), {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. @@ -446,16 +407,14 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> %%-------------------------------------------------------------------- -spec(pubcomp(emqx_types:packet_id(), session()) - -> {ok, session()} | {ok, list(publish()), session()} | - {error, emqx_types:reason_code()}). + -> {ok, session()} | {ok, list(publish()), session()} + | {error, emqx_types:reason_code()}). pubcomp(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:contain(PacketId, Inflight) of true -> Inflight1 = emqx_inflight:delete(PacketId, Inflight), dequeue(Session#session{inflight = Inflight1}); false -> - ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]), - ok = emqx_metrics:inc('packets.pubcomp.missed'), {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} end. @@ -493,7 +452,7 @@ batch_n(Inflight) -> deliver(Delivers, Session = #session{subscriptions = Subs}) when is_list(Delivers) -> - Msgs = [enrich(get_subopts(Topic, Subs), Msg, Session) + Msgs = [enrich_subopt(get_subopts(Topic, Subs), Msg, Session) || {deliver, Topic, Msg} <- Delivers], deliver(Msgs, [], Session). @@ -515,23 +474,20 @@ deliver([Msg = #message{qos = QoS}|More], Acc, deliver(More, [Publish|Acc], next_pkt_id(Session1)) end. -enqueue(Delivers, Session = #session{subscriptions = Subs}) - when is_list(Delivers) -> - Msgs = [enrich(get_subopts(Topic, Subs), Msg, Session) +enqueue(Delivers, Session = #session{subscriptions = Subs}) when is_list(Delivers) -> + Msgs = [enrich_subopt(get_subopts(Topic, Subs), Msg, Session) || {deliver, Topic, Msg} <- Delivers], lists:foldl(fun enqueue/2, Session, Msgs); -enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> - emqx_pd:update_counter(enqueue_stats, 1), +enqueue(Msg, Session = #session{mqueue = Q, enqueue_cnt = Cnt}) + when is_record(Msg, message) -> {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), - if - Dropped =/= undefined -> - %% TODO:... - %% SessProps = #{client_id => ClientId, username => Username}, - ok; %% = emqx_hooks:run('message.dropped', [SessProps, Dropped]); - true -> ok + if is_record(Dropped, message) -> + ?LOG(warning, "Dropped msg due to mqueue is full: ~s", + [emqx_message:format(Dropped)]); + true -> ok end, - Session#session{mqueue = NewQ}. + Session#session{mqueue = NewQ, enqueue_cnt = Cnt+1}. %%-------------------------------------------------------------------- %% Awaiting ACK for QoS1/QoS2 Messages @@ -550,26 +506,26 @@ get_subopts(Topic, SubMap) -> error -> [] end. -enrich([], Msg, _Session) -> - Msg; -%%enrich([{nl, 1}|_Opts], #message{from = ClientId}, #session{client_id = ClientId}) -> -%% ignore; -enrich([{nl, _}|Opts], Msg, Session) -> - enrich(Opts, Msg, Session); -enrich([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos= true}) -> - enrich(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session); -enrich([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, Session = #session{upgrade_qos= false}) -> - enrich(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session); -enrich([{rap, 0}|Opts], Msg = #message{flags = Flags, headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) -> - enrich(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session); -enrich([{rap, _}|Opts], Msg = #message{headers = #{proto_ver := ?MQTT_PROTO_V5}}, Session) -> - enrich(Opts, Msg, Session); -enrich([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session = #session{}) -> - enrich(Opts, Msg, Session); -enrich([{rap, _}|Opts], Msg = #message{flags = Flags}, Session) -> - enrich(Opts, Msg#message{flags = maps:put(retain, false, Flags)}, Session); -enrich([{subid, SubId}|Opts], Msg, Session) -> - enrich(Opts, emqx_message:set_header('Subscription-Identifier', SubId, Msg), Session). +enrich_subopt([], Msg, _Session) -> Msg; +enrich_subopt([{nl, 1}|Opts], Msg, Session) -> + enrich_subopt(Opts, emqx_message:set_flag(nl, Msg), Session); +enrich_subopt([{nl, 0}|Opts], Msg, Session) -> + enrich_subopt(Opts, Msg, Session); +enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, + Session = #session{upgrade_qos= true}) -> + enrich_subopt(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session); +enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS}, + Session = #session{upgrade_qos= false}) -> + enrich_subopt(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session); +enrich_subopt([{rap, 1}|Opts], Msg, Session) -> + enrich_subopt(Opts, Msg, Session); +enrich_subopt([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, Session) -> + enrich_subopt(Opts, Msg, Session); +enrich_subopt([{rap, 0}|Opts], Msg, Session) -> + enrich_subopt(Opts, emqx_message:set_flag(retain, false, Msg), Session); +enrich_subopt([{subid, SubId}|Opts], Msg, Session) -> + Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg), + enrich_subopt(Opts, Msg1, Session). %%-------------------------------------------------------------------- %% Retry Delivery @@ -608,8 +564,9 @@ retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) - ok = emqx_metrics:inc('messages.expired'), {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> - {[{publish, PacketId, Msg}|Acc], - emqx_inflight:update(PacketId, {Msg, Now}, Inflight)} + Msg1 = emqx_message:set_flag(dup, true, Msg), + {[{publish, PacketId, Msg1}|Acc], + emqx_inflight:update(PacketId, {Msg1, Now}, Inflight)} end; retry_delivery(PacketId, pubrel, Now, Acc, Inflight) -> @@ -654,10 +611,3 @@ next_pkt_id(Session = #session{next_pkt_id = 16#FFFF}) -> next_pkt_id(Session = #session{next_pkt_id = Id}) -> Session#session{next_pkt_id = Id + 1}. -%%--------------------------------------------------------------------- -%% For Test case -%%--------------------------------------------------------------------- - -set_pkt_id(Session, PktId) -> - Session#session{next_pkt_id = PktId}. - diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 680cb299e..67d84789a 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -167,6 +167,7 @@ bin(L) when is_list(L) -> list_to_binary(L). levels(Topic) when is_binary(Topic) -> length(tokens(Topic)). +-compile({inline, [tokens/1]}). %% @doc Split topic to tokens. -spec(tokens(topic()) -> list(binary())). tokens(Topic) -> @@ -204,12 +205,12 @@ join([]) -> join([W]) -> bin(W); join(Words) -> - {_, Bin} = - lists:foldr(fun(W, {true, Tail}) -> - {false, <>}; - (W, {false, Tail}) -> - {false, <>} - end, {true, <<>>}, [bin(W) || W <- Words]), + {_, Bin} = lists:foldr( + fun(W, {true, Tail}) -> + {false, <>}; + (W, {false, Tail}) -> + {false, <>} + end, {true, <<>>}, [bin(W) || W <- Words]), Bin. -spec(parse(topic() | {topic(), map()}) -> {topic(), #{share => binary()}}). diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index f9273d3e3..ab102d6bb 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -65,7 +65,7 @@ trace(publish, #message{topic = <<"$SYS/", _/binary>>}) -> ignore; trace(publish, #message{from = From, topic = Topic, payload = Payload}) when is_binary(From); is_atom(From) -> - emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]). + emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, "PUBLISH to ~s: ~p", [Topic, Payload]). %% @doc Start to trace client_id or topic. -spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}). diff --git a/src/emqx_types.erl b/src/emqx_types.erl index e7f8af692..86c682dbe 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -31,7 +31,7 @@ , subid/0 ]). --export_type([ conn/0 +-export_type([ conninfo/0 , client/0 , client_id/0 , username/0 @@ -43,6 +43,7 @@ -export_type([ connack/0 , subopts/0 , reason_code/0 + , alias_id/0 , properties/0 ]). @@ -94,12 +95,12 @@ -type(topic() :: emqx_topic:topic()). -type(subid() :: binary() | atom()). --type(conn() :: #{peername := peername(), - sockname := peername(), - peercert := esockd_peercert:peercert(), - conn_mod := module(), - atom() => term() - }). +-type(conninfo() :: #{peername := peername(), + sockname := peername(), + peercert := esockd_peercert:peercert(), + conn_mod := module(), + atom() => term() + }). -type(client() :: #{zone := zone(), conn_mod := maybe(module()), peername := peername(), @@ -142,6 +143,7 @@ }). -type(reason_code() :: 0..16#FF). -type(packet_id() :: 1..16#FFFF). +-type(alias_id() :: 0..16#FFFF). -type(properties() :: #{atom() => term()}). -type(topic_filters() :: list({topic(), subopts()})). -type(packet() :: #mqtt_packet{}). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index e28f74b06..4e6baebdf 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -27,12 +27,10 @@ -export([ info/1 , attrs/1 , stats/1 + , state/1 ]). --export([ kick/1 - , discard/1 - , takeover/2 - ]). +-export([call/2]). %% WebSocket callbacks -export([ init/2 @@ -42,19 +40,29 @@ , terminate/3 ]). --record(state, { - peername :: emqx_types:peername(), - sockname :: emqx_types:peername(), - fsm_state :: idle | connected | disconnected, - serialize :: fun((emqx_types:packet()) -> iodata()), +-record(ws_connection, { + %% Peername of the ws connection. + peername :: emqx_types:peername(), + %% Sockname of the ws connection + sockname :: emqx_types:peername(), + %% FSM state + fsm_state :: idle | connected | disconnected, + %% Parser State parse_state :: emqx_frame:parse_state(), - chan_state :: emqx_channel:channel(), - pendings :: list(), + %% Serialize function + serialize :: fun((emqx_types:packet()) -> iodata()), + %% Channel State + chan_state :: emqx_channel:channel(), + %% Out Pending Packets + pendings :: list(emqx_types:packet()), + %% The stop reason stop_reason :: term() }). --type(state() :: #state{}). +-type(ws_connection() :: #ws_connection{}). +-define(INFO_KEYS, [socktype, peername, sockname, active_state]). +-define(ATTR_KEYS, [socktype, peername, sockname]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]). @@ -62,40 +70,44 @@ %% API %%-------------------------------------------------------------------- --spec(info(pid() | state()) -> emqx_types:infos()). -info(WSPid) when is_pid(WSPid) -> - call(WSPid, info); -info(#state{peername = Peername, - sockname = Sockname, - chan_state = ChanState}) -> - ConnInfo = #{socktype => websocket, - peername => Peername, - sockname => Sockname, - conn_state => running - }, +-spec(info(pid()|ws_connection()) -> emqx_types:infos()). +info(WsPid) when is_pid(WsPid) -> + call(WsPid, info); +info(WsConn = #ws_connection{chan_state = ChanState}) -> + ConnInfo = info(?INFO_KEYS, WsConn), ChanInfo = emqx_channel:info(ChanState), - maps:merge(ConnInfo, ChanInfo). + maps:merge(ChanInfo, #{connection => maps:from_list(ConnInfo)}). --spec(attrs(pid() | state()) -> emqx_types:attrs()). -attrs(WSPid) when is_pid(WSPid) -> - call(WSPid, attrs); -attrs(#state{peername = Peername, - sockname = Sockname, - chan_state = ChanState}) -> - ConnAttrs = #{socktype => websocket, - peername => Peername, - sockname => Sockname - }, +info(Keys, WsConn) when is_list(Keys) -> + [{Key, info(Key, WsConn)} || Key <- Keys]; +info(socktype, #ws_connection{}) -> + websocket; +info(peername, #ws_connection{peername = Peername}) -> + Peername; +info(sockname, #ws_connection{sockname = Sockname}) -> + Sockname; +info(active_state, #ws_connection{}) -> + running; +info(chan_state, #ws_connection{chan_state = ChanState}) -> + emqx_channel:info(ChanState). + +-spec(attrs(pid()|ws_connection()) -> emqx_types:attrs()). +attrs(WsPid) when is_pid(WsPid) -> + call(WsPid, attrs); +attrs(WsConn = #ws_connection{chan_state = ChanState}) -> + ConnAttrs = info(?ATTR_KEYS, WsConn), ChanAttrs = emqx_channel:attrs(ChanState), - maps:merge(ConnAttrs, ChanAttrs). + maps:merge(ChanAttrs, #{connection => maps:from_list(ConnAttrs)}). --spec(stats(pid() | state()) -> emqx_types:stats()). -stats(WSPid) when is_pid(WSPid) -> - call(WSPid, stats); -stats(#state{chan_state = ChanState}) -> +-spec(stats(pid()|ws_connection()) -> emqx_types:stats()). +stats(WsPid) when is_pid(WsPid) -> + call(WsPid, stats); +stats(#ws_connection{chan_state = ChanState}) -> ProcStats = emqx_misc:proc_stats(), + SockStats = wsock_stats(), + ConnStats = conn_stats(), ChanStats = emqx_channel:stats(ChanState), - lists:append([ProcStats, wsock_stats(), conn_stats(), ChanStats]). + lists:append([ProcStats, SockStats, ConnStats, ChanStats]). wsock_stats() -> [{Key, emqx_pd:get_counter(Key)} || Key <- ?SOCK_STATS]. @@ -103,22 +115,14 @@ wsock_stats() -> conn_stats() -> [{Name, emqx_pd:get_counter(Name)} || Name <- ?CONN_STATS]. --spec(kick(pid()) -> ok). -kick(CPid) -> - call(CPid, kick). +-spec(state(pid()) -> ws_connection()). +state(WsPid) -> call(WsPid, state). --spec(discard(pid()) -> ok). -discard(WSPid) -> - WSPid ! {cast, discard}, ok. - --spec(takeover(pid(), 'begin'|'end') -> Result :: term()). -takeover(CPid, Phase) -> - call(CPid, {takeover, Phase}). - -%% @private -call(WSPid, Req) when is_pid(WSPid) -> - Mref = erlang:monitor(process, WSPid), - WSPid ! {call, {self(), Mref}, Req}, +%% kick|discard|takeover +-spec(call(pid(), Req :: term()) -> Reply :: term()). +call(WsPid, Req) when is_pid(WsPid) -> + Mref = erlang:monitor(process, WsPid), + WsPid ! {call, {self(), Mref}, Req}, receive {Mref, Reply} -> erlang:demonitor(Mref, [flush]), @@ -142,10 +146,10 @@ init(Req, Opts) -> I -> I end, Compress = proplists:get_value(compress, Opts, false), - WsOpts = #{compress => Compress, - deflate_opts => DeflateOptions, + WsOpts = #{compress => Compress, + deflate_opts => DeflateOptions, max_frame_size => MaxFrameSize, - idle_timeout => IdleTimeout + idle_timeout => IdleTimeout }, case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of undefined -> @@ -156,7 +160,7 @@ init(Req, Opts) -> <<"sec-websocket-protocol">>, <<"mqtt", Vsn/binary>>, Req), {cowboy_websocket, Resp, [Req, Opts], WsOpts}; _ -> - {ok, cowboy_req:reply(400, Req), #state{}} + {ok, cowboy_req:reply(400, Req), WsOpts} end. websocket_init([Req, Opts]) -> @@ -169,54 +173,51 @@ websocket_init([Req, Opts]) -> ?LOG(error, "Illegal cookie"), undefined; Error:Reason -> - ?LOG(error, "Cookie is parsed failed, Error: ~p, Reason ~p", + ?LOG(error, "Failed to parse cookie, Error: ~p, Reason ~p", [Error, Reason]), undefined end, - ChanState = emqx_channel:init(#{peername => Peername, - sockname => Sockname, - peercert => Peercert, + ChanState = emqx_channel:init(#{peername => Peername, + sockname => Sockname, + peercert => Peercert, ws_cookie => WsCookie, - conn_mod => ?MODULE + conn_mod => ?MODULE }, Opts), Zone = proplists:get_value(zone, Opts), MaxSize = emqx_zone:get_env(Zone, max_packet_size, ?MAX_PACKET_SIZE), ParseState = emqx_frame:initial_parse_state(#{max_size => MaxSize}), emqx_logger:set_metadata_peername(esockd_net:format(Peername)), - {ok, #state{peername = Peername, - sockname = Sockname, - fsm_state = idle, - parse_state = ParseState, - chan_state = ChanState, - pendings = [] - }}. + {ok, #ws_connection{peername = Peername, + sockname = Sockname, + fsm_state = idle, + parse_state = ParseState, + chan_state = ChanState, + pendings = []}}. websocket_handle({binary, Data}, State) when is_list(Data) -> websocket_handle({binary, iolist_to_binary(Data)}, State); -websocket_handle({binary, Data}, State = #state{chan_state = ChanState}) - when is_binary(Data) -> +websocket_handle({binary, Data}, State = #ws_connection{chan_state = ChanState}) -> ?LOG(debug, "RECV ~p", [Data]), Oct = iolist_size(Data), - emqx_pd:update_counter(recv_cnt, 1), - emqx_pd:update_counter(recv_oct, Oct), - ok = emqx_metrics:inc('bytes.received', Oct), - NChanState = emqx_channel:ensure_timer( - stats_timer, emqx_channel:gc(1, Oct, ChanState)), - process_incoming(Data, State#state{chan_state = NChanState}); + ok = inc_recv_stats(1, Oct), + NChanState = emqx_channel:received(Oct, ChanState), + NState = State#ws_connection{chan_state = NChanState}, + process_incoming(Data, NState); %% Pings should be replied with pongs, cowboy does it automatically %% Pongs can be safely ignored. Clause here simply prevents crash. websocket_handle(Frame, State) when Frame =:= ping; Frame =:= pong -> {ok, State}; + websocket_handle({FrameType, _}, State) when FrameType =:= ping; FrameType =:= pong -> {ok, State}; -%% According to mqtt spec[https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901285] + websocket_handle({FrameType, _}, State) -> - ?LOG(error, "Frame error: unexpected frame - ~p", [FrameType]), - stop(unexpected_ws_frame, State). + ?LOG(error, "Unexpected frame - ~p", [FrameType]), + stop({shutdown, unexpected_ws_frame}, State). websocket_info({call, From, info}, State) -> gen_server:reply(From, info(State)), @@ -230,62 +231,52 @@ websocket_info({call, From, stats}, State) -> gen_server:reply(From, stats(State)), {ok, State}; -websocket_info({call, From, kick}, State) -> - gen_server:reply(From, ok), - stop(kicked, State); +websocket_info({call, From, state}, State) -> + gen_server:reply(From, State), + {ok, State}; -websocket_info({call, From, Req}, State = #state{chan_state = ChanState}) -> +websocket_info({call, From, Req}, State = #ws_connection{chan_state = ChanState}) -> case emqx_channel:handle_call(Req, ChanState) of {ok, Reply, NChanState} -> _ = gen_server:reply(From, Reply), - {ok, State#state{chan_state = NChanState}}; + {ok, State#ws_connection{chan_state = NChanState}}; {stop, Reason, Reply, NChanState} -> _ = gen_server:reply(From, Reply), - stop(Reason, State#state{chan_state = NChanState}) + stop(Reason, State#ws_connection{chan_state = NChanState}) end; -websocket_info({cast, discard}, State) -> - stop(discarded, State); - -websocket_info({cast, Msg}, State = #state{chan_state = ChanState}) -> +websocket_info({cast, Msg}, State = #ws_connection{chan_state = ChanState}) -> case emqx_channel:handle_cast(Msg, ChanState) of {ok, NChanState} -> - {ok, State#state{chan_state = NChanState}}; + {ok, State#ws_connection{chan_state = NChanState}}; {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) + stop(Reason, State#ws_connection{chan_state = NChanState}) end; -websocket_info({incoming, Packet = ?CONNECT_PACKET( - #mqtt_packet_connect{ - proto_ver = ProtoVer} - )}, - State = #state{fsm_state = idle}) -> - handle_incoming(Packet, fun connected/1, - State#state{serialize = serialize_fun(ProtoVer)}); +websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, + State = #ws_connection{fsm_state = idle}) -> + #mqtt_packet_connect{proto_ver = ProtoVer} = ConnPkt, + NState = State#ws_connection{serialize = serialize_fun(ProtoVer)}, + handle_incoming(Packet, fun connected/1, NState); -websocket_info({incoming, Packet}, State = #state{fsm_state = idle}) -> +websocket_info({incoming, Packet}, State = #ws_connection{fsm_state = idle}) -> ?LOG(warning, "Unexpected incoming: ~p", [Packet]), stop(unexpected_incoming_packet, State); -websocket_info({incoming, Packet = ?PACKET(?CONNECT)}, - State = #state{fsm_state = connected}) -> - ?LOG(warning, "Unexpected connect: ~p", [Packet]), - stop(unexpected_incoming_connect, State); - -websocket_info({incoming, Packet}, State = #state{fsm_state = connected}) +websocket_info({incoming, Packet}, State = #ws_connection{fsm_state = connected}) when is_record(Packet, mqtt_packet) -> handle_incoming(Packet, fun reply/1, State); websocket_info(Deliver = {deliver, _Topic, _Msg}, - State = #state{chan_state = ChanState}) -> + State = #ws_connection{chan_state = ChanState}) -> Delivers = emqx_misc:drain_deliver([Deliver]), case emqx_channel:handle_out({deliver, Delivers}, ChanState) of {ok, NChanState} -> - reply(State#state{chan_state = NChanState}); + reply(State#ws_connection{chan_state = NChanState}); {ok, Packets, NChanState} -> - reply(enqueue(Packets, State#state{chan_state = NChanState})); + reply(enqueue(Packets, State#ws_connection{chan_state = NChanState})); {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) + stop(Reason, State#ws_connection{chan_state = NChanState}) end; websocket_info({timeout, TRef, keepalive}, State) when is_reference(TRef) -> @@ -298,26 +289,22 @@ websocket_info({timeout, TRef, emit_stats}, State) when is_reference(TRef) -> websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) -> handle_timeout(TRef, Msg, State); -websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> - ?LOG(warning, "Clientid '~s' conflict with ~p", [ClientId, NewPid]), - stop(conflict, State); - websocket_info({shutdown, Reason}, State) -> - stop(Reason, State); + stop({shutdown, Reason}, State); websocket_info({stop, Reason}, State) -> stop(Reason, State); -websocket_info(Info, State = #state{chan_state = ChanState}) -> +websocket_info(Info, State = #ws_connection{chan_state = ChanState}) -> case emqx_channel:handle_info(Info, ChanState) of {ok, NChanState} -> - {ok, State#state{chan_state = NChanState}}; + {ok, State#ws_connection{chan_state = NChanState}}; {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) + stop(Reason, State#ws_connection{chan_state = NChanState}) end. -terminate(SockError, _Req, #state{chan_state = ChanState, - stop_reason = Reason}) -> +terminate(SockError, _Req, #ws_connection{chan_state = ChanState, + stop_reason = Reason}) -> ?LOG(debug, "Terminated for ~p, sockerror: ~p", [Reason, SockError]), emqx_channel:terminate(Reason, ChanState). @@ -325,24 +312,22 @@ terminate(SockError, _Req, #state{chan_state = ChanState, %%-------------------------------------------------------------------- %% Connected callback -connected(State = #state{chan_state = ChanState}) -> - NState = State#state{fsm_state = connected}, - #{client_id := ClientId} = emqx_channel:info(client, ChanState), - ok = emqx_cm:register_channel(ClientId), - ok = emqx_cm:set_chan_attrs(ClientId, attrs(NState)), - reply(NState). +connected(State = #ws_connection{chan_state = ChanState}) -> + ok = emqx_channel:handle_cast({register, attrs(State), stats(State)}, ChanState), + reply(State#ws_connection{fsm_state = connected}). %%-------------------------------------------------------------------- %% Handle timeout -handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> +handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) -> case emqx_channel:timeout(TRef, Msg, ChanState) of {ok, NChanState} -> - {ok, State#state{chan_state = NChanState}}; + {ok, State#ws_connection{chan_state = NChanState}}; {ok, Packets, NChanState} -> - reply(enqueue(Packets, State#state{chan_state = NChanState})); + NState = State#ws_connection{chan_state = NChanState}, + reply(enqueue(Packets, NState)); {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) + stop(Reason, State#ws_connection{chan_state = NChanState}) end. %%-------------------------------------------------------------------- @@ -351,13 +336,13 @@ handle_timeout(TRef, Msg, State = #state{chan_state = ChanState}) -> process_incoming(<<>>, State) -> {ok, State}; -process_incoming(Data, State = #state{parse_state = ParseState}) -> +process_incoming(Data, State = #ws_connection{parse_state = ParseState, chan_state = ChanState}) -> try emqx_frame:parse(Data, ParseState) of {ok, NParseState} -> - {ok, State#state{parse_state = NParseState}}; + {ok, State#ws_connection{parse_state = NParseState}}; {ok, Packet, Rest, NParseState} -> self() ! {incoming, Packet}, - process_incoming(Rest, State#state{parse_state = NParseState}); + process_incoming(Rest, State#ws_connection{parse_state = NParseState}); {error, Reason} -> ?LOG(error, "Frame error: ~p", [Reason]), stop(Reason, State) @@ -365,34 +350,46 @@ process_incoming(Data, State = #state{parse_state = ParseState}) -> error:Reason:Stk -> ?LOG(error, "Parse failed for ~p~n\ Stacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]), - stop(parse_error, State) + case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of + {stop, Reason0, OutPackets, NChanState} -> + NState = State#ws_connection{chan_state = NChanState}, + stop(Reason0, enqueue(OutPackets, NState)); + {stop, Reason0, NChanState} -> + stop(Reason0, State#ws_connection{chan_state = NChanState}) + end end. %%-------------------------------------------------------------------- %% Handle incoming packets -handle_incoming(Packet = ?PACKET(Type), SuccFun, State = #state{chan_state = ChanState}) -> +handle_incoming(Packet = ?PACKET(Type), SuccFun, + State = #ws_connection{chan_state = ChanState}) -> _ = inc_incoming_stats(Type), ok = emqx_metrics:inc_recv(Packet), ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]), case emqx_channel:handle_in(Packet, ChanState) of {ok, NChanState} -> - SuccFun(State#state{chan_state= NChanState}); + SuccFun(State#ws_connection{chan_state= NChanState}); {ok, OutPackets, NChanState} -> - SuccFun(enqueue(OutPackets, State#state{chan_state= NChanState})); + NState = State#ws_connection{chan_state= NChanState}, + SuccFun(enqueue(OutPackets, NState)); {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state= NChanState}); + stop(Reason, State#ws_connection{chan_state= NChanState}); {stop, Reason, OutPacket, NChanState} -> - stop(Reason, enqueue(OutPacket, State#state{chan_state= NChanState})) + NState = State#ws_connection{chan_state= NChanState}, + stop(Reason, enqueue(OutPacket, NState)) end. %%-------------------------------------------------------------------- %% Handle outgoing packets -handle_outgoing(Packets, #state{serialize = Serialize}) -> +handle_outgoing(Packets, State = #ws_connection{serialize = Serialize, + chan_state = ChanState}) -> Data = lists:map(Serialize, Packets), - emqx_pd:update_counter(send_oct, iolist_size(Data)), - {binary, Data}. + Oct = iolist_size(Data), + ok = inc_sent_stats(length(Packets), Oct), + NChanState = emqx_channel:sent(Oct, ChanState), + {{binary, Data}, State#ws_connection{chan_state = NChanState}}. %%-------------------------------------------------------------------- %% Serialize fun @@ -401,42 +398,60 @@ serialize_fun(ProtoVer) -> fun(Packet = ?PACKET(Type)) -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), _ = inc_outgoing_stats(Type), + _ = emqx_metrics:inc_sent(Packet), emqx_frame:serialize(Packet, ProtoVer) end. %%-------------------------------------------------------------------- %% Inc incoming/outgoing stats +-compile({inline, + [ inc_recv_stats/2 + , inc_incoming_stats/1 + , inc_outgoing_stats/1 + , inc_sent_stats/2 + ]}). + +inc_recv_stats(Cnt, Oct) -> + emqx_pd:update_counter(recv_cnt, Cnt), + emqx_pd:update_counter(recv_oct, Oct), + emqx_metrics:inc('bytes.received', Oct). + inc_incoming_stats(Type) -> emqx_pd:update_counter(recv_pkt, 1), (Type == ?PUBLISH) andalso emqx_pd:update_counter(recv_msg, 1). inc_outgoing_stats(Type) -> - emqx_pd:update_counter(send_cnt, 1), emqx_pd:update_counter(send_pkt, 1), (Type == ?PUBLISH) andalso emqx_pd:update_counter(send_msg, 1). +inc_sent_stats(Cnt, Oct) -> + emqx_pd:update_counter(send_cnt, Cnt), + emqx_pd:update_counter(send_oct, Oct), + emqx_metrics:inc('bytes.sent', Oct). + %%-------------------------------------------------------------------- %% Reply or Stop -reply(State = #state{pendings = []}) -> - {ok, State}; -reply(State = #state{chan_state = ChanState, pendings = Pendings}) -> - Reply = handle_outgoing(Pendings, State), - NChanState = emqx_channel:ensure_timer(stats_timer, ChanState), - {reply, Reply, State#state{chan_state = NChanState, pendings = []}}. +-compile({inline, [reply/1]}). -stop(Reason, State = #state{pendings = []}) -> - {stop, State#state{stop_reason = Reason}}; -stop(Reason, State = #state{pendings = Pendings}) -> - Reply = handle_outgoing(Pendings, State), - {reply, [Reply, close], - State#state{pendings = [], stop_reason = Reason}}. +reply(State = #ws_connection{pendings = []}) -> + {ok, State}; +reply(State = #ws_connection{pendings = Pendings}) -> + {Reply, NState} = handle_outgoing(Pendings, State), + {reply, Reply, NState#ws_connection{pendings = []}}. + +stop(Reason, State = #ws_connection{pendings = []}) -> + {stop, State#ws_connection{stop_reason = Reason}}; +stop(Reason, State = #ws_connection{pendings = Pendings}) -> + {Reply, State1} = handle_outgoing(Pendings, State), + State2 = State1#ws_connection{pendings = [], stop_reason = Reason}, + {reply, [Reply, close], State2}. enqueue(Packet, State) when is_record(Packet, mqtt_packet) -> enqueue([Packet], State); -enqueue(Packets, State = #state{pendings = Pendings}) -> - State#state{pendings = lists:append(Pendings, Packets)}. +enqueue(Packets, State = #ws_connection{pendings = Pendings}) -> + State#ws_connection{pendings = lists:append(Pendings, Packets)}. diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 0c8e61091..b06bc0b97 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -166,7 +166,8 @@ t_handle_deliver(_) -> Msg0 = emqx_message:make(<<"clientx">>, ?QOS_0, <<"t0">>, <<"qos0">>), Msg1 = emqx_message:make(<<"clientx">>, ?QOS_1, <<"t1">>, <<"qos1">>), Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}], - {ok, _Ch} = emqx_channel:handle_out({deliver, Delivers}, Channel1) + {ok, Packets, _Ch} = emqx_channel:handle_out({deliver, Delivers}, Channel1), + ?assertEqual([?QOS_0, ?QOS_1], [emqx_packet:qos(Pkt)|| Pkt <- Packets]) end). %%-------------------------------------------------------------------- @@ -178,7 +179,8 @@ t_handle_connack(_) -> fun(Channel) -> {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, _), _} = handle_out({connack, ?RC_SUCCESS, 0}, Channel), - {stop, {shutdown, unauthorized_client}, ?CONNACK_PACKET(5), _} + {stop, {shutdown, not_authorized}, + ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _} = handle_out({connack, ?RC_NOT_AUTHORIZED}, Channel) end). @@ -278,18 +280,20 @@ with_channel(Fun) -> Channel = emqx_channel:init(ConnInfo, Options), ConnPkt = #mqtt_packet_connect{ proto_name = <<"MQTT">>, - proto_ver = ?MQTT_PROTO_V4, + proto_ver = ?MQTT_PROTO_V5, clean_start = true, keepalive = 30, properties = #{}, client_id = <<"clientid">>, - username = <<"username">> + username = <<"username">>, + password = <<"passwd">> }, Protocol = emqx_protocol:init(ConnPkt), Session = emqx_session:init(#{zone => testing}, #{max_inflight => 100, expiry_interval => 0 }), - Fun(emqx_channel:set(protocol, Protocol, - emqx_channel:set(session, Session, Channel))). + Fun(emqx_channel:set_field(protocol, Protocol, + emqx_channel:set_field( + session, Session, Channel))). diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index a00d95603..660892112 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -100,7 +100,7 @@ t_cm(_) -> emqtt:subscribe(C, <<"mytopic">>, 0), ct:sleep(1200), Stats = emqx_cm:get_chan_stats(ClientId), - ?assertEqual(1, proplists:get_value(subscriptions, Stats)), + ?assertEqual(1, proplists:get_value(subscriptions_cnt, Stats)), emqx_zone:set_env(external, idle_timeout, IdleTimeout). t_cm_registry(_) -> diff --git a/test/emqx_gc_SUITE.erl b/test/emqx_gc_SUITE.erl index 1cdecebba..0b56fbdb6 100644 --- a/test/emqx_gc_SUITE.erl +++ b/test/emqx_gc_SUITE.erl @@ -24,7 +24,6 @@ all() -> emqx_ct:all(?MODULE). t_init(_) -> - ?assertEqual(undefined, emqx_gc:init(false)), GC1 = emqx_gc:init(#{count => 10, bytes => 0}), ?assertEqual(#{cnt => {10, 10}}, emqx_gc:info(GC1)), GC2 = emqx_gc:init(#{count => 0, bytes => 10}), @@ -33,9 +32,6 @@ t_init(_) -> ?assertEqual(#{cnt => {10, 10}, oct => {10, 10}}, emqx_gc:info(GC3)). t_run(_) -> - Undefined = emqx_gc:init(false), - ?assertEqual(undefined, Undefined), - ?assertEqual({false, undefined}, emqx_gc:run(1, 1, Undefined)), GC = emqx_gc:init(#{count => 10, bytes => 10}), ?assertEqual({true, GC}, emqx_gc:run(1, 1000, GC)), ?assertEqual({true, GC}, emqx_gc:run(1000, 1, GC)), @@ -51,12 +47,10 @@ t_run(_) -> ?assertEqual({false, DisabledGC}, emqx_gc:run(1, 1, DisabledGC)). t_info(_) -> - ?assertEqual(undefined, emqx_gc:info(undefined)), GC = emqx_gc:init(#{count => 10, bytes => 0}), ?assertEqual(#{cnt => {10, 10}}, emqx_gc:info(GC)). t_reset(_) -> - ?assertEqual(undefined, emqx_gc:reset(undefined)), GC = emqx_gc:init(#{count => 10, bytes => 10}), {false, GC1} = emqx_gc:run(5, 5, GC), ?assertEqual(#{cnt => {10, 5}, oct => {10, 5}}, emqx_gc:info(GC1)), diff --git a/test/emqx_misc_SUITE.erl b/test/emqx_misc_SUITE.erl index a8337c060..677b25c17 100644 --- a/test/emqx_misc_SUITE.erl +++ b/test/emqx_misc_SUITE.erl @@ -22,60 +22,92 @@ -include_lib("eunit/include/eunit.hrl"). -define(SOCKOPTS, [binary, - {packet, raw}, + {packet, raw}, {reuseaddr, true}, - {backlog, 512}, - {nodelay, true}]). + {backlog, 512}, + {nodelay, true} + ]). all() -> emqx_ct:all(?MODULE). -t_merge_opts() -> +t_merge_opts(_) -> Opts = emqx_misc:merge_opts(?SOCKOPTS, [raw, binary, {backlog, 1024}, {nodelay, false}, {max_clients, 1024}, - {acceptors, 16}]), + {acceptors, 16} + ]), ?assertEqual(1024, proplists:get_value(backlog, Opts)), ?assertEqual(1024, proplists:get_value(max_clients, Opts)), - [binary, raw, - {acceptors, 16}, - {backlog, 1024}, - {max_clients, 1024}, - {nodelay, false}, - {packet, raw}, - {reuseaddr, true}] = lists:sort(Opts). + ?assertEqual([binary, raw, + {acceptors, 16}, + {backlog, 1024}, + {max_clients, 1024}, + {nodelay, false}, + {packet, raw}, + {reuseaddr, true}], lists:sort(Opts)). -t_timer_cancel_flush() -> +t_maybe_apply(_) -> + ?assertEqual(undefined, emqx_misc:maybe_apply(fun(A) -> A end, undefined)), + ?assertEqual(a, emqx_misc:maybe_apply(fun(A) -> A end, a)). + +t_run_fold(_) -> + ?assertEqual(1, emqx_misc:run_fold([], 1, state)), + Add = fun(I, St) -> I+St end, + Mul = fun(I, St) -> I*St end, + ?assertEqual(6, emqx_misc:run_fold([Add, Mul], 1, 2)). + +t_pipeline(_) -> + ?assertEqual({ok, input, state}, emqx_misc:pipeline([], input, state)), + Funs = [fun(_I, _St) -> ok end, + fun(_I, St) -> {ok, St+1} end, + fun(I, St) -> {ok, I+1, St+1} end, + fun(I, St) -> {ok, I*2, St*2} end], + ?assertEqual({ok, 4, 6}, emqx_misc:pipeline(Funs, 1, 1)). + +t_start_timer(_) -> + TRef = emqx_misc:start_timer(1, tmsg), + timer:sleep(2), + ?assertEqual([{timeout, TRef, tmsg}], drain()), + ok = emqx_misc:cancel_timer(TRef). + +t_cancel_timer(_) -> Timer = emqx_misc:start_timer(0, foo), ok = emqx_misc:cancel_timer(Timer), - receive - {timeout, Timer, foo} -> - error(unexpected) - after 0 -> ok - end. + ?assertEqual([], drain()). t_proc_name(_) -> - 'TODO'. + ?assertEqual(emqx_pool_1, emqx_misc:proc_name(emqx_pool, 1)). t_proc_stats(_) -> - 'TODO'. + Pid1 = spawn(fun() -> exit(normal) end), + timer:sleep(10), + ?assertEqual([], emqx_misc:proc_stats(Pid1)), + Pid2 = spawn(fun() -> timer:sleep(100) end), + Pid2 ! msg, + timer:sleep(10), + ?assertMatch([{mailbox_len, 1}|_], emqx_misc:proc_stats(Pid2)). t_drain_deliver(_) -> - 'TODO'. + self() ! {deliver, t1, m1}, + self() ! {deliver, t2, m2}, + ?assertEqual([{deliver, t1, m1}, + {deliver, t2, m2} + ], emqx_misc:drain_deliver()). t_drain_down(_) -> - 'TODO'. + {Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end), + {Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end), + timer:sleep(100), + ?assertEqual([Pid1, Pid2], emqx_misc:drain_down(2)). -%% drain self() msg queue for deterministic test behavior drain() -> - _ = drain([]), % maybe log - ok. + drain([]). drain(Acc) -> receive - Msg -> - drain([Msg | Acc]) + Msg -> drain([Msg|Acc]) after 0 -> lists:reverse(Acc) diff --git a/test/emqx_mod_rewrite_SUITE.erl b/test/emqx_mod_rewrite_SUITE.erl index 675918286..158d93235 100644 --- a/test/emqx_mod_rewrite_SUITE.erl +++ b/test/emqx_mod_rewrite_SUITE.erl @@ -19,63 +19,32 @@ -compile(export_all). -compile(nowarn_export_all). --import(emqx_mod_rewrite, - [ rewrite_subscribe/4 - , rewrite_unsubscribe/4 - , rewrite_publish/2 - ]). - --include_lib("emqx.hrl"). +-include("emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). --define(TEST_RULES, [<<"x/# ^x/y/(.+)$ z/y/$1">>, - <<"y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2">> - ]). +-define(rules, [{rewrite,<<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>}, + {rewrite,<<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}]). all() -> emqx_ct:all(?MODULE). -%%-------------------------------------------------------------------- -%% Test cases -%%-------------------------------------------------------------------- - -t_rewrite_subscribe(_) -> - ?assertEqual({ok, [{<<"test">>, #{}}]}, - rewrite(subscribe, [{<<"test">>, #{}}])), - ?assertEqual({ok, [{<<"z/y/test">>, #{}}]}, - rewrite(subscribe, [{<<"x/y/test">>, #{}}])), - ?assertEqual({ok, [{<<"y/z/test_topic">>, #{}}]}, - rewrite(subscribe, [{<<"y/test/z/test_topic">>, #{}}])). - -t_rewrite_unsubscribe(_) -> - ?assertEqual({ok, [{<<"test">>, #{}}]}, - rewrite(unsubscribe, [{<<"test">>, #{}}])), - ?assertEqual({ok, [{<<"z/y/test">>, #{}}]}, - rewrite(unsubscribe, [{<<"x/y/test">>, #{}}])), - ?assertEqual({ok, [{<<"y/z/test_topic">>, #{}}]}, - rewrite(unsubscribe, [{<<"y/test/z/test_topic">>, #{}}])). - -t_rewrite_publish(_) -> - ?assertMatch({ok, #message{topic = <<"test">>}}, - rewrite(publish, #message{topic = <<"test">>})), - ?assertMatch({ok, #message{topic = <<"z/y/test">>}}, - rewrite(publish, #message{topic = <<"x/y/test">>})), - ?assertMatch({ok, #message{topic = <<"y/z/test_topic">>}}, - rewrite(publish, #message{topic = <<"y/test/z/test_topic">>})). - -%%-------------------------------------------------------------------- -%% Helper functions -%%-------------------------------------------------------------------- - -rewrite(subscribe, TopicFilters) -> - rewrite_subscribe(#{}, #{}, TopicFilters, rules()); -rewrite(unsubscribe, TopicFilters) -> - rewrite_unsubscribe(#{}, #{}, TopicFilters, rules()); -rewrite(publish, Msg) -> rewrite_publish(Msg, rules()). - -rules() -> - [begin - [Topic, Re, Dest] = string:split(Rule, " ", all), - {ok, MP} = re:compile(Re), - {rewrite, Topic, MP, Dest} - end || Rule <- ?TEST_RULES]. - +t_rewrite_rule(_Config) -> + {ok, _} = emqx_hooks:start_link(), + ok = emqx_mod_rewrite:load(?rules), + RawTopicFilters = [{<<"x/y/2">>, opts}, + {<<"x/1/2">>, opts}, + {<<"y/a/z/b">>, opts}, + {<<"y/def">>, opts}], + SubTopicFilters = emqx_hooks:run_fold('client.subscribe', [client, properties], RawTopicFilters), + UnSubTopicFilters = emqx_hooks:run_fold('client.unsubscribe', [client, properties], RawTopicFilters), + Messages = [emqx_hooks:run_fold('message.publish', [], emqx_message:make(Topic, <<"payload">>)) + || {Topic, _Opts} <- RawTopicFilters], + ExpectedTopicFilters = [{<<"z/y/2">>, opts}, + {<<"x/1/2">>, opts}, + {<<"y/z/b">>, opts}, + {<<"y/def">>, opts}], + ?assertEqual(ExpectedTopicFilters, SubTopicFilters), + ?assertEqual(ExpectedTopicFilters, UnSubTopicFilters), + [?assertEqual(ExpectedTopic, emqx_message:topic(Message)) + || {{ExpectedTopic, _opts}, Message} <- lists:zip(ExpectedTopicFilters, Messages)], + ok = emqx_mod_rewrite:unload(?rules), + ok = emqx_hooks:stop(). diff --git a/test/emqx_oom_SUITE.erl b/test/emqx_oom_SUITE.erl index 90eb4a253..c5a7edc1e 100644 --- a/test/emqx_oom_SUITE.erl +++ b/test/emqx_oom_SUITE.erl @@ -24,7 +24,6 @@ all() -> emqx_ct:all(?MODULE). t_init(_) -> - ?assertEqual(undefined, emqx_oom:init(undefined)), Opts = #{message_queue_len => 10, max_heap_size => 1024*1024*8 }, @@ -34,7 +33,6 @@ t_init(_) -> }, emqx_oom:info(Oom)). t_check(_) -> - ?assertEqual(ok, emqx_oom:check(undefined)), Opts = #{message_queue_len => 10, max_heap_size => 1024*1024*8 }, diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 89f1d7344..2cd091e30 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -24,26 +24,58 @@ all() -> emqx_ct:all(?MODULE). -t_init_and_info(_) -> - ConnPkt = #mqtt_packet_connect{ - proto_name = <<"MQTT">>, - proto_ver = ?MQTT_PROTO_V4, - is_bridge = false, - clean_start = true, - keepalive = 30, - properties = #{}, - client_id = <<"clientid">>, - username = <<"username">>, - password = <<"passwd">> - }, - Proto = emqx_protocol:init(ConnPkt), +init_per_suite(Config) -> + [{proto, init_protocol()}|Config]. + +init_protocol() -> + emqx_protocol:init(#mqtt_packet_connect{ + proto_name = <<"MQTT">>, + proto_ver = ?MQTT_PROTO_V5, + is_bridge = false, + clean_start = true, + keepalive = 30, + properties = #{}, + client_id = <<"clientid">>, + username = <<"username">>, + password = <<"passwd">> + }). + +end_per_suite(_Config) -> ok. + +t_init_info_1(Config) -> + Proto = proplists:get_value(proto, Config), + ?assertEqual(#{proto_name => <<"MQTT">>, + proto_ver => ?MQTT_PROTO_V5, + clean_start => true, + keepalive => 30, + conn_props => #{}, + will_msg => undefined, + client_id => <<"clientid">>, + username => <<"username">>, + topic_aliases => undefined + }, emqx_protocol:info(Proto)). + +t_init_info_2(Config) -> + Proto = proplists:get_value(proto, Config), ?assertEqual(<<"MQTT">>, emqx_protocol:info(proto_name, Proto)), - ?assertEqual(?MQTT_PROTO_V4, emqx_protocol:info(proto_ver, Proto)), + ?assertEqual(?MQTT_PROTO_V5, emqx_protocol:info(proto_ver, Proto)), ?assertEqual(true, emqx_protocol:info(clean_start, Proto)), + ?assertEqual(30, emqx_protocol:info(keepalive, Proto)), ?assertEqual(<<"clientid">>, emqx_protocol:info(client_id, Proto)), ?assertEqual(<<"username">>, emqx_protocol:info(username, Proto)), ?assertEqual(undefined, emqx_protocol:info(will_msg, Proto)), - ?assertEqual(#{}, emqx_protocol:info(conn_props, Proto)). - + ?assertEqual(0, emqx_protocol:info(will_delay_interval, Proto)), + ?assertEqual(#{}, emqx_protocol:info(conn_props, Proto)), + ?assertEqual(undefined, emqx_protocol:info(topic_aliases, Proto)). +t_find_save_alias(Config) -> + Proto = proplists:get_value(proto, Config), + ?assertEqual(undefined, emqx_protocol:info(topic_aliases, Proto)), + ?assertEqual(false, emqx_protocol:find_alias(1, Proto)), + Proto1 = emqx_protocol:save_alias(1, <<"t1">>, Proto), + Proto2 = emqx_protocol:save_alias(2, <<"t2">>, Proto1), + ?assertEqual(#{1 => <<"t1">>, 2 => <<"t2">>}, + emqx_protocol:info(topic_aliases, Proto2)), + ?assertEqual({ok, <<"t1">>}, emqx_protocol:find_alias(1, Proto2)), + ?assertEqual({ok, <<"t2">>}, emqx_protocol:find_alias(2, Proto2)). diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index 9f537ff19..3e30ff040 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -30,7 +30,6 @@ , emqx_message , emqx_hooks , emqx_zone - , emqx_pd ]). all() -> emqx_ct:all(?MODULE). @@ -83,7 +82,7 @@ apply_op(Session, attrs) -> apply_op(Session, stats) -> Stats = emqx_session:stats(Session), ?assert(is_list(Stats)), - ?assertEqual(9, length(Stats)), + ?assertEqual(10, length(Stats)), Session; apply_op(Session, {info, InfoArg}) -> _Ret = emqx_session:info(InfoArg, Session), @@ -113,16 +112,16 @@ apply_op(Session, {publish, {PacketId, Msg}}) -> end; apply_op(Session, {puback, PacketId}) -> case emqx_session:puback(PacketId, Session) of - {ok, _Msg} -> - Session; - {ok, _Deliver, NSession} -> + {ok, _Msg, NSession} -> + NSession; + {ok, _Msg, _Publishes, NSession} -> NSession; {error, _ErrorCode} -> Session end; apply_op(Session, {pubrec, PacketId}) -> case emqx_session:pubrec(PacketId, Session) of - {ok, NSession} -> + {ok, _Msg, NSession} -> NSession; {error, _ErrorCode} -> Session @@ -283,7 +282,7 @@ session() -> {zone(), option()}, begin Session = emqx_session:init(#{zone => Zone}, Options), - emqx_session:set_pkt_id(Session, 16#ffff) + emqx_session:set_field(next_pkt_id, 16#ffff, Session) end). %%%%%%%%%%%%%%%%%%%%%%%%%% @@ -327,6 +326,5 @@ do_mock(emqx_message) -> do_mock(emqx_hooks) -> meck:expect(emqx_hooks, run, fun(_Hook, _Args) -> ok end); do_mock(emqx_zone) -> - meck:expect(emqx_zone, get_env, fun(Env, Key, Default) -> maps:get(Key, Env, Default) end); -do_mock(emqx_pd) -> - meck:expect(emqx_pd, update_counter, fun(_stats, _num) -> ok end). + meck:expect(emqx_zone, get_env, fun(Env, Key, Default) -> maps:get(Key, Env, Default) end). +