diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index ea6cdf5bc..63d41d8eb 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -63,6 +63,8 @@ , call_fold/3 ]). +-elvis([{elvis_style, god_modules, disable}]). + %%-------------------------------------------------------------------- %% Clients %%-------------------------------------------------------------------- @@ -341,11 +343,7 @@ merge_responsed_bool(_Req, #{type := 'IGNORE'}) -> ignore; merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}}) when is_boolean(NewBool) -> - NReq = Req#{result => NewBool}, - case Type of - 'CONTINUE' -> {ok, NReq}; - 'STOP_AND_RETURN' -> {stop, NReq} - end; + {ret(Type), Req#{result => NewBool}}; merge_responsed_bool(_Req, Resp) -> ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]), ignore. @@ -353,11 +351,10 @@ merge_responsed_bool(_Req, Resp) -> merge_responsed_message(_Req, #{type := 'IGNORE'}) -> ignore; merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) -> - NReq = Req#{message => NMessage}, - case Type of - 'CONTINUE' -> {ok, NReq}; - 'STOP_AND_RETURN' -> {stop, NReq} - end; + {ret(Type), Req#{message => NMessage}}; merge_responsed_message(_Req, Resp) -> ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]), ignore. + +ret('CONTINUE') -> ok; +ret('STOP_AND_RETURN') -> stop. diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index 54e106f13..0f3c8b8ab 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -86,11 +86,11 @@ start_link(Servers, AutoReconnect, ReqOpts) -> gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []). --spec enable(pid(), atom()|string()) -> ok | {error, term()}. +-spec enable(pid(), atom() | string()) -> ok | {error, term()}. enable(Pid, Name) -> call(Pid, {load, Name}). --spec disable(pid(), atom()|string()) -> ok | {error, term()}. +-spec disable(pid(), atom() | string()) -> ok | {error, term()}. disable(Pid, Name) -> call(Pid, {unload, Name}). @@ -141,7 +141,7 @@ load_all_servers(Servers, ReqOpts) -> load_all_servers(Servers, ReqOpts, #{}, #{}). load_all_servers([], _Request, Waiting, Running) -> {Waiting, Running}; -load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) -> +load_all_servers([{Name, Options} | More], ReqOpts, Waiting, Running) -> {NWaiting, NRunning} = case emqx_exhook_server:load(Name, Options, ReqOpts) of {ok, ServerState} -> diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 123cbb558..9e88d774d 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -77,6 +77,8 @@ -dialyzer({nowarn_function, [inc_metrics/2]}). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + %%-------------------------------------------------------------------- %% Load/Unload APIs %%-------------------------------------------------------------------- @@ -125,7 +127,11 @@ channel_opts(Opts) -> SvrAddr = format_http_uri(Scheme, Host, Port), ClientOpts = case Scheme of https -> - SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])), + SslOpts = lists:keydelete( + ssl, + 1, + proplists:get_value(ssl_options, Opts, []) + ), #{gun_opts => #{transport => ssl, transport_opts => SslOpts}}; @@ -175,16 +181,18 @@ resovle_hookspec(HookSpecs) when is_list(HookSpecs) -> case maps:get(name, HookSpec, undefined) of undefined -> Acc; Name0 -> - Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end, - case lists:member(Name, AvailableHooks) of - true -> - case lists:member(Name, MessageHooks) of - true -> - Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}}; - _ -> - Acc#{Name => #{}} - end; - _ -> error({unknown_hookpoint, Name}) + Name = try + binary_to_existing_atom(Name0, utf8) + catch T:R:_ -> {T,R} + end, + case {lists:member(Name, AvailableHooks), + lists:member(Name, MessageHooks)} of + {false, _} -> + error({unknown_hookpoint, Name}); + {true, false} -> + Acc#{Name => #{}}; + {true, true} -> + Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}} end end end, #{}, HookSpecs). @@ -256,7 +264,7 @@ call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts, %% @private inc_metrics(IncFun, Name) when is_function(IncFun) -> %% BACKW: e4.2.0-e4.2.2 - {env, [Prefix|_]} = erlang:fun_info(IncFun, env), + {env, [Prefix | _]} = erlang:fun_info(IncFun, env), inc_metrics(Prefix, Name); inc_metrics(Prefix, Name) when is_list(Prefix) -> emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))). @@ -272,8 +280,8 @@ do_call(ChannName, Fun, Req, ReqOpts) -> Options = ReqOpts#{channel => ChannName}, ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]), case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of - {ok, Resp, _Metadata} -> - ?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]), + {ok, Resp, Metadata} -> + ?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, Metadata]), {ok, Resp}; {error, {Code, Msg}, _Metadata} -> ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 794eedd6b..27215cd4f 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -50,6 +50,9 @@ %% erlang:system_time should be unique and random enough -define(CLIENTID, iolist_to_binary([atom_to_list(?FUNCTION_NAME), "-", integer_to_list(erlang:system_time())])). + +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- @@ -66,8 +69,10 @@ end_per_suite(_) -> emqx_ct_helpers:stop_apps([emqx_sn]). set_special_confs(emqx) -> - application:set_env(emqx, plugins_loaded_file, - emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); + application:set_env( + emqx, + plugins_loaded_file, + emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); set_special_confs(emqx_sn) -> application:set_env(emqx_sn, enable_qos3, ?ENABLE_QOS3), application:set_env(emqx_sn, enable_stats, true), @@ -113,7 +118,8 @@ t_subscribe(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), TopicName1 = <<"abcD">>, send_register_msg(Socket, TopicName1, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, + receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId), ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, @@ -145,7 +151,8 @@ t_subscribe_case01(_) -> TopicName1 = <<"abcD">>, send_register_msg(Socket, TopicName1, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, + receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId), ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>, @@ -166,7 +173,7 @@ t_subscribe_case02(_) -> Will = 0, CleanSession = 0, MsgId = 1, - TopicId = ?PREDEF_TOPIC_ID1, %this TopicId is the predefined topic id corresponding to ?PREDEF_TOPIC_NAME1 + TopicId = ?PREDEF_TOPIC_ID1, ReturnCode = 0, {ok, Socket} = gen_udp:open(0, [binary]), @@ -176,7 +183,8 @@ t_subscribe_case02(_) -> Topic1 = ?PREDEF_TOPIC_NAME1, send_register_msg(Socket, Topic1, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, + receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, TopicId, MsgId), ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>, @@ -206,9 +214,11 @@ t_subscribe_case03(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_short_topic(Socket, QoS, <<"te">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, - ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, + CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_unsubscribe_msg_short_topic(Socket, <<"te">>, MsgId), ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)), @@ -217,8 +227,12 @@ t_subscribe_case03(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). -%%In this case We use predefined topic name to register and subcribe, and expect to receive the corresponding predefined topic id but not a new generated topic id from broker. We design this case to illustrate -%% emqx_sn_gateway's compatibility of dealing with predefined and normal topics. Once we give more restrictions to different topic id type, this case would be deleted or modified. +%% In this case We use predefined topic name to register and subcribe, and +%% expect to receive the corresponding predefined topic id but not a new +%% generated topic id from broker. We design this case to illustrate +%% emqx_sn_gateway's compatibility of dealing with predefined and normal topics. +%% Once we give more restrictions to different topic id type, this case would +%% be deleted or modified. t_subscribe_case04(_) -> Dup = 0, QoS = 0, @@ -226,7 +240,7 @@ t_subscribe_case04(_) -> Will = 0, CleanSession = 0, MsgId = 1, - TopicId = ?PREDEF_TOPIC_ID1, %this TopicId is the predefined topic id corresponding to ?PREDEF_TOPIC_NAME1 + TopicId = ?PREDEF_TOPIC_ID1, ReturnCode = 0, {ok, Socket} = gen_udp:open(0, [binary]), ClientId = ?CLIENTID, @@ -234,10 +248,14 @@ t_subscribe_case04(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), Topic1 = ?PREDEF_TOPIC_NAME1, send_register_msg(Socket, Topic1, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, + receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, Topic1, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_unsubscribe_msg_normal_topic(Socket, Topic1, MsgId), ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)), @@ -264,19 +282,30 @@ t_subscribe_case05(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_register_msg(Socket, <<"abcD">>, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, + receive_response(Socket) + ), send_subscribe_msg_normal_topic(Socket, QoS, <<"abcD">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_subscribe_msg_normal_topic(Socket, QoS, <<"/sport/#">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_subscribe_msg_normal_topic(Socket, QoS, <<"/a/+/water">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_subscribe_msg_normal_topic(Socket, QoS, <<"/Tom/Home">>, MsgId), ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, @@ -306,19 +335,32 @@ t_subscribe_case06(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_register_msg(Socket, <<"abc">>, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, + receive_response(Socket) + ), send_register_msg(Socket, <<"/blue/#">>, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId0:16, MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId0:16, + MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, + receive_response(Socket) + ), send_register_msg(Socket, <<"/blue/+/white">>, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId0:16, MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId0:16, + MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, + receive_response(Socket) + ), send_register_msg(Socket, <<"/$sys/rain">>, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId2:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId2:16, MsgId:16, 0:8>>, + receive_response(Socket) + ), send_subscribe_msg_short_topic(Socket, QoS, <<"Q2">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_unsubscribe_msg_normal_topic(Socket, <<"Q2">>, MsgId), ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)), @@ -342,8 +384,11 @@ t_subscribe_case07(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, + receive_response(Socket) + ), send_unsubscribe_msg_predefined_topic(Socket, TopicId2, MsgId), ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)), @@ -365,8 +410,11 @@ t_subscribe_case08(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_reserved_topic(Socket, QoS, TopicId2, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, ?SN_INVALID_TOPIC_ID:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + ?SN_INVALID_TOPIC_ID:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -390,15 +438,20 @@ t_publish_negqos_case09(_) -> send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), MsgId1 = 3, Payload1 = <<20, 21, 22, 23>>, send_publish_msg_normal_topic(Socket, NegQoS, MsgId1, TopicId1, Payload1), timer:sleep(100), case ?ENABLE_QOS3 of true -> - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What) end, @@ -431,7 +484,9 @@ t_publish_qos0_case01(_) -> send_publish_msg_normal_topic(Socket, QoS, MsgId1, TopicId1, Payload1), timer:sleep(100), - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What), @@ -453,15 +508,20 @@ t_publish_qos0_case02(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), MsgId1 = 3, Payload1 = <<20, 21, 22, 23>>, send_publish_msg_predefined_topic(Socket, QoS, MsgId1, PredefTopicId, Payload1), timer:sleep(100), - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2, PredefTopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2, + PredefTopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What), @@ -484,15 +544,20 @@ t_publish_qos0_case3(_) -> Topic = <<"/a/b/c">>, send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), MsgId1 = 3, Payload1 = <<20, 21, 22, 23>>, send_publish_msg_predefined_topic(Socket, QoS, MsgId1, TopicId, Payload1), timer:sleep(100), - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What), @@ -514,8 +579,11 @@ t_publish_qos0_case04(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, <<"#">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), MsgId1 = 2, Payload1 = <<20, 21, 22, 23>>, @@ -523,7 +591,9 @@ t_publish_qos0_case04(_) -> send_publish_msg_short_topic(Socket, QoS, MsgId1, Topic, Payload1), timer:sleep(100), - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2, Topic/binary, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2, + Topic/binary, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What), @@ -544,8 +614,11 @@ t_publish_qos0_case05(_) -> send_connect_msg(Socket, ClientId), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_short_topic(Socket, QoS, <<"/#">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -567,15 +640,20 @@ t_publish_qos0_case06(_) -> Topic = <<"abc">>, send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), MsgId1 = 3, Payload1 = <<20, 21, 22, 23>>, send_publish_msg_normal_topic(Socket, QoS, MsgId1, TopicId1, Payload1), timer:sleep(100), - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What), @@ -597,16 +675,25 @@ t_publish_qos1_case01(_) -> send_connect_msg(Socket, ClientId), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, - ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), Payload1 = <<20, 21, 22, 23>>, send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1), - ?assertEqual(<<7, ?SN_PUBACK, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, TopicId1:16, + MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), timer:sleep(100), - ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)), + ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), gen_udp:close(Socket). @@ -625,12 +712,18 @@ t_publish_qos1_case02(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), Payload1 = <<20, 21, 22, 23>>, send_publish_msg_predefined_topic(Socket, QoS, MsgId, PredefTopicId, Payload1), - ?assertEqual(<<7, ?SN_PUBACK, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, PredefTopicId:16, + MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), timer:sleep(100), send_disconnect_msg(Socket, undefined), @@ -645,7 +738,10 @@ t_publish_qos1_case03(_) -> send_connect_msg(Socket, ClientId), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_publish_msg_predefined_topic(Socket, QoS, MsgId, tid(5), <<20, 21, 22, 23>>), - ?assertEqual(<<7, ?SN_PUBACK, TopicId5:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, TopicId5:16, + MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -664,15 +760,20 @@ t_publish_qos1_case04(_) -> send_connect_msg(Socket, ClientId), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_short_topic(Socket, QoS, <<"ab">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, - ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), Topic = <<"ab">>, Payload1 = <<20, 21, 22, 23>>, send_publish_msg_short_topic(Socket, QoS, MsgId, Topic, Payload1), <> = Topic, - ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, + MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), timer:sleep(100), send_disconnect_msg(Socket, undefined), @@ -692,13 +793,18 @@ t_publish_qos1_case05(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, - ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/#">>, <<20, 21, 22, 23>>), <> = <<"/#">>, - ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, + MsgId:16, ?SN_RC_NOT_SUPPORTED>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -724,7 +830,10 @@ t_publish_qos1_case06(_) -> send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/+">>, <<20, 21, 22, 23>>), <> = <<"/+">>, - ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, + MsgId:16, ?SN_RC_NOT_SUPPORTED>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -751,7 +860,11 @@ t_publish_qos2_case01(_) -> send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1), ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)), send_pubrel_msg(Socket, MsgId), - ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)), + ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, 1:16, <<20, 21, 22, 23>>/binary>>, + receive_response(Socket) + ), ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)), timer:sleep(100), @@ -773,15 +886,21 @@ t_publish_qos2_case02(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, + PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), Payload1 = <<20, 21, 22, 23>>, send_publish_msg_predefined_topic(Socket, QoS, MsgId, PredefTopicId, Payload1), ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)), send_pubrel_msg(Socket, MsgId), - ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC :2, PredefTopicId:16, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)), + ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2, + PredefTopicId:16, 1:16, <<20, 21, 22, 23>>/binary>>, + receive_response(Socket) + ), ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)), timer:sleep(100), @@ -812,7 +931,11 @@ t_publish_qos2_case03(_) -> ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)), send_pubrel_msg(Socket, MsgId), - ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_SHORT_TOPIC :2, <<"/a">>/binary, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)), + ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2, + "/a", 1:16, <<20, 21, 22, 23>>/binary>>, + receive_response(Socket) + ), ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)), timer:sleep(100), @@ -1083,7 +1206,11 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) -> send_register_msg(Socket, TopicName1, MsgId1), ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId1:16, 0:8>>, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ReturnCode>>, receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), % goto asleep state send_disconnect_msg(Socket, 1), @@ -1109,7 +1236,10 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) -> %% the broker should sent dl msgs to the awake client before sending the pingresp UdpData = receive_response(Socket), - MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId1, Payload1}, UdpData), + MsgId_udp = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicId1, Payload1}, UdpData), send_puback_msg(Socket, TopicId1, MsgId_udp), %% check the pingresp is received at last @@ -1141,8 +1271,11 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) -> CleanSession = 0, ReturnCode = 0, send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), % goto asleep state send_disconnect_msg(Socket, 1), @@ -1176,11 +1309,17 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) -> send_regack_msg(Socket, TopicIdNew, MsgId3), UdpData2 = receive_response(Socket), - MsgId_udp2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData2), + MsgId_udp2 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload1}, UdpData2), send_puback_msg(Socket, TopicIdNew, MsgId_udp2), UdpData3 = receive_response(Socket), - MsgId_udp3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData3), + MsgId_udp3 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload2}, UdpData3), send_puback_msg(Socket, TopicIdNew, MsgId_udp3), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), @@ -1216,8 +1355,11 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) -> CleanSession = 0, ReturnCode = 0, send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), % goto asleep state SleepDuration = 30, @@ -1250,21 +1392,28 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) -> send_regack_msg(Socket, TopicIdNew, MsgId_reg), UdpData2 = receive_response(Socket), - MsgId2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2), + MsgId2 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload2}, UdpData2), send_puback_msg(Socket, TopicIdNew, MsgId2), timer:sleep(50), UdpData3 = wrap_receive_response(Socket), - MsgId3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3), + MsgId3 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload3}, UdpData3), send_puback_msg(Socket, TopicIdNew, MsgId3), timer:sleep(50), case receive_response(Socket) of <<2,23>> -> ok; UdpData4 -> - MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, - CleanSession, ?SN_NORMAL_TOPIC, - TopicIdNew, Payload4}, UdpData4), + MsgId4 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload4}, UdpData4), send_puback_msg(Socket, TopicIdNew, MsgId4) end, ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), @@ -1322,7 +1471,10 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) -> send_pingreq_msg(Socket, ClientId), UdpData = wrap_receive_response(Socket), - MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData), + MsgId_udp = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicId_tom, Payload1}, UdpData), send_pubrec_msg(Socket, MsgId_udp), ?assertMatch(<<_:8, ?SN_PUBREL:8, _/binary>>, receive_response(Socket)), send_pubcomp_msg(Socket, MsgId_udp), @@ -1357,8 +1509,11 @@ t_asleep_test07_to_connected(_) -> send_register_msg(Socket, TopicName_tom, MsgId1), TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId_tom:16, MsgId1:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId_tom:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), % goto asleep state send_disconnect_msg(Socket, SleepDuration), @@ -1436,8 +1591,11 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> CleanSession = 0, ReturnCode = 0, send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), % goto asleep state SleepDuration = 30, send_disconnect_msg(Socket, SleepDuration), @@ -1471,7 +1629,10 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> udp_receive_timeout -> ok; UdpData2 -> - MsgId2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2), + MsgId2 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload2}, UdpData2), send_puback_msg(Socket, TopicIdNew, MsgId2) end, timer:sleep(100), @@ -1480,7 +1641,10 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> udp_receive_timeout -> ok; UdpData3 -> - MsgId3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3), + MsgId3 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload3}, UdpData3), send_puback_msg(Socket, TopicIdNew, MsgId3) end, timer:sleep(100), @@ -1489,16 +1653,18 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> udp_receive_timeout -> ok; UdpData4 -> - MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, - CleanSession, ?SN_NORMAL_TOPIC, - TopicIdNew, Payload4}, UdpData4), + MsgId4 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload4}, UdpData4), send_puback_msg(Socket, TopicIdNew, MsgId4) end, ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), %% send PINGREQ again to enter awake state send_pingreq_msg(Socket, ClientId), - %% will not receive any buffered PUBLISH messages buffered before last awake, only receive PINGRESP here + %% will not receive any buffered PUBLISH messages buffered before last + %% awake, only receive PINGRESP here ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), gen_udp:close(Socket). @@ -1901,8 +2067,12 @@ check_dispatched_message(Dup, QoS, Retain, TopicIdType, TopicId, Payload, Socket PubMsg = receive_response(Socket), Length = 7 + byte_size(Payload), ?LOG("check_dispatched_message ~p~n", [PubMsg]), - ?LOG("expected ~p xx ~p~n", [<>, Payload]), - <> = PubMsg, + ?LOG("expected ~p xx ~p~n", + [<>, Payload]), + <> = PubMsg, case QoS of 0 -> ok; 1 -> send_puback_msg(Socket, TopicId, MsgId); @@ -1914,11 +2084,14 @@ check_dispatched_message(Dup, QoS, Retain, TopicIdType, TopicId, Payload, Socket get_udp_broadcast_address() -> "255.255.255.255". -check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, TopicType, TopicId, Payload}, UdpData) -> +check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, + CleanSession, TopicType, TopicId, Payload}, UdpData) -> <> = UdpData, ct:pal("UdpData: ~p, Payload: ~p, PayloadIn: ~p", [UdpData, Payload, PayloadIn]), Size9 = byte_size(Payload) + 7, - Eexp = <>, + Eexp = <>, ?assertEqual(Eexp, HeaderUdp), % mqtt-sn header should be same ?assertEqual(Payload, PayloadIn), % payload should be same MsgId. diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 410ed5a27..3d5fb66a5 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -147,7 +147,7 @@ dn => binary(), atom() => term() }). --type(clientid() :: binary()|atom()). +-type(clientid() :: binary() | atom()). -type(username() :: maybe(binary())). -type(password() :: maybe(binary())). -type(peerhost() :: inet:ip_address()).