Merge branch 'main-v4.4' into gen-rpc-ssl-4

This commit is contained in:
k32 2021-11-15 13:47:54 +01:00 committed by GitHub
commit 32086f97ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 808 additions and 283 deletions

View File

@ -7,6 +7,12 @@
## Value: single | unknown | sharded | rs
auth.mongo.type = single
## Whether to use SRV and TXT records.
##
## Value: true | false
## Default: false
auth.mongo.srv_record = false
## The set name if type is rs.
##
## Value: String
@ -37,7 +43,6 @@ auth.mongo.pool = 8
## MongoDB AuthSource
##
## Value: String
## Default: mqtt
## auth.mongo.auth_source = admin
## MongoDB database

View File

@ -6,8 +6,12 @@
{datatype, {enum, [single, unknown, sharded, rs]}}
]}.
{mapping, "auth.mongo.srv_record", "emqx_auth_mongo.server", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{mapping, "auth.mongo.rs_set_name", "emqx_auth_mongo.server", [
{default, "mqtt"},
{datatype, string}
]}.
@ -41,7 +45,6 @@
]}.
{mapping, "auth.mongo.auth_source", "emqx_auth_mongo.server", [
{default, "mqtt"},
{datatype, string}
]}.
@ -101,9 +104,9 @@
]}.
{translation, "emqx_auth_mongo.server", fun(Conf) ->
H = cuttlefish:conf_get("auth.mongo.server", Conf),
Hosts = string:tokens(H, ","),
Type0 = cuttlefish:conf_get("auth.mongo.type", Conf),
SrvRecord = cuttlefish:conf_get("auth.mongo.srv_record", Conf, false),
Server = cuttlefish:conf_get("auth.mongo.server", Conf),
Type = cuttlefish:conf_get("auth.mongo.type", Conf),
Pool = cuttlefish:conf_get("auth.mongo.pool", Conf),
%% FIXME: compatible with 4.0-4.2 version format, plan to delete in 5.0
Login = cuttlefish:conf_get("auth.mongo.username", Conf,
@ -111,7 +114,10 @@
),
Passwd = cuttlefish:conf_get("auth.mongo.password", Conf),
DB = cuttlefish:conf_get("auth.mongo.database", Conf),
AuthSrc = cuttlefish:conf_get("auth.mongo.auth_source", Conf),
AuthSource = case cuttlefish:conf_get("auth.mongo.auth_source", Conf, undefined) of
undefined -> [];
AuthSource0 -> [{auth_source, list_to_binary(AuthSource0)}]
end,
R = cuttlefish:conf_get("auth.mongo.w_mode", Conf),
W = cuttlefish:conf_get("auth.mongo.r_mode", Conf),
Login0 = case Login =:= [] of
@ -156,8 +162,8 @@
false -> []
end,
WorkerOptions = [{database, list_to_binary(DB)}, {auth_source, list_to_binary(AuthSrc)}]
++ Login0 ++ Passwd0 ++ W0 ++ R0 ++ Ssl,
WorkerOptions = [{database, list_to_binary(DB)}]
++ Login0 ++ Passwd0 ++ W0 ++ R0 ++ Ssl ++ AuthSource,
Vars = cuttlefish_variable:fuzzy_matches(["auth", "mongo", "topology", "$name"], Conf),
Options = lists:map(fun({_, Name}) ->
@ -174,16 +180,17 @@
{list_to_atom(Name2), cuttlefish:conf_get("auth.mongo.topology."++Name, Conf)}
end, Vars),
Type = case Type0 =:= rs of
true -> {Type0, list_to_binary(cuttlefish:conf_get("auth.mongo.rs_set_name", Conf))};
false -> Type0
end,
[{type, Type},
{hosts, Hosts},
ReplicaSet = case cuttlefish:conf_get("auth.mongo.rs_set_name", Conf, undefined) of
undefined -> [];
ReplicaSet0 -> [{rs_set_name, list_to_binary(ReplicaSet0)}]
end,
[{srv_record, SrvRecord},
{type, Type},
{server, Server},
{options, Options},
{worker_options, WorkerOptions},
{auto_reconnect, 1},
{pool_size, Pool}]
{pool_size, Pool}] ++ ReplicaSet
end}.
%% The mongodb operation timeout is specified by the value of `cursor_timeout` from application config,

View File

@ -1,6 +1,6 @@
{application, emqx_auth_mongo,
[{description, "EMQ X Authentication/ACL with MongoDB"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.4.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_auth_mongo_sup]},
{applications, [kernel,stdlib,mongodb,ecpool]},

View File

@ -28,7 +28,96 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, PoolEnv} = application:get_env(?APP, server),
PoolSpec = ecpool:pool_spec(?APP, ?APP, ?APP, PoolEnv),
{ok, Opts} = application:get_env(?APP, server),
NOpts = may_parse_srv_and_txt_records(Opts),
PoolSpec = ecpool:pool_spec(?APP, ?APP, ?APP, NOpts),
{ok, {{one_for_all, 10, 100}, [PoolSpec]}}.
may_parse_srv_and_txt_records(Opts) when is_list(Opts) ->
maps:to_list(may_parse_srv_and_txt_records(maps:from_list(Opts)));
may_parse_srv_and_txt_records(#{type := Type,
srv_record := false,
server := Server} = Opts) ->
Hosts = to_hosts(Server),
case Type =:= rs of
true ->
case maps:get(rs_set_name, Opts, undefined) of
undefined ->
error({missing_parameter, rs_set_name});
ReplicaSet ->
Opts#{type => {rs, ReplicaSet},
hosts => Hosts}
end;
false ->
Opts#{hosts => Hosts}
end;
may_parse_srv_and_txt_records(#{type := Type,
srv_record := true,
server := Server,
worker_options := WorkerOptions} = Opts) ->
Hosts = parse_srv_records(Server),
Opts0 = parse_txt_records(Type, Server),
NWorkerOptions = maps:to_list(maps:merge(maps:from_list(WorkerOptions), maps:with([auth_source], Opts0))),
NOpts = Opts#{hosts => Hosts, worker_options => NWorkerOptions},
case Type =:= rs of
true ->
case maps:get(rs_set_name, Opts0, maps:get(rs_set_name, NOpts, undefined)) of
undefined ->
error({missing_parameter, rs_set_name});
ReplicaSet ->
NOpts#{type => {Type, ReplicaSet}}
end;
false ->
NOpts
end.
to_hosts(Server) ->
[string:trim(H) || H <- string:tokens(Server, ",")].
parse_srv_records(Server) ->
case inet_res:lookup("_mongodb._tcp." ++ Server, in, srv) of
[] ->
error(service_not_found);
Services ->
[Host ++ ":" ++ integer_to_list(Port) || {_, _, Port, Host} <- Services]
end.
parse_txt_records(Type, Server) ->
case inet_res:lookup(Server, in, txt) of
[] ->
#{};
[[QueryString]] ->
case uri_string:dissect_query(QueryString) of
{error, _, _} ->
error({invalid_txt_record, invalid_query_string});
Options ->
Fields = case Type of
rs -> ["authSource", "replicaSet"];
_ -> ["authSource"]
end,
take_and_convert(Fields, Options)
end;
_ ->
error({invalid_txt_record, multiple_records})
end.
take_and_convert(Fields, Options) ->
take_and_convert(Fields, Options, #{}).
take_and_convert([], [_ | _], _Acc) ->
error({invalid_txt_record, invalid_option});
take_and_convert([], [], Acc) ->
Acc;
take_and_convert([Field | More], Options, Acc) ->
case lists:keytake(Field, 1, Options) of
{value, {"authSource", V}, NOptions} ->
take_and_convert(More, NOptions, Acc#{auth_source => list_to_binary(V)});
{value, {"replicaSet", V}, NOptions} ->
take_and_convert(More, NOptions, Acc#{rs_set_name => list_to_binary(V)});
{value, _, _} ->
error({invalid_txt_record, invalid_option});
false ->
take_and_convert(More, Options, Acc)
end.

View File

@ -1,6 +1,6 @@
{application, emqx_coap,
[{description, "EMQ X CoAP Gateway"},
{vsn, "4.3.0"}, % strict semver, bump manually!
{vsn, "4.3.1"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,gen_coap]},

View File

@ -0,0 +1,9 @@
%% -*-: erlang -*-
{VSN,
[{"4.3.0",[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
{<<".*">>, []}],
[{"4.3.0",[
{load_module, emqx_coap_mqtt_adapter, brutal_purge, soft_purge, []}]},
{<<".*">>, []}]
}.

View File

@ -58,6 +58,8 @@
-define(SUBOPTS, #{rh => 0, rap => 0, nl => 0, qos => ?QOS_0, is_new => false}).
-define(PROTO_VER, 1).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
@ -139,7 +141,7 @@ handle_call({subscribe, Topic, CoapPid}, _From, State=#state{sub_topics = TopicL
NewTopics = proplists:delete(Topic, TopicList),
IsWild = emqx_topic:wildcard(Topic),
{reply, chann_subscribe(Topic, State), State#state{sub_topics =
[{Topic, {IsWild, CoapPid}}|NewTopics]}, hibernate};
[{Topic, {IsWild, CoapPid}} | NewTopics]}, hibernate};
handle_call({unsubscribe, Topic, _CoapPid}, _From, State=#state{sub_topics = TopicList}) ->
NewTopics = proplists:delete(Topic, TopicList),
@ -244,15 +246,26 @@ chann_publish(Topic, Payload, State = #state{clientid = ClientId}) ->
case emqx_access_control:check_acl(clientinfo(State), publish, Topic) of
allow ->
_ = emqx_broker:publish(
emqx_message:set_flag(retain, false,
emqx_message:make(ClientId, ?QOS_0, Topic, Payload))),
ok;
packet_to_message(Topic, Payload, State)), ok;
deny ->
?LOG(warning, "publish to ~p by clientid ~p failed due to acl check.",
[Topic, ClientId]),
{error, forbidden}
end.
packet_to_message(Topic, Payload,
#state{clientid = ClientId,
username = Username,
peername = {PeerHost, _}}) ->
Message = emqx_message:set_flag(
retain, false,
emqx_message:make(ClientId, ?QOS_0, Topic, Payload)
),
emqx_message:set_headers(
#{ proto_ver => ?PROTO_VER
, protocol => coap
, username => Username
, peerhost => PeerHost}, Message).
%%--------------------------------------------------------------------
%% Deliver
@ -270,7 +283,7 @@ do_deliver({Topic, Payload}, Subscribers) ->
deliver_to_coap(_TopicName, _Payload, []) ->
ok;
deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}}|T]) ->
deliver_to_coap(TopicName, Payload, [{TopicFilter, {IsWild, CoapPid}} | T]) ->
Matched = case IsWild of
true -> emqx_topic:match(TopicName, TopicFilter);
false -> TopicName =:= TopicFilter
@ -324,7 +337,7 @@ conninfo(#state{peername = Peername,
peercert => nossl, %% TODO: dtls
conn_mod => ?MODULE,
proto_name => <<"CoAP">>,
proto_ver => 1,
proto_ver => ?PROTO_VER,
clean_start => true,
clientid => ClientId,
username => undefined,
@ -384,4 +397,3 @@ clientinfo(#state{peername = {PeerHost, _},
mountpoint => undefined,
ws_cookie => undefined
}.

View File

@ -24,6 +24,11 @@
## Value: false | Duration
#exhook.auto_reconnect = 60s
## The process pool size for gRPC client
##
## Default: Equals cpu cores
## Value: Integer
#exhook.pool_size = 16
##--------------------------------------------------------------------
## The Hook callback servers

View File

@ -26,6 +26,10 @@
end
end}.
{mapping, "exhook.pool_size", "emqx_exhook.pool_size", [
{datatype, integer}
]}.
{mapping, "exhook.server.$name.url", "emqx_exhook.servers", [
{datatype, string}
]}.

