diff --git a/etc/emqx.conf b/etc/emqx.conf index 34c64b099..a211d91d9 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -483,16 +483,14 @@ acl_cache_ttl = 1m ## Default: ignore acl_deny_action = ignore -## The cleanning interval for flapping +## Specify the global flapping detect policy. +## The value is a string composed of flapping threshold, duration and banned interval. +## 1. threshold: an integer to specfify the disconnected times of a MQTT Client; +## 2. duration: the time window for flapping detect; +## 3. banned interval: the banned interval if a flapping is detected. ## -## Value: Duration -## -d: day -## -h: hour -## -m: minute -## -s: second -## -## Default: 1h, 1 hour -## flapping_clean_interval = 1h +## Value: Integer,Duration,Duration +flapping_detect_policy = 30, 1m, 5m ##-------------------------------------------------------------------- ## MQTT Protocol @@ -728,25 +726,6 @@ zone.external.mqueue_store_qos0 = true ## Value: on | off zone.external.enable_flapping_detect = off -## The times of state change per min, specifying the threshold which is used to -## detect if the connection starts flapping -## -## Value: number -zone.external.flapping_threshold = 10, 1m - -## Flapping expiry interval for connections. -## This config entry is used to determine when the connection -## will be unbanned. -## -## Value: Duration -## -d: day -## -h: hour -## -m: minute -## -s: second -## -## Default: 1h, 1 hour -zone.external.flapping_banned_expiry_interval = 1h - ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## ## Variables in mountpoint path: @@ -828,25 +807,6 @@ zone.internal.mqueue_store_qos0 = true ## Value: on | off zone.internal.enable_flapping_detect = off -## The times of state change per second, specifying the threshold which is used to -## detect if the connection starts flapping -## -## Value: number -zone.internal.flapping_threshold = 10, 1m - -## Flapping expiry interval for connections. -## This config entry is used to determine when the connection -## will be unbanned. -## -## Value: Duration -## -d: day -## -h: hour -## -m: minute -## -s: second -## -## Default: 1h, 1 hour -zone.internal.flapping_banned_expiry_interval = 1h - ## All the topics will be prefixed with the mountpoint path if this option is enabled. ## ## Variables in mountpoint path: diff --git a/priv/emqx.schema b/priv/emqx.schema index 48b85ad6e..2c43ad577 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -631,11 +631,27 @@ end}. {datatype, {enum, [ignore, disconnect]}} ]}. -%% @doc time interval to clean flapping records -{mapping, "flapping_clean_interval", "emqx.flapping_clean_interval", [ - {datatype, {duration, ms}} +%% @doc Flapping detect policy +{mapping, "flapping_detect_policy", "emqx.flapping_detect_policy", [ + {datatype, string}, + {default, "30,1m,5m"} ]}. +{translation, "emqx.flapping_detect_policy", fun(Conf) -> + Policy = cuttlefish:conf_get("flapping_detect_policy", Conf), + [Threshold, Duration, Interval] = string:tokens(Policy, ", "), + ParseDuration = fun(S) -> + case cuttlefish_duration:parse(S, ms) of + I when is_integer(I) -> I; + {error, Reason} -> error(Reason) + end + end, + #{threshold => list_to_integer(Threshold), + duration => ParseDuration(Duration), + banned_interval => ParseDuration(Interval) + } +end}. + {validator, "range:gt_0", "must greater than 0", fun(X) -> X > 0 end }. @@ -877,15 +893,8 @@ end}. ]}. {mapping, "zone.$name.enable_flapping_detect", "emqx.zones", [ - {datatype, flag} -]}. - -{mapping, "zone.$name.flapping_threshold", "emqx.zones", [ - {datatype, string} -]}. - -{mapping, "zone.$name.flapping_banned_expiry_interval", "emqx.zones", [ - {datatype, {duration, s}} + {datatype, flag}, + {default, off} ]}. %% @doc Force connection/session process GC after this number of @@ -919,15 +928,6 @@ end}. {translation, "emqx.zones", fun(Conf) -> Mapping = fun("retain_available", Val) -> {retain_available, Val}; - ("flapping_threshold", Val) -> - [Limit, Duration] = string:tokens(Val, ", "), - FlappingThreshold = case cuttlefish_duration:parse(Duration, s) of - Min when is_integer(Min) -> - {list_to_integer(Limit), Min}; - {error, Reason} -> - error(Reason) - end, - {flapping_threshold, FlappingThreshold}; ("wildcard_subscription", Val) -> {wildcard_subscription, Val}; ("shared_subscription", Val) -> diff --git a/src/emqx.erl b/src/emqx.erl index 79356d38b..097b4b6c5 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -29,16 +29,18 @@ , stop/0 ]). +-export([ get_env/1 + , get_env/2 + ]). + %% PubSub API -export([ subscribe/1 , subscribe/2 , subscribe/3 + , publish/1 + , unsubscribe/1 ]). --export([publish/1]). - --export([unsubscribe/1]). - %% PubSub management API -export([ topics/0 , subscriptions/1 @@ -101,6 +103,15 @@ is_running(Node) -> Pid when is_pid(Pid) -> true end. +%% @doc Get environment +-spec(get_env(Key :: atom()) -> maybe(term())). +get_env(Key) -> + get_env(Key, undefined). + +-spec(get_env(Key :: atom(), Default :: term()) -> term()). +get_env(Key, Default) -> + application:get_env(?APP, Key, Default). + %%-------------------------------------------------------------------- %% PubSub API %%-------------------------------------------------------------------- diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 29b8d9b49..8eacdca48 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -35,10 +35,10 @@ start(_Type, _Args) -> ok = emqx_modules:load(), ok = emqx_plugins:init(), emqx_plugins:load(), - ok = emqx_listeners:start(), + emqx_boot:is_enabled(listeners) + andalso (ok = emqx_listeners:start()), start_autocluster(), register(emqx, self()), - emqx_alarm_handler:load(), print_vsn(), {ok, Sup}. @@ -46,7 +46,8 @@ start(_Type, _Args) -> -spec(stop(State :: term()) -> term()). stop(_State) -> emqx_alarm_handler:unload(), - emqx_listeners:stop(), + emqx_boot:is_enabled(listeners) + andalso emqx_listeners:stop(), emqx_modules:unload(). %%-------------------------------------------------------------------- diff --git a/src/emqx_banned.erl b/src/emqx_banned.erl index ba3743385..c24bb4294 100644 --- a/src/emqx_banned.erl +++ b/src/emqx_banned.erl @@ -70,7 +70,10 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -spec(check(emqx_types:client()) -> boolean()). -check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) -> +check(#{client_id := ClientId, + username := Username, + peername := {IPAddr, _} + }) -> ets:member(?BANNED_TAB, {client_id, ClientId}) orelse ets:member(?BANNED_TAB, {username, Username}) orelse ets:member(?BANNED_TAB, {ipaddr, IPAddr}). diff --git a/src/emqx_config.erl b/src/emqx_boot.erl similarity index 69% rename from src/emqx_config.erl rename to src/emqx_boot.erl index bef0302f6..aa89449cc 100644 --- a/src/emqx_config.erl +++ b/src/emqx_boot.erl @@ -14,19 +14,16 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_config). +-module(emqx_boot). --export([ get_env/1 - , get_env/2 - ]). +-export([is_enabled/1]). --define(APP, emqx). +-define(BOOT_MODULES, [router, broker, listeners]). -%% @doc Get environment --spec(get_env(Key :: atom()) -> term() | undefined). -get_env(Key) -> - get_env(Key, undefined). +-spec(is_enabled(all|list(router|broker|listeners)) -> boolean()). +is_enabled(Mod) -> + (BootMods = boot_modules()) =:= all orelse lists:member(Mod, BootMods). + +boot_modules() -> + application:get_env(emqx, boot_modules, ?BOOT_MODULES). --spec(get_env(Key :: atom(), Default :: term()) -> term()). -get_env(Key, Default) -> - application:get_env(?APP, Key, Default). diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 359e65ab8..f171d3e98 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -236,7 +236,7 @@ route(Routes, Delivery) -> do_route({To, Node}, Delivery) when Node =:= node() -> {Node, To, dispatch(To, Delivery)}; do_route({To, Node}, Delivery) when is_atom(Node) -> - {Node, To, forward(Node, To, Delivery, emqx_config:get_env(rpc_mode, async))}; + {Node, To, forward(Node, To, Delivery, emqx:get_env(rpc_mode, async))}; do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) -> {share, To, emqx_shared_sub:dispatch(Group, To, Delivery)}. diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index aa001cb01..224965fde 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -50,8 +50,7 @@ ]). -import(emqx_misc, - [ run_fold/2 - , run_fold/3 + [ run_fold/3 , pipeline/3 , maybe_apply/2 ]). @@ -106,7 +105,7 @@ %% Init the channel %%-------------------------------------------------------------------- --spec(init(emqx_types:conn(), proplists:proplist()) -> channel()). +-spec(init(emqx_types:conninfo(), proplists:proplist()) -> channel()). init(ConnInfo, Options) -> Zone = proplists:get_value(zone, Options), Peercert = maps:get(peercert, ConnInfo, undefined), @@ -216,11 +215,12 @@ handle_in(?CONNECT_PACKET(_), Channel = #channel{connected = true}) -> handle_out({disconnect, ?RC_PROTOCOL_ERROR}, Channel); handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> - case pipeline([fun validate_packet/2, - fun check_connect/2, + case pipeline([fun check_connpkt/2, fun init_protocol/2, fun enrich_client/2, fun set_logger_meta/2, + fun check_banned/2, + fun check_flapping/2, fun auth_connect/2], ConnPkt, Channel) of {ok, NConnPkt, NChannel} -> process_connect(NConnPkt, NChannel); @@ -230,7 +230,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{protocol = Protocol}) -> - case pipeline([fun validate_packet/2, + case pipeline([fun emqx_packet:check/1, fun process_alias/2, fun check_publish/2], Packet, Channel) of {ok, NPacket, NChannel} -> @@ -300,22 +300,27 @@ handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = S {ok, Channel} end; -handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), Channel) -> - case validate_packet(Packet, Channel) of - ok -> - TopicFilters = preprocess_subscribe(Properties, RawTopicFilters, Channel), - {ReasonCodes, NChannel} = process_subscribe(TopicFilters, Channel), - handle_out({suback, PacketId, ReasonCodes}, NChannel); +handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), + Channel = #channel{client = Client}) -> + case emqx_packet:check(Packet) of + ok -> TopicFilters1 = emqx_hooks:run_fold('client.subscribe', + [Client, Properties], + parse_topic_filters(TopicFilters)), + TopicFilters2 = enrich_subid(Properties, TopicFilters1), + {ReasonCodes, NChannel} = process_subscribe(TopicFilters2, Channel), + handle_out({suback, PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> handle_out({disconnect, ReasonCode}, Channel) end; -handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters), Channel) -> - case validate_packet(Packet, Channel) of - ok -> - TopicFilters = preprocess_unsubscribe(Properties, RawTopicFilters, Channel), - {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters, Channel), - handle_out({unsuback, PacketId, ReasonCodes}, NChannel); +handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), + Channel = #channel{client = Client}) -> + case emqx_packet:check(Packet) of + ok -> TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', + [Client, Properties], + parse_topic_filters(TopicFilters)), + {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), + handle_out({unsuback, PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> handle_out({disconnect, ReasonCode}, Channel) end; @@ -419,24 +424,18 @@ process_publish(PacketId, Msg = #message{qos = ?QOS_2}, handle_out({pubrec, PacketId, RC}, Channel) end. -publish_to_msg(Packet, #channel{client = Client = #{mountpoint := MountPoint}}) -> +publish_to_msg(Packet, #channel{client = Client = #{mountpoint := MountPoint}, + protocol = Protocol}) -> Msg = emqx_packet:to_message(Client, Packet), Msg1 = emqx_message:set_flag(dup, false, Msg), - emqx_mountpoint:mount(MountPoint, Msg1). + ProtoVer = emqx_protocol:info(proto_ver, Protocol), + Msg2 = emqx_message:set_header(proto_ver, ProtoVer, Msg1), + emqx_mountpoint:mount(MountPoint, Msg2). %%-------------------------------------------------------------------- %% Process Subscribe %%-------------------------------------------------------------------- --compile({inline, [preprocess_subscribe/3]}). -preprocess_subscribe(Properties, RawTopicFilters, #channel{client = Client}) -> - RunHook = fun(TopicFilters) -> - emqx_hooks:run_fold('client.subscribe', - [Client, Properties], TopicFilters) - end, - Enrich = fun(TopicFilters) -> enrich_subid(Properties, TopicFilters) end, - run_fold([fun parse_topic_filters/1, RunHook, Enrich], RawTopicFilters). - process_subscribe(TopicFilters, Channel) -> process_subscribe(TopicFilters, [], Channel). @@ -466,14 +465,6 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = %% Process Unsubscribe %%-------------------------------------------------------------------- --compile({inline, [preprocess_unsubscribe/3]}). -preprocess_unsubscribe(Properties, RawTopicFilter, #channel{client = Client}) -> - RunHook = fun(TopicFilters) -> - emqx_hooks:run_fold('client.unsubscribe', - [Client, Properties], TopicFilters) - end, - run_fold([fun parse_topic_filters/1, RunHook], RawTopicFilter). - -compile({inline, [process_unsubscribe/2]}). process_unsubscribe(TopicFilters, Channel) -> process_unsubscribe(TopicFilters, [], Channel). @@ -523,7 +514,7 @@ handle_out({connack, ReasonCode}, Channel = #channel{client = Client, }) -> ok = emqx_hooks:run('client.connected', [Client, ReasonCode, attrs(Channel)]), ProtoVer = case Protocol of - undefined -> undefined; + undefined -> ?MQTT_PROTO_V5; _ -> emqx_protocol:info(proto_ver, Protocol) end, ReasonCode1 = if @@ -551,9 +542,6 @@ handle_out({deliver, Delivers}, Channel = #channel{session = Session}) -> {ok, Channel#channel{session = NSession}} end; -handle_out({publish, [Publish]}, Channel) -> - handle_out(Publish, Channel); - handle_out({publish, Publishes}, Channel) when is_list(Publishes) -> Packets = lists:foldl( fun(Publish, Acc) -> @@ -576,7 +564,7 @@ handle_out({publish, PacketId, Msg}, Channel = Msg1 = emqx_message:update_expiry(Msg), Msg2 = emqx_hooks:run_fold('message.delivered', [Client], Msg1), Msg3 = emqx_mountpoint:unmount(MountPoint, Msg2), - {ok, emqx_packet:from_message(PacketId, Msg3), Channel}; + {ok, emqx_message:to_packet(PacketId, Msg3), Channel}; handle_out({puback, PacketId, ReasonCode}, Channel) -> {ok, ?PUBACK_PACKET(PacketId, ReasonCode), Channel}; @@ -628,7 +616,10 @@ handle_out({Type, Data}, Channel) -> handle_call(kick, Channel) -> {stop, {shutdown, kicked}, ok, Channel}; -handle_call(discard, Channel) -> +handle_call(discard, Channel = #channel{connected = true}) -> + Packet = ?DISCONNECT_PACKET(?RC_SESSION_TAKEN_OVER), + {stop, {shutdown, discarded}, Packet, ok, Channel}; +handle_call(discard, Channel = #channel{connected = false}) -> {stop, {shutdown, discarded}, ok, Channel}; %% Session Takeover @@ -666,16 +657,18 @@ handle_cast(Msg, Channel) -> -spec(handle_info(Info :: term(), channel()) -> {ok, channel()} | {stop, Reason :: term(), channel()}). -handle_info({subscribe, RawTopicFilters}, Channel) -> - TopicFilters = preprocess_subscribe(#{'Internal' => true}, - RawTopicFilters, Channel), - {_ReasonCodes, NChannel} = process_subscribe(TopicFilters, Channel), +handle_info({subscribe, TopicFilters}, Channel = #channel{client = Client}) -> + TopicFilters1 = emqx_hooks:run_fold('client.subscribe', + [Client, #{'Internal' => true}], + parse_topic_filters(TopicFilters)), + {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel), {ok, NChannel}; -handle_info({unsubscribe, RawTopicFilters}, Channel) -> - TopicFilters = preprocess_unsubscribe(#{'Internal' => true}, - RawTopicFilters, Channel), - {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters, Channel), +handle_info({unsubscribe, TopicFilters}, Channel = #channel{client = Client}) -> + TopicFilters1 = emqx_hooks:run_fold('client.unsubscribe', + [Client, #{'Internal' => true}], + parse_topic_filters(TopicFilters)), + {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), {ok, NChannel}; handle_info(disconnected, Channel = #channel{connected = undefined}) -> @@ -828,141 +821,51 @@ received(Oct, Channel) -> sent(Oct, Channel) -> ensure_timer(stats_timer, maybe_gc_and_check_oom(Oct, Channel)). -%%TODO: Improve will msg:) +%% TODO: Improve will msg:) publish_will_msg(undefined) -> ok; publish_will_msg(Msg) -> emqx_broker:publish(Msg). -%% @doc Validate incoming packet. --spec(validate_packet(emqx_types:packet(), channel()) - -> ok | {error, emqx_types:reason_code()}). -validate_packet(Packet, _Channel) -> - try emqx_packet:validate(Packet) of - true -> ok - catch - error:protocol_error -> - {error, ?RC_PROTOCOL_ERROR}; - error:subscription_identifier_invalid -> - {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED}; - error:topic_alias_invalid -> - {error, ?RC_TOPIC_ALIAS_INVALID}; - error:topic_filters_invalid -> - {error, ?RC_TOPIC_FILTER_INVALID}; - error:topic_name_invalid -> - {error, ?RC_TOPIC_FILTER_INVALID}; - error:_Reason -> - {error, ?RC_MALFORMED_PACKET} - end. +%% @doc Check connect packet. +check_connpkt(ConnPkt, #channel{client = #{zone := Zone}}) -> + emqx_packet:check(ConnPkt, emqx_mqtt_caps:get_caps(Zone)). -%%-------------------------------------------------------------------- -%% Check connect packet -%%-------------------------------------------------------------------- +%% @doc Init protocol record. +init_protocol(ConnPkt, Channel = #channel{client = #{zone := Zone}}) -> + {ok, Channel#channel{protocol = emqx_protocol:init(ConnPkt, Zone)}}. -check_connect(ConnPkt, Channel) -> - pipeline([fun check_proto_ver/2, - fun check_client_id/2, - %%fun check_flapping/2, - fun check_banned/2, - fun check_will_topic/2, - fun check_will_retain/2], ConnPkt, Channel). - -check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, - proto_name = Name}, _Channel) -> - case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of - true -> ok; - false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} - end. - -%% MQTT3.1 does not allow null clientId -check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3, - client_id = <<>> - }, _Channel) -> - {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; - -%% Issue#599: Null clientId and clean_start = false -check_client_id(#mqtt_packet_connect{client_id = <<>>, - clean_start = false}, _Channel) -> - {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; - -check_client_id(#mqtt_packet_connect{client_id = <<>>, - clean_start = true}, _Channel) -> - ok; - -check_client_id(#mqtt_packet_connect{client_id = ClientId}, - #channel{client = #{zone := Zone}}) -> - Len = byte_size(ClientId), - MaxLen = emqx_zone:get_env(Zone, max_clientid_len), - case (1 =< Len) andalso (Len =< MaxLen) of - true -> ok; - false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} - end. - -%%TODO: check banned... -check_banned(#mqtt_packet_connect{client_id = ClientId, - username = Username}, - #channel{client = Client = #{zone := Zone}}) -> - case emqx_zone:get_env(Zone, enable_ban, false) of - true -> - case emqx_banned:check(Client#{client_id => ClientId, - username => Username}) of - true -> {error, ?RC_BANNED}; - false -> ok - end; - false -> ok - end. - -check_will_topic(#mqtt_packet_connect{will_flag = false}, _Channel) -> - ok; -check_will_topic(#mqtt_packet_connect{will_topic = WillTopic}, _Channel) -> - try emqx_topic:validate(WillTopic) of - true -> ok - catch error:_Error -> - {error, ?RC_TOPIC_NAME_INVALID} - end. - -check_will_retain(#mqtt_packet_connect{will_retain = false}, _Channel) -> - ok; -check_will_retain(#mqtt_packet_connect{will_retain = true}, - #channel{client = #{zone := Zone}}) -> - case emqx_zone:get_env(Zone, mqtt_retain_available, true) of - true -> ok; - false -> {error, ?RC_RETAIN_NOT_SUPPORTED} - end. - -init_protocol(ConnPkt, Channel) -> - {ok, Channel#channel{protocol = emqx_protocol:init(ConnPkt)}}. - -%%-------------------------------------------------------------------- -%% Enrich client -%%-------------------------------------------------------------------- - -enrich_client(ConnPkt = #mqtt_packet_connect{is_bridge = IsBridge}, - Channel = #channel{client = Client}) -> +%% @doc Enrich client +enrich_client(ConnPkt, Channel = #channel{client = Client}) -> {ok, NConnPkt, NClient} = pipeline([fun set_username/2, + fun set_bridge_mode/2, fun maybe_username_as_clientid/2, fun maybe_assign_clientid/2, fun fix_mountpoint/2 ], ConnPkt, Client), - {ok, NConnPkt, Channel#channel{client = NClient#{is_bridge => IsBridge}}}. + {ok, NConnPkt, Channel#channel{client = NClient}}. -%% Username may be not undefined if peer_cert_as_username set_username(#mqtt_packet_connect{username = Username}, Client = #{username := undefined}) -> {ok, Client#{username => Username}}; set_username(_ConnPkt, Client) -> {ok, Client}. +set_bridge_mode(#mqtt_packet_connect{is_bridge = true}, Client) -> + {ok, Client#{is_bridge => true}}; +set_bridge_mode(_ConnPkt, _Client) -> ok. + maybe_username_as_clientid(_ConnPkt, Client = #{username := undefined}) -> {ok, Client}; maybe_username_as_clientid(_ConnPkt, Client = #{zone := Zone, username := Username}) -> - case emqx_zone:get_env(Zone, use_username_as_clientid, false) of + case emqx_zone:use_username_as_clientid(Zone) of true -> {ok, Client#{client_id => Username}}; false -> ok end. maybe_assign_clientid(#mqtt_packet_connect{client_id = <<>>}, Client) -> - RandClientId = emqx_guid:to_base62(emqx_guid:gen()), - {ok, Client#{client_id => RandClientId}}; + %% Generate a rand clientId + RandId = emqx_guid:to_base62(emqx_guid:gen()), + {ok, Client#{client_id => RandId}}; maybe_assign_clientid(#mqtt_packet_connect{client_id = ClientId}, Client) -> {ok, Client#{client_id => ClientId}}. @@ -974,6 +877,23 @@ fix_mountpoint(_ConnPkt, Client = #{mountpoint := Mountpoint}) -> set_logger_meta(_ConnPkt, #channel{client = #{client_id := ClientId}}) -> emqx_logger:set_metadata_client_id(ClientId). +%%-------------------------------------------------------------------- +%% Check banned/flapping +%%-------------------------------------------------------------------- + +check_banned(_ConnPkt, #channel{client = Client = #{zone := Zone}}) -> + case emqx_zone:enable_banned(Zone) andalso emqx_banned:check(Client) of + true -> {error, ?RC_BANNED}; + false -> ok + end. + +check_flapping(_ConnPkt, #channel{client = Client = #{zone := Zone}}) -> + case emqx_zone:enable_flapping_detect(Zone) + andalso emqx_flapping:check(Client) of + true -> {error, ?RC_CONNECTION_RATE_EXCEEDED}; + false -> ok + end. + %%-------------------------------------------------------------------- %% Auth Connect %%-------------------------------------------------------------------- @@ -1184,7 +1104,7 @@ maybe_resume_session(#channel{session = Session, %% @doc Is ACL enabled? is_acl_enabled(#{zone := Zone, is_superuser := IsSuperuser}) -> - (not IsSuperuser) andalso emqx_zone:get_env(Zone, enable_acl, true). + (not IsSuperuser) andalso emqx_zone:enable_acl(Zone). %% @doc Parse Topic Filters -compile({inline, [parse_topic_filters/1]}). diff --git a/src/emqx_cm_locker.erl b/src/emqx_cm_locker.erl index c5cd9c2f6..874863b63 100644 --- a/src/emqx_cm_locker.erl +++ b/src/emqx_cm_locker.erl @@ -62,5 +62,5 @@ unlock(ClientId) -> -spec(strategy() -> local | one | quorum | all). strategy() -> - emqx_config:get_env(session_locking_strategy, quorum). + emqx:get_env(session_locking_strategy, quorum). diff --git a/src/emqx_cm_registry.erl b/src/emqx_cm_registry.erl index ce843a427..04dd04e80 100644 --- a/src/emqx_cm_registry.erl +++ b/src/emqx_cm_registry.erl @@ -62,7 +62,7 @@ start_link() -> %% @doc Is the global registry enabled? -spec(is_enabled() -> boolean()). is_enabled() -> - emqx_config:get_env(enable_channel_registry, true). + emqx:get_env(enable_channel_registry, true). %% @doc Register a global channel. -spec(register_channel(emqx_types:client_id() diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 54da647fb..f4a11bcbc 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -104,7 +104,7 @@ info(CPid) when is_pid(CPid) -> info(Conn = #connection{chan_state = ChanState}) -> ConnInfo = info(?INFO_KEYS, Conn), ChanInfo = emqx_channel:info(ChanState), - maps:merge(ChanInfo, #{connection => maps:from_list(ConnInfo)}). + maps:merge(ChanInfo, #{conninfo => maps:from_list(ConnInfo)}). info(Keys, Conn) when is_list(Keys) -> [{Key, info(Key, Conn)} || Key <- Keys]; @@ -135,7 +135,7 @@ attrs(CPid) when is_pid(CPid) -> attrs(Conn = #connection{chan_state = ChanState}) -> ConnAttrs = info(?ATTR_KEYS, Conn), ChanAttrs = emqx_channel:attrs(ChanState), - maps:merge(ChanAttrs, #{connection => maps:from_list(ConnAttrs)}). + maps:merge(ChanAttrs, #{conninfo => maps:from_list(ConnAttrs)}). %% @doc Get stats of the channel. -spec(stats(pid()|connection()) -> emqx_types:stats()). @@ -191,7 +191,8 @@ init({Transport, RawSocket, Options}) -> rate_limit = RateLimit, pub_limit = PubLimit, parse_state = ParseState, - chan_state = ChanState + chan_state = ChanState, + serialize = serialize_fun(?MQTT_PROTO_V5, undefined) }, gen_statem:enter_loop(?MODULE, [{hibernate_after, 2 * IdleTimout}], idle, State, self(), [IdleTimout]). @@ -217,8 +218,9 @@ idle(timeout, _Timeout, State) -> shutdown(idle_timeout, State); idle(cast, {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) -> - #mqtt_packet_connect{proto_ver = ProtoVer} = ConnPkt, - NState = State#connection{serialize = serialize_fun(ProtoVer)}, + #mqtt_packet_connect{proto_ver = ProtoVer, properties = Properties} = ConnPkt, + MaxPacketSize = emqx_mqtt_props:get_property('Maximum-Packet-Size', Properties, undefined), + NState = State#connection{serialize = serialize_fun(ProtoVer, MaxPacketSize)}, SuccFun = fun(NewSt) -> {next_state, connected, NewSt} end, handle_incoming(Packet, SuccFun, NState); @@ -283,6 +285,10 @@ handle({call, From}, Req, State = #connection{chan_state = ChanState}) -> {ok, Reply, NChanState} -> reply(From, Reply, State#connection{chan_state = NChanState}); {stop, Reason, Reply, NChanState} -> + ok = gen_statem:reply(From, Reply), + stop(Reason, State#connection{chan_state = NChanState}); + {stop, Reason, Packet, Reply, NChanState} -> + handle_outgoing(Packet, fun (_) -> ok end, State#connection{chan_state = NChanState}), ok = gen_statem:reply(From, Reply), stop(Reason, State#connection{chan_state = NChanState}) end; @@ -402,9 +408,10 @@ process_incoming(Data, State) -> process_incoming(<<>>, Packets, State) -> {keep_state, State, next_incoming_events(Packets)}; -process_incoming(Data, Packets, State = #connection{parse_state = ParseState, chan_state = ChanState}) -> +process_incoming(Data, Packets, State = #connection{parse_state = ParseState, + chan_state = ChanState}) -> try emqx_frame:parse(Data, ParseState) of - {ok, NParseState} -> + {more, NParseState} -> NState = State#connection{parse_state = NParseState}, {keep_state, NState, next_incoming_events(Packets)}; {ok, Packet, Rest, NParseState} -> @@ -414,15 +421,25 @@ process_incoming(Data, Packets, State = #connection{parse_state = ParseState, ch shutdown(Reason, State) catch error:Reason:Stk -> - ?LOG(error, "Parse failed for ~p~n\ - Stacktrace:~p~nError data:~p", [Reason, Stk, Data]), - case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of + ?LOG(error, "Parse failed for ~p~nStacktrace:~p~nError data:~p", [Reason, Stk, Data]), + Result = + case emqx_channel:info(connected, ChanState) of + undefined -> + emqx_channel:handle_out({connack, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState); + true -> + emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState); + _ -> + ignore + end, + case Result of {stop, Reason0, OutPackets, NChanState} -> Shutdown = fun(NewSt) -> stop(Reason0, NewSt) end, NState = State#connection{chan_state = NChanState}, handle_outgoing(OutPackets, Shutdown, NState); {stop, Reason0, NChanState} -> - stop(Reason0, State#connection{chan_state = NChanState}) + stop(Reason0, State#connection{chan_state = NChanState}); + ignore -> + keep_state(State) end end. @@ -479,12 +496,19 @@ handle_outgoing(Packet, SuccFun, State = #connection{serialize = Serialize}) -> %%-------------------------------------------------------------------- %% Serialize fun -serialize_fun(ProtoVer) -> +serialize_fun(ProtoVer, MaxPacketSize) -> fun(Packet = ?PACKET(Type)) -> - ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), - _ = inc_outgoing_stats(Type), - _ = emqx_metrics:inc_sent(Packet), - emqx_frame:serialize(Packet, ProtoVer) + IoData = emqx_frame:serialize(Packet, ProtoVer), + case Type =/= ?PUBLISH orelse MaxPacketSize =:= undefined orelse iolist_size(IoData) =< MaxPacketSize of + true -> + ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), + _ = inc_outgoing_stats(Type), + _ = emqx_metrics:inc_sent(Packet), + IoData; + false -> + ?LOG(warning, "DROP ~s due to oversize packet size", [emqx_packet:format(Packet)]), + <<"">> + end end. %%-------------------------------------------------------------------- diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 60ee37039..dbde2b60b 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -34,6 +34,19 @@ , lookup_command/1 ]). +-export([ print/1 + , print/2 + , usage/1 + , usage/2 + ]). + +%% format/1,2 and format_usage/1,2 are exported mainly for test cases +-export([ format/1 + , format/2 + , format_usage/1 + , format_usage/2 + ]). + %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -46,6 +59,8 @@ -record(state, {seq = 0}). -type(cmd() :: atom()). +-type(cmd_descr() :: string()). +-type(cmd_usage() :: {cmd(), cmd_descr()}). -define(SERVER, ?MODULE). -define(TAB, emqx_command). @@ -75,7 +90,7 @@ run_command([Cmd | Args]) -> -spec(run_command(cmd(), [string()]) -> ok | {error, term()}). run_command(help, []) -> - usage(); + help(); run_command(Cmd, Args) when is_atom(Cmd) -> case lookup_command(Cmd) of [{Mod, Fun}] -> @@ -87,7 +102,7 @@ run_command(Cmd, Args) when is_atom(Cmd) -> {error, Reason} end; [] -> - usage(), {error, cmd_not_found} + help(), {error, cmd_not_found} end. -spec(lookup_command(cmd()) -> [{module(), atom()}]). @@ -97,11 +112,51 @@ lookup_command(Cmd) when is_atom(Cmd) -> [] -> [] end. -usage() -> - io:format("Usage: ~s~n", [?MODULE]), - [begin io:format("~80..-s~n", [""]), Mod:Cmd(usage) end +help() -> + print("Usage: ~s~n", [?MODULE]), + [begin print("~80..-s~n", [""]), Mod:Cmd(usage) end || {_, {Mod, Cmd}, _} <- ets:tab2list(?TAB)]. +-spec(print(io:format()) -> ok). +print(Msg) -> + io:format(format(Msg)). + +-spec(print(io:format(), [term()]) -> ok). +print(Format, Args) -> + io:format(format(Format, Args)). + +-spec(usage([cmd_usage()]) -> ok). +usage(UsageList) -> + io:format(format_usage(UsageList)). + +-spec(usage(cmd(), cmd_descr()) -> ok). +usage(Cmd, Desc) -> + io:format(format_usage(Cmd, Desc)). + +-spec(format(io:format()) -> string()). +format(Msg) -> + lists:flatten(io_lib:format("~p", [Msg])). + +-spec(format(io:format(), [term()]) -> string()). +format(Format, Args) -> + lists:flatten(io_lib:format(Format, Args)). + +-spec(format_usage([cmd_usage()]) -> ok). +format_usage(UsageList) -> + lists:map( + fun({Cmd, Desc}) -> + format_usage(Cmd, Desc) + end, UsageList). + +-spec(format_usage(cmd(), cmd_descr()) -> string()). +format_usage(Cmd, Desc) -> + CmdLines = split_cmd(Cmd), + DescLines = split_cmd(Desc), + lists:foldl( + fun({CmdStr, DescStr}, Usage) -> + Usage ++ format("~-48s# ~s~n", [CmdStr, DescStr]) + end, "", zip_cmd(CmdLines, DescLines)). + %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ @@ -151,3 +206,11 @@ noreply(State) -> next_seq(State = #state{seq = Seq}) -> State#state{seq = Seq + 1}. +split_cmd(CmdStr) -> + Lines = string:split(CmdStr, "\n", all), + [L || L <- Lines, L =/= []]. + +zip_cmd([X | Xs], [Y | Ys]) -> [{X, Y} | zip_cmd(Xs, Ys)]; +zip_cmd([X | Xs], []) -> [{X, ""} | zip_cmd(Xs, [])]; +zip_cmd([], [Y | Ys]) -> [{"", Y} | zip_cmd([], Ys)]; +zip_cmd([], []) -> []. diff --git a/src/emqx_flapping.erl b/src/emqx_flapping.erl index ba3f444b7..6e5f98c14 100644 --- a/src/emqx_flapping.erl +++ b/src/emqx_flapping.erl @@ -14,129 +14,184 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc This module is used to garbage clean the flapping records. - -module(emqx_flapping). +-behaviour(gen_server). + -include("emqx.hrl"). -include("types.hrl"). +-include("logger.hrl"). --behaviour(gen_statem). +-logger_header("[Flapping]"). --export([start_link/0]). +-export([start_link/0, stop/0]). -%% gen_statem callbacks +%% API +-export([check/1, detect/1]). + +%% gen_server callbacks -export([ init/1 - , initialized/3 - , callback_mode/0 - , terminate/3 - , code_change/4 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 ]). +%% Tab -define(FLAPPING_TAB, ?MODULE). - --define(default_flapping_clean_interval, 3600000). - --export([check/3]). - --record(flapping, { - client_id :: binary(), - check_count :: integer(), - timestamp :: integer() +%% Default Policy +-define(FLAPPING_THRESHOLD, 30). +-define(FLAPPING_DURATION, 60000). +-define(FLAPPING_BANNED_INTERVAL, 300000). +-define(DEFAULT_DETECT_POLICY, + #{threshold => ?FLAPPING_THRESHOLD, + duration => ?FLAPPING_DURATION, + banned_interval => ?FLAPPING_BANNED_INTERVAL }). --type(flapping_record() :: #flapping{}). +-record(flapping, { + client_id :: emqx_types:client_id(), + peername :: emqx_types:peername(), + started_at :: pos_integer(), + detect_cnt :: pos_integer(), + banned_at :: pos_integer() + }). --type(flapping_state() :: flapping | ok). +-opaque(flapping() :: #flapping{}). -%% @doc This function is used to initialize flapping records -%% the expiry time unit is minutes. --spec(init_flapping(ClientId :: binary(), Interval :: integer()) -> flapping_record()). -init_flapping(ClientId, Interval) -> - #flapping{client_id = ClientId, - check_count = 1, - timestamp = emqx_time:now_secs() + Interval}. +-export_type([flapping/0]). -%% @doc This function is used to initialize flapping records -%% the expiry time unit is minutes. --spec(check(Action :: atom(), ClientId :: binary(), - Threshold :: {integer(), integer()}) -> flapping_state()). -check(Action, ClientId, Threshold = {_TimesThreshold, TimeInterval}) -> - check(Action, ClientId, Threshold, init_flapping(ClientId, TimeInterval)). - --spec(check(Action :: atom(), ClientId :: binary(), - Threshold :: {integer(), integer()}, - InitFlapping :: flapping_record()) -> flapping_state()). -check(Action, ClientId, Threshold, InitFlapping) -> - case ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.check_count, 1}, InitFlapping) of - 1 -> ok; - CheckCount -> - case ets:lookup(?FLAPPING_TAB, ClientId) of - [Flapping] -> - check_flapping(Action, CheckCount, Threshold, Flapping); - _Flapping -> - ok - end - end. - -check_flapping(Action, CheckCount, _Threshold = {TimesThreshold, TimeInterval}, - Flapping = #flapping{ client_id = ClientId - , timestamp = Timestamp }) -> - case emqx_time:now_secs() of - NowTimestamp when NowTimestamp =< Timestamp, - CheckCount > TimesThreshold -> - ets:delete(?FLAPPING_TAB, ClientId), - flapping; - NowTimestamp when NowTimestamp > Timestamp, - Action =:= disconnect -> - ets:delete(?FLAPPING_TAB, ClientId), - ok; - NowTimestamp -> - NewFlapping = Flapping#flapping{timestamp = NowTimestamp + TimeInterval}, - ets:insert(?FLAPPING_TAB, NewFlapping), - ok - end. - -%%-------------------------------------------------------------------- -%% gen_statem callbacks -%%-------------------------------------------------------------------- --spec(start_link() -> startlink_ret()). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +stop() -> gen_server:stop(?MODULE). + +%% @doc Check flapping when a MQTT client connected. +-spec(check(emqx_types:client()) -> boolean()). +check(#{client_id := ClientId}) -> + check(ClientId, get_policy()). + +check(ClientId, #{banned_interval := Interval}) -> + case ets:lookup(?FLAPPING_TAB, {banned, ClientId}) of + [] -> false; + [#flapping{banned_at = BannedAt}] -> + now_diff(BannedAt) < Interval + end. + +%% @doc Detect flapping when a MQTT client disconnected. +-spec(detect(emqx_types:client()) -> boolean()). +detect(Client) -> detect(Client, get_policy()). + +detect(#{client_id := ClientId, peername := Peername}, + Policy = #{threshold := Threshold}) -> + try ets:update_counter(?FLAPPING_TAB, ClientId, {#flapping.detect_cnt, 1}) of + Cnt when Cnt < Threshold -> false; + _Cnt -> case ets:lookup(?FLAPPING_TAB, ClientId) of + [Flapping] -> + ok = gen_server:cast(?MODULE, {detected, Flapping, Policy}), + true; + [] -> false + end + catch + error:badarg -> + %% Create a flapping record. + Flapping = #flapping{client_id = ClientId, + peername = Peername, + started_at = emqx_time:now_ms(), + detect_cnt = 1 + }, + true = ets:insert(?FLAPPING_TAB, Flapping), + false + end. + +-compile({inline, [get_policy/0, now_diff/1]}). + +get_policy() -> + emqx:get_env(flapping_detect_policy, ?DEFAULT_DETECT_POLICY). + +now_diff(TS) -> emqx_time:now_ms() - TS. + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- init([]) -> - Interval = emqx_config:get_env(flapping_clean_interval, ?default_flapping_clean_interval), - TabOpts = [ public - , set - , {keypos, 2} - , {write_concurrency, true} - , {read_concurrency, true}], - ok = emqx_tables:new(?FLAPPING_TAB, TabOpts), - {ok, initialized, #{timer_interval => Interval}}. + #{duration := Duration, banned_interval := Interval} = get_policy(), + ok = emqx_tables:new(?FLAPPING_TAB, [public, set, + {keypos, 2}, + {read_concurrency, true}, + {write_concurrency, true} + ]), + State = #{time => max(Duration, Interval) + 1, tref => undefined}, + {ok, ensure_timer(State), hibernate}. -callback_mode() -> [state_functions, state_enter]. +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. -initialized(enter, _OldState, #{timer_interval := Time}) -> - Action = {state_timeout, Time, clean_expired_records}, - {keep_state_and_data, Action}; -initialized(state_timeout, clean_expired_records, #{}) -> - clean_expired_records(), - repeat_state_and_data. +handle_cast({detected, Flapping = #flapping{client_id = ClientId, + peername = Peername, + started_at = StartedAt, + detect_cnt = DetectCnt}, + #{duration := Duration}}, State) -> + case (Interval = now_diff(StartedAt)) < Duration of + true -> %% Flapping happened:( + %% Log first + ?LOG(error, "Flapping detected: ~s(~s) disconnected ~w times in ~wms", + [ClientId, esockd_net:format(Peername), DetectCnt, Duration]), + %% TODO: Send Alarm + %% Banned. + BannedFlapping = Flapping#flapping{client_id = {banned, ClientId}, + banned_at = emqx_time:now_ms() + }, + ets:insert(?FLAPPING_TAB, BannedFlapping); + false -> + ?LOG(warning, "~s(~s) disconnected ~w times in ~wms", + [ClientId, esockd_net:format(Peername), DetectCnt, Interval]), + ets:delete_object(?FLAPPING_TAB, Flapping) + end, + {noreply, State}; -code_change(_Vsn, State, Data, _Extra) -> - {ok, State, Data}. +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected cast: ~p", [Msg]), + {noreply, State}. -terminate(_Reason, _StateName, _State) -> - emqx_tables:delete(?FLAPPING_TAB), +handle_info({timeout, TRef, expire_flapping}, State = #{tref := TRef}) -> + with_flapping_tab(fun expire_flapping/2, + [emqx_time:now_ms(), get_policy()]), + {noreply, ensure_timer(State#{tref => undefined}), hibernate}; + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- -%% @doc clean expired records in ets -clean_expired_records() -> - NowTime = emqx_time:now_secs(), - MatchSpec = [{{'$1', '$2', '$3'},[{'<', '$3', NowTime}], [true]}], - ets:select_delete(?FLAPPING_TAB, MatchSpec). +ensure_timer(State = #{time := Time, tref := undefined}) -> + State#{tref => emqx_misc:start_timer(Time, expire_flapping)}; +ensure_timer(State) -> State. + +with_flapping_tab(Fun, Args) -> + case ets:info(?FLAPPING_TAB, size) of + undefined -> ok; + 0 -> ok; + _Size -> erlang:apply(Fun, Args) + end. + +expire_flapping(NowTime, #{duration := Duration, banned_interval := Interval}) -> + ets:select_delete(?FLAPPING_TAB, + [{#flapping{started_at = '$1', banned_at = undefined, _ = '_'}, + [{'<', '$1', NowTime-Duration}], [true]}, + {#flapping{client_id = {banned, '_'}, banned_at = '$1', _ = '_'}, + [{'<', '$1', NowTime-Interval}], [true]}]). diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index c0115cea8..9795e60a3 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -34,22 +34,24 @@ , parse_result/0 ]). --type(options() :: #{max_size => 1..?MAX_PACKET_SIZE, - version => emqx_types:version() +-type(options() :: #{strict_mode => boolean(), + max_size => 1..?MAX_PACKET_SIZE, + version => emqx_types:version() }). --opaque(parse_state() :: {none, options()} | {more, cont_fun()}). +-opaque(parse_state() :: {none, options()} | cont_fun()). --opaque(parse_result() :: {ok, parse_state()} +-opaque(parse_result() :: {more, cont_fun()} | {ok, emqx_types:packet(), binary(), parse_state()}). -type(cont_fun() :: fun((binary()) -> parse_result())). -define(none(Opts), {none, Opts}). --define(more(Cont), {more, Cont}). + -define(DEFAULT_OPTIONS, - #{max_size => ?MAX_PACKET_SIZE, - version => ?MQTT_PROTO_V4 + #{strict_mode => false, + max_size => ?MAX_PACKET_SIZE, + version => ?MQTT_PROTO_V4 }). %%-------------------------------------------------------------------- @@ -78,17 +80,26 @@ parse(Bin) -> -spec(parse(binary(), parse_state()) -> parse_result()). parse(<<>>, {none, Options}) -> - {ok, ?more(fun(Bin) -> parse(Bin, {none, Options}) end)}; -parse(<>, {none, Options}) -> - parse_remaining_len(Rest, #mqtt_packet_header{type = Type, - dup = bool(Dup), - qos = fixqos(Type, QoS), - retain = bool(Retain)}, Options); -parse(Bin, {more, Cont}) when is_binary(Bin), is_function(Cont) -> + {more, fun(Bin) -> parse(Bin, {none, Options}) end}; +parse(<>, + {none, Options = #{strict_mode := StrictMode}}) -> + %% Validate header if strict mode. + StrictMode andalso validate_header(Type, Dup, QoS, Retain), + Header = #mqtt_packet_header{type = Type, + dup = bool(Dup), + qos = QoS, + retain = bool(Retain) + }, + Header1 = case fixqos(Type, QoS) of + QoS -> Header; + FixedQoS -> Header#mqtt_packet_header{qos = FixedQoS} + end, + parse_remaining_len(Rest, Header1, Options); +parse(Bin, Cont) when is_binary(Bin), is_function(Cont) -> Cont(Bin). parse_remaining_len(<<>>, Header, Options) -> - {ok, ?more(fun(Bin) -> parse_remaining_len(Bin, Header, Options) end)}; + {more, fun(Bin) -> parse_remaining_len(Bin, Header, Options) end}; parse_remaining_len(Rest, Header, Options) -> parse_remaining_len(Rest, Header, 1, 0, Options). @@ -96,7 +107,7 @@ parse_remaining_len(_Bin, _Header, _Multiplier, Length, #{max_size := MaxSize}) when Length > MaxSize -> error(mqtt_frame_too_large); parse_remaining_len(<<>>, Header, Multiplier, Length, Options) -> - {ok, ?more(fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end)}; + {more, fun(Bin) -> parse_remaining_len(Bin, Header, Multiplier, Length, Options) end}; %% Match DISCONNECT without payload parse_remaining_len(<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?DISCONNECT}, 1, 0, Options) -> Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}), @@ -132,9 +143,9 @@ parse_frame(Bin, Header, Length, Options) -> {ok, packet(Header, Variable), Rest, ?none(Options)} end; TooShortBin -> - {ok, ?more(fun(BinMore) -> - parse_frame(<>, Header, Length, Options) - end)} + {more, fun(BinMore) -> + parse_frame(<>, Header, Length, Options) + end} end. packet(Header) -> @@ -189,6 +200,7 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, ?QOS_0 -> {undefined, Rest}; _ -> parse_packet_id(Rest) end, + (PacketId =/= undefined) andalso validate_packet_id(PacketId), {Properties, Payload} = parse_properties(Rest1, Ver), {#mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId, @@ -196,10 +208,12 @@ parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, parse_packet(#mqtt_packet_header{type = PubAck}, <>, _Options) when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP -> + ok = validate_packet_id(PacketId), #mqtt_packet_puback{packet_id = PacketId, reason_code = 0}; parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{version := Ver = ?MQTT_PROTO_V5}) when ?PUBACK =< PubAck, PubAck =< ?PUBCOMP -> + ok = validate_packet_id(PacketId), {Properties, <<>>} = parse_properties(Rest, Ver), #mqtt_packet_puback{packet_id = PacketId, reason_code = ReasonCode, @@ -207,6 +221,7 @@ parse_packet(#mqtt_packet_header{type = PubAck}, <>, #{version := Ver}) -> + ok = validate_packet_id(PacketId), {Properties, Rest1} = parse_properties(Rest, Ver), TopicFilters = parse_topic_filters(subscribe, Rest1), #mqtt_packet_subscribe{packet_id = PacketId, @@ -215,6 +230,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBSCRIBE}, <>, #{version := Ver}) -> + ok = validate_packet_id(PacketId), {Properties, Rest1} = parse_properties(Rest, Ver), #mqtt_packet_suback{packet_id = PacketId, properties = Properties, @@ -222,6 +238,7 @@ parse_packet(#mqtt_packet_header{type = ?SUBACK}, <>, #{version := Ver}) -> + ok = validate_packet_id(PacketId), {Properties, Rest1} = parse_properties(Rest, Ver), TopicFilters = parse_topic_filters(unsubscribe, Rest1), #mqtt_packet_unsubscribe{packet_id = PacketId, @@ -229,9 +246,11 @@ parse_packet(#mqtt_packet_header{type = ?UNSUBSCRIBE}, <>, _Options) -> + ok = validate_packet_id(PacketId), #mqtt_packet_unsuback{packet_id = PacketId}; parse_packet(#mqtt_packet_header{type = ?UNSUBACK}, <>, #{version := Ver}) -> + ok = validate_packet_id(PacketId), {Properties, Rest1} = parse_properties(Rest, Ver), ReasonCodes = parse_reason_codes(Rest1), #mqtt_packet_unsuback{packet_id = PacketId, @@ -260,12 +279,12 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, parse_will_message(Packet, Bin) -> {Packet, Bin}. -% protocol_approved(Ver, Name) -> -% lists:member({Ver, Name}, ?PROTOCOL_NAMES). - parse_packet_id(<>) -> {PacketId, Rest}. +validate_packet_id(0) -> error(bad_packet_id); +validate_packet_id(_) -> ok. + parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 -> {undefined, Bin}; %% TODO: version mess? @@ -336,7 +355,8 @@ parse_property(<<16#26, Bin/binary>>, Props) -> {Pair, Rest} = parse_utf8_pair(Bin), case maps:find('User-Property', Props) of {ok, UserProps} -> - parse_property(Rest,Props#{'User-Property' := [Pair|UserProps]}); + UserProps1 = lists:append(UserProps, [Pair]), + parse_property(Rest, Props#{'User-Property' := UserProps1}); error -> parse_property(Rest, Props#{'User-Property' => [Pair]}) end; @@ -357,7 +377,7 @@ parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> {Value + Len * Multiplier, Rest}. parse_topic_filters(subscribe, Bin) -> - [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => QoS, rc => 0}} + [{Topic, #{rh => Rh, rap => Rap, nl => Nl, qos => validate_subqos(QoS), rc => 0}} || <> <= Bin]; parse_topic_filters(unsubscribe, Bin) -> @@ -638,6 +658,30 @@ serialize_variable_byte_integer(N) when N =< ?LOWBITS -> serialize_variable_byte_integer(N) -> <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>. +%% Validate header if sctrict mode. See: mqtt-v5.0: 2.1.3 Flags +validate_header(?CONNECT, 0, 0, 0) -> ok; +validate_header(?CONNACK, 0, 0, 0) -> ok; +validate_header(?PUBLISH, 0, ?QOS_0, _) -> ok; +validate_header(?PUBLISH, _, ?QOS_1, _) -> ok; +validate_header(?PUBLISH, 0, ?QOS_2, _) -> ok; +validate_header(?PUBACK, 0, 0, 0) -> ok; +validate_header(?PUBREC, 0, 0, 0) -> ok; +validate_header(?PUBREL, 0, 1, 0) -> ok; +validate_header(?PUBCOMP, 0, 0, 0) -> ok; +validate_header(?SUBSCRIBE, 0, 1, 0) -> ok; +validate_header(?SUBACK, 0, 0, 0) -> ok; +validate_header(?UNSUBSCRIBE, 0, 1, 0) -> ok; +validate_header(?UNSUBACK, 0, 0, 0) -> ok; +validate_header(?PINGREQ, 0, 0, 0) -> ok; +validate_header(?PINGRESP, 0, 0, 0) -> ok; +validate_header(?DISCONNECT, 0, 0, 0) -> ok; +validate_header(?AUTH, 0, 0, 0) -> ok; +validate_header(_Type, _Dup, _QoS, _Rt) -> error(bad_frame_header). + +validate_subqos(QoS) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> + QoS; +validate_subqos(_) -> error(bad_subqos). + bool(0) -> false; bool(1) -> true. diff --git a/src/emqx_json.erl b/src/emqx_json.erl index 665e7ad57..db35835ce 100644 --- a/src/emqx_json.erl +++ b/src/emqx_json.erl @@ -24,12 +24,22 @@ , safe_encode/2 ]). +-compile({inline, + [ encode/1 + , encode/2 + ]}). + -export([ decode/1 , decode/2 , safe_decode/1 , safe_decode/2 ]). +-compile({inline, + [ decode/1 + , decode/2 + ]}). + -spec(encode(jsx:json_term()) -> jsx:json_text()). encode(Term) -> jsx:encode(Term). diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 802179065..036b60e10 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -42,7 +42,7 @@ %% @doc Start all listeners. -spec(start() -> ok). start() -> - lists:foreach(fun start_listener/1, emqx_config:get_env(listeners, [])). + lists:foreach(fun start_listener/1, emqx:get_env(listeners, [])). -spec(start_listener(listener()) -> {ok, pid()} | {error, term()}). start_listener({Proto, ListenOn, Options}) -> @@ -113,7 +113,7 @@ with_port({Addr, Port}, Opts = #{socket_opts := SocketOption}) -> %% @doc Restart all listeners -spec(restart() -> ok). restart() -> - lists:foreach(fun restart_listener/1, emqx_config:get_env(listeners, [])). + lists:foreach(fun restart_listener/1, emqx:get_env(listeners, [])). -spec(restart_listener(listener()) -> any()). restart_listener({Proto, ListenOn, Options}) -> @@ -136,7 +136,7 @@ restart_listener(Proto, ListenOn, _Opts) -> %% @doc Stop all listeners. -spec(stop() -> ok). stop() -> - lists:foreach(fun stop_listener/1, emqx_config:get_env(listeners, [])). + lists:foreach(fun stop_listener/1, emqx:get_env(listeners, [])). -spec(stop_listener(listener()) -> ok | {error, term()}). stop_listener({Proto, ListenOn, Opts}) -> diff --git a/src/emqx_message.erl b/src/emqx_message.erl index d0059cde0..6fbd3bbf5 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -57,7 +57,8 @@ , update_expiry/1 ]). --export([ to_map/1 +-export([ to_packet/2 + , to_map/1 , to_list/1 ]). @@ -188,6 +189,34 @@ update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, end; update_expiry(Msg) -> Msg. +%% @doc Message to PUBLISH Packet. +-spec(to_packet(emqx_types:packet_id(), emqx_types:message()) + -> emqx_types:packet()). +to_packet(PacketId, #message{qos = QoS, flags = Flags, headers = Headers, + topic = Topic, payload = Payload}) -> + Flags1 = if Flags =:= undefined -> #{}; + true -> Flags + end, + Dup = maps:get(dup, Flags1, false), + Retain = maps:get(retain, Flags1, false), + Publish = #mqtt_packet_publish{topic_name = Topic, + packet_id = PacketId, + properties = publish_props(Headers)}, + #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + dup = Dup, + qos = QoS, + retain = Retain}, + variable = Publish, payload = Payload}. + +publish_props(Headers) -> + maps:with(['Payload-Format-Indicator', + 'Response-Topic', + 'Correlation-Data', + 'User-Property', + 'Subscription-Identifier', + 'Content-Type', + 'Message-Expiry-Interval'], Headers). + %% @doc Message to map -spec(to_map(emqx_types:message()) -> map()). to_map(#message{ @@ -228,3 +257,4 @@ format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); format(headers, Headers) -> io_lib:format("~p", [Headers]). + diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index c7f347dc5..db42cd1e8 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -280,7 +280,8 @@ do_inc_recv(?PUBLISH_PACKET(QoS, _PktId)) -> case QoS of ?QOS_0 -> inc('messages.qos0.received'); ?QOS_1 -> inc('messages.qos1.received'); - ?QOS_2 -> inc('messages.qos2.received') + ?QOS_2 -> inc('messages.qos2.received'); + _ -> ignore end, inc('packets.publish.received'); do_inc_recv(?PACKET(?PUBACK)) -> diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 193fc9243..34717c804 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -20,7 +20,6 @@ -export([ merge_opts/2 , maybe_apply/2 - , run_fold/2 , run_fold/3 , pipeline/3 , start_timer/2 @@ -60,11 +59,6 @@ maybe_apply(_Fun, undefined) -> maybe_apply(Fun, Arg) when is_function(Fun) -> erlang:apply(Fun, [Arg]). -run_fold([], Acc) -> - Acc; -run_fold([Fun|More], Acc) -> - run_fold(More, Fun(Acc)). - %% @doc RunFold run_fold([], Acc, _State) -> Acc; @@ -76,18 +70,25 @@ pipeline([], Input, State) -> {ok, Input, State}; pipeline([Fun|More], Input, State) -> - case Fun(Input, State) of + case apply_fun(Fun, Input, State) of ok -> pipeline(More, Input, State); {ok, NState} -> pipeline(More, Input, NState); - {ok, NInput, NState} -> - pipeline(More, NInput, NState); + {ok, Output, NState} -> + pipeline(More, Output, NState); {error, Reason} -> {error, Reason, State}; {error, Reason, NState} -> {error, Reason, NState} end. +-compile({inline, [apply_fun/3]}). +apply_fun(Fun, Input, State) -> + case erlang:fun_info(Fun, arity) of + {arity, 1} -> Fun(Input); + {arity, 2} -> Fun(Input, State) + end. + -spec(start_timer(integer(), term()) -> reference()). start_timer(Interval, Msg) -> start_timer(Interval, self(), Msg). diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index 2629ed60a..36e4f67ff 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -79,8 +79,7 @@ reload_acl() -> %% Internal Functions %%-------------------------------------------------------------------- -acl_file() -> - emqx_config:get_env(acl_file). +acl_file() -> emqx:get_env(acl_file). lookup(PubSub, Rules) -> maps:get(PubSub, Rules, []). diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index c96cd6c15..6ba69e1fd 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -34,8 +34,7 @@ load({Mod, Env}) -> ok = Mod:load(Env), ?LOG(info, "Load ~s module successfully.", [Mod]). -modules() -> - emqx_config:get_env(modules, []). +modules() -> emqx:get_env(modules, []). %% @doc Unload all the extended modules. -spec(unload() -> ok). diff --git a/src/emqx_mqtt_props.erl b/src/emqx_mqtt_props.erl index 163d9baf1..241db9a2e 100644 --- a/src/emqx_mqtt_props.erl +++ b/src/emqx_mqtt_props.erl @@ -28,6 +28,10 @@ %% For tests -export([all/0]). +-export([ set_property/3 + , get_property/3 + ]). + -type(prop_name() :: atom()). -type(prop_id() :: pos_integer()). @@ -179,3 +183,13 @@ validate_value(_Type, _Val) -> false. -spec(all() -> map()). all() -> ?PROPS_TABLE. +set_property(Name, Value, undefined) -> + #{Name => Value}; +set_property(Name, Value, Props) -> + Props#{Name => Value}. + +get_property(_Name, undefined, Default) -> + Default; +get_property(Name, Props, Default) -> + maps:get(Name, Props, Default). + diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 9f9fc7610..1113c15d7 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -19,142 +19,228 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). +%% Header APIs -export([ type/1 + , type_name/1 + , dup/1 , qos/1 + , retain/1 ]). -export([ proto_name/1 - , type_name/1 - , validate/1 - , format/1 - , to_message/2 - , from_message/2 + , proto_ver/1 + ]). + +%% Check API +-export([ check/1 + , check/2 + ]). + +-export([ to_message/2 , will_msg/1 ]). --compile(inline). +-export([format/1]). +-type(connect() :: #mqtt_packet_connect{}). +-type(publish() :: #mqtt_packet_publish{}). +-type(subscribe() :: #mqtt_packet_subscribe{}). +-type(unsubscribe() :: #mqtt_packet_unsubscribe{}). + +%%-------------------------------------------------------------------- +%% MQTT Packet Type and Flags. +%%-------------------------------------------------------------------- + +%% @doc MQTT packet type. +-spec(type(emqx_types:packet()) -> emqx_types:packet_type()). type(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) -> Type. +%% @doc Name of MQTT packet type. +-spec(type_name(emqx_types:packet()) -> atom()). +type_name(Packet) when is_record(Packet, mqtt_packet) -> + lists:nth(type(Packet), ?TYPE_NAMES). + +%% @doc Dup flag of MQTT packet. +-spec(dup(emqx_types:packet()) -> boolean()). +dup(#mqtt_packet{header = #mqtt_packet_header{dup = Dup}}) -> + Dup. + +%% @doc QoS of MQTT packet type. +-spec(qos(emqx_types:packet()) -> emqx_types:qos()). qos(#mqtt_packet{header = #mqtt_packet_header{qos = QoS}}) -> QoS. -%% @doc Protocol name of the version. --spec(proto_name(emqx_types:version()) -> binary()). -proto_name(?MQTT_PROTO_V3) -> - <<"MQIsdp">>; -proto_name(?MQTT_PROTO_V4) -> - <<"MQTT">>; -proto_name(?MQTT_PROTO_V5) -> - <<"MQTT">>. - -%% @doc Name of MQTT packet type. --spec(type_name(emqx_types:packet_type()) -> atom()). -type_name(Type) when ?RESERVED < Type, Type =< ?AUTH -> - lists:nth(Type, ?TYPE_NAMES). +%% @doc Retain flag of MQTT packet. +-spec(retain(emqx_types:packet()) -> boolean()). +retain(#mqtt_packet{header = #mqtt_packet_header{retain = Retain}}) -> + Retain. %%-------------------------------------------------------------------- -%% Validate MQTT Packet +%% Protocol name and version of MQTT CONNECT Packet. %%-------------------------------------------------------------------- --spec(validate(emqx_types:packet()) -> true). -validate(?SUBSCRIBE_PACKET(_PacketId, _Properties, [])) -> - error(topic_filters_invalid); -validate(?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters)) -> - validate_packet_id(PacketId) - andalso validate_properties(?SUBSCRIBE, Properties) - andalso ok == lists:foreach(fun validate_subscription/1, TopicFilters); +%% @doc Protocol name of the CONNECT Packet. +-spec(proto_name(emqx_types:packet()|connect()) -> binary()). +proto_name(?CONNECT_PACKET(ConnPkt)) -> + proto_name(ConnPkt); +proto_name(#mqtt_packet_connect{proto_name = Name}) -> + Name. -validate(?UNSUBSCRIBE_PACKET(_PacketId, [])) -> - error(topic_filters_invalid); -validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) -> - validate_packet_id(PacketId) - andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters); +%% @doc Protocol version of the CONNECT Packet. +-spec(proto_ver(emqx_types:packet()|connect()) -> emqx_types:version()). +proto_ver(?CONNACK_PACKET(ConnPkt)) -> + proto_ver(ConnPkt); +proto_ver(#mqtt_packet_connect{proto_ver = Ver}) -> + Ver. -validate(?PUBLISH_PACKET(_QoS, <<>>, _, #{'Topic-Alias':= _I}, _)) -> - true; -validate(?PUBLISH_PACKET(_QoS, <<>>, _, _, _)) -> - error(topic_name_invalid); -validate(?PUBLISH_PACKET(_QoS, Topic, _, Properties, _)) -> - ((not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid)) - andalso validate_properties(?PUBLISH, Properties); +%%-------------------------------------------------------------------- +%% Check MQTT Packet +%%-------------------------------------------------------------------- -validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = Properties})) -> - validate_properties(?CONNECT, Properties); +%% @doc Check PubSub Packet. +-spec(check(emqx_types:packet()|publish()|subscribe()|unsubscribe()) + -> ok | {error, emqx_types:reason_code()}). +check(#mqtt_packet{variable = PubPkt}) when is_record(PubPkt, mqtt_packet_publish) -> + check(PubPkt); -validate(_Packet) -> - true. +check(#mqtt_packet{variable = SubPkt}) when is_record(SubPkt, mqtt_packet_subscribe) -> + check(SubPkt); -validate_packet_id(0) -> - error(packet_id_invalid); -validate_packet_id(_) -> - true. +check(#mqtt_packet{variable = UnsubPkt}) when is_record(UnsubPkt, mqtt_packet_unsubscribe) -> + check(UnsubPkt); -validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) - when I =< 0; I >= 16#FFFFFFF -> - error(subscription_identifier_invalid); -validate_properties(?PUBLISH, #{'Topic-Alias':= 0}) -> - error(topic_alias_invalid); -validate_properties(?PUBLISH, #{'Subscription-Identifier' := _I}) -> - error(protocol_error); -validate_properties(?PUBLISH, #{'Response-Topic' := ResponseTopic}) -> - case emqx_topic:wildcard(ResponseTopic) of - true -> - error(protocol_error); - false -> - true +check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias':= _TopicAlias}}) -> + ok; +check(#mqtt_packet_publish{topic_name = <<>>, properties = #{}}) -> + {error, ?RC_PROTOCOL_ERROR}; +check(#mqtt_packet_publish{topic_name = TopicName, properties = Props}) -> + try emqx_topic:validate(name, TopicName) of + true -> check_pub_props(Props) + catch + error:_Error -> + {error, ?RC_TOPIC_NAME_INVALID} end; -validate_properties(?CONNECT, #{'Receive-Maximum' := 0}) -> - error(protocol_error); -validate_properties(?CONNECT, #{'Request-Response-Information' := ReqRespInfo}) - when ReqRespInfo =/= 0, ReqRespInfo =/= 1 -> - error(protocol_error); -validate_properties(?CONNECT, #{'Request-Problem-Information' := ReqProInfo}) + +check(#mqtt_packet_subscribe{properties = #{'Subscription-Identifier' := I}}) + when I =< 0; I >= 16#FFFFFFF -> + {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED}; + +check(#mqtt_packet_subscribe{topic_filters = []}) -> + {error, ?RC_TOPIC_FILTER_INVALID}; + +check(#mqtt_packet_subscribe{topic_filters = TopicFilters}) -> + try validate_topic_filters(TopicFilters) + catch + error:_Error -> + {error, ?RC_TOPIC_FILTER_INVALID} + end; + +check(#mqtt_packet_unsubscribe{topic_filters = []}) -> + {error, ?RC_TOPIC_FILTER_INVALID}; + +check(#mqtt_packet_unsubscribe{topic_filters = TopicFilters}) -> + try validate_topic_filters(TopicFilters) + catch + error:_Error -> + {error, ?RC_TOPIC_FILTER_INVALID} + end. + +check_pub_props(#{'Topic-Alias' := 0}) -> + {error, ?RC_TOPIC_ALIAS_INVALID}; + +check_pub_props(#{'Subscription-Identifier' := 0}) -> + {error, ?RC_PROTOCOL_ERROR}; + +check_pub_props(#{'Response-Topic' := ResponseTopic}) -> + try emqx_topic:validate(name, ResponseTopic) of + true -> ok + catch + error:_Error -> + {error, ?RC_PROTOCOL_ERROR} + end; +check_pub_props(_Props) -> ok. + +%% @doc Check CONNECT Packet. +-spec(check(emqx_types:packet()|connect(), Opts :: map()) + -> ok | {error, emqx_types:reason_code()}). +check(?CONNECT_PACKET(ConnPkt), Opts) -> + check(ConnPkt, Opts); +check(ConnPkt, Opts) when is_record(ConnPkt, mqtt_packet_connect) -> + run_checks([fun check_proto_ver/2, + fun check_client_id/2, + fun check_conn_props/2, + fun check_will_msg/2], ConnPkt, Opts). + +check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, + proto_name = Name}, _Opts) -> + case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of + true -> ok; + false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} + end. + +%% MQTT3.1 does not allow null clientId +check_client_id(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3, + client_id = <<>>}, _Opts) -> + {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; +%% Issue#599: Null clientId and clean_start = false +check_client_id(#mqtt_packet_connect{client_id = <<>>, + clean_start = false}, _Opts) -> + {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}; +check_client_id(#mqtt_packet_connect{client_id = <<>>, + clean_start = true}, _Opts) -> + ok; +check_client_id(#mqtt_packet_connect{client_id = ClientId}, + _Opts = #{max_clientid_len := MaxLen}) -> + case (1 =< (Len = byte_size(ClientId))) andalso (Len =< MaxLen) of + true -> ok; + false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} + end. + +check_conn_props(#mqtt_packet_connect{properties = undefined}, _Opts) -> + ok; +check_conn_props(#mqtt_packet_connect{properties = #{'Receive-Maximum' := 0}}, _Opts) -> + {error, ?RC_PROTOCOL_ERROR}; +check_conn_props(#mqtt_packet_connect{properties = #{'Request-Response-Information' := ReqRespInfo}}, _Opts) + when ReqRespInfo =/= 0, ReqRespInfo =/= 1 -> + {error, ?RC_PROTOCOL_ERROR}; +check_conn_props(#mqtt_packet_connect{properties = #{'Request-Problem-Information' := ReqProInfo}}, _Opts) when ReqProInfo =/= 0, ReqProInfo =/= 1 -> - error(protocol_error); -validate_properties(_, _) -> - true. + {error, ?RC_PROTOCOL_ERROR}; +check_conn_props(_ConnPkt, _Opts) -> ok. -validate_subscription({Topic, #{qos := QoS}}) -> - emqx_topic:validate(filter, Topic) andalso validate_qos(QoS). +check_will_msg(#mqtt_packet_connect{will_flag = false}, _Caps) -> + ok; +check_will_msg(#mqtt_packet_connect{will_retain = true}, + _Opts = #{mqtt_retain_available := false}) -> + {error, ?RC_RETAIN_NOT_SUPPORTED}; +check_will_msg(#mqtt_packet_connect{will_topic = WillTopic}, _Opts) -> + try emqx_topic:validate(name, WillTopic) of + true -> ok + catch error:_Error -> + {error, ?RC_TOPIC_NAME_INVALID} + end. -validate_qos(QoS) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> - true; -validate_qos(_) -> error(bad_qos). +run_checks([], _Packet, _Options) -> + ok; +run_checks([Check|More], Packet, Options) -> + case Check(Packet, Options) of + ok -> run_checks(More, Packet, Options); + {error, Reason} -> {error, Reason} + end. -%% @doc From message to packet --spec(from_message(emqx_types:packet_id(), emqx_types:message()) - -> emqx_types:packet()). -from_message(PacketId, #message{qos = QoS, flags = Flags, headers = Headers, - topic = Topic, payload = Payload}) -> - Flags1 = if Flags =:= undefined -> - #{}; - true -> Flags - end, - Dup = maps:get(dup, Flags1, false), - Retain = maps:get(retain, Flags1, false), - Publish = #mqtt_packet_publish{topic_name = Topic, - packet_id = PacketId, - properties = publish_props(Headers)}, - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - dup = Dup, - qos = QoS, - retain = Retain}, - variable = Publish, payload = Payload}. +%% @doc Validate MQTT Packet +%% @private +validate_topic_filters(TopicFilters) -> + lists:foreach( + fun({TopicFilter, _SubOpts}) -> + emqx_topic:validate(TopicFilter); + (TopicFilter) -> + emqx_topic:validate(TopicFilter) + end, TopicFilters). -publish_props(Headers) -> - maps:with(['Payload-Format-Indicator', - 'Response-Topic', - 'Correlation-Data', - 'User-Property', - 'Subscription-Identifier', - 'Content-Type', - 'Message-Expiry-Interval'], Headers). - -%% @doc Message from Packet --spec(to_message(emqx_types:client(), emqx_ypes:packet()) - -> emqx_types:message()). +%% @doc Publish Packet to Message. +-spec(to_message(emqx_types:client(), emqx_ypes:packet()) -> emqx_types:message()). to_message(#{client_id := ClientId, username := Username, peername := Peername}, #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, retain = Retain, @@ -198,9 +284,10 @@ format_header(#mqtt_packet_header{type = Type, retain = Retain}, S) -> S1 = if S == undefined -> <<>>; - true -> [", ", S] + true -> [", ", S] end, - io_lib:format("~s(Q~p, R~p, D~p~s)", [type_name(Type), QoS, i(Retain), i(Dup), S1]). + io_lib:format("~s(Q~p, R~p, D~p~s)", + [lists:nth(Type, ?TYPE_NAMES), QoS, i(Retain), i(Dup), S1]). format_variable(undefined, _) -> undefined; diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index e75a70e83..4a597fda6 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -39,7 +39,7 @@ %% @doc Init plugins' config -spec(init() -> ok). init() -> - case emqx_config:get_env(plugins_etc_dir) of + case emqx:get_env(plugins_etc_dir) of undefined -> ok; PluginsEtc -> CfgFiles = [filename:join(PluginsEtc, File) || @@ -57,16 +57,15 @@ init_config(CfgFile) -> -spec(load() -> list() | {error, term()}). load() -> load_expand_plugins(), - case emqx_config:get_env(plugins_loaded_file) of - undefined -> %% No plugins available - ignore; + case emqx:get_env(plugins_loaded_file) of + undefined -> ignore; %% No plugins available File -> ensure_file(File), with_loaded_file(File, fun(Names) -> load_plugins(Names, false) end) end. load_expand_plugins() -> - case emqx_config:get_env(expand_plugins_dir) of + case emqx:get_env(expand_plugins_dir) of undefined -> ok; ExpandPluginsDir -> Plugins = filelib:wildcard("*", ExpandPluginsDir), @@ -138,9 +137,8 @@ load_plugins(Names, Persistent) -> %% @doc Unload all plugins before broker stopped. -spec(unload() -> list() | {error, term()}). unload() -> - case emqx_config:get_env(plugins_loaded_file) of - undefined -> - ignore; + case emqx:get_env(plugins_loaded_file) of + undefined -> ignore; File -> with_loaded_file(File, fun stop_plugins/1) end. @@ -300,7 +298,7 @@ plugin_unloaded(Name, true) -> end. read_loaded() -> - case emqx_config:get_env(plugins_loaded_file) of + case emqx:get_env(plugins_loaded_file) of undefined -> {error, not_found}; File -> read_loaded(File) end. @@ -308,7 +306,7 @@ read_loaded() -> read_loaded(File) -> file:consult(File). write_loaded(AppNames) -> - FilePath = emqx_config:get_env(plugins_loaded_file), + FilePath = emqx:get_env(plugins_loaded_file), case file:write_file(FilePath, [io_lib:format("~p.~n", [Name]) || Name <- AppNames]) of ok -> ok; {error, Error} -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 7f9d69666..54ff6056c 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -20,7 +20,7 @@ -include("types.hrl"). -include("emqx_mqtt.hrl"). --export([ init/1 +-export([ init/2 , info/1 , info/2 , attrs/1 @@ -48,10 +48,10 @@ username :: emqx_types:username(), %% MQTT Will Msg will_msg :: emqx_types:message(), - %% MQTT Conn Properties - conn_props :: maybe(emqx_types:properties()), %% MQTT Topic Aliases - topic_aliases :: maybe(map()) + topic_aliases :: maybe(map()), + %% MQTT Topic Alias Maximum + alias_maximum :: maybe(map()) }). -opaque(protocol() :: #protocol{}). @@ -60,23 +60,24 @@ -define(ATTR_KEYS, [proto_name, proto_ver, clean_start, keepalive]). --spec(init(#mqtt_packet_connect{}) -> protocol()). +-spec(init(#mqtt_packet_connect{}, atom()) -> protocol()). init(#mqtt_packet_connect{proto_name = ProtoName, proto_ver = ProtoVer, clean_start = CleanStart, keepalive = Keepalive, properties = Properties, client_id = ClientId, - username = Username} = ConnPkt) -> + username = Username} = ConnPkt, Zone) -> WillMsg = emqx_packet:will_msg(ConnPkt), - #protocol{proto_name = ProtoName, - proto_ver = ProtoVer, - clean_start = CleanStart, - keepalive = Keepalive, - client_id = ClientId, - username = Username, - will_msg = WillMsg, - conn_props = Properties + #protocol{proto_name = ProtoName, + proto_ver = ProtoVer, + clean_start = CleanStart, + keepalive = Keepalive, + client_id = ClientId, + username = Username, + will_msg = WillMsg, + alias_maximum = #{outbound => emqx_mqtt_props:get_property('Topic-Alias-Maximum', Properties, 0), + inbound => maps:get(max_topic_alias, emqx_mqtt_caps:get_caps(Zone), 0)} }. -spec(info(protocol()) -> emqx_types:infos()). @@ -104,10 +105,10 @@ info(will_delay_interval, #protocol{will_msg = undefined}) -> 0; info(will_delay_interval, #protocol{will_msg = WillMsg}) -> emqx_message:get_header('Will-Delay-Interval', WillMsg, 0); -info(conn_props, #protocol{conn_props = ConnProps}) -> - ConnProps; info(topic_aliases, #protocol{topic_aliases = Aliases}) -> - Aliases. + Aliases; +info(alias_maximum, #protocol{alias_maximum = AliasMaximum}) -> + AliasMaximum. -spec(attrs(protocol()) -> emqx_types:attrs()). attrs(Proto) -> @@ -128,4 +129,5 @@ save_alias(AliasId, Topic, Proto = #protocol{topic_aliases = Aliases}) -> Proto#protocol{topic_aliases = maps:put(AliasId, Topic, Aliases)}. clear_will_msg(Protocol) -> - Protocol#protocol{will_msg = undefined}. \ No newline at end of file + Protocol#protocol{will_msg = undefined}. + diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d569386cc..59cc71ea9 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -437,7 +437,13 @@ dequeue(Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of {empty, _Q} -> {Msgs, Q}; {{value, Msg}, Q1} -> - dequeue(Cnt-1, [Msg|Msgs], Q1) + case emqx_message:is_expired(Msg) of + true -> + ok = emqx_metrics:inc('messages.expired'), + dequeue(Cnt-1, Msgs, Q1); + false -> + dequeue(Cnt-1, [Msg|Msgs], Q1) + end end. batch_n(Inflight) -> diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index 66be8ee81..71a20b20c 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -127,11 +127,11 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) -> -spec(strategy() -> random | round_robin | sticky | hash). strategy() -> - emqx_config:get_env(shared_subscription_strategy, round_robin). + emqx:get_env(shared_subscription_strategy, round_robin). -spec(ack_enabled() -> boolean()). ack_enabled() -> - emqx_config:get_env(shared_dispatch_ack_enabled, false). + emqx:get_env(shared_dispatch_ack_enabled, false). do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() -> %% Deadlock otherwise diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index d3a81fc7f..f95e79b6d 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -62,18 +62,21 @@ stop_child(ChildId) -> %%-------------------------------------------------------------------- init([]) -> - %% Kernel Sup KernelSup = child_spec(emqx_kernel_sup, supervisor), - %% Router Sup RouterSup = child_spec(emqx_router_sup, supervisor), - %% Broker Sup BrokerSup = child_spec(emqx_broker_sup, supervisor), - %% CM Sup CMSup = child_spec(emqx_cm_sup, supervisor), - %% Sys Sup SysSup = child_spec(emqx_sys_sup, supervisor), - {ok, {{one_for_all, 0, 1}, - [KernelSup, RouterSup, BrokerSup, CMSup, SysSup]}}. + Childs = [KernelSup] ++ + [RouterSup || emqx_boot:is_enabled(router)] ++ + [BrokerSup || emqx_boot:is_enabled(broker)] ++ + [CMSup || emqx_boot:is_enabled(broker)] ++ + [SysSup], + SupFlags = #{strategy => one_for_all, + intensity => 0, + period => 1 + }, + {ok, {SupFlags, Childs}}. %%-------------------------------------------------------------------- %% Internal functions diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index 41b27729b..66c3d2dc1 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -111,12 +111,12 @@ datetime() -> %% @doc Get sys interval -spec(sys_interval() -> pos_integer()). sys_interval() -> - emqx_config:get_env(broker_sys_interval, 60000). + emqx:get_env(broker_sys_interval, 60000). %% @doc Get sys heatbeat interval -spec(sys_heatbeat_interval() -> pos_integer()). sys_heatbeat_interval() -> - emqx_config:get_env(broker_sys_heartbeat, 30000). + emqx:get_env(broker_sys_heartbeat, 30000). %% @doc Get sys info -spec(info() -> list(tuple())). diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index f63dd95f6..7d6e5dd6c 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -166,7 +166,7 @@ code_change(_OldVsn, State, _Extra) -> handle_partition_event({partition, {occurred, Node}}) -> alarm_handler:set_alarm({partitioned, Node}); -handle_partition_event({partition, {healed, Node}}) -> +handle_partition_event({partition, {healed, _Node}}) -> alarm_handler:clear_alarm(partitioned). suppress(Key, SuccFun, State = #{events := Events}) -> diff --git a/src/emqx_sys_sup.erl b/src/emqx_sys_sup.erl index cbc07650e..7a94c4288 100644 --- a/src/emqx_sys_sup.erl +++ b/src/emqx_sys_sup.erl @@ -48,6 +48,5 @@ child_spec(Mod, Args) -> modules => [Mod] }. -config(Name) -> - emqx_config:get_env(Name, []). +config(Name) -> emqx:get_env(Name, []). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 4e6baebdf..6ed7cca63 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -76,7 +76,7 @@ info(WsPid) when is_pid(WsPid) -> info(WsConn = #ws_connection{chan_state = ChanState}) -> ConnInfo = info(?INFO_KEYS, WsConn), ChanInfo = emqx_channel:info(ChanState), - maps:merge(ChanInfo, #{connection => maps:from_list(ConnInfo)}). + maps:merge(ChanInfo, #{conninfo => maps:from_list(ConnInfo)}). info(Keys, WsConn) when is_list(Keys) -> [{Key, info(Key, WsConn)} || Key <- Keys]; @@ -97,7 +97,7 @@ attrs(WsPid) when is_pid(WsPid) -> attrs(WsConn = #ws_connection{chan_state = ChanState}) -> ConnAttrs = info(?ATTR_KEYS, WsConn), ChanAttrs = emqx_channel:attrs(ChanState), - maps:merge(ChanAttrs, #{connection => maps:from_list(ConnAttrs)}). + maps:merge(ChanAttrs, #{conninfo => maps:from_list(ConnAttrs)}). -spec(stats(pid()|ws_connection()) -> emqx_types:stats()). stats(WsPid) when is_pid(WsPid) -> @@ -192,7 +192,8 @@ websocket_init([Req, Opts]) -> fsm_state = idle, parse_state = ParseState, chan_state = ChanState, - pendings = []}}. + pendings = [], + serialize = serialize_fun(?MQTT_PROTO_V5, undefined)}}. websocket_handle({binary, Data}, State) when is_list(Data) -> websocket_handle({binary, iolist_to_binary(Data)}, State); @@ -255,8 +256,9 @@ websocket_info({cast, Msg}, State = #ws_connection{chan_state = ChanState}) -> websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State = #ws_connection{fsm_state = idle}) -> - #mqtt_packet_connect{proto_ver = ProtoVer} = ConnPkt, - NState = State#ws_connection{serialize = serialize_fun(ProtoVer)}, + #mqtt_packet_connect{proto_ver = ProtoVer, properties = Properties} = ConnPkt, + MaxPacketSize = emqx_mqtt_props:get_property('Maximum-Packet-Size', Properties, undefined), + NState = State#ws_connection{serialize = serialize_fun(ProtoVer, MaxPacketSize)}, handle_incoming(Packet, fun connected/1, NState); websocket_info({incoming, Packet}, State = #ws_connection{fsm_state = idle}) -> @@ -336,9 +338,10 @@ handle_timeout(TRef, Msg, State = #ws_connection{chan_state = ChanState}) -> process_incoming(<<>>, State) -> {ok, State}; -process_incoming(Data, State = #ws_connection{parse_state = ParseState, chan_state = ChanState}) -> +process_incoming(Data, State = #ws_connection{parse_state = ParseState, + chan_state = ChanState}) -> try emqx_frame:parse(Data, ParseState) of - {ok, NParseState} -> + {more, NParseState} -> {ok, State#ws_connection{parse_state = NParseState}}; {ok, Packet, Rest, NParseState} -> self() ! {incoming, Packet}, @@ -348,14 +351,24 @@ process_incoming(Data, State = #ws_connection{parse_state = ParseState, chan_sta stop(Reason, State) catch error:Reason:Stk -> - ?LOG(error, "Parse failed for ~p~n\ - Stacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]), - case emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState) of + ?LOG(error, "Parse failed for ~p~nStacktrace:~p~nFrame data: ~p", [Reason, Stk, Data]), + Result = + case emqx_channel:info(connected, ChanState) of + undefined -> + emqx_channel:handle_out({connack, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState); + true -> + emqx_channel:handle_out({disconnect, emqx_reason_codes:mqtt_frame_error(Reason)}, ChanState); + _ -> + ignore + end, + case Result of {stop, Reason0, OutPackets, NChanState} -> NState = State#ws_connection{chan_state = NChanState}, stop(Reason0, enqueue(OutPackets, NState)); {stop, Reason0, NChanState} -> - stop(Reason0, State#ws_connection{chan_state = NChanState}) + stop(Reason0, State#ws_connection{chan_state = NChanState}); + ignore -> + {ok, State} end end. @@ -394,12 +407,19 @@ handle_outgoing(Packets, State = #ws_connection{serialize = Serialize, %%-------------------------------------------------------------------- %% Serialize fun -serialize_fun(ProtoVer) -> +serialize_fun(ProtoVer, MaxPacketSize) -> fun(Packet = ?PACKET(Type)) -> - ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), - _ = inc_outgoing_stats(Type), - _ = emqx_metrics:inc_sent(Packet), - emqx_frame:serialize(Packet, ProtoVer) + IoData = emqx_frame:serialize(Packet, ProtoVer), + case Type =/= ?PUBLISH orelse MaxPacketSize =:= undefined orelse iolist_size(IoData) =< MaxPacketSize of + true -> + ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]), + _ = inc_outgoing_stats(Type), + _ = emqx_metrics:inc_sent(Packet), + IoData; + false -> + ?LOG(warning, "DROP ~s due to oversize packet size", [emqx_packet:format(Packet)]), + <<"">> + end end. %%-------------------------------------------------------------------- diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index bac32b849..2b696c937 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -25,7 +25,13 @@ -logger_header("[Zone]"). %% APIs --export([start_link/0]). +-export([start_link/0, stop/0]). + +-export([ use_username_as_clientid/1 + , enable_acl/1 + , enable_banned/1 + , enable_flapping_detect/1 + ]). -export([ get_env/2 , get_env/3 @@ -34,9 +40,6 @@ , force_reload/0 ]). -%% for test --export([stop/0]). - %% gen_server callbacks -export([ init/1 , handle_call/3 @@ -65,19 +68,34 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec(use_username_as_clientid(zone()) -> boolean()). +use_username_as_clientid(Zone) -> + get_env(Zone, use_username_as_clientid, false). + +-spec(enable_acl(zone()) -> boolean()). +enable_acl(Zone) -> + get_env(Zone, enable_acl, true). + +-spec(enable_banned(zone()) -> boolean()). +enable_banned(Zone) -> + get_env(Zone, enable_banned, false). + +-spec(enable_flapping_detect(zone()) -> boolean()). +enable_flapping_detect(Zone) -> + get_env(Zone, enable_flapping_detect, false). + -spec(get_env(maybe(zone()), atom()) -> maybe(term())). -get_env(undefined, Key) -> - emqx_config:get_env(Key); +get_env(undefined, Key) -> emqx:get_env(Key); get_env(Zone, Key) -> get_env(Zone, Key, undefined). -spec(get_env(maybe(zone()), atom(), term()) -> maybe(term())). get_env(undefined, Key, Def) -> - emqx_config:get_env(Key, Def); + emqx:get_env(Key, Def); get_env(Zone, Key, Def) -> try persistent_term:get(?KEY(Zone, Key)) catch error:badarg -> - emqx_config:get_env(Key, Def) + emqx:get_env(Key, Def) end. -spec(set_env(zone(), atom(), term()) -> ok). @@ -132,5 +150,5 @@ code_change(_OldVsn, State, _Extra) -> do_reload() -> [persistent_term:put(?KEY(Zone, Key), Val) - || {Zone, Opts} <- emqx_config:get_env(zones, []), {Key, Val} <- Opts]. + || {Zone, Opts} <- emqx:get_env(zones, []), {Key, Val} <- Opts]. diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index c26028c76..563683bf1 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -32,6 +32,14 @@ init_per_suite(Config) -> end_per_suite(_Config) -> emqx_ct_helpers:stop_apps([]). +t_get_env(_) -> + ?assertEqual(undefined, emqx:get_env(undefined_key)), + ?assertEqual(default_value, emqx:get_env(undefined_key, default_value)), + application:set_env(emqx, undefined_key, hello), + ?assertEqual(hello, emqx:get_env(undefined_key)), + ?assertEqual(hello, emqx:get_env(undefined_key, default_value)), + application:unset_env(emqx, undefined_key). + t_emqx_pubsub_api(_) -> emqx:start(), true = emqx:is_running(node()), @@ -42,6 +50,7 @@ t_emqx_pubsub_api(_) -> Payload = <<"Hello World">>, Topic1 = <<"mytopic1">>, emqx:subscribe(Topic, ClientId), + ct:sleep(100), ?assertEqual([Topic], emqx:topics()), ?assertEqual([self()], emqx:subscribers(Topic)), ?assertEqual([{Topic,#{qos => 0,subid => ClientId}}], emqx:subscriptions(self())), diff --git a/test/emqx_access_SUITE.erl b/test/emqx_access_SUITE.erl index c98973d22..f276e4929 100644 --- a/test/emqx_access_SUITE.erl +++ b/test/emqx_access_SUITE.erl @@ -63,6 +63,7 @@ groups() -> }]. init_per_suite(Config) -> + emqx_ct_helpers:boot_modules([router, broker]), emqx_ct_helpers:start_apps([]), Config. diff --git a/test/emqx_alarm_handler_SUITE.erl b/test/emqx_alarm_handler_SUITE.erl index 6dd367b4a..3e818f007 100644 --- a/test/emqx_alarm_handler_SUITE.erl +++ b/test/emqx_alarm_handler_SUITE.erl @@ -26,6 +26,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([], fun set_special_configs/1), Config. diff --git a/test/emqx_config_SUITE.erl b/test/emqx_boot_SUITE.erl similarity index 51% rename from test/emqx_config_SUITE.erl rename to test/emqx_boot_SUITE.erl index 5014e3ef3..c4a9ffc39 100644 --- a/test/emqx_config_SUITE.erl +++ b/test/emqx_boot_SUITE.erl @@ -14,7 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_config_SUITE). +-module(emqx_boot_SUITE). -compile(export_all). -compile(nowarn_export_all). @@ -23,17 +23,21 @@ all() -> emqx_ct:all(?MODULE). -init_per_suite(Config) -> - emqx_ct_helpers:start_apps([]), - Config. +t_is_enabled(_) -> + ok = application:set_env(emqx, boot_modules, all), + ?assert(emqx_boot:is_enabled(router)), + ?assert(emqx_boot:is_enabled(broker)), + ?assert(emqx_boot:is_enabled(listeners)), + ok = application:set_env(emqx, boot_modules, [router]), + ?assert(emqx_boot:is_enabled(router)), + ?assertNot(emqx_boot:is_enabled(broker)), + ?assertNot(emqx_boot:is_enabled(listeners)), + ok = application:set_env(emqx, boot_modules, [router, broker]), + ?assert(emqx_boot:is_enabled(router)), + ?assert(emqx_boot:is_enabled(broker)), + ?assertNot(emqx_boot:is_enabled(listeners)), + ok = application:set_env(emqx, boot_modules, [router, broker, listeners]), + ?assert(emqx_boot:is_enabled(router)), + ?assert(emqx_boot:is_enabled(broker)), + ?assert(emqx_boot:is_enabled(listeners)). -end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([]). - -t_get_env(_) -> - ?assertEqual(undefined, emqx_config:get_env(undefined_key)), - ?assertEqual(default_value, emqx_config:get_env(undefined_key, default_value)), - application:set_env(emqx, undefined_key, hello), - ?assertEqual(hello, emqx_config:get_env(undefined_key)), - ?assertEqual(hello, emqx_config:get_env(undefined_key, default_value)), - application:unset_env(emqx, undefined_key). \ No newline at end of file diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 89832e2de..9d6372366 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -49,6 +49,7 @@ groups() -> }]. init_per_suite(Config) -> + emqx_ct_helpers:boot_modules([router, broker]), emqx_ct_helpers:start_apps([]), Config. diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index b06bc0b97..82f52c11b 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -31,6 +31,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules([router, broker]), emqx_ct_helpers:start_apps([]), Config. @@ -288,7 +289,7 @@ with_channel(Fun) -> username = <<"username">>, password = <<"passwd">> }, - Protocol = emqx_protocol:init(ConnPkt), + Protocol = emqx_protocol:init(ConnPkt, testing), Session = emqx_session:init(#{zone => testing}, #{max_inflight => 100, expiry_interval => 0 diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index 660892112..19557143b 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -70,6 +70,7 @@ groups() -> ]. init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. @@ -96,6 +97,7 @@ t_cm(_) -> ClientId = <<"myclient">>, {ok, C} = emqtt:start_link([{client_id, ClientId}]), {ok, _} = emqtt:connect(C), + ct:sleep(50), #{client := #{client_id := ClientId}} = emqx_cm:get_chan_attrs(ClientId), emqtt:subscribe(C, <<"mytopic">>, 0), ct:sleep(1200), diff --git a/test/emqx_connection_SUITE.erl b/test/emqx_connection_SUITE.erl index 8e595b8b2..d321d2c85 100644 --- a/test/emqx_connection_SUITE.erl +++ b/test/emqx_connection_SUITE.erl @@ -24,6 +24,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. diff --git a/test/emqx_ctl_SUITE.erl b/test/emqx_ctl_SUITE.erl index b4c3d953d..5e18f19c2 100644 --- a/test/emqx_ctl_SUITE.erl +++ b/test/emqx_ctl_SUITE.erl @@ -25,6 +25,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules([]), emqx_ct_helpers:start_apps([]), Config. @@ -51,4 +52,3 @@ test(_) -> io:format("Hello world"). - diff --git a/test/emqx_flapping_SUITE.erl b/test/emqx_flapping_SUITE.erl index a36d21c2a..493a57b6e 100644 --- a/test/emqx_flapping_SUITE.erl +++ b/test/emqx_flapping_SUITE.erl @@ -22,34 +22,34 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> - emqx_ct_helpers:start_apps([]), - prepare_for_test(), + prepare_env(), Config. +prepare_env() -> + emqx_zone:set_env(external, enable_flapping_detect, true), + application:set_env(emqx, flapping_detect_policy, + #{threshold => 3, + duration => 100, + banned_interval => 200 + }). + end_per_suite(_Config) -> - emqx_ct_helpers:stop_apps([]). + ok. -%% t_flapping(_Config) -> -%% process_flag(trap_exit, true), -%% flapping_connect(5), -%% {ok, C} = emqtt:start_link([{client_id, <<"Client">>}]), -%% {error, _} = emqtt:connect(C), -%% receive -%% {'EXIT', Client, _Reason} -> -%% ct:log("receive exit signal, Client: ~p", [Client]) -%% after 1000 -> -%% ct:log("timeout") -%% end. +t_detect_check(_) -> + {ok, _Pid} = emqx_flapping:start_link(), + Client = #{zone => external, + client_id => <<"clientid">>, + peername => {{127,0,0,1}, 5000} + }, + false = emqx_flapping:detect(Client), + false = emqx_flapping:check(Client), + false = emqx_flapping:detect(Client), + false = emqx_flapping:check(Client), + true = emqx_flapping:detect(Client), + timer:sleep(50), + true = emqx_flapping:check(Client), + timer:sleep(300), + false = emqx_flapping:check(Client), + ok = emqx_flapping:stop(). -flapping_connect(Times) -> - lists:foreach(fun do_connect/1, lists:seq(1, Times)). - -do_connect(_I) -> - {ok, C} = emqtt:start_link([{client_id, <<"Client">>}]), - {ok, _} = emqtt:connect(C), - ok = emqtt:disconnect(C). - -prepare_for_test() -> - ok = emqx_zone:set_env(external, enable_flapping_detect, true), - ok = emqx_zone:set_env(external, flapping_threshold, {10, 60}), - ok = emqx_zone:set_env(external, flapping_expiry_interval, 3600). diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index cde76bfb3..e1991c359 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -20,10 +20,17 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). +-include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("emqx_ct_helpers/include/emqx_ct.hrl"). + +%%-define(PROPTEST(F), ?assert(proper:quickcheck(F()))). +-define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{to_file, user}]))). all() -> - [{group, connect}, + [{group, parse}, + {group, connect}, {group, connack}, {group, publish}, {group, puback}, @@ -36,7 +43,11 @@ all() -> {group, auth}]. groups() -> - [{connect, [parallel], + [{parse, [parallel], + [t_parse_cont, + t_parse_frame_too_large + ]}, + {connect, [parallel], [t_serialize_parse_connect, t_serialize_parse_v3_connect, t_serialize_parse_v4_connect, @@ -57,6 +68,7 @@ groups() -> ]}, {puback, [parallel], [t_serialize_parse_puback, + t_serialize_parse_puback_v3_4, t_serialize_parse_puback_v5, t_serialize_parse_pubrec, t_serialize_parse_pubrec_v5, @@ -105,19 +117,48 @@ init_per_group(_Group, Config) -> end_per_group(_Group, _Config) -> ok. +t_parse_cont(_) -> + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{}), + ParseState = emqx_frame:initial_parse_state(), + <> = serialize_to_binary(Packet), + {more, ContParse} = emqx_frame:parse(<<>>, ParseState), + {more, ContParse1} = emqx_frame:parse(HdrBin, ContParse), + {more, ContParse2} = emqx_frame:parse(LenBin, ContParse1), + {more, ContParse3} = emqx_frame:parse(<<>>, ContParse2), + {ok, Packet, <<>>, _} = emqx_frame:parse(RestBin, ContParse3). + +t_parse_frame_too_large(_) -> + Packet = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, payload(1000)), + ?catch_error(mqtt_frame_too_large, parse_serialize(Packet, #{max_size => 256})), + ?catch_error(mqtt_frame_too_large, parse_serialize(Packet, #{max_size => 512})), + ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})). + t_serialize_parse_connect(_) -> - Packet1 = ?CONNECT_PACKET(#mqtt_packet_connect{}), - ?assertEqual(Packet1, parse_serialize(Packet1)), - Packet2 = ?CONNECT_PACKET(#mqtt_packet_connect{ - client_id = <<"clientId">>, - will_qos = ?QOS_1, - will_flag = true, - will_retain = true, - will_topic = <<"will">>, - will_payload = <<"bye">>, - clean_start = true - }), - ?assertEqual(Packet2, parse_serialize(Packet2)). + ?PROPTEST(prop_serialize_parse_connect). + +prop_serialize_parse_connect() -> + ?FORALL(Opts = #{version := ProtoVer}, parse_opts(), + begin + ProtoName = proplists:get_value(ProtoVer, ?PROTOCOL_NAMES), + DefaultProps = if ProtoVer == ?MQTT_PROTO_V5 -> + #{}; + true -> undefined + end, + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{ + proto_name = ProtoName, + proto_ver = ProtoVer, + client_id = <<"clientId">>, + will_qos = ?QOS_1, + will_flag = true, + will_retain = true, + will_topic = <<"will">>, + will_props = DefaultProps, + will_payload = <<"bye">>, + clean_start = true, + properties = DefaultProps + }), + ok == ?assertEqual(Packet, parse_serialize(Packet, Opts)) + end). t_serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, @@ -130,16 +171,19 @@ t_serialize_parse_v3_connect(_) -> clean_start = true, keepalive = 60 }), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + {ok, Packet, <<>>, PState} = emqx_frame:parse(Bin), + ?assertMatch({none, #{version := ?MQTT_PROTO_V3}}, PState). t_serialize_parse_v4_connect(_) -> Bin = <<16,35,0,4,77,81,84,84,4,2,0,60,0,23,109,111,115,113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108,111,99,97>>, - Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = 4, - proto_name = <<"MQTT">>, - client_id = <<"mosqpub/10451-iMac.loca">>, - clean_start = true, - keepalive = 60}), + Packet = ?CONNECT_PACKET( + #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client_id = <<"mosqpub/10451-iMac.loca">>, + clean_start = true, + keepalive = 60 + }), ?assertEqual(Bin, serialize_to_binary(Packet)), ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). @@ -185,13 +229,12 @@ t_serialize_parse_v5_connect(_) -> t_serialize_parse_connect_without_clientid(_) -> Bin = <<16,12,0,4,77,81,84,84,4,2,0,60,0,0>>, - Packet = ?CONNECT_PACKET( - #mqtt_packet_connect{proto_ver = 4, - proto_name = <<"MQTT">>, - client_id = <<>>, - clean_start = true, - keepalive = 60 - }), + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V4, + proto_name = <<"MQTT">>, + client_id = <<>>, + clean_start = true, + keepalive = 60 + }), ?assertEqual(Bin, serialize_to_binary(Packet)), ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). @@ -237,7 +280,9 @@ t_serialize_parse_bridge_connect(_) -> will_payload = <<"0">> }}, ?assertEqual(Bin, serialize_to_binary(Packet)), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)), + Packet1 = ?CONNECT_PACKET(#mqtt_packet_connect{is_bridge = true}), + ?assertEqual(Packet1, parse_serialize(Packet1)). t_serialize_parse_connack(_) -> Packet = ?CONNACK_PACKET(?RC_SUCCESS), @@ -275,7 +320,7 @@ t_serialize_parse_qos0_publish(_) -> packet_id = undefined}, payload = <<"hello">>}, ?assertEqual(Bin, serialize_to_binary(Packet)), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})). t_serialize_parse_qos1_publish(_) -> Bin = <<50,13,0,5,97,47,98,47,99,0,1,104,97,104,97>>, @@ -287,11 +332,16 @@ t_serialize_parse_qos1_publish(_) -> packet_id = 1}, payload = <<"haha">>}, ?assertEqual(Bin, serialize_to_binary(Packet)), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), + ?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_1, <<"Topic">>, 0, <<>>))). t_serialize_parse_qos2_publish(_) -> - Packet = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, payload()), - ?assertEqual(Packet, parse_serialize(Packet)). + Packet = ?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 1, <<>>), + Bin = <<52,9,0,5,84,111,112,105,99,0,1>>, + ?assertEqual(Packet, parse_serialize(Packet)), + ?assertEqual(Bin, serialize_to_binary(Packet)), + ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), + ?catch_error(bad_packet_id, parse_serialize(?PUBLISH_PACKET(?QOS_2, <<"Topic">>, 0, <<>>))). t_serialize_parse_publish_v5(_) -> Props = #{'Payload-Format-Indicator' => 1, @@ -307,7 +357,16 @@ t_serialize_parse_publish_v5(_) -> t_serialize_parse_puback(_) -> Packet = ?PUBACK_PACKET(1), ?assertEqual(<<64,2,0,1>>, serialize_to_binary(Packet)), - ?assertEqual(Packet, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)), + ?catch_error(bad_packet_id, parse_serialize(?PUBACK_PACKET(0))). + +t_serialize_parse_puback_v3_4(_) -> + Bin = <<64,2,0,1>>, + Packet = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBACK}, variable = 1}, + ?assertEqual(Bin, serialize_to_binary(Packet, ?MQTT_PROTO_V3)), + ?assertEqual(Bin, serialize_to_binary(Packet, ?MQTT_PROTO_V4)), + ?assertEqual(?PUBACK_PACKET(1), parse_to_packet(Bin, #{version => ?MQTT_PROTO_V3})), + ?assertEqual(?PUBACK_PACKET(1), parse_to_packet(Bin, #{version => ?MQTT_PROTO_V4})). t_serialize_parse_puback_v5(_) -> Packet = ?PUBACK_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), @@ -316,7 +375,8 @@ t_serialize_parse_puback_v5(_) -> t_serialize_parse_pubrec(_) -> Packet = ?PUBREC_PACKET(1), ?assertEqual(<<5:4,0:4,2,0,1>>, serialize_to_binary(Packet)), - ?assertEqual(Packet, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)), + ?catch_error(bad_packet_id, parse_serialize(?PUBREC_PACKET(0))). t_serialize_parse_pubrec_v5(_) -> Packet = ?PUBREC_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), @@ -326,7 +386,12 @@ t_serialize_parse_pubrel(_) -> Packet = ?PUBREL_PACKET(1), Bin = serialize_to_binary(Packet), ?assertEqual(<<6:4,2:4,2,0,1>>, Bin), - ?assertEqual(Packet, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)), + %% PUBREL with bad qos 0 + Bin0 = <<6:4,0:4,2,0,1>>, + ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), + ?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), + ?catch_error(bad_packet_id, parse_serialize(?PUBREL_PACKET(0))). t_serialize_parse_pubrel_v5(_) -> Packet = ?PUBREL_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), @@ -336,7 +401,8 @@ t_serialize_parse_pubcomp(_) -> Packet = ?PUBCOMP_PACKET(1), Bin = serialize_to_binary(Packet), ?assertEqual(<<7:4,0:4,2,0,1>>, Bin), - ?assertEqual(Packet, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)), + ?catch_error(bad_packet_id, parse_serialize(?PUBCOMP_PACKET(0))). t_serialize_parse_pubcomp_v5(_) -> Packet = ?PUBCOMP_PACKET(16, ?RC_SUCCESS, #{'Reason-String' => <<"success">>}), @@ -344,13 +410,18 @@ t_serialize_parse_pubcomp_v5(_) -> t_serialize_parse_subscribe(_) -> %% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}]) - Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>, + Bin = <>, TopicOpts = #{nl => 0 , rap => 0, rc => 0, rh => 0, qos => 2}, TopicFilters = [{<<"TopicA">>, TopicOpts}], Packet = ?SUBSCRIBE_PACKET(2, TopicFilters), ?assertEqual(Bin, serialize_to_binary(Packet)), - %%ct:log("Bin: ~p, Packet: ~p ~n", [Packet, parse(Bin)]), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), + %% SUBSCRIBE with bad qos 0 + Bin0 = <>, + ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), + ?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), + ?catch_error(bad_packet_id, parse_serialize(?SUBSCRIBE_PACKET(0, TopicFilters))), + ?catch_error(bad_subqos, parse_serialize(?SUBSCRIBE_PACKET(1, [{<<"t">>, #{qos => 3}}]))). t_serialize_parse_subscribe_v5(_) -> TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}}, @@ -360,7 +431,8 @@ t_serialize_parse_subscribe_v5(_) -> t_serialize_parse_suback(_) -> Packet = ?SUBACK_PACKET(10, [?QOS_0, ?QOS_1, 128]), - ?assertEqual(Packet, parse_serialize(Packet)). + ?assertEqual(Packet, parse_serialize(Packet)), + ?catch_error(bad_packet_id, parse_serialize(?SUBACK_PACKET(0, [?QOS_0]))). t_serialize_parse_suback_v5(_) -> Packet = ?SUBACK_PACKET(1, #{'Reason-String' => <<"success">>, @@ -369,11 +441,17 @@ t_serialize_parse_suback_v5(_) -> ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). t_serialize_parse_unsubscribe(_) -> - %% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>]) + %% UNSUBSCRIBE(Q1, R1, D0, PacketId=2, TopicTable=[<<"TopicA">>]) + Bin = <>, Packet = ?UNSUBSCRIBE_PACKET(2, [<<"TopicA">>]), - Bin = <<162,10,0,2,0,6,84,111,112,105,99,65>>, ?assertEqual(Bin, serialize_to_binary(Packet)), - ?assertMatch({ok, Packet, <<>>, _}, emqx_frame:parse(Bin)). + ?assertMatch(Packet, parse_to_packet(Bin, #{strict_mode => true})), + %% UNSUBSCRIBE with bad qos + %% UNSUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[<<"TopicA">>]) + Bin0 = <>, + ?assertMatch(Packet, parse_to_packet(Bin0, #{strict_mode => false})), + ?catch_error(bad_frame_header, parse_to_packet(Bin0, #{strict_mode => true})), + ?catch_error(bad_packet_id, parse_serialize(?UNSUBSCRIBE_PACKET(0, [<<"TopicA">>]))). t_serialize_parse_unsubscribe_v5(_) -> Props = #{'User-Property' => [{<<"key">>, <<"val">>}]}, @@ -419,12 +497,18 @@ t_serialize_parse_auth_v5(_) -> #{'Authentication-Method' => <<"oauth2">>, 'Authentication-Data' => <<"3zekkd">>, 'Reason-String' => <<"success">>, - 'User-Property' => [{<<"key">>, <<"val">>}] + 'User-Property' => [{<<"key1">>, <<"val1">>}, + {<<"key2">>, <<"val2">>}] }), - ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})), + ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5, + strict_mode => true})). + +parse_opts() -> + ?LET(PropList, [{strict_mode, boolean()}, {version, range(4,5)}], maps:from_list(PropList)). parse_serialize(Packet) -> - parse_serialize(Packet, #{}). + parse_serialize(Packet, #{strict_mode => true}). parse_serialize(Packet, Opts) when is_map(Opts) -> Ver = maps:get(version, Opts, ?MQTT_PROTO_V4), @@ -436,6 +520,13 @@ parse_serialize(Packet, Opts) when is_map(Opts) -> serialize_to_binary(Packet) -> iolist_to_binary(emqx_frame:serialize(Packet)). -payload() -> - iolist_to_binary(["payload." || _I <- lists:seq(1, 1000)]). +serialize_to_binary(Packet, Ver) -> + iolist_to_binary(emqx_frame:serialize(Packet, Ver)). + +parse_to_packet(Bin, Opts) -> + PState = emqx_frame:initial_parse_state(Opts), + {ok, Packet, <<>>, _} = emqx_frame:parse(Bin, PState), + Packet. + +payload(Len) -> iolist_to_binary(lists:duplicate(Len, 1)). diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index 9c699b65d..e9f51db08 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -27,6 +27,7 @@ , t_header/1 , t_format/1 , t_expired/1 + , t_to_packet/1 , t_to_map/1 ]). @@ -91,6 +92,18 @@ t_expired(_) -> Msg2 = emqx_message:update_expiry(Msg1), ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). +t_to_packet(_) -> + Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, + qos = ?QOS_0, + retain = false, + dup = false}, + variable = #mqtt_packet_publish{topic_name = <<"topic">>, + packet_id = 10, + properties = #{}}, + payload = <<"payload">>}, + Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>), + ?assertEqual(Pkt, emqx_message:to_packet(10, Msg)). + t_to_map(_) -> Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"topic">>, <<"payload">>), List = [{id, emqx_message:id(Msg)}, diff --git a/test/emqx_mod_subscription_SUITE.erl b/test/emqx_mod_subscription_SUITE.erl index 464bfcc76..0c9c9c678 100644 --- a/test/emqx_mod_subscription_SUITE.erl +++ b/test/emqx_mod_subscription_SUITE.erl @@ -28,6 +28,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([emqx]), Config. diff --git a/test/emqx_msg_expiry_interval_SUITE.erl b/test/emqx_msg_expiry_interval_SUITE.erl new file mode 100644 index 000000000..0da3efde1 --- /dev/null +++ b/test/emqx_msg_expiry_interval_SUITE.erl @@ -0,0 +1,92 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_msg_expiry_interval_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +all() -> emqx_ct:all(?MODULE). + +init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), + emqx_ct_helpers:start_apps([]), + Config. + +end_per_suite(_Config) -> + emqx_ct_helpers:stop_apps([]). + +t_message_expiry_interval_1(_) -> + ClientA = message_expiry_interval_init(), + [message_expiry_interval_exipred(ClientA, QoS) || QoS <- [0,1,2]]. + +t_message_expiry_interval_2(_) -> + ClientA = message_expiry_interval_init(), + [message_expiry_interval_not_exipred(ClientA, QoS) || QoS <- [0,1,2]]. + +message_expiry_interval_init() -> + {ok, ClientA} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-a">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, ClientB} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientA), + {ok, _} = emqtt:connect(ClientB), + %% subscribe and disconnect client-b + emqtt:subscribe(ClientB, <<"t/a">>, 1), + emqtt:stop(ClientB), + ClientA. + +message_expiry_interval_exipred(ClientA, QoS) -> + ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), + %% publish to t/a and waiting for the message expired + emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 1}, <<"this will be purged in 1s">>, [{qos, QoS}]), + ct:sleep(1000), + + %% resume the session for client-b + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientB1), + + %% verify client-b could not receive the publish message + receive + {publish,#{client_pid := ClientB1, topic := <<"t/a">>}} -> + ct:fail(should_have_expired) + after 300 -> + ok + end, + emqtt:stop(ClientB1). + +message_expiry_interval_not_exipred(ClientA, QoS) -> + ct:pal("~p ~p", [?FUNCTION_NAME, QoS]), + %% publish to t/a + emqtt:publish(ClientA, <<"t/a">>, #{'Message-Expiry-Interval' => 20}, <<"this will be purged in 1s">>, [{qos, QoS}]), + + %% wait for 1s and then resume the session for client-b, the message should not expires + %% as Message-Expiry-Interval = 20s + ct:sleep(1000), + {ok, ClientB1} = emqtt:start_link([{proto_ver,v5}, {client_id, <<"client-b">>}, {clean_start, false},{properties, #{'Session-Expiry-Interval' => 360}}]), + {ok, _} = emqtt:connect(ClientB1), + + %% verify client-b could receive the publish message and the Message-Expiry-Interval is set + receive + {publish,#{client_pid := ClientB1, topic := <<"t/a">>, + properties := #{'Message-Expiry-Interval' := MsgExpItvl}}} + when MsgExpItvl < 20 -> ok; + {publish, _} = Msg -> + ct:fail({incorrect_publish, Msg}) + after 300 -> + ct:fail(no_publish_received) + end, + emqtt:stop(ClientB1). diff --git a/test/emqx_packet_SUITE.erl b/test/emqx_packet_SUITE.erl index 732c48fe3..f802e7718 100644 --- a/test/emqx_packet_SUITE.erl +++ b/test/emqx_packet_SUITE.erl @@ -26,63 +26,127 @@ all() -> emqx_ct:all(?MODULE). -t_proto_name(_) -> - ?assertEqual(<<"MQIsdp">>, emqx_packet:proto_name(3)), - ?assertEqual(<<"MQTT">>, emqx_packet:proto_name(4)), - ?assertEqual(<<"MQTT">>, emqx_packet:proto_name(5)). +t_type(_) -> + ?assertEqual(?CONNECT, emqx_packet:type(?CONNECT_PACKET(#mqtt_packet_connect{}))), + ?assertEqual(?CONNACK, emqx_packet:type(?CONNACK_PACKET(?RC_SUCCESS))), + ?assertEqual(?PUBLISH, emqx_packet:type(?PUBLISH_PACKET(?QOS_1))), + ?assertEqual(?PUBACK, emqx_packet:type(?PUBACK_PACKET(1))), + ?assertEqual(?PUBREC, emqx_packet:type(?PUBREC_PACKET(1))), + ?assertEqual(?PUBREL, emqx_packet:type(?PUBREL_PACKET(1))), + ?assertEqual(?PUBCOMP, emqx_packet:type(?PUBCOMP_PACKET(1))), + ?assertEqual(?SUBSCRIBE, emqx_packet:type(?SUBSCRIBE_PACKET(1, []))), + ?assertEqual(?SUBACK, emqx_packet:type(?SUBACK_PACKET(1, [0]))), + ?assertEqual(?UNSUBSCRIBE, emqx_packet:type(?UNSUBSCRIBE_PACKET(1, []))), + ?assertEqual(?UNSUBACK, emqx_packet:type(?UNSUBACK_PACKET(1))), + ?assertEqual(?DISCONNECT, emqx_packet:type(?DISCONNECT_PACKET(?RC_SUCCESS))), + ?assertEqual(?AUTH, emqx_packet:type(?AUTH_PACKET())). t_type_name(_) -> - ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)), - ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). + ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT_PACKET(#mqtt_packet_connect{}))), + ?assertEqual('CONNACK', emqx_packet:type_name(?CONNACK_PACKET(?RC_SUCCESS))), + ?assertEqual('PUBLISH', emqx_packet:type_name(?PUBLISH_PACKET(?QOS_1))), + ?assertEqual('PUBACK', emqx_packet:type_name(?PUBACK_PACKET(1))), + ?assertEqual('PUBREC', emqx_packet:type_name(?PUBREC_PACKET(1))), + ?assertEqual('PUBREL', emqx_packet:type_name(?PUBREL_PACKET(1))), + ?assertEqual('PUBCOMP', emqx_packet:type_name(?PUBCOMP_PACKET(1))), + ?assertEqual('SUBSCRIBE', emqx_packet:type_name(?SUBSCRIBE_PACKET(1, []))), + ?assertEqual('SUBACK', emqx_packet:type_name(?SUBACK_PACKET(1, [0]))), + ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE_PACKET(1, []))), + ?assertEqual('UNSUBACK', emqx_packet:type_name(?UNSUBACK_PACKET(1))), + ?assertEqual('DISCONNECT', emqx_packet:type_name(?DISCONNECT_PACKET(?RC_SUCCESS))), + ?assertEqual('AUTH', emqx_packet:type_name(?AUTH_PACKET())). -t_validate(_) -> - ?assert(emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, - [{<<"topic">>, #{qos => ?QOS_0}}]))), - ?assert(emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))), - ?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))), +t_dup(_) -> + ?assertEqual(false, emqx_packet:dup(?PUBLISH_PACKET(?QOS_1))). + +t_qos(_) -> + ?assertEqual(?QOS_0, emqx_packet:qos(?PUBLISH_PACKET(?QOS_0))), + ?assertEqual(?QOS_1, emqx_packet:qos(?PUBLISH_PACKET(?QOS_1))), + ?assertEqual(?QOS_2, emqx_packet:qos(?PUBLISH_PACKET(?QOS_2))). + +t_retain(_) -> + ?assertEqual(false, emqx_packet:retain(?PUBLISH_PACKET(?QOS_1))). + +t_proto_name(_) -> + lists:foreach( + fun({Ver, Name}) -> + ConnPkt = ?CONNECT_PACKET(#mqtt_packet_connect{proto_ver = Ver, + proto_name = Name}), + ?assertEqual(Name, emqx_packet:proto_name(ConnPkt)) + end, ?PROTOCOL_NAMES). + +t_proto_ver(_) -> + lists:foreach( + fun(Ver) -> + ?assertEqual(Ver, emqx_packet:proto_ver(#mqtt_packet_connect{proto_ver = Ver})) + end, [?MQTT_PROTO_V3, ?MQTT_PROTO_V4, ?MQTT_PROTO_V5]). + +t_check_publish(_) -> Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1}, - ?assert(emqx_packet:validate(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>))), - ?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 1}}))), - ?assertError(subscription_identifier_invalid, - emqx_packet:validate( - ?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1}, - [{<<"topic">>, #{qos => ?QOS_0}}]))), - ?assertError(topic_filters_invalid, - emqx_packet:validate(?UNSUBSCRIBE_PACKET(1,[]))), - ?assertError(topic_name_invalid, - emqx_packet:validate(?PUBLISH_PACKET(1,<<>>,1,#{},<<"payload">>))), - ?assertError(topic_name_invalid, - emqx_packet:validate(?PUBLISH_PACKET - (1, <<"+/+">>, 1, #{}, <<"payload">>))), - ?assertError(topic_alias_invalid, - emqx_packet:validate( - ?PUBLISH_PACKET - (1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>))), - ?assertError(protocol_error, - emqx_packet:validate( - ?PUBLISH_PACKET(1, <<"topic">>, 1, - #{'Subscription-Identifier' => 10}, <<"payload">>))), - ?assertError(protocol_error, - emqx_packet:validate( - ?PUBLISH_PACKET(1, <<"topic">>, 1, - #{'Response-Topic' => <<"+/+">>}, <<"payload">>))), - ?assertError(protocol_error, - emqx_packet:validate( - ?CONNECT_PACKET(#mqtt_packet_connect{ - properties = - #{'Request-Response-Information' => -1}}))), - ?assertError(protocol_error, - emqx_packet:validate( - ?CONNECT_PACKET(#mqtt_packet_connect{ - properties = - #{'Request-Problem-Information' => 2}}))), - ?assertError(protocol_error, - emqx_packet:validate( - ?CONNECT_PACKET(#mqtt_packet_connect{ - properties = - #{'Receive-Maximum' => 0}}))). + ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)), + ok = emqx_packet:check(#mqtt_packet_publish{packet_id = 1, topic_name = <<"t">>}), + {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1,<<>>,1,#{},<<"payload">>)), + {error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(?PUBLISH_PACKET(1, <<"+/+">>, 1, #{}, <<"payload">>)), + {error, ?RC_TOPIC_ALIAS_INVALID} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>)), + %% TODO:: + %% {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)), + ok = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)), + {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Response-Topic' => <<"+/+">>}, <<"payload">>)). + +t_check_subscribe(_) -> + ok = emqx_packet:check(?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 1}, + [{<<"topic">>, #{qos => ?QOS_0}}])), + {error, ?RC_SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED} = + emqx_packet:check(?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => -1}, + [{<<"topic">>, #{qos => ?QOS_0, rp => 0}}])). + +t_check_unsubscribe(_) -> + ok = emqx_packet:check(?UNSUBSCRIBE_PACKET(1, [<<"topic">>])), + {error, ?RC_TOPIC_FILTER_INVALID} = emqx_packet:check(?UNSUBSCRIBE_PACKET(1,[])). + +t_check_connect(_) -> + Opts = #{max_clientid_len => 5, mqtt_retain_available => false}, + ok = emqx_packet:check(#mqtt_packet_connect{}, Opts), + ok = emqx_packet:check(?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Receive-Maximum' => 1}}), Opts), + ConnPkt1 = #mqtt_packet_connect{proto_name = <<"MQIsdp">>, + proto_ver = ?MQTT_PROTO_V5 + }, + {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION} = emqx_packet:check(ConnPkt1, Opts), + + ConnPkt2 = #mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V3, + proto_name = <<"MQIsdp">>, + client_id = <<>> + }, + {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} = emqx_packet:check(ConnPkt2, Opts), + + ConnPkt3 = #mqtt_packet_connect{client_id = <<"123456">>}, + {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} = emqx_packet:check(ConnPkt3, Opts), + + ConnPkt4 = #mqtt_packet_connect{will_flag = true, + will_retain = true + }, + {error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_packet:check(ConnPkt4, Opts), + + ConnPkt5 = #mqtt_packet_connect{will_flag = true, + will_topic = <<"#">> + }, + {error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(ConnPkt5, Opts), + + ConnPkt6 = ?CONNECT_PACKET(#mqtt_packet_connect{properties = #{'Request-Response-Information' => -1}}), + {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(ConnPkt6, Opts), + + {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check( + ?CONNECT_PACKET(#mqtt_packet_connect{ + properties = #{'Request-Problem-Information' => 2}}), Opts), + {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check( + ?CONNECT_PACKET(#mqtt_packet_connect{ + properties = #{'Receive-Maximum' => 0}}), Opts). t_from_to_message(_) -> + ExpectedMsg = emqx_message:set_headers( + #{peername => {{127,0,0,1}, 9527}, username => <<"test">>}, + emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>)), + ExpectedMsg1 = emqx_message:set_flag(retain, false, ExpectedMsg), Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = ?QOS_0, retain = false, @@ -91,30 +155,12 @@ t_from_to_message(_) -> packet_id = 10, properties = #{}}, payload = <<"payload">>}, - Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>), - Msg2 = emqx_message:set_flag(retain, false, Msg), - Pkt = emqx_packet:from_message(10, Msg2), - Msg3 = emqx_message:set_header( - peername, {{127,0,0,1}, 9527}, - emqx_message:set_header(username, "test", Msg2) - ), - Msg4 = emqx_packet:to_message(#{client_id => <<"clientid">>, - username => "test", - peername => {{127,0,0,1}, 9527}}, Pkt), - Msg5 = Msg4#message{timestamp = Msg3#message.timestamp, id = Msg3#message.id}, - Msg5 = Msg3. - -t_packet_format(_) -> - io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), - io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), - io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), - io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), - io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))]), - io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]), - io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), - io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). + MsgFromPkt = emqx_packet:to_message(#{client_id => <<"clientid">>, + username => <<"test">>, + peername => {{127,0,0,1}, 9527}}, Pkt), + ?assertEqual(ExpectedMsg1, MsgFromPkt#message{id = emqx_message:id(ExpectedMsg), + timestamp = emqx_message:timestamp(ExpectedMsg) + }). t_will_msg(_) -> Pkt = #mqtt_packet_connect{will_flag = true, @@ -130,3 +176,15 @@ t_will_msg(_) -> ?assertEqual(<<"clientid">>, Msg#message.from), ?assertEqual(<<"topic">>, Msg#message.topic). +t_format(_) -> + io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), + io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), + io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), + io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), + io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), + io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), + io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))]), + io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]), + io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), + io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). + diff --git a/test/emqx_plugins_SUITE.erl b/test/emqx_plugins_SUITE.erl index 5cab5b362..793e506a3 100644 --- a/test/emqx_plugins_SUITE.erl +++ b/test/emqx_plugins_SUITE.erl @@ -37,6 +37,7 @@ init_per_suite(Config) -> code:add_path(filename:join([AppPath, "_build", "default", "lib", "emqx_mini_plugin", "ebin"])), put(loaded_file, filename:join([DataPath, "loaded_plugins"])), + emqx_ct_helpers:boot_modules([]), emqx_ct_helpers:start_apps([], fun set_sepecial_cfg/1), Config. diff --git a/test/emqx_protocol_SUITE.erl b/test/emqx_protocol_SUITE.erl index 2cd091e30..2c7e9479f 100644 --- a/test/emqx_protocol_SUITE.erl +++ b/test/emqx_protocol_SUITE.erl @@ -38,7 +38,7 @@ init_protocol() -> client_id = <<"clientid">>, username = <<"username">>, password = <<"passwd">> - }). + }, testing). end_per_suite(_Config) -> ok. @@ -48,11 +48,11 @@ t_init_info_1(Config) -> proto_ver => ?MQTT_PROTO_V5, clean_start => true, keepalive => 30, - conn_props => #{}, will_msg => undefined, client_id => <<"clientid">>, username => <<"username">>, - topic_aliases => undefined + topic_aliases => undefined, + alias_maximum => #{outbound => 0, inbound => 0} }, emqx_protocol:info(Proto)). t_init_info_2(Config) -> @@ -65,8 +65,8 @@ t_init_info_2(Config) -> ?assertEqual(<<"username">>, emqx_protocol:info(username, Proto)), ?assertEqual(undefined, emqx_protocol:info(will_msg, Proto)), ?assertEqual(0, emqx_protocol:info(will_delay_interval, Proto)), - ?assertEqual(#{}, emqx_protocol:info(conn_props, Proto)), - ?assertEqual(undefined, emqx_protocol:info(topic_aliases, Proto)). + ?assertEqual(undefined, emqx_protocol:info(topic_aliases, Proto)), + ?assertEqual(#{outbound => 0, inbound => 0}, emqx_protocol:info(alias_maximum, Proto)). t_find_save_alias(Config) -> Proto = proplists:get_value(proto, Config), diff --git a/test/emqx_request_responser_SUITE.erl b/test/emqx_request_responser_SUITE.erl index 4912f72ea..d603f2b96 100644 --- a/test/emqx_request_responser_SUITE.erl +++ b/test/emqx_request_responser_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("common_test/include/ct.hrl"). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. diff --git a/test/emqx_router_SUITE.erl b/test/emqx_router_SUITE.erl index 711f1b1f9..511b62c8b 100644 --- a/test/emqx_router_SUITE.erl +++ b/test/emqx_router_SUITE.erl @@ -27,6 +27,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules([router]), emqx_ct_helpers:start_apps([]), Config. diff --git a/test/emqx_shared_sub_SUITE.erl b/test/emqx_shared_sub_SUITE.erl index 1834fdbaf..14baf8a76 100644 --- a/test/emqx_shared_sub_SUITE.erl +++ b/test/emqx_shared_sub_SUITE.erl @@ -32,6 +32,7 @@ all() -> emqx_ct:all(?SUITE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. diff --git a/test/emqx_sys_mon_SUITE.erl b/test/emqx_sys_mon_SUITE.erl index 7f9025b58..b64181fa3 100644 --- a/test/emqx_sys_mon_SUITE.erl +++ b/test/emqx_sys_mon_SUITE.erl @@ -42,6 +42,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. diff --git a/test/emqx_tracer_SUITE.erl b/test/emqx_tracer_SUITE.erl index 684f17e7f..1d9afe028 100644 --- a/test/emqx_tracer_SUITE.erl +++ b/test/emqx_tracer_SUITE.erl @@ -26,6 +26,7 @@ all() -> [t_start_traces]. init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config. diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index dfb348253..51b9fde8d 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -24,6 +24,7 @@ all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> + emqx_ct_helpers:boot_modules(all), emqx_ct_helpers:start_apps([]), Config.