diff --git a/.github/workflows/run_test_case.yaml b/.github/workflows/run_test_case.yaml index 8bb918294..ef49d8696 100644 --- a/.github/workflows/run_test_case.yaml +++ b/.github/workflows/run_test_case.yaml @@ -13,10 +13,13 @@ jobs: steps: - uses: actions/checkout@v1 - - name: Run tests + - name: Code dialyzer run: | make xref + make dialyzer rm -f rebar.lock + - name: Run tests + run: | make eunit rm -f rebar.lock make ct diff --git a/Makefile b/Makefile index f0ef7d6e8..d1cd12a60 100644 --- a/Makefile +++ b/Makefile @@ -71,6 +71,14 @@ coveralls: xref: @rebar3 xref +.PHONY: dialyzer +dialyzer: + @rebar3 dialyzer + +.PHONY: proper +proper: + @rebar3 proper -d test/props -c + .PHONY: deps deps: @rebar3 get-deps diff --git a/etc/emqx.conf b/etc/emqx.conf index 915ca597a..b22f22c24 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -663,6 +663,11 @@ mqtt.ignore_loop_deliver = false ## Value: true | false mqtt.strict_mode = false +## Specify the response information returned to the client +## +## Value: String +## mqtt.response_information = example + ##-------------------------------------------------------------------- ## Zones ##-------------------------------------------------------------------- @@ -868,6 +873,11 @@ zone.external.ignore_loop_deliver = false ## Value: true | false zone.external.strict_mode = false +## Specify the response information returned to the client +## +## Value: String +## zone.external.response_information = example + ##-------------------------------------------------------------------- ## Internal Zone @@ -954,6 +964,11 @@ zone.internal.ignore_loop_deliver = false ## Value: true | false zone.internal.strict_mode = false +## Specify the response information returned to the client +## +## Value: String +## zone.internal.response_information = example + ## Allow the zone's clients to bypass authentication step ## ## Value: true | false @@ -1705,16 +1720,6 @@ listener.wss.external.access.1 = allow all ## Value: on | off listener.wss.external.verify_protocol_header = on -## See: listener.ws.external.proxy_address_header -## -## Value: String -## listener.wss.external.proxy_address_header = X-Forwarded-For - -## See: listener.ws.external.proxy_port_header -## -## Value: String -## listener.wss.external.proxy_port_header = X-Forwarded-Port - ## Enable the Proxy Protocol V1/2 support. ## ## See: listener.tcp.$name.proxy_protocol diff --git a/include/emqx.hrl b/include/emqx.hrl index acca359c4..5a10b84e6 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -62,13 +62,15 @@ %% Message from from :: atom() | binary(), %% Message flags - flags :: #{atom() => boolean()}, - %% Message headers, or MQTT 5.0 Properties - headers :: map(), + flags = #{} :: emqx_types:flags(), + %% Message headers. May contain any metadata. e.g. the + %% protocol version number, username, peerhost or + %% the PUBLISH properties (MQTT 5.0). + headers = #{} :: emqx_types:headers(), %% Topic that the message is published to - topic :: binary(), + topic :: emqx_types:topic(), %% Message Payload - payload :: binary(), + payload :: emqx_types:payload(), %% Timestamp (Unit: millisecond) timestamp :: integer() }). @@ -97,7 +99,7 @@ node_id :: trie_node_id(), edge_count = 0 :: non_neg_integer(), topic :: binary() | undefined, - flags :: list(atom()) + flags :: list(atom()) | undefined }). -record(trie_edge, { @@ -119,7 +121,8 @@ severity :: notice | warning | error | critical, title :: iolist(), summary :: iolist(), - timestamp :: erlang:timestamp() + %% Timestamp (Unit: millisecond) + timestamp :: integer() | undefined }). %%-------------------------------------------------------------------- @@ -128,11 +131,11 @@ -record(plugin, { name :: atom(), - dir :: string(), + dir :: string() | undefined, descr :: string(), - vendor :: string(), + vendor :: string() | undefined, active = false :: boolean(), - info :: map(), + info = #{} :: map(), type :: atom() }). diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 152846a1b..078baf301 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -219,9 +219,9 @@ will_qos = ?QOS_0, will_retain = false, keepalive = 0, - properties = undefined, + properties = #{}, clientid = <<>>, - will_props = undefined, + will_props = #{}, will_topic = undefined, will_payload = undefined, username = undefined, @@ -231,53 +231,53 @@ -record(mqtt_packet_connack, { ack_flags, reason_code, - properties + properties = #{} }). -record(mqtt_packet_publish, { topic_name, packet_id, - properties + properties = #{} }). -record(mqtt_packet_puback, { packet_id, reason_code, - properties + properties = #{} }). -record(mqtt_packet_subscribe, { packet_id, - properties, + properties = #{}, topic_filters }). -record(mqtt_packet_suback, { packet_id, - properties, + properties = #{}, reason_codes }). -record(mqtt_packet_unsubscribe, { packet_id, - properties, + properties = #{}, topic_filters }). -record(mqtt_packet_unsuback, { packet_id, - properties, + properties = #{}, reason_codes }). -record(mqtt_packet_disconnect, { reason_code, - properties + properties = #{} }). -record(mqtt_packet_auth, { reason_code, - properties + properties = #{} }). %%-------------------------------------------------------------------- diff --git a/include/types.hrl b/include/types.hrl index 0cd10246c..e92f3f2a2 100644 --- a/include/types.hrl +++ b/include/types.hrl @@ -22,3 +22,5 @@ -type(ok_or_error(Value, Reason) :: {ok, Value} | {error, Reason}). +-type(mfargs() :: {module(), atom(), [term()]}). + diff --git a/priv/emqx.schema b/priv/emqx.schema index 1149e7c79..abd790b53 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -805,6 +805,11 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Specify the response information returned to the client +{mapping, "mqtt.response_information", "emqx.response_information", [ + {datatype, string} +]}. + %%-------------------------------------------------------------------- %% Zones %%-------------------------------------------------------------------- @@ -1019,6 +1024,11 @@ end}. {datatype, {enum, [true, false]}} ]}. +%% @doc Specify the response information returned to the client +{mapping, "zone.$name.response_information", "emqx.zones", [ + {datatype, string} +]}. + %% @doc Whether to bypass the authentication step {mapping, "zone.$name.bypass_auth_plugins", "emqx.zones", [ {default, false}, @@ -1079,6 +1089,8 @@ end}. end; ("mountpoint", Val) -> {mountpoint, iolist_to_binary(Val)}; + ("response_information", Val) -> + {response_information, iolist_to_binary(Val)}; (Opt, Val) -> {list_to_atom(Opt), Val} end, diff --git a/rebar.config b/rebar.config index 4f73fb23e..76c927a78 100644 --- a/rebar.config +++ b/rebar.config @@ -1,10 +1,12 @@ {minimum_otp_vsn, "21.3"}. +{plugins, [rebar3_proper]}. + {deps, [{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}, - {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.4"}}}, + {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}}, - {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.2"}}}, + {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.7.0"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.3"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}}, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}} @@ -39,7 +41,7 @@ {deps, [{bbmustache, "1.7.0"}, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.2.0"}}}, - {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.2.2"}}} + {emqx_ct_helpers, {git, "https://github.com/emqx/emqx-ct-helpers", {tag, "1.3.0"}}} ]}, {erl_opts, [debug_info]} ]} diff --git a/src/emqx.erl b/src/emqx.erl index c21f252d5..2c6d444e7 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -146,7 +146,7 @@ unsubscribe(Topic) -> -spec(topics() -> list(emqx_topic:topic())). topics() -> emqx_router:topics(). --spec(subscribers(emqx_topic:topic() | string()) -> list(emqx_types:subscriber())). +-spec(subscribers(emqx_topic:topic() | string()) -> [pid()]). subscribers(Topic) -> emqx_broker:subscribers(iolist_to_binary(Topic)). @@ -168,7 +168,9 @@ subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) -> hook(HookPoint, Action) -> emqx_hooks:add(HookPoint, Action). --spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter() | integer()) +-spec(hook(emqx_hooks:hookpoint(), + emqx_hooks:action(), + emqx_hooks:filter() | integer() | list()) -> ok | {error, already_exists}). hook(HookPoint, Action, Priority) when is_integer(Priority) -> emqx_hooks:add(HookPoint, Action, Priority); @@ -182,7 +184,7 @@ hook(HookPoint, Action, InitArgs) when is_list(InitArgs) -> hook(HookPoint, Action, Filter, Priority) -> emqx_hooks:add(HookPoint, Action, Filter, Priority). --spec(unhook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok). +-spec(unhook(emqx_hooks:hookpoint(), function() | {module(), atom()}) -> ok). unhook(HookPoint, Action) -> emqx_hooks:del(HookPoint, Action). diff --git a/src/emqx_alarm_handler.erl b/src/emqx_alarm_handler.erl index 990f8a76e..77d2b60a0 100644 --- a/src/emqx_alarm_handler.erl +++ b/src/emqx_alarm_handler.erl @@ -171,9 +171,10 @@ encode_alarm({AlarmId, AlarmDesc}) -> }). alarm_msg(Topic, Payload) -> - Msg = emqx_message:make(?MODULE, Topic, Payload), - emqx_message:set_headers(#{'Content-Type' => <<"application/json">>}, - emqx_message:set_flag(sys, Msg)). + emqx_message:make(?MODULE, 0, Topic, Payload, + #{sys => true}, + #{properties => #{'Content-Type' => <<"application/json">>}} + ). topic(alert) -> emqx_topic:systop(<<"alarms/alert">>); diff --git a/src/emqx_boot.erl b/src/emqx_boot.erl index 015a1a95e..be27607be 100644 --- a/src/emqx_boot.erl +++ b/src/emqx_boot.erl @@ -20,7 +20,7 @@ -define(BOOT_MODULES, [router, broker, listeners]). --spec(is_enabled(all|list(router|broker|listeners)) -> boolean()). +-spec(is_enabled(all|router|broker|listeners) -> boolean()). is_enabled(Mod) -> (BootMods = boot_modules()) =:= all orelse lists:member(Mod, BootMods). diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index df5939ef5..c5d9c6793 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include("emqx_mqtt.hrl"). -logger_header("[Broker]"). @@ -118,12 +119,13 @@ subscribe(Topic) when is_binary(Topic) -> -spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok). subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> - subscribe(Topic, SubId, #{qos => 0}); + subscribe(Topic, SubId, ?DEFAULT_SUBOPTS); subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) -> subscribe(Topic, undefined, SubOpts). -spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok). -subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) -> +subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts0) -> + SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of false -> %% New ok = emqx_broker_helper:register_sub(SubPid, SubId), @@ -215,9 +217,8 @@ safe_publish(Msg) when is_record(Msg, message) -> catch _:Error:Stk-> ?LOG(error, "Publish error: ~0p~n~s~n~0p", - [Error, emqx_message:format(Msg), Stk]) - after - [] + [Error, emqx_message:format(Msg), Stk]), + [] end. -compile({inline, [delivery/1]}). @@ -283,7 +284,7 @@ forward(Node, To, Delivery, sync) -> dispatch(Topic, #delivery{message = Msg}) -> case subscribers(Topic) of [] -> ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), - ok = inc_dropped_cnt(Topic), + ok = inc_dropped_cnt(Msg), {error, no_subscribers}; [Sub] -> %% optimize? dispatch(Sub, Topic, Msg); @@ -322,7 +323,8 @@ inc_dropped_cnt(Msg) -> end. -compile({inline, [subscribers/1]}). --spec(subscribers(emqx_topic:topic()) -> [pid()]). +-spec(subscribers(emqx_topic:topic() | {shard, emqx_topic:topic(), non_neg_integer()}) + -> [pid()]). subscribers(Topic) when is_binary(Topic) -> lookup_value(?SUBSCRIBER, Topic, []); subscribers(Shard = {shard, _Topic, _I}) -> @@ -367,7 +369,7 @@ subscriptions(SubId) -> undefined -> [] end. --spec(subscribed(pid(), emqx_topic:topic()) -> boolean()). +-spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic()) -> boolean()). subscribed(SubPid, Topic) when is_pid(SubPid) -> ets:member(?SUBOPTION, {SubPid, Topic}); subscribed(SubId, Topic) when ?is_subid(SubId) -> diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index e0452fbcb..18ebbccf2 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -75,7 +75,7 @@ register_sub(SubPid, SubId) when is_pid(SubPid) -> lookup_subid(SubPid) when is_pid(SubPid) -> emqx_tables:lookup_value(?SUBMON, SubPid). --spec(lookup_subpid(emqx_types:subid()) -> pid()). +-spec(lookup_subpid(emqx_types:subid()) -> maybe(pid())). lookup_subpid(SubId) -> emqx_tables:lookup_value(?SUBID, SubId). diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index ff964eff1..a5d02734b 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -62,9 +62,9 @@ %% MQTT ClientInfo clientinfo :: emqx_types:clientinfo(), %% MQTT Session - session :: emqx_session:session(), + session :: maybe(emqx_session:session()), %% Keepalive - keepalive :: emqx_keepalive:keepalive(), + keepalive :: maybe(emqx_keepalive:keepalive()), %% MQTT Will Msg will_msg :: maybe(emqx_types:message()), %% MQTT Topic Aliases @@ -108,6 +108,8 @@ -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). +-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). + %%-------------------------------------------------------------------- %% Info, Attrs and Caps %%-------------------------------------------------------------------- @@ -281,14 +283,14 @@ handle_in(Packet = ?PUBLISH_PACKET(_QoS), Channel) -> handle_out(disconnect, ReasonCode, Channel) end; -handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel +handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> case emqx_session:puback(PacketId, Session) of {ok, Msg, NSession} -> - ok = after_message_acked(ClientInfo, Msg), + ok = after_message_acked(ClientInfo, Msg, Properties), {ok, Channel#channel{session = NSession}}; {ok, Msg, Publishes, NSession} -> - ok = after_message_acked(ClientInfo, Msg), + ok = after_message_acked(ClientInfo, Msg, Properties), handle_out(publish, Publishes, Channel#channel{session = NSession}); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]), @@ -300,11 +302,11 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode), Channel {ok, Channel} end; -handle_in(?PUBREC_PACKET(PacketId, _ReasonCode), Channel +handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel = #channel{clientinfo = ClientInfo, session = Session}) -> case emqx_session:pubrec(PacketId, Session) of {ok, Msg, NSession} -> - ok = after_message_acked(ClientInfo, Msg), + ok = after_message_acked(ClientInfo, Msg, Properties), NChannel = Channel#channel{session = NSession}, handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> @@ -347,12 +349,12 @@ handle_in(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), Channel = #channel{clientinfo = ClientInfo = #{zone := Zone}}) -> case emqx_packet:check(Packet) of ok -> TopicFilters1 = parse_topic_filters(TopicFilters), - TopicFilters2 = enrich_subid(Properties, TopicFilters1), + TopicFilters2 = put_subid_in_subopts(Properties, TopicFilters1), TopicFilters3 = run_hooks('client.subscribe', [ClientInfo, Properties], TopicFilters2 ), - {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Channel), + {ReasonCodes, NChannel} = process_subscribe(TopicFilters3, Properties, Channel), case emqx_zone:get_env(Zone, acl_deny_action, ignore) =:= disconnect andalso lists:any(fun(ReasonCode) -> ReasonCode =:= ?RC_NOT_AUTHORIZED @@ -373,7 +375,7 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), [ClientInfo, Properties], parse_topic_filters(TopicFilters) ), - {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), + {ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Properties, Channel), handle_out(unsuback, {PacketId, ReasonCodes}, NChannel); {error, ReasonCode} -> handle_out(disconnect, ReasonCode, Channel) @@ -382,8 +384,8 @@ handle_in(Packet = ?UNSUBSCRIBE_PACKET(PacketId, Properties, TopicFilters), handle_in(?PACKET(?PINGREQ), Channel) -> {ok, ?PACKET(?PINGRESP), Channel}; -handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel) -> - NChannel = maybe_clean_will_msg(ReasonCode, Channel), +handle_in(?DISCONNECT_PACKET(ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) -> + NChannel = maybe_clean_will_msg(ReasonCode, Channel#channel{conninfo = ConnInfo#{disconn_props => Properties}}), process_disconnect(ReasonCode, Properties, NChannel); handle_in(?AUTH_PACKET(), Channel) -> @@ -437,7 +439,7 @@ process_connect(AckProps, Channel = #channel{conninfo = #{clean_start := CleanSt %% Process Publish %%-------------------------------------------------------------------- -process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), +process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{clientinfo = #{zone := Zone}}) -> case pipeline([fun process_alias/2, fun check_pub_alias/2, @@ -466,12 +468,23 @@ process_publish(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), handle_out(disconnect, ReasonCode, NChannel) end. -packet_to_message(Packet, #channel{conninfo = #{proto_ver := ProtoVer}, - clientinfo = ClientInfo = - #{mountpoint := MountPoint}}) -> - emqx_mountpoint:mount( - MountPoint, emqx_packet:to_message( - ClientInfo, #{proto_ver => ProtoVer}, Packet)). +packet_to_message(Packet, #channel{ + conninfo = #{proto_ver := ProtoVer}, + clientinfo = #{ + protocol := Protocol, + clientid := ClientId, + username := Username, + peerhost := PeerHost, + mountpoint := MountPoint + } + }) -> + emqx_mountpoint:mount(MountPoint, + emqx_packet:to_message( + Packet, ClientId, + #{proto_ver => ProtoVer, + protocol => Protocol, + username => Username, + peerhost => PeerHost})). do_publish(_PacketId, Msg = #message{qos = ?QOS_0}, Channel) -> _ = emqx_broker:publish(Msg), @@ -504,25 +517,26 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, puback_reason_code([]) -> ?RC_NO_MATCHING_SUBSCRIBERS; puback_reason_code([_|_]) -> ?RC_SUCCESS. --compile({inline, [after_message_acked/2]}). -after_message_acked(ClientInfo, Msg) -> +-compile({inline, [after_message_acked/3]}). +after_message_acked(ClientInfo, Msg, PubAckProps) -> ok = emqx_metrics:inc('messages.acked'), - emqx_hooks:run('message.acked', [ClientInfo, Msg]). + emqx_hooks:run('message.acked', [ClientInfo, + emqx_message:set_header(puback_props, PubAckProps, Msg)]). %%-------------------------------------------------------------------- %% Process Subscribe %%-------------------------------------------------------------------- --compile({inline, [process_subscribe/2]}). -process_subscribe(TopicFilters, Channel) -> - process_subscribe(TopicFilters, [], Channel). +-compile({inline, [process_subscribe/3]}). +process_subscribe(TopicFilters, SubProps, Channel) -> + process_subscribe(TopicFilters, SubProps, Channel, []). -process_subscribe([], Acc, Channel) -> +process_subscribe([], _SubProps, Channel, Acc) -> {lists:reverse(Acc), Channel}; -process_subscribe([{TopicFilter, SubOpts}|More], Acc, Channel) -> - {RC, NChannel} = do_subscribe(TopicFilter, SubOpts, Channel), - process_subscribe(More, [RC|Acc], NChannel). +process_subscribe([{TopicFilter, SubOpts}|More], SubProps, Channel, Acc) -> + {RC, NChannel} = do_subscribe(TopicFilter, SubOpts#{sub_props => SubProps}, Channel), + process_subscribe(More, SubProps, NChannel, [RC|Acc]). do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, @@ -557,22 +571,22 @@ process_force_subscribe(Subscriptions, Channel = %% Process Unsubscribe %%-------------------------------------------------------------------- --compile({inline, [process_unsubscribe/2]}). -process_unsubscribe(TopicFilters, Channel) -> - process_unsubscribe(TopicFilters, [], Channel). +-compile({inline, [process_unsubscribe/3]}). +process_unsubscribe(TopicFilters, UnSubProps, Channel) -> + process_unsubscribe(TopicFilters, UnSubProps, Channel, []). -process_unsubscribe([], Acc, Channel) -> +process_unsubscribe([], _UnSubProps, Channel, Acc) -> {lists:reverse(Acc), Channel}; -process_unsubscribe([{TopicFilter, SubOpts}|More], Acc, Channel) -> - {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts, Channel), - process_unsubscribe(More, [RC|Acc], NChannel). +process_unsubscribe([{TopicFilter, SubOpts}|More], UnSubProps, Channel, Acc) -> + {RC, NChannel} = do_unsubscribe(TopicFilter, SubOpts#{unsub_props => UnSubProps}, Channel), + process_unsubscribe(More, UnSubProps, NChannel, [RC|Acc]). -do_unsubscribe(TopicFilter, _SubOpts, Channel = +do_unsubscribe(TopicFilter, SubOpts, Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, session = Session}) -> TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), - case emqx_session:unsubscribe(ClientInfo, TopicFilter1, Session) of + case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of {ok, NSession} -> {?RC_SUCCESS, Channel#channel{session = NSession}}; {error, RC} -> {RC, Channel} @@ -582,9 +596,9 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel = process_force_unsubscribe(Subscriptions, Channel = #channel{clientinfo = ClientInfo = #{mountpoint := MountPoint}, session = Session}) -> - lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) -> + lists:foldl(fun({TopicFilter, SubOpts}, {ReasonCodes, ChannelAcc}) -> NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter), - case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of + case emqx_session:unsubscribe(ClientInfo, NTopicFilter, SubOpts, Session) of {ok, NSession} -> {ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}}; {error, ReasonCode} -> @@ -662,6 +676,7 @@ not_nacked({deliver, _Topic, Msg}) -> handle_out(connack, {?RC_SUCCESS, SP, Props}, Channel = #channel{conninfo = ConnInfo}) -> AckProps = run_fold([fun enrich_connack_caps/2, fun enrich_server_keepalive/2, + fun enrich_response_information/2, fun enrich_assigned_clientid/2 ], Props, Channel), NAckProps = run_hooks('client.connack', [ConnInfo, emqx_reason_codes:name(?RC_SUCCESS)], AckProps), @@ -806,7 +821,8 @@ return_unsuback(Packet, Channel) -> -spec(handle_call(Req :: term(), channel()) -> {reply, Reply :: term(), channel()} - | {shutdown, Reason :: term(), Reply :: term(), channel()}). + | {shutdown, Reason :: term(), Reply :: term(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}). handle_call(kick, Channel) -> Channel1 = ensure_disconnected(kicked, Channel), disconnect_and_shutdown(kicked, ok, Channel1); @@ -844,7 +860,7 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf [ClientInfo, #{'Internal' => true}], parse_topic_filters(TopicFilters) ), - {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel), + {_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, #{}, Channel), {ok, NChannel}; handle_info({force_subscribe, TopicFilters}, Channel) -> @@ -856,7 +872,7 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI [ClientInfo, #{'Internal' => true}], parse_topic_filters(TopicFilters) ), - {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel), + {_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, #{}, Channel), {ok, NChannel}; handle_info({force_unsubscribe, TopicFilters}, Channel) -> @@ -924,9 +940,6 @@ handle_timeout(_TRef, retry_delivery, case emqx_session:retry(Session) of {ok, NSession} -> {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; - {ok, Publishes, NSession} -> - NChannel = Channel#channel{session = NSession}, - handle_out(publish, Publishes, reset_timer(retry_timer, NChannel)); {ok, Publishes, Timeout, NSession} -> NChannel = Channel#channel{session = NSession}, handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) @@ -1001,6 +1014,7 @@ interval(will_timer, #channel{will_msg = WillMsg}) -> %% Terminate %%-------------------------------------------------------------------- +-spec(terminate(any(), channel()) -> ok). terminate(_, #channel{conn_state = idle}) -> ok; terminate(normal, Channel) -> run_terminate_hook(normal, Channel); @@ -1176,7 +1190,7 @@ enhanced_auth(?CONNECT_PACKET(#mqtt_packet_connect{ end; enhanced_auth(?AUTH_PACKET(_ReasonCode, Properties), Channel = #channel{conninfo = ConnInfo}) -> - AuthMethod = maps:get('Authentication-Method', maps:get(conn_props, ConnInfo), undefined), + AuthMethod = emqx_mqtt_props:get('Authentication-Method', emqx_mqtt_props:get(conn_props, ConnInfo, #{}), undefined), NAuthMethod = emqx_mqtt_props:get('Authentication-Method', Properties, undefined), AuthData = emqx_mqtt_props:get('Authentication-Data', Properties, undefined), case NAuthMethod =:= undefined orelse NAuthMethod =/= AuthMethod of @@ -1329,9 +1343,9 @@ check_sub_caps(TopicFilter, SubOpts, #channel{clientinfo = #{zone := Zone}}) -> %%-------------------------------------------------------------------- %% Enrich SubId -enrich_subid(#{'Subscription-Identifier' := SubId}, TopicFilters) -> +put_subid_in_subopts(#{'Subscription-Identifier' := SubId}, TopicFilters) -> [{Topic, SubOpts#{subid => SubId}} || {Topic, SubOpts} <- TopicFilters]; -enrich_subid(_Properties, TopicFilters) -> TopicFilters. +put_subid_in_subopts(_Properties, TopicFilters) -> TopicFilters. %%-------------------------------------------------------------------- %% Enrich SubOpts @@ -1380,6 +1394,16 @@ enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) -> Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive} end. +%%-------------------------------------------------------------------- +%% Enrich response information + +enrich_response_information(AckProps, #channel{conninfo = #{conn_props := ConnProps}, + clientinfo = #{zone := Zone}}) -> + case emqx_mqtt_props:get('Request-Response-Information', ConnProps, 0) of + 0 -> AckProps; + 1 -> AckProps#{'Response-Information' => emqx_zone:response_information(Zone)} + end. + %%-------------------------------------------------------------------- %% Enrich Assigned ClientId @@ -1490,7 +1514,7 @@ mabye_publish_will_msg(Channel = #channel{will_msg = WillMsg}) -> end. will_delay_interval(WillMsg) -> - emqx_message:get_header('Will-Delay-Interval', WillMsg, 0). + maps:get('Will-Delay-Interval', emqx_message:get_header(properties, WillMsg), 0). publish_will_msg(Msg) -> emqx_broker:publish(Msg). diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 92d54c056..a24bb8fba 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -107,7 +107,7 @@ start_link() -> register_channel(ClientId, Info = #{conninfo := ConnInfo}, Stats) -> Chan = {ClientId, ChanPid = self()}, true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}), - register_channel(ClientId, ChanPid, ConnInfo); + register_channel_(ClientId, ChanPid, ConnInfo). %% @private %% @doc Register a channel with pid and conn_mod. @@ -117,7 +117,7 @@ register_channel(ClientId, Info = #{conninfo := ConnInfo}, Stats) -> %% the conn_mod first for taking up the clientid access right. %% %% Note that: It should be called on a lock transaction -register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) -> +register_channel_(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) -> Chan = {ClientId, ChanPid}, true = ets:insert(?CHAN_TAB, Chan), true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}), @@ -211,7 +211,7 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> CleanStart = fun(_) -> ok = discard_session(ClientId), Session = create_session(ClientInfo, ConnInfo), - register_channel(ClientId, Self, ConnInfo), + register_channel_(ClientId, Self, ConnInfo), {ok, #{session => Session, present => false}} end, emqx_cm_locker:trans(ClientId, CleanStart); @@ -223,13 +223,13 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), - register_channel(ClientId, Self, ConnInfo), + register_channel_(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, pendings => Pendings}}; {error, not_found} -> Session = create_session(ClientInfo, ConnInfo), - register_channel(ClientId, Self, ConnInfo), + register_channel_(ClientId, Self, ConnInfo), {ok, #{session => Session, present => false}} end end, @@ -243,7 +243,8 @@ create_session(ClientInfo, ConnInfo) -> %% @doc Try to takeover a session. -spec(takeover_session(emqx_types:clientid()) - -> {ok, emqx_session:session()} | {error, Reason :: term()}). + -> {error, term()} + | {ok, atom(), pid(), emqx_session:session()}). takeover_session(ClientId) -> case lookup_channels(ClientId) of [] -> {error, not_found}; diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 3554ae893..e54495a71 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -103,6 +103,13 @@ -define(ENABLED(X), (X =/= undefined)). +-dialyzer({no_match, [info/2]}). +-dialyzer({nowarn_function, [ init/4 + , init_state/3 + , run_loop/2 + , system_terminate/4 + ]}). + -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) -> {ok, pid()}). start_link(Transport, Socket, Options) -> diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 4abe0ebf3..45cf34543 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -159,7 +159,7 @@ format(Msg) -> format(Format, Args) -> lists:flatten(io_lib:format(Format, Args)). --spec(format_usage([cmd_usage()]) -> ok). +-spec(format_usage([cmd_usage()]) -> [string()]). format_usage(UsageList) -> lists:map( fun({CmdParams, Desc}) -> diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 9e5f85dfc..1c27548f7 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -42,10 +42,10 @@ version => emqx_types:version() }). --opaque(parse_state() :: {none, options()} | cont_fun()). +-type(parse_state() :: {none, options()} | cont_fun()). --opaque(parse_result() :: {more, cont_fun()} - | {ok, emqx_types:packet(), binary(), parse_state()}). +-type(parse_result() :: {more, cont_fun()} + | {ok, emqx_types:packet(), binary(), parse_state()}). -type(cont_fun() :: fun((binary()) -> parse_result())). @@ -59,6 +59,8 @@ version => ?MQTT_PROTO_V4 }). +-dialyzer({no_match, [serialize_utf8_string/2]}). + %%-------------------------------------------------------------------- %% Init Parse State %%-------------------------------------------------------------------- @@ -307,7 +309,7 @@ parse_packet_id(<>) -> {PacketId, Rest}. parse_properties(Bin, Ver) when Ver =/= ?MQTT_PROTO_V5 -> - {undefined, Bin}; + {#{}, Bin}; %% TODO: version mess? parse_properties(<<>>, ?MQTT_PROTO_V5) -> {#{}, <<>>}; diff --git a/src/emqx_gen_mod.erl b/src/emqx_gen_mod.erl index e320ec877..26a3c13cb 100644 --- a/src/emqx_gen_mod.erl +++ b/src/emqx_gen_mod.erl @@ -16,21 +16,8 @@ -module(emqx_gen_mod). --ifdef(use_specs). - -callback(load(Opts :: any()) -> ok | {error, term()}). -callback(unload(State :: term()) -> term()). -callback(description() -> any()). - --else. - --export([behaviour_info/1]). - -behaviour_info(callbacks) -> - [{load, 1}, {unload, 1}]; -behaviour_info(_Other) -> - undefined. - --endif. diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index d079a38c3..273071d75 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -60,12 +60,12 @@ %% equal priority values. -type(hookpoint() :: atom()). --type(action() :: function() | mfa()). --type(filter() :: function() | mfa()). +-type(action() :: function() | {function(), [term()]} | mfargs()). +-type(filter() :: function() | mfargs()). -record(callback, { action :: action(), - filter :: filter(), + filter :: maybe(filter()), priority :: integer() }). @@ -112,7 +112,7 @@ add(HookPoint, Action, Filter, Priority) when is_integer(Priority) -> add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). %% @doc Unregister a callback. --spec(del(hookpoint(), action()) -> ok). +-spec(del(hookpoint(), function() | {module(), atom()}) -> ok). del(HookPoint, Action) -> gen_server:cast(?SERVER, {del, HookPoint, Action}). diff --git a/src/emqx_json.erl b/src/emqx_json.erl index 847c91b34..92da203f4 100644 --- a/src/emqx_json.erl +++ b/src/emqx_json.erl @@ -103,16 +103,13 @@ safe_decode(Json, Opts) -> , from_ejson/1 ]}). -to_ejson([[{_,_}|_]|_] = L) -> - [to_ejson(E) || E <- L]; to_ejson([{_, _}|_] = L) -> - lists:foldl( - fun({Name, Value}, Acc) -> - Acc#{Name => to_ejson(Value)} - end, #{}, L); + {[{K, to_ejson(V)} || {K, V} <- L ]}; +to_ejson(L) when is_list(L) -> + [to_ejson(E) || E <- L]; to_ejson(T) -> T. -from_ejson([{_}|_] = L) -> +from_ejson(L) when is_list(L) -> [from_ejson(E) || E <- L]; from_ejson({L}) -> [{Name, from_ejson(Value)} || {Name, Value} <- L]; diff --git a/src/emqx_logger.erl b/src/emqx_logger.erl index ebd2ac697..f250c7e93 100644 --- a/src/emqx_logger.erl +++ b/src/emqx_logger.erl @@ -221,7 +221,7 @@ trans([{eof, L} | AST], LogHeader, ResAST) -> trans([{attribute, _, module, _Mod} = M | AST], Header, ResAST) -> trans(AST, Header, [export_header_fun(), M | ResAST]); trans([{attribute, _, logger_header, Header} | AST], _, ResAST) -> - io_lib:printable_list(Header) orelse error({invalid_string, Header}), + io_lib:printable_list(Header) orelse erlang:error({invalid_string, Header}), trans(AST, Header, ResAST); trans([F | AST], LogHeader, ResAST) -> trans(AST, LogHeader, [F | ResAST]). diff --git a/src/emqx_logger_formatter.erl b/src/emqx_logger_formatter.erl index ad5c9cef7..2528a63b5 100644 --- a/src/emqx_logger_formatter.erl +++ b/src/emqx_logger_formatter.erl @@ -253,9 +253,7 @@ format_time(SysTime,#{}) {Date, _Time = {H, Mi, S}} = calendar:system_time_to_local_time(SysTime, microsecond), format_time({Date, {H, Mi, S, Ms}}). format_time({{Y, M, D}, {H, Mi, S, Ms}}) -> - io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b.~3..0b", [Y, M, D, H, Mi, S, Ms]); -format_time({{Y, M, D}, {H, Mi, S}}) -> - io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b", [Y, M, D, H, Mi, S]). + io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b.~3..0b", [Y, M, D, H, Mi, S, Ms]). format_mfa({M,F,A},_) when is_atom(M), is_atom(F), is_integer(A) -> atom_to_list(M)++":"++atom_to_list(F)++"/"++integer_to_list(A); diff --git a/src/emqx_message.erl b/src/emqx_message.erl index f7fbd6021..decbb0c18 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -26,6 +26,8 @@ -export([ make/2 , make/3 , make/4 + , make/6 + , make/7 ]). %% Fields @@ -69,8 +71,6 @@ -export([format/1]). --type(flag() :: atom()). - -spec(make(emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). make(Topic, Payload) -> make(undefined, Topic, Payload). @@ -95,6 +95,47 @@ make(From, QoS, Topic, Payload) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> timestamp = Now }. +-spec(make(emqx_types:clientid(), + emqx_types:qos(), + emqx_topic:topic(), + emqx_types:payload(), + emqx_types:flags(), + emqx_types:headers()) -> emqx_types:message()). +make(From, QoS, Topic, Payload, Flags, Headers) + when ?QOS_0 =< QoS, QoS =< ?QOS_2, + is_map(Flags), is_map(Headers) -> + Now = erlang:system_time(millisecond), + #message{id = emqx_guid:gen(), + qos = QoS, + from = From, + flags = Flags, + headers = Headers, + topic = Topic, + payload = Payload, + timestamp = Now + }. + +-spec(make(MsgId :: binary(), + emqx_types:clientid(), + emqx_types:qos(), + emqx_topic:topic(), + emqx_types:payload(), + emqx_types:flags(), + emqx_types:headers()) -> emqx_types:message()). +make(MsgId, From, QoS, Topic, Payload, Flags, Headers) + when ?QOS_0 =< QoS, QoS =< ?QOS_2, + is_map(Flags), is_map(Headers) -> + Now = erlang:system_time(millisecond), + #message{id = MsgId, + qos = QoS, + from = From, + flags = Flags, + headers = Headers, + topic = Topic, + payload = Payload, + timestamp = Now + }. + -spec(id(emqx_types:message()) -> maybe(binary())). id(#message{id = Id}) -> Id. @@ -126,39 +167,29 @@ clean_dup(Msg = #message{flags = Flags = #{dup := true}}) -> clean_dup(Msg) -> Msg. -spec(set_flags(map(), emqx_types:message()) -> emqx_types:message()). -set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) -> - Msg#message{flags = Flags}; set_flags(New, Msg = #message{flags = Old}) when is_map(New) -> Msg#message{flags = maps:merge(Old, New)}. --spec(get_flag(flag(), emqx_types:message()) -> boolean()). -get_flag(_Flag, #message{flags = undefined}) -> - false; +-spec(get_flag(emqx_types:flag(), emqx_types:message()) -> boolean()). get_flag(Flag, Msg) -> get_flag(Flag, Msg, false). -get_flag(_Flag, #message{flags = undefined}, Default) -> - Default; get_flag(Flag, #message{flags = Flags}, Default) -> maps:get(Flag, Flags, Default). -spec(get_flags(emqx_types:message()) -> maybe(map())). get_flags(#message{flags = Flags}) -> Flags. --spec(set_flag(flag(), emqx_types:message()) -> emqx_types:message()). -set_flag(Flag, Msg = #message{flags = undefined}) when is_atom(Flag) -> - Msg#message{flags = #{Flag => true}}; +-spec(set_flag(emqx_types:flag(), emqx_types:message()) -> emqx_types:message()). set_flag(Flag, Msg = #message{flags = Flags}) when is_atom(Flag) -> Msg#message{flags = maps:put(Flag, true, Flags)}. --spec(set_flag(flag(), boolean() | integer(), emqx_types:message()) +-spec(set_flag(emqx_types:flag(), boolean() | integer(), emqx_types:message()) -> emqx_types:message()). -set_flag(Flag, Val, Msg = #message{flags = undefined}) when is_atom(Flag) -> - Msg#message{flags = #{Flag => Val}}; set_flag(Flag, Val, Msg = #message{flags = Flags}) when is_atom(Flag) -> Msg#message{flags = maps:put(Flag, Val, Flags)}. --spec(unset_flag(flag(), emqx_types:message()) -> emqx_types:message()). +-spec(unset_flag(emqx_types:flag(), emqx_types:message()) -> emqx_types:message()). unset_flag(Flag, Msg = #message{flags = Flags}) -> case maps:is_key(Flag, Flags) of true -> Msg#message{flags = maps:remove(Flag, Flags)}; @@ -166,8 +197,6 @@ unset_flag(Flag, Msg = #message{flags = Flags}) -> end. -spec(set_headers(map(), emqx_types:message()) -> emqx_types:message()). -set_headers(Headers, Msg = #message{headers = undefined}) when is_map(Headers) -> - Msg#message{headers = Headers}; set_headers(New, Msg = #message{headers = Old}) when is_map(New) -> Msg#message{headers = maps:merge(Old, New)}. @@ -175,25 +204,17 @@ set_headers(New, Msg = #message{headers = Old}) when is_map(New) -> get_headers(Msg) -> Msg#message.headers. -spec(get_header(term(), emqx_types:message()) -> term()). -get_header(_Hdr, #message{headers = undefined}) -> - undefined; get_header(Hdr, Msg) -> get_header(Hdr, Msg, undefined). -spec(get_header(term(), emqx_types:message(), term()) -> term()). -get_header(_Hdr, #message{headers = undefined}, Default) -> - Default; get_header(Hdr, #message{headers = Headers}, Default) -> maps:get(Hdr, Headers, Default). -spec(set_header(term(), term(), emqx_types:message()) -> emqx_types:message()). -set_header(Hdr, Val, Msg = #message{headers = undefined}) -> - Msg#message{headers = #{Hdr => Val}}; set_header(Hdr, Val, Msg = #message{headers = Headers}) -> Msg#message{headers = maps:put(Hdr, Val, Headers)}. -spec(remove_header(term(), emqx_types:message()) -> emqx_types:message()). -remove_header(_Hdr, Msg = #message{headers = undefined}) -> - Msg; remove_header(Hdr, Msg = #message{headers = Headers}) -> case maps:is_key(Hdr, Headers) of true -> Msg#message{headers = maps:remove(Hdr, Headers)}; @@ -201,18 +222,18 @@ remove_header(Hdr, Msg = #message{headers = Headers}) -> end. -spec(is_expired(emqx_types:message()) -> boolean()). -is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, +is_expired(#message{headers = #{properties := #{'Message-Expiry-Interval' := Interval}}, timestamp = CreatedAt}) -> elapsed(CreatedAt) > timer:seconds(Interval); is_expired(_Msg) -> false. -spec(update_expiry(emqx_types:message()) -> emqx_types:message()). -update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, +update_expiry(Msg = #message{headers = #{properties := Props = #{'Message-Expiry-Interval' := Interval}}, timestamp = CreatedAt}) -> case elapsed(CreatedAt) of Elapsed when Elapsed > 0 -> Interval1 = max(1, Interval - (Elapsed div 1000)), - set_header('Message-Expiry-Interval', Interval1, Msg); + set_header(properties, Props#{'Message-Expiry-Interval' => Interval1}, Msg); _ -> Msg end; update_expiry(Msg) -> Msg. @@ -229,20 +250,20 @@ to_packet(PacketId, Msg = #message{qos = QoS, headers = Headers, }, variable = #mqtt_packet_publish{topic_name = Topic, packet_id = PacketId, - properties = props(Headers) + properties = filter_pub_props(maps:get(properties, Headers, #{})) }, payload = Payload }. -props(undefined) -> undefined; -props(Headers) -> maps:with(['Payload-Format-Indicator', - 'Response-Topic', - 'Correlation-Data', - 'User-Property', - 'Subscription-Identifier', - 'Content-Type', - 'Message-Expiry-Interval' - ], Headers). +filter_pub_props(Props) -> + maps:with(['Payload-Format-Indicator', + 'Message-Expiry-Interval', + 'Response-Topic', + 'Correlation-Data', + 'User-Property', + 'Subscription-Identifier', + 'Content-Type' + ], Props). %% @doc Message to map -spec(to_map(emqx_types:message()) -> map()). @@ -267,7 +288,7 @@ to_map(#message{ }. %% @doc Message to tuple list --spec(to_list(emqx_types:message()) -> map()). +-spec(to_list(emqx_types:message()) -> list()). to_list(Msg) -> lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))). @@ -279,8 +300,6 @@ format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, h io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)", [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]). -format(_, undefined) -> - ""; format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); format(headers, Headers) -> diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index d08d3ba35..88e3c91a6 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -62,11 +62,17 @@ maybe_apply(_Fun, undefined) -> undefined; maybe_apply(Fun, Arg) when is_function(Fun) -> erlang:apply(Fun, [Arg]). --spec(compose(list(F)) -> G when F :: fun((any()) -> any()), - G :: fun((any()) -> any())). +-spec(compose(list(F)) -> G + when F :: fun((any()) -> any()), + G :: fun((any()) -> any())). compose([F|More]) -> compose(F, More). --spec(compose(fun((X) -> Y), fun((Y) -> Z)) -> fun((X) -> Z)). +-spec(compose(F, G|[Gs]) -> C + when F :: fun((X1) -> X2), + G :: fun((X2) -> X3), + Gs :: [fun((Xn) -> Xn1)], + C :: fun((X1) -> Xm), + X3 :: any(), Xn :: any(), Xn1 :: any(), Xm :: any()). compose(F, G) when is_function(G) -> fun(X) -> G(F(X)) end; compose(F, [G]) -> compose(F, G); compose(F, [G|More]) -> compose(compose(F, G), More). diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index 0a4037a31..770f90c83 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -33,8 +33,6 @@ , description/0 ]). --define(MFA(M, F, A), {M, F, A}). - -type(acl_rules() :: #{publish => [emqx_access_rule:rule()], subscribe => [emqx_access_rule:rule()]}). @@ -44,11 +42,10 @@ load(_Env) -> Rules = rules_from_file(emqx:get_env(acl_file)), - emqx_hooks:add('client.check_acl', ?MFA(?MODULE, check_acl, [Rules]), -1). + emqx_hooks:add('client.check_acl', {?MODULE, check_acl, [Rules]}, -1). unload(_Env) -> - Rules = rules_from_file(emqx:get_env(acl_file)), - emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])). + emqx_hooks:del('client.check_acl', {?MODULE, check_acl}). reload(_Env) -> emqx_acl_cache:is_enabled() andalso ( diff --git a/src/emqx_mod_delayed.erl b/src/emqx_mod_delayed.erl index f9a1c4756..b4ea28e77 100644 --- a/src/emqx_mod_delayed.erl +++ b/src/emqx_mod_delayed.erl @@ -61,7 +61,7 @@ load(_Env) -> -spec(unload(list()) -> ok). unload(_Env) -> - emqx:unhook('message.publish', {?MODULE, on_message_publish, []}), + emqx:unhook('message.publish', {?MODULE, on_message_publish}), emqx_mod_sup:stop_child(?MODULE). description() -> @@ -83,21 +83,13 @@ on_message_publish(Msg = #message{id = Id, topic = <<"$delayed/", Topic/binary>> end end, PubMsg = Msg#message{topic = Topic1}, - Headers = case PubMsg#message.headers of - undefined -> #{}; - Headers0 -> Headers0 - end, - ok = store(#delayed_message{key = {PubAt, delayed_mid(Id)}, msg = PubMsg}), + Headers = PubMsg#message.headers, + ok = store(#delayed_message{key = {PubAt, Id}, msg = PubMsg}), {stop, PubMsg#message{headers = Headers#{allow_publish => false}}}; on_message_publish(Msg) -> {ok, Msg}. -%% @private -delayed_mid(undefined) -> - emqx_guid:gen(); -delayed_mid(MsgId) -> MsgId. - %%-------------------------------------------------------------------- %% Start delayed publish server %%-------------------------------------------------------------------- diff --git a/src/emqx_mod_topic_metrics.erl b/src/emqx_mod_topic_metrics.erl index 4e9b44fa8..66fac6ee4 100644 --- a/src/emqx_mod_topic_metrics.erl +++ b/src/emqx_mod_topic_metrics.erl @@ -97,14 +97,14 @@ load(_Env) -> emqx_mod_sup:start_child(?MODULE, worker), - emqx:hook('message.publish', fun ?MODULE:on_message_publish/1, []), - emqx:hook('message.dropped', fun ?MODULE:on_message_dropped/3, []), - emqx:hook('message.delivered', fun ?MODULE:on_message_delivered/2, []). + emqx:hook('message.publish', {?MODULE, on_message_publish, []}), + emqx:hook('message.dropped', {?MODULE, on_message_dropped, []}), + emqx:hook('message.delivered', {?MODULE, on_message_delivered, []}). unload(_Env) -> - emqx:unhook('message.publish', fun ?MODULE:on_message_publish/1), - emqx:unhook('message.dropped', fun ?MODULE:on_message_dropped/3), - emqx:unhook('message.delivered', fun ?MODULE:on_message_delivered/2), + emqx:unhook('message.publish', {?MODULE, on_message_publish}), + emqx:unhook('message.dropped', {?MODULE, on_message_dropped}), + emqx:unhook('message.delivered', {?MODULE, on_message_delivered}), emqx_mod_sup:stop_child(?MODULE). description() -> diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 47f02edda..f29d59915 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -71,8 +71,9 @@ }). -spec(check_pub(emqx_types:zone(), - #{qos => emqx_types:qos(), - retain => boolean()}) + #{qos := emqx_types:qos(), + retain := boolean(), + topic := emqx_topic:topic()}) -> ok_or_error(emqx_types:reason_code())). check_pub(Zone, Flags) when is_map(Flags) -> do_check_pub(case maps:take(topic, Flags) of @@ -156,4 +157,4 @@ with_env(Zone, Key, InitFun) -> ok = emqx_zone:set_env(Zone, Key, Caps), Caps; ZoneCaps -> ZoneCaps - end. \ No newline at end of file + end. diff --git a/src/emqx_mqtt_props.erl b/src/emqx_mqtt_props.erl index 535c5b696..7acea6761 100644 --- a/src/emqx_mqtt_props.erl +++ b/src/emqx_mqtt_props.erl @@ -128,24 +128,21 @@ name(16#29) -> 'Subscription-Identifier-Available'; name(16#2A) -> 'Shared-Subscription-Available'; name(Id) -> error({unsupported_property, Id}). --spec(filter(emqx_types:packet_type(), emqx_types:properties()|list()) +-spec(filter(emqx_types:packet_type(), emqx_types:properties()) -> emqx_types:properties()). -filter(PacketType, Props) when is_map(Props) -> - maps:from_list(filter(PacketType, maps:to_list(Props))); - -filter(PacketType, Props) when ?CONNECT =< PacketType, - PacketType =< ?AUTH, - is_list(Props) -> - Filter = fun(Name) -> - case maps:find(id(Name), ?PROPS_TABLE) of - {ok, {Name, _Type, 'ALL'}} -> - true; - {ok, {Name, _Type, AllowedTypes}} -> - lists:member(PacketType, AllowedTypes); - error -> false - end - end, - [Prop || Prop = {Name, _} <- Props, Filter(Name)]. +filter(PacketType, Props) when is_map(Props), + PacketType >= ?CONNECT, + PacketType =< ?AUTH -> + F = fun(Name, _) -> + case maps:find(id(Name), ?PROPS_TABLE) of + {ok, {Name, _Type, 'ALL'}} -> + true; + {ok, {Name, _Type, AllowedTypes}} -> + lists:member(PacketType, AllowedTypes); + error -> false + end + end, + maps:filter(F, Props). -spec(validate(emqx_types:properties()) -> ok). validate(Props) when is_map(Props) -> diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index bef155e63..f77a1d98f 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -101,7 +101,7 @@ q = ?PQUEUE:new() :: pq() }). --opaque(mqueue() :: #mqueue{}). +-type(mqueue() :: #mqueue{}). -spec(init(options()) -> mqueue()). init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 0f6c7d597..6f10a7cfe 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -144,9 +144,6 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer, NState = case emqx_vm:cpu_util() of %% TODO: should be improved? 0 -> State#{timer := undefined}; - {error, Reason} -> - ?LOG(error, "Failed to get cpu utilization: ~p", [Reason]), - ensure_check_timer(State); Busy when Busy / 100 >= CPUHighWatermark -> alarm_handler:set_alarm({cpu_high_watermark, Busy}), ensure_check_timer(State#{is_cpu_alarm_set := true}); diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 768bf79e1..015f95e0d 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -373,34 +373,26 @@ validate_topic_filters(TopicFilters) -> emqx_topic:validate(TopicFilter) end, TopicFilters). --spec(to_message(emqx_types:clientinfo(), emqx_ypes:packet()) -> emqx_types:message()). -to_message(ClientInfo, Packet) -> - to_message(ClientInfo, #{}, Packet). +-spec(to_message(emqx_types:packet(), emqx_types:clientid()) -> emqx_types:message()). +to_message(Packet, ClientId) -> + to_message(Packet, ClientId, #{}). %% @doc Transform Publish Packet to Message. --spec(to_message(emqx_types:clientinfo(), map(), emqx_ypes:packet()) - -> emqx_types:message()). -to_message(#{protocol := Protocol, - clientid := ClientId, - username := Username, - peerhost := PeerHost - }, Headers, - #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - retain = Retain, - qos = QoS, - dup = Dup - }, - variable = #mqtt_packet_publish{topic_name = Topic, - properties = Props - }, - payload = Payload - }) -> +-spec(to_message(emqx_types:packet(), emqx_types:clientid(), map()) -> emqx_types:message()). +to_message(#mqtt_packet{ + header = #mqtt_packet_header{ + type = ?PUBLISH, + retain = Retain, + qos = QoS, + dup = Dup}, + variable = #mqtt_packet_publish{ + topic_name = Topic, + properties = Props}, + payload = Payload + }, ClientId, Headers) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), - Headers1 = merge_props(Headers#{protocol => Protocol, - username => Username, - peerhost => PeerHost - }, Props), - Msg#message{flags = #{dup => Dup, retain => Retain}, headers = Headers1}. + Msg#message{flags = #{dup => Dup, retain => Retain}, + headers = Headers#{properties => Props}}. -spec(will_msg(#mqtt_packet_connect{}) -> emqx_types:message()). will_msg(#mqtt_packet_connect{will_flag = false}) -> @@ -413,13 +405,8 @@ will_msg(#mqtt_packet_connect{clientid = ClientId, will_props = Props, will_payload = Payload}) -> Msg = emqx_message:make(ClientId, QoS, Topic, Payload), - Headers = merge_props(#{username => Username}, Props), - Msg#message{flags = #{dup => false, retain => Retain}, headers = Headers}. - -merge_props(Headers, undefined) -> - Headers; -merge_props(Headers, Props) -> - maps:merge(Headers, Props). + Msg#message{flags = #{dup => false, retain => Retain}, + headers = #{username => Username, properties => Props}}. %% @doc Format packet -spec(format(emqx_types:packet()) -> iolist()). @@ -497,10 +484,11 @@ format_variable(#mqtt_packet_suback{packet_id = PacketId, format_variable(#mqtt_packet_unsuback{packet_id = PacketId}) -> io_lib:format("PacketId=~p", [PacketId]); -format_variable(PacketId) when is_integer(PacketId) -> - io_lib:format("PacketId=~p", [PacketId]); +format_variable(#mqtt_packet_auth{reason_code = ReasonCode}) -> + io_lib:format("ReasonCode=~p", [ReasonCode]); -format_variable(undefined) -> undefined. +format_variable(PacketId) when is_integer(PacketId) -> + io_lib:format("PacketId=~p", [PacketId]). format_password(undefined) -> undefined; format_password(_Password) -> '******'. diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index eddad6810..faa6abae3 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -38,6 +38,11 @@ -compile(export_all). -compile(nowarn_export_all). -endif. + +-dialyzer({no_match, [ plugin_loaded/2 + , plugin_unloaded/2 + ]}). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/src/emqx_pool_sup.erl b/src/emqx_pool_sup.erl index ff6fec6b8..080cd842b 100644 --- a/src/emqx_pool_sup.erl +++ b/src/emqx_pool_sup.erl @@ -18,6 +18,8 @@ -behaviour(supervisor). +-include("types.hrl"). + -export([spec/1, spec/2]). -export([ start_link/0 @@ -46,12 +48,12 @@ spec(ChildId, Args) -> start_link() -> start_link(?POOL, random, {?POOL, start_link, []}). --spec(start_link(atom() | tuple(), atom(), mfa()) +-spec(start_link(atom() | tuple(), atom(), mfargs()) -> {ok, pid()} | {error, term()}). start_link(Pool, Type, MFA) -> start_link(Pool, Type, emqx_vm:schedulers(), MFA). --spec(start_link(atom() | tuple(), atom(), pos_integer(), mfa()) +-spec(start_link(atom() | tuple(), atom(), pos_integer(), mfargs()) -> {ok, pid()} | {error, term()}). start_link(Pool, Type, Size, MFA) -> supervisor:start_link(?MODULE, [Pool, Type, Size, MFA]). diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index 2e587c446..ba5cc5543 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -53,6 +53,8 @@ -define(ROUTING_NODE, emqx_routing_node). -define(LOCK, {?MODULE, cleanup_routes}). +-dialyzer({nowarn_function, [cleanup_routes/1]}). + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 684589ff2..88664e028 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -63,7 +63,7 @@ ]). -export([ subscribe/4 - , unsubscribe/3 + , unsubscribe/4 ]). -export([ publish/3 @@ -123,7 +123,7 @@ created_at :: pos_integer() }). --opaque(session() :: #session{}). +-type(session() :: #session{}). -type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}). @@ -152,6 +152,7 @@ -define(DEFAULT_BATCH_N, 1000). + %%-------------------------------------------------------------------- %% Init a Session %%-------------------------------------------------------------------- @@ -261,13 +262,13 @@ is_subscriptions_full(#session{subscriptions = Subs, %% Client -> Broker: UNSUBSCRIBE %%-------------------------------------------------------------------- --spec(unsubscribe(emqx_types:clientinfo(), emqx_types:topic(), session()) +-spec(unsubscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). -unsubscribe(ClientInfo, TopicFilter, Session = #session{subscriptions = Subs}) -> +unsubscribe(ClientInfo, TopicFilter, UnSubOpts, Session = #session{subscriptions = Subs}) -> case maps:find(TopicFilter, Subs) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(TopicFilter), - ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, SubOpts]), + ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]), {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}}; error -> {error, ?RC_NO_SUBSCRIPTION_EXISTED} @@ -523,7 +524,8 @@ enrich_subopts([{rap, 0}|Opts], Msg = #message{headers = #{retained := true}}, S enrich_subopts([{rap, 0}|Opts], Msg, Session) -> enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session); enrich_subopts([{subid, SubId}|Opts], Msg, Session) -> - Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg), + Props = emqx_message:get_header(properties, Msg, #{}), + Msg1 = emqx_message:set_header(properties, Props#{'Subscription-Identifier' => SubId}, Msg), enrich_subopts(Opts, Msg1, Session). %%-------------------------------------------------------------------- @@ -615,19 +617,16 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = -spec(replay(session()) -> {ok, replies(), session()}). replay(Session = #session{inflight = Inflight}) -> - Pubs = replay(Inflight), + Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) -> + {pubrel, PacketId}; + ({PacketId, {Msg, _Ts}}) -> + {PacketId, emqx_message:set_flag(dup, true, Msg)} + end, emqx_inflight:to_list(Inflight)), case dequeue(Session) of {ok, NSession} -> {ok, Pubs, NSession}; {ok, More, NSession} -> {ok, lists:append(Pubs, More), NSession} - end; - -replay(Inflight) -> - lists:map(fun({PacketId, {pubrel, _Ts}}) -> - {pubrel, PacketId}; - ({PacketId, {Msg, _Ts}}) -> - {PacketId, emqx_message:set_flag(dup, true, Msg)} - end, emqx_inflight:to_list(Inflight)). + end. -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). terminate(ClientInfo, discarded, Session) -> diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 7548867e5..a6d53cfeb 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -58,7 +58,7 @@ -record(update, {name, countdown, interval, func}). -record(state, { - timer :: reference(), + timer :: maybe(reference()), updates :: [#update{}], tick_ms :: timeout() }). @@ -159,7 +159,7 @@ setstat(Stat, Val) when is_integer(Val) -> %% @doc Set stats with max value. -spec(setstat(Stat :: atom(), MaxStat :: atom(), - Val :: pos_integer()) -> boolean()). + Val :: pos_integer()) -> ok). setstat(Stat, MaxStat, Val) when is_integer(Val) -> cast({setstat, Stat, MaxStat, Val}). diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index e2698d06a..38387b3ce 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("types.hrl"). -include("logger.hrl"). -logger_header("[SYS]"). @@ -53,20 +54,12 @@ -import(emqx_topic, [systop/1]). -import(emqx_misc, [start_timer/2]). --type(timeref() :: reference()). - --type(tickeref() :: reference()). - --type(version() :: string()). - --type(sysdescr() :: string()). - -record(state, { start_time :: erlang:timestamp() - , heartbeat :: timeref() - , ticker :: tickeref() - , version :: version() - , sysdescr :: sysdescr() + , heartbeat :: maybe(reference()) + , ticker :: maybe(reference()) + , version :: binary() + , sysdescr :: binary() }). -define(APP, emqx). diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index 8743c7dad..0eabb1253 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -18,8 +18,8 @@ -behavior(gen_server). --include("logger.hrl"). -include("types.hrl"). +-include("logger.hrl"). -logger_header("[SYSMON]"). @@ -171,9 +171,11 @@ handle_partition_event({partition, {healed, _Node}}) -> suppress(Key, SuccFun, State = #{events := Events}) -> case lists:member(Key, Events) of - true -> {noreply, State}; - false -> SuccFun(), - {noreply, State#{events := [Key|Events]}} + true -> + {noreply, State}; + false -> + SuccFun(), + {noreply, State#{events := [Key|Events]}} end. procinfo(Pid) -> diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 487b04830..cda057f06 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -21,7 +21,6 @@ , validate/1 , validate/2 , levels/1 - , triples/1 , tokens/1 , words/1 , wildcard/1 @@ -36,14 +35,12 @@ -export_type([ group/0 , topic/0 , word/0 - , triple/0 ]). -type(group() :: binary()). -type(topic() :: binary()). -type(word() :: '' | '+' | '#' | binary()). -type(words() :: list(word())). --opaque(triple() :: {root | binary(), word(), binary()}). -define(MAX_TOPIC_LEN, 4096). @@ -129,32 +126,15 @@ validate3(<>) when C == $#; C == $+; C == 0 -> validate3(<<_/utf8, Rest/binary>>) -> validate3(Rest). -%% @doc Topic to triples. --spec(triples(topic()) -> list(triple())). -triples(Topic) when is_binary(Topic) -> - triples(words(Topic), root, []). - -triples([], _Parent, Acc) -> - lists:reverse(Acc); -triples([W|Words], Parent, Acc) -> - Node = join(Parent, W), - triples(Words, Node, [{Parent, W, Node}|Acc]). - -join(root, W) -> - bin(W); -join(Parent, W) -> - <<(bin(Parent))/binary, $/, (bin(W))/binary>>. - %% @doc Prepend a topic prefix. %% Ensured to have only one / between prefix and suffix. -prepend(root, W) -> bin(W); prepend(undefined, W) -> bin(W); prepend(<<>>, W) -> bin(W); prepend(Parent0, W) -> Parent = bin(Parent0), case binary:last(Parent) of $/ -> <>; - _ -> join(Parent, W) + _ -> <> end. bin('') -> <<>>; @@ -184,6 +164,7 @@ word(<<"#">>) -> '#'; word(Bin) -> Bin. %% @doc '$SYS' Topic. +-spec(systop(atom()|string()|binary()) -> topic()). systop(Name) when is_atom(Name); is_list(Name) -> iolist_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])); systop(Name) when is_binary(Name) -> diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index a2112cda3..befb02b96 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -57,6 +57,8 @@ L =:= info orelse L =:= debug). +-dialyzer({nowarn_function, [install_trace_handler/3]}). + %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -68,7 +70,7 @@ trace(publish, #message{from = From, topic = Topic, payload = Payload}) emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, "PUBLISH to ~s: ~0p", [Topic, Payload]). %% @doc Start to trace clientid or topic. --spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}). +-spec(start_trace(trace_who(), logger:level() | all, string()) -> ok | {error, term()}). start_trace(Who, all, LogFile) -> start_trace(Who, debug, LogFile); start_trace(Who, Level, LogFile) -> diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index adb184aa7..4189c0270 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -33,6 +33,13 @@ -export([empty/0]). +-ifdef(TEST). +-compile(export_all). +-compile(nowarn_export_all). +-endif. + +-type(triple() :: {root | binary(), emqx_topic:word(), binary()}). + %% Mnesia tables -define(TRIE_TAB, emqx_trie). -define(TRIE_NODE_TAB, emqx_trie_node). @@ -80,7 +87,7 @@ insert(Topic) when is_binary(Topic) -> write_trie_node(TrieNode#trie_node{topic = Topic}); [] -> %% Add trie path - ok = lists:foreach(fun add_path/1, emqx_topic:triples(Topic)), + ok = lists:foreach(fun add_path/1, triples(Topic)), %% Add last node write_trie_node(#trie_node{node_id = Topic, topic = Topic}) end. @@ -102,7 +109,7 @@ delete(Topic) when is_binary(Topic) -> case mnesia:wread({?TRIE_NODE_TAB, Topic}) of [#trie_node{edge_count = 0}] -> ok = mnesia:delete({?TRIE_NODE_TAB, Topic}), - delete_path(lists:reverse(emqx_topic:triples(Topic))); + delete_path(lists:reverse(triples(Topic))); [TrieNode] -> write_trie_node(TrieNode#trie_node{topic = undefined}); [] -> ok @@ -117,6 +124,22 @@ empty() -> %% Internal functions %%-------------------------------------------------------------------- +%% @doc Topic to triples. +-spec(triples(emqx_topic:topic()) -> list(triple())). +triples(Topic) when is_binary(Topic) -> + triples(emqx_topic:words(Topic), root, []). + +triples([], _Parent, Acc) -> + lists:reverse(Acc); +triples([W|Words], Parent, Acc) -> + Node = join(Parent, W), + triples(Words, Node, [{Parent, W, Node}|Acc]). + +join(root, W) -> + emqx_topic:join([W]); +join(Parent, W) -> + emqx_topic:join([Parent, W]). + %% @private %% @doc Add a path to the trie. add_path({Node, Word, Child}) -> diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 31f9ad65c..718b757a5 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -63,6 +63,9 @@ -export_type([ payload/0 , message/0 + , flag/0 + , flags/0 + , headers/0 ]). -export_type([ deliver/0 @@ -133,7 +136,7 @@ is_bridge := boolean(), is_superuser := boolean(), mountpoint := maybe(binary()), - ws_cookie := maybe(list()), + ws_cookie => maybe(list()), password => maybe(binary()), auth_result => auth_result(), anonymous => boolean(), @@ -179,6 +182,9 @@ -type(subscriber() :: {pid(), subid()}). -type(payload() :: binary() | iodata()). -type(message() :: #message{}). +-type(flag() :: atom()). +-type(flags() :: #{flag() := boolean()}). +-type(headers() :: map()). -type(banned() :: #banned{}). -type(deliver() :: {deliver, topic(), message()}). -type(delivery() :: #delivery{}). @@ -195,7 +201,7 @@ -type(caps() :: emqx_mqtt_caps:caps()). -type(attrs() :: #{atom() => term()}). -type(infos() :: #{atom() => term()}). --type(stats() :: #{atom() => non_neg_integer()|stats()}). +-type(stats() :: [{atom(), term()}]). -type(oom_policy() :: #{message_queue_len => non_neg_integer(), max_heap_size => non_neg_integer() diff --git a/src/emqx_vm.erl b/src/emqx_vm.erl index b4a36f410..539b8b91b 100644 --- a/src/emqx_vm.erl +++ b/src/emqx_vm.erl @@ -48,13 +48,13 @@ , get_port_info/1 ]). +-export([cpu_util/0]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). -endif. --export([cpu_util/0]). - -define(UTIL_ALLOCATORS, [temp_alloc, eheap_alloc, binary_alloc, @@ -408,9 +408,6 @@ port_info(PortTerm, specific) -> [] end, {specific, Props}; -port_info(PortTerm, Keys) when is_list(Keys) -> - Port = transform_port(PortTerm), - [erlang:port_info(Port, Key) || Key <- Keys]; port_info(PortTerm, Key) when is_atom(Key) -> Port = transform_port(PortTerm), erlang:port_info(Port, Key). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 1b09cf9e6..25eb8ff6c 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -63,7 +63,7 @@ %% Simulate the active_n opt active_n :: pos_integer(), %% Limiter - limiter :: emqx_limiter:limiter(), + limiter :: maybe(emqx_limiter:limiter()), %% Limit Timer limit_timer :: maybe(reference()), %% Parse State @@ -81,7 +81,7 @@ %% Idle Timeout idle_timeout :: timeout(), %% Idle Timer - idle_timer :: reference() + idle_timer :: maybe(reference()) }). -type(state() :: #state{}). @@ -95,6 +95,9 @@ -define(ENABLED(X), (X =/= undefined)). +-dialyzer({no_match, [info/2]}). +-dialyzer({nowarn_function, [websocket_init/1]}). + %%-------------------------------------------------------------------- %% Info, Stats %%-------------------------------------------------------------------- diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 68befdfdc..311ccaa77 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -45,6 +45,7 @@ , session_expiry_interval/1 , force_gc_policy/1 , force_shutdown_policy/1 + , response_information/1 , get_env/2 , get_env/3 ]}). @@ -72,6 +73,7 @@ , session_expiry_interval/1 , force_gc_policy/1 , force_shutdown_policy/1 + , response_information/1 ]). -export([ init_gc_state/1 @@ -180,7 +182,7 @@ enable_flapping_detect(Zone) -> ignore_loop_deliver(Zone) -> get_env(Zone, ignore_loop_deliver, false). --spec(server_keepalive(zone()) -> pos_integer()). +-spec(server_keepalive(zone()) -> maybe(pos_integer())). server_keepalive(Zone) -> get_env(Zone, server_keepalive). @@ -204,6 +206,10 @@ force_gc_policy(Zone) -> force_shutdown_policy(Zone) -> get_env(Zone, force_shutdown_policy). +-spec(response_information(zone()) -> string()). +response_information(Zone) -> + get_env(Zone, response_information). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 24ef77753..48c2177e1 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -77,7 +77,10 @@ t_emqx_pubsub_api(_) -> ?assertEqual([self()], emqx:subscribers(Topic)), ?assertEqual([self()], emqx:subscribers(Topic1)), ?assertEqual([self()], emqx:subscribers(Topic2)), - ?assertEqual([{Topic,#{qos => 0,subid => ClientId}}, {Topic1,#{qos => 1,subid => ClientId}}, {Topic2,#{qos => 2,subid => ClientId}}], emqx:subscriptions(self())), + + ?assertEqual([{Topic, #{nl => 0, qos => 0, rap => 0, rh => 0, subid => ClientId}}, + {Topic1, #{nl => 0, qos => 1, rap => 0, rh => 0, subid => ClientId}}, + {Topic2, #{nl => 0, qos => 2, rap => 0, rh => 0, subid => ClientId}}], emqx:subscriptions(self())), ?assertEqual(true, emqx:subscribed(self(), Topic)), ?assertEqual(true, emqx:subscribed(ClientId, Topic)), ?assertEqual(true, emqx:subscribed(self(), Topic1)), diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 2dd360759..d52132497 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -59,12 +59,18 @@ t_subopts(_) -> ?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)), emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}), timer:sleep(200), - ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)), - ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(<<"clientid">>,<<"topic">>)), + ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, + emqx_broker:get_subopts(self(), <<"topic">>)), + ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, + emqx_broker:get_subopts(<<"clientid">>,<<"topic">>)), + emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 2}), - ?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)), - ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 2})), - ?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)), + ?assertEqual(#{nl => 0, qos => 2, rap => 0, rh => 0, subid => <<"clientid">>}, + emqx_broker:get_subopts(self(), <<"topic">>)), + + ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 0})), + ?assertEqual(#{nl => 0, qos => 0, rap => 0, rh => 0, subid => <<"clientid">>}, + emqx_broker:get_subopts(self(), <<"topic">>)), emqx_broker:unsubscribe(<<"topic">>). t_topics(_) -> @@ -91,9 +97,9 @@ t_subscribers(_) -> t_subscriptions(_) -> emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}), ok = timer:sleep(100), - ?assertEqual(#{qos => 1, subid => <<"clientid">>}, + ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))), - ?assertEqual(#{qos => 1, subid => <<"clientid">>}, + ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>))), emqx_broker:unsubscribe(<<"topic">>). diff --git a/test/emqx_channel_SUITE.erl b/test/emqx_channel_SUITE.erl index 086ff557e..8a48b578f 100644 --- a/test/emqx_channel_SUITE.erl +++ b/test/emqx_channel_SUITE.erl @@ -58,7 +58,8 @@ end_per_suite(_Config) -> emqx_session, emqx_broker, emqx_hooks, - emqx_cm + emqx_cm, + emqx_zone ]). init_per_testcase(_TestCase, Config) -> @@ -152,6 +153,8 @@ t_handle_in_re_auth(_) -> }, {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_BAD_AUTHENTICATION_METHOD)}, {close, bad_authentication_method}], _} = emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel()), + {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_BAD_AUTHENTICATION_METHOD)}, {close, bad_authentication_method}], _} = + emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => undefined}})), {ok, [{outgoing, ?DISCONNECT_PACKET(?RC_NOT_AUTHORIZED)}, {close, not_authorized}], _} = emqx_channel:handle_in(?AUTH_PACKET(?RC_RE_AUTHENTICATE,Properties), channel(#{conninfo => #{proto_ver => ?MQTT_PROTO_V5, conn_props => Properties}})). @@ -279,7 +282,7 @@ t_handle_in_subscribe(_) -> t_handle_in_unsubscribe(_) -> ok = meck:expect(emqx_session, unsubscribe, - fun(_, _, Session) -> + fun(_, _, _, Session) -> {ok, Session} end), Channel = channel(#{conn_state => connected}), @@ -345,12 +348,12 @@ t_process_publish_qos1(_) -> t_process_subscribe(_) -> ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end), TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], - {[?RC_SUCCESS], _Channel} = emqx_channel:process_subscribe(TopicFilters, channel()). + {[?RC_SUCCESS], _Channel} = emqx_channel:process_subscribe(TopicFilters, #{}, channel()). t_process_unsubscribe(_) -> - ok = meck:expect(emqx_session, unsubscribe, fun(_, _, Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end), TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}], - {[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, channel()). + {[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, #{}, channel()). %%-------------------------------------------------------------------- %% Test cases for handle_deliver @@ -392,6 +395,27 @@ t_handle_out_connack_sucess(_) -> emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, #{}}, channel()), ?assertEqual(connected, emqx_channel:info(conn_state, Channel)). +t_handle_out_connack_response_information(_) -> + ok = meck:expect(emqx_cm, open_session, + fun(true, _ClientInfo, _ConnInfo) -> + {ok, #{session => session(), present => false}} + end), + ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), + IdleChannel = channel(#{conn_state => idle}), + {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, #{'Response-Information' := test})}], _} = + emqx_channel:handle_in(?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 1})), IdleChannel). + +t_handle_out_connack_not_response_information(_) -> + ok = meck:expect(emqx_cm, open_session, + fun(true, _ClientInfo, _ConnInfo) -> + {ok, #{session => session(), present => false}} + end), + ok = meck:expect(emqx_zone, response_information, fun(_) -> test end), + IdleChannel = channel(#{conn_state => idle}), + {ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, AckProps)}], _} = + emqx_channel:handle_in(?CONNECT_PACKET(connpkt(#{'Request-Response-Information' => 0})), IdleChannel), + ?assertEqual(false, maps:is_key('Response-Information', AckProps)). + t_handle_out_connack_failure(_) -> {shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan} = emqx_channel:handle_out(connack, ?RC_NOT_AUTHORIZED, channel()). @@ -465,7 +489,7 @@ t_handle_info_subscribe(_) -> {ok, _Chan} = emqx_channel:handle_info({subscribe, topic_filters()}, channel()). t_handle_info_unsubscribe(_) -> - ok = meck:expect(emqx_session, unsubscribe, fun(_, _, Session) -> {ok, Session} end), + ok = meck:expect(emqx_session, unsubscribe, fun(_, _, _, Session) -> {ok, Session} end), {ok, _Chan} = emqx_channel:handle_info({unsubscribe, topic_filters()}, channel()). t_handle_info_sock_closed(_) -> @@ -541,7 +565,7 @@ t_packing_alias(_) -> ?assertEqual(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias' => 1}}}, RePacket2), {RePacket3, _} = emqx_channel:packing_alias(Packet2, NChannel2), - ?assertEqual(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"y">>, properties = undefined}}, RePacket3), + ?assertEqual(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"y">>, properties = #{}}}, RePacket3), ?assertMatch({#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}}, _}, emqx_channel:packing_alias(#mqtt_packet{variable = #mqtt_packet_publish{topic_name = <<"z">>}}, channel())). @@ -637,14 +661,15 @@ clientinfo(InitProps) -> topic_filters() -> [{<<"+">>, ?DEFAULT_SUBOPTS}, {<<"#">>, ?DEFAULT_SUBOPTS}]. -connpkt() -> +connpkt() -> connpkt(#{}). +connpkt(Props) -> #mqtt_packet_connect{ proto_name = <<"MQTT">>, proto_ver = ?MQTT_PROTO_V4, is_bridge = false, clean_start = true, keepalive = 30, - properties = undefined, + properties = Props, clientid = <<"clientid">>, username = <<"username">>, password = <<"passwd">> diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 96e744a5d..5d4e146db 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -20,14 +20,10 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). --include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("emqx_ct_helpers/include/emqx_ct.hrl"). -%%-define(PROPTEST(F), ?assert(proper:quickcheck(F()))). --define(PROPTEST(F), ?assert(proper:quickcheck(F(), [{to_file, user}]))). - all() -> [{group, parse}, {group, connect}, @@ -49,8 +45,7 @@ groups() -> t_parse_frame_too_large ]}, {connect, [parallel], - [t_serialize_parse_connect, - t_serialize_parse_v3_connect, + [t_serialize_parse_v3_connect, t_serialize_parse_v4_connect, t_serialize_parse_v5_connect, t_serialize_parse_connect_without_clientid, @@ -134,33 +129,6 @@ t_parse_frame_too_large(_) -> ?catch_error(frame_too_large, parse_serialize(Packet, #{max_size => 512})), ?assertEqual(Packet, parse_serialize(Packet, #{max_size => 2048, version => ?MQTT_PROTO_V4})). -t_serialize_parse_connect(_) -> - ?PROPTEST(prop_serialize_parse_connect). - -prop_serialize_parse_connect() -> - ?FORALL(Opts = #{version := ProtoVer}, parse_opts(), - begin - ProtoName = proplists:get_value(ProtoVer, ?PROTOCOL_NAMES), - DefaultProps = if ProtoVer == ?MQTT_PROTO_V5 -> - #{}; - true -> undefined - end, - Packet = ?CONNECT_PACKET(#mqtt_packet_connect{ - proto_name = ProtoName, - proto_ver = ProtoVer, - clientid = <<"clientId">>, - will_qos = ?QOS_1, - will_flag = true, - will_retain = true, - will_topic = <<"will">>, - will_props = DefaultProps, - will_payload = <<"bye">>, - clean_start = true, - properties = DefaultProps - }), - ok == ?assertEqual(Packet, parse_serialize(Packet, Opts)) - end). - t_serialize_parse_v3_connect(_) -> Bin = <<16,37,0,6,77,81,73,115,100,112,3,2,0,60,0,23,109,111,115, 113,112,117, 98,47,49,48,52,53,49,45,105,77,97,99,46,108, @@ -534,9 +502,6 @@ t_serialize_parse_auth_v5(_) -> ?assertEqual(Packet, parse_serialize(Packet, #{version => ?MQTT_PROTO_V5, strict_mode => true})). -parse_opts() -> - ?LET(PropList, [{strict_mode, boolean()}, {version, range(4,5)}], maps:from_list(PropList)). - parse_serialize(Packet) -> parse_serialize(Packet, #{strict_mode => true}). diff --git a/test/emqx_listeners_SUITE.erl b/test/emqx_listeners_SUITE.erl index 3bf53e683..1472cc072 100644 --- a/test/emqx_listeners_SUITE.erl +++ b/test/emqx_listeners_SUITE.erl @@ -48,7 +48,7 @@ t_restart_listeners(_) -> ok = emqx_listeners:stop(). render_config_file() -> - Path = local_path(["etc", "emqx.conf"]), + Path = local_path(["..", "..", "..", "..", "etc", "emqx.conf"]), {ok, Temp} = file:read_file(Path), Vars0 = mustache_vars(), Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0], diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index ecac9366f..2eebdc1dc 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -86,7 +86,7 @@ t_clean_dup(_) -> ?assertNot(emqx_message:get_flag(dup, Msg2)). t_get_set_flags(_) -> - Msg = #message{id = <<"id">>, qos = ?QOS_1, flags = undefined}, + Msg = #message{id = <<"id">>, qos = ?QOS_1}, Msg1 = emqx_message:set_flags(#{retain => true}, Msg), ?assertEqual(#{retain => true}, emqx_message:get_flags(Msg1)), Msg2 = emqx_message:set_flags(#{dup => true}, Msg1), @@ -109,7 +109,7 @@ t_get_set_flag(_) -> Msg6 = emqx_message:set_flags(#{dup => true, retain => true}, Msg5), ?assert(emqx_message:get_flag(dup, Msg6)), ?assert(emqx_message:get_flag(retain, Msg6)), - Msg7 = #message{id = <<"id">>, qos = ?QOS_1, flags = undefined}, + Msg7 = #message{id = <<"id">>, qos = ?QOS_1}, Msg8 = emqx_message:set_flag(retain, Msg7), Msg9 = emqx_message:set_flag(retain, true, Msg7), ?assertEqual(#{retain => true}, emqx_message:get_flags(Msg8)), @@ -135,7 +135,7 @@ t_get_set_header(_) -> ?assertEqual(#{b => 2, c => 3}, emqx_message:get_headers(Msg4)). t_undefined_headers(_) -> - Msg = #message{id = <<"id">>, qos = ?QOS_0, headers = undefined}, + Msg = #message{id = <<"id">>, qos = ?QOS_0}, Msg1 = emqx_message:set_headers(#{a => 1, b => 2}, Msg), ?assertEqual(1, emqx_message:get_header(a, Msg1)), Msg2 = emqx_message:set_header(c, 3, Msg), @@ -144,14 +144,14 @@ t_undefined_headers(_) -> t_format(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), io:format("~s~n", [emqx_message:format(Msg)]), - Msg1 = emqx_message:set_header('Subscription-Identifier', 1, + Msg1 = emqx_message:set_header(properties, #{'Subscription-Identifier' => 1}, emqx_message:set_flag(dup, Msg)), io:format("~s~n", [emqx_message:format(Msg1)]). t_is_expired(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), ?assertNot(emqx_message:is_expired(Msg)), - Msg1 = emqx_message:set_headers(#{'Message-Expiry-Interval' => 1}, Msg), + Msg1 = emqx_message:set_headers(#{properties => #{'Message-Expiry-Interval' => 1}}, Msg), timer:sleep(500), ?assertNot(emqx_message:is_expired(Msg1)), timer:sleep(600), @@ -159,7 +159,8 @@ t_is_expired(_) -> timer:sleep(1000), Msg = emqx_message:update_expiry(Msg), Msg2 = emqx_message:update_expiry(Msg1), - ?assertEqual(1, emqx_message:get_header('Message-Expiry-Interval', Msg2)). + Props = emqx_message:get_header(properties, Msg2), + ?assertEqual(1, maps:get('Message-Expiry-Interval', Props)). % t_to_list(_) -> % error('TODO'). @@ -172,7 +173,7 @@ t_to_packet(_) -> }, variable = #mqtt_packet_publish{topic_name = <<"topic">>, packet_id = 10, - properties = undefined + properties = #{} }, payload = <<"payload">> }, @@ -193,7 +194,7 @@ t_to_packet_with_props(_) -> payload = <<"payload">> }, Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>), - Msg1 = emqx_message:set_header('Subscription-Identifier', 1, Msg), + Msg1 = emqx_message:set_header(properties, #{'Subscription-Identifier' => 1}, Msg), ?assertEqual(Pkt, emqx_message:to_packet(10, Msg1)). t_to_map(_) -> @@ -201,8 +202,8 @@ t_to_map(_) -> List = [{id, emqx_message:id(Msg)}, {qos, ?QOS_1}, {from, <<"clientid">>}, - {flags, undefined}, - {headers, undefined}, + {flags, #{}}, + {headers, #{}}, {topic, <<"topic">>}, {payload, <<"payload">>}, {timestamp, emqx_message:timestamp(Msg)}], diff --git a/test/emqx_packet_SUITE.erl b/test/emqx_packet_SUITE.erl index bada2d85a..00ab6e0c5 100644 --- a/test/emqx_packet_SUITE.erl +++ b/test/emqx_packet_SUITE.erl @@ -85,7 +85,6 @@ t_connect_info(_) -> will_retain = true, will_qos = ?QOS_2, will_topic = <<"topic">>, - will_props = undefined, will_payload = <<"payload">> }, ?assertEqual(<<"MQTT">>, emqx_packet:info(proto_name, ConnPkt)), @@ -96,9 +95,9 @@ t_connect_info(_) -> ?assertEqual(?QOS_2, emqx_packet:info(will_qos, ConnPkt)), ?assertEqual(true, emqx_packet:info(will_retain, ConnPkt)), ?assertEqual(0, emqx_packet:info(keepalive, ConnPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, ConnPkt)), + ?assertEqual(#{}, emqx_packet:info(properties, ConnPkt)), ?assertEqual(<<"clientid">>, emqx_packet:info(clientid, ConnPkt)), - ?assertEqual(undefined, emqx_packet:info(will_props, ConnPkt)), + ?assertEqual(#{}, emqx_packet:info(will_props, ConnPkt)), ?assertEqual(<<"topic">>, emqx_packet:info(will_topic, ConnPkt)), ?assertEqual(<<"payload">>, emqx_packet:info(will_payload, ConnPkt)), ?assertEqual(<<"username">>, emqx_packet:info(username, ConnPkt)), @@ -108,54 +107,54 @@ t_connack_info(_) -> AckPkt = #mqtt_packet_connack{ack_flags = 0, reason_code = 0}, ?assertEqual(0, emqx_packet:info(ack_flags, AckPkt)), ?assertEqual(0, emqx_packet:info(reason_code, AckPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, AckPkt)). t_publish_info(_) -> PubPkt = #mqtt_packet_publish{topic_name = <<"t">>, packet_id = 1}, ?assertEqual(1, emqx_packet:info(packet_id, PubPkt)), ?assertEqual(<<"t">>, emqx_packet:info(topic_name, PubPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, PubPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, PubPkt)). t_puback_info(_) -> AckPkt = #mqtt_packet_puback{packet_id = 1, reason_code = 0}, ?assertEqual(1, emqx_packet:info(packet_id, AckPkt)), ?assertEqual(0, emqx_packet:info(reason_code, AckPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, AckPkt)). t_subscribe_info(_) -> TopicFilters = [{<<"t/#">>, #{}}], SubPkt = #mqtt_packet_subscribe{packet_id = 1, topic_filters = TopicFilters}, ?assertEqual(1, emqx_packet:info(packet_id, SubPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, SubPkt)), + ?assertEqual(#{}, emqx_packet:info(properties, SubPkt)), ?assertEqual(TopicFilters, emqx_packet:info(topic_filters, SubPkt)). t_suback_info(_) -> SubackPkt = #mqtt_packet_suback{packet_id = 1, reason_codes = [0]}, ?assertEqual(1, emqx_packet:info(packet_id, SubackPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, SubackPkt)), + ?assertEqual(#{}, emqx_packet:info(properties, SubackPkt)), ?assertEqual([0], emqx_packet:info(reason_codes, SubackPkt)). t_unsubscribe_info(_) -> UnsubPkt = #mqtt_packet_unsubscribe{packet_id = 1, topic_filters = [<<"t/#">>]}, ?assertEqual(1, emqx_packet:info(packet_id, UnsubPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, UnsubPkt)), + ?assertEqual(#{}, emqx_packet:info(properties, UnsubPkt)), ?assertEqual([<<"t/#">>], emqx_packet:info(topic_filters, UnsubPkt)). t_unsuback_info(_) -> AckPkt = #mqtt_packet_unsuback{packet_id = 1, reason_codes = [0]}, ?assertEqual(1, emqx_packet:info(packet_id, AckPkt)), ?assertEqual([0], emqx_packet:info(reason_codes, AckPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, AckPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, AckPkt)). t_disconnect_info(_) -> DisconnPkt = #mqtt_packet_disconnect{reason_code = 0}, ?assertEqual(0, emqx_packet:info(reason_code, DisconnPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, DisconnPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, DisconnPkt)). t_auth_info(_) -> AuthPkt = #mqtt_packet_auth{reason_code = 0}, ?assertEqual(0, emqx_packet:info(reason_code, AuthPkt)), - ?assertEqual(undefined, emqx_packet:info(properties, AuthPkt)). + ?assertEqual(#{}, emqx_packet:info(properties, AuthPkt)). t_set_props(_) -> Pkts = [#mqtt_packet_connect{}, #mqtt_packet_connack{}, #mqtt_packet_publish{}, @@ -245,6 +244,7 @@ t_from_to_message(_) -> ExpectedMsg1 = emqx_message:set_flags(#{dup => false, retain => false}, ExpectedMsg), ExpectedMsg2 = emqx_message:set_headers(#{peerhost => {127,0,0,1}, protocol => mqtt, + properties => #{}, username => <<"test">> }, ExpectedMsg1), Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, @@ -255,10 +255,10 @@ t_from_to_message(_) -> packet_id = 10, properties = #{}}, payload = <<"payload">>}, - MsgFromPkt = emqx_packet:to_message(#{protocol => mqtt, - clientid => <<"clientid">>, - username => <<"test">>, - peerhost => {127,0,0,1}}, Pkt), + MsgFromPkt = emqx_packet:to_message(Pkt, <<"clientid">>, + #{protocol => mqtt, + username => <<"test">>, + peerhost => {127,0,0,1}}), ?assertEqual(ExpectedMsg2, MsgFromPkt#message{id = emqx_message:id(ExpectedMsg), timestamp = emqx_message:timestamp(ExpectedMsg) }). @@ -283,7 +283,6 @@ t_will_msg(_) -> will_retain = true, will_qos = ?QOS_2, will_topic = <<"topic">>, - will_props = undefined, will_payload = <<"payload">> }, Msg2 = emqx_packet:will_msg(Pkt2), @@ -297,7 +296,6 @@ t_format(_) -> will_retain = true, will_qos = ?QOS_2, will_topic = <<"topic">>, - will_props = undefined, will_payload = <<"payload">>}))]), io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}))]), io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), diff --git a/test/emqx_reason_codes_SUITE.erl b/test/emqx_reason_codes_SUITE.erl index 60e29885a..027d6d85f 100644 --- a/test/emqx_reason_codes_SUITE.erl +++ b/test/emqx_reason_codes_SUITE.erl @@ -20,7 +20,6 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). --include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). all() -> emqx_ct:all(?MODULE). @@ -29,124 +28,3 @@ t_frame_error(_) -> ?assertEqual(?RC_PACKET_TOO_LARGE, emqx_reason_codes:frame_error(frame_too_large)), ?assertEqual(?RC_MALFORMED_PACKET, emqx_reason_codes:frame_error(bad_packet_id)), ?assertEqual(?RC_MALFORMED_PACKET, emqx_reason_codes:frame_error(bad_qos)). - -t_prop_name_text(_) -> - ?assert(proper:quickcheck(prop_name_text(), prop_name_text(opts))). - -t_prop_compat(_) -> - ?assert(proper:quickcheck(prop_compat(), prop_compat(opts))). - -t_prop_connack_error(_) -> - ?assert(proper:quickcheck(prop_connack_error(), default_opts([]))). - -prop_name_text(opts) -> - default_opts([{numtests, 1000}]). - -prop_name_text() -> - ?FORALL(UnionArgs, union_args(), - is_atom(apply_fun(name, UnionArgs)) andalso - is_binary(apply_fun(text, UnionArgs))). - -prop_compat(opts) -> - default_opts([{numtests, 512}]). - -prop_compat() -> - ?FORALL(CompatArgs, compat_args(), - begin - Result = apply_fun(compat, CompatArgs), - is_number(Result) orelse Result =:= undefined - end). - -prop_connack_error() -> - ?FORALL(CONNACK_ERROR_ARGS, connack_error_args(), - is_integer(apply_fun(connack_error, CONNACK_ERROR_ARGS))). - -%%-------------------------------------------------------------------- -%% Helper -%%-------------------------------------------------------------------- - -default_opts() -> - default_opts([]). - -default_opts(AdditionalOpts) -> - [{to_file, user} | AdditionalOpts]. - -apply_fun(Fun, Args) -> - apply(emqx_reason_codes, Fun, Args). - -%%-------------------------------------------------------------------- -%% Generator -%%-------------------------------------------------------------------- - -union_args() -> - frequency([{6, [real_mqttv3_rc(), mqttv3_version()]}, - {43, [real_mqttv5_rc(), mqttv5_version()]}]). - -compat_args() -> - frequency([{18, [connack, compat_rc()]}, - {2, [suback, compat_rc()]}, - {1, [unsuback, compat_rc()]}]). - -connack_error_args() -> - [frequency([{10, connack_error()}, - {1, unexpected_connack_error()}])]. - -connack_error() -> - oneof([client_identifier_not_valid, - bad_username_or_password, - bad_clientid_or_password, - username_or_password_undefined, - password_error, - not_authorized, - server_unavailable, - server_busy, - banned, - bad_authentication_method]). - -unexpected_connack_error() -> - oneof([who_knows]). - - -real_mqttv3_rc() -> - frequency([{6, mqttv3_rc()}, - {1, unexpected_rc()}]). - -real_mqttv5_rc() -> - frequency([{43, mqttv5_rc()}, - {2, unexpected_rc()}]). - -compat_rc() -> - frequency([{95, ?SUCHTHAT(RC , mqttv5_rc(), RC >= 16#80 orelse RC =< 2)}, - {5, unexpected_rc()}]). - -mqttv3_rc() -> - oneof(mqttv3_rcs()). - -mqttv5_rc() -> - oneof(mqttv5_rcs()). - -unexpected_rc() -> - oneof(unexpected_rcs()). - -mqttv3_rcs() -> - [0, 1, 2, 3, 4, 5]. - -mqttv5_rcs() -> - [16#00, 16#01, 16#02, 16#04, 16#10, 16#11, 16#18, 16#19, - 16#80, 16#81, 16#82, 16#83, 16#84, 16#85, 16#86, 16#87, - 16#88, 16#89, 16#8A, 16#8B, 16#8C, 16#8D, 16#8E, 16#8F, - 16#90, 16#91, 16#92, 16#93, 16#94, 16#95, 16#96, 16#97, - 16#98, 16#99, 16#9A, 16#9B, 16#9C, 16#9D, 16#9E, 16#9F, - 16#A0, 16#A1, 16#A2]. - -unexpected_rcs() -> - ReasonCodes = mqttv3_rcs() ++ mqttv5_rcs(), - Unexpected = lists:seq(0, 16#FF) -- ReasonCodes, - lists:sublist(Unexpected, 5). - -mqttv5_version() -> - ?MQTT_PROTO_V5. - -mqttv3_version() -> - oneof([?MQTT_PROTO_V3, ?MQTT_PROTO_V4]). - diff --git a/test/emqx_rpc_SUITE.erl b/test/emqx_rpc_SUITE.erl deleted file mode 100644 index cd0ceb93c..000000000 --- a/test/emqx_rpc_SUITE.erl +++ /dev/null @@ -1,161 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_rpc_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("proper/include/proper.hrl"). --include_lib("eunit/include/eunit.hrl"). - -all() -> emqx_ct:all(?MODULE). - -t_prop_rpc(_) -> - ok = load(), - Opts = [{to_file, user}, {numtests, 10}], - {ok, _Apps} = application:ensure_all_started(gen_rpc), - ok = application:set_env(gen_rpc, call_receive_timeout, 1), - ok = emqx_logger:set_log_level(emergency), - ?assert(proper:quickcheck(prop_node(), Opts)), - ?assert(proper:quickcheck(prop_node_with_key(), Opts)), - ?assert(proper:quickcheck(prop_nodes(), Opts)), - ?assert(proper:quickcheck(prop_nodes_with_key(), Opts)), - ok = application:stop(gen_rpc), - ok = unload(). - -prop_node() -> - ?FORALL(Node, nodename(), - begin - ?assert(emqx_rpc:cast(Node, erlang, system_time, [])), - case emqx_rpc:call(Node, erlang, system_time, []) of - {badrpc, _Reason} -> true; - Delivery when is_integer(Delivery) -> true; - _Other -> false - end - end). - -prop_node_with_key() -> - ?FORALL({Node, Key}, nodename_with_key(), - begin - ?assert(emqx_rpc:cast(Key, Node, erlang, system_time, [])), - case emqx_rpc:call(Key, Node, erlang, system_time, []) of - {badrpc, _Reason} -> true; - Delivery when is_integer(Delivery) -> true; - _Other -> false - end - end). - -prop_nodes() -> - ?FORALL(Nodes, nodesname(), - begin - case emqx_rpc:multicall(Nodes, erlang, system_time, []) of - {badrpc, _Reason} -> true; - {RealResults, RealBadNodes} - when is_list(RealResults); - is_list(RealBadNodes) -> - true; - _Other -> false - end - end). - -prop_nodes_with_key() -> - ?FORALL({Nodes, Key}, nodesname_with_key(), - begin - case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of - {badrpc, _Reason} -> true; - {RealResults, RealBadNodes} - when is_list(RealResults); - is_list(RealBadNodes) -> - true; - _Other -> false - end - end). - -%%-------------------------------------------------------------------- -%% helper -%%-------------------------------------------------------------------- - -load() -> - ok = meck:new(gen_rpc, [passthrough, no_history]), - ok = meck:expect(gen_rpc, multicall, - fun(Nodes, Mod, Fun, Args) -> - gen_rpc:multicall(Nodes, Mod, Fun, Args, 1) - end). - -unload() -> - ok = meck:unload(gen_rpc). - -%%-------------------------------------------------------------------- -%% Generator -%%-------------------------------------------------------------------- - -nodename() -> - ?LET({NodePrefix, HostName}, - {node_prefix(), hostname()}, - begin - Node = NodePrefix ++ "@" ++ HostName, - list_to_atom(Node) - end). - -nodename_with_key() -> - ?LET({NodePrefix, HostName, Key}, - {node_prefix(), hostname(), choose(0, 10)}, - begin - Node = NodePrefix ++ "@" ++ HostName, - {list_to_atom(Node), Key} - end). - -nodesname() -> - oneof([list(nodename()), ['emqxct@127.0.0.1']]). - -nodesname_with_key() -> - oneof([{list(nodename()), choose(0, 10)}, {['emqxct@127.0.0.1'], 1}]). - -node_prefix() -> - oneof(["emqxct", text_like()]). - -text_like() -> - ?SUCHTHAT(Text, list(range($a, $z)), (length(Text) =< 5 andalso length(Text) > 0)). - -hostname() -> - oneof([ipv4_address(), ipv6_address(), "127.0.0.1", "localhost"]). - -ipv4_address() -> - ?LET({Num1, Num2, Num3, Num4}, - { choose(0, 255) - , choose(0, 255) - , choose(0, 255) - , choose(0, 255)}, - make_ip([Num1, Num2, Num3, Num4], ipv4)). - -ipv6_address() -> - ?LET({Num1, Num2, Num3, Num4, Num5, Num6}, - { choose(0, 65535) - , choose(0, 65535) - , choose(0, 65535) - , choose(0, 65535) - , choose(0, 65535) - , choose(0, 65535)}, - make_ip([Num1, Num2, Num3, Num4, Num5, Num6], ipv6)). - - -make_ip(NumList, ipv4) when is_list(NumList) -> - string:join([integer_to_list(Num) || Num <- NumList], "."); -make_ip(NumList, ipv6) when is_list(NumList) -> - string:join([integer_to_list(Num) || Num <- NumList], ":"); -make_ip(_List, _protocol) -> - "127.0.0.1". diff --git a/test/emqx_session_SUITE.erl b/test/emqx_session_SUITE.erl index d8710eb58..dd9611b71 100644 --- a/test/emqx_session_SUITE.erl +++ b/test/emqx_session_SUITE.erl @@ -20,7 +20,6 @@ -compile(nowarn_export_all). -include("emqx_mqtt.hrl"). --include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). all() -> emqx_ct:all(?MODULE). @@ -116,9 +115,9 @@ t_is_subscriptions_full_true(_) -> t_unsubscribe(_) -> ok = meck:expect(emqx_broker, unsubscribe, fun(_) -> ok end), Session = session(#{subscriptions => #{<<"#">> => subopts()}}), - {ok, Session1} = emqx_session:unsubscribe(clientinfo(), <<"#">>, Session), + {ok, Session1} = emqx_session:unsubscribe(clientinfo(), <<"#">>, #{}, Session), {error, ?RC_NO_SUBSCRIPTION_EXISTED} = - emqx_session:unsubscribe(clientinfo(), <<"#">>, Session1). + emqx_session:unsubscribe(clientinfo(), <<"#">>, #{}, Session1). t_publish_qos0(_) -> ok = meck:expect(emqx_broker, publish, fun(_) -> [] end), diff --git a/test/emqx_sys_SUITE.erl b/test/emqx_sys_SUITE.erl index 64193ef9e..0814f8b00 100644 --- a/test/emqx_sys_SUITE.erl +++ b/test/emqx_sys_SUITE.erl @@ -19,16 +19,8 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). --define(mock_modules, - [ emqx_metrics - , emqx_stats - , emqx_broker - , ekka_mnesia - ]). - all() -> emqx_ct:all(?MODULE). init_per_suite(Config) -> @@ -66,95 +58,3 @@ t_uptime(_) -> % t_info(_) -> % error('TODO'). - -t_prop_sys(_) -> - Opts = [{numtests, 100}, {to_file, user}], - ok = load(?mock_modules), - ?assert(proper:quickcheck(prop_sys(), Opts)), - ok = unload(?mock_modules). - -prop_sys() -> - ?FORALL(Cmds, commands(?MODULE), - begin - {ok, _Pid} = emqx_sys:start_link(), - {History, State, Result} = run_commands(?MODULE, Cmds), - ok = emqx_sys:stop(), - ?WHENFAIL(io:format("History: ~p\nState: ~p\nResult: ~p\n", - [History,State,Result]), - aggregate(command_names(Cmds), true)) - end). - -load(Modules) -> - [mock(Module) || Module <- Modules], - ok. - -unload(Modules) -> - lists:foreach(fun(Module) -> - ok = meck:unload(Module) - end, Modules). - -mock(Module) -> - ok = meck:new(Module, [passthrough, no_history]), - do_mock(Module). - -do_mock(emqx_broker) -> - meck:expect(emqx_broker, publish, - fun(Msg) -> {node(), <<"test">>, Msg} end), - meck:expect(emqx_broker, safe_publish, - fun(Msg) -> {node(), <<"test">>, Msg} end); -do_mock(emqx_stats) -> - meck:expect(emqx_stats, getstats, fun() -> [0] end); -do_mock(ekka_mnesia) -> - meck:expect(ekka_mnesia, running_nodes, fun() -> [node()] end); -do_mock(emqx_metrics) -> - meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end). - -unmock() -> - meck:unload(emqx_broker). - -%%%%%%%%%%%%% -%%% MODEL %%% -%%%%%%%%%%%%% -%% @doc Initial model value at system start. Should be deterministic. -initial_state() -> - #{}. - -%% @doc List of possible commands to run against the system -command(_State) -> - oneof([{call, emqx_sys, info, []}, - {call, emqx_sys, version, []}, - {call, emqx_sys, uptime, []}, - {call, emqx_sys, datetime, []}, - {call, emqx_sys, sysdescr, []}, - {call, emqx_sys, sys_interval, []}, - {call, emqx_sys, sys_heatbeat_interval, []}, - %------------ unexpected message ----------------------% - {call, emqx_sys, handle_call, [emqx_sys, other, state]}, - {call, emqx_sys, handle_cast, [emqx_sys, other]}, - {call, emqx_sys, handle_info, [info, state]} - ]). - -precondition(_State, {call, _Mod, _Fun, _Args}) -> - timer:sleep(1), - true. - -postcondition(_State, {call, emqx_sys, info, []}, Info) -> - is_list(Info) andalso length(Info) =:= 4; -postcondition(_State, {call, emqx_sys, version, []}, Version) -> - is_list(Version); -postcondition(_State, {call, emqx_sys, uptime, []}, Uptime) -> - is_list(Uptime); -postcondition(_State, {call, emqx_sys, datetime, []}, Datetime) -> - is_list(Datetime); -postcondition(_State, {call, emqx_sys, sysdescr, []}, Sysdescr) -> - is_list(Sysdescr); -postcondition(_State, {call, emqx_sys, sys_interval, []}, SysInterval) -> - is_integer(SysInterval) andalso SysInterval > 0; -postcondition(_State, {call, emqx_sys, sys_heartbeat_interval, []}, SysHeartInterval) -> - is_integer(SysHeartInterval) andalso SysHeartInterval > 0; -postcondition(_State, {call, _Mod, _Fun, _Args}, _Res) -> - true. - -next_state(State, _Res, {call, _Mod, _Fun, _Args}) -> - NewState = State, - NewState. diff --git a/test/emqx_topic_SUITE.erl b/test/emqx_topic_SUITE.erl index 707e4a6aa..89b3b11af 100644 --- a/test/emqx_topic_SUITE.erl +++ b/test/emqx_topic_SUITE.erl @@ -26,7 +26,6 @@ [ wildcard/1 , match/2 , validate/1 - , triples/1 , prepend/2 , join/1 , words/1 @@ -143,18 +142,7 @@ t_sigle_level_validate(_) -> true = validate({filter, <<"sport/+/player1">>}), ok = ?catch_error(topic_invalid_char, validate({filter, <<"sport+">>})). -t_triples(_) -> - Triples = [{root,<<"a">>,<<"a">>}, - {<<"a">>,<<"b">>,<<"a/b">>}, - {<<"a/b">>,<<"c">>,<<"a/b/c">>}], - ?assertEqual(Triples, triples(<<"a/b/c">>)). - -t_triples_perf(_) -> - Topic = <<"/abkc/19383/192939/akakdkkdkak/xxxyyuya/akakak">>, - ok = bench('triples/1', fun emqx_topic:triples/1, [Topic]). - t_prepend(_) -> - ?assertEqual(<<"a/b/c">>, prepend(root, <<"a/b/c">>)), ?assertEqual(<<"ab">>, prepend(undefined, <<"ab">>)), ?assertEqual(<<"a/b">>, prepend(<<>>, <<"a/b">>)), ?assertEqual(<<"x/a/b">>, prepend("x/", <<"a/b">>)), diff --git a/test/emqx_trie_SUITE.erl b/test/emqx_trie_SUITE.erl index 5324829c9..f1db3813c 100644 --- a/test/emqx_trie_SUITE.erl +++ b/test/emqx_trie_SUITE.erl @@ -141,6 +141,12 @@ t_delete3(_) -> end, ?assertEqual({atomic, {[], []}}, trans(Fun)). +t_triples(_) -> + Triples = [{root,<<"a">>,<<"a">>}, + {<<"a">>,<<"b">>,<<"a/b">>}, + {<<"a/b">>,<<"c">>,<<"a/b/c">>}], + ?assertEqual(Triples, emqx_trie:triples(<<"a/b/c">>)). + clear_tables() -> lists:foreach(fun mnesia:clear_table/1, ?TRIE_TABS). diff --git a/test/emqx_vm_SUITE.erl b/test/emqx_vm_SUITE.erl index acc75e99a..19545f0ea 100644 --- a/test/emqx_vm_SUITE.erl +++ b/test/emqx_vm_SUITE.erl @@ -80,8 +80,7 @@ t_get_port_info(_Config) -> {ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]), emqx_vm:get_port_info(), ok = gen_tcp:close(Sock), - [Port | _] = erlang:ports(), - [{connected, _}, {name, _}] = emqx_vm:port_info(Port, [connected, name]). + [Port | _] = erlang:ports(). t_transform_port(_Config) -> [Port | _] = erlang:ports(), diff --git a/test/emqx_ws_connection_SUITE.erl b/test/emqx_ws_connection_SUITE.erl index 09c9d5dd7..eccdcd677 100644 --- a/test/emqx_ws_connection_SUITE.erl +++ b/test/emqx_ws_connection_SUITE.erl @@ -304,7 +304,7 @@ t_parse_incoming(_) -> St = ?ws_conn:parse_incoming(<<48,3>>, st()), St1 = ?ws_conn:parse_incoming(<<0,1,116>>, St), Packet = ?PUBLISH_PACKET(?QOS_0, <<"t">>, undefined, <<>>), - [{incoming, Packet}] = ?ws_conn:info(postponed, St1). + ?assertMatch([{incoming, Packet}], ?ws_conn:info(postponed, St1)). t_parse_incoming_frame_error(_) -> St = ?ws_conn:parse_incoming(<<3,2,1,0>>, st()), diff --git a/test/emqx_base62_SUITE.erl b/test/props/prop_emqx_base62.erl similarity index 79% rename from test/emqx_base62_SUITE.erl rename to test/props/prop_emqx_base62.erl index eb8ac99a5..4bae49524 100644 --- a/test/emqx_base62_SUITE.erl +++ b/test/props/prop_emqx_base62.erl @@ -14,24 +14,14 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_base62_SUITE). - --compile(export_all). --compile(nowarn_export_all). +-module(prop_emqx_base62). -include_lib("proper/include/proper.hrl"). --include_lib("eunit/include/eunit.hrl"). -all() -> emqx_ct:all(?MODULE). +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- -t_proper_base62(_) -> - Opts = [{numtests, 100}, {to_file, user}], - ?assert(proper:quickcheck(prop_symmetric(), Opts)), - ?assert(proper:quickcheck(prop_size(), Opts)). - -%%%%%%%%%%%%%%%%%% -%%% Properties %%% -%%%%%%%%%%%%%%%%%% prop_symmetric() -> ?FORALL(Data, raw_data(), begin @@ -46,9 +36,10 @@ prop_size() -> base62_size(Data, Encoded) end). -%%%%%%%%%%%%%%% -%%% Helpers %%% -%%%%%%%%%%%%%%% +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + to_binary(Data) when is_list(Data) -> unicode:characters_to_binary(Data); to_binary(Data) when is_integer(Data) -> @@ -73,7 +64,9 @@ base62_size(Data, Encoded) -> EncodedSize >= RangeStart andalso EncodedSize =< RangeEnd end. -%%%%%%%%%%%%%%%%%% -%%% Generators %%% -%%%%%%%%%%%%%%%%%% -raw_data() -> oneof([integer(), string(), binary()]). +%%-------------------------------------------------------------------- +%% Generators +%%-------------------------------------------------------------------- + +raw_data() -> + oneof([integer(), string(), binary()]). diff --git a/test/props/prop_emqx_frame.erl b/test/props/prop_emqx_frame.erl new file mode 100644 index 000000000..731e4ba1e --- /dev/null +++ b/test/props/prop_emqx_frame.erl @@ -0,0 +1,62 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_emqx_frame). + +-include("emqx_mqtt.hrl"). +-include_lib("proper/include/proper.hrl"). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_serialize_parse_connect() -> + ?FORALL(Opts = #{version := ProtoVer}, parse_opts(), + begin + ProtoName = proplists:get_value(ProtoVer, ?PROTOCOL_NAMES), + Packet = ?CONNECT_PACKET(#mqtt_packet_connect{ + proto_name = ProtoName, + proto_ver = ProtoVer, + clientid = <<"clientId">>, + will_qos = ?QOS_1, + will_flag = true, + will_retain = true, + will_topic = <<"will">>, + will_props = #{}, + will_payload = <<"bye">>, + clean_start = true, + properties = #{} + }), + Packet =:= parse_serialize(Packet, Opts) + end). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +parse_serialize(Packet, Opts) when is_map(Opts) -> + Ver = maps:get(version, Opts, ?MQTT_PROTO_V4), + Bin = iolist_to_binary(emqx_frame:serialize(Packet, Ver)), + ParseState = emqx_frame:initial_parse_state(Opts), + {ok, NPacket, <<>>, _} = emqx_frame:parse(Bin, ParseState), + NPacket. + +%%-------------------------------------------------------------------- +%% Generators +%%-------------------------------------------------------------------- + +parse_opts() -> + ?LET(PropList, [{strict_mode, boolean()}, {version, range(4,5)}], maps:from_list(PropList)). diff --git a/test/props/prop_emqx_json.erl b/test/props/prop_emqx_json.erl new file mode 100644 index 000000000..86f57a1d3 --- /dev/null +++ b/test/props/prop_emqx_json.erl @@ -0,0 +1,196 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_emqx_json). + +-import(emqx_json, + [ decode/1 + , decode/2 + , encode/1 + , safe_decode/1 + , safe_decode/2 + , safe_encode/1 + ]). + +-include_lib("proper/include/proper.hrl"). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_json_basic() -> + ?FORALL(T, json_basic(), + begin + {ok, J} = safe_encode(T), + {ok, T} = safe_decode(J), + T = decode(encode(T)), + true + end). + +prop_json_basic_atom() -> + ?FORALL(T0, latin_atom(), + begin + T = atom_to_binary(T0, utf8), + {ok, J} = safe_encode(T0), + {ok, T} = safe_decode(J), + T = decode(encode(T0)), + true + end). + +prop_object_proplist_to_proplist() -> + ?FORALL(T, json_object(), + begin + {ok, J} = safe_encode(T), + {ok, T} = safe_decode(J), + T = decode(encode(T)), + true + end). + +prop_object_map_to_map() -> + ?FORALL(T, json_object_map(), + begin + {ok, J} = safe_encode(T), + {ok, T} = safe_decode(J, [return_maps]), + T = decode(encode(T), [return_maps]), + true + end). + +%% The duplicated key will be overriden +prop_object_proplist_to_map() -> + ?FORALL(T0, json_object(), + begin + T = to_map(T0), + {ok, J} = safe_encode(T0), + {ok, T} = safe_decode(J, [return_maps]), + T = decode(encode(T0), [return_maps]), + true + end). + +prop_object_map_to_proplist() -> + ?FORALL(T0, json_object_map(), + begin + %% jiffy encode a map with descending order, that is, + %% it is opposite with maps traversal sequence + %% see: the `to_list` implementation + T = to_list(T0), + {ok, J} = safe_encode(T0), + {ok, T} = safe_decode(J), + T = decode(encode(T0)), + true + end). + +prop_safe_encode() -> + ?FORALL(T, invalid_json_term(), + begin + {error, _} = safe_encode(T), true + end). + +prop_safe_decode() -> + ?FORALL(T, invalid_json_str(), + begin + {error, _} = safe_decode(T), true + end). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +to_map([{_, _}|_] = L) -> + lists:foldl( + fun({Name, Value}, Acc) -> + Acc#{Name => to_map(Value)} + end, #{}, L); +to_map(L) when is_list(L) -> + [to_map(E) || E <- L]; +to_map(T) -> T. + +to_list(L) when is_list(L) -> + [to_list(E) || E <- L]; +to_list(M) when is_map(M) -> + maps:fold( + fun(K, V, Acc) -> + [{K, to_list(V)}|Acc] + end, [], M); +to_list(T) -> T. + +%%-------------------------------------------------------------------- +%% Generators (https://tools.ietf.org/html/rfc8259) +%%-------------------------------------------------------------------- + +%% true, false, null, and number(), string() +json_basic() -> + oneof([true, false, null, number(), json_string()]). + +latin_atom() -> + ?LET(L, list(latin_char()), list_to_atom(L)). + +latin_char() -> + L = lists:concat([lists:seq($0, $9), + lists:seq($a, $z), + lists:seq($A, $Z)]), + oneof(L). + +json_string() -> utf8(). + +json_object() -> + oneof([json_array_1(), json_object_1(), json_array_object_1(), + json_array_2(), json_object_2(), json_array_object_2()]). + +json_object_map() -> + ?LET(L, json_object(), to_map(L)). + +json_array_1() -> + list(json_basic()). + +json_array_2() -> + list([json_basic(), json_array_1()]). + +json_object_1() -> + list({json_key(), json_basic()}). + +json_object_2() -> + list({json_key(), oneof([json_basic(), + json_array_1(), + json_object_1()])}). + +json_array_object_1() -> + list(json_object_1()). + +json_array_object_2() -> + list(json_object_2()). + +%% @private +json_key() -> + ?LET(K, latin_atom(), atom_to_binary(K, utf8)). + +invalid_json_term() -> + ?SUCHTHAT(T, tuple(), (tuple_size(T) /= 1)). + +invalid_json_str() -> + ?LET(T, json_object_2(), chaos(encode(T))). + +%% @private +chaos(S) when is_binary(S) -> + T = [$\r, $\n, $", ${, $}, $[, $], $:, $,], + iolist_to_binary(chaos(binary_to_list(S), 100, T)). + +chaos(S, 0, _) -> + S; +chaos(S, N, T) -> + I = rand:uniform(length(S)), + {L1, L2} = lists:split(I, S), + chaos(lists:flatten([L1, lists:nth(rand:uniform(length(T)), T), L2]), N-1, T). + diff --git a/test/emqx_psk_SUITE.erl b/test/props/prop_emqx_psk.erl similarity index 68% rename from test/emqx_psk_SUITE.erl rename to test/props/prop_emqx_psk.erl index 72b2bd42e..d853c1b93 100644 --- a/test/emqx_psk_SUITE.erl +++ b/test/props/prop_emqx_psk.erl @@ -14,48 +14,46 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_psk_SUITE). --compile(export_all). --compile(nowarn_export_all). +-module(prop_emqx_psk). -include_lib("proper/include/proper.hrl"). --include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). -all() -> emqx_ct:all(?MODULE). +-define(ALL(Vars, Types, Exprs), + ?SETUP(fun() -> + State = do_setup(), + fun() -> do_teardown(State) end + end, ?FORALL(Vars, Types, Exprs))). -t_lookup(_) -> - ok = load(), - ok = emqx_logger:set_log_level(emergency), - Opts = [{to_file, user}, {numtests, 10}], - ?assert(proper:quickcheck(prop_lookup(), Opts)), - ok = unload(), - ok = emqx_logger:set_log_level(error). +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- prop_lookup() -> - ?FORALL({ClientPSKID, UserState}, - {client_pskid(), user_state()}, - begin - case emqx_psk:lookup(psk, ClientPSKID, UserState) of - {ok, _Result} -> true; - error -> true; - _Other -> false - end - end). + ?ALL({ClientPSKID, UserState}, + {client_pskid(), user_state()}, + begin + case emqx_psk:lookup(psk, ClientPSKID, UserState) of + {ok, _Result} -> true; + error -> true; + _Other -> false + end + end). %%-------------------------------------------------------------------- %% Helper %%-------------------------------------------------------------------- -load() -> +do_setup() -> + ok = emqx_logger:set_log_level(emergency), ok = meck:new(emqx_hooks, [passthrough, no_history]), ok = meck:expect(emqx_hooks, run_fold, fun('tls_handshake.psk_lookup', [ClientPSKID], not_found) -> unicode:characters_to_binary(ClientPSKID) end). -unload() -> +do_teardown(_) -> + ok = emqx_logger:set_log_level(error), ok = meck:unload(emqx_hooks). %%-------------------------------------------------------------------- @@ -65,3 +63,4 @@ unload() -> client_pskid() -> oneof([string(), integer(), [1, [-1]]]). user_state() -> term(). + diff --git a/test/props/prop_emqx_reason_codes.erl b/test/props/prop_emqx_reason_codes.erl new file mode 100644 index 000000000..f6f5e81b0 --- /dev/null +++ b/test/props/prop_emqx_reason_codes.erl @@ -0,0 +1,123 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_emqx_reason_codes). + +-include("emqx_mqtt.hrl"). +-include_lib("proper/include/proper.hrl"). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_name_text() -> + ?FORALL(UnionArgs, union_args(), + is_atom(apply_fun(name, UnionArgs)) andalso + is_binary(apply_fun(text, UnionArgs))). + +prop_compat() -> + ?FORALL(CompatArgs, compat_args(), + begin + Result = apply_fun(compat, CompatArgs), + is_number(Result) orelse Result =:= undefined + end). + +prop_connack_error() -> + ?FORALL(CONNACK_ERROR_ARGS, connack_error_args(), + is_integer(apply_fun(connack_error, CONNACK_ERROR_ARGS))). + +%%-------------------------------------------------------------------- +%% Helper +%%-------------------------------------------------------------------- + +apply_fun(Fun, Args) -> + apply(emqx_reason_codes, Fun, Args). + +%%-------------------------------------------------------------------- +%% Generator +%%-------------------------------------------------------------------- + +union_args() -> + frequency([{6, [real_mqttv3_rc(), mqttv3_version()]}, + {43, [real_mqttv5_rc(), mqttv5_version()]}]). + +compat_args() -> + frequency([{18, [connack, compat_rc()]}, + {2, [suback, compat_rc()]}, + {1, [unsuback, compat_rc()]}]). + +connack_error_args() -> + [frequency([{10, connack_error()}, + {1, unexpected_connack_error()}])]. + +connack_error() -> + oneof([client_identifier_not_valid, + bad_username_or_password, + bad_clientid_or_password, + username_or_password_undefined, + password_error, + not_authorized, + server_unavailable, + server_busy, + banned, + bad_authentication_method]). + +unexpected_connack_error() -> + oneof([who_knows]). + + +real_mqttv3_rc() -> + frequency([{6, mqttv3_rc()}, + {1, unexpected_rc()}]). + +real_mqttv5_rc() -> + frequency([{43, mqttv5_rc()}, + {2, unexpected_rc()}]). + +compat_rc() -> + frequency([{95, ?SUCHTHAT(RC , mqttv5_rc(), RC >= 16#80 orelse RC =< 2)}, + {5, unexpected_rc()}]). + +mqttv3_rc() -> + oneof(mqttv3_rcs()). + +mqttv5_rc() -> + oneof(mqttv5_rcs()). + +unexpected_rc() -> + oneof(unexpected_rcs()). + +mqttv3_rcs() -> + [0, 1, 2, 3, 4, 5]. + +mqttv5_rcs() -> + [16#00, 16#01, 16#02, 16#04, 16#10, 16#11, 16#18, 16#19, + 16#80, 16#81, 16#82, 16#83, 16#84, 16#85, 16#86, 16#87, + 16#88, 16#89, 16#8A, 16#8B, 16#8C, 16#8D, 16#8E, 16#8F, + 16#90, 16#91, 16#92, 16#93, 16#94, 16#95, 16#96, 16#97, + 16#98, 16#99, 16#9A, 16#9B, 16#9C, 16#9D, 16#9E, 16#9F, + 16#A0, 16#A1, 16#A2]. + +unexpected_rcs() -> + ReasonCodes = mqttv3_rcs() ++ mqttv5_rcs(), + Unexpected = lists:seq(0, 16#FF) -- ReasonCodes, + lists:sublist(Unexpected, 5). + +mqttv5_version() -> + ?MQTT_PROTO_V5. + +mqttv3_version() -> + oneof([?MQTT_PROTO_V3, ?MQTT_PROTO_V4]). diff --git a/test/props/prop_emqx_rpc.erl b/test/props/prop_emqx_rpc.erl new file mode 100644 index 000000000..ab876db39 --- /dev/null +++ b/test/props/prop_emqx_rpc.erl @@ -0,0 +1,132 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_emqx_rpc). + +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(ALL(Vars, Types, Exprs), + ?SETUP(fun() -> + State = do_setup(), + fun() -> do_teardown(State) end + end, ?FORALL(Vars, Types, Exprs))). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_node() -> + ?ALL(Node, nodename(), + begin + ?assert(emqx_rpc:cast(Node, erlang, system_time, [])), + case emqx_rpc:call(Node, erlang, system_time, []) of + {badrpc, _Reason} -> true; + Delivery when is_integer(Delivery) -> true; + _Other -> false + end + end). + +prop_node_with_key() -> + ?ALL({Node, Key}, nodename_with_key(), + begin + ?assert(emqx_rpc:cast(Key, Node, erlang, system_time, [])), + case emqx_rpc:call(Key, Node, erlang, system_time, []) of + {badrpc, _Reason} -> true; + Delivery when is_integer(Delivery) -> true; + _Other -> false + end + end). + +prop_nodes() -> + ?ALL(Nodes, nodesname(), + begin + case emqx_rpc:multicall(Nodes, erlang, system_time, []) of + {badrpc, _Reason} -> true; + {RealResults, RealBadNodes} + when is_list(RealResults); + is_list(RealBadNodes) -> + true; + _Other -> false + end + end). + +prop_nodes_with_key() -> + ?ALL({Nodes, Key}, nodesname_with_key(), + begin + case emqx_rpc:multicall(Key, Nodes, erlang, system_time, []) of + {badrpc, _Reason} -> true; + {RealResults, RealBadNodes} + when is_list(RealResults); + is_list(RealBadNodes) -> + true; + _Other -> false + end + end). + +%%-------------------------------------------------------------------- +%% Helper +%%-------------------------------------------------------------------- + +do_setup() -> + {ok, _Apps} = application:ensure_all_started(gen_rpc), + ok = application:set_env(gen_rpc, call_receive_timeout, 1), + ok = emqx_logger:set_log_level(emergency), + ok = meck:new(gen_rpc, [passthrough, no_history]), + ok = meck:expect(gen_rpc, multicall, + fun(Nodes, Mod, Fun, Args) -> + gen_rpc:multicall(Nodes, Mod, Fun, Args, 1) + end). + +do_teardown(_) -> + ok = emqx_logger:set_log_level(debug), + ok = application:stop(gen_rpc), + ok = meck:unload(gen_rpc). + +%%-------------------------------------------------------------------- +%% Generator +%%-------------------------------------------------------------------- + +nodename() -> + ?LET({NodePrefix, HostName}, + {node_prefix(), hostname()}, + begin + Node = NodePrefix ++ "@" ++ HostName, + list_to_atom(Node) + end). + +nodename_with_key() -> + ?LET({NodePrefix, HostName, Key}, + {node_prefix(), hostname(), choose(0, 10)}, + begin + Node = NodePrefix ++ "@" ++ HostName, + {list_to_atom(Node), Key} + end). + +nodesname() -> + oneof([list(nodename()), [node()]]). + +nodesname_with_key() -> + oneof([{list(nodename()), choose(0, 10)}, {[node()], 1}]). + +node_prefix() -> + oneof(["emqxct", text_like()]). + +text_like() -> + ?SUCHTHAT(Text, list(range($a, $z)), (length(Text) =< 5 andalso length(Text) > 0)). + +hostname() -> + oneof(["127.0.0.1", "localhost"]). diff --git a/test/props/prop_emqx_sys.erl b/test/props/prop_emqx_sys.erl new file mode 100644 index 000000000..3c2b599c4 --- /dev/null +++ b/test/props/prop_emqx_sys.erl @@ -0,0 +1,133 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(prop_emqx_sys). + +-include_lib("proper/include/proper.hrl"). + +-export([ initial_state/0 + , command/1 + , precondition/2 + , postcondition/3 + , next_state/3 + ]). + +-define(mock_modules, + [ emqx_metrics + , emqx_stats + , emqx_broker + , ekka_mnesia + ]). + +-define(ALL(Vars, Types, Exprs), + ?SETUP(fun() -> + State = do_setup(), + fun() -> do_teardown(State) end + end, ?FORALL(Vars, Types, Exprs))). + +%%-------------------------------------------------------------------- +%% Properties +%%-------------------------------------------------------------------- + +prop_sys() -> + ?ALL(Cmds, commands(?MODULE), + begin + {ok, _Pid} = emqx_sys:start_link(), + {History, State, Result} = run_commands(?MODULE, Cmds), + ok = emqx_sys:stop(), + ?WHENFAIL(io:format("History: ~p\nState: ~p\nResult: ~p\n", + [History,State,Result]), + aggregate(command_names(Cmds), true)) + end). + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +do_setup() -> + ok = emqx_logger:set_log_level(emergency), + [mock(Mod) || Mod <- ?mock_modules], + ok. + +do_teardown(_) -> + ok = emqx_logger:set_log_level(error), + [ok = meck:unload(Mod) || Mod <- ?mock_modules], + ok. + +mock(Module) -> + ok = meck:new(Module, [passthrough, no_history]), + do_mock(Module). + +do_mock(emqx_broker) -> + meck:expect(emqx_broker, publish, + fun(Msg) -> {node(), <<"test">>, Msg} end), + meck:expect(emqx_broker, safe_publish, + fun(Msg) -> {node(), <<"test">>, Msg} end); +do_mock(emqx_stats) -> + meck:expect(emqx_stats, getstats, fun() -> [0] end); +do_mock(ekka_mnesia) -> + meck:expect(ekka_mnesia, running_nodes, fun() -> [node()] end); +do_mock(emqx_metrics) -> + meck:expect(emqx_metrics, all, fun() -> [{hello, 3}] end). + +%%-------------------------------------------------------------------- +%% MODEL +%%-------------------------------------------------------------------- + +%% @doc Initial model value at system start. Should be deterministic. +initial_state() -> + #{}. + +%% @doc List of possible commands to run against the system +command(_State) -> + oneof([{call, emqx_sys, info, []}, + {call, emqx_sys, version, []}, + {call, emqx_sys, uptime, []}, + {call, emqx_sys, datetime, []}, + {call, emqx_sys, sysdescr, []}, + {call, emqx_sys, sys_interval, []}, + {call, emqx_sys, sys_heatbeat_interval, []}, + %------------ unexpected message ----------------------% + {call, emqx_sys, handle_call, [emqx_sys, other, state]}, + {call, emqx_sys, handle_cast, [emqx_sys, other]}, + {call, emqx_sys, handle_info, [info, state]} + ]). + +precondition(_State, {call, _Mod, _Fun, _Args}) -> + timer:sleep(1), + true. + +postcondition(_State, {call, emqx_sys, info, []}, Info) -> + is_list(Info) andalso length(Info) =:= 4; +postcondition(_State, {call, emqx_sys, version, []}, Version) -> + is_list(Version); +postcondition(_State, {call, emqx_sys, uptime, []}, Uptime) -> + is_list(Uptime); +postcondition(_State, {call, emqx_sys, datetime, []}, Datetime) -> + is_list(Datetime); +postcondition(_State, {call, emqx_sys, sysdescr, []}, Sysdescr) -> + is_list(Sysdescr); +postcondition(_State, {call, emqx_sys, sys_interval, []}, SysInterval) -> + is_integer(SysInterval) andalso SysInterval > 0; +postcondition(_State, {call, emqx_sys, sys_heartbeat_interval, []}, SysHeartInterval) -> + is_integer(SysHeartInterval) andalso SysHeartInterval > 0; +postcondition(_State, {call, _Mod, _Fun, _Args}, _Res) -> + true. + +next_state(State, _Res, {call, _Mod, _Fun, _Args}) -> + NewState = State, + NewState. +