Merge branch 'emqx30-feng' of github.com:emqtt/emqttd into emqx30-feng

This commit is contained in:
Feng Lee 2018-08-29 17:55:13 +08:00
commit 2dc8f9c4c5
56 changed files with 786 additions and 423 deletions

View File

@ -4,18 +4,19 @@ PROJECT = emqx
PROJECT_DESCRIPTION = EMQ X Broker PROJECT_DESCRIPTION = EMQ X Broker
PROJECT_VERSION = 3.0 PROJECT_VERSION = 3.0
DEPS = jsx gproc gen_rpc lager ekka esockd cowboy clique DEPS = jsx gproc gen_rpc lager ekka esockd cowboy clique lager_syslog
dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0 dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0
dep_gproc = git https://github.com/uwiger/gproc 0.8.0 dep_gproc = git https://github.com/uwiger/gproc 0.8.0
dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.1.1 dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.2.0
dep_lager = git https://github.com/erlang-lager/lager 3.6.4 dep_lager = git https://github.com/erlang-lager/lager 3.6.4
dep_esockd = git https://github.com/emqx/esockd emqx30 dep_esockd = git https://github.com/emqx/esockd emqx30
dep_ekka = git https://github.com/emqx/ekka emqx30 dep_ekka = git https://github.com/emqx/ekka emqx30
dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0 dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0
dep_clique = git https://github.com/emqx/clique dep_clique = git https://github.com/emqx/clique
dep_lager_syslog = git https://github.com/basho/lager_syslog 3.0.1
NO_AUTOPATCH = gen_rpc cuttlefish NO_AUTOPATCH = cuttlefish
ERLC_OPTS += +debug_info ERLC_OPTS += +debug_info
ERLC_OPTS += +'{parse_transform, lager_transform}' ERLC_OPTS += +'{parse_transform, lager_transform}'
@ -31,11 +32,13 @@ TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
EUNIT_OPTS = verbose EUNIT_OPTS = verbose
CT_SUITES = emqx_inflight # CT_SUITES = emqx_stats
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
#CT_SUITES = emqx emqx_broker emqx_mod emqx_lib emqx_topic emqx_mqueue emqx_inflight \ CT_SUITES = emqx emqx_access emqx_base62 emqx_broker emqx_client emqx_cm emqx_frame emqx_guid emqx_inflight \
# emqx_vm emqx_net emqx_protocol emqx_access emqx_router emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \
emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone
CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1

2
erlang.mk vendored
View File

