Merge pull request #6076 from HJianBo/enhance-message-hook
feat(exhook): expose headers for on_messages_publish hook
This commit is contained in:
commit
d305111929
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_coap,
|
||||
[{description, "EMQ X CoAP Gateway"},
|
||||
{vsn, "4.3.0"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.1"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel,stdlib,gen_coap]},
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
%% -*-: erlang -*-
|
||||
{VSN,
|
||||
[{"4.3.0",[
|
||||
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
|
||||
{<<".*">>, []}],
|
||||
[{"4.3.0",[
|
||||
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
|
||||
{<<".*">>, []}]
|
||||
}.
|
|
@ -58,6 +58,8 @@
|
|||
|
||||
-define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0, is_new => false}).
|
||||
|
||||
-define(PROTO_VER, 1).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% API
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -244,15 +246,26 @@ chann_publish(Topic, Payload, State = #state{clientid = ClientId}) ->
|
|||
case emqx_access_control:check_acl(clientinfo(State), publish, Topic) of
|
||||
allow ->
|
||||
_ = emqx_broker:publish(
|
||||
emqx_message:set_flag(retain, false,
|
||||
emqx_message:make(ClientId, ?QOS_0, Topic, Payload))),
|
||||
ok;
|
||||
packet_to_message(Topic, Payload, State)), ok;
|
||||
deny ->
|
||||
?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.",
|
||||
[Topic, ClientId]),
|
||||
{error, forbidden}
|
||||
end.
|
||||
|
||||
packet_to_message(Topic, Payload,
|
||||
#state{clientid = ClientId,
|
||||
username = Username,
|
||||
peername = {PeerHost, _}}) ->
|
||||
Message = emqx_message:set_flag(
|
||||
retain, false,
|
||||
emqx_message:make(ClientId, ?QOS_0, Topic, Payload)
|
||||
),
|
||||
emqx_message:set_headers(
|
||||
#{ proto_ver => ?PROTO_VER
|
||||
, protocol => coap
|
||||
, username => Username
|
||||
, peerhost => PeerHost}, Message).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Deliver
|
||||
|
@ -324,7 +337,7 @@ conninfo(#state{peername = Peername,
|
|||
peercert => nossl, %% TODO: dtls
|
||||
conn_mod => ?MODULE,
|
||||
proto_name => <<"CoAP">>,
|
||||
proto_ver => 1,
|
||||
proto_ver => ?PROTO_VER,
|
||||
clean_start => true,
|
||||
clientid => ClientId,
|
||||
username => undefined,
|
||||
|
@ -384,4 +397,3 @@ clientinfo(#state{peername = {PeerHost, _},
|
|||
mountpoint => undefined,
|
||||
ws_cookie => undefined
|
||||
}.
|
||||
|
||||
|
|
|
@ -24,6 +24,11 @@
|
|||
## Value: false | Duration
|
||||
#exhook.auto_reconnect = 60s
|
||||
|
||||
## The process pool size for gRPC client
|
||||
##
|
||||
## Default: Equals cpu cores
|
||||
## Value: Integer
|
||||
#exhook.pool_size = 16
|
||||
|
||||
##--------------------------------------------------------------------
|
||||
## The Hook callback servers
|
||||
|
|
|
@ -26,6 +26,10 @@
|
|||
end
|
||||
end}.
|
||||
|
||||
{mapping, "exhook.pool_size", "emqx_exhook.pool_size", [
|
||||
{datatype, integer}
|
||||
]}.
|
||||
|
||||
{mapping, "exhook.server.$name.url", "emqx_exhook.servers", [
|
||||
{datatype, string}
|
||||
]}.
|
||||
|
|
|
@ -358,6 +358,31 @@ message Message {
|
|||
bytes payload = 6;
|
||||
|
||||
uint64 timestamp = 7;
|
||||
|
||||
// The key of header can be:
|
||||
// - username:
|
||||
// * Readonly
|
||||
// * The username of sender client
|
||||
// * Value type: utf8 string
|
||||
// - protocol:
|
||||
// * Readonly
|
||||
// * The protocol name of sender client
|
||||
// * Value type: string enum with "mqtt", "mqtt-sn", ...
|
||||
// - peerhost:
|
||||
// * Readonly
|
||||
// * The peerhost of sender client
|
||||
// * Value type: ip address string
|
||||
// - allow_publish:
|
||||
// * Writable
|
||||
// * Whether to allow the message to be published by emqx
|
||||
// * Value type: string enum with "true", "false", default is "true"
|
||||
//
|
||||
// Notes: All header may be missing, which means that the message does not
|
||||
// carry these headers. We can guarantee that clients coming from MQTT,
|
||||
// MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will
|
||||
// carry these headers, but there is no guarantee that messages published
|
||||
// by other means will do, e.g. messages published by HTTP-API
|
||||
map<string, string> headers = 8;
|
||||
}
|
||||
|
||||
message Property {
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
]}.
|
||||
|
||||
{deps,
|
||||
[{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.3"}}}
|
||||
[{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}}
|
||||
]}.
|
||||
|
||||
{grpc,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_exhook,
|
||||
[{description, "EMQ X Extension for Hook"},
|
||||
{vsn, "4.3.4"},
|
||||
{vsn, "4.4.0"},
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{mod, {emqx_exhook_app, []}},
|
||||
|
|
|
@ -1,15 +1,7 @@
|
|||
%% -*-: erlang -*-
|
||||
{VSN,
|
||||
[
|
||||
{<<"4.3.[0-3]">>, [
|
||||
{restart_application, emqx_exhook}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
[{<<".*">>, []}
|
||||
],
|
||||
[
|
||||
{<<"4.3.[0-3]">>, [
|
||||
{restart_application, emqx_exhook}
|
||||
]},
|
||||
{<<".*">>, []}
|
||||
[{<<".*">>, []}
|
||||
]
|
||||
}.
|
||||
|
|
|
@ -50,6 +50,7 @@
|
|||
|
||||
%% Utils
|
||||
-export([ message/1
|
||||
, headers/1
|
||||
, stringfy/1
|
||||
, merge_responsed_bool/2
|
||||
, merge_responsed_message/2
|
||||
|
@ -62,6 +63,8 @@
|
|||
, call_fold/3
|
||||
]).
|
||||
|
||||
-elvis([{elvis_style, god_modules, disable}]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Clients
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -258,17 +261,58 @@ clientinfo(ClientInfo =
|
|||
cn => maybe(maps:get(cn, ClientInfo, undefined)),
|
||||
dn => maybe(maps:get(dn, ClientInfo, undefined))}.
|
||||
|
||||
message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) ->
|
||||
message(#message{id = Id, qos = Qos, from = From, topic = Topic,
|
||||
payload = Payload, timestamp = Ts, headers = Headers}) ->
|
||||
#{node => stringfy(node()),
|
||||
id => emqx_guid:to_hexstr(Id),
|
||||
qos => Qos,
|
||||
from => stringfy(From),
|
||||
topic => Topic,
|
||||
payload => Payload,
|
||||
timestamp => Ts}.
|
||||
timestamp => Ts,
|
||||
headers => headers(Headers)
|
||||
}.
|
||||
|
||||
assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) ->
|
||||
Message#message{qos = Qos, topic = Topic, payload = Payload}.
|
||||
headers(Headers) ->
|
||||
Ls = [username, protocol, peerhost, allow_publish],
|
||||
maps:fold(
|
||||
fun
|
||||
(_, undefined, Acc) ->
|
||||
Acc; %% Ignore undefined value
|
||||
(K, V, Acc) ->
|
||||
case lists:member(K, Ls) of
|
||||
true ->
|
||||
Acc#{atom_to_binary(K) => bin(K, V)};
|
||||
_ ->
|
||||
Acc
|
||||
end
|
||||
end, #{}, Headers).
|
||||
|
||||
bin(K, V) when K == username;
|
||||
K == protocol;
|
||||
K == allow_publish ->
|
||||
bin(V);
|
||||
bin(peerhost, V) ->
|
||||
bin(inet:ntoa(V)).
|
||||
|
||||
bin(V) when is_binary(V) -> V;
|
||||
bin(V) when is_atom(V) -> atom_to_binary(V);
|
||||
bin(V) when is_list(V) -> iolist_to_binary(V).
|
||||
|
||||
assign_to_message(InMessage = #{qos := Qos, topic := Topic,
|
||||
payload := Payload}, Message) ->
|
||||
NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload},
|
||||
enrich_header(maps:get(headers, InMessage, #{}), NMsg).
|
||||
|
||||
enrich_header(Headers, Message) ->
|
||||
case maps:get(<<"allow_publish">>, Headers, undefined) of
|
||||
<<"false">> ->
|
||||
emqx_message:set_header(allow_publish, false, Message);
|
||||
<<"true">> ->
|
||||
emqx_message:set_header(allow_publish, true, Message);
|
||||
_ ->
|
||||
Message
|
||||
end.
|
||||
|
||||
topicfilters(Tfs) when is_list(Tfs) ->
|
||||
[#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
|
||||
|
@ -299,11 +343,7 @@ merge_responsed_bool(_Req, #{type := 'IGNORE'}) ->
|
|||
ignore;
|
||||
merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}})
|
||||
when is_boolean(NewBool) ->
|
||||
NReq = Req#{result => NewBool},
|
||||
case Type of
|
||||
'CONTINUE' -> {ok, NReq};
|
||||
'STOP_AND_RETURN' -> {stop, NReq}
|
||||
end;
|
||||
{ret(Type), Req#{result => NewBool}};
|
||||
merge_responsed_bool(_Req, Resp) ->
|
||||
?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
|
||||
ignore.
|
||||
|
@ -311,11 +351,10 @@ merge_responsed_bool(_Req, Resp) ->
|
|||
merge_responsed_message(_Req, #{type := 'IGNORE'}) ->
|
||||
ignore;
|
||||
merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) ->
|
||||
NReq = Req#{message => NMessage},
|
||||
case Type of
|
||||
'CONTINUE' -> {ok, NReq};
|
||||
'STOP_AND_RETURN' -> {stop, NReq}
|
||||
end;
|
||||
{ret(Type), Req#{message => NMessage}};
|
||||
merge_responsed_message(_Req, Resp) ->
|
||||
?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
|
||||
ignore.
|
||||
|
||||
ret('CONTINUE') -> ok;
|
||||
ret('STOP_AND_RETURN') -> stop.
|
||||
|
|
|
@ -36,6 +36,8 @@
|
|||
, server/1
|
||||
, put_request_failed_action/1
|
||||
, get_request_failed_action/0
|
||||
, put_pool_size/1
|
||||
, get_pool_size/0
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
|
@ -117,6 +119,9 @@ init([Servers, AutoReconnect, ReqOpts0]) ->
|
|||
put_request_failed_action(
|
||||
maps:get(request_failed_action, ReqOpts0, deny)
|
||||
),
|
||||
put_pool_size(
|
||||
maps:get(pool_size, ReqOpts0, erlang:system_info(schedulers))
|
||||
),
|
||||
|
||||
%% Load the hook servers
|
||||
ReqOpts = maps:without([request_failed_action], ReqOpts0),
|
||||
|
@ -286,6 +291,14 @@ put_request_failed_action(Val) ->
|
|||
get_request_failed_action() ->
|
||||
persistent_term:get({?APP, request_failed_action}).
|
||||
|
||||
put_pool_size(Val) ->
|
||||
persistent_term:put({?APP, pool_size}, Val).
|
||||
|
||||
get_pool_size() ->
|
||||
%% Avoid the scenario that the parameter is not set after
|
||||
%% the hot upgrade completed.
|
||||
persistent_term:get({?APP, pool_size}, erlang:system_info(schedulers)).
|
||||
|
||||
save(Name, ServerState) ->
|
||||
Saved = persistent_term:get(?APP, []),
|
||||
persistent_term:put(?APP, lists:reverse([Name | Saved])),
|
||||
|
|
|
@ -77,6 +77,8 @@
|
|||
|
||||
-dialyzer({nowarn_function, [inc_metrics/2]}).
|
||||
|
||||
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Load/Unload APIs
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -125,13 +127,18 @@ channel_opts(Opts) ->
|
|||
SvrAddr = format_http_uri(Scheme, Host, Port),
|
||||
ClientOpts = case Scheme of
|
||||
https ->
|
||||
SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
|
||||
SslOpts = lists:keydelete(
|
||||
ssl,
|
||||
1,
|
||||
proplists:get_value(ssl_options, Opts, [])
|
||||
),
|
||||
#{gun_opts =>
|
||||
#{transport => ssl,
|
||||
transport_opts => SslOpts}};
|
||||
_ -> #{}
|
||||
end,
|
||||
{SvrAddr, ClientOpts}.
|
||||
NClientOpts = ClientOpts#{pool_size => emqx_exhook_mngr:get_pool_size()},
|
||||
{SvrAddr, NClientOpts}.
|
||||
|
||||
format_http_uri(Scheme, Host0, Port) ->
|
||||
Host = case is_tuple(Host0) of
|
||||
|
@ -174,16 +181,18 @@ resovle_hookspec(HookSpecs) when is_list(HookSpecs) ->
|
|||
case maps:get(name, HookSpec, undefined) of
|
||||
undefined -> Acc;
|
||||
Name0 ->
|
||||
Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end,
|
||||
case lists:member(Name, AvailableHooks) of
|
||||
true ->
|
||||
case lists:member(Name, MessageHooks) of
|
||||
true ->
|
||||
Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}};
|
||||
_ ->
|
||||
Acc#{Name => #{}}
|
||||
end;
|
||||
_ -> error({unknown_hookpoint, Name})
|
||||
Name = try
|
||||
binary_to_existing_atom(Name0, utf8)
|
||||
catch T:R -> {T,R}
|
||||
end,
|
||||
case {lists:member(Name, AvailableHooks),
|
||||
lists:member(Name, MessageHooks)} of
|
||||
{false, _} ->
|
||||
error({unknown_hookpoint, Name});
|
||||
{true, false} ->
|
||||
Acc#{Name => #{}};
|
||||
{true, true} ->
|
||||
Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}}
|
||||
end
|
||||
end
|
||||
end, #{}, HookSpecs).
|
||||
|
@ -271,8 +280,8 @@ do_call(ChannName, Fun, Req, ReqOpts) ->
|
|||
Options = ReqOpts#{channel => ChannName},
|
||||
?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]),
|
||||
case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
|
||||
{ok, Resp, _Metadata} ->
|
||||
?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]),
|
||||
{ok, Resp, Metadata} ->
|
||||
?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, Metadata]),
|
||||
{ok, Resp};
|
||||
{error, {Code, Msg}, _Metadata} ->
|
||||
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",
|
||||
|
|
|
@ -54,7 +54,8 @@ auto_reconnect() ->
|
|||
|
||||
request_options() ->
|
||||
#{timeout => env(request_timeout, 5000),
|
||||
request_failed_action => env(request_failed_action, deny)
|
||||
request_failed_action => env(request_failed_action, deny),
|
||||
pool_size => env(pool_size, erlang:system_info(schedulers))
|
||||
}.
|
||||
|
||||
env(Key, Def) ->
|
||||
|
@ -67,7 +68,7 @@ env(Key, Def) ->
|
|||
-spec start_grpc_client_channel(
|
||||
string(),
|
||||
uri_string:uri_string(),
|
||||
grpc_client:options()) -> {ok, pid()} | {error, term()}.
|
||||
grpc_client_sup:options()) -> {ok, pid()} | {error, term()}.
|
||||
start_grpc_client_channel(Name, SvrAddr, Options) ->
|
||||
grpc_client_sup:create_channel_pool(Name, SvrAddr, Options).
|
||||
|
||||
|
|
|
@ -299,21 +299,31 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
|
|||
%% some cases for testing
|
||||
case From of
|
||||
<<"baduser">> ->
|
||||
NMsg = Msg#{qos => 0,
|
||||
NMsg = deny(Msg#{qos => 0,
|
||||
topic => <<"">>,
|
||||
payload => <<"">>
|
||||
},
|
||||
}),
|
||||
{ok, #{type => 'STOP_AND_RETURN',
|
||||
value => {message, NMsg}}, Md};
|
||||
<<"gooduser">> ->
|
||||
NMsg = Msg#{topic => From,
|
||||
payload => From},
|
||||
NMsg = allow(Msg#{topic => From,
|
||||
payload => From}),
|
||||
{ok, #{type => 'STOP_AND_RETURN',
|
||||
value => {message, NMsg}}, Md};
|
||||
_ ->
|
||||
{ok, #{type => 'IGNORE'}, Md}
|
||||
end.
|
||||
|
||||
deny(Msg) ->
|
||||
NHeader = maps:put(<<"allow_publish">>, <<"false">>,
|
||||
maps:get(headers, Msg, #{})),
|
||||
maps:put(headers, NHeader, Msg).
|
||||
|
||||
allow(Msg) ->
|
||||
NHeader = maps:put(<<"allow_publish">>, <<"true">>,
|
||||
maps:get(headers, Msg, #{})),
|
||||
maps:put(headers, NHeader, Msg).
|
||||
|
||||
-spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata())
|
||||
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
|
||||
| {error, grpc_cowboy_h:error_response()}.
|
||||
|
|
|
@ -299,19 +299,24 @@ prop_message_publish() ->
|
|||
_ ->
|
||||
ExpectedOutMsg = case emqx_message:from(Msg) of
|
||||
<<"baduser">> ->
|
||||
MsgMap = emqx_message:to_map(Msg),
|
||||
MsgMap = #{headers := Headers}
|
||||
= emqx_message:to_map(Msg),
|
||||
emqx_message:from_map(
|
||||
MsgMap#{qos => 0,
|
||||
topic => <<"">>,
|
||||
payload => <<"">>
|
||||
payload => <<"">>,
|
||||
headers => maps:put(allow_publish, false, Headers)
|
||||
});
|
||||
<<"gooduser">> = From ->
|
||||
MsgMap = emqx_message:to_map(Msg),
|
||||
MsgMap = #{headers := Headers}
|
||||
= emqx_message:to_map(Msg),
|
||||
emqx_message:from_map(
|
||||
MsgMap#{topic => From,
|
||||
payload => From
|
||||
payload => From,
|
||||
headers => maps:put(allow_publish, true, Headers)
|
||||
});
|
||||
_ -> Msg
|
||||
_ ->
|
||||
Msg
|
||||
end,
|
||||
?assertEqual(ExpectedOutMsg, OutMsg),
|
||||
|
||||
|
@ -464,7 +469,9 @@ from_message(Msg) ->
|
|||
from => stringfy(emqx_message:from(Msg)),
|
||||
topic => emqx_message:topic(Msg),
|
||||
payload => emqx_message:payload(Msg),
|
||||
timestamp => emqx_message:timestamp(Msg)
|
||||
timestamp => emqx_message:timestamp(Msg),
|
||||
headers => emqx_exhook_handler:headers(
|
||||
emqx_message:get_headers(Msg))
|
||||
}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
]}.
|
||||
|
||||
{deps,
|
||||
[{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.3"}}}
|
||||
[{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}}
|
||||
]}.
|
||||
|
||||
{grpc,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_exproto,
|
||||
[{description, "EMQ X Extension for Protocol"},
|
||||
{vsn, "4.3.4"}, %% 4.3.3 is used by ee
|
||||
{vsn, "4.3.5"}, %% 4.3.3 is used by ee
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{mod, {emqx_exproto_app, []}},
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.3.3",
|
||||
[{"4.3.4",
|
||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.3",
|
||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.2",
|
||||
|
@ -12,7 +14,9 @@
|
|||
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.3",
|
||||
[{"4.3.4",
|
||||
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.3",
|
||||
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.2",
|
||||
|
|
|
@ -340,17 +340,14 @@ handle_call({unsubscribe, TopicFilter},
|
|||
handle_call({publish, Topic, Qos, Payload},
|
||||
Channel = #channel{
|
||||
conn_state = connected,
|
||||
clientinfo = ClientInfo
|
||||
= #{clientid := From,
|
||||
mountpoint := Mountpoint}}) ->
|
||||
clientinfo = ClientInfo}) ->
|
||||
case is_acl_enabled(ClientInfo) andalso
|
||||
emqx_access_control:check_acl(ClientInfo, publish, Topic) of
|
||||
deny ->
|
||||
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
|
||||
_ ->
|
||||
Msg = emqx_message:make(From, Qos, Topic, Payload),
|
||||
NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
|
||||
_ = emqx:publish(NMsg),
|
||||
Msg = packet_to_message(Topic, Qos, Payload, Channel),
|
||||
_ = emqx:publish(Msg),
|
||||
{reply, ok, Channel}
|
||||
end;
|
||||
|
||||
|
@ -419,6 +416,24 @@ is_anonymous(_AuthResult) -> false.
|
|||
clean_anonymous_clients() ->
|
||||
ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())).
|
||||
|
||||
packet_to_message(Topic, Qos, Payload,
|
||||
#channel{
|
||||
conninfo = #{proto_ver := ProtoVer},
|
||||
clientinfo = #{
|
||||
protocol := Protocol,
|
||||
clientid := ClientId,
|
||||
username := Username,
|
||||
peerhost := PeerHost,
|
||||
mountpoint := Mountpoint}}) ->
|
||||
Msg = emqx_message:make(
|
||||
ClientId, Qos,
|
||||
Topic, Payload, #{},
|
||||
#{proto_ver => ProtoVer,
|
||||
protocol => Protocol,
|
||||
username => Username,
|
||||
peerhost => PeerHost}),
|
||||
emqx_mountpoint:mount(Mountpoint, Msg).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Sub/UnSub
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application,emqx_lwm2m,
|
||||
[{description,"EMQ X LwM2M Gateway"},
|
||||
{vsn, "4.3.4"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.5"}, % strict semver, bump manually!
|
||||
{modules,[]},
|
||||
{registered,[emqx_lwm2m_sup]},
|
||||
{applications,[kernel,stdlib,lwm2m_coap]},
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
%% -*-: erlang -*-
|
||||
{"4.3.4",
|
||||
{VSN,
|
||||
[
|
||||
{<<"4\\.3\\.[0-1]">>, [
|
||||
{restart_application, emqx_lwm2m}
|
||||
|
@ -7,7 +7,10 @@
|
|||
{"4.3.2", [
|
||||
{load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.3", []} %% only config change
|
||||
{"4.3.3", []}, %% only config change
|
||||
{"4.3.4", [
|
||||
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
|
||||
]}
|
||||
],
|
||||
[
|
||||
{<<"4\\.3\\.[0-1]">>, [
|
||||
|
@ -16,6 +19,9 @@
|
|||
{"4.3.2", [
|
||||
{load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
|
||||
]},
|
||||
{"4.3.3", []} %% only config change
|
||||
{"4.3.3", []}, %% only config change
|
||||
{"4.3.4", [
|
||||
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
|
||||
]}
|
||||
]
|
||||
}.
|
||||
|
|
|
@ -74,7 +74,8 @@ call(Pid, Msg, Timeout) ->
|
|||
Error -> {error, Error}
|
||||
end.
|
||||
|
||||
init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) ->
|
||||
init(CoapPid, EndpointName, Peername = {_Peerhost, _Port},
|
||||
RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) ->
|
||||
Mountpoint = proplists:get_value(mountpoint, lwm2m_coap_responder:options(), ""),
|
||||
Lwm2mState = #lwm2m_state{peername = Peername,
|
||||
endpoint_name = EndpointName,
|
||||
|
@ -105,7 +106,8 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">>
|
|||
emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
|
||||
emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
|
||||
|
||||
{ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
|
||||
NTimer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired}),
|
||||
{ok, Lwm2mState1#lwm2m_state{life_timer = NTimer}};
|
||||
{error, Error} ->
|
||||
_ = run_hooks('client.connack', [conninfo(Lwm2mState), not_authorized], undefined),
|
||||
{error, Error}
|
||||
|
@ -186,7 +188,8 @@ deliver(#message{topic = Topic, payload = Payload},
|
|||
started_at = StartedAt,
|
||||
endpoint_name = EndpointName}) ->
|
||||
IsCacheMode = is_cache_mode(RegInfo, StartedAt),
|
||||
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
|
||||
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, "
|
||||
"Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
|
||||
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
|
||||
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
|
||||
Lwm2mState.
|
||||
|
@ -235,8 +238,20 @@ unsubscribe(Topic, Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName}) ->
|
|||
emqx_broker:unsubscribe(Topic),
|
||||
emqx_hooks:run('session.unsubscribed', [clientinfo(Lwm2mState), Topic, Opts]).
|
||||
|
||||
publish(Topic, Payload, Qos, EndpointName) ->
|
||||
emqx_broker:publish(emqx_message:set_flag(retain, false, emqx_message:make(EndpointName, Qos, Topic, Payload))).
|
||||
publish(Topic, Payload, Qos,
|
||||
#lwm2m_state{
|
||||
version = ProtoVer,
|
||||
peername = {PeerHost, _},
|
||||
endpoint_name = EndpointName}) ->
|
||||
Message = emqx_message:set_flag(
|
||||
retain, false,
|
||||
emqx_message:make(EndpointName, Qos, Topic, Payload)
|
||||
),
|
||||
NMessage = emqx_message:set_headers(
|
||||
#{proto_ver => ProtoVer,
|
||||
protocol => lwm2m,
|
||||
peerhost => PeerHost}, Message),
|
||||
emqx_broker:publish(NMessage).
|
||||
|
||||
time_now() -> erlang:system_time(millisecond).
|
||||
|
||||
|
@ -244,7 +259,8 @@ time_now() -> erlang:system_time(millisecond).
|
|||
%% Deliver downlink message to coap
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
|
||||
deliver_to_coap(AlternatePath, JsonData,
|
||||
CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
|
||||
try
|
||||
TermData = emqx_json:decode(JsonData, [return_maps]),
|
||||
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
|
||||
|
@ -273,7 +289,8 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when
|
|||
send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
|
||||
do_send_to_broker(EventType, Payload, Lwm2mState).
|
||||
|
||||
do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
|
||||
do_send_to_broker(EventType, #{<<"data">> := Data} = Payload,
|
||||
#lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
|
||||
ReqPath = maps:get(<<"reqPath">>, Data, undefined),
|
||||
Code = maps:get(<<"code">>, Data, undefined),
|
||||
CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
|
||||
|
@ -281,7 +298,7 @@ do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpo
|
|||
emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
|
||||
NewPayload = maps:put(<<"msgType">>, EventType, Payload),
|
||||
Topic = uplink_topic(EventType, Lwm2mState),
|
||||
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
|
||||
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Auto Observe
|
||||
|
@ -321,12 +338,21 @@ observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
|
|||
[ObjInsId | _LastPath1] = LastPath,
|
||||
case ObjInsId of
|
||||
<<"0">> ->
|
||||
observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName);
|
||||
observe_object_slowly(
|
||||
AlternatePath, <<"/19/0/0">>,
|
||||
CoapPid, 100, EndpointName
|
||||
);
|
||||
_ ->
|
||||
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
|
||||
observe_object_slowly(
|
||||
AlternatePath, ObjectPath,
|
||||
CoapPid, 100, EndpointName
|
||||
)
|
||||
end;
|
||||
_ ->
|
||||
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
|
||||
observe_object_slowly(
|
||||
AlternatePath, ObjectPath,
|
||||
CoapPid, 100, EndpointName
|
||||
)
|
||||
end
|
||||
end, ObjectList).
|
||||
|
||||
|
@ -380,11 +406,12 @@ get_cached_downlink_messages() ->
|
|||
is_cache_mode(RegInfo, StartedAt) ->
|
||||
case is_psm(RegInfo) orelse is_qmode(RegInfo) of
|
||||
true ->
|
||||
QModeTimeWind = proplists:get_value(qmode_time_window, lwm2m_coap_responder:options(), 22),
|
||||
Now = time_now(),
|
||||
if (Now - StartedAt) >= QModeTimeWind -> true;
|
||||
true -> false
|
||||
end;
|
||||
QModeTimeWind = proplists:get_value(
|
||||
qmode_time_window,
|
||||
lwm2m_coap_responder:options(),
|
||||
22
|
||||
),
|
||||
(time_now() - StartedAt) >= QModeTimeWind;
|
||||
false -> false
|
||||
end.
|
||||
|
||||
|
|
|
@ -50,6 +50,9 @@
|
|||
%% erlang:system_time should be unique and random enough
|
||||
-define(CLIENTID, iolist_to_binary([atom_to_list(?FUNCTION_NAME), "-",
|
||||
integer_to_list(erlang:system_time())])).
|
||||
|
||||
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Setups
|
||||
%%--------------------------------------------------------------------
|
||||
|
@ -66,7 +69,9 @@ end_per_suite(_) ->
|
|||
emqx_ct_helpers:stop_apps([emqx_sn]).
|
||||
|
||||
set_special_confs(emqx) ->
|
||||
application:set_env(emqx, plugins_loaded_file,
|
||||
application:set_env(
|
||||
emqx,
|
||||
plugins_loaded_file,
|
||||
emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
|
||||
set_special_confs(emqx_sn) ->
|
||||
application:set_env(emqx_sn, enable_qos3, ?ENABLE_QOS3),
|
||||
|
@ -113,7 +118,8 @@ t_subscribe(_) ->
|
|||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
TopicName1 = <<"abcD">>,
|
||||
send_register_msg(Socket, TopicName1, MsgId),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
|
||||
receive_response(Socket)),
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1,
|
||||
CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16,
|
||||
|
@ -145,7 +151,8 @@ t_subscribe_case01(_) ->
|
|||
|
||||
TopicName1 = <<"abcD">>,
|
||||
send_register_msg(Socket, TopicName1, MsgId),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
|
||||
receive_response(Socket)),
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
|
||||
?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>,
|
||||
|
@ -166,17 +173,18 @@ t_subscribe_case02(_) ->
|
|||
Will = 0,
|
||||
CleanSession = 0,
|
||||
MsgId = 1,
|
||||
TopicId = ?PREDEF_TOPIC_ID1, %this TopicId is the predefined topic id corresponding to ?PREDEF_TOPIC_NAME1
|
||||
TopicId = ?PREDEF_TOPIC_ID1,
|
||||
ReturnCode = 0,
|
||||
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||
|
||||
ClientId = ?CLIENTID,
|
||||
send_connect_msg(Socket, ?CLIENTID),
|
||||
send_connect_msg(Socket, ClientId),
|
||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
|
||||
Topic1 = ?PREDEF_TOPIC_NAME1,
|
||||
send_register_msg(Socket, Topic1, MsgId),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
|
||||
receive_response(Socket)),
|
||||
send_subscribe_msg_predefined_topic(Socket, QoS, TopicId, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
|
||||
?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>,
|
||||
|
@ -206,9 +214,11 @@ t_subscribe_case03(_) ->
|
|||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
|
||||
send_subscribe_msg_short_topic(Socket, QoS, <<"te">>, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
|
||||
?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1,
|
||||
CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_unsubscribe_msg_short_topic(Socket, <<"te">>, MsgId),
|
||||
?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)),
|
||||
|
@ -217,8 +227,12 @@ t_subscribe_case03(_) ->
|
|||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||
gen_udp:close(Socket).
|
||||
|
||||
%%In this case We use predefined topic name to register and subcribe, and expect to receive the corresponding predefined topic id but not a new generated topic id from broker. We design this case to illustrate
|
||||
%% emqx_sn_gateway's compatibility of dealing with predefined and normal topics. Once we give more restrictions to different topic id type, this case would be deleted or modified.
|
||||
%% In this case We use predefined topic name to register and subcribe, and
|
||||
%% expect to receive the corresponding predefined topic id but not a new
|
||||
%% generated topic id from broker. We design this case to illustrate
|
||||
%% emqx_sn_gateway's compatibility of dealing with predefined and normal topics.
|
||||
%% Once we give more restrictions to different topic id type, this case would
|
||||
%% be deleted or modified.
|
||||
t_subscribe_case04(_) ->
|
||||
Dup = 0,
|
||||
QoS = 0,
|
||||
|
@ -226,7 +240,7 @@ t_subscribe_case04(_) ->
|
|||
Will = 0,
|
||||
CleanSession = 0,
|
||||
MsgId = 1,
|
||||
TopicId = ?PREDEF_TOPIC_ID1, %this TopicId is the predefined topic id corresponding to ?PREDEF_TOPIC_NAME1
|
||||
TopicId = ?PREDEF_TOPIC_ID1,
|
||||
ReturnCode = 0,
|
||||
{ok, Socket} = gen_udp:open(0, [binary]),
|
||||
ClientId = ?CLIENTID,
|
||||
|
@ -234,10 +248,14 @@ t_subscribe_case04(_) ->
|
|||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
Topic1 = ?PREDEF_TOPIC_NAME1,
|
||||
send_register_msg(Socket, Topic1, MsgId),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)),
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, Topic1, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>,
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
|
||||
receive_response(Socket)),
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, Topic1, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_unsubscribe_msg_normal_topic(Socket, Topic1, MsgId),
|
||||
?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)),
|
||||
|
@ -264,19 +282,30 @@ t_subscribe_case05(_) ->
|
|||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
|
||||
send_register_msg(Socket, <<"abcD">>, MsgId),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, <<"abcD">>, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, <<"/sport/#">>, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId0:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, <<"/a/+/water">>, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId0:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, <<"/Tom/Home">>, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
|
||||
|
@ -306,19 +335,32 @@ t_subscribe_case06(_) ->
|
|||
|
||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
send_register_msg(Socket, <<"abc">>, MsgId),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_register_msg(Socket, <<"/blue/#">>, MsgId),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId0:16, MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId0:16,
|
||||
MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_register_msg(Socket, <<"/blue/+/white">>, MsgId),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId0:16, MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId0:16,
|
||||
MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
send_register_msg(Socket, <<"/$sys/rain">>, MsgId),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId2:16, MsgId:16, 0:8>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId2:16, MsgId:16, 0:8>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_subscribe_msg_short_topic(Socket, QoS, <<"Q2">>, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId0:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_unsubscribe_msg_normal_topic(Socket, <<"Q2">>, MsgId),
|
||||
?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)),
|
||||
|
@ -342,8 +384,11 @@ t_subscribe_case07(_) ->
|
|||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
|
||||
send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_unsubscribe_msg_predefined_topic(Socket, TopicId2, MsgId),
|
||||
?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)),
|
||||
|
@ -365,8 +410,11 @@ t_subscribe_case08(_) ->
|
|||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
|
||||
send_subscribe_msg_reserved_topic(Socket, QoS, TopicId2, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, ?SN_INVALID_TOPIC_ID:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
?SN_INVALID_TOPIC_ID:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_disconnect_msg(Socket, undefined),
|
||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||
|
@ -390,15 +438,20 @@ t_publish_negqos_case09(_) ->
|
|||
|
||||
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
MsgId1 = 3,
|
||||
Payload1 = <<20, 21, 22, 23>>,
|
||||
send_publish_msg_normal_topic(Socket, NegQoS, MsgId1, TopicId1, Payload1),
|
||||
timer:sleep(100),
|
||||
case ?ENABLE_QOS3 of
|
||||
true ->
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
What = receive_response(Socket),
|
||||
?assertEqual(Eexp, What)
|
||||
end,
|
||||
|
@ -431,7 +484,9 @@ t_publish_qos0_case01(_) ->
|
|||
send_publish_msg_normal_topic(Socket, QoS, MsgId1, TopicId1, Payload1),
|
||||
timer:sleep(100),
|
||||
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
What = receive_response(Socket),
|
||||
?assertEqual(Eexp, What),
|
||||
|
||||
|
@ -453,15 +508,20 @@ t_publish_qos0_case02(_) ->
|
|||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
|
||||
send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
MsgId1 = 3,
|
||||
Payload1 = <<20, 21, 22, 23>>,
|
||||
send_publish_msg_predefined_topic(Socket, QoS, MsgId1, PredefTopicId, Payload1),
|
||||
timer:sleep(100),
|
||||
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2, PredefTopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2,
|
||||
PredefTopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
What = receive_response(Socket),
|
||||
?assertEqual(Eexp, What),
|
||||
|
||||
|
@ -484,15 +544,20 @@ t_publish_qos0_case3(_) ->
|
|||
|
||||
Topic = <<"/a/b/c">>,
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
MsgId1 = 3,
|
||||
Payload1 = <<20, 21, 22, 23>>,
|
||||
send_publish_msg_predefined_topic(Socket, QoS, MsgId1, TopicId, Payload1),
|
||||
timer:sleep(100),
|
||||
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
What = receive_response(Socket),
|
||||
?assertEqual(Eexp, What),
|
||||
|
||||
|
@ -514,8 +579,11 @@ t_publish_qos0_case04(_) ->
|
|||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, <<"#">>, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
MsgId1 = 2,
|
||||
Payload1 = <<20, 21, 22, 23>>,
|
||||
|
@ -523,7 +591,9 @@ t_publish_qos0_case04(_) ->
|
|||
send_publish_msg_short_topic(Socket, QoS, MsgId1, Topic, Payload1),
|
||||
timer:sleep(100),
|
||||
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2, Topic/binary, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2,
|
||||
Topic/binary, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
What = receive_response(Socket),
|
||||
?assertEqual(Eexp, What),
|
||||
|
||||
|
@ -544,8 +614,11 @@ t_publish_qos0_case05(_) ->
|
|||
send_connect_msg(Socket, ClientId),
|
||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
send_subscribe_msg_short_topic(Socket, QoS, <<"/#">>, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
send_disconnect_msg(Socket, undefined),
|
||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||
|
||||
|
@ -567,15 +640,20 @@ t_publish_qos0_case06(_) ->
|
|||
|
||||
Topic = <<"abc">>,
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
MsgId1 = 3,
|
||||
Payload1 = <<20, 21, 22, 23>>,
|
||||
send_publish_msg_normal_topic(Socket, QoS, MsgId1, TopicId1, Payload1),
|
||||
timer:sleep(100),
|
||||
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
|
||||
What = receive_response(Socket),
|
||||
?assertEqual(Eexp, What),
|
||||
|
||||
|
@ -597,16 +675,25 @@ t_publish_qos1_case01(_) ->
|
|||
send_connect_msg(Socket, ClientId),
|
||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
|
||||
?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
Payload1 = <<20, 21, 22, 23>>,
|
||||
send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1),
|
||||
?assertEqual(<<7, ?SN_PUBACK, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_PUBACK, TopicId1:16,
|
||||
MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
timer:sleep(100),
|
||||
|
||||
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)),
|
||||
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_disconnect_msg(Socket, undefined),
|
||||
gen_udp:close(Socket).
|
||||
|
@ -625,12 +712,18 @@ t_publish_qos1_case02(_) ->
|
|||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
|
||||
send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
Payload1 = <<20, 21, 22, 23>>,
|
||||
send_publish_msg_predefined_topic(Socket, QoS, MsgId, PredefTopicId, Payload1),
|
||||
?assertEqual(<<7, ?SN_PUBACK, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_PUBACK, PredefTopicId:16,
|
||||
MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
timer:sleep(100),
|
||||
|
||||
send_disconnect_msg(Socket, undefined),
|
||||
|
@ -645,7 +738,10 @@ t_publish_qos1_case03(_) ->
|
|||
send_connect_msg(Socket, ClientId),
|
||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
send_publish_msg_predefined_topic(Socket, QoS, MsgId, tid(5), <<20, 21, 22, 23>>),
|
||||
?assertEqual(<<7, ?SN_PUBACK, TopicId5:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_PUBACK, TopicId5:16,
|
||||
MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_disconnect_msg(Socket, undefined),
|
||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||
|
@ -664,15 +760,20 @@ t_publish_qos1_case04(_) ->
|
|||
send_connect_msg(Socket, ClientId),
|
||||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
send_subscribe_msg_short_topic(Socket, QoS, <<"ab">>, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
|
||||
?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
Topic = <<"ab">>,
|
||||
Payload1 = <<20, 21, 22, 23>>,
|
||||
send_publish_msg_short_topic(Socket, QoS, MsgId, Topic, Payload1),
|
||||
<<TopicIdShort:16>> = Topic,
|
||||
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16,
|
||||
MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
timer:sleep(100),
|
||||
|
||||
send_disconnect_msg(Socket, undefined),
|
||||
|
@ -692,13 +793,18 @@ t_publish_qos1_case05(_) ->
|
|||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
|
||||
?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/#">>, <<20, 21, 22, 23>>),
|
||||
<<TopicIdShort:16>> = <<"/#">>,
|
||||
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16,
|
||||
MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_disconnect_msg(Socket, undefined),
|
||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||
|
@ -724,7 +830,10 @@ t_publish_qos1_case06(_) ->
|
|||
|
||||
send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/+">>, <<20, 21, 22, 23>>),
|
||||
<<TopicIdShort:16>> = <<"/+">>,
|
||||
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>, receive_response(Socket)),
|
||||
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16,
|
||||
MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
send_disconnect_msg(Socket, undefined),
|
||||
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
|
||||
|
@ -751,7 +860,11 @@ t_publish_qos2_case01(_) ->
|
|||
send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1),
|
||||
?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)),
|
||||
send_pubrel_msg(Socket, MsgId),
|
||||
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)),
|
||||
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, 1:16, <<20, 21, 22, 23>>/binary>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)),
|
||||
timer:sleep(100),
|
||||
|
||||
|
@ -773,15 +886,21 @@ t_publish_qos2_case02(_) ->
|
|||
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
|
||||
|
||||
send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5,
|
||||
PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
Payload1 = <<20, 21, 22, 23>>,
|
||||
send_publish_msg_predefined_topic(Socket, QoS, MsgId, PredefTopicId, Payload1),
|
||||
?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)),
|
||||
send_pubrel_msg(Socket, MsgId),
|
||||
|
||||
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC :2, PredefTopicId:16, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)),
|
||||
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2,
|
||||
PredefTopicId:16, 1:16, <<20, 21, 22, 23>>/binary>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)),
|
||||
|
||||
timer:sleep(100),
|
||||
|
@ -812,7 +931,11 @@ t_publish_qos2_case03(_) ->
|
|||
?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)),
|
||||
send_pubrel_msg(Socket, MsgId),
|
||||
|
||||
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_SHORT_TOPIC :2, <<"/a">>/binary, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)),
|
||||
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
|
||||
Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2,
|
||||
"/a", 1:16, <<20, 21, 22, 23>>/binary>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)),
|
||||
timer:sleep(100),
|
||||
|
||||
|
@ -1083,7 +1206,11 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) ->
|
|||
send_register_msg(Socket, TopicName1, MsgId1),
|
||||
?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId1:16, 0:8>>, receive_response(Socket)),
|
||||
send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ReturnCode>>, receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId1:16, MsgId:16, ReturnCode>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
% goto asleep state
|
||||
send_disconnect_msg(Socket, 1),
|
||||
|
@ -1109,7 +1236,10 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) ->
|
|||
|
||||
%% the broker should sent dl msgs to the awake client before sending the pingresp
|
||||
UdpData = receive_response(Socket),
|
||||
MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId1, Payload1}, UdpData),
|
||||
MsgId_udp = check_publish_msg_on_udp(
|
||||
{Dup, QoS, Retain, WillBit,
|
||||
CleanSession, ?SN_NORMAL_TOPIC,
|
||||
TopicId1, Payload1}, UdpData),
|
||||
send_puback_msg(Socket, TopicId1, MsgId_udp),
|
||||
|
||||
%% check the pingresp is received at last
|
||||
|
@ -1141,8 +1271,11 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
|
|||
CleanSession = 0,
|
||||
ReturnCode = 0,
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId0:16, MsgId1:16, ReturnCode>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
% goto asleep state
|
||||
send_disconnect_msg(Socket, 1),
|
||||
|
@ -1176,11 +1309,17 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
|
|||
send_regack_msg(Socket, TopicIdNew, MsgId3),
|
||||
|
||||
UdpData2 = receive_response(Socket),
|
||||
MsgId_udp2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData2),
|
||||
MsgId_udp2 = check_publish_msg_on_udp(
|
||||
{Dup, QoS, Retain, WillBit,
|
||||
CleanSession, ?SN_NORMAL_TOPIC,
|
||||
TopicIdNew, Payload1}, UdpData2),
|
||||
send_puback_msg(Socket, TopicIdNew, MsgId_udp2),
|
||||
|
||||
UdpData3 = receive_response(Socket),
|
||||
MsgId_udp3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData3),
|
||||
MsgId_udp3 = check_publish_msg_on_udp(
|
||||
{Dup, QoS, Retain, WillBit,
|
||||
CleanSession, ?SN_NORMAL_TOPIC,
|
||||
TopicIdNew, Payload2}, UdpData3),
|
||||
send_puback_msg(Socket, TopicIdNew, MsgId_udp3),
|
||||
|
||||
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
||||
|
@ -1216,8 +1355,11 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) ->
|
|||
CleanSession = 0,
|
||||
ReturnCode = 0,
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId0:16, MsgId1:16, ReturnCode>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
% goto asleep state
|
||||
SleepDuration = 30,
|
||||
|
@ -1250,19 +1392,26 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) ->
|
|||
send_regack_msg(Socket, TopicIdNew, MsgId_reg),
|
||||
|
||||
UdpData2 = receive_response(Socket),
|
||||
MsgId2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2),
|
||||
MsgId2 = check_publish_msg_on_udp(
|
||||
{Dup, QoS, Retain, WillBit,
|
||||
CleanSession, ?SN_NORMAL_TOPIC,
|
||||
TopicIdNew, Payload2}, UdpData2),
|
||||
send_puback_msg(Socket, TopicIdNew, MsgId2),
|
||||
timer:sleep(50),
|
||||
|
||||
UdpData3 = wrap_receive_response(Socket),
|
||||
MsgId3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3),
|
||||
MsgId3 = check_publish_msg_on_udp(
|
||||
{Dup, QoS, Retain, WillBit,
|
||||
CleanSession, ?SN_NORMAL_TOPIC,
|
||||
TopicIdNew, Payload3}, UdpData3),
|
||||
send_puback_msg(Socket, TopicIdNew, MsgId3),
|
||||
timer:sleep(50),
|
||||
|
||||
case receive_response(Socket) of
|
||||
<<2,23>> -> ok;
|
||||
UdpData4 ->
|
||||
MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit,
|
||||
MsgId4 = check_publish_msg_on_udp(
|
||||
{Dup, QoS, Retain, WillBit,
|
||||
CleanSession, ?SN_NORMAL_TOPIC,
|
||||
TopicIdNew, Payload4}, UdpData4),
|
||||
send_puback_msg(Socket, TopicIdNew, MsgId4)
|
||||
|
@ -1322,7 +1471,10 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) ->
|
|||
send_pingreq_msg(Socket, ClientId),
|
||||
|
||||
UdpData = wrap_receive_response(Socket),
|
||||
MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData),
|
||||
MsgId_udp = check_publish_msg_on_udp(
|
||||
{Dup, QoS, Retain, WillBit,
|
||||
CleanSession, ?SN_NORMAL_TOPIC,
|
||||
TopicId_tom, Payload1}, UdpData),
|
||||
send_pubrec_msg(Socket, MsgId_udp),
|
||||
?assertMatch(<<_:8, ?SN_PUBREL:8, _/binary>>, receive_response(Socket)),
|
||||
send_pubcomp_msg(Socket, MsgId_udp),
|
||||
|
@ -1357,8 +1509,11 @@ t_asleep_test07_to_connected(_) ->
|
|||
send_register_msg(Socket, TopicName_tom, MsgId1),
|
||||
TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)),
|
||||
send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId_tom:16, MsgId1:16, ReturnCode>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId_tom:16, MsgId1:16, ReturnCode>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
|
||||
% goto asleep state
|
||||
send_disconnect_msg(Socket, SleepDuration),
|
||||
|
@ -1436,8 +1591,11 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
|
|||
CleanSession = 0,
|
||||
ReturnCode = 0,
|
||||
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>,
|
||||
receive_response(Socket)),
|
||||
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
|
||||
WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2,
|
||||
TopicId0:16, MsgId1:16, ReturnCode>>,
|
||||
receive_response(Socket)
|
||||
),
|
||||
% goto asleep state
|
||||
SleepDuration = 30,
|
||||
send_disconnect_msg(Socket, SleepDuration),
|
||||
|
@ -1471,7 +1629,10 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
|
|||
udp_receive_timeout ->
|
||||
ok;
|
||||
UdpData2 ->
|
||||
MsgId2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2),
|
||||
MsgId2 = check_publish_msg_on_udp(
|
||||
{Dup, QoS, Retain, WillBit,
|
||||
CleanSession, ?SN_NORMAL_TOPIC,
|
||||
TopicIdNew, Payload2}, UdpData2),
|
||||
send_puback_msg(Socket, TopicIdNew, MsgId2)
|
||||
end,
|
||||
timer:sleep(100),
|
||||
|
@ -1480,7 +1641,10 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
|
|||
udp_receive_timeout ->
|
||||
ok;
|
||||
UdpData3 ->
|
||||
MsgId3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3),
|
||||
MsgId3 = check_publish_msg_on_udp(
|
||||
{Dup, QoS, Retain, WillBit,
|
||||
CleanSession, ?SN_NORMAL_TOPIC,
|
||||
TopicIdNew, Payload3}, UdpData3),
|
||||
send_puback_msg(Socket, TopicIdNew, MsgId3)
|
||||
end,
|
||||
timer:sleep(100),
|
||||
|
@ -1489,7 +1653,8 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
|
|||
udp_receive_timeout ->
|
||||
ok;
|
||||
UdpData4 ->
|
||||
MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit,
|
||||
MsgId4 = check_publish_msg_on_udp(
|
||||
{Dup, QoS, Retain, WillBit,
|
||||
CleanSession, ?SN_NORMAL_TOPIC,
|
||||
TopicIdNew, Payload4}, UdpData4),
|
||||
send_puback_msg(Socket, TopicIdNew, MsgId4)
|
||||
|
@ -1498,7 +1663,8 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
|
|||
|
||||
%% send PINGREQ again to enter awake state
|
||||
send_pingreq_msg(Socket, ClientId),
|
||||
%% will not receive any buffered PUBLISH messages buffered before last awake, only receive PINGRESP here
|
||||
%% will not receive any buffered PUBLISH messages buffered before last
|
||||
%% awake, only receive PINGRESP here
|
||||
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
|
||||
|
||||
gen_udp:close(Socket).
|
||||
|
@ -1901,8 +2067,12 @@ check_dispatched_message(Dup, QoS, Retain, TopicIdType, TopicId, Payload, Socket
|
|||
PubMsg = receive_response(Socket),
|
||||
Length = 7 + byte_size(Payload),
|
||||
?LOG("check_dispatched_message ~p~n", [PubMsg]),
|
||||
?LOG("expected ~p xx ~p~n", [<<Length, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2, TopicId:16>>, Payload]),
|
||||
<<Length, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2, TopicId:16, MsgId:16, Payload/binary>> = PubMsg,
|
||||
?LOG("expected ~p xx ~p~n",
|
||||
[<<Length, ?SN_PUBLISH,
|
||||
Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2, TopicId:16>>, Payload]),
|
||||
<<Length, ?SN_PUBLISH,
|
||||
Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2,
|
||||
TopicId:16, MsgId:16, Payload/binary>> = PubMsg,
|
||||
case QoS of
|
||||
0 -> ok;
|
||||
1 -> send_puback_msg(Socket, TopicId, MsgId);
|
||||
|
@ -1914,11 +2084,14 @@ check_dispatched_message(Dup, QoS, Retain, TopicIdType, TopicId, Payload, Socket
|
|||
get_udp_broadcast_address() ->
|
||||
"255.255.255.255".
|
||||
|
||||
check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, TopicType, TopicId, Payload}, UdpData) ->
|
||||
check_publish_msg_on_udp({Dup, QoS, Retain, WillBit,
|
||||
CleanSession, TopicType, TopicId, Payload}, UdpData) ->
|
||||
<<HeaderUdp:5/binary, MsgId:16, PayloadIn/binary>> = UdpData,
|
||||
ct:pal("UdpData: ~p, Payload: ~p, PayloadIn: ~p", [UdpData, Payload, PayloadIn]),
|
||||
Size9 = byte_size(Payload) + 7,
|
||||
Eexp = <<Size9:8, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, TopicType:2, TopicId:16>>,
|
||||
Eexp = <<Size9:8,
|
||||
?SN_PUBLISH, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1,
|
||||
TopicType:2, TopicId:16>>,
|
||||
?assertEqual(Eexp, HeaderUdp), % mqtt-sn header should be same
|
||||
?assertEqual(Payload, PayloadIn), % payload should be same
|
||||
MsgId.
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{application, emqx_stomp,
|
||||
[{description, "EMQ X Stomp Protocol Plugin"},
|
||||
{vsn, "4.3.2"}, % strict semver, bump manually!
|
||||
{vsn, "4.3.3"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, [emqx_stomp_sup]},
|
||||
{applications, [kernel,stdlib]},
|
||||
|
|
|
@ -1,10 +1,16 @@
|
|||
%% -*- mode: erlang -*-
|
||||
{VSN,
|
||||
[{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
|
||||
[{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.1",[
|
||||
{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.0",
|
||||
[{restart_application,emqx_stomp}]},
|
||||
{<<".*">>,[]}],
|
||||
[{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
|
||||
[{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.1",[
|
||||
{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
|
||||
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
|
||||
{"4.3.0",
|
||||
[{restart_application,emqx_stomp}]},
|
||||
{<<".*">>,[]}]}.
|
||||
|
|
|
@ -108,6 +108,8 @@
|
|||
, init/2
|
||||
]}).
|
||||
|
||||
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
|
||||
|
||||
-type(pstate() :: #pstate{}).
|
||||
|
||||
%% @doc Init protocol
|
||||
|
@ -132,7 +134,6 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
|
|||
|
||||
AllowAnonymous = get_value(allow_anonymous, Opts, false),
|
||||
DefaultUser = get_value(default_user, Opts),
|
||||
|
||||
#pstate{
|
||||
conninfo = NConnInfo,
|
||||
clientinfo = ClientInfo,
|
||||
|
@ -288,7 +289,12 @@ received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true
|
|||
received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) ->
|
||||
case header(<<"transaction">>, Headers) of
|
||||
undefined -> {ok, handle_recv_send_frame(Frame, State)};
|
||||
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_send_frame/2, [Frame]}, receipt_id(Headers), State)
|
||||
TransactionId ->
|
||||
add_action(TransactionId,
|
||||
{fun ?MODULE:handle_recv_send_frame/2, [Frame]},
|
||||
receipt_id(Headers),
|
||||
State
|
||||
)
|
||||
end;
|
||||
|
||||
received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
|
||||
|
@ -346,7 +352,11 @@ received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers},
|
|||
received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
||||
case header(<<"transaction">>, Headers) of
|
||||
undefined -> {ok, handle_recv_ack_frame(Frame, State)};
|
||||
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_ack_frame/2, [Frame]}, receipt_id(Headers), State)
|
||||
TransactionId ->
|
||||
add_action(TransactionId,
|
||||
{fun ?MODULE:handle_recv_ack_frame/2, [Frame]},
|
||||
receipt_id(Headers),
|
||||
State)
|
||||
end;
|
||||
|
||||
%% NACK
|
||||
|
@ -357,7 +367,11 @@ received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
|
|||
received(Frame = #stomp_frame{command = <<"NACK">>, headers = Headers}, State) ->
|
||||
case header(<<"transaction">>, Headers) of
|
||||
undefined -> {ok, handle_recv_nack_frame(Frame, State)};
|
||||
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_nack_frame/2, [Frame]}, receipt_id(Headers), State)
|
||||
TransactionId ->
|
||||
add_action(TransactionId,
|
||||
{fun ?MODULE:handle_recv_nack_frame/2, [Frame]},
|
||||
receipt_id(Headers),
|
||||
State)
|
||||
end;
|
||||
|
||||
%% BEGIN
|
||||
|
@ -588,9 +602,23 @@ next_ackid() ->
|
|||
put(ackid, AckId + 1),
|
||||
AckId.
|
||||
|
||||
make_mqtt_message(Topic, Headers, Body) ->
|
||||
Msg = emqx_message:make(stomp, Topic, Body),
|
||||
Headers1 = lists:foldl(fun(Key, Headers0) ->
|
||||
make_mqtt_message(Topic, Headers, Body,
|
||||
#pstate{
|
||||
conninfo = #{proto_ver := ProtoVer},
|
||||
clientinfo = #{
|
||||
protocol := Protocol,
|
||||
clientid := ClientId,
|
||||
username := Username,
|
||||
peerhost := PeerHost}}) ->
|
||||
Msg = emqx_message:make(
|
||||
ClientId, ?QOS_0,
|
||||
Topic, Body, #{},
|
||||
#{proto_ver => ProtoVer,
|
||||
protocol => Protocol,
|
||||
username => Username,
|
||||
peerhost => PeerHost}),
|
||||
Headers1 = lists:foldl(
|
||||
fun(Key, Headers0) ->
|
||||
proplists:delete(Key, Headers0)
|
||||
end, Headers, [<<"destination">>,
|
||||
<<"content-length">>,
|
||||
|
@ -611,7 +639,7 @@ handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, bod
|
|||
allow ->
|
||||
_ = maybe_send_receipt(receipt_id(Headers), State),
|
||||
_ = emqx_broker:publish(
|
||||
make_mqtt_message(Topic, Headers, iolist_to_binary(Body))
|
||||
make_mqtt_message(Topic, Headers, iolist_to_binary(Body), State)
|
||||
),
|
||||
State;
|
||||
deny ->
|
||||
|
|
|
@ -193,6 +193,7 @@
|
|||
username => username(),
|
||||
peerhost => peerhost(),
|
||||
properties => properties(),
|
||||
allow_publish => boolean(),
|
||||
atom() => term()}).
|
||||
|
||||
-type(banned() :: #banned{}).
|
||||
|
|
Loading…
Reference in New Issue