View File

@ -358,6 +358,31 @@ message Message {
bytes payload = 6;
uint64 timestamp = 7;
// The key of header can be:
// - username:
// * Readonly
// * The username of sender client
// * Value type: utf8 string
// - protocol:
// * Readonly
// * The protocol name of sender client
// * Value type: string enum with "mqtt", "mqtt-sn", ...
// - peerhost:
// * Readonly
// * The peerhost of sender client
// * Value type: ip address string
// - allow_publish:
// * Writable
// * Whether to allow the message to be published by emqx
// * Value type: string enum with "true", "false", default is "true"
//
// Notes: All header may be missing, which means that the message does not
// carry these headers. We can guarantee that clients coming from MQTT,
// MQTT-SN, CoAP, LwM2M and other natively supported protocol clients will
// carry these headers, but there is no guarantee that messages published
// by other means will do, e.g. messages published by HTTP-API
map<string, string> headers = 8;
}
message Property {

View File

@ -5,7 +5,7 @@
]}.
{deps,
[{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.3"}}}
[{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}}
]}.
{grpc,

View File

@ -1,6 +1,6 @@
{application, emqx_exhook,
[{description, "EMQ X Extension for Hook"},
{vsn, "4.3.4"},
{vsn, "4.4.0"},
{modules, []},
{registered, []},
{mod, {emqx_exhook_app, []}},

View File

@ -1,15 +1,7 @@
%% -*-: erlang -*-
{VSN,
[
{<<"4.3.[0-3]">>, [
{restart_application, emqx_exhook}
]},
{<<".*">>, []}
[{<<".*">>, []}
],
[
{<<"4.3.[0-3]">>, [
{restart_application, emqx_exhook}
]},
{<<".*">>, []}
[{<<".*">>, []}
]
}.

View File

@ -50,6 +50,7 @@
%% Utils
-export([ message/1
, headers/1
, stringfy/1
, merge_responsed_bool/2
, merge_responsed_message/2
@ -62,6 +63,8 @@
, call_fold/3
]).
-elvis([{elvis_style, god_modules, disable}]).
%%--------------------------------------------------------------------
%% Clients
%%--------------------------------------------------------------------
@ -258,17 +261,58 @@ clientinfo(ClientInfo =
cn => maybe(maps:get(cn, ClientInfo, undefined)),
dn => maybe(maps:get(dn, ClientInfo, undefined))}.
message(#message{id = Id, qos = Qos, from = From, topic = Topic, payload = Payload, timestamp = Ts}) ->
message(#message{id = Id, qos = Qos, from = From, topic = Topic,
payload = Payload, timestamp = Ts, headers = Headers}) ->
#{node => stringfy(node()),
id => emqx_guid:to_hexstr(Id),
qos => Qos,
from => stringfy(From),
topic => Topic,
payload => Payload,
timestamp => Ts}.
timestamp => Ts,
headers => headers(Headers)
}.
assign_to_message(#{qos := Qos, topic := Topic, payload := Payload}, Message) ->
Message#message{qos = Qos, topic = Topic, payload = Payload}.
headers(Headers) ->
Ls = [username, protocol, peerhost, allow_publish],
maps:fold(
fun
(_, undefined, Acc) ->
Acc; %% Ignore undefined value
(K, V, Acc) ->
case lists:member(K, Ls) of
true ->
Acc#{atom_to_binary(K) => bin(K, V)};
_ ->
Acc
end
end, #{}, Headers).
bin(K, V) when K == username;
K == protocol;
K == allow_publish ->
bin(V);
bin(peerhost, V) ->
bin(inet:ntoa(V)).
bin(V) when is_binary(V) -> V;
bin(V) when is_atom(V) -> atom_to_binary(V);
bin(V) when is_list(V) -> iolist_to_binary(V).
assign_to_message(InMessage = #{qos := Qos, topic := Topic,
payload := Payload}, Message) ->
NMsg = Message#message{qos = Qos, topic = Topic, payload = Payload},
enrich_header(maps:get(headers, InMessage, #{}), NMsg).
enrich_header(Headers, Message) ->
case maps:get(<<"allow_publish">>, Headers, undefined) of
<<"false">> ->
emqx_message:set_header(allow_publish, false, Message);
<<"true">> ->
emqx_message:set_header(allow_publish, true, Message);
_ ->
Message
end.
topicfilters(Tfs) when is_list(Tfs) ->
[#{name => Topic, qos => Qos} || {Topic, #{qos := Qos}} <- Tfs].
@ -299,11 +343,7 @@ merge_responsed_bool(_Req, #{type := 'IGNORE'}) ->
ignore;
merge_responsed_bool(Req, #{type := Type, value := {bool_result, NewBool}})
when is_boolean(NewBool) ->
NReq = Req#{result => NewBool},
case Type of
'CONTINUE' -> {ok, NReq};
'STOP_AND_RETURN' -> {stop, NReq}
end;
{ret(Type), Req#{result => NewBool}};
merge_responsed_bool(_Req, Resp) ->
?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
ignore.
@ -311,11 +351,10 @@ merge_responsed_bool(_Req, Resp) ->
merge_responsed_message(_Req, #{type := 'IGNORE'}) ->
ignore;
merge_responsed_message(Req, #{type := Type, value := {message, NMessage}}) ->
NReq = Req#{message => NMessage},
case Type of
'CONTINUE' -> {ok, NReq};
'STOP_AND_RETURN' -> {stop, NReq}
end;
{ret(Type), Req#{message => NMessage}};
merge_responsed_message(_Req, Resp) ->
?LOG(warning, "Unknown responsed value ~0p to merge to callback chain", [Resp]),
ignore.
ret('CONTINUE') -> ok;
ret('STOP_AND_RETURN') -> stop.

View File

@ -36,6 +36,8 @@
, server/1
, put_request_failed_action/1
, get_request_failed_action/0
, put_pool_size/1
, get_pool_size/0
]).
%% gen_server callbacks
@ -84,11 +86,11 @@
start_link(Servers, AutoReconnect, ReqOpts) ->
gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []).
-spec enable(pid(), atom()|string()) -> ok | {error, term()}.
-spec enable(pid(), atom() | string()) -> ok | {error, term()}.
enable(Pid, Name) ->
call(Pid, {load, Name}).
-spec disable(pid(), atom()|string()) -> ok | {error, term()}.
-spec disable(pid(), atom() | string()) -> ok | {error, term()}.
disable(Pid, Name) ->
call(Pid, {unload, Name}).
@ -117,6 +119,9 @@ init([Servers, AutoReconnect, ReqOpts0]) ->
put_request_failed_action(
maps:get(request_failed_action, ReqOpts0, deny)
),
put_pool_size(
maps:get(pool_size, ReqOpts0, erlang:system_info(schedulers))
),
%% Load the hook servers
ReqOpts = maps:without([request_failed_action], ReqOpts0),
@ -136,7 +141,7 @@ load_all_servers(Servers, ReqOpts) ->
load_all_servers(Servers, ReqOpts, #{}, #{}).
load_all_servers([], _Request, Waiting, Running) ->
{Waiting, Running};
load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) ->
load_all_servers([{Name, Options} | More], ReqOpts, Waiting, Running) ->
{NWaiting, NRunning} =
case emqx_exhook_server:load(Name, Options, ReqOpts) of
{ok, ServerState} ->
@ -286,6 +291,14 @@ put_request_failed_action(Val) ->
get_request_failed_action() ->
persistent_term:get({?APP, request_failed_action}).
put_pool_size(Val) ->
persistent_term:put({?APP, pool_size}, Val).
get_pool_size() ->
%% Avoid the scenario that the parameter is not set after
%% the hot upgrade completed.
persistent_term:get({?APP, pool_size}, erlang:system_info(schedulers)).
save(Name, ServerState) ->
Saved = persistent_term:get(?APP, []),
persistent_term:put(?APP, lists:reverse([Name | Saved])),

View File

@ -77,6 +77,8 @@
-dialyzer({nowarn_function, [inc_metrics/2]}).
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
%%--------------------------------------------------------------------
%% Load/Unload APIs
%%--------------------------------------------------------------------
@ -125,13 +127,18 @@ channel_opts(Opts) ->
SvrAddr = format_http_uri(Scheme, Host, Port),
ClientOpts = case Scheme of
https ->
SslOpts = lists:keydelete(ssl, 1, proplists:get_value(ssl_options, Opts, [])),
SslOpts = lists:keydelete(
ssl,
1,
proplists:get_value(ssl_options, Opts, [])
),
#{gun_opts =>
#{transport => ssl,
transport_opts => SslOpts}};
_ -> #{}
end,
{SvrAddr, ClientOpts}.
NClientOpts = ClientOpts#{pool_size => emqx_exhook_mngr:get_pool_size()},
{SvrAddr, NClientOpts}.
format_http_uri(Scheme, Host0, Port) ->
Host = case is_tuple(Host0) of
@ -174,16 +181,18 @@ resovle_hookspec(HookSpecs) when is_list(HookSpecs) ->
case maps:get(name, HookSpec, undefined) of
undefined -> Acc;
Name0 ->
Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end,
case lists:member(Name, AvailableHooks) of
true ->
case lists:member(Name, MessageHooks) of
true ->
Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}};
_ ->
Acc#{Name => #{}}
end;
_ -> error({unknown_hookpoint, Name})
Name = try
binary_to_existing_atom(Name0, utf8)
catch T:R -> {T,R}
end,
case {lists:member(Name, AvailableHooks),
lists:member(Name, MessageHooks)} of
{false, _} ->
error({unknown_hookpoint, Name});
{true, false} ->
Acc#{Name => #{}};
{true, true} ->
Acc#{Name => #{topics => maps:get(topics, HookSpec, [])}}
end
end
end, #{}, HookSpecs).
@ -255,7 +264,7 @@ call(Hookpoint, Req, #server{name = ChannName, options = ReqOpts,
%% @private
inc_metrics(IncFun, Name) when is_function(IncFun) ->
%% BACKW: e4.2.0-e4.2.2
{env, [Prefix|_]} = erlang:fun_info(IncFun, env),
{env, [Prefix | _]} = erlang:fun_info(IncFun, env),
inc_metrics(Prefix, Name);
inc_metrics(Prefix, Name) when is_list(Prefix) ->
emqx_metrics:inc(list_to_atom(Prefix ++ atom_to_list(Name))).
@ -271,8 +280,8 @@ do_call(ChannName, Fun, Req, ReqOpts) ->
Options = ReqOpts#{channel => ChannName},
?LOG(debug, "Call ~0p:~0p(~0p, ~0p)", [?PB_CLIENT_MOD, Fun, Req, Options]),
case catch apply(?PB_CLIENT_MOD, Fun, [Req, Options]) of
{ok, Resp, _Metadata} ->
?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, _Metadata]),
{ok, Resp, Metadata} ->
?LOG(debug, "Response {ok, ~0p, ~0p}", [Resp, Metadata]),
{ok, Resp};
{error, {Code, Msg}, _Metadata} ->
?LOG(error, "CALL ~0p:~0p(~0p, ~0p) response errcode: ~0p, errmsg: ~0p",

View File

@ -54,7 +54,8 @@ auto_reconnect() ->
request_options() ->
#{timeout => env(request_timeout, 5000),
request_failed_action => env(request_failed_action, deny)
request_failed_action => env(request_failed_action, deny),
pool_size => env(pool_size, erlang:system_info(schedulers))
}.
env(Key, Def) ->
@ -67,7 +68,7 @@ env(Key, Def) ->
-spec start_grpc_client_channel(
string(),
uri_string:uri_string(),
grpc_client:options()) -> {ok, pid()} | {error, term()}.
grpc_client_sup:options()) -> {ok, pid()} | {error, term()}.
start_grpc_client_channel(Name, SvrAddr, Options) ->
grpc_client_sup:create_channel_pool(Name, SvrAddr, Options).

View File

@ -299,21 +299,31 @@ on_message_publish(#{message := #{from := From} = Msg} = Req, Md) ->
%% some cases for testing
case From of
<<"baduser">> ->
NMsg = Msg#{qos => 0,
NMsg = deny(Msg#{qos => 0,
topic => <<"">>,
payload => <<"">>
},
}),
{ok, #{type => 'STOP_AND_RETURN',
value => {message, NMsg}}, Md};
<<"gooduser">> ->
NMsg = Msg#{topic => From,
payload => From},
NMsg = allow(Msg#{topic => From,
payload => From}),
{ok, #{type => 'STOP_AND_RETURN',
value => {message, NMsg}}, Md};
_ ->
{ok, #{type => 'IGNORE'}, Md}
end.
deny(Msg) ->
NHeader = maps:put(<<"allow_publish">>, <<"false">>,
maps:get(headers, Msg, #{})),
maps:put(headers, NHeader, Msg).
allow(Msg) ->
NHeader = maps:put(<<"allow_publish">>, <<"true">>,
maps:get(headers, Msg, #{})),
maps:put(headers, NHeader, Msg).
-spec on_message_delivered(emqx_exhook_pb:message_delivered_request(), grpc:metadata())
-> {ok, emqx_exhook_pb:empty_success(), grpc:metadata()}
| {error, grpc_cowboy_h:error_response()}.

View File

@ -299,19 +299,24 @@ prop_message_publish() ->
_ ->
ExpectedOutMsg = case emqx_message:from(Msg) of
<<"baduser">> ->
MsgMap = emqx_message:to_map(Msg),
MsgMap = #{headers := Headers}
= emqx_message:to_map(Msg),
emqx_message:from_map(
MsgMap#{qos => 0,
topic => <<"">>,
payload => <<"">>
payload => <<"">>,
headers => maps:put(allow_publish, false, Headers)
});
<<"gooduser">> = From ->
MsgMap = emqx_message:to_map(Msg),
MsgMap = #{headers := Headers}
= emqx_message:to_map(Msg),
emqx_message:from_map(
MsgMap#{topic => From,
payload => From
payload => From,
headers => maps:put(allow_publish, true, Headers)
});
_ -> Msg
_ ->
Msg
end,
?assertEqual(ExpectedOutMsg, OutMsg),
@ -464,7 +469,9 @@ from_message(Msg) ->
from => stringfy(emqx_message:from(Msg)),
topic => emqx_message:topic(Msg),
payload => emqx_message:payload(Msg),
timestamp => emqx_message:timestamp(Msg)
timestamp => emqx_message:timestamp(Msg),
headers => emqx_exhook_handler:headers(
emqx_message:get_headers(Msg))
}.
%%--------------------------------------------------------------------

View File

@ -13,7 +13,7 @@
]}.
{deps,
[{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.3"}}}
[{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}}
]}.
{grpc,

View File

@ -1,6 +1,6 @@
{application, emqx_exproto,
[{description, "EMQ X Extension for Protocol"},
{vsn, "4.3.4"}, %% 4.3.3 is used by ee
{vsn, "4.3.5"}, %% 4.3.3 is used by ee
{modules, []},
{registered, []},
{mod, {emqx_exproto_app, []}},

View File

@ -1,6 +1,8 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.3",
[{"4.3.4",
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.2",
@ -12,7 +14,9 @@
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.3",
[{"4.3.4",
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.2",

View File

@ -340,17 +340,14 @@ handle_call({unsubscribe, TopicFilter},
handle_call({publish, Topic, Qos, Payload},
Channel = #channel{
conn_state = connected,
clientinfo = ClientInfo
= #{clientid := From,
mountpoint := Mountpoint}}) ->
clientinfo = ClientInfo}) ->
case is_acl_enabled(ClientInfo) andalso
emqx_access_control:check_acl(ClientInfo, publish, Topic) of
deny ->
{reply, {error, ?RESP_PERMISSION_DENY, <<"ACL deny">>}, Channel};
_ ->
Msg = emqx_message:make(From, Qos, Topic, Payload),
NMsg = emqx_mountpoint:mount(Mountpoint, Msg),
_ = emqx:publish(NMsg),
Msg = packet_to_message(Topic, Qos, Payload, Channel),
_ = emqx:publish(Msg),
{reply, ok, Channel}
end;
@ -419,6 +416,24 @@ is_anonymous(_AuthResult) -> false.
clean_anonymous_clients() ->
ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())).
packet_to_message(Topic, Qos, Payload,
#channel{
conninfo = #{proto_ver := ProtoVer},
clientinfo = #{
protocol := Protocol,
clientid := ClientId,
username := Username,
peerhost := PeerHost,
mountpoint := Mountpoint}}) ->
Msg = emqx_message:make(
ClientId, Qos,
Topic, Payload, #{},
#{proto_ver => ProtoVer,
protocol => Protocol,
username => Username,
peerhost => PeerHost}),
emqx_mountpoint:mount(Mountpoint, Msg).
%%--------------------------------------------------------------------
%% Sub/UnSub
%%--------------------------------------------------------------------