@ -2174,7 +2174,7 @@ help::
CT_RUN = ct_run \ CT_RUN = ct_run \
-no_auto_compile \ -no_auto_compile \
-noinput \ -noinput \
-pa $(CURDIR)/ebin $(DEPS_DIR)/*/ebin $(APPS_DIR)/*/ebin $(TEST_DIR) \ -pa $(CURDIR)/ebin $(DEPS_DIR)/*/ebin $(DEPS_DIR)/gen_rpc/_build/dev/lib/*/ebin $(APPS_DIR)/*/ebin $(TEST_DIR) \
-dir $(TEST_DIR) \ -dir $(TEST_DIR) \
-logdir $(CURDIR)/logs -logdir $(CURDIR)/logs

View File

@ -412,7 +412,7 @@ log.syslog = on
## Sets the severity level for syslog. ## Sets the severity level for syslog.
## ##
## Value: debug | info | notice | warning | error | critical | alert | emergency ## Value: debug | info | notice | warning | error | critical | alert | emergency
## log.syslog.level = error log.syslog.level = error
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## Authentication/Access Control ## Authentication/Access Control

View File

@ -54,7 +54,7 @@
-type(subid() :: binary() | atom()). -type(subid() :: binary() | atom()).
-type(subopts() :: #{qos => integer(), -type(subopts() :: #{qos => integer(),
share => '$queue' | binary(), share => binary(),
atom() => term()}). atom() => term()}).
-record(subscription, { -record(subscription, {

View File

@ -442,7 +442,7 @@ end}.
]}. ]}.
{mapping, "log.syslog", "lager.handlers", [ {mapping, "log.syslog", "lager.handlers", [
%%{default, off}, {default, off},
{datatype, flag} {datatype, flag}
]}. ]}.
@ -456,10 +456,10 @@ end}.
{datatype, {enum, [daemon, local0, local1, local2, local3, local4, local5, local6, local7]}} {datatype, {enum, [daemon, local0, local1, local2, local3, local4, local5, local6, local7]}}
]}. ]}.
%%{mapping, "log.syslog.level", "lager.handlers", [ {mapping, "log.syslog.level", "lager.handlers", [
%% {default, error}, {default, error},
%% {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}} {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency]}}
%%]}. ]}.
{mapping, "log.error.redirect", "lager.error_logger_redirect", [ {mapping, "log.error.redirect", "lager.error_logger_redirect", [
{default, on}, {default, on},
@ -511,14 +511,13 @@ end}.
both -> [ConsoleHandler, ConsoleFileHandler]; both -> [ConsoleHandler, ConsoleFileHandler];
_ -> [] _ -> []
end, end,
SyslogHandler = [], SyslogHandler = case cuttlefish:conf_get("log.syslog", Conf) of
%%case cuttlefish:conf_get("log.syslog", Conf, false) of false -> [];
%% false -> []; true -> [{lager_syslog_backend,
%% true -> [{lager_syslog_backend, [cuttlefish:conf_get("log.syslog.identity", Conf),
%% [cuttlefish:conf_get("log.syslog.identity", Conf), cuttlefish:conf_get("log.syslog.facility", Conf),
%% cuttlefish:conf_get("log.syslog.facility", Conf), cuttlefish:conf_get("log.syslog.level", Conf)]}]
%% cuttlefish:conf_get("log.syslog.level", Conf)]}] end,
%%end,
ConsoleHandlers ++ ErrorHandler ++ InfoHandler ++ SyslogHandler ConsoleHandlers ++ ErrorHandler ++ InfoHandler ++ SyslogHandler
end end
}. }.

View File

@ -3,8 +3,8 @@
{vsn,"3.0"}, {vsn,"3.0"},
{modules,[]}, {modules,[]},
{registered,[emqx_sup]}, {registered,[emqx_sup]},
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,cowboy {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,
]}, cowboy,lager_syslog]},
{env,[]}, {env,[]},
{mod,{emqx_app,[]}}, {mod,{emqx_app,[]}},
{maintainers,["Feng Lee <feng@emqx.io>"]}, {maintainers,["Feng Lee <feng@emqx.io>"]},

View File

@ -92,8 +92,11 @@ unsubscribe(Topic) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic)). emqx_broker:unsubscribe(iolist_to_binary(Topic)).
-spec(unsubscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}). -spec(unsubscribe(topic() | string(), subscriber() | string()) -> ok | {error, term()}).
unsubscribe(Topic, Subscriber) -> unsubscribe(Topic, Sub) when is_list(Sub) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Subscriber)). emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Sub));
unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
{SubPid, SubId} = Subscriber,
emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid, SubId).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% PubSub management API %% PubSub management API
@ -114,9 +117,9 @@ topics() -> emqx_router:topics().
subscribers(Topic) -> subscribers(Topic) ->
emqx_broker:subscribers(iolist_to_binary(Topic)). emqx_broker:subscribers(iolist_to_binary(Topic)).
-spec(subscriptions(subscriber() | string()) -> [{topic(), subopts()}]). -spec(subscriptions(subscriber()) -> [{topic(), subopts()}]).
subscriptions(Subscriber) -> subscriptions(Subscriber) ->
emqx_broker:subscriptions(list_to_subid(Subscriber)). emqx_broker:subscriptions(Subscriber).
-spec(subscribed(topic() | string(), subscriber()) -> boolean()). -spec(subscribed(topic() | string(), subscriber()) -> boolean()).
subscribed(Topic, Subscriber) -> subscribed(Topic, Subscriber) ->

View File

@ -176,7 +176,7 @@ handle_call({unregister_mod, Type, Mod}, _From, State) ->
reply(case lists:keyfind(Mod, 1, Mods) of reply(case lists:keyfind(Mod, 1, Mods) of
false -> false ->
{error, not_found}; {error, not_found};
true -> {Mod, _ModState, _Seq} ->
ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), ok ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), ok
end, State); end, State);

View File

@ -81,7 +81,7 @@ handle_event({set_alarm, Alarm = #alarm{timestamp = undefined}}, State)->
handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #state{alarms = Alarms}) -> handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #state{alarms = Alarms}) ->
case encode_alarm(Alarm) of case encode_alarm(Alarm) of
{ok, Json} -> {ok, Json} ->
ok = emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json)); emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json));
{error, Reason} -> {error, Reason} ->
emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason]) emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason])
end, end,

View File

@ -183,16 +183,18 @@ route([{To, Node}], Delivery) when Node =:= node() ->
route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) -> route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) ->
forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]}); forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]});
route([{To, Shared}], Delivery) when is_tuple(Shared); is_binary(Shared) -> route([{To, Group}], Delivery) when is_tuple(Group); is_binary(Group) ->
emqx_shared_sub:dispatch(Shared, To, Delivery); emqx_shared_sub:dispatch(Group, To, Delivery);
route(Routes, Delivery) -> route(Routes, Delivery) ->
lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes). lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes).
aggre([]) -> aggre([]) ->
[]; [];
aggre([#route{topic = To, dest = Dest}]) -> aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
[{To, Dest}]; [{To, Node}];
aggre([#route{topic = To, dest = {Group, _Node}}]) ->
[{To, Group}];
aggre(Routes) -> aggre(Routes) ->
lists:foldl( lists:foldl(
fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) -> fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) ->
@ -379,9 +381,18 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions %% Internal functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
insert_subscriber(Group, Topic, Subscriber) ->
Subscribers = subscribers(Topic),
case lists:member(Subscriber, Subscribers) of
false ->
ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)});
_ ->
ok
end.
do_subscribe(Group, Topic, Subscriber, SubOpts) -> do_subscribe(Group, Topic, Subscriber, SubOpts) ->
ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}), ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}),
ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}), insert_subscriber(Group, Topic, Subscriber),
ets:insert(?SUBOPTION, {{Topic, Subscriber}, SubOpts}). ets:insert(?SUBOPTION, {{Topic, Subscriber}, SubOpts}).
do_unsubscribe(Group, Topic, Subscriber) -> do_unsubscribe(Group, Topic, Subscriber) ->
@ -390,20 +401,21 @@ do_unsubscribe(Group, Topic, Subscriber) ->
ets:delete(?SUBOPTION, {Topic, Subscriber}). ets:delete(?SUBOPTION, {Topic, Subscriber}).
subscriber_down(Subscriber) -> subscriber_down(Subscriber) ->
Topics = lists:map(fun({_, {share, _, Topic}}) -> Topics = lists:map(fun({_, {share, Group, Topic}}) ->
Topic; {Topic, Group};
({_, Topic}) -> ({_, Topic}) ->
Topic {Topic, undefined}
end, ets:lookup(?SUBSCRIPTION, Subscriber)), end, ets:lookup(?SUBSCRIPTION, Subscriber)),
lists:foreach(fun(Topic) -> lists:foreach(fun({Topic, undefined}) ->
case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of true = do_unsubscribe(undefined, Topic, Subscriber),
[{_, SubOpts}] -> ets:member(?SUBSCRIBER, Topic) orelse emqx_router:del_route(Topic, dest(undefined));
Group = maps:get(share, SubOpts, undefined), ({Topic, Group}) ->
true = do_unsubscribe(Group, Topic, Subscriber), true = do_unsubscribe(Group, Topic, Subscriber),
ets:member(?SUBSCRIBER, Topic) Groups = groups(Topic),
orelse emqx_router:del_route(Topic, dest(Group)); case lists:member(Group, lists:usort(Groups)) of
[] -> ok true -> ok;
end false -> emqx_router:del_route(Topic, dest(Group))
end
end, Topics). end, Topics).
monitor_subscriber({SubPid, SubId}, State = #state{submap = SubMap, submon = SubMon}) -> monitor_subscriber({SubPid, SubId}, State = #state{submap = SubMap, submon = SubMon}) ->
@ -421,3 +433,9 @@ dest(Group) -> {Group, node()}.
shared(undefined, Name) -> Name; shared(undefined, Name) -> Name;
shared(Group, Name) -> {share, Group, Name}. shared(Group, Name) -> {share, Group, Name}.
groups(Topic) ->
lists:foldl(fun({_, {share, Group, _}}, Acc) ->
[Group | Acc];
({_, _}, Acc) ->
Acc
end, [], ets:lookup(?SUBSCRIBER, Topic)).

View File

@ -373,12 +373,22 @@ init([Options]) ->
{_ver, undefined} -> random_client_id(); {_ver, undefined} -> random_client_id();
{_ver, Id} -> iolist_to_binary(Id) {_ver, Id} -> iolist_to_binary(Id)
end, end,
Username = case proplists:get_value(username, Options) of
undefined -> <<>>;
Name -> Name
end,
Password = case proplists:get_value(password, Options) of
undefined -> <<>>;
Passw -> Passw
end,
State = init(Options, #state{host = {127,0,0,1}, State = init(Options, #state{host = {127,0,0,1},
port = 1883, port = 1883,
hosts = [], hosts = [],
sock_opts = [], sock_opts = [],
bridge_mode = false, bridge_mode = false,
client_id = ClientId, client_id = ClientId,
username = Username,
password = Password,
clean_start = true, clean_start = true,
proto_ver = ?MQTT_PROTO_V4, proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>, proto_name = <<"MQTT">>,
@ -542,7 +552,8 @@ mqtt_connect(State = #state{client_id = ClientId,
properties = Properties}) -> properties = Properties}) ->
?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg, ?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg,
ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties), ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties),
io:format("ConnProps: ~p~n", [ConnProps]), io:format("ConnProps: ~p, ClientID: ~p, Username: ~p, Password: ~p~n",
[ConnProps, ClientId, Username, Password]),
send(?CONNECT_PACKET( send(?CONNECT_PACKET(
#mqtt_packet_connect{proto_ver = ProtoVer, #mqtt_packet_connect{proto_ver = ProtoVer,
proto_name = ProtoName, proto_name = ProtoName,
@ -1082,4 +1093,3 @@ next_packet_id(State = #state{last_packet_id = 16#ffff}) ->
next_packet_id(State = #state{last_packet_id = Id}) -> next_packet_id(State = #state{last_packet_id = Id}) ->
State#state{last_packet_id = Id + 1}. State#state{last_packet_id = Id + 1}.

View File

@ -62,7 +62,7 @@ do_check_pub(Props = #{qos := QoS}, [{max_qos_allowed, MaxQoS}|Caps]) ->
end; end;
do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) -> do_check_pub(#{retain := true}, [{mqtt_retain_available, false}|_Caps]) ->
{error, ?RC_RETAIN_NOT_SUPPORTED}; {error, ?RC_RETAIN_NOT_SUPPORTED};
do_check_pub(Props, [{mqtt_retain_available, true}|Caps]) -> do_check_pub(Props, [{mqtt_retain_available, _}|Caps]) ->
do_check_pub(Props, Caps). do_check_pub(Props, Caps).
-spec(check_sub(zone(), mqtt_topic_filters()) -> {ok | error, mqtt_topic_filters()}). -spec(check_sub(zone(), mqtt_topic_filters()) -> {ok | error, mqtt_topic_filters()}).

View File

@ -476,6 +476,9 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun
ok -> ok ->
emqx_metrics:sent(Packet), emqx_metrics:sent(Packet),
{ok, inc_stats(send, Type, PState)}; {ok, inc_stats(send, Type, PState)};
{binary, _Data} ->
emqx_metrics:sent(Packet),
{ok, inc_stats(send, Type, PState)};
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end. end.

View File

@ -167,6 +167,13 @@ handle_cast({del_route, From, Route}, State) ->
_ = gen_server:reply(From, ok), _ = gen_server:reply(From, ok),
{noreply, NewState}; {noreply, NewState};
handle_cast({del_route, Route = #route{topic = Topic, dest = Dest}}, State) when is_tuple(Dest) ->
{noreply, case emqx_topic:wildcard(Topic) of
true -> log(trans(fun del_trie_route/1, [Route])),
State;
false -> del_direct_route(Route, State)
end};
handle_cast({del_route, Route = #route{topic = Topic}}, State) -> handle_cast({del_route, Route = #route{topic = Topic}}, State) ->
%% Confirm if there are still subscribers... %% Confirm if there are still subscribers...
{noreply, case ets:member(emqx_subscriber, Topic) of {noreply, case ets:member(emqx_subscriber, Topic) of

View File

@ -401,7 +401,7 @@ handle_call(stats, _From, State) ->
reply(stats(State), State); reply(stats(State), State);
handle_call(close, _From, State) -> handle_call(close, _From, State) ->
{stop, normal, State}; {stop, normal, ok, State};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_logger:error("[Session] unexpected call: ~p", [Req]), emqx_logger:error("[Session] unexpected call: ~p", [Req]),

View File

@ -81,7 +81,7 @@ record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
%% TODO: dispatch strategy, ensure the delivery... %% TODO: dispatch strategy, ensure the delivery...
dispatch({Group, _Node}, Topic, Delivery = #delivery{message = Msg, flows = Flows}) -> dispatch(Group, Topic, Delivery = #delivery{message = Msg, flows = Flows}) ->
case pick(subscribers(Group, Topic)) of case pick(subscribers(Group, Topic)) of
false -> Delivery; false -> Delivery;
SubPid -> SubPid ! {dispatch, Topic, Msg}, SubPid -> SubPid ! {dispatch, Topic, Msg},
@ -93,8 +93,7 @@ pick([]) ->
pick([SubPid]) -> pick([SubPid]) ->
SubPid; SubPid;
pick(SubPids) -> pick(SubPids) ->
X = abs(erlang:monotonic_time() bxor erlang:unique_integer()), lists:nth(rand:uniform(length(SubPids)), SubPids).
lists:nth((X rem length(SubPids)) + 1, SubPids).
subscribers(Group, Topic) -> subscribers(Group, Topic) ->
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).

View File

@ -57,7 +57,9 @@
'subscribers/count', 'subscribers/count',
'subscribers/max', 'subscribers/max',
'subscriptions/count', 'subscriptions/count',
'subscriptions/max' 'subscriptions/max',
'subscriptions/shared/count',
'subscriptions/shared/max'
]). ]).
-define(ROUTE_STATS, [ -define(ROUTE_STATS, [

View File

@ -14,7 +14,7 @@
-module(emqx_time). -module(emqx_time).
-export([seed/0, now_secs/0, now_ms/0, now_ms/1]). -export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1]).
seed() -> seed() ->
rand:seed(exsplus, erlang:timestamp()). rand:seed(exsplus, erlang:timestamp()).
@ -22,9 +22,11 @@ seed() ->
now_secs() -> now_secs() ->
erlang:system_time(second). erlang:system_time(second).
now_secs({MegaSecs, Secs, _MicroSecs}) ->
MegaSecs * 1000000 + Secs.
now_ms() -> now_ms() ->
erlang:system_time(millisecond). erlang:system_time(millisecond).
now_ms({MegaSecs, Secs, MicroSecs}) -> now_ms({MegaSecs, Secs, MicroSecs}) ->
(MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000). (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).

View File

@ -185,7 +185,7 @@ parse(Topic = <<"$queue/", _/binary>>, #{share := _Group}) ->
parse(Topic = <<"$share/", _/binary>>, #{share := _Group}) -> parse(Topic = <<"$share/", _/binary>>, #{share := _Group}) ->
error({invalid_topic, Topic}); error({invalid_topic, Topic});
parse(<<"$queue/", Topic1/binary>>, Options) -> parse(<<"$queue/", Topic1/binary>>, Options) ->
parse(Topic1, maps:put(share, '$queue', Options)); parse(Topic1, maps:put(share, <<"$queue">>, Options));
parse(<<"$share/", Topic1/binary>>, Options) -> parse(<<"$share/", Topic1/binary>>, Options) ->
[Group, Topic2] = binary:split(Topic1, <<"/">>), [Group, Topic2] = binary:split(Topic1, <<"/">>),
{Topic2, maps:put(share, Group, Options)}; {Topic2, maps:put(share, Group, Options)};

View File

@ -87,14 +87,11 @@ init(Req, Opts) ->
case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of case cowboy_req:parse_header(<<"sec-websocket-protocol">>, Req) of
undefined -> undefined ->
{cowboy_websocket, Req, #state{}}; {cowboy_websocket, Req, #state{}};
Subprotocols -> [<<"mqtt", Vsn/binary>>] ->
case lists:member(<<"mqtt">>, Subprotocols) of Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt", Vsn/binary>>, Req),
true -> {cowboy_websocket, Resp, #state{request = Req, options = Opts}, #{idle_timeout => 86400000}};
Resp = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req), _ ->
{cowboy_websocket, Resp, #state{request = Req, options = Opts}, #{idle_timeout => 86400000}}; {ok, cowboy_req:reply(400, Req), #state{}}
false ->
{ok, cowboy_req:reply(400, Req), #state{}}
end
end. end.
websocket_init(#state{request = Req, options = Options}) -> websocket_init(#state{request = Req, options = Options}) ->
@ -130,9 +127,9 @@ stat_fun() ->
fun() -> {ok, get(recv_oct)} end. fun() -> {ok, get(recv_oct)} end.
websocket_handle({binary, <<>>}, State) -> websocket_handle({binary, <<>>}, State) ->
{ok, State}; {ok, ensure_stats_timer(State)};
websocket_handle({binary, [<<>>]}, State) -> websocket_handle({binary, [<<>>]}, State) ->
{ok, State}; {ok, ensure_stats_timer(State)};
websocket_handle({binary, Data}, State = #state{parser_state = ParserState, websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
BinSize = iolist_size(Data), BinSize = iolist_size(Data),
@ -199,7 +196,7 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
websocket_info(emit_stats, State = #state{proto_state = ProtoState}) -> websocket_info(emit_stats, State = #state{proto_state = ProtoState}) ->
Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(),
emqx_protocol:stats(ProtoState)]), emqx_protocol:stats(ProtoState)]),
emqx_cm:set_conn_stats(emqx_protocol:clientid(ProtoState), Stats), emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), Stats),
{ok, State#state{stats_timer = undefined}, hibernate}; {ok, State#state{stats_timer = undefined}, hibernate};
websocket_info({keepalive, start, Interval}, State) -> websocket_info({keepalive, start, Interval}, State) ->
@ -239,7 +236,7 @@ websocket_info(Info, State) ->
{ok, State}. {ok, State}.
terminate(SockError, _Req, #state{keepalive = Keepalive, terminate(SockError, _Req, #state{keepalive = Keepalive,
proto_state = ProtoState, proto_state = _ProtoState,
shutdown_reason = Reason}) -> shutdown_reason = Reason}) ->
emqx_keepalive:cancel(Keepalive), emqx_keepalive:cancel(Keepalive),
io:format("Websocket shutdown for ~p, sockerror: ~p~n", [Reason, SockError]), io:format("Websocket shutdown for ~p, sockerror: ~p~n", [Reason, SockError]),

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_SUITE). -module(emqx_SUITE).
@ -38,10 +36,11 @@ all() ->
groups() -> groups() ->
[{connect, [non_parallel_tests], [{connect, [non_parallel_tests],
[mqtt_connect, [mqtt_connect,
mqtt_connect_with_tcp, % mqtt_connect_with_tcp,
mqtt_connect_with_ssl_oneway, mqtt_connect_with_ssl_oneway,
mqtt_connect_with_ssl_twoway, mqtt_connect_with_ssl_twoway%,
mqtt_connect_with_ws]}, % mqtt_connect_with_ws
]},
{cleanSession, [sequence], {cleanSession, [sequence],
[cleanSession_validate] [cleanSession_validate]
} }
@ -72,15 +71,16 @@ connect_broker_(Packet, RecvSize) ->
gen_tcp:close(Sock), gen_tcp:close(Sock),
Data. Data.
mqtt_connect_with_tcp(_) ->
%% Issue #599 %% mqtt_connect_with_tcp(_) ->
%% Empty clientId and clean_session = false %% %% Issue #599
{ok, Sock} = gen_tcp:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}]), %% %% Empty clientId and clean_session = false
Packet = raw_send_serialise(?CLIENT), %% {ok, Sock} = gen_tcp:connect({127,0,0,1}, 1883, [binary, {packet, raw}, {active, false}]),
gen_tcp:send(Sock, Packet), %% Packet = raw_send_serialise(?CLIENT),
{ok, Data} = gen_tcp:recv(Sock, 0), %% gen_tcp:send(Sock, Packet),
{ok, ?CONNACK_PACKET(0), _} = raw_recv_pase(Data), %% {ok, Data} = gen_tcp:recv(Sock, 0),
gen_tcp:close(Sock). %% % {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(Data),
%% gen_tcp:close(Sock).
mqtt_connect_with_ssl_oneway(_) -> mqtt_connect_with_ssl_oneway(_) ->
emqx:stop(), emqx:stop(),
@ -127,15 +127,16 @@ mqtt_connect_with_ssl_twoway(_Config) ->
emqttc:disconnect(SslTwoWay), emqttc:disconnect(SslTwoWay),
emqttc:disconnect(Sub). emqttc:disconnect(Sub).
mqtt_connect_with_ws(_Config) ->
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()), %% mqtt_connect_with_ws(_Config) ->
{ok, _} = rfc6455_client:open(WS), %% WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
Packet = raw_send_serialise(?CLIENT), %% {ok, _} = rfc6455_client:open(WS),
ok = rfc6455_client:send_binary(WS, Packet), %% Packet = raw_send_serialise(?CLIENT),
{binary, P} = rfc6455_client:recv(WS), %% ok = rfc6455_client:send_binary(WS, Packet),
{ok, ?CONNACK_PACKET(0), _} = raw_recv_pase(P), %% {binary, P} = rfc6455_client:recv(WS),
{close, _} = rfc6455_client:close(WS), %% % {ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(P),
ok. %% {close, _} = rfc6455_client:close(WS),
%% ok.
cleanSession_validate(_) -> cleanSession_validate(_) ->
{ok, C1} = emqttc:start_link([{host, "localhost"}, {ok, C1} = emqttc:start_link([{host, "localhost"},

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_access_SUITE). -module(emqx_access_SUITE).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_acl_test_mod). -module(emqx_acl_test_mod).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_auth_anonymous_test_mod). -module(emqx_auth_anonymous_test_mod).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_auth_dashboard). -module(emqx_auth_dashboard).

View File

@ -1,39 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_base62_SUITE).
-include_lib("eunit/include/eunit.hrl").
-define(BASE62, emqx_base62).
-compile(export_all).
-compile(nowarn_export_all).
all() -> [t_base62_encode].
t_base62_encode(_) ->
<<"10">> = ?BASE62:decode(?BASE62:encode(<<"10">>)),
<<"100">> = ?BASE62:decode(?BASE62:encode(<<"100">>)),
<<"9999">> = ?BASE62:decode(?BASE62:encode(<<"9999">>)),
<<"65535">> = ?BASE62:decode(?BASE62:encode(<<"65535">>)),
<<X:128/unsigned-big-integer>> = emqx_guid:gen(),
<<Y:128/unsigned-big-integer>> = emqx_guid:gen(),
X = ?BASE62:decode(?BASE62:encode(X), integer),
Y = ?BASE62:decode(?BASE62:encode(Y), integer),
<<"helloworld">> = ?BASE62:decode(?BASE62:encode("helloworld")),
"helloworld" = ?BASE62:decode(?BASE62:encode("helloworld", string), string).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2017 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,7 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_broker_SUITE). -module(emqx_broker_SUITE).
-compile(export_all). -compile(export_all).
@ -28,8 +27,7 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
all() -> all() ->
[ [{group, pubsub},
{group, pubsub},
{group, session}, {group, session},
{group, broker}, {group, broker},
{group, metrics}, {group, metrics},
@ -56,7 +54,7 @@ init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(), emqx_ct_broker_helpers:run_setup_steps(),
Config. Config.
end_per_suite(Config) -> end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps(). emqx_ct_broker_helpers:run_teardown_steps().
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -64,12 +62,12 @@ end_per_suite(Config) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
subscribe_unsubscribe(_) -> subscribe_unsubscribe(_) ->
ok = emqx:subscribe(<<"topic">>, <<"clientId">>), ok = emqx:subscribe(<<"topic">>, "clientId"),
ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, [{qos, 1}]), ok = emqx:subscribe(<<"topic/1">>, "clientId", #{ qos => 1 }),
ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, [{qos, 2}]), ok = emqx:subscribe(<<"topic/2">>, "clientId", #{ qos => 2 }),
ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>), ok = emqx:unsubscribe(<<"topic">>, "clientId"),
ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>), ok = emqx:unsubscribe(<<"topic/1">>, "clientId"),
ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>). ok = emqx:unsubscribe(<<"topic/2">>, "clientId").
publish(_) -> publish(_) ->
Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>), Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>),
@ -80,13 +78,17 @@ publish(_) ->
pubsub(_) -> pubsub(_) ->
Self = self(), Self = self(),
ok = emqx:subscribe(<<"a/b/c">>, Self, [{qos, 1}]), Subscriber = {Self, <<"clientId">>},
?assertMatch({error, _}, emqx:subscribe(<<"a/b/c">>, Self, [{qos, 2}])), ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }),
#{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }),
#{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
%% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
timer:sleep(10), timer:sleep(10),
[{Self, <<"a/b/c">>}] = ets:lookup(mqtt_subscription, Self), [{<<"a/b/c">>, #{qos := 2}}] = emqx_broker:subscriptions(Subscriber),
[{<<"a/b/c">>, Self}] = ets:lookup(mqtt_subscriber, <<"a/b/c">>), [{Self, <<"clientId">>}] = emqx_broker:subscribers(<<"a/b/c">>),
emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)), emqx:publish(emqx_message:make(ct, <<"a/b/c">>, <<"hello">>)),
?assert(receive {dispatch, <<"a/b/c">>, _} -> true after 2 -> false end), ?assert(receive {dispatch, <<"a/b/c">>, _ } -> true; P -> ct:log("Receive Message: ~p~n",[P]) after 2 -> false end),
spawn(fun() -> spawn(fun() ->
emqx:subscribe(<<"a/b/c">>), emqx:subscribe(<<"a/b/c">>),
emqx:subscribe(<<"c/d/e">>), emqx:subscribe(<<"c/d/e">>),
@ -97,32 +99,33 @@ pubsub(_) ->
emqx:unsubscribe(<<"a/b/c">>). emqx:unsubscribe(<<"a/b/c">>).
t_local_subscribe(_) -> t_local_subscribe(_) ->
ok = emqx:subscribe("$local/topic0"), ok = emqx:subscribe(<<"$local/topic0">>),
ok = emqx:subscribe("$local/topic1", <<"x">>), ok = emqx:subscribe(<<"$local/topic1">>, "clientId"),
ok = emqx:subscribe("$local/topic2", <<"x">>, [{qos, 2}]), ok = emqx:subscribe(<<"$local/topic2">>, "clientId", #{ qos => 2 }),
timer:sleep(10), timer:sleep(10),
?assertEqual([self()], emqx:subscribers("$local/topic0")), ?assertEqual([{self(), undefined}], emqx:subscribers("$local/topic0")),
?assertEqual([{<<"x">>, self()}], emqx:subscribers("$local/topic1")), ?assertEqual([{self(), <<"clientId">>}], emqx:subscribers("$local/topic1")),
?assertEqual([{{<<"x">>, self()}, <<"$local/topic1">>, []}, ?assertEqual([{<<"$local/topic1">>, #{}},
{{<<"x">>, self()}, <<"$local/topic2">>, [{qos,2}]}], {<<"$local/topic2">>, #{ qos => 2 }}],
emqx:subscriptions(<<"x">>)), emqx:subscriptions({self(), <<"clientId">>})),
?assertEqual(ok, emqx:unsubscribe("$local/topic0")), ?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
?assertMatch({error, {subscription_not_found, _}}, emqx:unsubscribe("$local/topic0")), ?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"x">>)), ?assertEqual(ok, emqx:unsubscribe("$local/topic1", "clientId")),
?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"x">>)), ?assertEqual(ok, emqx:unsubscribe("$local/topic2", "clientId")),
?assertEqual([], emqx:subscribers("topic1")), ?assertEqual([], emqx:subscribers("topic1")),
?assertEqual([], emqx:subscriptions(<<"x">>)). ?assertEqual([], emqx:subscriptions({self(), <<"clientId">>})).
t_shared_subscribe(_) -> t_shared_subscribe(_) ->
emqx:subscribe("$local/$share/group1/topic1"), emqx:subscribe("$local/$share/group1/topic1"),
emqx:subscribe("$share/group2/topic2"), emqx:subscribe("$share/group2/topic2"),
emqx:subscribe("$queue/topic3"), emqx:subscribe("$queue/topic3"),
timer:sleep(10), timer:sleep(10),
?assertEqual([self()], emqx:subscribers(<<"$local/$share/group1/topic1">>)), ct:log("share subscriptions: ~p~n", [emqx:subscriptions({self(), undefined})]),
?assertEqual([{self(), <<"$local/$share/group1/topic1">>, []}, ?assertEqual([{self(), undefined}], emqx:subscribers(<<"$local/$share/group1/topic1">>)),
{self(), <<"$queue/topic3">>, []}, ?assertEqual([{<<"$local/$share/group1/topic1">>, #{}},
{self(), <<"$share/group2/topic2">>, []}], {<<"$queue/topic3">>, #{}},
lists:sort(emqx:subscriptions(self()))), {<<"$share/group2/topic2">>, #{}}],
lists:sort(emqx:subscriptions({self(), undefined}))),
emqx:unsubscribe("$local/$share/group1/topic1"), emqx:unsubscribe("$local/$share/group1/topic1"),
emqx:unsubscribe("$share/group2/topic2"), emqx:unsubscribe("$share/group2/topic2"),
emqx:unsubscribe("$queue/topic3"), emqx:unsubscribe("$queue/topic3"),
@ -146,17 +149,18 @@ t_shared_subscribe(_) ->
%% Session Group %% Session Group
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start_session(_) -> start_session(_) ->
{ok, ClientPid} = emqx_mock_client:start_link(<<"clientId">>), ClientId = <<"clientId">>,
{ok, SessPid} = emqx_mock_client:start_session(ClientPid), {ok, ClientPid} = emqx_mock_client:start_link(ClientId),
Message = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>), {ok, SessPid} = emqx_mock_client:open_session(ClientPid, ClientId, internal),
Message1 = Message#message{id = 1}, Message1 = emqx_message:make(<<"clientId">>, 2, <<"topic">>, <<"hello">>),
emqx_session:publish(SessPid, Message1), emqx_session:publish(SessPid, 1, Message1),
emqx_session:pubrel(SessPid, 1), emqx_session:pubrel(SessPid, 2, reasoncode),
emqx_session:subscribe(SessPid, [{<<"topic/session">>, [{qos, 2}]}]), emqx_session:subscribe(SessPid, [{<<"topic/session">>, #{qos => 2}}]),
Message2 = emqx_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>), Message2 = emqx_message:make(<<"clientId">>, 1, <<"topic/session">>, <<"test">>),
emqx_session:publish(SessPid, Message2), emqx_session:publish(SessPid, 3, Message2),
emqx_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]), emqx_session:unsubscribe(SessPid, [{<<"topic/session">>, []}]),
emqx_mock_client:stop(ClientPid). %% emqx_mock_client:stop(ClientPid).
emqx_mock_client:close_session(ClientPid, SessPid).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Broker Group %% Broker Group
@ -231,10 +235,10 @@ hook_fun8(arg, initArg) -> stop.
set_alarms(_) -> set_alarms(_) ->
AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"}, AlarmTest = #alarm{id = <<"1">>, severity = error, title="alarm title", summary="alarm summary"},
emqx_alarm:set_alarm(AlarmTest), emqx_alarm_mgr:set_alarm(AlarmTest),
Alarms = emqx_alarm:get_alarms(), Alarms = emqx_alarm_mgr:get_alarms(),
ct:log("Alarms Length: ~p ~n", [length(Alarms)]),
?assertEqual(1, length(Alarms)), ?assertEqual(1, length(Alarms)),
emqx_alarm:clear_alarm(<<"1">>), emqx_alarm_mgr:clear_alarm(<<"1">>),
[] = emqx_alarm:get_alarms(). [] = emqx_alarm_mgr:get_alarms().

View File

@ -1,18 +1,16 @@
%%%=================================================================== %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%
%%% %% Licensed under the Apache License, Version 2.0 (the "License");
%%% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.
%%% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at
%%% You may obtain a copy of the License at %%
%%% %% http://www.apache.org/licenses/LICENSE-2.0
%%% http://www.apache.org/licenses/LICENSE-2.0 %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS,
%%% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and
%%% See the License for the specific language governing permissions and %% limitations under the License.
%%% limitations under the License.
%%%===================================================================
-module(emqx_client_SUITE). -module(emqx_client_SUITE).

37
test/emqx_cm_SUITE.erl Normal file
View File

@ -0,0 +1,37 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_cm_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
all() -> [t_register_unregister_connection].
t_register_unregister_connection(_) ->
{ok, _} = emqx_cm_sup:start_link(),
Pid = self(),
emqx_cm:register_connection(<<"conn1">>),
emqx_cm:register_connection({<<"conn2">>, Pid}, [{port, 8080}, {ip, "192.168.0.1"}]),
timer:sleep(2000),
[{<<"conn1">>, Pid}] = emqx_cm:lookup_connection(<<"conn1">>),
[{<<"conn2">>, Pid}] = emqx_cm:lookup_connection(<<"conn2">>),
Pid = emqx_cm:lookup_conn_pid(<<"conn1">>),
emqx_cm:unregister_connection(<<"conn1">>),
[] = emqx_cm:lookup_connection(<<"conn1">>),
[{port, 8080}, {ip, "192.168.0.1"}] = emqx_cm:get_conn_attrs({<<"conn2">>, Pid}),
emqx_cm:set_conn_stats(<<"conn2">>, [[{count, 1}, {max, 2}]]),
[[{count, 1}, {max, 2}]] = emqx_cm:get_conn_stats({<<"conn2">>, Pid}).

View File

@ -1,18 +1,16 @@
%%%=================================================================== %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%
%%% %% Licensed under the Apache License, Version 2.0 (the "License");
%%% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.
%%% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at
%%% You may obtain a copy of the License at %%
%%% %% http://www.apache.org/licenses/LICENSE-2.0
%%% http://www.apache.org/licenses/LICENSE-2.0 %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS,
%%% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and
%%% See the License for the specific language governing permissions and %% limitations under the License.
%%% limitations under the License.
%%%===================================================================
-module(emqx_ct_broker_helpers). -module(emqx_ct_broker_helpers).

View File

@ -1,18 +1,16 @@
%%%=================================================================== %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%
%%% %% Licensed under the Apache License, Version 2.0 (the "License");
%%% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.
%%% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at
%%% You may obtain a copy of the License at %%
%%% %% http://www.apache.org/licenses/LICENSE-2.0
%%% http://www.apache.org/licenses/LICENSE-2.0 %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS,
%%% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and
%%% See the License for the specific language governing permissions and %% limitations under the License.
%%% limitations under the License.
%%%===================================================================
-module(emqx_ct_helpers). -module(emqx_ct_helpers).

View File

@ -1,18 +1,16 @@
%%%=================================================================== %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%
%%% %% Licensed under the Apache License, Version 2.0 (the "License");
%%% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.
%%% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at
%%% You may obtain a copy of the License at %%
%%% %% http://www.apache.org/licenses/LICENSE-2.0
%%% http://www.apache.org/licenses/LICENSE-2.0 %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS,
%%% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and
%%% See the License for the specific language governing permissions and %% limitations under the License.
%%% limitations under the License.
%%%===================================================================
-module(emqx_frame_SUITE). -module(emqx_frame_SUITE).
@ -331,15 +329,17 @@ serialize_parse_pubcomp_v5(_) ->
serialize_parse_subscribe(_) -> serialize_parse_subscribe(_) ->
%% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}]) %% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}])
Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>, Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>,
TopicFilters = [{<<"TopicA">>, #mqtt_subopts{qos = 2}}], TopicOpts = #{ nl => 0 , rap => 0, rc => 0,
rh => 0, subid => 0 , qos => 2 },
TopicFilters = [{<<"TopicA">>, TopicOpts}],
Packet = ?SUBSCRIBE_PACKET(2, TopicFilters), Packet = ?SUBSCRIBE_PACKET(2, TopicFilters),
?assertEqual(Bin, iolist_to_binary(serialize(Packet))), ?assertEqual(Bin, iolist_to_binary(serialize(Packet))),
?assertEqual({ok, Packet, <<>>}, parse(Bin)). ?assertEqual({ok, Packet, <<>>}, parse(Bin)).
serialize_parse_subscribe_v5(_) -> serialize_parse_subscribe_v5(_) ->
TopicFilters = [{<<"TopicQos0">>, #mqtt_subopts{rh = 1, qos = ?QOS_0}}, TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0, subid => 0}},
{<<"TopicQos1">>, #mqtt_subopts{rh = 1, qos =?QOS_1}}], {<<"TopicQos1">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0, subid => 0}}],
Packet = ?SUBSCRIBE_PACKET(1, #{'Subscription-Identifier' => 16#FFFFFFF}, Packet = ?SUBSCRIBE_PACKET(3, #{'Subscription-Identifier' => 16#FFFFFFF},
TopicFilters), TopicFilters),
?assertEqual({ok, Packet, <<>>}, ?assertEqual({ok, Packet, <<>>},
parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})). parse_serialize(Packet, #{version => ?MQTT_PROTO_V5})).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_guid_SUITE). -module(emqx_guid_SUITE).

View File

@ -1,62 +1,41 @@
%%%=================================================================== %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%
%%% %% Licensed under the Apache License, Version 2.0 (the "License");
%%% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.
%%% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at
%%% You may obtain a copy of the License at %%
%%% %% http://www.apache.org/licenses/LICENSE-2.0
%%% http://www.apache.org/licenses/LICENSE-2.0 %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS,
%%% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and
%%% See the License for the specific language governing permissions and %% limitations under the License.
%%% limitations under the License.
%%%===================================================================
-module(emqx_inflight_SUITE). -module(emqx_inflight_SUITE).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
-import(emqx_inflight, [new/1, contain/2, insert/3, lookup/2, update/3, all() -> [t_inflight_all].
delete/2, is_empty/1, is_full/1]).
all() ->
[t_contain, t_lookup, t_insert, t_update, t_delete, t_window,
t_is_full, t_is_empty].
t_contain(_) ->
?assertNot(contain(k, new(0))),
?assert(contain(k, insert(k, v, new(0)))).
t_lookup(_) ->
Inflight = insert(k, v, new(0)),
?assertEqual({value, v}, lookup(k, Inflight)),
?assertEqual(none, lookup(x, Inflight)).
t_insert(_) ->
Inflight = insert(k2, v2, insert(k1, v1, new(0))),
?assertEqual({value, v1}, lookup(k1, Inflight)),
?assertEqual({value, v2}, lookup(k2, Inflight)).
t_update(_) ->
Inflight = update(k, v2, insert(k, v1, new(0))),
?assertEqual({value, v2}, lookup(k, Inflight)).
t_delete(_) ->
?assert(is_empty(delete(k, insert(k, v1, new(0))))).
t_window(_) ->
?assertEqual([], emqx_inflight:window(new(10))),
Inflight = insert(2, 2, insert(1, 1, new(0))),
?assertEqual([1, 2], emqx_inflight:window(Inflight)).
t_is_full(_) ->
?assert(is_full(insert(k, v1, new(1)))).
t_is_empty(_) ->
?assertNot(is_empty(insert(k, v1, new(1)))).
t_inflight_all(_) ->
Empty = emqx_inflight:new(2),
true = emqx_inflight:is_empty(Empty),
2 = emqx_inflight:max_size(Empty),
false = emqx_inflight:contain(a, Empty),
none = emqx_inflight:lookup(a, Empty),
try emqx_inflight:update(a, 1, Empty) catch
error:Reason -> io:format("Reason: ~w~n", [Reason])
end,
0 = emqx_inflight:size(Empty),
Inflight1 = emqx_inflight:insert(a, 1, Empty),
Inflight2 = emqx_inflight:insert(b, 2, Inflight1),
2 = emqx_inflight:size(Inflight2),
true = emqx_inflight:is_full(Inflight2),
{value, 1} = emqx_inflight:lookup(a, Inflight1),
{value, 2} = emqx_inflight:lookup(a, emqx_inflight:update(a, 2, Inflight1)),
false = emqx_inflight:contain(a, emqx_inflight:delete(a, Inflight1)),
[1, 2] = emqx_inflight:values(Inflight2),
[{a, 1}, {b ,2}] = emqx_inflight:to_list(Inflight2),
[a, b] = emqx_inflight:window(Inflight2).

37
test/emqx_json_SUITE.erl Normal file
View File

@ -0,0 +1,37 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_json_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
all() -> [t_decode_encode, t_safe_decode_encode].
t_decode_encode(_) ->
JsonText = <<"{\"library\": \"jsx\", \"awesome\": true}">>,
JsonTerm = emqx_json:decode(JsonText),
JsonMaps = #{library => <<"jsx">>, awesome => true},
JsonMaps = emqx_json:decode(JsonText, [{labels, atom}, return_maps]),
JsonText = emqx_json:encode(JsonTerm, [{space, 1}]).
t_safe_decode_encode(_) ->
JsonText = <<"{\"library\": \"jsx\", \"awesome\": true}">>,
{ok, JsonTerm} = emqx_json:safe_decode(JsonText),
JsonMaps = #{library => <<"jsx">>, awesome => true},
{ok, JsonMaps} = emqx_json:safe_decode(JsonText, [{labels, atom}, return_maps]),
{ok, JsonText} = emqx_json:safe_encode(JsonTerm, [{space, 1}]),
BadJsonText = <<"{\"library\", \"awesome\": true}">>,
{error, _} = emqx_json:safe_decode(BadJsonText),
{error, _} = emqx_json:safe_encode({a, {b ,1}}).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_keepalive_SUITE). -module(emqx_keepalive_SUITE).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_lib_SUITE). -module(emqx_lib_SUITE).
@ -163,12 +161,13 @@ node_parse_name(_) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
base62_encode(_) -> base62_encode(_) ->
10 = ?BASE62:decode(?BASE62:encode(10)), <<"10">> = ?BASE62:decode(?BASE62:encode(<<"10">>)),
100 = ?BASE62:decode(?BASE62:encode(100)), <<"100">> = ?BASE62:decode(?BASE62:encode(<<"100">>)),
9999 = ?BASE62:decode(?BASE62:encode(9999)), <<"9999">> = ?BASE62:decode(?BASE62:encode(<<"9999">>)),
65535 = ?BASE62:decode(?BASE62:encode(65535)), <<"65535">> = ?BASE62:decode(?BASE62:encode(<<"65535">>)),
<<X:128/unsigned-big-integer>> = emqx_guid:gen(), <<X:128/unsigned-big-integer>> = emqx_guid:gen(),
<<Y:128/unsigned-big-integer>> = emqx_guid:gen(), <<Y:128/unsigned-big-integer>> = emqx_guid:gen(),
X = ?BASE62:decode(?BASE62:encode(X)), X = ?BASE62:decode(?BASE62:encode(X), integer),
Y = ?BASE62:decode(?BASE62:encode(Y)). Y = ?BASE62:decode(?BASE62:encode(Y), integer),
<<"helloworld">> = ?BASE62:decode(?BASE62:encode("helloworld")),
"helloworld" = ?BASE62:decode(?BASE62:encode("helloworld", string), string).

View File

@ -0,0 +1,39 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_metrics_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
all() -> [t_inc_dec_metrics].
t_inc_dec_metrics(_) ->
{ok, _} = emqx_metrics:start_link(),
{0, 0} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
emqx_metrics:inc('bytes/received'),
emqx_metrics:inc({counter, 'bytes/received'}, 2),
emqx_metrics:inc(counter, 'bytes/received', 2),
emqx_metrics:inc({gauge, 'messages/retained'}, 2),
emqx_metrics:inc(gauge, 'messages/retained', 2),
{5, 4} = {emqx_metrics:val('bytes/received'), emqx_metrics:val('messages/retained')},
emqx_metrics:dec(gauge, 'messages/retained'),
emqx_metrics:dec(gauge, 'messages/retained', 1),
2 = emqx_metrics:val('messages/retained'),
emqx_metrics:received(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}),
{1, 1} = {emqx_metrics:val('packets/received'), emqx_metrics:val('packets/connect')},
emqx_metrics:sent(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}}),
{1, 1} = {emqx_metrics:val('packets/sent'), emqx_metrics:val('packets/connack')}.

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_misc_SUITE). -module(emqx_misc_SUITE).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,34 +11,54 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mock_client). -module(emqx_mock_client).
-behaviour(gen_server). -behaviour(gen_server).
-export([start_link/1, start_session/1, stop/1]). -export([start_link/1, open_session/3, close_session/2, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {clientid, session}). -record(state, {clean_start, client_id, client_pid}).
start_link(ClientId) -> start_link(ClientId) ->
gen_server:start_link(?MODULE, [ClientId], []). gen_server:start_link(?MODULE, [ClientId], []).
start_session(CPid) -> open_session(ClientPid, ClientId, Zone) ->
gen_server:call(CPid, start_session). gen_server:call(ClientPid, {start_session, ClientPid, ClientId, Zone}).
close_session(ClientPid, SessPid) ->
gen_server:call(ClientPid, {stop_session, SessPid}).
stop(CPid) -> stop(CPid) ->
gen_server:call(CPid, stop). gen_server:call(CPid, stop).
init([ClientId]) -> init([ClientId]) ->
{ok, #state{clientid = ClientId}}. {ok,
#state{clean_start = true,
client_id = ClientId}
}.
handle_call(start_session, _From, State = #state{clientid = ClientId}) -> handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
{ok, SessPid, _} = emqx_sm:start_session(true, {ClientId, undefined}), Attrs = #{ zone => Zone,
{reply, {ok, SessPid}, State#state{session = SessPid}}; client_id => ClientId,
client_pid => ClientPid,
clean_start => true,
username => undefined,
conn_props => undefined
},
{ok, SessPid} = emqx_sm:open_session(Attrs),
{reply, {ok, SessPid}, State#state{
clean_start = true,
client_id = ClientId,
client_pid = ClientPid
}};
handle_call({stop_session, SessPid}, _From, State) ->
emqx_sm:close_session(SessPid),
{stop, normal, ok, State};
handle_call(stop, _From, State) -> handle_call(stop, _From, State) ->
{stop, normal, ok, State}; {stop, normal, ok, State};

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mod_SUITE). -module(emqx_mod_SUITE).

View File

@ -16,11 +16,100 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
%% CT %% CT
-compile(export_all). -compile(export_all).
-compile(nowarn_export_all). -compile(nowarn_export_all).
all() -> all() -> [t_get_set_caps, t_check_pub, t_check_sub].
[].
t_get_set_caps(_) ->
{ok, _} = emqx_zone:start_link(),
Caps = #{
max_packet_size => ?MAX_PACKET_SIZE,
max_clientid_len => ?MAX_CLIENTID_LEN,
max_topic_alias => 0,
max_topic_levels => 0,
max_qos_allowed => ?QOS_2,
mqtt_retain_available => true,
mqtt_shared_subscription => true,
mqtt_wildcard_subscription => true
},
Caps = emqx_mqtt_caps:get_caps(zone),
PubCaps = #{
max_qos_allowed => ?QOS_2,
mqtt_retain_available => true
},
PubCaps = emqx_mqtt_caps:get_caps(zone, publish),
NewPubCaps = PubCaps#{max_qos_allowed => ?QOS_1},
emqx_zone:set_env(zone, '$mqtt_pub_caps', NewPubCaps),
timer:sleep(100),
NewPubCaps = emqx_mqtt_caps:get_caps(zone, publish),
SubCaps = #{
max_topic_levels => 0,
max_qos_allowed => ?QOS_2,
mqtt_shared_subscription => true,
mqtt_wildcard_subscription => true
},
SubCaps = emqx_mqtt_caps:get_caps(zone, subscribe).
t_check_pub(_) ->
{ok, _} = emqx_zone:start_link(),
PubCaps = #{
max_qos_allowed => ?QOS_1,
mqtt_retain_available => false
},
emqx_zone:set_env(zone, '$mqtt_pub_caps', PubCaps),
timer:sleep(100),
BadPubProps1 = #{
qos => ?QOS_2,
retain => false
},
{error, ?RC_QOS_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps1),
BadPubProps2 = #{
qos => ?QOS_1,
retain => true
},
{error, ?RC_RETAIN_NOT_SUPPORTED} = emqx_mqtt_caps:check_pub(zone, BadPubProps2),
PubProps = #{
qos => ?QOS_1,
retain => false
},
ok = emqx_mqtt_caps:check_pub(zone, PubProps).
t_check_sub(_) ->
{ok, _} = emqx_zone:start_link(),
Opts = #{qos => ?QOS_2, share => true, rc => 0},
Caps = #{
max_topic_levels => 0,
max_qos_allowed => ?QOS_2,
mqtt_shared_subscription => true,
mqtt_wildcard_subscription => true
},
ok = do_check_sub([{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts}]),
ok = do_check_sub(Caps#{max_qos_allowed => ?QOS_1}, [{<<"client/stat">>, Opts}], [{<<"client/stat">>, Opts#{qos => ?QOS_1}}]),
ok = do_check_sub(Caps#{max_topic_levels => 1},
[{<<"client/stat">>, Opts}],
[{<<"client/stat">>, Opts#{rc => ?RC_TOPIC_FILTER_INVALID}}]),
ok = do_check_sub(Caps#{mqtt_shared_subscription => false},
[{<<"client/stat">>, Opts}],
[{<<"client/stat">>, Opts#{rc => ?RC_SHARED_SUBSCRIPTIONS_NOT_SUPPORTED}}]),
ok = do_check_sub(Caps#{mqtt_wildcard_subscription => false},
[{<<"vlient/+/dsofi">>, Opts}],
[{<<"vlient/+/dsofi">>, Opts#{rc => ?RC_WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED}}]).
do_check_sub(TopicFilters, Topics) ->
{ok, Topics} = emqx_mqtt_caps:check_sub(zone, TopicFilters),
ok.
do_check_sub(Caps, TopicFilters, Topics) ->
emqx_zone:set_env(zone, '$mqtt_sub_caps', Caps),
timer:sleep(100),
{_, Topics} = emqx_mqtt_caps:check_sub(zone, TopicFilters),
ok.

View File

@ -1,18 +1,16 @@
%%%=================================================================== %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%
%%% %% Licensed under the Apache License, Version 2.0 (the "License");
%%% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.
%%% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at
%%% You may obtain a copy of the License at %%
%%% %% http://www.apache.org/licenses/LICENSE-2.0
%%% http://www.apache.org/licenses/LICENSE-2.0 %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS,
%%% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and
%%% See the License for the specific language governing permissions and %% limitations under the License.
%%% limitations under the License.
%%%===================================================================
-module(emqx_mqtt_compat_SUITE). -module(emqx_mqtt_compat_SUITE).

View File

@ -0,0 +1,27 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_mqtt_properties_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
all() -> [t_mqtt_properties_all].
t_mqtt_properties_all(_) ->
Props = emqx_mqtt_properties:filter(?CONNECT, #{'Session-Expiry-Interval' => 1, 'Maximum-Packet-Size' => 255}),
ok = emqx_mqtt_properties:validate(Props),
#{} = emqx_mqtt_properties:filter(?CONNECT, #{'Maximum-QoS' => ?QOS_2}).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqueue_SUITE). -module(emqx_mqueue_SUITE).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_net_SUITE). -module(emqx_net_SUITE).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_pqueue_SUITE). -module(emqx_pqueue_SUITE).

View File

@ -1,18 +1,16 @@
%%%=================================================================== %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%
%%% %% Licensed under the Apache License, Version 2.0 (the "License");
%%% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.
%%% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at
%%% You may obtain a copy of the License at %%
%%% %% http://www.apache.org/licenses/LICENSE-2.0
%%% http://www.apache.org/licenses/LICENSE-2.0 %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS,
%%% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and
%%% See the License for the specific language governing permissions and %% limitations under the License.
%%% limitations under the License.
%%%===================================================================
-module(emqx_router_SUITE). -module(emqx_router_SUITE).
@ -49,11 +47,21 @@ end_per_testcase(_TestCase, _Config) ->
add_del_route(_) -> add_del_route(_) ->
From = {self(), make_ref()}, From = {self(), make_ref()},
?R:add_route(From, <<"a/b/c">>, node()), ?R:add_route(From, <<"a/b/c">>, node()),
timer:sleep(1),
?R:add_route(From, <<"a/b/c">>, node()), ?R:add_route(From, <<"a/b/c">>, node()),
timer:sleep(1),
?R:add_route(From, <<"a/+/b">>, node()), ?R:add_route(From, <<"a/+/b">>, node()),
ct:log("Topics: ~p ~n", [emqx_topic:wildcard(<<"a/+/b">>)]),
timer:sleep(1),
?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())), ?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
?R:del_route(From, <<"a/b/c">>, node()), ?R:del_route(From, <<"a/b/c">>, node()),
?R:del_route(From, <<"a/+/b">>, node()), ?R:del_route(From, <<"a/+/b">>, node()),
timer:sleep(1),
?assertEqual([], lists:sort(?R:topics())). ?assertEqual([], lists:sort(?R:topics())).
match_routes(_) -> match_routes(_) ->
@ -62,6 +70,7 @@ match_routes(_) ->
?R:add_route(From, <<"a/+/c">>, node()), ?R:add_route(From, <<"a/+/c">>, node()),
?R:add_route(From, <<"a/b/#">>, node()), ?R:add_route(From, <<"a/b/#">>, node()),
?R:add_route(From, <<"#">>, node()), ?R:add_route(From, <<"#">>, node()),
timer:sleep(1000),
?assertEqual([#route{topic = <<"#">>, dest = node()}, ?assertEqual([#route{topic = <<"#">>, dest = node()},
#route{topic = <<"a/+/c">>, dest = node()}, #route{topic = <<"a/+/c">>, dest = node()},
#route{topic = <<"a/b/#">>, dest = node()}, #route{topic = <<"a/b/#">>, dest = node()},

40
test/emqx_sm_SUITE.erl Normal file
View File

@ -0,0 +1,40 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_sm_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
all() -> [t_open_close_session].
t_open_close_session(_) ->
emqx_ct_broker_helpers:run_setup_steps(),
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
Attrs = #{clean_start => true, client_id => <<"client">>, client_pid => ClientPid, zone => internal, username => <<"zhou">>, conn_props => ref},
{ok, _SPid} = emqx_sm:open_session(Attrs),
[{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
SPid = emqx_sm:lookup_session_pid(<<"client">>),
{ok, NewClientPid} = emqx_mock_client:start_link(<<"client">>),
{ok, SPid, true} = emqx_sm:open_session(Attrs#{clean_start => false, client_pid => NewClientPid}),
[{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
SAttrs = emqx_sm:get_session_attrs({<<"client">>, SPid}),
<<"client">> = proplists:get_value(client_id, SAttrs),
Session = {<<"client">>, SPid},
emqx_sm:set_session_stats(Session, {open, true}),
{open, true} = emqx_sm:get_session_stats(Session),
ok = emqx_sm:close_session(SPid),
[] = emqx_sm:lookup_session(<<"client">>).

55
test/emqx_stats_SUITE.erl Normal file
View File

@ -0,0 +1,55 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_stats_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
all() -> [t_set_get_state, t_update_interval].
t_set_get_state(_) ->
{ok, _} = emqx_stats:start_link(),
SetConnsCount = emqx_stats:statsfun('connections/count'),
SetConnsCount(1),
1 = emqx_stats:getstat('connections/count'),
emqx_stats:setstat('connections/count', 2),
2 = emqx_stats:getstat('connections/count'),
emqx_stats:setstat('connections/count', 'connections/max', 3),
timer:sleep(100),
3 = emqx_stats:getstat('connections/count'),
3 = emqx_stats:getstat('connections/max'),
emqx_stats:setstat('connections/count', 'connections/max', 2),
timer:sleep(100),
2 = emqx_stats:getstat('connections/count'),
3 = emqx_stats:getstat('connections/max'),
SetConns = emqx_stats:statsfun('connections/count', 'connections/max'),
SetConns(4),
timer:sleep(100),
4 = emqx_stats:getstat('connections/count'),
4 = emqx_stats:getstat('connections/max'),
Conns = emqx_stats:getstats(),
4 = proplists:get_value('connections/count', Conns),
4 = proplists:get_value('connections/max', Conns).
t_update_interval(_) ->
{ok, _} = emqx_stats:start_link(),
ok = emqx_stats:update_interval(cm_stats, fun update_stats/0),
timer:sleep(2500),
1 = emqx_stats:getstat('connections/count').
update_stats() ->
emqx_stats:setstat('connections/count', 1).

View File

@ -0,0 +1,26 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_tables_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
all() -> [t_new].
t_new(_) ->
TId = emqx_tables:new(test_table, [{read_concurrency, true}]),
ets:insert(TId, {loss, 100}),
TId = emqx_tables:new(test_table, [{read_concurrency, true}]),
100 = ets:lookup_element(TId, loss, 2).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_time_SUITE). -module(emqx_time_SUITE).

View File

@ -132,20 +132,22 @@ t_validate(_) ->
true = validate({filter, <<"x">>}), true = validate({filter, <<"x">>}),
true = validate({name, <<"x//y">>}), true = validate({name, <<"x//y">>}),
true = validate({filter, <<"sport/tennis/#">>}), true = validate({filter, <<"sport/tennis/#">>}),
false = validate({name, <<>>}), catch validate({name, <<>>}),
false = validate({name, long_topic()}), catch validate({name, long_topic()}),
false = validate({name, <<"abc/#">>}), catch validate({name, <<"abc/#">>}),
false = validate({filter, <<"abc/#/1">>}), catch validate({filter, <<"abc/#/1">>}),
false = validate({filter, <<"abc/#xzy/+">>}), catch validate({filter, <<"abc/#xzy/+">>}),
false = validate({filter, <<"abc/xzy/+9827">>}), catch validate({filter, <<"abc/xzy/+9827">>}),
false = validate({filter, <<"sport/tennis#">>}), catch validate({filter, <<"sport/tennis#">>}),
false = validate({filter, <<"sport/tennis/#/ranking">>}). catch validate({filter, <<"sport/tennis/#/ranking">>}),
ok.
t_sigle_level_validate(_) -> t_sigle_level_validate(_) ->
true = validate({filter, <<"+">>}), true = validate({filter, <<"+">>}),
true = validate({filter, <<"+/tennis/#">>}), true = validate({filter, <<"+/tennis/#">>}),
true = validate({filter, <<"sport/+/player1">>}), true = validate({filter, <<"sport/+/player1">>}),
false = validate({filter, <<"sport+">>}). catch validate({filter, <<"sport+">>}),
ok.
t_triples(_) -> t_triples(_) ->
Triples = [{root,<<"a">>,<<"a">>}, Triples = [{root,<<"a">>,<<"a">>},
@ -199,11 +201,11 @@ long_topic() ->
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 10000)]). iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 10000)]).
t_parse(_) -> t_parse(_) ->
?assertEqual({<<"a/b/+/#">>, []}, parse(<<"a/b/+/#">>)), ?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)),
?assertEqual({<<"topic">>, [{share, '$queue'}]}, parse(<<"$queue/topic">>)), ?assertEqual({<<"topic">>, #{ share => <<"$queue">> }}, parse(<<"$queue/topic">>)),
?assertEqual({<<"topic">>, [{share, <<"group">>}]}, parse(<<"$share/group/topic">>)), ?assertEqual({<<"topic">>, #{ share => <<"group">>}}, parse(<<"$share/group/topic">>)),
?assertEqual({<<"topic">>, [local]}, parse(<<"$local/topic">>)), ?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)),
?assertEqual({<<"topic">>, [{share, '$queue'}, local]}, parse(<<"$local/$queue/topic">>)), ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),
?assertEqual({<<"/a/b/c">>, [{share, <<"group">>}, local]}, parse(<<"$local/$share/group//a/b/c">>)), ?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)),
?assertEqual({<<"topic">>, [fastlane]}, parse(<<"$fastlane/topic">>)). ?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)).

View File

@ -1,18 +1,16 @@
%%%=================================================================== %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%
%%% %% Licensed under the Apache License, Version 2.0 (the "License");
%%% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.
%%% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at
%%% You may obtain a copy of the License at %%
%%% %% http://www.apache.org/licenses/LICENSE-2.0
%%% http://www.apache.org/licenses/LICENSE-2.0 %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS,
%%% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and
%%% See the License for the specific language governing permissions and %% limitations under the License.
%%% limitations under the License.
%%%===================================================================
-module(emqx_trie_SUITE). -module(emqx_trie_SUITE).

View File

@ -1,5 +1,4 @@
%%-------------------------------------------------------------------- %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -12,7 +11,6 @@
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_vm_SUITE). -module(emqx_vm_SUITE).

32
test/emqx_zone_SUITE.erl Normal file
View File

@ -0,0 +1,32 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_zone_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
all() -> [t_set_get_env].
t_set_get_env(_) ->
{ok, _} = emqx_zone:start_link(),
ok = emqx_zone:set_env(china, language, chinese),
timer:sleep(100), % make sure set_env/3 is okay
chinese = emqx_zone:get_env(china, language),
cn470 = emqx_zone:get_env(china, ism_band, cn470),
undefined = emqx_zone:get_env(undefined, delay),
500 = emqx_zone:get_env(undefined, delay, 500).