diff --git a/apps/emqx_coap/src/emqx_coap.app.src b/apps/emqx_coap/src/emqx_coap.app.src index 2b5fcbb6a..dce5ef4f4 100644 --- a/apps/emqx_coap/src/emqx_coap.app.src +++ b/apps/emqx_coap/src/emqx_coap.app.src @@ -1,6 +1,6 @@ {application, emqx_coap, [{description, "EMQ X CoAP Gateway"}, - {vsn, "4.3.0"}, % strict semver, bump manually! + {vsn, "4.3.1"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,gen_coap]}, diff --git a/apps/emqx_coap/src/emqx_coap.appup.src b/apps/emqx_coap/src/emqx_coap.appup.src new file mode 100644 index 000000000..b73e26be6 --- /dev/null +++ b/apps/emqx_coap/src/emqx_coap.appup.src @@ -0,0 +1,9 @@ +%% -*-: erlang -*- +{VSN, + [{"4.3.0",[ + {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]}, + {<<".*">>, []}], + [{"4.3.0",[ + {load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]}, + {<<".*">>, []}] +}. diff --git a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl index d465f9ca3..a6ea09881 100644 --- a/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl +++ b/apps/emqx_coap/src/emqx_coap_mqtt_adapter.erl @@ -58,6 +58,8 @@ -define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0, is_new => false}). +-define(PROTO_VER, 1). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- @@ -139,7 +141,7 @@ handle_call({subscribe, Topic, CoapPid}, _From, State=#state{sub_topics = TopicL NewTopics = proplists:delete(Topic, TopicList), IsWild = emqx_topic:wildcard(Topic), {reply, chann_subscribe(Topic, State), State#state{sub_topics = - [{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate}; + [{Topic, {IsWild, CoapPid}} | NewTopics]}, hibernate}; handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = TopicList}) -> NewTopics = proplists:delete(Topic, TopicList), @@ -244,15 +246,26 @@ chann_publish(Topic, Payload, State = #state{clientid = ClientId}) -> case emqx_access_control:check_acl(clientinfo(State), publish, Topic) of allow -> _ = emqx_broker:publish( - emqx_message:set_flag(retain, false, - emqx_message:make(ClientId, ?QOS_0, Topic, Payload))), - ok; + packet_to_message(Topic, Payload, State)), ok; deny -> ?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.", [Topic, ClientId]), {error, forbidden} end. +packet_to_message(Topic, Payload, + #state{clientid = ClientId, + username = Username, + peername = {PeerHost, _}}) -> + Message = emqx_message:set_flag( + retain, false, + emqx_message:make(ClientId, ?QOS_0, Topic, Payload) + ), + emqx_message:set_headers( + #{ proto_ver => ?PROTO_VER + , protocol => coap + , username => Username + , peerhost => PeerHost}, Message). %%-------------------------------------------------------------------- %% Deliver @@ -270,7 +283,7 @@ do_deliver({Topic, Payload}, Subscribers) -> deliver_to_coap(_TopicName, _Payload, []) -> ok; -deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}}|T]) -> +deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}} | T]) -> Matched = case IsWild of true -> emqx_topic:match(TopicName, TopicFilter); false -> TopicName =:= TopicFilter @@ -324,7 +337,7 @@ conninfo(#state{peername = Peername, peercert => nossl, %% TODO: dtls conn_mod => ?MODULE, proto_name => <<"CoAP">>, - proto_ver => 1, + proto_ver => ?PROTO_VER, clean_start => true, clientid => ClientId, username => undefined, @@ -384,4 +397,3 @@ clientinfo(#state{peername = {PeerHost, _}, mountpoint => undefined, ws_cookie => undefined }. - diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf index ffb71e43b..23895f902 100644 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -24,6 +24,11 @@ ## Value: false | Duration #exhook.auto_reconnect = 60s +## The process pool size for gRPC client +## +## Default: Equals cpu cores +## Value: Integer +#exhook.pool_size = 16 ##-------------------------------------------------------------------- ## The Hook callback servers diff --git a/apps/emqx_exhook/priv/emqx_exhook.schema b/apps/emqx_exhook/priv/emqx_exhook.schema index d11001c0d..f55913d72 100644 --- a/apps/emqx_exhook/priv/emqx_exhook.schema +++ b/apps/emqx_exhook/priv/emqx_exhook.schema @@ -26,6 +26,10 @@ end end}. +{mapping, "exhook.pool_size", "emqx_exhook.pool_size", [ + {datatype, integer} +]}. + {mapping, "exhook.server.$name.url", "emqx_exhook.servers", [ {datatype, string} ]}. diff --git a/apps/emqx_exhook/priv/protos/exhook.proto b/apps/emqx_exhook/priv/protos/exhook.proto index 72ba26581..639066c6a 100644 --- a/apps/emqx_exhook/priv/protos/exhook.proto +++ b/apps/emqx_exhook/priv/protos/exhook.proto @@ -358,6 +358,31 @@ message Message { bytes payload = 6; uint64 timestamp = 7; + + // The key of header can be: + // - username: + // * Readonly + // * The username of sender client + // * Value type: utf8 string + // - protocol: + // * Readonly + // * The protocol name of sender client + // * Value type: string enum with "mqtt", "mqtt-sn", ... + // - peerhost: + // * Readonly + // * The peerhost of sender client + // * Value type: ip address string + // - allow_publish: + // * Writable + // * Whether to allow the message to be published by emqx + // * Value type: string enum with "true", "false", default is "true" + // + // Notes: All header may be missing, which means that the message does not + // carry these headers. We can guarantee that clients coming from MQTT, + // MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will + // carry these headers, but there is no guarantee that messages published + // by other means will do, e.g. messages published by HTTP-API + map headers = 8; } message Property { diff --git a/apps/emqx_exhook/rebar.config b/apps/emqx_exhook/rebar.config index 3529b6314..d1cc4d778 100644 --- a/apps/emqx_exhook/rebar.config +++ b/apps/emqx_exhook/rebar.config @@ -5,7 +5,7 @@ ]}. {deps, - [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.3"}}} + [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}} ]}. {grpc, diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index 46223d212..715060df4 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,6 +1,6 @@ {application, emqx_exhook, [{description, "EMQ X Extension for Hook"}, - {vsn, "4.3.4"}, + {vsn, "4.4.0"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src index d6a699c33..0fe0ea78f 100644 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -1,15 +1,7 @@ %% -*-: erlang -*- {VSN, - [ - {<<"4.3.[0-3]">>, [ - {restart_application, emqx_exhook} - ]}, - {<<".*">>, []} + [{<<".*">>, []} ], - [ - {<<"4.3.[0-3]">>, [ - {restart_application, emqx_exhook} - ]}, - {<<".*">>, []} + [{<<".*">>, []} ] }. diff --git a/apps/emqx_exhook/src/emqx_exhook_handler.erl b/apps/emqx_exhook/src/emqx_exhook_handler.erl index f3964dc42..63d41d8eb 100644 --- a/apps/emqx_exhook/src/emqx_exhook_handler.erl +++ b/apps/emqx_exhook/src/emqx_exhook_handler.erl @@ -50,6 +50,7 @@ %% Utils -export([ message/1 + , headers/1 , stringfy/1 , merge_responsed_bool/2 , merge_responsed_message/2 @@ -62,6 +63,8 @@ , call_fold/3 ]). +-elvis([{elvis_style, god_modules, disable}]). + %%-------------------------------------------------------------------- %% Clients %%-------------------------------------------------------------------- @@ -258,17 +261,58 @@ clientinfo(ClientInfo = cn => maybe(maps:get(cn, ClientInfo, undefined)), dn => maybe(maps:get(dn, ClientInfo, undefined))}. -message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) -> +message(#message{id = Id, qos = Qos, from = From, topic = Topic, + payload = Payload, timestamp = Ts, headers = Headers}) -> #{node => stringfy(node()), id => emqx_guid:to_hexstr(Id), qos => Qos, from => stringfy(From), topic => Topic, payload => Payload, - timestamp => Ts}. + timestamp => Ts, + headers => headers(Headers) + }. -assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) -> - Message#message{qos = Qos, topic = Topic, payload = Payload}. +headers(Headers) -> + Ls = [username, protocol, peerhost, allow_publish], + maps:fold( + fun + (_, undefined, Acc) -> + Acc; %% Ignore undefined value + (K, V, Acc) -> + case lists:member(K, Ls) of + true -> + Acc#{atom_to_binary(K) => bin(K, V)}; + _ -> + Acc + end + end, #{}, Headers). + +bin(K, V) when K == username; + K == protocol; + K == allow_publish -> + bin(V); +bin(peerhost, V) -> + bin(inet:ntoa(V)). + +bin(V) when is_binary(V) -> V; +bin(V) when is_atom(V) -> atom_to_binary(V); +bin(V) when is_list(V) -> iolist_to_binary(V). + +assign_to_message(InMessage = #{qos := Qos, topic := Topic, + payload := Payload}, Message) -> + NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload}, + enrich_header(maps:get(headers, InMessage, #{}), NMsg). + +enrich_header(Headers, Message) -> + case maps:get(<<"allow_publish">>, Headers, undefined) of + <<"false">> -> + emqx_message:set_header(allow_publish, false, Message); + <<"true">> -> + emqx_message:set_header(allow_publish, true, Message); + _ -> + Message + end. topicfilters(Tfs) when is_list(Tfs) -> [#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs]. @@ -299,11 +343,7 @@ merge_responsed_bool(_Req, #{type := 'IGNORE'}) -> ignore; merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}}) when is_boolean(NewBool) -> - NReq = Req#{result => NewBool}, - case Type of - 'CONTINUE' -> {ok, NReq}; - 'STOP_AND_RETURN' -> {stop, NReq} - end; + {ret(Type), Req#{result => NewBool}}; merge_responsed_bool(_Req, Resp) -> ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]), ignore. @@ -311,11 +351,10 @@ merge_responsed_bool(_Req, Resp) -> merge_responsed_message(_Req, #{type := 'IGNORE'}) -> ignore; merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) -> - NReq = Req#{message => NMessage}, - case Type of - 'CONTINUE' -> {ok, NReq}; - 'STOP_AND_RETURN' -> {stop, NReq} - end; + {ret(Type), Req#{message => NMessage}}; merge_responsed_message(_Req, Resp) -> ?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]), ignore. + +ret('CONTINUE') -> ok; +ret('STOP_AND_RETURN') -> stop. diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index cadd5eb37..d4c493cb8 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -36,6 +36,8 @@ , server/1 , put_request_failed_action/1 , get_request_failed_action/0 + , put_pool_size/1 + , get_pool_size/0 ]). %% gen_server callbacks @@ -84,11 +86,11 @@ start_link(Servers, AutoReconnect, ReqOpts) -> gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []). --spec enable(pid(), atom()|string()) -> ok | {error, term()}. +-spec enable(pid(), atom() | string()) -> ok | {error, term()}. enable(Pid, Name) -> call(Pid, {load, Name}). --spec disable(pid(), atom()|string()) -> ok | {error, term()}. +-spec disable(pid(), atom() | string()) -> ok | {error, term()}. disable(Pid, Name) -> call(Pid, {unload, Name}). @@ -117,6 +119,9 @@ init([Servers, AutoReconnect, ReqOpts0]) -> put_request_failed_action( maps:get(request_failed_action, ReqOpts0, deny) ), + put_pool_size( + maps:get(pool_size, ReqOpts0, erlang:system_info(schedulers)) + ), %% Load the hook servers ReqOpts = maps:without([request_failed_action], ReqOpts0), @@ -136,7 +141,7 @@ load_all_servers(Servers, ReqOpts) -> load_all_servers(Servers, ReqOpts, #{}, #{}). load_all_servers([], _Request, Waiting, Running) -> {Waiting, Running}; -load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) -> +load_all_servers([{Name, Options} | More], ReqOpts, Waiting, Running) -> {NWaiting, NRunning} = case emqx_exhook_server:load(Name, Options, ReqOpts) of {ok, ServerState} -> @@ -286,6 +291,14 @@ put_request_failed_action(Val) -> get_request_failed_action() -> persistent_term:get({?APP, request_failed_action}). +put_pool_size(Val) -> + persistent_term:put({?APP, pool_size}, Val). + +get_pool_size() -> + %% Avoid the scenario that the parameter is not set after + %% the hot upgrade completed. + persistent_term:get({?APP, pool_size}, erlang:system_info(schedulers)). + save(Name, ServerState) -> Saved = persistent_term:get(?APP, []), persistent_term:put(?APP, lists:reverse([Name | Saved])), diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 7df5b643c..276f5a638 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -77,6 +77,8 @@ -dialyzer({nowarn_function, [inc_metrics/2]}). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + %%-------------------------------------------------------------------- %% Load/Unload APIs %%-------------------------------------------------------------------- @@ -125,13 +127,18 @@ channel_opts(Opts) -> SvrAddr = format_http_uri(Scheme, Host, Port), ClientOpts = case Scheme of https -> - SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])), + SslOpts = lists:keydelete( + ssl, + 1, + proplists:get_value(ssl_options, Opts, []) + ), #{gun_opts => #{transport => ssl, transport_opts => SslOpts}}; _ -> #{} end, - {SvrAddr, ClientOpts}. + NClientOpts = ClientOpts#{pool_size => emqx_exhook_mngr:get_pool_size()}, + {SvrAddr, NClientOpts}. format_http_uri(Scheme, Host0, Port) -> Host = case is_tuple(Host0) of @@ -174,16 +181,18 @@ resovle_hookspec(HookSpecs) when is_list(HookSpecs) -> case maps:get(name, HookSpec, undefined) of undefined -> Acc; Name0 -> - Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end, - case lists:member(Name, AvailableHooks) of - true -> - case lists:member(Name, MessageHooks) of - true -> - Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}}; - _ -> - Acc#{Name => #{}} - end; - _ -> error({unknown_hookpoint, Name}) + Name = try + binary_to_existing_atom(Name0, utf8) + catch T:R -> {T,R} + end, + case {lists:member(Name, AvailableHooks), + lists:member(Name, MessageHooks)} of + {false, _} -> + error({unknown_hookpoint, Name}); + {true, false} -> + Acc#{Name => #{}}; + {true, true} -> + Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}} end end end, #{}, HookSpecs). @@ -255,7 +264,7 @@ call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts, %% @private inc_metrics(IncFun, Name) when is_function(IncFun) -> %% BACKW: e4.2.0-e4.2.2 - {env, [Prefix|_]} = erlang:fun_info(IncFun, env), + {env, [Prefix | _]} = erlang:fun_info(IncFun, env), inc_metrics(Prefix, Name); inc_metrics(Prefix, Name) when is_list(Prefix) -> emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))). @@ -271,8 +280,8 @@ do_call(ChannName, Fun, Req, ReqOpts) -> Options = ReqOpts#{channel => ChannName}, ?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]), case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of - {ok, Resp, _Metadata} -> - ?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]), + {ok, Resp, Metadata} -> + ?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, Metadata]), {ok, Resp}; {error, {Code, Msg}, _Metadata} -> ?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p", diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index e9c405de0..c92fd6ca4 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -54,7 +54,8 @@ auto_reconnect() -> request_options() -> #{timeout => env(request_timeout, 5000), - request_failed_action => env(request_failed_action, deny) + request_failed_action => env(request_failed_action, deny), + pool_size => env(pool_size, erlang:system_info(schedulers)) }. env(Key, Def) -> @@ -67,7 +68,7 @@ env(Key, Def) -> -spec start_grpc_client_channel( string(), uri_string:uri_string(), - grpc_client:options()) -> {ok, pid()} | {error, term()}. + grpc_client_sup:options()) -> {ok, pid()} | {error, term()}. start_grpc_client_channel(Name, SvrAddr, Options) -> grpc_client_sup:create_channel_pool(Name, SvrAddr, Options). diff --git a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl index c2db04dd4..b05748856 100644 --- a/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl +++ b/apps/emqx_exhook/test/emqx_exhook_demo_svr.erl @@ -299,21 +299,31 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) -> %% some cases for testing case From of <<"baduser">> -> - NMsg = Msg#{qos => 0, + NMsg = deny(Msg#{qos => 0, topic => <<"">>, payload => <<"">> - }, + }), {ok, #{type => 'STOP_AND_RETURN', value => {message, NMsg}}, Md}; <<"gooduser">> -> - NMsg = Msg#{topic => From, - payload => From}, + NMsg = allow(Msg#{topic => From, + payload => From}), {ok, #{type => 'STOP_AND_RETURN', value => {message, NMsg}}, Md}; _ -> {ok, #{type => 'IGNORE'}, Md} end. +deny(Msg) -> + NHeader = maps:put(<<"allow_publish">>, <<"false">>, + maps:get(headers, Msg, #{})), + maps:put(headers, NHeader, Msg). + +allow(Msg) -> + NHeader = maps:put(<<"allow_publish">>, <<"true">>, + maps:get(headers, Msg, #{})), + maps:put(headers, NHeader, Msg). + -spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata()) -> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()} | {error, grpc_cowboy_h:error_response()}. diff --git a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl index 24f45c8b0..88eba8f11 100644 --- a/apps/emqx_exhook/test/props/prop_exhook_hooks.erl +++ b/apps/emqx_exhook/test/props/prop_exhook_hooks.erl @@ -299,19 +299,24 @@ prop_message_publish() -> _ -> ExpectedOutMsg = case emqx_message:from(Msg) of <<"baduser">> -> - MsgMap = emqx_message:to_map(Msg), + MsgMap = #{headers := Headers} + = emqx_message:to_map(Msg), emqx_message:from_map( MsgMap#{qos => 0, topic => <<"">>, - payload => <<"">> + payload => <<"">>, + headers => maps:put(allow_publish, false, Headers) }); <<"gooduser">> = From -> - MsgMap = emqx_message:to_map(Msg), + MsgMap = #{headers := Headers} + = emqx_message:to_map(Msg), emqx_message:from_map( MsgMap#{topic => From, - payload => From + payload => From, + headers => maps:put(allow_publish, true, Headers) }); - _ -> Msg + _ -> + Msg end, ?assertEqual(ExpectedOutMsg, OutMsg), @@ -464,7 +469,9 @@ from_message(Msg) -> from => stringfy(emqx_message:from(Msg)), topic => emqx_message:topic(Msg), payload => emqx_message:payload(Msg), - timestamp => emqx_message:timestamp(Msg) + timestamp => emqx_message:timestamp(Msg), + headers => emqx_exhook_handler:headers( + emqx_message:get_headers(Msg)) }. %%-------------------------------------------------------------------- diff --git a/apps/emqx_exproto/rebar.config b/apps/emqx_exproto/rebar.config index 4ad1aa192..da868de82 100644 --- a/apps/emqx_exproto/rebar.config +++ b/apps/emqx_exproto/rebar.config @@ -13,7 +13,7 @@ ]}. {deps, - [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.3"}}} + [{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}} ]}. {grpc, diff --git a/apps/emqx_exproto/src/emqx_exproto.app.src b/apps/emqx_exproto/src/emqx_exproto.app.src index f7cab4c2e..0ed54e6fd 100644 --- a/apps/emqx_exproto/src/emqx_exproto.app.src +++ b/apps/emqx_exproto/src/emqx_exproto.app.src @@ -1,6 +1,6 @@ {application, emqx_exproto, [{description, "EMQ X Extension for Protocol"}, - {vsn, "4.3.4"}, %% 4.3.3 is used by ee + {vsn, "4.3.5"}, %% 4.3.3 is used by ee {modules, []}, {registered, []}, {mod, {emqx_exproto_app, []}}, diff --git a/apps/emqx_exproto/src/emqx_exproto.appup.src b/apps/emqx_exproto/src/emqx_exproto.appup.src index e0a021af5..7da28e773 100644 --- a/apps/emqx_exproto/src/emqx_exproto.appup.src +++ b/apps/emqx_exproto/src/emqx_exproto.appup.src @@ -1,6 +1,8 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.3", + [{"4.3.4", + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {"4.3.3", [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {"4.3.2", @@ -12,7 +14,9 @@ {load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.3", + [{"4.3.4", + [{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, + {"4.3.3", [{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]}, {load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]}, {"4.3.2", diff --git a/apps/emqx_exproto/src/emqx_exproto_channel.erl b/apps/emqx_exproto/src/emqx_exproto_channel.erl index 67f85f932..44b8b64d3 100644 --- a/apps/emqx_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_exproto/src/emqx_exproto_channel.erl @@ -340,17 +340,14 @@ handle_call({unsubscribe, TopicFilter}, handle_call({publish, Topic, Qos, Payload}, Channel = #channel{ conn_state = connected, - clientinfo = ClientInfo - = #{clientid := From, - mountpoint := Mountpoint}}) -> + clientinfo = ClientInfo}) -> case is_acl_enabled(ClientInfo) andalso emqx_access_control:check_acl(ClientInfo, publish, Topic) of deny -> {reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel}; _ -> - Msg = emqx_message:make(From, Qos, Topic, Payload), - NMsg = emqx_mountpoint:mount(Mountpoint, Msg), - _ = emqx:publish(NMsg), + Msg = packet_to_message(Topic, Qos, Payload, Channel), + _ = emqx:publish(Msg), {reply, ok, Channel} end; @@ -419,6 +416,24 @@ is_anonymous(_AuthResult) -> false. clean_anonymous_clients() -> ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())). +packet_to_message(Topic, Qos, Payload, + #channel{ + conninfo = #{proto_ver := ProtoVer}, + clientinfo = #{ + protocol := Protocol, + clientid := ClientId, + username := Username, + peerhost := PeerHost, + mountpoint := Mountpoint}}) -> + Msg = emqx_message:make( + ClientId, Qos, + Topic, Payload, #{}, + #{proto_ver => ProtoVer, + protocol => Protocol, + username => Username, + peerhost => PeerHost}), + emqx_mountpoint:mount(Mountpoint, Msg). + %%-------------------------------------------------------------------- %% Sub/UnSub %%-------------------------------------------------------------------- diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.app.src b/apps/emqx_lwm2m/src/emqx_lwm2m.app.src index 551cf8d07..b929ad854 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.app.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.app.src @@ -1,6 +1,6 @@ {application,emqx_lwm2m, [{description,"EMQ X LwM2M Gateway"}, - {vsn, "4.3.4"}, % strict semver, bump manually! + {vsn, "4.3.5"}, % strict semver, bump manually! {modules,[]}, {registered,[emqx_lwm2m_sup]}, {applications,[kernel,stdlib,lwm2m_coap]}, diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src index 600cf236b..6656eb149 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src +++ b/apps/emqx_lwm2m/src/emqx_lwm2m.appup.src @@ -1,5 +1,5 @@ %% -*-: erlang -*- -{"4.3.4", +{VSN, [ {<<"4\\.3\\.[0-1]">>, [ {restart_application, emqx_lwm2m} @@ -7,7 +7,10 @@ {"4.3.2", [ {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} ]}, - {"4.3.3", []} %% only config change + {"4.3.3", []}, %% only config change + {"4.3.4", [ + {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} + ]} ], [ {<<"4\\.3\\.[0-1]">>, [ @@ -16,6 +19,9 @@ {"4.3.2", [ {load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []} ]}, - {"4.3.3", []} %% only config change + {"4.3.3", []}, %% only config change + {"4.3.4", [ + {load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []} + ]} ] }. diff --git a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl index 34c72dcca..7f40041a5 100644 --- a/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl +++ b/apps/emqx_lwm2m/src/emqx_lwm2m_protocol.erl @@ -74,7 +74,8 @@ call(Pid, Msg, Timeout) -> Error -> {error, Error} end. -init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) -> +init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, + RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) -> Mountpoint = proplists:get_value(mountpoint, lwm2m_coap_responder:options(), ""), Lwm2mState = #lwm2m_state{peername = Peername, endpoint_name = EndpointName, @@ -103,9 +104,10 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1)) end), emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)), - emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername), + emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername), - {ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}}; + NTimer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired}), + {ok, Lwm2mState1#lwm2m_state{life_timer = NTimer}}; {error, Error} -> _ = run_hooks('client.connack', [conninfo(Lwm2mState), not_authorized], undefined), {error, Error} @@ -133,7 +135,7 @@ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, regi %% - report the registration info update, but only when objectList is updated. case NewRegInfo of #{<<"objectList">> := _} -> - emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo), + emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo), send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState); _ -> ok end @@ -186,7 +188,8 @@ deliver(#message{topic = Topic, payload = Payload}, started_at = StartedAt, endpoint_name = EndpointName}) -> IsCacheMode = is_cache_mode(RegInfo, StartedAt), - ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]), + ?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, " + "Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]), AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>), deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName), Lwm2mState. @@ -235,8 +238,20 @@ unsubscribe(Topic, Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName}) -> emqx_broker:unsubscribe(Topic), emqx_hooks:run('session.unsubscribed', [clientinfo(Lwm2mState), Topic, Opts]). -publish(Topic, Payload, Qos, EndpointName) -> - emqx_broker:publish(emqx_message:set_flag(retain, false, emqx_message:make(EndpointName, Qos, Topic, Payload))). +publish(Topic, Payload, Qos, + #lwm2m_state{ + version = ProtoVer, + peername = {PeerHost, _}, + endpoint_name = EndpointName}) -> + Message = emqx_message:set_flag( + retain, false, + emqx_message:make(EndpointName, Qos, Topic, Payload) + ), + NMessage = emqx_message:set_headers( + #{proto_ver => ProtoVer, + protocol => lwm2m, + peerhost => PeerHost}, Message), + emqx_broker:publish(NMessage). time_now() -> erlang:system_time(millisecond). @@ -244,7 +259,8 @@ time_now() -> erlang:system_time(millisecond). %% Deliver downlink message to coap %%-------------------------------------------------------------------- -deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)-> +deliver_to_coap(AlternatePath, JsonData, + CoapPid, CacheMode, EndpointName) when is_binary(JsonData)-> try TermData = emqx_json:decode(JsonData, [return_maps]), deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) @@ -273,7 +289,8 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when send_to_broker(EventType, Payload = #{}, Lwm2mState) -> do_send_to_broker(EventType, Payload, Lwm2mState). -do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) -> +do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, + #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) -> ReqPath = maps:get(<<"reqPath">>, Data, undefined), Code = maps:get(<<"code">>, Data, undefined), CodeMsg = maps:get(<<"codeMsg">>, Data, undefined), @@ -281,7 +298,7 @@ do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpo emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}), NewPayload = maps:put(<<"msgType">>, EventType, Payload), Topic = uplink_topic(EventType, Lwm2mState), - publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name). + publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState). %%-------------------------------------------------------------------- %% Auto Observe @@ -315,18 +332,27 @@ auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) -> observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) -> lists:foreach(fun(ObjectPath) -> - [ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath), + [ObjId | LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath), case ObjId of <<"19">> -> [ObjInsId | _LastPath1] = LastPath, case ObjInsId of <<"0">> -> - observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName); + observe_object_slowly( + AlternatePath, <<"/19/0/0">>, + CoapPid, 100, EndpointName + ); _ -> - observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName) + observe_object_slowly( + AlternatePath, ObjectPath, + CoapPid, 100, EndpointName + ) end; _ -> - observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName) + observe_object_slowly( + AlternatePath, ObjectPath, + CoapPid, 100, EndpointName + ) end end, ObjectList). @@ -380,11 +406,12 @@ get_cached_downlink_messages() -> is_cache_mode(RegInfo, StartedAt) -> case is_psm(RegInfo) orelse is_qmode(RegInfo) of true -> - QModeTimeWind = proplists:get_value(qmode_time_window, lwm2m_coap_responder:options(), 22), - Now = time_now(), - if (Now - StartedAt) >= QModeTimeWind -> true; - true -> false - end; + QModeTimeWind = proplists:get_value( + qmode_time_window, + lwm2m_coap_responder:options(), + 22 + ), + (time_now() - StartedAt) >= QModeTimeWind; false -> false end. diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 0947bdaca..27215cd4f 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -50,6 +50,9 @@ %% erlang:system_time should be unique and random enough -define(CLIENTID, iolist_to_binary([atom_to_list(?FUNCTION_NAME), "-", integer_to_list(erlang:system_time())])). + +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + %%-------------------------------------------------------------------- %% Setups %%-------------------------------------------------------------------- @@ -66,8 +69,10 @@ end_per_suite(_) -> emqx_ct_helpers:stop_apps([emqx_sn]). set_special_confs(emqx) -> - application:set_env(emqx, plugins_loaded_file, - emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); + application:set_env( + emqx, + plugins_loaded_file, + emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins")); set_special_confs(emqx_sn) -> application:set_env(emqx_sn, enable_qos3, ?ENABLE_QOS3), application:set_env(emqx_sn, enable_stats, true), @@ -113,7 +118,8 @@ t_subscribe(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), TopicName1 = <<"abcD">>, send_register_msg(Socket, TopicName1, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, + receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId), ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, @@ -145,7 +151,8 @@ t_subscribe_case01(_) -> TopicName1 = <<"abcD">>, send_register_msg(Socket, TopicName1, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, + receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId), ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>, @@ -166,17 +173,18 @@ t_subscribe_case02(_) -> Will = 0, CleanSession = 0, MsgId = 1, - TopicId = ?PREDEF_TOPIC_ID1, %this TopicId is the predefined topic id corresponding to ?PREDEF_TOPIC_NAME1 + TopicId = ?PREDEF_TOPIC_ID1, ReturnCode = 0, {ok, Socket} = gen_udp:open(0, [binary]), ClientId = ?CLIENTID, - send_connect_msg(Socket, ?CLIENTID), + send_connect_msg(Socket, ClientId), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), Topic1 = ?PREDEF_TOPIC_NAME1, send_register_msg(Socket, Topic1, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, + receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, TopicId, MsgId), ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>, @@ -206,9 +214,11 @@ t_subscribe_case03(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_short_topic(Socket, QoS, <<"te">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, - ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, + CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_unsubscribe_msg_short_topic(Socket, <<"te">>, MsgId), ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)), @@ -217,8 +227,12 @@ t_subscribe_case03(_) -> ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), gen_udp:close(Socket). -%%In this case We use predefined topic name to register and subcribe, and expect to receive the corresponding predefined topic id but not a new generated topic id from broker. We design this case to illustrate -%% emqx_sn_gateway's compatibility of dealing with predefined and normal topics. Once we give more restrictions to different topic id type, this case would be deleted or modified. +%% In this case We use predefined topic name to register and subcribe, and +%% expect to receive the corresponding predefined topic id but not a new +%% generated topic id from broker. We design this case to illustrate +%% emqx_sn_gateway's compatibility of dealing with predefined and normal topics. +%% Once we give more restrictions to different topic id type, this case would +%% be deleted or modified. t_subscribe_case04(_) -> Dup = 0, QoS = 0, @@ -226,7 +240,7 @@ t_subscribe_case04(_) -> Will = 0, CleanSession = 0, MsgId = 1, - TopicId = ?PREDEF_TOPIC_ID1, %this TopicId is the predefined topic id corresponding to ?PREDEF_TOPIC_NAME1 + TopicId = ?PREDEF_TOPIC_ID1, ReturnCode = 0, {ok, Socket} = gen_udp:open(0, [binary]), ClientId = ?CLIENTID, @@ -234,10 +248,14 @@ t_subscribe_case04(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), Topic1 = ?PREDEF_TOPIC_NAME1, send_register_msg(Socket, Topic1, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, + receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, Topic1, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_unsubscribe_msg_normal_topic(Socket, Topic1, MsgId), ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)), @@ -264,19 +282,30 @@ t_subscribe_case05(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_register_msg(Socket, <<"abcD">>, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, + receive_response(Socket) + ), send_subscribe_msg_normal_topic(Socket, QoS, <<"abcD">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_subscribe_msg_normal_topic(Socket, QoS, <<"/sport/#">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_subscribe_msg_normal_topic(Socket, QoS, <<"/a/+/water">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_subscribe_msg_normal_topic(Socket, QoS, <<"/Tom/Home">>, MsgId), ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, @@ -306,19 +335,32 @@ t_subscribe_case06(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_register_msg(Socket, <<"abc">>, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, + receive_response(Socket) + ), send_register_msg(Socket, <<"/blue/#">>, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId0:16, MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId0:16, + MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, + receive_response(Socket) + ), send_register_msg(Socket, <<"/blue/+/white">>, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId0:16, MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId0:16, + MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, + receive_response(Socket) + ), send_register_msg(Socket, <<"/$sys/rain">>, MsgId), - ?assertEqual(<<7, ?SN_REGACK, TopicId2:16, MsgId:16, 0:8>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_REGACK, TopicId2:16, MsgId:16, 0:8>>, + receive_response(Socket) + ), send_subscribe_msg_short_topic(Socket, QoS, <<"Q2">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), send_unsubscribe_msg_normal_topic(Socket, <<"Q2">>, MsgId), ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)), @@ -342,8 +384,11 @@ t_subscribe_case07(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, + receive_response(Socket) + ), send_unsubscribe_msg_predefined_topic(Socket, TopicId2, MsgId), ?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)), @@ -365,8 +410,11 @@ t_subscribe_case08(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_reserved_topic(Socket, QoS, TopicId2, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, ?SN_INVALID_TOPIC_ID:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + ?SN_INVALID_TOPIC_ID:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -390,15 +438,20 @@ t_publish_negqos_case09(_) -> send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), MsgId1 = 3, Payload1 = <<20, 21, 22, 23>>, send_publish_msg_normal_topic(Socket, NegQoS, MsgId1, TopicId1, Payload1), timer:sleep(100), case ?ENABLE_QOS3 of true -> - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What) end, @@ -431,7 +484,9 @@ t_publish_qos0_case01(_) -> send_publish_msg_normal_topic(Socket, QoS, MsgId1, TopicId1, Payload1), timer:sleep(100), - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What), @@ -453,15 +508,20 @@ t_publish_qos0_case02(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), MsgId1 = 3, Payload1 = <<20, 21, 22, 23>>, send_publish_msg_predefined_topic(Socket, QoS, MsgId1, PredefTopicId, Payload1), timer:sleep(100), - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2, PredefTopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2, + PredefTopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What), @@ -484,15 +544,20 @@ t_publish_qos0_case3(_) -> Topic = <<"/a/b/c">>, send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), MsgId1 = 3, Payload1 = <<20, 21, 22, 23>>, send_publish_msg_predefined_topic(Socket, QoS, MsgId1, TopicId, Payload1), timer:sleep(100), - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What), @@ -514,8 +579,11 @@ t_publish_qos0_case04(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, <<"#">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), MsgId1 = 2, Payload1 = <<20, 21, 22, 23>>, @@ -523,7 +591,9 @@ t_publish_qos0_case04(_) -> send_publish_msg_short_topic(Socket, QoS, MsgId1, Topic, Payload1), timer:sleep(100), - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2, Topic/binary, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2, + Topic/binary, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What), @@ -544,8 +614,11 @@ t_publish_qos0_case05(_) -> send_connect_msg(Socket, ClientId), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_short_topic(Socket, QoS, <<"/#">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -567,15 +640,20 @@ t_publish_qos0_case06(_) -> Topic = <<"abc">>, send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), MsgId1 = 3, Payload1 = <<20, 21, 22, 23>>, send_publish_msg_normal_topic(Socket, QoS, MsgId1, TopicId1, Payload1), timer:sleep(100), - Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, + Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>, What = receive_response(Socket), ?assertEqual(Eexp, What), @@ -597,16 +675,25 @@ t_publish_qos1_case01(_) -> send_connect_msg(Socket, ClientId), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, - ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), Payload1 = <<20, 21, 22, 23>>, send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1), - ?assertEqual(<<7, ?SN_PUBACK, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, TopicId1:16, + MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), timer:sleep(100), - ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)), + ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), gen_udp:close(Socket). @@ -625,12 +712,18 @@ t_publish_qos1_case02(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), Payload1 = <<20, 21, 22, 23>>, send_publish_msg_predefined_topic(Socket, QoS, MsgId, PredefTopicId, Payload1), - ?assertEqual(<<7, ?SN_PUBACK, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, PredefTopicId:16, + MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), timer:sleep(100), send_disconnect_msg(Socket, undefined), @@ -645,7 +738,10 @@ t_publish_qos1_case03(_) -> send_connect_msg(Socket, ClientId), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_publish_msg_predefined_topic(Socket, QoS, MsgId, tid(5), <<20, 21, 22, 23>>), - ?assertEqual(<<7, ?SN_PUBACK, TopicId5:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, TopicId5:16, + MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -664,15 +760,20 @@ t_publish_qos1_case04(_) -> send_connect_msg(Socket, ClientId), ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_short_topic(Socket, QoS, <<"ab">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, - ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), Topic = <<"ab">>, Payload1 = <<20, 21, 22, 23>>, send_publish_msg_short_topic(Socket, QoS, MsgId, Topic, Payload1), <> = Topic, - ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, + MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), timer:sleep(100), send_disconnect_msg(Socket, undefined), @@ -692,13 +793,18 @@ t_publish_qos1_case05(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, - ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/#">>, <<20, 21, 22, 23>>), <> = <<"/#">>, - ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, + MsgId:16, ?SN_RC_NOT_SUPPORTED>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -724,7 +830,10 @@ t_publish_qos1_case06(_) -> send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/+">>, <<20, 21, 22, 23>>), <> = <<"/+">>, - ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>, receive_response(Socket)), + ?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, + MsgId:16, ?SN_RC_NOT_SUPPORTED>>, + receive_response(Socket) + ), send_disconnect_msg(Socket, undefined), ?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)), @@ -751,7 +860,11 @@ t_publish_qos2_case01(_) -> send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1), ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)), send_pubrel_msg(Socket, MsgId), - ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)), + ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, 1:16, <<20, 21, 22, 23>>/binary>>, + receive_response(Socket) + ), ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)), timer:sleep(100), @@ -773,15 +886,21 @@ t_publish_qos2_case02(_) -> ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, + PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, + receive_response(Socket) + ), Payload1 = <<20, 21, 22, 23>>, send_publish_msg_predefined_topic(Socket, QoS, MsgId, PredefTopicId, Payload1), ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)), send_pubrel_msg(Socket, MsgId), - ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC :2, PredefTopicId:16, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)), + ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2, + PredefTopicId:16, 1:16, <<20, 21, 22, 23>>/binary>>, + receive_response(Socket) + ), ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)), timer:sleep(100), @@ -812,7 +931,11 @@ t_publish_qos2_case03(_) -> ?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)), send_pubrel_msg(Socket, MsgId), - ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_SHORT_TOPIC :2, <<"/a">>/binary, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)), + ?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, + Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2, + "/a", 1:16, <<20, 21, 22, 23>>/binary>>, + receive_response(Socket) + ), ?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)), timer:sleep(100), @@ -1083,7 +1206,11 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) -> send_register_msg(Socket, TopicName1, MsgId1), ?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId1:16, 0:8>>, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ReturnCode>>, receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId1:16, MsgId:16, ReturnCode>>, + receive_response(Socket) + ), % goto asleep state send_disconnect_msg(Socket, 1), @@ -1109,7 +1236,10 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) -> %% the broker should sent dl msgs to the awake client before sending the pingresp UdpData = receive_response(Socket), - MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId1, Payload1}, UdpData), + MsgId_udp = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicId1, Payload1}, UdpData), send_puback_msg(Socket, TopicId1, MsgId_udp), %% check the pingresp is received at last @@ -1141,8 +1271,11 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) -> CleanSession = 0, ReturnCode = 0, send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), % goto asleep state send_disconnect_msg(Socket, 1), @@ -1176,11 +1309,17 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) -> send_regack_msg(Socket, TopicIdNew, MsgId3), UdpData2 = receive_response(Socket), - MsgId_udp2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData2), + MsgId_udp2 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload1}, UdpData2), send_puback_msg(Socket, TopicIdNew, MsgId_udp2), UdpData3 = receive_response(Socket), - MsgId_udp3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData3), + MsgId_udp3 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload2}, UdpData3), send_puback_msg(Socket, TopicIdNew, MsgId_udp3), ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), @@ -1216,8 +1355,11 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) -> CleanSession = 0, ReturnCode = 0, send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), % goto asleep state SleepDuration = 30, @@ -1250,21 +1392,28 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) -> send_regack_msg(Socket, TopicIdNew, MsgId_reg), UdpData2 = receive_response(Socket), - MsgId2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2), + MsgId2 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload2}, UdpData2), send_puback_msg(Socket, TopicIdNew, MsgId2), timer:sleep(50), UdpData3 = wrap_receive_response(Socket), - MsgId3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3), + MsgId3 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload3}, UdpData3), send_puback_msg(Socket, TopicIdNew, MsgId3), timer:sleep(50), case receive_response(Socket) of <<2,23>> -> ok; UdpData4 -> - MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, - CleanSession, ?SN_NORMAL_TOPIC, - TopicIdNew, Payload4}, UdpData4), + MsgId4 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload4}, UdpData4), send_puback_msg(Socket, TopicIdNew, MsgId4) end, ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), @@ -1322,7 +1471,10 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) -> send_pingreq_msg(Socket, ClientId), UdpData = wrap_receive_response(Socket), - MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData), + MsgId_udp = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicId_tom, Payload1}, UdpData), send_pubrec_msg(Socket, MsgId_udp), ?assertMatch(<<_:8, ?SN_PUBREL:8, _/binary>>, receive_response(Socket)), send_pubcomp_msg(Socket, MsgId_udp), @@ -1357,8 +1509,11 @@ t_asleep_test07_to_connected(_) -> send_register_msg(Socket, TopicName_tom, MsgId1), TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)), send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId_tom:16, MsgId1:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId_tom:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), % goto asleep state send_disconnect_msg(Socket, SleepDuration), @@ -1436,8 +1591,11 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> CleanSession = 0, ReturnCode = 0, send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1), - ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>, - receive_response(Socket)), + ?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, + WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2, + TopicId0:16, MsgId1:16, ReturnCode>>, + receive_response(Socket) + ), % goto asleep state SleepDuration = 30, send_disconnect_msg(Socket, SleepDuration), @@ -1471,7 +1629,10 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> udp_receive_timeout -> ok; UdpData2 -> - MsgId2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2), + MsgId2 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload2}, UdpData2), send_puback_msg(Socket, TopicIdNew, MsgId2) end, timer:sleep(100), @@ -1480,7 +1641,10 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> udp_receive_timeout -> ok; UdpData3 -> - MsgId3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3), + MsgId3 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload3}, UdpData3), send_puback_msg(Socket, TopicIdNew, MsgId3) end, timer:sleep(100), @@ -1489,16 +1653,18 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) -> udp_receive_timeout -> ok; UdpData4 -> - MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, - CleanSession, ?SN_NORMAL_TOPIC, - TopicIdNew, Payload4}, UdpData4), + MsgId4 = check_publish_msg_on_udp( + {Dup, QoS, Retain, WillBit, + CleanSession, ?SN_NORMAL_TOPIC, + TopicIdNew, Payload4}, UdpData4), send_puback_msg(Socket, TopicIdNew, MsgId4) end, ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), %% send PINGREQ again to enter awake state send_pingreq_msg(Socket, ClientId), - %% will not receive any buffered PUBLISH messages buffered before last awake, only receive PINGRESP here + %% will not receive any buffered PUBLISH messages buffered before last + %% awake, only receive PINGRESP here ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), gen_udp:close(Socket). @@ -1901,8 +2067,12 @@ check_dispatched_message(Dup, QoS, Retain, TopicIdType, TopicId, Payload, Socket PubMsg = receive_response(Socket), Length = 7 + byte_size(Payload), ?LOG("check_dispatched_message ~p~n", [PubMsg]), - ?LOG("expected ~p xx ~p~n", [<>, Payload]), - <> = PubMsg, + ?LOG("expected ~p xx ~p~n", + [<>, Payload]), + <> = PubMsg, case QoS of 0 -> ok; 1 -> send_puback_msg(Socket, TopicId, MsgId); @@ -1914,11 +2084,14 @@ check_dispatched_message(Dup, QoS, Retain, TopicIdType, TopicId, Payload, Socket get_udp_broadcast_address() -> "255.255.255.255". -check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, TopicType, TopicId, Payload}, UdpData) -> +check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, + CleanSession, TopicType, TopicId, Payload}, UdpData) -> <> = UdpData, ct:pal("UdpData: ~p, Payload: ~p, PayloadIn: ~p", [UdpData, Payload, PayloadIn]), Size9 = byte_size(Payload) + 7, - Eexp = <>, + Eexp = <>, ?assertEqual(Eexp, HeaderUdp), % mqtt-sn header should be same ?assertEqual(Payload, PayloadIn), % payload should be same MsgId. diff --git a/apps/emqx_stomp/src/emqx_stomp.app.src b/apps/emqx_stomp/src/emqx_stomp.app.src index d2ecae53b..c2f4b57d3 100644 --- a/apps/emqx_stomp/src/emqx_stomp.app.src +++ b/apps/emqx_stomp/src/emqx_stomp.app.src @@ -1,6 +1,6 @@ {application, emqx_stomp, [{description, "EMQ X Stomp Protocol Plugin"}, - {vsn, "4.3.2"}, % strict semver, bump manually! + {vsn, "4.3.3"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_stomp_sup]}, {applications, [kernel,stdlib]}, diff --git a/apps/emqx_stomp/src/emqx_stomp.appup.src b/apps/emqx_stomp/src/emqx_stomp.appup.src index bf4603e52..dce441b3c 100644 --- a/apps/emqx_stomp/src/emqx_stomp.appup.src +++ b/apps/emqx_stomp/src/emqx_stomp.appup.src @@ -1,10 +1,16 @@ %% -*- mode: erlang -*- {VSN, - [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]}, + {"4.3.1",[ + {load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{restart_application,emqx_stomp}]}, {<<".*">>,[]}], - [{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, + [{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]}, + {"4.3.1",[ + {load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}, + {load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{restart_application,emqx_stomp}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_stomp/src/emqx_stomp_protocol.erl b/apps/emqx_stomp/src/emqx_stomp_protocol.erl index fc211be10..4e371d9b0 100644 --- a/apps/emqx_stomp/src/emqx_stomp_protocol.erl +++ b/apps/emqx_stomp/src/emqx_stomp_protocol.erl @@ -108,6 +108,8 @@ , init/2 ]}). +-elvis([{elvis_style, dont_repeat_yourself, disable}]). + -type(pstate() :: #pstate{}). %% @doc Init protocol @@ -132,8 +134,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port}, AllowAnonymous = get_value(allow_anonymous, Opts, false), DefaultUser = get_value(default_user, Opts), - - #pstate{ + #pstate{ conninfo = NConnInfo, clientinfo = ClientInfo, heartfun = HeartFun, @@ -165,7 +166,7 @@ default_conninfo(ConnInfo) -> info(State) -> maps:from_list(info(?INFO_KEYS, State)). --spec info(list(atom())|atom(), pstate()) -> term(). +-spec info(list(atom()) | atom(), pstate()) -> term(). info(Keys, State) when is_list(Keys) -> [{Key, info(Key, State)} || Key <- Keys]; info(conninfo, #pstate{conninfo = ConnInfo}) -> @@ -288,7 +289,12 @@ received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) -> case header(<<"transaction">>, Headers) of undefined -> {ok, handle_recv_send_frame(Frame, State)}; - TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_send_frame/2, [Frame]}, receipt_id(Headers), State) + TransactionId -> + add_action(TransactionId, + {fun ?MODULE:handle_recv_send_frame/2, [Frame]}, + receipt_id(Headers), + State + ) end; received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers}, @@ -346,7 +352,11 @@ received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers}, received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) -> case header(<<"transaction">>, Headers) of undefined -> {ok, handle_recv_ack_frame(Frame, State)}; - TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_ack_frame/2, [Frame]}, receipt_id(Headers), State) + TransactionId -> + add_action(TransactionId, + {fun ?MODULE:handle_recv_ack_frame/2, [Frame]}, + receipt_id(Headers), + State) end; %% NACK @@ -357,7 +367,11 @@ received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) -> received(Frame = #stomp_frame{command = <<"NACK">>, headers = Headers}, State) -> case header(<<"transaction">>, Headers) of undefined -> {ok, handle_recv_nack_frame(Frame, State)}; - TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_nack_frame/2, [Frame]}, receipt_id(Headers), State) + TransactionId -> + add_action(TransactionId, + {fun ?MODULE:handle_recv_nack_frame/2, [Frame]}, + receipt_id(Headers), + State) end; %% BEGIN @@ -516,9 +530,9 @@ negotiate_version(Accepts) -> negotiate_version(Ver, []) -> {error, <<"Supported protocol versions < ", Ver/binary>>}; -negotiate_version(Ver, [AcceptVer|_]) when Ver >= AcceptVer -> +negotiate_version(Ver, [AcceptVer | _]) when Ver >= AcceptVer -> {ok, AcceptVer}; -negotiate_version(Ver, [_|T]) -> +negotiate_version(Ver, [_ | T]) -> negotiate_version(Ver, T). check_login(Login, _, AllowAnonymous, _) @@ -537,7 +551,7 @@ check_login(Login, Passcode, _, DefaultUser) -> add_action(Id, Action, ReceiptId, State = #pstate{transaction = Trans}) -> case maps:get(Id, Trans, undefined) of {Ts, Actions} -> - NTrans = Trans#{Id => {Ts, [Action|Actions]}}, + NTrans = Trans#{Id => {Ts, [Action | Actions]}}, {ok, State#pstate{transaction = NTrans}}; _ -> send(error_frame(ReceiptId, ["Transaction ", Id, " not found"]), State) @@ -588,15 +602,29 @@ next_ackid() -> put(ackid, AckId + 1), AckId. -make_mqtt_message(Topic, Headers, Body) -> - Msg = emqx_message:make(stomp, Topic, Body), - Headers1 = lists:foldl(fun(Key, Headers0) -> - proplists:delete(Key, Headers0) - end, Headers, [<<"destination">>, - <<"content-length">>, - <<"content-type">>, - <<"transaction">>, - <<"receipt">>]), +make_mqtt_message(Topic, Headers, Body, + #pstate{ + conninfo = #{proto_ver := ProtoVer}, + clientinfo = #{ + protocol := Protocol, + clientid := ClientId, + username := Username, + peerhost := PeerHost}}) -> + Msg = emqx_message:make( + ClientId, ?QOS_0, + Topic, Body, #{}, + #{proto_ver => ProtoVer, + protocol => Protocol, + username => Username, + peerhost => PeerHost}), + Headers1 = lists:foldl( + fun(Key, Headers0) -> + proplists:delete(Key, Headers0) + end, Headers, [<<"destination">>, + <<"content-length">>, + <<"content-type">>, + <<"transaction">>, + <<"receipt">>]), emqx_message:set_headers(#{stomp_headers => Headers1}, Msg). receipt_id(Headers) -> @@ -611,7 +639,7 @@ handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, bod allow -> _ = maybe_send_receipt(receipt_id(Headers), State), _ = emqx_broker:publish( - make_mqtt_message(Topic, Headers, iolist_to_binary(Body)) + make_mqtt_message(Topic, Headers, iolist_to_binary(Body), State) ), State; deny -> @@ -699,7 +727,7 @@ find_sub_by_id(Id, Subs) -> end, Subs), case maps:to_list(Found) of [] -> undefined; - [Sub|_] -> Sub + [Sub | _] -> Sub end. is_acl_enabled(_) -> diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 3e53eafd1..3d5fb66a5 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -147,7 +147,7 @@ dn => binary(), atom() => term() }). --type(clientid() :: binary()|atom()). +-type(clientid() :: binary() | atom()). -type(username() :: maybe(binary())). -type(password() :: maybe(binary())). -type(peerhost() :: inet:ip_address()). @@ -193,6 +193,7 @@ username => username(), peerhost => peerhost(), properties => properties(), + allow_publish => boolean(), atom() => term()}). -type(banned() :: #banned{}).