View File

@ -1,6 +1,6 @@
{application,emqx_lwm2m,
[{description,"EMQ X LwM2M Gateway"},
{vsn, "4.3.4"}, % strict semver, bump manually!
{vsn, "4.3.5"}, % strict semver, bump manually!
{modules,[]},
{registered,[emqx_lwm2m_sup]},
{applications,[kernel,stdlib,lwm2m_coap]},

View File

@ -1,5 +1,5 @@
%% -*-: erlang -*-
{"4.3.4",
{VSN,
[
{<<"4\\.3\\.[0-1]">>, [
{restart_application, emqx_lwm2m}
@ -7,7 +7,10 @@
{"4.3.2", [
{load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
]},
{"4.3.3", []} %% only config change
{"4.3.3", []}, %% only config change
{"4.3.4", [
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
]}
],
[
{<<"4\\.3\\.[0-1]">>, [
@ -16,6 +19,9 @@
{"4.3.2", [
{load_module, emqx_lwm2m_message, brutal_purge, soft_purge, []}
]},
{"4.3.3", []} %% only config change
{"4.3.3", []}, %% only config change
{"4.3.4", [
{load_module, emqx_lwm2m_protocol, brutal_purge, soft_purge, []}
]}
]
}.

View File

@ -74,7 +74,8 @@ call(Pid, Msg, Timeout) ->
Error -> {error, Error}
end.
init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) ->
init(CoapPid, EndpointName, Peername = {_Peerhost, _Port},
RegInfo = #{<<"lt">> := LifeTime, <<"lwm2m">> := Ver}) ->
Mountpoint = proplists:get_value(mountpoint, lwm2m_coap_responder:options(), ""),
Lwm2mState = #lwm2m_state{peername = Peername,
endpoint_name = EndpointName,
@ -103,9 +104,10 @@ init(CoapPid, EndpointName, Peername = {_Peerhost, _Port}, RegInfo = #{<<"lt">>
emqx_cm:register_channel(EndpointName, CoapPid, conninfo(Lwm2mState1))
end),
emqx_cm:insert_channel_info(EndpointName, info(Lwm2mState1), stats(Lwm2mState1)),
emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
emqx_lwm2m_cm:register_channel(EndpointName, RegInfo, LifeTime, Ver, Peername),
{ok, Lwm2mState1#lwm2m_state{life_timer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired})}};
NTimer = emqx_lwm2m_timer:start_timer(LifeTime, {life_timer, expired}),
{ok, Lwm2mState1#lwm2m_state{life_timer = NTimer}};
{error, Error} ->
_ = run_hooks('client.connack', [conninfo(Lwm2mState), not_authorized], undefined),
{error, Error}
@ -133,7 +135,7 @@ update_reg_info(NewRegInfo, Lwm2mState=#lwm2m_state{life_timer = LifeTimer, regi
%% - report the registration info update, but only when objectList is updated.
case NewRegInfo of
#{<<"objectList">> := _} ->
emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
emqx_lwm2m_cm:update_reg_info(Epn, NewRegInfo),
send_to_broker(<<"update">>, #{<<"data">> => UpdatedRegInfo}, Lwm2mState);
_ -> ok
end
@ -186,7 +188,8 @@ deliver(#message{topic = Topic, payload = Payload},
started_at = StartedAt,
endpoint_name = EndpointName}) ->
IsCacheMode = is_cache_mode(RegInfo, StartedAt),
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
?LOG(debug, "Get MQTT message from broker, IsCacheModeNow?: ~p, "
"Topic: ~p, Payload: ~p", [IsCacheMode, Topic, Payload]),
AlternatePath = maps:get(<<"alternatePath">>, RegInfo, <<"/">>),
deliver_to_coap(AlternatePath, Payload, CoapPid, IsCacheMode, EndpointName),
Lwm2mState.
@ -235,8 +238,20 @@ unsubscribe(Topic, Lwm2mState = #lwm2m_state{endpoint_name = _EndpointName}) ->
emqx_broker:unsubscribe(Topic),
emqx_hooks:run('session.unsubscribed', [clientinfo(Lwm2mState), Topic, Opts]).
publish(Topic, Payload, Qos, EndpointName) ->
emqx_broker:publish(emqx_message:set_flag(retain, false, emqx_message:make(EndpointName, Qos, Topic, Payload))).
publish(Topic, Payload, Qos,
#lwm2m_state{
version = ProtoVer,
peername = {PeerHost, _},
endpoint_name = EndpointName}) ->
Message = emqx_message:set_flag(
retain, false,
emqx_message:make(EndpointName, Qos, Topic, Payload)
),
NMessage = emqx_message:set_headers(
#{proto_ver => ProtoVer,
protocol => lwm2m,
peerhost => PeerHost}, Message),
emqx_broker:publish(NMessage).
time_now() -> erlang:system_time(millisecond).
@ -244,7 +259,8 @@ time_now() -> erlang:system_time(millisecond).
%% Deliver downlink message to coap
%%--------------------------------------------------------------------
deliver_to_coap(AlternatePath, JsonData, CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
deliver_to_coap(AlternatePath, JsonData,
CoapPid, CacheMode, EndpointName) when is_binary(JsonData)->
try
TermData = emqx_json:decode(JsonData, [return_maps]),
deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName)
@ -273,7 +289,8 @@ deliver_to_coap(AlternatePath, TermData, CoapPid, CacheMode, EndpointName) when
send_to_broker(EventType, Payload = #{}, Lwm2mState) ->
do_send_to_broker(EventType, Payload, Lwm2mState).
do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
do_send_to_broker(EventType, #{<<"data">> := Data} = Payload,
#lwm2m_state{endpoint_name = EndpointName} = Lwm2mState) ->
ReqPath = maps:get(<<"reqPath">>, Data, undefined),
Code = maps:get(<<"code">>, Data, undefined),
CodeMsg = maps:get(<<"codeMsg">>, Data, undefined),
@ -281,7 +298,7 @@ do_send_to_broker(EventType, #{<<"data">> := Data} = Payload, #lwm2m_state{endpo
emqx_lwm2m_cm:register_cmd(EndpointName, ReqPath, EventType, {Code, CodeMsg, Content}),
NewPayload = maps:put(<<"msgType">>, EventType, Payload),
Topic = uplink_topic(EventType, Lwm2mState),
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState#lwm2m_state.endpoint_name).
publish(Topic, emqx_json:encode(NewPayload), _Qos = 0, Lwm2mState).
%%--------------------------------------------------------------------
%% Auto Observe
@ -315,18 +332,27 @@ auto_observe(AlternatePath, ObjectList, CoapPid, EndpointName) ->
observe_object_list(AlternatePath, ObjectList, CoapPid, EndpointName) ->
lists:foreach(fun(ObjectPath) ->
[ObjId| LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
[ObjId | LastPath] = emqx_lwm2m_cmd_handler:path_list(ObjectPath),
case ObjId of
<<"19">> ->
[ObjInsId | _LastPath1] = LastPath,
case ObjInsId of
<<"0">> ->
observe_object_slowly(AlternatePath, <<"/19/0/0">>, CoapPid, 100, EndpointName);
observe_object_slowly(
AlternatePath, <<"/19/0/0">>,
CoapPid, 100, EndpointName
);
_ ->
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
observe_object_slowly(
AlternatePath, ObjectPath,
CoapPid, 100, EndpointName
)
end;
_ ->
observe_object_slowly(AlternatePath, ObjectPath, CoapPid, 100, EndpointName)
observe_object_slowly(
AlternatePath, ObjectPath,
CoapPid, 100, EndpointName
)
end
end, ObjectList).
@ -380,11 +406,12 @@ get_cached_downlink_messages() ->
is_cache_mode(RegInfo, StartedAt) ->
case is_psm(RegInfo) orelse is_qmode(RegInfo) of
true ->
QModeTimeWind = proplists:get_value(qmode_time_window, lwm2m_coap_responder:options(), 22),
Now = time_now(),
if (Now - StartedAt) >= QModeTimeWind -> true;
true -> false
end;
QModeTimeWind = proplists:get_value(
qmode_time_window,
lwm2m_coap_responder:options(),
22
),
(time_now() - StartedAt) >= QModeTimeWind;
false -> false
end.

View File

@ -24,7 +24,7 @@
-logger_header("[SLOW TOPICS]").
-export([ start_link/1, on_publish_done/5, enable/0
-export([ start_link/1, on_publish_done/3, enable/0
, disable/0, clear_history/0
]).
@ -42,7 +42,7 @@
-type state() :: #{ config := proplist:proplist()
, period := pos_integer()
, last_tick_at := pos_integer()
, counter := counters:counter_ref()
, counter := counters:counters_ref()
, enable := boolean()
}.
@ -70,6 +70,13 @@
-type slow_log() :: #slow_log{}.
-type top_k_map() :: #{emqx_types:topic() => top_k()}.
-type publish_done_env() :: #{ ignore_before_create := boolean()
, threshold := pos_integer()
, counter := counters:counters_ref()
}.
-type publish_done_args() :: #{session_rebirth_time => pos_integer()}.
-ifdef(TEST).
-define(TOPK_ACCESS, public).
-else.
@ -90,13 +97,16 @@
start_link(Env) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Env], []).
-spec on_publish_done(message(),
pos_integer(), boolean(), pos_integer(), counters:counters_ref()) -> ok.
on_publish_done(#message{timestamp = Timestamp}, Created, IgnoreBeforeCreate, _, _)
-spec on_publish_done(message(), publish_done_args(), publish_done_env()) -> ok.
on_publish_done(#message{timestamp = Timestamp},
#{session_rebirth_time := Created},
#{ignore_before_create := IgnoreBeforeCreate})
when IgnoreBeforeCreate, Timestamp < Created ->
ok;
on_publish_done(#message{timestamp = Timestamp} = Msg, _, _, Threshold, Counter) ->
on_publish_done(#message{timestamp = Timestamp} = Msg,
_,
#{threshold := Threshold, counter := Counter}) ->
case ?NOW - Timestamp of
Elapsed when Elapsed > Threshold ->
case get_log_quota(Counter) of
@ -202,7 +212,7 @@ init_topk_tab(_) ->
, {read_concurrency, true}
]).
-spec get_log_quota(counter:counter_ref()) -> boolean().
-spec get_log_quota(counters:counters_ref()) -> boolean().
get_log_quota(Counter) ->
case counters:get(Counter, ?QUOTA_IDX) of
Quota when Quota > 0 ->
@ -212,7 +222,7 @@ get_log_quota(Counter) ->
false
end.
-spec set_log_quota(proplists:proplist(), counter:counter_ref()) -> ok.
-spec set_log_quota(proplists:proplist(), counters:counters_ref()) -> ok.
set_log_quota(Cfg, Counter) ->
MaxLogNum = get_value(max_log_num, Cfg),
counters:put(Counter, ?QUOTA_IDX, MaxLogNum).
@ -328,12 +338,15 @@ publish(TickTime, Cfg, Notices) ->
load(IgnoreBeforeCreate, Threshold, Counter) ->
_ = emqx:hook('message.publish_done',
fun ?MODULE:on_publish_done/5,
[IgnoreBeforeCreate, Threshold, Counter]),
fun ?MODULE:on_publish_done/3,
[#{ignore_before_create => IgnoreBeforeCreate,
threshold => Threshold,
counter => Counter}
]),
ok.
unload() ->
emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/5).
emqx:unhook('message.publish_done', fun ?MODULE:on_publish_done/3).
-spec get_topic(proplists:proplist()) -> binary().
get_topic(Cfg) ->

View File

@ -16,6 +16,7 @@
-module(emqx_trace_api).
-include_lib("emqx/include/logger.hrl").
-include_lib("kernel/include/file.hrl").
%% API
-export([ list_trace/2
@ -74,10 +75,10 @@ download_zip_log(#{name := Name}, _Param) ->
TraceFiles = collect_trace_file(TraceLog),
ZipDir = emqx_trace:zip_dir(),
Zips = group_trace_file(ZipDir, TraceLog, TraceFiles),
ZipFileName = ZipDir ++ TraceLog,
{ok, ZipFile} = zip:zip(ZipFileName, Zips),
ZipFileName = ZipDir ++ binary_to_list(Name) ++ ".zip",
{ok, ZipFile} = zip:zip(ZipFileName, Zips, [{cwd, ZipDir}]),
emqx_trace:delete_files_after_send(ZipFileName, Zips),
{ok, #{}, {sendfile, 0, filelib:file_size(ZipFile), ZipFile}};
{ok, ZipFile};
{error, Reason} ->
{error, Reason}
end.
@ -88,7 +89,7 @@ group_trace_file(ZipDir, TraceLog, TraceFiles) ->
{ok, Node, Bin} ->
ZipName = ZipDir ++ Node ++ "-" ++ TraceLog,
ok = file:write_file(ZipName, Bin),
[ZipName | Acc];
[Node ++ "-" ++ TraceLog | Acc];
{error, Node, Reason} ->
?LOG(error, "download trace log error:~p", [{Node, TraceLog, Reason}]),
Acc
@ -101,20 +102,19 @@ collect_trace_file(TraceLog) ->
BadNodes =/= [] andalso ?LOG(error, "download log rpc failed on ~p", [BadNodes]),
Files.
%% _page as position and _limit as bytes for front-end reusing components
stream_log_file(#{name := Name}, Params) ->
Node0 = proplists:get_value(<<"node">>, Params, atom_to_binary(node())),
Position0 = proplists:get_value(<<"_page">>, Params, <<"0">>),
Bytes0 = proplists:get_value(<<"_limit">>, Params, <<"500">>),
Position0 = proplists:get_value(<<"position">>, Params, <<"0">>),
Bytes0 = proplists:get_value(<<"bytes">>, Params, <<"1000">>),
Node = binary_to_existing_atom(Node0),
Position = binary_to_integer(Position0),
Bytes = binary_to_integer(Bytes0),
case rpc:call(Node, ?MODULE, read_trace_file, [Name, Position, Bytes]) of
{ok, Bin} ->
Meta = #{<<"page">> => Position + byte_size(Bin), <<"limit">> => Bytes},
Meta = #{<<"position">> => Position + byte_size(Bin), <<"bytes">> => Bytes},
{ok, #{meta => Meta, items => Bin}};
eof ->
Meta = #{<<"page">> => Position, <<"limit">> => Bytes},
{eof, Size} ->
Meta = #{<<"position">> => Size, <<"bytes">> => Bytes},
{ok, #{meta => Meta, items => <<"">>}};
{error, Reason} ->
logger:log(error, "read_file_failed by ~p", [{Name, Reason, Position, Bytes}]),
@ -134,6 +134,7 @@ read_trace_file(Name, Position, Limit) ->
[] -> {error, not_found}
end.
-dialyzer({nowarn_function, read_file/3}).
read_file(Path, Offset, Bytes) ->
{ok, IoDevice} = file:open(Path, [read, raw, binary]),
try
@ -141,7 +142,13 @@ read_file(Path, Offset, Bytes) ->
0 -> ok;
_ -> file:position(IoDevice, {bof, Offset})
end,
file:read(IoDevice, Bytes)
case file:read(IoDevice, Bytes) of
{ok, Bin} -> {ok, Bin};
{error, Reason} -> {error, Reason};
eof ->
#file_info{size = Size} = file:read_file_info(IoDevice),
{eof, Size}
end
after
file:close(IoDevice)
end.

View File

@ -336,9 +336,8 @@ t_download_log(_Config) ->
{ok, _} = emqtt:connect(Client),
[begin _ = emqtt:ping(Client) end ||_ <- lists:seq(1, 5)],
ct:sleep(100),
{ok, #{}, {sendfile, 0, ZipFileSize, _ZipFile}} =
emqx_trace_api:download_zip_log(#{name => Name}, []),
?assert(ZipFileSize > 0),
{ok, ZipFile} = emqx_trace_api:download_zip_log(#{name => Name}, []),
?assert(filelib:file_size(ZipFile) > 0),
ok = emqtt:disconnect(Client),
unload(),
ok.

View File

@ -1,6 +1,6 @@
{application, emqx_retainer,
[{description, "EMQ X Retainer"},
{vsn, "4.4.1"}, % strict semver, bump manually!
{vsn, "4.4.0"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel,stdlib]},

View File

@ -156,7 +156,7 @@ start_expire_timer(0, State) ->
start_expire_timer(undefined, State) ->
State;
start_expire_timer(Ms, State) ->
Timer = erlang:send_after(Ms, self(), stats),
Timer = erlang:send_after(Ms, self(), {expire, Ms}),
State#state{expiry_timer = Timer}.
handle_call(Req, _From, State) ->
@ -168,12 +168,14 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info(stats, State = #state{stats_fun = StatsFun}) ->
StatsTimer = erlang:send_after(timer:seconds(1), self(), stats),
StatsFun(retained_count()),
{noreply, State, hibernate};
{noreply, State#state{stats_timer = StatsTimer}, hibernate};
handle_info(expire, State) ->
handle_info({expire, Ms} = Expire, State) ->
Timer = erlang:send_after(Ms, self(), Expire),
ok = expire_messages(),
{noreply, State, hibernate};
{noreply, State#state{expiry_timer = Timer}, hibernate};
handle_info(Info, State) ->
?LOG(error, "Unexpected info: ~p", [Info]),

View File

@ -50,6 +50,9 @@
%% erlang:system_time should be unique and random enough
-define(CLIENTID, iolist_to_binary([atom_to_list(?FUNCTION_NAME), "-",
integer_to_list(erlang:system_time())])).
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
%%--------------------------------------------------------------------
%% Setups
%%--------------------------------------------------------------------
@ -66,8 +69,10 @@ end_per_suite(_) ->
emqx_ct_helpers:stop_apps([emqx_sn]).
set_special_confs(emqx) ->
application:set_env(emqx, plugins_loaded_file,
emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
application:set_env(
emqx,
plugins_loaded_file,
emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
set_special_confs(emqx_sn) ->
application:set_env(emqx_sn, enable_qos3, ?ENABLE_QOS3),
application:set_env(emqx_sn, enable_stats, true),
@ -113,7 +118,8 @@ t_subscribe(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
TopicName1 = <<"abcD">>,
send_register_msg(Socket, TopicName1, MsgId),
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1,
CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16,
@ -145,7 +151,8 @@ t_subscribe_case01(_) ->
TopicName1 = <<"abcD">>,
send_register_msg(Socket, TopicName1, MsgId),
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>,
@ -166,17 +173,18 @@ t_subscribe_case02(_) ->
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicId = ?PREDEF_TOPIC_ID1, %this TopicId is the predefined topic id corresponding to ?PREDEF_TOPIC_NAME1
TopicId = ?PREDEF_TOPIC_ID1,
ReturnCode = 0,
{ok, Socket} = gen_udp:open(0, [binary]),
ClientId = ?CLIENTID,
send_connect_msg(Socket, ?CLIENTID),
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
Topic1 = ?PREDEF_TOPIC_NAME1,
send_register_msg(Socket, Topic1, MsgId),
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
receive_response(Socket)),
send_subscribe_msg_predefined_topic(Socket, QoS, TopicId, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>,
@ -206,9 +214,11 @@ t_subscribe_case03(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_short_topic(Socket, QoS, <<"te">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1,
CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId:16, MsgId:16, ReturnCode>>,
receive_response(Socket)
),
send_unsubscribe_msg_short_topic(Socket, <<"te">>, MsgId),
?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)),
@ -217,8 +227,12 @@ t_subscribe_case03(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
%%In this case We use predefined topic name to register and subcribe, and expect to receive the corresponding predefined topic id but not a new generated topic id from broker. We design this case to illustrate
%% emqx_sn_gateway's compatibility of dealing with predefined and normal topics. Once we give more restrictions to different topic id type, this case would be deleted or modified.
%% In this case We use predefined topic name to register and subcribe, and
%% expect to receive the corresponding predefined topic id but not a new
%% generated topic id from broker. We design this case to illustrate
%% emqx_sn_gateway's compatibility of dealing with predefined and normal topics.
%% Once we give more restrictions to different topic id type, this case would
%% be deleted or modified.
t_subscribe_case04(_) ->
Dup = 0,
QoS = 0,
@ -226,7 +240,7 @@ t_subscribe_case04(_) ->
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicId = ?PREDEF_TOPIC_ID1, %this TopicId is the predefined topic id corresponding to ?PREDEF_TOPIC_NAME1
TopicId = ?PREDEF_TOPIC_ID1,
ReturnCode = 0,
{ok, Socket} = gen_udp:open(0, [binary]),
ClientId = ?CLIENTID,
@ -234,10 +248,14 @@ t_subscribe_case04(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
Topic1 = ?PREDEF_TOPIC_NAME1,
send_register_msg(Socket, Topic1, MsgId),
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_REGACK, TopicId:16, MsgId:16, 0:8>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, Topic1, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ReturnCode>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId:16, MsgId:16, ReturnCode>>,
receive_response(Socket)
),
send_unsubscribe_msg_normal_topic(Socket, Topic1, MsgId),
?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)),
@ -264,19 +282,30 @@ t_subscribe_case05(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_register_msg(Socket, <<"abcD">>, MsgId),
?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>,
receive_response(Socket)
),
send_subscribe_msg_normal_topic(Socket, QoS, <<"abcD">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ReturnCode>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, MsgId:16, ReturnCode>>,
receive_response(Socket)
),
send_subscribe_msg_normal_topic(Socket, QoS, <<"/sport/#">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId0:16, MsgId:16, ReturnCode>>,
receive_response(Socket)
),
send_subscribe_msg_normal_topic(Socket, QoS, <<"/a/+/water">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId0:16, MsgId:16, ReturnCode>>,
receive_response(Socket)
),
send_subscribe_msg_normal_topic(Socket, QoS, <<"/Tom/Home">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
@ -306,19 +335,32 @@ t_subscribe_case06(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_register_msg(Socket, <<"abc">>, MsgId),
?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId:16, 0:8>>,
receive_response(Socket)
),
send_register_msg(Socket, <<"/blue/#">>, MsgId),
?assertEqual(<<7, ?SN_REGACK, TopicId0:16, MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_REGACK, TopicId0:16,
MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>,
receive_response(Socket)
),
send_register_msg(Socket, <<"/blue/+/white">>, MsgId),
?assertEqual(<<7, ?SN_REGACK, TopicId0:16, MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_REGACK, TopicId0:16,
MsgId:16, ?SN_RC_NOT_SUPPORTED:8>>,
receive_response(Socket)
),
send_register_msg(Socket, <<"/$sys/rain">>, MsgId),
?assertEqual(<<7, ?SN_REGACK, TopicId2:16, MsgId:16, 0:8>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_REGACK, TopicId2:16, MsgId:16, 0:8>>,
receive_response(Socket)
),
send_subscribe_msg_short_topic(Socket, QoS, <<"Q2">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ReturnCode>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId0:16, MsgId:16, ReturnCode>>,
receive_response(Socket)
),
send_unsubscribe_msg_normal_topic(Socket, <<"Q2">>, MsgId),
?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)),
@ -342,8 +384,11 @@ t_subscribe_case07(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
receive_response(Socket)
),
send_unsubscribe_msg_predefined_topic(Socket, TopicId2, MsgId),
?assertEqual(<<4, ?SN_UNSUBACK, MsgId:16>>, receive_response(Socket)),
@ -365,8 +410,11 @@ t_subscribe_case08(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_reserved_topic(Socket, QoS, TopicId2, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, ?SN_INVALID_TOPIC_ID:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
?SN_INVALID_TOPIC_ID:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
receive_response(Socket)
),
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
@ -390,15 +438,20 @@ t_publish_negqos_case09(_) ->
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
MsgId1 = 3,
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_normal_topic(Socket, NegQoS, MsgId1, TopicId1, Payload1),
timer:sleep(100),
case ?ENABLE_QOS3 of
true ->
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
What = receive_response(Socket),
?assertEqual(Eexp, What)
end,
@ -431,7 +484,9 @@ t_publish_qos0_case01(_) ->
send_publish_msg_normal_topic(Socket, QoS, MsgId1, TopicId1, Payload1),
timer:sleep(100),
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
What = receive_response(Socket),
?assertEqual(Eexp, What),
@ -453,15 +508,20 @@ t_publish_qos0_case02(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
MsgId1 = 3,
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_predefined_topic(Socket, QoS, MsgId1, PredefTopicId, Payload1),
timer:sleep(100),
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2, PredefTopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2,
PredefTopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
What = receive_response(Socket),
?assertEqual(Eexp, What),
@ -484,15 +544,20 @@ t_publish_qos0_case3(_) ->
Topic = <<"/a/b/c">>,
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
MsgId1 = 3,
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_predefined_topic(Socket, QoS, MsgId1, TopicId, Payload1),
timer:sleep(100),
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
What = receive_response(Socket),
?assertEqual(Eexp, What),
@ -514,8 +579,11 @@ t_publish_qos0_case04(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, <<"#">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
MsgId1 = 2,
Payload1 = <<20, 21, 22, 23>>,
@ -523,7 +591,9 @@ t_publish_qos0_case04(_) ->
send_publish_msg_short_topic(Socket, QoS, MsgId1, Topic, Payload1),
timer:sleep(100),
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2, Topic/binary, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2,
Topic/binary, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
What = receive_response(Socket),
?assertEqual(Eexp, What),
@ -544,8 +614,11 @@ t_publish_qos0_case05(_) ->
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_short_topic(Socket, QoS, <<"/#">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
@ -567,15 +640,20 @@ t_publish_qos0_case06(_) ->
Topic = <<"abc">>,
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
MsgId1 = 3,
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_normal_topic(Socket, QoS, MsgId1, TopicId1, Payload1),
timer:sleep(100),
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
Eexp = <<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, (mid(0)):16, <<20, 21, 22, 23>>/binary>>,
What = receive_response(Socket),
?assertEqual(Eexp, What),
@ -597,16 +675,25 @@ t_publish_qos1_case01(_) ->
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, Topic, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1),
?assertEqual(<<7, ?SN_PUBACK, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_PUBACK, TopicId1:16,
MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
timer:sleep(100),
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)),
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, MsgId:16, <<20, 21, 22, 23>>/binary>>,
receive_response(Socket)
),
send_disconnect_msg(Socket, undefined),
gen_udp:close(Socket).
@ -625,12 +712,18 @@ t_publish_qos1_case02(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_predefined_topic(Socket, QoS, MsgId, PredefTopicId, Payload1),
?assertEqual(<<7, ?SN_PUBACK, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_PUBACK, PredefTopicId:16,
MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
timer:sleep(100),
send_disconnect_msg(Socket, undefined),
@ -645,7 +738,10 @@ t_publish_qos1_case03(_) ->
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_publish_msg_predefined_topic(Socket, QoS, MsgId, tid(5), <<20, 21, 22, 23>>),
?assertEqual(<<7, ?SN_PUBACK, TopicId5:16, MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_PUBACK, TopicId5:16,
MsgId:16, ?SN_RC_INVALID_TOPIC_ID>>,
receive_response(Socket)
),
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
@ -664,15 +760,20 @@ t_publish_qos1_case04(_) ->
send_connect_msg(Socket, ClientId),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_short_topic(Socket, QoS, <<"ab">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId0:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
Topic = <<"ab">>,
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_short_topic(Socket, QoS, MsgId, Topic, Payload1),
<<TopicIdShort:16>> = Topic,
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_ACCEPTED>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16,
MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
timer:sleep(100),
send_disconnect_msg(Socket, undefined),
@ -692,13 +793,18 @@ t_publish_qos1_case05(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/#">>, <<20, 21, 22, 23>>),
<<TopicIdShort:16>> = <<"/#">>,
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16,
MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
receive_response(Socket)
),
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
@ -724,7 +830,10 @@ t_publish_qos1_case06(_) ->
send_publish_msg_short_topic(Socket, QoS, MsgId, <<"/+">>, <<20, 21, 22, 23>>),
<<TopicIdShort:16>> = <<"/+">>,
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16, MsgId:16, ?SN_RC_NOT_SUPPORTED>>, receive_response(Socket)),
?assertEqual(<<7, ?SN_PUBACK, TopicIdShort:16,
MsgId:16, ?SN_RC_NOT_SUPPORTED>>,
receive_response(Socket)
),
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
@ -751,7 +860,11 @@ t_publish_qos2_case01(_) ->
send_publish_msg_normal_topic(Socket, QoS, MsgId, TopicId1, Payload1),
?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)),
send_pubrel_msg(Socket, MsgId),
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)),
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, 1:16, <<20, 21, 22, 23>>/binary>>,
receive_response(Socket)
),
?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)),
timer:sleep(100),
@ -773,15 +886,21 @@ t_publish_qos2_case02(_) ->
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_predefined_topic(Socket, QoS, PredefTopicId, MsgId),
?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5, PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, ?FNU:1, QoS:2, ?FNU:5,
PredefTopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)
),
Payload1 = <<20, 21, 22, 23>>,
send_publish_msg_predefined_topic(Socket, QoS, MsgId, PredefTopicId, Payload1),
?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)),
send_pubrel_msg(Socket, MsgId),
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC :2, PredefTopicId:16, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)),
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_PREDEFINED_TOPIC:2,
PredefTopicId:16, 1:16, <<20, 21, 22, 23>>/binary>>,
receive_response(Socket)
),
?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)),
timer:sleep(100),
@ -812,7 +931,11 @@ t_publish_qos2_case03(_) ->
?assertEqual(<<4, ?SN_PUBREC, MsgId:16>>, receive_response(Socket)),
send_pubrel_msg(Socket, MsgId),
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1, ?SN_SHORT_TOPIC :2, <<"/a">>/binary, 1:16, <<20, 21, 22, 23>>/binary>>, receive_response(Socket)),
?assertEqual(<<11, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_SHORT_TOPIC:2,
"/a", 1:16, <<20, 21, 22, 23>>/binary>>,
receive_response(Socket)
),
?assertEqual(<<4, ?SN_PUBCOMP, MsgId:16>>, receive_response(Socket)),
timer:sleep(100),
@ -1083,7 +1206,11 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) ->
send_register_msg(Socket, TopicName1, MsgId1),
?assertEqual(<<7, ?SN_REGACK, TopicId1:16, MsgId1:16, 0:8>>, receive_response(Socket)),
send_subscribe_msg_predefined_topic(Socket, QoS, TopicId1, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId1:16, MsgId:16, ReturnCode>>, receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId1:16, MsgId:16, ReturnCode>>,
receive_response(Socket)
),
% goto asleep state
send_disconnect_msg(Socket, 1),
@ -1109,7 +1236,10 @@ t_asleep_test03_to_awake_qos1_dl_msg(_) ->
%% the broker should sent dl msgs to the awake client before sending the pingresp
UdpData = receive_response(Socket),
MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId1, Payload1}, UdpData),
MsgId_udp = check_publish_msg_on_udp(
{Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicId1, Payload1}, UdpData),
send_puback_msg(Socket, TopicId1, MsgId_udp),
%% check the pingresp is received at last
@ -1141,8 +1271,11 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
CleanSession = 0,
ReturnCode = 0,
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId0:16, MsgId1:16, ReturnCode>>,
receive_response(Socket)
),
% goto asleep state
send_disconnect_msg(Socket, 1),
@ -1176,11 +1309,17 @@ t_asleep_test04_to_awake_qos1_dl_msg(_) ->
send_regack_msg(Socket, TopicIdNew, MsgId3),
UdpData2 = receive_response(Socket),
MsgId_udp2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload1}, UdpData2),
MsgId_udp2 = check_publish_msg_on_udp(
{Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicIdNew, Payload1}, UdpData2),
send_puback_msg(Socket, TopicIdNew, MsgId_udp2),
UdpData3 = receive_response(Socket),
MsgId_udp3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData3),
MsgId_udp3 = check_publish_msg_on_udp(
{Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicIdNew, Payload2}, UdpData3),
send_puback_msg(Socket, TopicIdNew, MsgId_udp3),
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
@ -1216,8 +1355,11 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) ->
CleanSession = 0,
ReturnCode = 0,
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId0:16, MsgId1:16, ReturnCode>>,
receive_response(Socket)
),
% goto asleep state
SleepDuration = 30,
@ -1250,21 +1392,28 @@ t_asleep_test05_to_awake_qos1_dl_msg(_) ->
send_regack_msg(Socket, TopicIdNew, MsgId_reg),
UdpData2 = receive_response(Socket),
MsgId2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2),
MsgId2 = check_publish_msg_on_udp(
{Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicIdNew, Payload2}, UdpData2),
send_puback_msg(Socket, TopicIdNew, MsgId2),
timer:sleep(50),
UdpData3 = wrap_receive_response(Socket),
MsgId3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3),
MsgId3 = check_publish_msg_on_udp(
{Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicIdNew, Payload3}, UdpData3),
send_puback_msg(Socket, TopicIdNew, MsgId3),
timer:sleep(50),
case receive_response(Socket) of
<<2,23>> -> ok;
UdpData4 ->
MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicIdNew, Payload4}, UdpData4),
MsgId4 = check_publish_msg_on_udp(
{Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicIdNew, Payload4}, UdpData4),
send_puback_msg(Socket, TopicIdNew, MsgId4)
end,
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
@ -1322,7 +1471,10 @@ t_asleep_test06_to_awake_qos2_dl_msg(_) ->
send_pingreq_msg(Socket, ClientId),
UdpData = wrap_receive_response(Socket),
MsgId_udp = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicId_tom, Payload1}, UdpData),
MsgId_udp = check_publish_msg_on_udp(
{Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicId_tom, Payload1}, UdpData),
send_pubrec_msg(Socket, MsgId_udp),
?assertMatch(<<_:8, ?SN_PUBREL:8, _/binary>>, receive_response(Socket)),
send_pubcomp_msg(Socket, MsgId_udp),
@ -1357,8 +1509,11 @@ t_asleep_test07_to_connected(_) ->
send_register_msg(Socket, TopicName_tom, MsgId1),
TopicId_tom = check_regack_msg_on_udp(MsgId1, receive_response(Socket)),
send_subscribe_msg_predefined_topic(Socket, QoS, TopicId_tom, MsgId1),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId_tom:16, MsgId1:16, ReturnCode>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId_tom:16, MsgId1:16, ReturnCode>>,
receive_response(Socket)
),
% goto asleep state
send_disconnect_msg(Socket, SleepDuration),
@ -1436,8 +1591,11 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
CleanSession = 0,
ReturnCode = 0,
send_subscribe_msg_normal_topic(Socket, QoS, TopicName1, MsgId1),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, ?SN_NORMAL_TOPIC:2, TopicId0:16, MsgId1:16, ReturnCode>>,
receive_response(Socket)),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1,
WillBit:1,CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId0:16, MsgId1:16, ReturnCode>>,
receive_response(Socket)
),
% goto asleep state
SleepDuration = 30,
send_disconnect_msg(Socket, SleepDuration),
@ -1471,7 +1629,10 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
udp_receive_timeout ->
ok;
UdpData2 ->
MsgId2 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload2}, UdpData2),
MsgId2 = check_publish_msg_on_udp(
{Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicIdNew, Payload2}, UdpData2),
send_puback_msg(Socket, TopicIdNew, MsgId2)
end,
timer:sleep(100),
@ -1480,7 +1641,10 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
udp_receive_timeout ->
ok;
UdpData3 ->
MsgId3 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, ?SN_NORMAL_TOPIC, TopicIdNew, Payload3}, UdpData3),
MsgId3 = check_publish_msg_on_udp(
{Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicIdNew, Payload3}, UdpData3),
send_puback_msg(Socket, TopicIdNew, MsgId3)
end,
timer:sleep(100),
@ -1489,16 +1653,18 @@ t_asleep_test09_to_awake_again_qos1_dl_msg(_) ->
udp_receive_timeout ->
ok;
UdpData4 ->
MsgId4 = check_publish_msg_on_udp({Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicIdNew, Payload4}, UdpData4),
MsgId4 = check_publish_msg_on_udp(
{Dup, QoS, Retain, WillBit,
CleanSession, ?SN_NORMAL_TOPIC,
TopicIdNew, Payload4}, UdpData4),
send_puback_msg(Socket, TopicIdNew, MsgId4)
end,
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
%% send PINGREQ again to enter awake state
send_pingreq_msg(Socket, ClientId),
%% will not receive any buffered PUBLISH messages buffered before last awake, only receive PINGRESP here
%% will not receive any buffered PUBLISH messages buffered before last
%% awake, only receive PINGRESP here
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
gen_udp:close(Socket).
@ -1901,8 +2067,12 @@ check_dispatched_message(Dup, QoS, Retain, TopicIdType, TopicId, Payload, Socket
PubMsg = receive_response(Socket),
Length = 7 + byte_size(Payload),
?LOG("check_dispatched_message ~p~n", [PubMsg]),
?LOG("expected ~p xx ~p~n", [<<Length, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2, TopicId:16>>, Payload]),
<<Length, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2, TopicId:16, MsgId:16, Payload/binary>> = PubMsg,
?LOG("expected ~p xx ~p~n",
[<<Length, ?SN_PUBLISH,
Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2, TopicId:16>>, Payload]),
<<Length, ?SN_PUBLISH,
Dup:1, QoS:2, Retain:1, ?FNU:2, TopicIdType:2,
TopicId:16, MsgId:16, Payload/binary>> = PubMsg,
case QoS of
0 -> ok;
1 -> send_puback_msg(Socket, TopicId, MsgId);
@ -1914,11 +2084,14 @@ check_dispatched_message(Dup, QoS, Retain, TopicIdType, TopicId, Payload, Socket
get_udp_broadcast_address() ->
"255.255.255.255".
check_publish_msg_on_udp({Dup, QoS, Retain, WillBit, CleanSession, TopicType, TopicId, Payload}, UdpData) ->
check_publish_msg_on_udp({Dup, QoS, Retain, WillBit,
CleanSession, TopicType, TopicId, Payload}, UdpData) ->
<<HeaderUdp:5/binary, MsgId:16, PayloadIn/binary>> = UdpData,
ct:pal("UdpData: ~p, Payload: ~p, PayloadIn: ~p", [UdpData, Payload, PayloadIn]),
Size9 = byte_size(Payload) + 7,
Eexp = <<Size9:8, ?SN_PUBLISH, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1, TopicType:2, TopicId:16>>,
Eexp = <<Size9:8,
?SN_PUBLISH, Dup:1, QoS:2, Retain:1, WillBit:1, CleanSession:1,
TopicType:2, TopicId:16>>,
?assertEqual(Eexp, HeaderUdp), % mqtt-sn header should be same
?assertEqual(Payload, PayloadIn), % payload should be same
MsgId.

View File

@ -1,6 +1,6 @@
{application, emqx_stomp,
[{description, "EMQ X Stomp Protocol Plugin"},
{vsn, "4.3.2"}, % strict semver, bump manually!
{vsn, "4.3.3"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_stomp_sup]},
{applications, [kernel,stdlib]},

View File

@ -1,10 +1,16 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
[{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]},
{"4.3.1",[
{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{restart_application,emqx_stomp}]},
{<<".*">>,[]}],
[{"4.3.1",[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
[{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]},
{"4.3.1",[
{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{restart_application,emqx_stomp}]},
{<<".*">>,[]}]}.

View File

@ -108,6 +108,8 @@
, init/2
]}).
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
-type(pstate() :: #pstate{}).
%% @doc Init protocol
@ -132,8 +134,7 @@ init(ConnInfo = #{peername := {PeerHost, _Port},
AllowAnonymous = get_value(allow_anonymous, Opts, false),
DefaultUser = get_value(default_user, Opts),
#pstate{
#pstate{
conninfo = NConnInfo,
clientinfo = ClientInfo,
heartfun = HeartFun,
@ -165,7 +166,7 @@ default_conninfo(ConnInfo) ->
info(State) ->
maps:from_list(info(?INFO_KEYS, State)).
-spec info(list(atom())|atom(), pstate()) -> term().
-spec info(list(atom()) | atom(), pstate()) -> term().
info(Keys, State) when is_list(Keys) ->
[{Key, info(Key, State)} || Key <- Keys];
info(conninfo, #pstate{conninfo = ConnInfo}) ->
@ -288,7 +289,12 @@ received(#stomp_frame{command = <<"CONNECT">>}, State = #pstate{connected = true
received(Frame = #stomp_frame{command = <<"SEND">>, headers = Headers}, State) ->
case header(<<"transaction">>, Headers) of
undefined -> {ok, handle_recv_send_frame(Frame, State)};
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_send_frame/2, [Frame]}, receipt_id(Headers), State)
TransactionId ->
add_action(TransactionId,
{fun ?MODULE:handle_recv_send_frame/2, [Frame]},
receipt_id(Headers),
State
)
end;
received(#stomp_frame{command = <<"SUBSCRIBE">>, headers = Headers},
@ -346,7 +352,11 @@ received(#stomp_frame{command = <<"UNSUBSCRIBE">>, headers = Headers},
received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
case header(<<"transaction">>, Headers) of
undefined -> {ok, handle_recv_ack_frame(Frame, State)};
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_ack_frame/2, [Frame]}, receipt_id(Headers), State)
TransactionId ->
add_action(TransactionId,
{fun ?MODULE:handle_recv_ack_frame/2, [Frame]},
receipt_id(Headers),
State)
end;
%% NACK
@ -357,7 +367,11 @@ received(Frame = #stomp_frame{command = <<"ACK">>, headers = Headers}, State) ->
received(Frame = #stomp_frame{command = <<"NACK">>, headers = Headers}, State) ->
case header(<<"transaction">>, Headers) of
undefined -> {ok, handle_recv_nack_frame(Frame, State)};
TransactionId -> add_action(TransactionId, {fun ?MODULE:handle_recv_nack_frame/2, [Frame]}, receipt_id(Headers), State)
TransactionId ->
add_action(TransactionId,
{fun ?MODULE:handle_recv_nack_frame/2, [Frame]},
receipt_id(Headers),
State)
end;
%% BEGIN
@ -516,9 +530,9 @@ negotiate_version(Accepts) ->
negotiate_version(Ver, []) ->
{error, <<"Supported protocol versions < ", Ver/binary>>};
negotiate_version(Ver, [AcceptVer|_]) when Ver >= AcceptVer ->
negotiate_version(Ver, [AcceptVer | _]) when Ver >= AcceptVer ->
{ok, AcceptVer};
negotiate_version(Ver, [_|T]) ->
negotiate_version(Ver, [_ | T]) ->
negotiate_version(Ver, T).
check_login(Login, _, AllowAnonymous, _)
@ -537,7 +551,7 @@ check_login(Login, Passcode, _, DefaultUser) ->
add_action(Id, Action, ReceiptId, State = #pstate{transaction = Trans}) ->
case maps:get(Id, Trans, undefined) of
{Ts, Actions} ->
NTrans = Trans#{Id => {Ts, [Action|Actions]}},
NTrans = Trans#{Id => {Ts, [Action | Actions]}},
{ok, State#pstate{transaction = NTrans}};
_ ->
send(error_frame(ReceiptId, ["Transaction ", Id, " not found"]), State)
@ -588,15 +602,29 @@ next_ackid() ->
put(ackid, AckId + 1),
AckId.
make_mqtt_message(Topic, Headers, Body) ->
Msg = emqx_message:make(stomp, Topic, Body),
Headers1 = lists:foldl(fun(Key, Headers0) ->
proplists:delete(Key, Headers0)
end, Headers, [<<"destination">>,
<<"content-length">>,
<<"content-type">>,
<<"transaction">>,
<<"receipt">>]),
make_mqtt_message(Topic, Headers, Body,
#pstate{
conninfo = #{proto_ver := ProtoVer},
clientinfo = #{
protocol := Protocol,
clientid := ClientId,
username := Username,
peerhost := PeerHost}}) ->
Msg = emqx_message:make(
ClientId, ?QOS_0,
Topic, Body, #{},
#{proto_ver => ProtoVer,
protocol => Protocol,
username => Username,
peerhost => PeerHost}),
Headers1 = lists:foldl(
fun(Key, Headers0) ->
proplists:delete(Key, Headers0)
end, Headers, [<<"destination">>,
<<"content-length">>,
<<"content-type">>,
<<"transaction">>,
<<"receipt">>]),
emqx_message:set_headers(#{stomp_headers => Headers1}, Msg).
receipt_id(Headers) ->
@ -611,7 +639,7 @@ handle_recv_send_frame(#stomp_frame{command = <<"SEND">>, headers = Headers, bod
allow ->
_ = maybe_send_receipt(receipt_id(Headers), State),
_ = emqx_broker:publish(
make_mqtt_message(Topic, Headers, iolist_to_binary(Body))
make_mqtt_message(Topic, Headers, iolist_to_binary(Body), State)
),
State;
deny ->
@ -699,7 +727,7 @@ find_sub_by_id(Id, Subs) ->
end, Subs),
case maps:to_list(Found) of
[] -> undefined;
[Sub|_] -> Sub
[Sub | _] -> Sub
end.
is_acl_enabled(_) ->

View File

@ -101,6 +101,11 @@ cluster.autoclean = 5m
## Value: String
## cluster.dns.app = emqx
## Type of dns record.
##
## Value: Value: a | srv
## cluster.dns.type = a
##--------------------------------------------------------------------
## Cluster using etcd

View File

@ -87,9 +87,12 @@ update_trace(Path, Params) ->
download_zip_log(Path, Params) ->
case emqx_trace_api:download_zip_log(Path, Params) of
{ok, _Header, _File}= Return -> Return;
{error, _Reason} = Err -> return(Err)
{ok, File} -> minirest:return_file(File);
{error, Reason} -> return({error, 'NOT_FOUND', Reason})
end.
stream_log_file(Path, Params) ->
return(emqx_trace_api:stream_log_file(Path, Params)).
case emqx_trace_api:stream_log_file(Path, Params) of
{ok, File} -> return({ok, File});
{error, Reason} -> return({error, 'NOT_FOUND', Reason})
end.

View File

@ -124,14 +124,14 @@ t_stream_log(_Config) ->
{ok, FileBin} = file:read_file(File),
ct:pal("FileBin: ~p ~s", [byte_size(FileBin), FileBin]),
Header = auth_header_(),
{ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?_limit=10"), Header),
{ok, Binary} = request_api(get, api_path("trace/test_stream_log/log?bytes=10"), Header),
#{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta, <<"items">> := Bin}} = json(Binary),
?assertEqual(10, byte_size(Bin)),
?assertEqual(#{<<"page">> => 10, <<"limit">> => 10}, Meta),
Path = api_path("trace/test_stream_log/log?_page=20&_limit=10"),
?assertEqual(#{<<"position">> => 10, <<"bytes">> => 10}, Meta),
Path = api_path("trace/test_stream_log/log?position=20&bytes=10"),
{ok, Binary1} = request_api(get, Path, Header),
#{<<"code">> := 0, <<"data">> := #{<<"meta">> := Meta1, <<"items">> := Bin1}} = json(Binary1),
?assertEqual(#{<<"page">> => 30, <<"limit">> => 10}, Meta1),
?assertEqual(#{<<"position">> => 30, <<"bytes">> => 10}, Meta1),
?assertEqual(10, byte_size(Bin1)),
unload(),
ok.

View File

@ -96,6 +96,11 @@
{datatype, string}
]}.
{mapping, "cluster.dns.type", "ekka.cluster_discovery", [
{datatype, {enum, [a, srv]}},
{default, a}
]}.
%%--------------------------------------------------------------------
%% Cluster using etcd
@ -171,7 +176,8 @@
{loop, cuttlefish:conf_get("cluster.mcast.loop", Conf, true)}];
(dns) ->
[{name, cuttlefish:conf_get("cluster.dns.name", Conf)},
{app, cuttlefish:conf_get("cluster.dns.app", Conf)}];
{app, cuttlefish:conf_get("cluster.dns.app", Conf)},
{type, cuttlefish:conf_get("cluster.dns.type", Conf)}];
(etcd) ->
SslOpts = fun(Conf) ->
Options = cuttlefish_variable:filter_by_prefix("cluster.etcd.ssl", Conf),

View File

@ -320,7 +320,8 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
puback(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
emqx:run_hook('message.publish_done', [Msg, CreatedAt]),
emqx:run_hook('message.publish_done',
[Msg, #{session_rebirth_time => CreatedAt}]),
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
return_with(Msg, dequeue(Session#session{inflight = Inflight1}));
{value, {_Pubrel, _Ts}} ->
@ -346,7 +347,8 @@ pubrec(PacketId, Session = #session{inflight = Inflight, created_at = CreatedAt}
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
%% execute hook here, because message record will be replaced by pubrel
emqx:run_hook('message.publish_done', [Msg, CreatedAt]),
emqx:run_hook('message.publish_done',
[Msg, #{session_rebirth_time => CreatedAt}]),
Inflight1 = emqx_inflight:update(PacketId, with_ts(pubrel), Inflight),
{ok, Msg, Session#session{inflight = Inflight1}};
{value, {pubrel, _Ts}} ->
@ -443,7 +445,8 @@ deliver([Msg | More], Acc, Session) ->
end.
deliver_msg(Msg = #message{qos = ?QOS_0}, Session) ->
emqx:run_hook('message.publish_done', [Msg, Session#session.created_at]),
emqx:run_hook('message.publish_done',
[Msg, #{session_rebirth_time => Session#session.created_at}]),
{ok, [{undefined, maybe_ack(Msg)}], Session};
deliver_msg(Msg = #message{qos = QoS}, Session =

View File

@ -147,7 +147,7 @@
dn => binary(),
atom() => term()
}).
-type(clientid() :: binary()|atom()).
-type(clientid() :: binary() | atom()).
-type(username() :: maybe(binary())).
-type(password() :: maybe(binary())).
-type(peerhost() :: inet:ip_address()).
@ -193,6 +193,7 @@
username => username(),
peerhost => peerhost(),
properties => properties(),
allow_publish => boolean(),
atom() => term()}).
-type(banned() :: #banned{}).